Files
backend/src/nats/RunnerKV.ts

191 lines
6.3 KiB
TypeScript

import { KvEntry } from 'nats';
import { getConnection } from 'typeorm';
import { Runner } from '../models/entities/Runner';
import { TrackScan } from '../models/entities/TrackScan';
import NatsClient from './NatsClient';
/** Name of the NATS KV bucket passed to NatsClient.getKV for the cached runner state. */
const BUCKET = 'runner_state';
/**
* Cached runner state stored in NATS KV.
* One entry per runner, stored as JSON under the key `runner.<id>`.
* No TTL — entries are permanent until explicitly deleted.
*/
export interface RunnerKVEntry {
/** "Firstname Lastname" — middlename omitted. The warm-up path uses 'Unknown Runner' when no runner row is found. */
displayName: string;
/** Sum of all valid scan distances in metres. 0 if the runner has no valid scan. */
distance: number;
/** Unix seconds timestamp of the last valid scan. 0 if none. */
latestTimestamp: number;
}
/**
* Result of getRunnerEntry: the cached entry together with the KV revision it was read at.
* The revision can be handed to setRunnerEntry to perform a CAS (compare-and-swap) update.
*/
export interface RunnerKVResult {
/** The decoded cache entry. */
entry: RunnerKVEntry;
/** Revision of the KV entry the data was read from. */
revision: number;
}
/** Resolves the KV bucket handle for the runner-state bucket via NatsClient. */
async function getBucket() {
  const kv = await NatsClient.getKV(BUCKET);
  return kv;
}
/**
 * Builds the KV key under which a runner's cached state is stored.
 *
 * @param runnerId - Id of the runner.
 * @returns The key in the form `runner.<id>`.
 */
function entryKey(runnerId: number): string {
  return 'runner.' + String(runnerId);
}
/**
 * Reads the cached state of a runner from the KV bucket.
 *
 * A cache miss (null) is reported when:
 *  - the KV read throws,
 *  - the key has no entry,
 *  - the latest operation on the key is a DEL or PURGE marker, or
 *  - the stored payload is not valid JSON or does not have the RunnerKVEntry shape.
 *
 * Treating a malformed payload as a miss lets the caller fall back to the database
 * and overwrite the bad value instead of failing on it.
 *
 * @param runnerId - Id of the runner to look up.
 * @returns The decoded entry plus its KV revision (required for CAS updates via
 * setRunnerEntry), or null on a miss.
 */
export async function getRunnerEntry(runnerId: number): Promise<RunnerKVResult | null> {
  const bucket = await getBucket();

  let entry: KvEntry | null = null;
  try {
    entry = await bucket.get(entryKey(runnerId));
  } catch {
    // A failed read is handled like a miss.
    return null;
  }

  if (!entry || entry.operation === 'DEL' || entry.operation === 'PURGE') {
    return null;
  }

  // Decode defensively: the bucket content is external data.
  let parsed: unknown;
  try {
    parsed = JSON.parse(entry.string());
  } catch {
    return null;
  }

  if (typeof parsed !== 'object' || parsed === null) {
    return null;
  }
  const candidate = parsed as Partial<Record<keyof RunnerKVEntry, unknown>>;
  if (
    typeof candidate.displayName !== 'string' ||
    typeof candidate.distance !== 'number' ||
    typeof candidate.latestTimestamp !== 'number'
  ) {
    return null;
  }

  return {
    entry: parsed as RunnerKVEntry,
    revision: entry.revision,
  };
}
/**
 * Stores a RunnerKVEntry (serialised as JSON) for the given runner.
 *
 * With a revision the write is a CAS update against that revision; without one
 * it is an unconditional put.
 *
 * @param runnerId - Id of the runner whose entry is written.
 * @param entry - State to store.
 * @param revision - Optional KV revision the update must match.
 * @returns true when the write completed. false when serialising or writing threw:
 * this is the signal for a CAS conflict, but any other error raised by the
 * put/update call is reported the same way.
 */
export async function setRunnerEntry(
  runnerId: number,
  entry: RunnerKVEntry,
  revision?: number,
): Promise<boolean> {
  const kv = await getBucket();
  try {
    const key = entryKey(runnerId);
    const payload = JSON.stringify(entry);
    if (revision === undefined) {
      await kv.put(key, payload);
    } else {
      await kv.update(key, payload, revision);
    }
  } catch {
    // Typically a revision mismatch; callers only learn that the write did not happen.
    return false;
  }
  return true;
}
/**
 * Drops the cached entry of a runner so that it is rebuilt on the next miss.
 * Intended to be called when a runner's name changes or when a scan's valid flag
 * is changed via PUT /scans/:id.
 *
 * Best effort: any error raised by the delete call is swallowed.
 *
 * @param runnerId - Id of the runner whose entry is removed.
 */
