import { createTransport } from "nodemailer";
import { Queue, Worker, QueueEvents } from "bullmq";
import { config } from "../config/env";
import Redis from "ioredis";
import type { Attachment } from "nodemailer/lib/mailer";

/** Payload carried by each job on the email queue; consumed by the worker below. */
interface EmailJob {
  to: string;
  subject: string;
  html: string;
  attachments: Attachment[];
  text: string;
}

/**
 * Name of the BullMQ queue used for outgoing mail.
 * NOTE(review): the surrounding braces look like a Redis Cluster hash tag
 * (keeps all queue keys in one slot) — confirm before renaming.
 */
export const QUEUE_NAME = "{lfkmailqueue}";

// Shared Redis connection handed to the queue, worker and queue-events
// instances. `maxRetriesPerRequest: null` is the setting BullMQ asks for on
// connections used by workers.
const connection = new Redis(config.redis.url, {
  maxRetriesPerRequest: null,
});

/**
 * Queue that producers add email jobs to. Jobs are attempted up to 3 times
 * with exponential backoff starting at 1000 ms.
 */
export const emailQueue = new Queue(QUEUE_NAME, {
  connection,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: "exponential",
      delay: 1000,
    },
  },
});

// SMTP transport built from the project's configuration.
const transporter = createTransport({
  host: config.smtp.host,
  port: config.smtp.port,
  auth: {
    user: config.smtp.user,
    pass: config.smtp.pass,
  },
});

// Worker that sends one email per job. A rejected `sendMail` promise fails
// the job, which lets the queue's retry/backoff settings take over.
const worker = new Worker<EmailJob>(
  QUEUE_NAME,
  async (job): Promise<void> => {
    await transporter.sendMail({
      from: { address: config.email.from, name: "Lauf für Kaya!" },
      replyTo: config.email.replyTo,
      to: job.data.to,
      subject: job.data.subject,
      text: job.data.text,
      html: job.data.html,
      attachments: job.data.attachments,
    });
  },
  {
    connection,
    concurrency: 5,
    limiter: {
      max: 250,
      duration: 1000 * 60, // 250 emails per minute
    },
  }
);

const queueEvents = new QueueEvents(QUEUE_NAME, { connection });

worker.on("completed", (job) => {
  console.log(`Email job ${job.id} completed`);
});

worker.on("failed", (job, error) => {
  console.error(`Email job ${job?.id} failed:`, error);
});

// Without an "error" listener, connection-level errors emitted by the worker
// would have nothing handling them; BullMQ recommends always attaching one.
worker.on("error", (error) => {
  console.error("Email worker error:", error);
});

queueEvents.on("waiting", ({ jobId }) => {
  console.log(`Job ${jobId} is waiting`);
});

// Guards against running the shutdown sequence twice when SIGTERM is
// delivered more than once.
let shuttingDown = false;

/**
 * Closes every BullMQ object created in this module and then the shared
 * Redis connection. The worker is closed first so that it stops picking up
 * jobs before the connection goes away. Errors are logged rather than
 * surfacing as unhandled promise rejections.
 */
async function shutdown(): Promise<void> {
  if (shuttingDown) {
    return;
  }
  shuttingDown = true;

  try {
    await worker.close();
    // QueueEvents holds Redis resources of its own; leaving it open can keep
    // the process alive after SIGTERM.
    await queueEvents.close();
    await emailQueue.close();
    await connection.quit();
  } catch (error) {
    console.error("Error while shutting down the email queue:", error);
  }
}

process.on("SIGTERM", () => {
  void shutdown();
});