feat(nats): Implement caching for card, runner, and station entries with improved key management
This commit is contained in:
67
src/nats/CardKV.ts
Normal file
67
src/nats/CardKV.ts
Normal file
@@ -0,0 +1,67 @@
|
||||
import { KvEntry } from 'nats';
|
||||
import NatsClient from './NatsClient';
|
||||
|
||||
const BUCKET = 'card_state';
|
||||
/** 1 hour TTL in milliseconds — sliding window, reset on each access. */
|
||||
const TTL_MS = 60 * 60 * 1000;
|
||||
|
||||
/**
|
||||
* Cached card data stored in NATS KV.
|
||||
* Keyed by the stripped card id (rawBarcode % 200000000000).
|
||||
* TTL of 1 hour of inactivity — re-put on each access to slide the window.
|
||||
*/
|
||||
export interface CardKVEntry {
|
||||
runnerId: number;
|
||||
runnerDisplayName: string;
|
||||
enabled: boolean;
|
||||
}
|
||||
|
||||
async function getBucket() {
|
||||
return NatsClient.getKV(BUCKET, { ttl: TTL_MS });
|
||||
}
|
||||
|
||||
function entryKey(cardId: number): string {
|
||||
return `card.${cardId}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the cached CardKVEntry for the given stripped card id, or null on a miss.
|
||||
* On a cache hit the entry is re-put with a fresh TTL to slide the inactivity window.
|
||||
*/
|
||||
export async function getCardEntry(cardId: number): Promise<CardKVEntry | null> {
|
||||
const bucket = await getBucket();
|
||||
let entry: KvEntry | null = null;
|
||||
try {
|
||||
entry = await bucket.get(entryKey(cardId));
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
if (!entry || entry.operation === 'DEL' || entry.operation === 'PURGE') {
|
||||
return null;
|
||||
}
|
||||
const value = JSON.parse(entry.string()) as CardKVEntry;
|
||||
// Re-put to slide the TTL window
|
||||
await bucket.put(entryKey(cardId), JSON.stringify(value));
|
||||
return value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a CardKVEntry for the given stripped card id with a 1-hour TTL.
|
||||
*/
|
||||
export async function setCardEntry(cardId: number, entry: CardKVEntry): Promise<void> {
|
||||
const bucket = await getBucket();
|
||||
await bucket.put(entryKey(cardId), JSON.stringify(entry));
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the cached entry for the given stripped card id.
|
||||
* Call on card update (runner reassignment, enable/disable change) or delete.
|
||||
*/
|
||||
export async function deleteCardEntry(cardId: number): Promise<void> {
|
||||
const bucket = await getBucket();
|
||||
try {
|
||||
await bucket.delete(entryKey(cardId));
|
||||
} catch {
|
||||
// Entry may not exist in KV yet — that's fine
|
||||
}
|
||||
}
|
||||
128
src/nats/RunnerKV.ts
Normal file
128
src/nats/RunnerKV.ts
Normal file
@@ -0,0 +1,128 @@
|
||||
import { KvEntry } from 'nats';
|
||||
import { getConnection } from 'typeorm';
|
||||
import { Runner } from '../models/entities/Runner';
|
||||
import { TrackScan } from '../models/entities/TrackScan';
|
||||
import NatsClient from './NatsClient';
|
||||
|
||||
const BUCKET = 'runner_state';
|
||||
|
||||
/**
|
||||
* Cached runner state stored in NATS KV.
|
||||
* Keyed by runner id. No TTL — entries are permanent until explicitly deleted.
|
||||
*/
|
||||
export interface RunnerKVEntry {
|
||||
/** "Firstname Lastname" — middlename omitted. */
|
||||
displayName: string;
|
||||
/** Sum of all valid scan distances in metres. */
|
||||
totalDistance: number;
|
||||
/** Unix seconds timestamp of the last valid scan. 0 if none. */
|
||||
latestTimestamp: number;
|
||||
}
|
||||
|
||||
/** Returned from getRunnerEntry — includes the KV revision for CAS updates. */
|
||||
export interface RunnerKVResult {
|
||||
entry: RunnerKVEntry;
|
||||
revision: number;
|
||||
}
|
||||
|
||||
async function getBucket() {
|
||||
return NatsClient.getKV(BUCKET);
|
||||
}
|
||||
|
||||
function entryKey(runnerId: number): string {
|
||||
return `runner.${runnerId}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the cached RunnerKVEntry + revision for the given runner id, or null on a miss.
|
||||
* The revision is required for CAS (compare-and-swap) updates.
|
||||
*/
|
||||
export async function getRunnerEntry(runnerId: number): Promise<RunnerKVResult | null> {
|
||||
const bucket = await getBucket();
|
||||
let entry: KvEntry | null = null;
|
||||
try {
|
||||
entry = await bucket.get(entryKey(runnerId));
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
if (!entry || entry.operation === 'DEL' || entry.operation === 'PURGE') {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
entry: JSON.parse(entry.string()) as RunnerKVEntry,
|
||||
revision: entry.revision,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a RunnerKVEntry for the given runner id.
|
||||
* If revision is provided, performs a CAS update — returns false if the revision
|
||||
* has changed (concurrent write), true on success.
|
||||
* Without a revision, performs an unconditional put.
|
||||
*/
|
||||
export async function setRunnerEntry(runnerId: number, entry: RunnerKVEntry, revision?: number): Promise<boolean> {
|
||||
const bucket = await getBucket();
|
||||
try {
|
||||
if (revision !== undefined) {
|
||||
await bucket.update(entryKey(runnerId), JSON.stringify(entry), revision);
|
||||
} else {
|
||||
await bucket.put(entryKey(runnerId), JSON.stringify(entry));
|
||||
}
|
||||
return true;
|
||||
} catch {
|
||||
// CAS conflict — revision has changed
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the cached entry for the given runner id.
|
||||
* Call on runner name update or when a scan's valid flag is changed via PUT /scans/:id.
|
||||
*/
|
||||
export async function deleteRunnerEntry(runnerId: number): Promise<void> {
|
||||
const bucket = await getBucket();
|
||||
try {
|
||||
await bucket.delete(entryKey(runnerId));
|
||||
} catch {
|
||||
// Entry may not exist in KV yet — that's fine
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* DB fallback: loads a runner's display name, total valid distance, and latest valid
|
||||
* scan timestamp from the database, writes the result to KV, and returns it.
|
||||
*
|
||||
* Called on any KV cache miss during the scan intake flow.
|
||||
* Also handles the first-scan-ever case — latestTimestamp=0, totalDistance=0.
|
||||
*/
|
||||
export async function warmRunner(runnerId: number): Promise<RunnerKVEntry> {
|
||||
const connection = getConnection();
|
||||
|
||||
const runner = await connection.getRepository(Runner).findOne({ id: runnerId });
|
||||
const displayName = runner ? `${runner.firstname} ${runner.lastname}` : 'Unknown Runner';
|
||||
|
||||
const distanceResult = await connection
|
||||
.getRepository(TrackScan)
|
||||
.createQueryBuilder('scan')
|
||||
.select('COALESCE(SUM(track.distance), 0)', 'total')
|
||||
.innerJoin('scan.track', 'track')
|
||||
.where('scan.runner = :runnerId', { runnerId })
|
||||
.andWhere('scan.valid = :valid', { valid: true })
|
||||
.getRawOne();
|
||||
|
||||
const latestScan = await connection
|
||||
.getRepository(TrackScan)
|
||||
.findOne({
|
||||
where: { runner: { id: runnerId }, valid: true },
|
||||
order: { timestamp: 'DESC' },
|
||||
});
|
||||
|
||||
const entry: RunnerKVEntry = {
|
||||
displayName,
|
||||
totalDistance: parseInt(distanceResult?.total ?? '0', 10),
|
||||
latestTimestamp: latestScan?.timestamp ?? 0,
|
||||
};
|
||||
|
||||
await setRunnerEntry(runnerId, entry);
|
||||
return entry;
|
||||
}
|
||||
@@ -25,45 +25,64 @@ async function getBucket() {
|
||||
return NatsClient.getKV(BUCKET);
|
||||
}
|
||||
|
||||
function entryKey(prefix: string): string {
|
||||
return `station.${prefix}`;
|
||||
function prefixKey(prefix: string): string {
|
||||
return `station.prefix.${prefix}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the cached StationKVEntry for the given prefix, or null on a cache miss.
|
||||
*/
|
||||
export async function getStationEntry(prefix: string): Promise<StationKVEntry | null> {
|
||||
function idKey(id: number): string {
|
||||
return `station.id.${id}`;
|
||||
}
|
||||
|
||||
async function getEntry(key: string): Promise<StationKVEntry | null> {
|
||||
const bucket = await getBucket();
|
||||
let entry: KvEntry | null = null;
|
||||
let raw: KvEntry | null = null;
|
||||
try {
|
||||
entry = await bucket.get(entryKey(prefix));
|
||||
raw = await bucket.get(key);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
if (!entry || entry.operation === 'DEL' || entry.operation === 'PURGE') {
|
||||
if (!raw || raw.operation === 'DEL' || raw.operation === 'PURGE') {
|
||||
return null;
|
||||
}
|
||||
return JSON.parse(entry.string()) as StationKVEntry;
|
||||
return JSON.parse(raw.string()) as StationKVEntry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a StationKVEntry for the given prefix.
|
||||
* Returns the cached StationKVEntry for the given token prefix, or null on a cache miss.
|
||||
*/
|
||||
export async function getStationEntry(prefix: string): Promise<StationKVEntry | null> {
|
||||
return getEntry(prefixKey(prefix));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the cached StationKVEntry for the given station DB id, or null on a cache miss.
|
||||
* Used by the intake flow where only stationId is available after ScanAuth.
|
||||
*/
|
||||
export async function getStationEntryById(id: number): Promise<StationKVEntry | null> {
|
||||
return getEntry(idKey(id));
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a StationKVEntry under both the prefix key and the id key.
|
||||
* No TTL — entries are permanent until explicitly deleted.
|
||||
*/
|
||||
export async function setStationEntry(prefix: string, entry: StationKVEntry): Promise<void> {
|
||||
const bucket = await getBucket();
|
||||
await bucket.put(entryKey(prefix), JSON.stringify(entry));
|
||||
const serialised = JSON.stringify(entry);
|
||||
await bucket.put(prefixKey(prefix), serialised);
|
||||
await bucket.put(idKey(entry.id), serialised);
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the cached entry for the given prefix.
|
||||
* Removes the cached entries for the given prefix (and its id mirror).
|
||||
* Call this on station update or delete so the next request re-fetches from DB.
|
||||
*/
|
||||
export async function deleteStationEntry(prefix: string): Promise<void> {
|
||||
const bucket = await getBucket();
|
||||
try {
|
||||
await bucket.delete(entryKey(prefix));
|
||||
} catch {
|
||||
// Entry may not exist in KV yet — that's fine
|
||||
// Fetch the entry first so we can also delete the id-keyed mirror
|
||||
const entry = await getEntry(prefixKey(prefix));
|
||||
try { await bucket.delete(prefixKey(prefix)); } catch { /* not cached yet */ }
|
||||
if (entry) {
|
||||
try { await bucket.delete(idKey(entry.id)); } catch { /* not cached yet */ }
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user