import { query } from "../db/postgres";

/** Default retry ceiling: messages whose retry_count has reached this value are not returned for retry. */
const DEFAULT_MAX_RETRIES = 3;

/**
 * A dead-lettered message in the camelCase shape handed back to callers
 * (mapped from the snake_case columns of the `dead_letter_queue` table).
 */
interface DeadLetterMessage {
  messageId: string;
  originalQueue: string;
  // Kept as `any` so existing consumers that read fields off the payload keep compiling.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  payload: any;
  error: string;
  retryCount: number;
  /** ISO-8601 timestamp string. */
  createdAt: string;
}

/**
 * Convert one `dead_letter_queue` row into a {@link DeadLetterMessage}.
 *
 * - `payload` is JSON-parsed when the driver returns it as a string and passed
 *   through untouched otherwise.
 * - `created_at` is normalised to a string (`Date` values via `toISOString()`);
 *   when the column is empty the current time is substituted.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function rowToDeadLetterMessage(row: any): DeadLetterMessage {
  let createdAt: string;
  if (!row.created_at) {
    createdAt = new Date().toISOString();
  } else if (row.created_at instanceof Date) {
    createdAt = row.created_at.toISOString();
  } else {
    createdAt = String(row.created_at);
  }

  return {
    messageId: row.message_id,
    originalQueue: row.queue,
    payload: typeof row.payload === "string" ? JSON.parse(row.payload) : row.payload,
    error: row.error,
    retryCount: row.retry_count,
    createdAt,
  };
}

/**
 * Add a message to the dead letter queue.
 *
 * Inserts a new row with a generated id of the form `dlq-<epoch ms>-<random base36>`,
 * a retry count of 0 and the current time as `created_at`.
 *
 * @param queue   Name of the queue the message originally belonged to.
 * @param payload Message body; stored as its `JSON.stringify` serialisation.
 * @param error   Description of the failure that caused the message to be dead-lettered.
 */
export async function addToDLQ(
  queue: string,
  payload: unknown,
  error: string
): Promise<void> {
  // slice(2, 11) skips the leading "0." and keeps up to 9 base36 characters
  // (same result as the deprecated substr(2, 9)).
  const randomSuffix = Math.random().toString(36).slice(2, 11);
  const messageId = `dlq-${Date.now()}-${randomSuffix}`;

  await query(
    `INSERT INTO dead_letter_queue (message_id, queue, payload, error, retry_count, created_at)
     VALUES ($1, $2, $3, $4, $5, $6)`,
    [messageId, queue, JSON.stringify(payload), error, 0, new Date().toISOString()]
  );
}

/**
 * Get messages from the dead letter queue that are still eligible for retry.
 *
 * Returns rows for `queue` whose `retry_count` is below `maxRetries`, oldest
 * (`created_at`) first, capped at `limit` rows.
 *
 * @param queue      Name of the original queue to fetch messages for.
 * @param limit      Maximum number of messages to return (default 10).
 * @param maxRetries Exclusive upper bound on `retry_count` (default 3).
 * @returns The matching messages mapped to {@link DeadLetterMessage}.
 */
export async function getDLQMessages(
  queue: string,
  limit = 10,
  maxRetries = DEFAULT_MAX_RETRIES
): Promise<DeadLetterMessage[]> {
  // NOTE(review): `query` is assumed to resolve to an array of row objects,
  // as this code calls `.map` directly on its result.
  const rows = await query(
    `SELECT * FROM dead_letter_queue
     WHERE queue = $1 AND retry_count < $2
     ORDER BY created_at ASC
     LIMIT $3`,
    [queue, maxRetries, limit]
  );

  return rows.map(rowToDeadLetterMessage);
}

/**
 * Increment the retry count of a dead-lettered message.
 *
 * Also sets the row's `updated_at` column to the database's current timestamp.
 *
 * @param messageId The `message_id` of the row to update.
 */
export async function incrementRetryCount(messageId: string): Promise<void> {
  await query(
    `UPDATE dead_letter_queue
     SET retry_count = retry_count + 1, updated_at = CURRENT_TIMESTAMP
     WHERE message_id = $1`,
    [messageId]
  );
}