From bbf6ea6c0fdffa11dacdf4b9afb6160ce54e197d Mon Sep 17 00:00:00 2001
From: Nicolai Ort
Date: Fri, 20 Feb 2026 19:15:35 +0100
Subject: [PATCH] feat(data): Added nats jetstream dependency

---
 .env.example                     |   4 +-
 docker-compose.yml               |  44 ++--
 package.json                     |   2 +
 pnpm-lock.yaml                   |  26 ++-
 scripts/benchmark_scan_intake.ts | 367 +++++++++++++++++++++++++++++++
 src/config.ts                    |   1 +
 src/loaders/index.ts             |   2 +
 src/nats/NatsClient.ts           |  60 +++++
 tsconfig.json                    |   1 +
 9 files changed, 489 insertions(+), 18 deletions(-)
 create mode 100644 scripts/benchmark_scan_intake.ts
 create mode 100644 src/nats/NatsClient.ts

diff --git a/.env.example b/.env.example
index 5a0dec3..020126a 100644
--- a/.env.example
+++ b/.env.example
@@ -8,4 +8,6 @@ DB_NAME=./test.sqlite
 NODE_ENV=production
 POSTALCODE_COUNTRYCODE=DE
 SEED_TEST_DATA=false
-SELFSERVICE_URL=bla
\ No newline at end of file
+SELFSERVICE_URL=bla
+STATION_TOKEN_SECRET=
+NATS_URL=nats://localhost:4222
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
index 5927459..1d170fa 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,21 +1,30 @@
 services:
-  backend_server:
-    build: .
+  nats:
+    image: mirror.gcr.io/library/nats:alpine
+    command: ["--jetstream", "--store_dir", "/data"]
     ports:
-      - 4010:4010
-    environment:
-      APP_PORT: 4010
-      DB_TYPE: sqlite
-      DB_HOST: bla
-      DB_PORT: bla
-      DB_USER: bla
-      DB_PASSWORD: bla
-      DB_NAME: ./db.sqlite
-      NODE_ENV: production
-      POSTALCODE_COUNTRYCODE: DE
-      SEED_TEST_DATA: "true"
-      MAILER_URL: https://dev.lauf-fuer-kaya.de/mailer
-      MAILER_KEY: asdasd
+      - "4222:4222"
+      - "8222:8222"
+    volumes:
+      - nats_data:/data
+
+  # backend_server:
+  #   build: .
+  #   ports:
+  #     - 4010:4010
+  #   environment:
+  #     APP_PORT: 4010
+  #     DB_TYPE: sqlite
+  #     DB_HOST: bla
+  #     DB_PORT: bla
+  #     DB_USER: bla
+  #     DB_PASSWORD: bla
+  #     DB_NAME: ./db.sqlite
+  #     NODE_ENV: production
+  #     POSTALCODE_COUNTRYCODE: DE
+  #     SEED_TEST_DATA: "true"
+  #     MAILER_URL: https://dev.lauf-fuer-kaya.de/mailer
+  #     MAILER_KEY: asdasd
 # APP_PORT: 4010
 # DB_TYPE: postgres
 # DB_HOST: backend_db
@@ -32,3 +41,6 @@ services:
 # POSTGRES_USER: lfk
 # ports:
 # - 5432:5432
+
+volumes:
+  nats_data:
diff --git a/package.json b/package.json
index 1b9e8de..39ee33c 100644
--- a/package.json
+++ b/package.json
@@ -39,6 +39,7 @@
     "jsonwebtoken": "8.5.1",
     "libphonenumber-js": "1.9.9",
     "mysql": "2.18.1",
+    "nats": "^2.29.3",
     "pg": "8.5.1",
     "reflect-metadata": "0.1.13",
     "routing-controllers": "0.9.0-alpha.6",
@@ -81,6 +82,7 @@
     "test:ci:generate_env": "ts-node scripts/create_testenv.ts",
     "test:ci:run": "start-server-and-test dev http://localhost:4010/api/docs/openapi.json test",
     "test:ci": "npm run test:ci:generate_env && npm run test:ci:run",
+    "benchmark": "ts-node scripts/benchmark_scan_intake.ts",
     "seed": "ts-node ./node_modules/typeorm/cli.js schema:sync && ts-node ./node_modules/typeorm-seeding/dist/cli.js seed",
     "openapi:export": "ts-node scripts/openapi_export.ts",
     "licenses:export": "license-exporter --markdown",
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 97e13a5..63b5ed7 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -59,6 +59,9 @@ importers:
       mysql:
         specifier: 2.18.1
         version: 2.18.1
+      nats:
+        specifier: ^2.29.3
+        version: 2.29.3
       pg:
         specifier: 8.5.1
         version: 8.5.1
@@ -2451,7 +2454,7 @@
   koa-router@7.4.0:
     resolution: {integrity: sha512-IWhaDXeAnfDBEpWS6hkGdZ1ablgr6Q6pGdXCyK38RbzuH4LkUOpPqPw+3f8l8aTDrQmBQ7xJc0bs2yV4dzcO+g==}
     engines: {node: '>= 4'}
-    deprecated: '**IMPORTANT 10x+ PERFORMANCE UPGRADE**: Please upgrade to v12.0.1+ as we have fixed an issue with debuglog causing 10x slower router benchmark performance, see https://github.com/koajs/router/pull/173'
+    deprecated: 'Please use @koa/router instead, starting from v9! '
 
   koa@2.16.0:
     resolution: {integrity: sha512-Afhqq0Vq3W7C+/rW6IqHVBDLzqObwZ07JaUNUEF8yCQ6afiyFE3RAy+i7V0E46XOWlH7vPWn/x0vsZwNy6PWxw==}
@@ -2746,6 +2749,10 @@
   napi-build-utils@2.0.0:
     resolution: {integrity: sha512-GEbrYkbfF7MoNaoh2iGG84Mnf/WZfB0GdGEsM8wz7Expx/LlWf5U8t9nvJKXSp3qr5IsEbK04cBGhol/KwOsWA==}
 
+  nats@2.29.3:
+    resolution: {integrity: sha512-tOQCRCwC74DgBTk4pWZ9V45sk4d7peoE2njVprMRCBXrhJ5q5cYM7i6W+Uvw2qUrcfOSnuisrX7bEx3b3Wx4QA==}
+    engines: {node: '>= 14.0.0'}
+
   natural-compare@1.4.0:
     resolution: {integrity: sha512-OWND8ei3VtNC9h7V60qff3SVobHr996CTwgxubgyQYEpg290h9J0buyECNNJexkFm5sOajh5G116RYA1c8ZMSw==}
 
@@ -2763,6 +2770,10 @@
   nice-try@1.0.5:
     resolution: {integrity: sha512-1nh45deeb5olNY7eX82BkPO7SSxR5SSYJiPTrTdFUVYwAl8CKMA5N9PjTYkHiRjisVcxcQ1HXdLhx2qxxJzLNQ==}
 
+  nkeys.js@1.1.0:
+    resolution: {integrity: sha512-tB/a0shZL5UZWSwsoeyqfTszONTt4k2YS0tuQioMOD180+MbombYVgzDUYHlx+gejYK6rgf08n/2Df99WY0Sxg==}
+    engines: {node: '>=10.0.0'}
+
   node-abi@3.74.0:
     resolution: {integrity: sha512-c5XK0MjkGBrQPGYG24GBADZud0NCbznxNx0ZkS+ebUTrmV1qTDxPxSL8zEAPURXSbLRWVexxmP4986BziahL5w==}
     engines: {node: '>=10'}
@@ -3858,6 +3869,9 @@
   tunnel-agent@0.6.0:
     resolution: {integrity: sha512-McnNiV1l8RYeY8tBgEpuodCC1mLUdbSN+CYBL7kJsJNInOP8UjDDEwdk6Mw60vdLLrr5NHKZhMAOSrR2NZuQ+w==}
 
+  tweetnacl@1.0.3:
+    resolution: {integrity: sha512-6rt+RN7aOi1nGMyC4Xa5DdYiukl2UWCbcJft7YhxReBGQD7OAM8Pbxw6YMo4r2diNEA8FEmu32YOn9rhaiE5yw==}
+
   type-detect@4.0.8:
     resolution: {integrity: sha512-0fr/mIH1dlO+x7TlcMy+bIDqKPsw/70tVyeHW787goQjhmqaZe10uwLujubK9q9Lg6Fiho1KUKDYz0Z7k7g5/g==}
     engines: {node: '>=4'}
@@ -7486,6 +7500,10 @@
 
   napi-build-utils@2.0.0: {}
 
+  nats@2.29.3:
+    dependencies:
+      nkeys.js: 1.1.0
+
   natural-compare@1.4.0: {}
 
   negotiator@0.6.3: {}
@@ -7497,6 +7515,10 @@
 
   nice-try@1.0.5: {}
 
+  nkeys.js@1.1.0:
+    dependencies:
+      tweetnacl: 1.0.3
+
   node-abi@3.74.0:
     dependencies:
       semver: 7.7.1
@@ -8721,6 +8743,8 @@
     dependencies:
       safe-buffer: 5.2.1
 
+  tweetnacl@1.0.3: {}
+
   type-detect@4.0.8: {}
 
   type-fest@0.21.3: {}
diff --git a/scripts/benchmark_scan_intake.ts b/scripts/benchmark_scan_intake.ts
new file mode 100644
index 0000000..7ca7055
--- /dev/null
+++ b/scripts/benchmark_scan_intake.ts
@@ -0,0 +1,367 @@
+/**
+ * Scan Intake Benchmark Script
+ *
+ * Measures TrackScan creation performance before and after each optimisation phase.
+ * Run against a live dev server: npm run dev
+ *
+ * Usage:
+ *   npx ts-node scripts/benchmark_scan_intake.ts
+ *   npx ts-node scripts/benchmark_scan_intake.ts --base http://localhost:4010
+ *
+ * What it measures:
+ *   1. Single sequential scans — baseline latency per request (p50/p95/p99/max)
+ *   2. Parallel scans (10 stations) — simulates 10 concurrent stations each submitting
+ *      one scan at a time at the expected event rate
+ *      (~1 scan/3s per station = ~3.3 scans/s total)
+ *
+ * The script self-provisions all required data (org, runners, cards, track, stations)
+ * and cleans up after itself. It authenticates via the station token, matching the
+ * real production auth path exactly.
+ *
+ * Output is printed to stdout in a copy-paste-friendly table format so results can
+ * be compared across phases.
+ */
+
+import axios, { AxiosInstance } from 'axios';
+
+// ---------------------------------------------------------------------------
+// Config
+// ---------------------------------------------------------------------------
+
+const BASE = (() => {
+    const idx = process.argv.indexOf('--base');
+    return idx !== -1 ? process.argv[idx + 1] : 'http://localhost:4010';
+})();
+
+const API = `${BASE}/api`;
+
+// Number of simulated scan stations
+const STATION_COUNT = 10;
+
+// Sequential benchmark: total number of scans to send, one at a time
+const SEQUENTIAL_SCAN_COUNT = 50;
+
+// Parallel benchmark: number of rounds. Each round fires STATION_COUNT scans concurrently.
+// 20 rounds × 10 stations = 200 total scans, matching the expected event throughput pattern.
+const PARALLEL_ROUNDS = 20;
+
+// Minimum lap time on the test track (seconds). Set low so most scans are valid.
+// The benchmark measures submission speed, not business logic.
+const TRACK_MINIMUM_LAP_TIME = 1;
+
+// Track distance (metres)
+const TRACK_DISTANCE = 400;
+
+// ---------------------------------------------------------------------------
+// Types
+// ---------------------------------------------------------------------------
+
+interface StationHandle {
+    id: number;
+    key: string; // cleartext token, used as Bearer token
+    cardCode: number; // EAN-13 barcode of the card assigned to this station's runner
+    axiosInstance: AxiosInstance;
+}
+
+interface Percentiles {
+    p50: number;
+    p95: number;
+    p99: number;
+    max: number;
+    min: number;
+    mean: number;
+}
+
+interface BenchmarkResult {
+    label: string;
+    totalScans: number;
+    totalTimeMs: number;
+    scansPerSecond: number;
+    latencies: Percentiles;
+    errors: number;
+}
+
+// ---------------------------------------------------------------------------
+// HTTP helpers
+// ---------------------------------------------------------------------------
+
+const adminClient = axios.create({
+    baseURL: API,
+    validateStatus: () => true,
+});
+
+async function adminLogin(): Promise<string> {
+    const res = await adminClient.post('/auth/login', { username: 'demo', password: 'demo' });
+    if (res.status !== 200) {
+        throw new Error(`Login failed: ${res.status} ${JSON.stringify(res.data)}`);
+    }
+    return res.data.access_token;
+}
+
+function authedClient(token: string): AxiosInstance {
+    return axios.create({
+        baseURL: API,
+        validateStatus: () => true,
+        headers: { authorization: `Bearer ${token}` },
+    });
+}
+
+// ---------------------------------------------------------------------------
+// Data provisioning
+// ---------------------------------------------------------------------------
+
+async function provision(adminToken: string): Promise<{
+    stations: StationHandle[];
+    trackId: number;
+    orgId: number;
+    cleanup: () => Promise<void>;
+}> {
+    const client = authedClient(adminToken);
+    const createdIds: { type: string; id: number }[] = [];
+
+    const create = async (path: string, body: object): Promise<any> => {
+        const res = await client.post(path, body);
+        if (res.status !== 200) {
+            throw new Error(`POST ${path} failed: ${res.status} ${JSON.stringify(res.data)}`);
+        }
+        return res.data;
+    };
+
+    process.stdout.write('Provisioning test data... ');
+
+    // Organisation
+    const org = await create('/organizations', { name: 'benchmark-org' });
+    createdIds.push({ type: 'organizations', id: org.id });
+
+    // Track with a low minimumLapTime so re-scans within the benchmark are mostly valid
+    const track = await create('/tracks', {
+        name: 'benchmark-track',
+        distance: TRACK_DISTANCE,
+        minimumLapTime: TRACK_MINIMUM_LAP_TIME,
+    });
+    createdIds.push({ type: 'tracks', id: track.id });
+
+    // One runner + card + station per simulated scan station
+    const stations: StationHandle[] = [];
+
+    for (let i = 0; i < STATION_COUNT; i++) {
+        const runner = await create('/runners', {
+            firstname: `Bench`,
+            lastname: `Runner${i}`,
+            group: org.id,
+        });
+        createdIds.push({ type: 'runners', id: runner.id });
+
+        const card = await create('/cards', { runner: runner.id });
+        createdIds.push({ type: 'cards', id: card.id });
+
+        const station = await create('/stations', {
+            track: track.id,
+            description: `bench-station-${i}`,
+        });
+        createdIds.push({ type: 'stations', id: station.id });
+
+        stations.push({
+            id: station.id,
+            key: station.key,
+            cardCode: card.id, // the test spec uses card.id directly as the barcode value
+            axiosInstance: axios.create({
+                baseURL: API,
+                validateStatus: () => true,
+                headers: { authorization: `Bearer ${station.key}` },
+            }),
+        });
+    }
+
+    console.log(`done. (${STATION_COUNT} stations, ${STATION_COUNT} runners, ${STATION_COUNT} cards)`);
+
+    const cleanup = async () => {
+        process.stdout.write('Cleaning up test data... ');
+        // Delete in reverse-dependency order
+        for (const item of [...createdIds].reverse()) {
+            await client.delete(`/${item.type}/${item.id}?force=true`);
+        }
+        console.log('done.');
+    };
+
+    return { stations, trackId: track.id, orgId: org.id, cleanup };
+}
+
+// ---------------------------------------------------------------------------
+// Single scan submission (returns latency in ms)
+// ---------------------------------------------------------------------------
+
+async function submitScan(station: StationHandle): Promise<{ latencyMs: number; ok: boolean }> {
+    const start = performance.now();
+    const res = await station.axiosInstance.post('/scans/trackscans', {
+        card: station.cardCode,
+        station: station.id,
+    });
+    const latencyMs = performance.now() - start;
+    const ok = res.status === 200;
+    return { latencyMs, ok };
+}
+
+// ---------------------------------------------------------------------------
+// Statistics
+// ---------------------------------------------------------------------------
+
+function percentiles(latencies: number[]): Percentiles {
+    const sorted = [...latencies].sort((a, b) => a - b);
+    const at = (pct: number) => sorted[Math.floor((pct / 100) * sorted.length)] ?? sorted[sorted.length - 1];
+    const mean = sorted.reduce((s, v) => s + v, 0) / sorted.length;
+    return {
+        p50: Math.round(at(50)),
+        p95: Math.round(at(95)),
+        p99: Math.round(at(99)),
+        max: Math.round(sorted[sorted.length - 1]),
+        min: Math.round(sorted[0]),
+        mean: Math.round(mean),
+    };
+}
+
+// ---------------------------------------------------------------------------
+// Benchmark 1 — Sequential (single station, one scan at a time)
+// ---------------------------------------------------------------------------
+
+async function benchmarkSequential(station: StationHandle): Promise<BenchmarkResult> {
+    const latencies: number[] = [];
+    let errors = 0;
+
+    process.stdout.write(` Running ${SEQUENTIAL_SCAN_COUNT} sequential scans`);
+    const wallStart = performance.now();
+
+    for (let i = 0; i < SEQUENTIAL_SCAN_COUNT; i++) {
+        const { latencyMs, ok } = await submitScan(station);
+        latencies.push(latencyMs);
+        if (!ok) errors++;
+        if ((i + 1) % 10 === 0) process.stdout.write('.');
+    }
+
+    const totalTimeMs = performance.now() - wallStart;
+    console.log(' done.');
+
+    return {
+        label: 'Sequential (1 station)',
+        totalScans: SEQUENTIAL_SCAN_COUNT,
+        totalTimeMs,
+        scansPerSecond: (SEQUENTIAL_SCAN_COUNT / totalTimeMs) * 1000,
+        latencies: percentiles(latencies),
+        errors,
+    };
+}
+
+// ---------------------------------------------------------------------------
+// Benchmark 2 — Parallel (10 stations, concurrent rounds)
+//
+// Models the real event scenario: every ~3 seconds each station submits one scan.
+// We don't actually sleep between rounds — we fire each round as fast as the
+// previous one completes, which gives us the worst-case sustained throughput
+// (all stations submitting at maximum rate simultaneously).
+// ---------------------------------------------------------------------------
+
+async function benchmarkParallel(stations: StationHandle[]): Promise<BenchmarkResult> {
+    const latencies: number[] = [];
+    let errors = 0;
+
+    process.stdout.write(` Running ${PARALLEL_ROUNDS} rounds × ${STATION_COUNT} concurrent stations`);
+    const wallStart = performance.now();
+
+    for (let round = 0; round < PARALLEL_ROUNDS; round++) {
+        const results = await Promise.all(stations.map(s => submitScan(s)));
+        for (const { latencyMs, ok } of results) {
+            latencies.push(latencyMs);
+            if (!ok) errors++;
+        }
+        if ((round + 1) % 4 === 0) process.stdout.write('.');
+    }
+
+    const totalTimeMs = performance.now() - wallStart;
+    const totalScans = PARALLEL_ROUNDS * STATION_COUNT;
+    console.log(' done.');
+
+    return {
+        label: `Parallel (${STATION_COUNT} stations concurrent)`,
+        totalScans,
+        totalTimeMs,
+        scansPerSecond: (totalScans / totalTimeMs) * 1000,
+        latencies: percentiles(latencies),
+        errors,
+    };
+}
+
+// ---------------------------------------------------------------------------
+// Output formatting
+// ---------------------------------------------------------------------------
+
+function printResult(result: BenchmarkResult) {
+    const { label, totalScans, totalTimeMs, scansPerSecond, latencies, errors } = result;
+    console.log(`\n ${label}`);
+    console.log(` ${'─'.repeat(52)}`);
+    console.log(`  Total scans  : ${totalScans}`);
+    console.log(`  Total time   : ${totalTimeMs.toFixed(0)} ms`);
+    console.log(`  Throughput   : ${scansPerSecond.toFixed(2)} scans/sec`);
+    console.log(`  Latency min  : ${latencies.min} ms`);
+    console.log(`  Latency mean : ${latencies.mean} ms`);
+    console.log(`  Latency p50  : ${latencies.p50} ms`);
+    console.log(`  Latency p95  : ${latencies.p95} ms`);
+    console.log(`  Latency p99  : ${latencies.p99} ms`);
+    console.log(`  Latency max  : ${latencies.max} ms`);
+    console.log(`  Errors       : ${errors}`);
+}
+
+function printSummary(results: BenchmarkResult[]) {
+    const now = new Date().toISOString();
+    console.log('\n');
+    console.log('═'.repeat(60));
+    console.log(` SCAN INTAKE BENCHMARK RESULTS — ${now}`);
+    console.log(` Server: ${BASE}`);
+    console.log('═'.repeat(60));
+    for (const r of results) {
+        printResult(r);
+    }
+    console.log('\n' + '═'.repeat(60));
+    console.log(' Copy the block above to compare across phases.');
+    console.log('═'.repeat(60) + '\n');
+}
+
+// ---------------------------------------------------------------------------
+// Entry point
+// ---------------------------------------------------------------------------
+
+async function main() {
+    console.log(`\nScan Intake Benchmark — target: ${BASE}\n`);
+
+    let adminToken: string;
+    try {
+        adminToken = await adminLogin();
+    } catch (err) {
+        console.error(`Could not authenticate. Is the server running at ${BASE}?\n`, err.message);
+        process.exit(1);
+    }
+
+    const { stations, cleanup } = await provision(adminToken);
+
+    const results: BenchmarkResult[] = [];
+
+    try {
+        console.log('\nBenchmark 1 — Sequential');
+        results.push(await benchmarkSequential(stations[0]));
+
+        // Brief pause between benchmarks so the sequential scans don't skew
+        // the parallel benchmark's first-scan latency (minimumLapTime window)
+        await new Promise(r => setTimeout(r, (TRACK_MINIMUM_LAP_TIME + 1) * 1000));
+
+        console.log('\nBenchmark 2 — Parallel');
+        results.push(await benchmarkParallel(stations));
+    } finally {
+        await cleanup();
+    }
+
+    printSummary(results);
+}
+
+main().catch(err => {
+    console.error('Benchmark failed:', err);
+    process.exit(1);
+});
diff --git a/src/config.ts b/src/config.ts
index 3f4754d..9f348a1 100644
--- a/src/config.ts
+++ b/src/config.ts
@@ -10,6 +10,7 @@ export const config = {
     testing: process.env.NODE_ENV === "test",
     jwt_secret: process.env.JWT_SECRET || "secretjwtsecret",
     station_token_secret: process.env.STATION_TOKEN_SECRET || "",
+    nats_url: process.env.NATS_URL || "nats://localhost:4222",
     phone_validation_countrycode: getPhoneCodeLocale(),
     postalcode_validation_countrycode: getPostalCodeLocale(),
     version: process.env.VERSION || require('../package.json').version,
diff --git a/src/loaders/index.ts b/src/loaders/index.ts
index 3057cf3..5972028 100644
--- a/src/loaders/index.ts
+++ b/src/loaders/index.ts
@@ -1,4 +1,5 @@
 import { Application } from "express";
+import NatsClient from "../nats/NatsClient";
 import databaseLoader from "./database";
 import expressLoader from "./express";
 import openapiLoader from "./openapi";
@@ -9,6 +10,7 @@ import openapiLoader from "./openapi";
  */
 export default async (app: Application) => {
     await databaseLoader();
+    await NatsClient.connect();
     await openapiLoader(app);
     await expressLoader(app);
     return app;
diff --git a/src/nats/NatsClient.ts b/src/nats/NatsClient.ts
new file mode 100644
index 0000000..b7573f0
--- /dev/null
+++ b/src/nats/NatsClient.ts
@@ -0,0 +1,60 @@
+import consola from 'consola';
+import { connect, JetStreamClient, KV, KvOptions, NatsConnection } from 'nats';
+import { config } from '../config';
+
+/**
+ * Singleton NATS client.
+ * Call connect() once during app startup (after the DB loader).
+ * All other modules obtain the connection via getKV().
+ */
+class NatsClient {
+    private connection: NatsConnection | null = null;
+    private js: JetStreamClient | null = null;
+    private kvBuckets: Map<string, KV> = new Map();
+
+    /**
+     * Establishes the NATS connection and JetStream context.
+     * Must be called once before any KV operations.
+     */
+    public async connect(): Promise<void> {
+        this.connection = await connect({ servers: config.nats_url });
+        this.js = this.connection.jetstream();
+        consola.success(`NATS connected to ${config.nats_url}`);
+    }
+
+    /**
+     * Returns a KV bucket by name, creating it if it doesn't exist yet.
+     * Results are cached — repeated calls with the same name return the same instance.
+     */
+    public async getKV(bucketName: string, options?: Partial<KvOptions>): Promise<KV> {
+        if (this.kvBuckets.has(bucketName)) {
+            return this.kvBuckets.get(bucketName);
+        }
+        if (!this.js) {
+            throw new Error('NATS not connected. Call NatsClient.connect() first.');
+        }
+        const kv = await this.js.views.kv(bucketName, options);
+        this.kvBuckets.set(bucketName, kv);
+        return kv;
+    }
+
+    /**
+     * Gracefully closes the NATS connection.
+     * Call during app shutdown if needed.
+     */
+    public async disconnect(): Promise<void> {
+        if (this.connection) {
+            await this.connection.drain();
+            this.connection = null;
+            this.js = null;
+            this.kvBuckets.clear();
+            consola.info('NATS disconnected.');
+        }
+    }
+
+    public isConnected(): boolean {
+        return this.connection !== null;
+    }
+}
+
+export default new NatsClient();
diff --git a/tsconfig.json b/tsconfig.json
index b28ff11..2691754 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -2,6 +2,7 @@
   "compilerOptions": {
     "target": "ES2020",
    "module": "commonjs",
+    "skipLibCheck": true,
     "rootDir": "./src",
     "outDir": "./dist",
     "esModuleInterop": true,
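
Usage sketch (illustrative only, not part of the patch above): the new NatsClient singleton exposes getKV(), which lazily creates a JetStream key-value bucket on first use and caches the handle per bucket name, assuming NatsClient.connect() has already run via the loader. A caller inside the backend could use it roughly as follows; the bucket name "scan_intake", the key layout, and both helper functions are assumptions made for this example and are not code from this commit.

import { StringCodec } from 'nats';
import NatsClient from './src/nats/NatsClient'; // adjust the relative path to the caller's location

const sc = StringCodec();

// Remember the last card seen by a station, keyed per station id (hypothetical bucket and key names).
async function rememberLastCard(stationId: number, cardId: number): Promise<void> {
    const kv = await NatsClient.getKV('scan_intake');
    await kv.put(`station.${stationId}.lastCard`, sc.encode(String(cardId)));
}

// Read the value back; returns null if the station has not submitted a scan yet.
async function lastCardFor(stationId: number): Promise<number | null> {
    const kv = await NatsClient.getKV('scan_intake');
    const entry = await kv.get(`station.${stationId}.lastCard`);
    return entry ? Number(sc.decode(entry.value)) : null;
}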