feat(data): Added nats jetstream dependency

This commit is contained in:
2026-02-20 19:15:35 +01:00
parent 3584b3facf
commit bbf6ea6c0f
9 changed files with 489 additions and 18 deletions

View File

@@ -8,4 +8,6 @@ DB_NAME=./test.sqlite
NODE_ENV=production
POSTALCODE_COUNTRYCODE=DE
SEED_TEST_DATA=false
SELFSERVICE_URL=bla
SELFSERVICE_URL=bla
STATION_TOKEN_SECRET=<replace-with-random-secret-min-32-chars>
NATS_URL=nats://localhost:4222

View File

@@ -1,21 +1,30 @@
services:
backend_server:
build: .
nats:
image: mirror.gcr.io/library/nats:alpine
command: ["--jetstream", "--store_dir", "/data"]
ports:
- 4010:4010
environment:
APP_PORT: 4010
DB_TYPE: sqlite
DB_HOST: bla
DB_PORT: bla
DB_USER: bla
DB_PASSWORD: bla
DB_NAME: ./db.sqlite
NODE_ENV: production
POSTALCODE_COUNTRYCODE: DE
SEED_TEST_DATA: "true"
MAILER_URL: https://dev.lauf-fuer-kaya.de/mailer
MAILER_KEY: asdasd
- "4222:4222"
- "8222:8222"
volumes:
- nats_data:/data
# backend_server:
# build: .
# ports:
# - 4010:4010
# environment:
# APP_PORT: 4010
# DB_TYPE: sqlite
# DB_HOST: bla
# DB_PORT: bla
# DB_USER: bla
# DB_PASSWORD: bla
# DB_NAME: ./db.sqlite
# NODE_ENV: production
# POSTALCODE_COUNTRYCODE: DE
# SEED_TEST_DATA: "true"
# MAILER_URL: https://dev.lauf-fuer-kaya.de/mailer
# MAILER_KEY: asdasd
# APP_PORT: 4010
# DB_TYPE: postgres
# DB_HOST: backend_db
@@ -32,3 +41,6 @@ services:
# POSTGRES_USER: lfk
# ports:
# - 5432:5432
volumes:
nats_data:

View File

@@ -39,6 +39,7 @@
"jsonwebtoken": "8.5.1",
"libphonenumber-js": "1.9.9",
"mysql": "2.18.1",
"nats": "^2.29.3",
"pg": "8.5.1",
"reflect-metadata": "0.1.13",
"routing-controllers": "0.9.0-alpha.6",
@@ -81,6 +82,7 @@
"test:ci:generate_env": "ts-node scripts/create_testenv.ts",
"test:ci:run": "start-server-and-test dev http://localhost:4010/api/docs/openapi.json test",
"test:ci": "npm run test:ci:generate_env && npm run test:ci:run",
"benchmark": "ts-node scripts/benchmark_scan_intake.ts",
"seed": "ts-node ./node_modules/typeorm/cli.js schema:sync && ts-node ./node_modules/typeorm-seeding/dist/cli.js seed",
"openapi:export": "ts-node scripts/openapi_export.ts",
"licenses:export": "license-exporter --markdown",

26
pnpm-lock.yaml generated
View File

