From b0c67598132deffce697f19c83bd4826420abe76 Mon Sep 17 00:00:00 2001 From: Nicolai Ort Date: Fri, 20 Feb 2026 19:36:23 +0100 Subject: [PATCH] feat(nats): Implement caching for card, runner, and station entries with improved key management --- src/nats/CardKV.ts | 67 ++++++++++++++++++++++ src/nats/RunnerKV.ts | 128 ++++++++++++++++++++++++++++++++++++++++++ src/nats/StationKV.ts | 53 +++++++++++------ 3 files changed, 231 insertions(+), 17 deletions(-) create mode 100644 src/nats/CardKV.ts create mode 100644 src/nats/RunnerKV.ts diff --git a/src/nats/CardKV.ts b/src/nats/CardKV.ts new file mode 100644 index 0000000..2a4f81f --- /dev/null +++ b/src/nats/CardKV.ts @@ -0,0 +1,67 @@ +import { KvEntry } from 'nats'; +import NatsClient from './NatsClient'; + +const BUCKET = 'card_state'; +/** 1 hour TTL in milliseconds — sliding window, reset on each access. */ +const TTL_MS = 60 * 60 * 1000; + +/** + * Cached card data stored in NATS KV. + * Keyed by the stripped card id (rawBarcode % 200000000000). + * TTL of 1 hour of inactivity — re-put on each access to slide the window. + */ +export interface CardKVEntry { + runnerId: number; + runnerDisplayName: string; + enabled: boolean; +} + +async function getBucket() { + return NatsClient.getKV(BUCKET, { ttl: TTL_MS }); +} + +function entryKey(cardId: number): string { + return `card.${cardId}`; +} + +/** + * Returns the cached CardKVEntry for the given stripped card id, or null on a miss. + * On a cache hit the entry is re-put with a fresh TTL to slide the inactivity window. + */ +export async function getCardEntry(cardId: number): Promise { + const bucket = await getBucket(); + let entry: KvEntry | null = null; + try { + entry = await bucket.get(entryKey(cardId)); + } catch { + return null; + } + if (!entry || entry.operation === 'DEL' || entry.operation === 'PURGE') { + return null; + } + const value = JSON.parse(entry.string()) as CardKVEntry; + // Re-put to slide the TTL window + await bucket.put(entryKey(cardId), JSON.stringify(value)); + return value; +} + +/** + * Writes a CardKVEntry for the given stripped card id with a 1-hour TTL. + */ +export async function setCardEntry(cardId: number, entry: CardKVEntry): Promise { + const bucket = await getBucket(); + await bucket.put(entryKey(cardId), JSON.stringify(entry)); +} + +/** + * Removes the cached entry for the given stripped card id. + * Call on card update (runner reassignment, enable/disable change) or delete. + */ +export async function deleteCardEntry(cardId: number): Promise { + const bucket = await getBucket(); + try { + await bucket.delete(entryKey(cardId)); + } catch { + // Entry may not exist in KV yet — that's fine + } +} diff --git a/src/nats/RunnerKV.ts b/src/nats/RunnerKV.ts new file mode 100644 index 0000000..6ab9d62 --- /dev/null +++ b/src/nats/RunnerKV.ts @@ -0,0 +1,128 @@ +import { KvEntry } from 'nats'; +import { getConnection } from 'typeorm'; +import { Runner } from '../models/entities/Runner'; +import { TrackScan } from '../models/entities/TrackScan'; +import NatsClient from './NatsClient'; + +const BUCKET = 'runner_state'; + +/** + * Cached runner state stored in NATS KV. + * Keyed by runner id. No TTL — entries are permanent until explicitly deleted. + */ +export interface RunnerKVEntry { + /** "Firstname Lastname" — middlename omitted. */ + displayName: string; + /** Sum of all valid scan distances in metres. */ + totalDistance: number; + /** Unix seconds timestamp of the last valid scan. 0 if none. */ + latestTimestamp: number; +} + +/** Returned from getRunnerEntry — includes the KV revision for CAS updates. */ +export interface RunnerKVResult { + entry: RunnerKVEntry; + revision: number; +} + +async function getBucket() { + return NatsClient.getKV(BUCKET); +} + +function entryKey(runnerId: number): string { + return `runner.${runnerId}`; +} + +/** + * Returns the cached RunnerKVEntry + revision for the given runner id, or null on a miss. + * The revision is required for CAS (compare-and-swap) updates. + */ +export async function getRunnerEntry(runnerId: number): Promise { + const bucket = await getBucket(); + let entry: KvEntry | null = null; + try { + entry = await bucket.get(entryKey(runnerId)); + } catch { + return null; + } + if (!entry || entry.operation === 'DEL' || entry.operation === 'PURGE') { + return null; + } + return { + entry: JSON.parse(entry.string()) as RunnerKVEntry, + revision: entry.revision, + }; +} + +/** + * Writes a RunnerKVEntry for the given runner id. + * If revision is provided, performs a CAS update — returns false if the revision + * has changed (concurrent write), true on success. + * Without a revision, performs an unconditional put. + */ +export async function setRunnerEntry(runnerId: number, entry: RunnerKVEntry, revision?: number): Promise { + const bucket = await getBucket(); + try { + if (revision !== undefined) { + await bucket.update(entryKey(runnerId), JSON.stringify(entry), revision); + } else { + await bucket.put(entryKey(runnerId), JSON.stringify(entry)); + } + return true; + } catch { + // CAS conflict — revision has changed + return false; + } +} + +/** + * Removes the cached entry for the given runner id. + * Call on runner name update or when a scan's valid flag is changed via PUT /scans/:id. + */ +export async function deleteRunnerEntry(runnerId: number): Promise { + const bucket = await getBucket(); + try { + await bucket.delete(entryKey(runnerId)); + } catch { + // Entry may not exist in KV yet — that's fine + } +} + +/** + * DB fallback: loads a runner's display name, total valid distance, and latest valid + * scan timestamp from the database, writes the result to KV, and returns it. + * + * Called on any KV cache miss during the scan intake flow. + * Also handles the first-scan-ever case — latestTimestamp=0, totalDistance=0. + */ +export async function warmRunner(runnerId: number): Promise { + const connection = getConnection(); + + const runner = await connection.getRepository(Runner).findOne({ id: runnerId }); + const displayName = runner ? `${runner.firstname} ${runner.lastname}` : 'Unknown Runner'; + + const distanceResult = await connection + .getRepository(TrackScan) + .createQueryBuilder('scan') + .select('COALESCE(SUM(track.distance), 0)', 'total') + .innerJoin('scan.track', 'track') + .where('scan.runner = :runnerId', { runnerId }) + .andWhere('scan.valid = :valid', { valid: true }) + .getRawOne(); + + const latestScan = await connection + .getRepository(TrackScan) + .findOne({ + where: { runner: { id: runnerId }, valid: true }, + order: { timestamp: 'DESC' }, + }); + + const entry: RunnerKVEntry = { + displayName, + totalDistance: parseInt(distanceResult?.total ?? '0', 10), + latestTimestamp: latestScan?.timestamp ?? 0, + }; + + await setRunnerEntry(runnerId, entry); + return entry; +} diff --git a/src/nats/StationKV.ts b/src/nats/StationKV.ts index 439ba69..e6ff7af 100644 --- a/src/nats/StationKV.ts +++ b/src/nats/StationKV.ts @@ -25,45 +25,64 @@ async function getBucket() { return NatsClient.getKV(BUCKET); } -function entryKey(prefix: string): string { - return `station.${prefix}`; +function prefixKey(prefix: string): string { + return `station.prefix.${prefix}`; } -/** - * Returns the cached StationKVEntry for the given prefix, or null on a cache miss. - */ -export async function getStationEntry(prefix: string): Promise { +function idKey(id: number): string { + return `station.id.${id}`; +} + +async function getEntry(key: string): Promise { const bucket = await getBucket(); - let entry: KvEntry | null = null; + let raw: KvEntry | null = null; try { - entry = await bucket.get(entryKey(prefix)); + raw = await bucket.get(key); } catch { return null; } - if (!entry || entry.operation === 'DEL' || entry.operation === 'PURGE') { + if (!raw || raw.operation === 'DEL' || raw.operation === 'PURGE') { return null; } - return JSON.parse(entry.string()) as StationKVEntry; + return JSON.parse(raw.string()) as StationKVEntry; } /** - * Writes a StationKVEntry for the given prefix. + * Returns the cached StationKVEntry for the given token prefix, or null on a cache miss. + */ +export async function getStationEntry(prefix: string): Promise { + return getEntry(prefixKey(prefix)); +} + +/** + * Returns the cached StationKVEntry for the given station DB id, or null on a cache miss. + * Used by the intake flow where only stationId is available after ScanAuth. + */ +export async function getStationEntryById(id: number): Promise { + return getEntry(idKey(id)); +} + +/** + * Writes a StationKVEntry under both the prefix key and the id key. * No TTL — entries are permanent until explicitly deleted. */ export async function setStationEntry(prefix: string, entry: StationKVEntry): Promise { const bucket = await getBucket(); - await bucket.put(entryKey(prefix), JSON.stringify(entry)); + const serialised = JSON.stringify(entry); + await bucket.put(prefixKey(prefix), serialised); + await bucket.put(idKey(entry.id), serialised); } /** - * Removes the cached entry for the given prefix. + * Removes the cached entries for the given prefix (and its id mirror). * Call this on station update or delete so the next request re-fetches from DB. */ export async function deleteStationEntry(prefix: string): Promise { const bucket = await getBucket(); - try { - await bucket.delete(entryKey(prefix)); - } catch { - // Entry may not exist in KV yet — that's fine + // Fetch the entry first so we can also delete the id-keyed mirror + const entry = await getEntry(prefixKey(prefix)); + try { await bucket.delete(prefixKey(prefix)); } catch { /* not cached yet */ } + if (entry) { + try { await bucket.delete(idKey(entry.id)); } catch { /* not cached yet */ } } }