export async function deleteRunnerEntry(runnerId: number): Promise<void> {
  const kv = await getBucket();
  const key = entryKey(runnerId);
  try {
    await kv.delete(key);
  } catch {
    // Nothing to do — the entry may simply not be in KV yet.
  }
}
/**
 * DB fallback for a single runner: rebuilds the cache entry from the database,
 * stores it in KV with an unconditional put, and returns it.
 *
 * Reads, in order: the runner row (for the display name), the summed track
 * distance over the runner's valid scans, and the runner's newest valid scan.
 * A runner that is not found gets the display name 'Unknown Runner'; a runner
 * without valid scans gets distance 0 and latestTimestamp 0 (first-scan-ever case).
 *
 * Meant to be called on a KV cache miss during the scan intake flow. The outcome
 * of the KV write is not checked — the entry is returned either way.
 *
 * @param runnerId - Id of the runner to load.
 * @returns The freshly built entry.
 */
export async function warmRunner(runnerId: number): Promise<RunnerKVEntry> {
  const db = getConnection();

  const runnerRow = await db.getRepository(Runner).findOne({ id: runnerId });
  let displayName: string;
  if (runnerRow) {
    displayName = `${runnerRow.firstname} ${runnerRow.lastname}`;
  } else {
    displayName = 'Unknown Runner';
  }

  const scanRepo = db.getRepository(TrackScan);

  // Aggregate distance of all valid scans; COALESCE yields 0 when there are none.
  const totals = await scanRepo
    .createQueryBuilder('scan')
    .select('COALESCE(SUM(track.distance), 0)', 'total')
    .innerJoin('scan.track', 'track')
    .where('scan.runner = :runnerId', { runnerId })
    .andWhere('scan.valid = :valid', { valid: true })
    .getRawOne();

  // Newest valid scan, used only for its timestamp.
  const newestScan = await scanRepo.findOne({
    where: { runner: { id: runnerId }, valid: true },
    order: { timestamp: 'DESC' },
  });

  const rawTotal = totals?.total ?? '0';
  const warmed: RunnerKVEntry = {
    displayName,
    distance: parseInt(rawTotal, 10),
    latestTimestamp: newestScan?.timestamp ?? 0,
  };

  await setRunnerEntry(runnerId, warmed);
  return warmed;
}
/**
 * Bulk prewarming: fills the KV cache with an entry for every runner in the database.
 *
 * Issues three queries — all runners, summed valid distance per runner, newest valid
 * scan timestamp per runner — and then starts one KV write per runner, awaiting them
 * together with Promise.all. Runners that have no valid scan get distance 0 and
 * latestTimestamp 0. The boolean results of the individual writes are not inspected.
 *
 * Intended to be called from the loader during startup (if NATS_PREWARM=true) so the
 * hot path needs no DB reads from the very first scan.
 */
export async function warmAll(): Promise<void> {
  const db = getConnection();

  // 1) Every runner, limited to the columns needed for the display name.
  const allRunners = await db
    .getRepository(Runner)
    .createQueryBuilder('runner')
    .select(['runner.id', 'runner.firstname', 'runner.lastname'])
    .getMany();

  // 2) Summed track distance of valid scans, grouped by runner.
  const distanceRows = await db
    .getRepository(TrackScan)
    .createQueryBuilder('scan')
    .select('scan.runner', 'runnerId')
    .addSelect('COALESCE(SUM(track.distance), 0)', 'total')
    .innerJoin('scan.track', 'track')
    .where('scan.valid = :valid', { valid: true })
    .groupBy('scan.runner')
    .getRawMany();

  // 3) Newest valid scan timestamp, grouped by runner.
  const timestampRows = await db
    .getRepository(TrackScan)
    .createQueryBuilder('scan')
    .select('scan.runner', 'runnerId')
    .addSelect('MAX(scan.timestamp)', 'latestTimestamp')
    .where('scan.valid = :valid', { valid: true })
    .groupBy('scan.runner')
    .getRawMany();

  // Index both aggregates by numeric runner id; raw values are parsed as base-10 integers.
  const totalByRunner = new Map<number, number>();
  for (const row of distanceRows) {
    totalByRunner.set(parseInt(row.runnerId, 10), parseInt(row.total, 10));
  }

  const latestByRunner = new Map<number, number>();
  for (const row of timestampRows) {
    latestByRunner.set(parseInt(row.runnerId, 10), parseInt(row.latestTimestamp, 10));
  }

  // Start all writes first, then wait for the whole batch.
  await Promise.all(
    allRunners.map(({ id, firstname, lastname }) =>
      setRunnerEntry(id, {
        displayName: `${firstname} ${lastname}`,
        distance: totalByRunner.get(id) ?? 0,
        latestTimestamp: latestByRunner.get(id) ?? 0,
      }),
    ),
  );
}