@@ -59,6 +59,9 @@ importers:
mysql:
specifier: 2.18.1
version: 2.18.1
nats:
specifier: ^2.29.3
version: 2.29.3
pg:
specifier: 8.5.1
version: 8.5.1
@@ -2451,7 +2454,7 @@ packages:
koa-router@7.4.0:
resolution: {integrity: sha512-IWhaDXeAnfDBEpWS6hkGdZ1ablgr6Q6pGdXCyK38RbzuH4LkUOpPqPw+3f8l8aTDrQmBQ7xJc0bs2yV4dzcO+g==}
engines: {node: '>= 4'}
deprecated: '**IMPORTANT 10x+ PERFORMANCE UPGRADE**: Please upgrade to v12.0.1+ as we have fixed an issue with debuglog causing 10x slower router benchmark performance, see https://github.com/koajs/router/pull/173'
deprecated: 'Please use @koa/router instead, starting from v9! '
koa@2.16.0:
resolution: {integrity: sha512-Afhqq0Vq3W7C+/rW6IqHVBDLzqObwZ07JaUNUEF8yCQ6afiyFE3RAy+i7V0E46XOWlH7vPWn/x0vsZwNy6PWxw==}
@@ -2746,6 +2749,10 @@ packages:
napi-build-utils@2.0.0:
resolution: {integrity: sha512-GEbrYkbfF7MoNaoh2iGG84Mnf/WZfB0GdGEsM8wz7Expx/LlWf5U8t9nvJKXSp3qr5IsEbK04cBGhol/KwOsWA==}
nats@2.29.3:
resolution: {integrity: sha512-tOQCRCwC74DgBTk4pWZ9V45sk4d7peoE2njVprMRCBXrhJ5q5cYM7i6W+Uvw2qUrcfOSnuisrX7bEx3b3Wx4QA==}
engines: {node: '>= 14.0.0'}
natural-compare@1.4.0:
resolution: {integrity: sha512-OWND8ei3VtNC9h7V60qff3SVobHr996CTwgxubgyQYEpg290h9J0buyECNNJexkFm5sOajh5G116RYA1c8ZMSw==}
@@ -2763,6 +2770,10 @@ packages:
nice-try@1.0.5:
resolution: {integrity: sha512-1nh45deeb5olNY7eX82BkPO7SSxR5SSYJiPTrTdFUVYwAl8CKMA5N9PjTYkHiRjisVcxcQ1HXdLhx2qxxJzLNQ==}
nkeys.js@1.1.0:
resolution: {integrity: sha512-tB/a0shZL5UZWSwsoeyqfTszONTt4k2YS0tuQioMOD180+MbombYVgzDUYHlx+gejYK6rgf08n/2Df99WY0Sxg==}
engines: {node: '>=10.0.0'}
node-abi@3.74.0:
resolution: {integrity: sha512-c5XK0MjkGBrQPGYG24GBADZud0NCbznxNx0ZkS+ebUTrmV1qTDxPxSL8zEAPURXSbLRWVexxmP4986BziahL5w==}
engines: {node: '>=10'}
@@ -3858,6 +3869,9 @@ packages:
tunnel-agent@0.6.0:
resolution: {integrity: sha512-McnNiV1l8RYeY8tBgEpuodCC1mLUdbSN+CYBL7kJsJNInOP8UjDDEwdk6Mw60vdLLrr5NHKZhMAOSrR2NZuQ+w==}
tweetnacl@1.0.3:
resolution: {integrity: sha512-6rt+RN7aOi1nGMyC4Xa5DdYiukl2UWCbcJft7YhxReBGQD7OAM8Pbxw6YMo4r2diNEA8FEmu32YOn9rhaiE5yw==}
type-detect@4.0.8:
resolution: {integrity: sha512-0fr/mIH1dlO+x7TlcMy+bIDqKPsw/70tVyeHW787goQjhmqaZe10uwLujubK9q9Lg6Fiho1KUKDYz0Z7k7g5/g==}
engines: {node: '>=4'}
@@ -7486,6 +7500,10 @@ snapshots:
napi-build-utils@2.0.0: {}
nats@2.29.3:
dependencies:
nkeys.js: 1.1.0
natural-compare@1.4.0: {}
negotiator@0.6.3: {}
@@ -7497,6 +7515,10 @@ snapshots:
nice-try@1.0.5: {}
nkeys.js@1.1.0:
dependencies:
tweetnacl: 1.0.3
node-abi@3.74.0:
dependencies:
semver: 7.7.1
@@ -8721,6 +8743,8 @@ snapshots:
dependencies:
safe-buffer: 5.2.1
tweetnacl@1.0.3: {}
type-detect@4.0.8: {}
type-fest@0.21.3: {}

View File

