This commit is contained in:
Philipp Dormann 2024-11-21 18:27:45 +01:00
parent f6c8b2ee8c
commit e8c66e65e6
Signed by: philipp
GPG Key ID: 3BB9ADD52DCA4314
4 changed files with 58 additions and 41 deletions

BIN
bun.lockb

Binary file not shown.

View File

@ -14,6 +14,7 @@
"bullmq": "5.28.1", "bullmq": "5.28.1",
"hono": "4.6.11", "hono": "4.6.11",
"ioredis": "5.4.1", "ioredis": "5.4.1",
"nodemailer": "^6.9.16",
"zod": "3.23.8" "zod": "3.23.8"
}, },
"devDependencies": { "devDependencies": {

View File

@ -1,61 +1,80 @@
import { Queue, Worker, QueueEvents } from 'bullmq' import { createTransport } from "nodemailer";
import { EmailService } from '../services/email' import { Queue, Worker, QueueEvents } from "bullmq";
import { config } from '../config/env' import { config } from "../config/env";
import Redis from 'ioredis' import Redis from "ioredis";
interface EmailJob { interface EmailJob {
to: string to: string;
subject: string subject: string;
html: string html: string;
text: string text: string;
} }
const connection = new Redis(config.redis.url, { export const QUEUE_NAME = "{lfkmailqueue}";
maxRetriesPerRequest: null
})
export const emailQueue = new Queue<EmailJob>('email', { const connection = new Redis(config.redis.url, {
maxRetriesPerRequest: null,
});
export const emailQueue = new Queue<EmailJob>(QUEUE_NAME, {
connection, connection,
defaultJobOptions: { defaultJobOptions: {
attempts: 3, attempts: 3,
backoff: { backoff: {
type: 'exponential', type: "exponential",
delay: 1000 delay: 1000,
} },
},
});
const transporter = createTransport({
host: config.smtp.host,
port: config.smtp.port,
auth: {
user: config.smtp.user,
pass: config.smtp.pass
} }
}) })
const worker = new Worker<EmailJob>( const worker = new Worker<EmailJob>(
'email', QUEUE_NAME,
async (job) => { async (job) => {
const emailService = new EmailService() // const emailService = new EmailService();
await emailService.sendEmail(job.data) // await emailService.sendEmail(job.data);
console.log(job.data);
await transporter.sendMail({
from: config.email.from,
to: job.data.to,
subject: job.data.subject,
text: job.data.text,
html: job.data.html
})
}, },
{ {
connection, connection,
concurrency: 5, concurrency: 5,
limiter: { limiter: {
max: 100, max: 250,
duration: 1000 * 60 // 100 emails per minute duration: 1000 * 60, // 250 emails per minute
} },
} }
) );
const queueEvents = new QueueEvents('email', { connection }) const queueEvents = new QueueEvents(QUEUE_NAME, { connection });
worker.on('completed', (job) => { worker.on("completed", (job) => {
console.log(`Email job ${job.id} completed`) console.log(`Email job ${job.id} completed`);
}) });
worker.on('failed', (job, error) => { worker.on("failed", (job, error) => {
console.error(`Email job ${job?.id} failed:`, error) console.error(`Email job ${job?.id} failed:`, error);
}) });
queueEvents.on('waiting', ({ jobId }) => { queueEvents.on("waiting", ({ jobId }) => {
console.log(`Job ${jobId} is waiting`) // console.log(`Job ${jobId} is waiting`)
}) });
process.on('SIGTERM', async () => { process.on("SIGTERM", async () => {
await worker.close() await worker.close();
await connection.quit() await connection.quit();
}) });

View File

@ -18,11 +18,8 @@ export class EmailService {
// Add to queue instead of sending directly // Add to queue instead of sending directly
await emailQueue.add('send-email', email, { await emailQueue.add('send-email', email, {
removeOnComplete: true, removeOnComplete: true,
removeOnFail: 1000 // removeOnFail: 1000
}) })
// Log for development
await Bun.write('emails.log', JSON.stringify(email) + '\n', { append: true })
} }
async getQueueStatus() { async getQueueStatus() {