/**
 * Background job queue using BullMQ
 */

import { Queue, QueueOptions, Worker, Job, JobsOptions } from 'bullmq';
import IORedis from 'ioredis';
import { getEnv } from '@the-order/shared';

/**
 * Configuration accepted by {@link JobQueue}.
 *
 * `connection` is only consulted when the environment returned by `getEnv()`
 * does not provide `REDIS_URL`.
 */
export interface JobQueueConfig {
  connection?: {
    host?: string;
    port?: number;
    password?: string;
    db?: number;
  };
  /** Job options merged over the built-in defaults for every queue. */
  defaultJobOptions?: JobsOptions;
}

/** Default shape of a job payload: an arbitrary string-keyed record. */
export interface JobData {
  [key: string]: unknown;
}

/** Function invoked by a worker for each job it picks up. */
export type JobHandler<T = JobData> = (job: Job<T>) => Promise<unknown>;

/**
 * Built-in job options applied to every queue. They can be overridden by
 * `JobQueueConfig.defaultJobOptions` and, per queue, by
 * `QueueOptions.defaultJobOptions`.
 */
const DEFAULT_JOB_OPTIONS: JobsOptions = {
  attempts: 3,
  backoff: {
    type: 'exponential',
    delay: 2000,
  },
  removeOnComplete: {
    age: 24 * 3600, // Keep completed jobs for 24 hours
    count: 1000, // Keep last 1000 completed jobs
  },
  removeOnFail: {
    age: 7 * 24 * 3600, // Keep failed jobs for 7 days
  },
};

/**
 * Job Queue Manager
 *
 * Owns a single Redis connection and caches one `Queue` and at most one
 * `Worker` per queue name.
 */
export class JobQueue {
  private queues: Map<string, Queue> = new Map();
  private workers: Map<string, Worker> = new Map();
  private connection: IORedis;

  /**
   * Opens the Redis connection. `REDIS_URL` from the environment takes
   * precedence; otherwise `config.connection` is used, falling back to
   * `localhost:6379`, database 0.
   *
   * @param config - Optional connection settings and default job options.
   */
  constructor(private config?: JobQueueConfig) {
    const env = getEnv();
    const redisUrl = env.REDIS_URL;

    // Create Redis connection
    if (redisUrl) {
      this.connection = new IORedis(redisUrl, {
        maxRetriesPerRequest: null,
        enableReadyCheck: false,
      });
    } else {
      // Use connection config or defaults
      this.connection = new IORedis({
        host: config?.connection?.host || 'localhost',
        port: config?.connection?.port || 6379,
        password: config?.connection?.password,
        db: config?.connection?.db || 0,
        maxRetriesPerRequest: null,
        enableReadyCheck: false,
      });
    }
  }

  /**
   * Create or get a queue.
   *
   * If a queue with this name was already created by this manager it is
   * returned as-is and `options` is ignored.
   *
   * @param name - Queue name.
   * @param options - Extra queue options. `options.defaultJobOptions` is merged
   *   over the config-level and built-in defaults rather than replacing them.
   * @returns The cached or newly created queue.
   */
  createQueue(name: string, options?: QueueOptions): Queue {
    const existing = this.queues.get(name);
    if (existing) {
      return existing;
    }

    const queue = new Queue(name, {
      connection: this.connection,
      // Spread caller options BEFORE defaultJobOptions so the merged defaults
      // below are not clobbered by a raw `options.defaultJobOptions`.
      ...options,
      defaultJobOptions: {
        ...DEFAULT_JOB_OPTIONS,
        ...this.config?.defaultJobOptions,
        ...options?.defaultJobOptions,
      },
    });

    this.queues.set(name, queue);
    return queue;
  }

  /**
   * Create a worker for a queue.
   *
   * Only one worker per queue name is kept; if one already exists it is
   * returned and the supplied `handler` and `options` are ignored.
   *
   * @param queueName - Name of the queue to consume.
   * @param handler - Function run for each job; a rejection marks the job failed.
   * @param options - `concurrency` defaults to 1.
   * @returns The cached or newly created worker.
   */
  createWorker<T = JobData>(
    queueName: string,
    handler: JobHandler<T>,
    options?: { concurrency?: number }
  ): Worker<T> {
    if (this.workers.has(queueName)) {
      return this.workers.get(queueName) as Worker<T>;
    }

    const worker = new Worker<T>(
      queueName,
      async (job: Job<T>) => {
        try {
          return await handler(job);
        } catch (error) {
          console.error(`Job ${job.id} failed:`, error);
          throw error;
        }
      },
      {
        connection: this.connection,
        concurrency: options?.concurrency || 1,
      }
    );

    // Set up event handlers
    worker.on('completed', (job: Job) => {
      console.log(`Job ${job.id} completed`);
    });

    worker.on('failed', (job: Job | undefined, err: Error) => {
      console.error(`Job ${job?.id || 'unknown'} failed:`, err);
    });

    // An 'error' event without a listener is thrown by the emitter; always
    // attach one so worker-level errors are logged instead.
    worker.on('error', (err: Error) => {
      console.error(`Worker for queue ${queueName} error:`, err);
    });

    this.workers.set(queueName, worker);
    return worker;
  }