@@ -0,0 +1,367 @@
/**
* Scan Intake Benchmark Script
*
* Measures TrackScan creation performance before and after each optimisation phase.
* Run against a live dev server: npm run dev
*
* Usage:
* npx ts-node scripts/benchmark_scan_intake.ts
* npx ts-node scripts/benchmark_scan_intake.ts --base http://localhost:4010
*
* What it measures:
* 1. Single sequential scans — baseline latency per request (p50/p95/p99/max)
* 2. Parallel scans (10 stations) — simulates 10 concurrent stations each submitting
* one scan at a time at the expected event rate
* (~1 scan/3s per station = ~3.3 scans/s total)
*
* The script self-provisions all required data (org, runners, cards, track, stations)
* and cleans up after itself. It authenticates via the station token, matching the
* real production auth path exactly.
*
* Output is printed to stdout in a copy-paste-friendly table format so results can
* be compared across phases.
*/
import axios, { AxiosInstance } from 'axios';
// ---------------------------------------------------------------------------
// Config
// ---------------------------------------------------------------------------
const BASE = (() => {
const idx = process.argv.indexOf('--base');
return idx !== -1 ? process.argv[idx + 1] : 'http://localhost:4010';
})();
const API = `${BASE}/api`;
// Number of simulated scan stations
const STATION_COUNT = 10;
// Sequential benchmark: total number of scans to send, one at a time
const SEQUENTIAL_SCAN_COUNT = 50;
// Parallel benchmark: number of rounds. Each round fires STATION_COUNT scans concurrently.
// 20 rounds × 10 stations = 200 total scans, matching the expected event throughput pattern.
const PARALLEL_ROUNDS = 20;
// Minimum lap time on the test track (seconds). Set low so most scans are valid.
// The benchmark measures submission speed, not business logic.
const TRACK_MINIMUM_LAP_TIME = 1;
// Track distance (metres)
const TRACK_DISTANCE = 400;
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
interface StationHandle {
id: number;
key: string; // cleartext token, used as Bearer token
cardCode: number; // EAN-13 barcode of the card assigned to this station's runner
axiosInstance: AxiosInstance;
}
interface Percentiles {
p50: number;
p95: number;
p99: number;
max: number;
min: number;
mean: number;
}
interface BenchmarkResult {
label: string;
totalScans: number;
totalTimeMs: number;
scansPerSecond: number;
latencies: Percentiles;
errors: number;
}
// ---------------------------------------------------------------------------
// HTTP helpers
// ---------------------------------------------------------------------------
const adminClient = axios.create({
baseURL: API,
validateStatus: () => true,
});
async function adminLogin(): Promise<string> {
const res = await adminClient.post('/auth/login', { username: 'demo', password: 'demo' });
if (res.status !== 200) {
throw new Error(`Login failed: ${res.status} ${JSON.stringify(res.data)}`);
}
return res.data.access_token;
}
function authedClient(token: string): AxiosInstance {
return axios.create({
baseURL: API,
validateStatus: () => true,
headers: { authorization: `Bearer ${token}` },
});
}
// ---------------------------------------------------------------------------
// Data provisioning
// ---------------------------------------------------------------------------
async function provision(adminToken: string): Promise<{
stations: StationHandle[];
trackId: number;
orgId: number;
cleanup: () => Promise<void>;
}> {
const client = authedClient(adminToken);
const createdIds: { type: string; id: number }[] = [];
const create = async (path: string, body: object): Promise<any> => {
const res = await client.post(path, body);
if (res.status !== 200) {
throw new Error(`POST ${path} failed: ${res.status} ${JSON.stringify(res.data)}`);
}
return res.data;
};
process.stdout.write('Provisioning test data... ');
// Organisation
const org = await create('/organizations', { name: 'benchmark-org' });
createdIds.push({ type: 'organizations', id: org.id });
// Track with a low minimumLapTime so re-scans within the benchmark are mostly valid
const track = await create('/tracks', {
name: 'benchmark-track',
distance: TRACK_DISTANCE,
minimumLapTime: TRACK_MINIMUM_LAP_TIME,
});
createdIds.push({ type: 'tracks', id: track.id });
// One runner + card + station per simulated scan station
const stations: StationHandle[] = [];
for (let i = 0; i < STATION_COUNT; i++) {
const runner = await create('/runners', {
firstname: `Bench`,
lastname: `Runner${i}`,
group: org.id,
});
createdIds.push({ type: 'runners', id: runner.id });
const card = await create('/cards', { runner: runner.id });
createdIds.push({ type: 'cards', id: card.id });
const station = await create('/stations', {
track: track.id,
description: `bench-station-${i}`,
});
createdIds.push({ type: 'stations', id: station.id });
stations.push({
id: station.id,
key: station.key,
cardCode: card.id, // the test spec uses card.id directly as the barcode value
axiosInstance: axios.create({
baseURL: API,
validateStatus: () => true,
headers: { authorization: `Bearer ${station.key}` },
}),
});
}
console.log(`done. (${STATION_COUNT} stations, ${STATION_COUNT} runners, ${STATION_COUNT} cards)`);
const cleanup = async () => {
process.stdout.write('Cleaning up test data... ');
// Delete in reverse-dependency order
for (const item of [...createdIds].reverse()) {
await client.delete(`/${item.type}/${item.id}?force=true`);
}
console.log('done.');
};
return { stations, trackId: track.id, orgId: org.id, cleanup };
}
// ---------------------------------------------------------------------------
// Single scan submission (returns latency in ms)
// ---------------------------------------------------------------------------
async function submitScan(station: StationHandle): Promise<{ latencyMs: number; ok: boolean }> {
const start = performance.now();
const res = await station.axiosInstance.post('/scans/trackscans', {
card: station.cardCode,
station: station.id,
});
const latencyMs = performance.now() - start;
const ok = res.status === 200;
return { latencyMs, ok };
}
// ---------------------------------------------------------------------------
// Statistics
// ---------------------------------------------------------------------------
function percentiles(latencies: number[]): Percentiles {
const sorted = [...latencies].sort((a, b) => a - b);
const at = (pct: number) => sorted[Math.floor((pct / 100) * sorted.length)] ?? sorted[sorted.length - 1];
const mean = sorted.reduce((s, v) => s + v, 0) / sorted.length;
return {
p50: Math.round(at(50)),
p95: Math.round(at(95)),
p99: Math.round(at(99)),
max: Math.round(sorted[sorted.length - 1]),
min: Math.round(sorted[0]),
mean: Math.round(mean),
};
}
// ---------------------------------------------------------------------------
// Benchmark 1 — Sequential (single station, one scan at a time)
// ---------------------------------------------------------------------------
async function benchmarkSequential(station: StationHandle): Promise<BenchmarkResult> {
const latencies: number[] = [];
let errors = 0;
process.stdout.write(` Running ${SEQUENTIAL_SCAN_COUNT} sequential scans`);
const wallStart = performance.now();
for (let i = 0; i < SEQUENTIAL_SCAN_COUNT; i++) {
const { latencyMs, ok } = await submitScan(station);
latencies.push(latencyMs);
if (!ok) errors++;
if ((i + 1) % 10 === 0) process.stdout.write('.');
}
const totalTimeMs = performance.now() - wallStart;
console.log(' done.');
return {
label: 'Sequential (1 station)',
totalScans: SEQUENTIAL_SCAN_COUNT,
totalTimeMs,
scansPerSecond: (SEQUENTIAL_SCAN_COUNT / totalTimeMs) * 1000,
latencies: percentiles(latencies),
errors,
};
}
// ---------------------------------------------------------------------------
// Benchmark 2 — Parallel (10 stations, concurrent rounds)
//
// Models the real event scenario: every ~3 seconds each station submits one scan.
// We don't actually sleep between rounds — we fire each round as fast as the
// previous one completes, which gives us the worst-case sustained throughput
// (all stations submitting at maximum rate simultaneously).
// ---------------------------------------------------------------------------
async function benchmarkParallel(stations: StationHandle[]): Promise<BenchmarkResult> {
const latencies: number[] = [];
let errors = 0;
process.stdout.write(` Running ${PARALLEL_ROUNDS} rounds × ${STATION_COUNT} concurrent stations`);
const wallStart = performance.now();
for (let round = 0; round < PARALLEL_ROUNDS; round++) {
const results = await Promise.all(stations.map(s => submitScan(s)));
for (const { latencyMs, ok } of results) {
latencies.push(latencyMs);
if (!ok) errors++;
}
if ((round + 1) % 4 === 0) process.stdout.write('.');
}
const totalTimeMs = performance.now() - wallStart;
const totalScans = PARALLEL_ROUNDS * STATION_COUNT;
console.log(' done.');
return {
label: `Parallel (${STATION_COUNT} stations concurrent)`,
totalScans,
totalTimeMs,
scansPerSecond: (totalScans / totalTimeMs) * 1000,
latencies: percentiles(latencies),
errors,
};
}
// ---------------------------------------------------------------------------
// Output formatting
// ---------------------------------------------------------------------------
function printResult(result: BenchmarkResult) {
const { label, totalScans, totalTimeMs, scansPerSecond, latencies, errors } = result;
console.log(`\n ${label}`);
console.log(` ${'─'.repeat(52)}`);
console.log(` Total scans : ${totalScans}`);
console.log(` Total time : ${totalTimeMs.toFixed(0)} ms`);
console.log(` Throughput : ${scansPerSecond.toFixed(2)} scans/sec`);
console.log(` Latency min : ${latencies.min} ms`);
console.log(` Latency mean : ${latencies.mean} ms`);
console.log(` Latency p50 : ${latencies.p50} ms`);
console.log(` Latency p95 : ${latencies.p95} ms`);
console.log(` Latency p99 : ${latencies.p99} ms`);
console.log(` Latency max : ${latencies.max} ms`);
console.log(` Errors : ${errors}`);
}
function printSummary(results: BenchmarkResult[]) {
const now = new Date().toISOString();
console.log('\n');
console.log('═'.repeat(60));
console.log(` SCAN INTAKE BENCHMARK RESULTS — ${now}`);
console.log(` Server: ${BASE}`);
console.log('═'.repeat(60));
for (const r of results) {
printResult(r);
}
console.log('\n' + '═'.repeat(60));
console.log(' Copy the block above to compare across phases.');
console.log('═'.repeat(60) + '\n');
}
// ---------------------------------------------------------------------------
// Entry point
// ---------------------------------------------------------------------------
async function main() {
console.log(`\nScan Intake Benchmark — target: ${BASE}\n`);
let adminToken: string;
try {
adminToken = await adminLogin();
} catch (err) {
console.error(`Could not authenticate. Is the server running at ${BASE}?\n`, err.message);
process.exit(1);
}
const { stations, cleanup } = await provision(adminToken);
const results: BenchmarkResult[] = [];
try {
console.log('\nBenchmark 1 — Sequential');
results.push(await benchmarkSequential(stations[0]));
// Brief pause between benchmarks so the sequential scans don't skew
// the parallel benchmark's first-scan latency (minimumLapTime window)
await new Promise(r => setTimeout(r, (TRACK_MINIMUM_LAP_TIME + 1) * 1000));
console.log('\nBenchmark 2 — Parallel');
results.push(await benchmarkParallel(stations));
} finally {
await cleanup();
}
printSummary(results);
}
main().catch(err => {
console.error('Benchmark failed:', err);
process.exit(1);
});

