diff --git a/bun.lockb b/bun.lockb index 0ece36a..7dcebf1 100644 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/package.json b/package.json index 583df16..58cf923 100644 --- a/package.json +++ b/package.json @@ -14,6 +14,7 @@ "bullmq": "5.28.1", "hono": "4.6.11", "ioredis": "5.4.1", + "nodemailer": "^6.9.16", "zod": "3.23.8" }, "devDependencies": { diff --git a/src/queues/email.queue.ts b/src/queues/email.queue.ts index a24b973..a4a4762 100644 --- a/src/queues/email.queue.ts +++ b/src/queues/email.queue.ts @@ -1,61 +1,80 @@ -import { Queue, Worker, QueueEvents } from 'bullmq' -import { EmailService } from '../services/email' -import { config } from '../config/env' -import Redis from 'ioredis' +import { createTransport } from "nodemailer"; +import { Queue, Worker, QueueEvents } from "bullmq"; +import { config } from "../config/env"; +import Redis from "ioredis"; interface EmailJob { - to: string - subject: string - html: string - text: string + to: string; + subject: string; + html: string; + text: string; } -const connection = new Redis(config.redis.url, { - maxRetriesPerRequest: null -}) +export const QUEUE_NAME = "{lfkmailqueue}"; -export const emailQueue = new Queue('email', { +const connection = new Redis(config.redis.url, { + maxRetriesPerRequest: null, +}); + +export const emailQueue = new Queue(QUEUE_NAME, { connection, defaultJobOptions: { attempts: 3, backoff: { - type: 'exponential', - delay: 1000 - } + type: "exponential", + delay: 1000, + }, + }, +}); + +const transporter = createTransport({ + host: config.smtp.host, + port: config.smtp.port, + auth: { + user: config.smtp.user, + pass: config.smtp.pass } }) const worker = new Worker( - 'email', + QUEUE_NAME, async (job) => { - const emailService = new EmailService() - await emailService.sendEmail(job.data) + // const emailService = new EmailService(); + // await emailService.sendEmail(job.data); + console.log(job.data); + await transporter.sendMail({ + from: config.email.from, + to: job.data.to, + subject: job.data.subject, + text: job.data.text, + html: job.data.html + }) }, - { + { connection, concurrency: 5, limiter: { - max: 100, - duration: 1000 * 60 // 100 emails per minute - } + max: 250, + duration: 1000 * 60, // 250 emails per minute + }, } -) +); -const queueEvents = new QueueEvents('email', { connection }) +const queueEvents = new QueueEvents(QUEUE_NAME, { connection }); -worker.on('completed', (job) => { - console.log(`Email job ${job.id} completed`) -}) +worker.on("completed", (job) => { + console.log(`Email job ${job.id} completed`); +}); -worker.on('failed', (job, error) => { - console.error(`Email job ${job?.id} failed:`, error) -}) +worker.on("failed", (job, error) => { + console.error(`Email job ${job?.id} failed:`, error); +}); -queueEvents.on('waiting', ({ jobId }) => { - console.log(`Job ${jobId} is waiting`) -}) +queueEvents.on("waiting", ({ jobId }) => { + // console.log(`Job ${jobId} is waiting`) +}); -process.on('SIGTERM', async () => { - await worker.close() - await connection.quit() -}) \ No newline at end of file +process.on("SIGTERM", async () => { + await worker.close(); + await connection.quit(); +}); diff --git a/src/services/email.ts b/src/services/email.ts index 3e45851..3353087 100644 --- a/src/services/email.ts +++ b/src/services/email.ts @@ -18,11 +18,8 @@ export class EmailService { // Add to queue instead of sending directly await emailQueue.add('send-email', email, { removeOnComplete: true, - removeOnFail: 1000 + // removeOnFail: 1000 }) - - // Log for development - await Bun.write('emails.log', JSON.stringify(email) + '\n', { append: true }) } async getQueueStatus() {