  /**
   * Add a job to a queue, creating the queue if needed.
   *
   * @param queueName - Target queue.
   * @param data - Job payload.
   * @param options - Per-job options.
   * @returns The created job.
   */
  async addJob<T = JobData>(
    queueName: string,
    data: T,
    options?: JobsOptions
  ): Promise<Job<T>> {
    const queue = this.createQueue(queueName);
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    return queue.add('default' as any, data as any, options);
  }

  /**
   * Add a scheduled job.
   *
   * @param queueName - Target queue.
   * @param data - Job payload.
   * @param delay - Either a delay in milliseconds or the `Date` at which the
   *   job should run. A date in the past (or a negative number) is treated as
   *   "no delay".
   * @param options - Per-job options; any `delay` in here is overridden.
   * @returns The created job.
   */
  async addScheduledJob<T = JobData>(
    queueName: string,
    data: T,
    delay: number | Date,
    options?: JobsOptions
  ): Promise<Job<T>> {
    const queue = this.createQueue(queueName);
    const requestedDelay = typeof delay === 'number' ? delay : delay.getTime() - Date.now();
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    return queue.add('default' as any, data as any, {
      ...options,
      // Never hand a negative delay to the queue.
      delay: Math.max(0, requestedDelay),
    });
  }

  /**
   * Add a recurring job (cron).
   *
   * @param queueName - Target queue.
   * @param data - Job payload.
   * @param cronPattern - Cron expression passed as `repeat.pattern`.
   * @param options - Per-job options; any `repeat` in here is overridden.
   * @returns The created job.
   */
  async addRecurringJob<T = JobData>(
    queueName: string,
    data: T,
    cronPattern: string,
    options?: JobsOptions
  ): Promise<Job<T>> {
    const queue = this.createQueue(queueName);
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    return queue.add('default' as any, data as any, {
      ...options,
      repeat: {
        pattern: cronPattern,
      },
    });
  }

  /**
   * Get job status.
   *
   * @param queueName - Queue the job belongs to; it must already have been
   *   created through this manager.
   * @param jobId - Identifier of the job.
   * @returns A snapshot of the job, or `null` when the job does not exist.
   * @throws Error when `queueName` is not a queue known to this manager.
   */
  async getJobStatus(
    queueName: string,
    jobId: string
  ): Promise<{
    id: Job['id'];
    name: Job['name'];
    data: Job['data'];
    state: Awaited<ReturnType<Job['getState']>>;
    progress: Job['progress'];
    returnvalue: Job['returnvalue'];
    failedReason: Job['failedReason'];
    timestamp: Job['timestamp'];
    processedOn: Job['processedOn'];
    finishedOn: Job['finishedOn'];
  } | null> {
    const queue = this.queues.get(queueName);
    if (!queue) {
      throw new Error(`Queue ${queueName} not found`);
    }

    const job = await queue.getJob(jobId);
    if (!job) {
      return null;
    }

    return {
      id: job.id,
      name: job.name,
      data: job.data,
      state: await job.getState(),
      progress: job.progress,
      returnvalue: job.returnvalue,
      failedReason: job.failedReason,
      timestamp: job.timestamp,
      processedOn: job.processedOn,
      finishedOn: job.finishedOn,
    };
  }

  /**
   * Close all queues and workers, then the Redis connection.
   *
   * Every handle is attempted even if an earlier one fails, so a single
   * failing worker cannot leave the remaining workers, queues or the Redis
   * connection open. The first error encountered is rethrown at the end.
   */
  async close(): Promise<void> {
    let failed = false;
    let firstError: unknown;
    const attempt = async (operation: () => Promise<unknown>): Promise<void> => {
      try {
        await operation();
      } catch (error) {
        if (!failed) {
          failed = true;
          firstError = error;
        }
      }
    };

    // Close all workers
    for (const worker of this.workers.values()) {
      await attempt(() => worker.close());
    }
    this.workers.clear();

    // Close all queues
    for (const queue of this.queues.values()) {
      await attempt(() => queue.close());
    }
    this.queues.clear();

    // Close Redis connection
    await attempt(() => this.connection.quit());

    if (failed) {
      throw firstError;
    }
  }
}

/**
 * Default job queue instance
 */
let defaultJobQueue: JobQueue | null = null;

/**
 * Get or create default job queue.
 *
 * @param config - Used only when the default instance is created, i.e. on the
 *   first call; later calls return the existing instance and ignore it.
 * @returns The shared {@link JobQueue} instance.
 */
export function getJobQueue(config?: JobQueueConfig): JobQueue {
  if (!defaultJobQueue) {
    defaultJobQueue = new JobQueue(config);
  }
  return defaultJobQueue;
}