View File

@@ -10,6 +10,7 @@ export const config = {
testing: process.env.NODE_ENV === "test",
jwt_secret: process.env.JWT_SECRET || "secretjwtsecret",
station_token_secret: process.env.STATION_TOKEN_SECRET || "",
nats_url: process.env.NATS_URL || "nats://localhost:4222",
phone_validation_countrycode: getPhoneCodeLocale(),
postalcode_validation_countrycode: getPostalCodeLocale(),
version: process.env.VERSION || require('../package.json').version,

View File

@@ -1,4 +1,5 @@
import { Application } from "express";
import NatsClient from "../nats/NatsClient";
import databaseLoader from "./database";
import expressLoader from "./express";
import openapiLoader from "./openapi";
@@ -9,6 +10,7 @@ import openapiLoader from "./openapi";
*/
export default async (app: Application) => {
await databaseLoader();
await NatsClient.connect();
await openapiLoader(app);
await expressLoader(app);
return app;

60
src/nats/NatsClient.ts Normal file
View File

@@ -0,0 +1,60 @@
import consola from 'consola';
import { connect, JetStreamClient, KV, KvOptions, NatsConnection } from 'nats';
import { config } from '../config';
/**
* Singleton NATS client.
* Call connect() once during app startup (after the DB loader).
* All other modules obtain the connection via getKV().
*/
class NatsClient {
private connection: NatsConnection | null = null;
private js: JetStreamClient | null = null;
private kvBuckets: Map<string, KV> = new Map();
/**
* Establishes the NATS connection and JetStream context.
* Must be called once before any KV operations.
*/
public async connect(): Promise<void> {
this.connection = await connect({ servers: config.nats_url });
this.js = this.connection.jetstream();
consola.success(`NATS connected to ${config.nats_url}`);
}
/**
* Returns a KV bucket by name, creating it if it doesn't exist yet.
* Results are cached — repeated calls with the same name return the same instance.
*/
public async getKV(bucketName: string, options?: Partial<KvOptions>): Promise<KV> {
if (this.kvBuckets.has(bucketName)) {
return this.kvBuckets.get(bucketName);
}
if (!this.js) {
throw new Error('NATS not connected. Call NatsClient.connect() first.');
}
const kv = await this.js.views.kv(bucketName, options);
this.kvBuckets.set(bucketName, kv);
return kv;
}
/**
* Gracefully closes the NATS connection.
* Call during app shutdown if needed.
*/
public async disconnect(): Promise<void> {
if (this.connection) {
await this.connection.drain();
this.connection = null;
this.js = null;
this.kvBuckets.clear();
consola.info('NATS disconnected.');
}
}
public isConnected(): boolean {
return this.connection !== null;
}
}
export default new NatsClient();

View File

@@ -2,6 +2,7 @@
"compilerOptions": {
"target": "ES2020",
"module": "commonjs",
"skipLibCheck": true,
"rootDir": "./src",
"outDir": "./dist",
"esModuleInterop": true,