perf(nats): Implement bulk cache prewarming for runners to optimize startup performance
This commit is contained in:
@@ -3,7 +3,7 @@ import { config as configDotenv } from 'dotenv';
|
|||||||
import { CountryCode } from 'libphonenumber-js';
|
import { CountryCode } from 'libphonenumber-js';
|
||||||
import ValidatorJS from 'validator';
|
import ValidatorJS from 'validator';
|
||||||
|
|
||||||
configDotenv();
|
configDotenv();
|
||||||
export const config = {
|
export const config = {
|
||||||
internal_port: parseInt(process.env.APP_PORT) || 4010,
|
internal_port: parseInt(process.env.APP_PORT) || 4010,
|
||||||
development: process.env.NODE_ENV === "production",
|
development: process.env.NODE_ENV === "production",
|
||||||
@@ -11,16 +11,17 @@ export const config = {
|
|||||||
jwt_secret: process.env.JWT_SECRET || "secretjwtsecret",
|
jwt_secret: process.env.JWT_SECRET || "secretjwtsecret",
|
||||||
station_token_secret: process.env.STATION_TOKEN_SECRET || "",
|
station_token_secret: process.env.STATION_TOKEN_SECRET || "",
|
||||||
nats_url: process.env.NATS_URL || "nats://localhost:4222",
|
nats_url: process.env.NATS_URL || "nats://localhost:4222",
|
||||||
phone_validation_countrycode: getPhoneCodeLocale(),
|
nats_prewarm: process.env.NATS_PREWARM === "true",
|
||||||
postalcode_validation_countrycode: getPostalCodeLocale(),
|
phone_validation_countrycode: getPhoneCodeLocale(),
|
||||||
version: process.env.VERSION || require('../package.json').version,
|
postalcode_validation_countrycode: getPostalCodeLocale(),
|
||||||
seedTestData: getDataSeeding(),
|
version: process.env.VERSION || require('../package.json').version,
|
||||||
app_url: process.env.APP_URL || "http://localhost:8080",
|
seedTestData: getDataSeeding(),
|
||||||
privacy_url: process.env.PRIVACY_URL || "/privacy",
|
app_url: process.env.APP_URL || "http://localhost:8080",
|
||||||
imprint_url: process.env.IMPRINT_URL || "/imprint",
|
privacy_url: process.env.PRIVACY_URL || "/privacy",
|
||||||
mailer_url: process.env.MAILER_URL || "",
|
imprint_url: process.env.IMPRINT_URL || "/imprint",
|
||||||
mailer_key: process.env.MAILER_KEY || ""
|
mailer_url: process.env.MAILER_URL || "",
|
||||||
}
|
mailer_key: process.env.MAILER_KEY || ""
|
||||||
|
}
|
||||||
let errors = 0
|
let errors = 0
|
||||||
if (typeof config.internal_port !== "number") {
|
if (typeof config.internal_port !== "number") {
|
||||||
consola.error("Error: APP_PORT is not a number")
|
consola.error("Error: APP_PORT is not a number")
|
||||||
|
|||||||
@@ -1,5 +1,8 @@
|
|||||||
import { Application } from "express";
|
import { Application } from "express";
|
||||||
|
import consola from "consola";
|
||||||
|
import { config } from "../config";
|
||||||
import NatsClient from "../nats/NatsClient";
|
import NatsClient from "../nats/NatsClient";
|
||||||
|
import { warmAll } from "../nats/RunnerKV";
|
||||||
import databaseLoader from "./database";
|
import databaseLoader from "./database";
|
||||||
import expressLoader from "./express";
|
import expressLoader from "./express";
|
||||||
import openapiLoader from "./openapi";
|
import openapiLoader from "./openapi";
|
||||||
@@ -11,6 +14,15 @@ import openapiLoader from "./openapi";
|
|||||||
export default async (app: Application) => {
|
export default async (app: Application) => {
|
||||||
await databaseLoader();
|
await databaseLoader();
|
||||||
await NatsClient.connect();
|
await NatsClient.connect();
|
||||||
|
|
||||||
|
if (config.nats_prewarm) {
|
||||||
|
consola.info("Prewarming NATS runner cache...");
|
||||||
|
const startTime = Date.now();
|
||||||
|
await warmAll();
|
||||||
|
const duration = Date.now() - startTime;
|
||||||
|
consola.success(`NATS runner cache prewarmed in ${duration}ms`);
|
||||||
|
}
|
||||||
|
|
||||||
await openapiLoader(app);
|
await openapiLoader(app);
|
||||||
await expressLoader(app);
|
await expressLoader(app);
|
||||||
return app;
|
return app;
|
||||||
|
|||||||
@@ -126,3 +126,65 @@ export async function warmRunner(runnerId: number): Promise<RunnerKVEntry> {
|
|||||||
await setRunnerEntry(runnerId, entry);
|
await setRunnerEntry(runnerId, entry);
|
||||||
return entry;
|
return entry;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Bulk cache prewarming: loads all runners from the database and populates the KV cache.
|
||||||
|
* Uses 3 efficient queries and parallel KV writes to minimize startup time.
|
||||||
|
*
|
||||||
|
* Call from loader during startup (if NATS_PREWARM=true) to eliminate DB reads on the hot
|
||||||
|
* path from the very first scan.
|
||||||
|
*/
|
||||||
|
export async function warmAll(): Promise<void> {
|
||||||
|
const connection = getConnection();
|
||||||
|
|
||||||
|
// Query 1: All runners
|
||||||
|
const runners = await connection
|
||||||
|
.getRepository(Runner)
|
||||||
|
.createQueryBuilder('runner')
|
||||||
|
.select(['runner.id', 'runner.firstname', 'runner.lastname'])
|
||||||
|
.getMany();
|
||||||
|
|
||||||
|
// Query 2: Total valid distance per runner
|
||||||
|
const distanceResults = await connection
|
||||||
|
.getRepository(TrackScan)
|
||||||
|
.createQueryBuilder('scan')
|
||||||
|
.select('scan.runner', 'runnerId')
|
||||||
|
.addSelect('COALESCE(SUM(track.distance), 0)', 'total')
|
||||||
|
.innerJoin('scan.track', 'track')
|
||||||
|
.where('scan.valid = :valid', { valid: true })
|
||||||
|
.groupBy('scan.runner')
|
||||||
|
.getRawMany();
|
||||||
|
|
||||||
|
// Query 3: Latest valid scan timestamp per runner
|
||||||
|
const latestResults = await connection
|
||||||
|
.getRepository(TrackScan)
|
||||||
|
.createQueryBuilder('scan')
|
||||||
|
.select('scan.runner', 'runnerId')
|
||||||
|
.addSelect('MAX(scan.timestamp)', 'latestTimestamp')
|
||||||
|
.where('scan.valid = :valid', { valid: true })
|
||||||
|
.groupBy('scan.runner')
|
||||||
|
.getRawMany();
|
||||||
|
|
||||||
|
// Build lookup maps
|
||||||
|
const distanceMap = new Map<number, number>();
|
||||||
|
distanceResults.forEach((row: any) => {
|
||||||
|
distanceMap.set(parseInt(row.runnerId, 10), parseInt(row.total, 10));
|
||||||
|
});
|
||||||
|
|
||||||
|
const latestMap = new Map<number, number>();
|
||||||
|
latestResults.forEach((row: any) => {
|
||||||
|
latestMap.set(parseInt(row.runnerId, 10), parseInt(row.latestTimestamp, 10));
|
||||||
|
});
|
||||||
|
|
||||||
|
// Write all entries in parallel
|
||||||
|
const writePromises = runners.map((runner) => {
|
||||||
|
const entry: RunnerKVEntry = {
|
||||||
|
displayName: `${runner.firstname} ${runner.lastname}`,
|
||||||
|
totalDistance: distanceMap.get(runner.id) ?? 0,
|
||||||
|
latestTimestamp: latestMap.get(runner.id) ?? 0,
|
||||||
|
};
|
||||||
|
return setRunnerEntry(runner.id, entry);
|
||||||
|
});
|
||||||
|
|
||||||
|
await Promise.all(writePromises);
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user