import { db } from "../../db/index"; import { xosJobQueue, xosAgentMetrics, type XosJob, type InsertXosJob, type XosAgentMetric, } from "@shared/schema"; import { eq, desc, and, sql, lte, isNull, asc } from "drizzle-orm"; export interface JobResult { success: boolean; data?: any; error?: string; } class JobQueueService { private processingInterval: NodeJS.Timeout | null = null; private readonly POLL_INTERVAL = 10000; private jobHandlers: Map Promise> = new Map(); registerHandler(type: string, handler: (job: XosJob) => Promise): void { this.jobHandlers.set(type, handler); console.log(`[JobQueue] Handler registrado para tipo: ${type}`); } async enqueue(job: InsertXosJob): Promise { const [created] = await db.insert(xosJobQueue).values({ ...job, status: "pending", attempts: 0, }).returning(); console.log(`[JobQueue] Job #${created.id} enfileirado: ${created.type}`); return created; } async enqueueMany(jobs: InsertXosJob[]): Promise { if (jobs.length === 0) return []; const created = await db.insert(xosJobQueue).values( jobs.map(j => ({ ...j, status: "pending", attempts: 0 })) ).returning(); console.log(`[JobQueue] ${created.length} jobs enfileirados em lote`); return created; } async claimJob(agentName: string, types?: string[]): Promise { const now = new Date(); let typeFilter = sql`TRUE`; if (types && types.length > 0) { typeFilter = sql`${xosJobQueue.type} IN (${sql.join(types.map(t => sql`${t}`), sql`, `)})`; } const claimed = await db.execute(sql` UPDATE xos_job_queue SET status = 'processing', assigned_agent = ${agentName}, started_at = ${now}, attempts = COALESCE(attempts, 0) + 1 WHERE id = ( SELECT id FROM xos_job_queue WHERE status = 'pending' AND (scheduled_at IS NULL OR scheduled_at <= ${now}) AND ${typeFilter} ORDER BY priority ASC, created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED ) RETURNING * `); const rows = (claimed as any).rows || claimed; if (!rows || rows.length === 0) return null; return rows[0] as XosJob; } async completeJob(jobId: number, result: JobResult): Promise { await db.update(xosJobQueue).set({ status: result.success ? "completed" : "failed", result: result.data || null, error: result.error || null, completedAt: new Date(), }).where(eq(xosJobQueue.id, jobId)); } async retryJob(jobId: number): Promise { const [job] = await db.select().from(xosJobQueue).where(eq(xosJobQueue.id, jobId)); if (!job) return false; if ((job.attempts || 0) >= (job.maxAttempts || 3)) { await db.update(xosJobQueue).set({ status: "dead_letter" }).where(eq(xosJobQueue.id, jobId)); return false; } await db.update(xosJobQueue).set({ status: "pending", assignedAgent: null, startedAt: null, error: null, }).where(eq(xosJobQueue.id, jobId)); return true; } async getJobs(status?: string, limit: number = 50): Promise { if (status) { return db.select().from(xosJobQueue) .where(eq(xosJobQueue.status, status)) .orderBy(desc(xosJobQueue.createdAt)) .limit(limit); } return db.select().from(xosJobQueue) .orderBy(desc(xosJobQueue.createdAt)) .limit(limit); } async getJobStats(): Promise<{ pending: number; processing: number; completed: number; failed: number; deadLetter: number; total: number; }> { const [stats] = await db.select({ pending: sql`count(*) filter (where status = 'pending')`, processing: sql`count(*) filter (where status = 'processing')`, completed: sql`count(*) filter (where status = 'completed')`, failed: sql`count(*) filter (where status = 'failed')`, deadLetter: sql`count(*) filter (where status = 'dead_letter')`, total: sql`count(*)`, }).from(xosJobQueue); return { pending: Number(stats.pending), processing: Number(stats.processing), completed: Number(stats.completed), failed: Number(stats.failed), deadLetter: Number(stats.deadLetter), total: Number(stats.total), }; } async cancelJob(jobId: number): Promise { await db.update(xosJobQueue).set({ status: "cancelled", completedAt: new Date() }).where(eq(xosJobQueue.id, jobId)); } async recordAgentMetrics(agentName: string, metrics: { tasksCompleted?: number; tasksFailed?: number; avgDurationMs?: number; skillsCreated?: number; policiesTriggered?: number; metadata?: any; }): Promise { const period = new Date().toISOString().slice(0, 13); await db.insert(xosAgentMetrics).values({ agentName, period, ...metrics, }); } async getAgentMetrics(agentName?: string, limit: number = 100): Promise { if (agentName) { return db.select().from(xosAgentMetrics) .where(eq(xosAgentMetrics.agentName, agentName)) .orderBy(desc(xosAgentMetrics.createdAt)) .limit(limit); } return db.select().from(xosAgentMetrics) .orderBy(desc(xosAgentMetrics.createdAt)) .limit(limit); } async getAgentSummary(): Promise> { const rows = await db.select({ agent: xosAgentMetrics.agentName, totalCompleted: sql`coalesce(sum(${xosAgentMetrics.tasksCompleted}), 0)`, totalFailed: sql`coalesce(sum(${xosAgentMetrics.tasksFailed}), 0)`, avgDuration: sql`coalesce(avg(${xosAgentMetrics.avgDurationMs}), 0)`, }).from(xosAgentMetrics) .groupBy(xosAgentMetrics.agentName); return rows.map(r => ({ agent: r.agent, totalCompleted: Number(r.totalCompleted), totalFailed: Number(r.totalFailed), avgDuration: Math.round(Number(r.avgDuration)), })); } async cleanupOldJobs(daysOld: number = 30): Promise { const cutoff = new Date(Date.now() - daysOld * 86400000); const result = await db.delete(xosJobQueue) .where(and( sql`${xosJobQueue.status} IN ('completed', 'cancelled', 'dead_letter')`, lte(xosJobQueue.completedAt, cutoff), )); return 0; } startProcessing(): void { if (this.processingInterval) return; this.processingInterval = setInterval(async () => { try { await this.processNextJobs(); } catch (error: any) { console.error("[JobQueue] Erro no processamento:", error.message); } }, this.POLL_INTERVAL); console.log("[JobQueue] Processamento automático iniciado (intervalo: 10s)"); } stopProcessing(): void { if (this.processingInterval) { clearInterval(this.processingInterval); this.processingInterval = null; console.log("[JobQueue] Processamento automático parado"); } } private async processNextJobs(): Promise { const handlerTypes = Array.from(this.jobHandlers.keys()); if (handlerTypes.length === 0) return; for (let i = 0; i < 5; i++) { const job = await this.claimJob("job-processor", handlerTypes); if (!job) break; const handler = this.jobHandlers.get(job.type); if (!handler) continue; try { const result = await handler(job); await this.completeJob(job.id, result); } catch (error: any) { const canRetry = (job.attempts || 0) < (job.maxAttempts || 3); await db.update(xosJobQueue).set({ status: canRetry ? "pending" : "dead_letter", error: error.message, assignedAgent: null, completedAt: canRetry ? null : new Date(), }).where(eq(xosJobQueue.id, job.id)); } } } } export const jobQueueService = new JobQueueService();