fix(ops): /health DB+Redis checks, validated env.REDIS_URL across workers, error_events 90d retention
Three audit-pass-#3 findings, all in the "wakes you at 3am" category.
- /api/public/health now runs DB SELECT 1 + Redis PING in parallel and
returns 503 + a degraded payload when either fails. Anonymous probes
(no X-Intake-Secret) still get a flat {status:'ok'} so generic uptime
monitors keep working; authenticated probes see the dep results.
- All worker entrypoints (ai, bulk, documents, email, export, import,
maintenance, notifications, reports, webhooks) and src/lib/redis.ts
now use env.REDIS_URL (Zod-validated at boot) instead of
process.env.REDIS_URL!. Previously a missing env let the app start
silently and fail at first job pickup.
- maintenance worker gains an `error-events-retention` case that
delete()s rows older than 90 days from error_events. scheduler.ts
registers it at 06:00 daily. This fulfills the contract from
migration 0040, which declared the table "pruned at 90 days" but
had no implementation.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import { Worker, type Job } from 'bullmq';
|
||||
import { env } from '@/lib/env';
|
||||
|
||||
import type { ConnectionOptions } from 'bullmq';
|
||||
import { logger } from '@/lib/logger';
|
||||
@@ -310,7 +311,7 @@ export const aiWorker = new Worker(
|
||||
}
|
||||
},
|
||||
{
|
||||
connection: { url: process.env.REDIS_URL! } as ConnectionOptions,
|
||||
connection: { url: env.REDIS_URL } as ConnectionOptions,
|
||||
concurrency: QUEUE_CONFIGS.ai.concurrency,
|
||||
},
|
||||
);
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { Worker, type Job } from 'bullmq';
|
||||
import { env } from '@/lib/env';
|
||||
|
||||
import type { ConnectionOptions } from 'bullmq';
|
||||
import { logger } from '@/lib/logger';
|
||||
@@ -19,7 +20,7 @@ export const documentsWorker = new Worker(
|
||||
}
|
||||
},
|
||||
{
|
||||
connection: { url: process.env.REDIS_URL! } as ConnectionOptions,
|
||||
connection: { url: env.REDIS_URL } as ConnectionOptions,
|
||||
concurrency: QUEUE_CONFIGS.documents.concurrency,
|
||||
},
|
||||
);
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { Worker, type Job } from 'bullmq';
|
||||
import { env } from '@/lib/env';
|
||||
|
||||
import type { ConnectionOptions } from 'bullmq';
|
||||
import { logger } from '@/lib/logger';
|
||||
@@ -56,7 +57,7 @@ export const emailWorker = new Worker(
|
||||
}
|
||||
},
|
||||
{
|
||||
connection: { url: process.env.REDIS_URL! } as ConnectionOptions,
|
||||
connection: { url: env.REDIS_URL } as ConnectionOptions,
|
||||
concurrency: QUEUE_CONFIGS.email.concurrency,
|
||||
},
|
||||
);
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { Worker, type Job } from 'bullmq';
|
||||
import { env } from '@/lib/env';
|
||||
|
||||
import type { ConnectionOptions } from 'bullmq';
|
||||
import { logger } from '@/lib/logger';
|
||||
@@ -26,7 +27,7 @@ export const exportWorker = new Worker(
|
||||
}
|
||||
},
|
||||
{
|
||||
connection: { url: process.env.REDIS_URL! } as ConnectionOptions,
|
||||
connection: { url: env.REDIS_URL } as ConnectionOptions,
|
||||
concurrency: QUEUE_CONFIGS.export.concurrency,
|
||||
},
|
||||
);
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { Worker, type Job } from 'bullmq';
|
||||
import { env } from '@/lib/env';
|
||||
|
||||
import type { ConnectionOptions } from 'bullmq';
|
||||
import { logger } from '@/lib/logger';
|
||||
@@ -14,7 +15,7 @@ export const importWorker = new Worker(
|
||||
// - Note: maxAttempts=1 - imports are idempotent, user retries manually
|
||||
},
|
||||
{
|
||||
connection: { url: process.env.REDIS_URL! } as ConnectionOptions,
|
||||
connection: { url: env.REDIS_URL } as ConnectionOptions,
|
||||
concurrency: QUEUE_CONFIGS.import.concurrency,
|
||||
},
|
||||
);
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { Worker, type Job } from 'bullmq';
|
||||
import { env } from '@/lib/env';
|
||||
import { and, eq, lt, isNotNull } from 'drizzle-orm';
|
||||
|
||||
import type { ConnectionOptions } from 'bullmq';
|
||||
@@ -6,12 +7,16 @@ import { db } from '@/lib/db';
|
||||
import { formSubmissions } from '@/lib/db/schema/documents';
|
||||
import { gdprExports } from '@/lib/db/schema/gdpr';
|
||||
import { aiUsageLedger } from '@/lib/db/schema/ai-usage';
|
||||
import { errorEvents } from '@/lib/db/schema/system';
|
||||
import { logger } from '@/lib/logger';
|
||||
import { getStorageBackend } from '@/lib/storage';
|
||||
import { QUEUE_CONFIGS } from '@/lib/queue';
|
||||
|
||||
/** AI usage rows older than this are deleted by the retention job. */
|
||||
const AI_USAGE_RETENTION_DAYS = 90;
|
||||
/** error_events rows older than this are pruned. Migration 0040 declares
|
||||
* this contract; the worker had no implementation until now. */
|
||||
const ERROR_EVENTS_RETENTION_DAYS = 90;
|
||||
|
||||
export const maintenanceWorker = new Worker(
|
||||
'maintenance',
|
||||
@@ -113,12 +118,27 @@ export const maintenanceWorker = new Worker(
|
||||
);
|
||||
break;
|
||||
}
|
||||
case 'error-events-retention': {
|
||||
// Honor the contract from migration 0040: error_events older than
|
||||
// ERROR_EVENTS_RETENTION_DAYS get dropped. Otherwise the table
|
||||
// grows unbounded and the admin error log becomes unusable.
|
||||
const cutoff = new Date(Date.now() - ERROR_EVENTS_RETENTION_DAYS * 24 * 60 * 60 * 1000);
|
||||
const result = await db
|
||||
.delete(errorEvents)
|
||||
.where(lt(errorEvents.createdAt, cutoff))
|
||||
.returning({ requestId: errorEvents.requestId });
|
||||
logger.info(
|
||||
{ deleted: result.length, retentionDays: ERROR_EVENTS_RETENTION_DAYS },
|
||||
'Error events retention sweep complete',
|
||||
);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
logger.warn({ jobName: job.name }, 'Unknown maintenance job');
|
||||
}
|
||||
},
|
||||
{
|
||||
connection: { url: process.env.REDIS_URL! } as ConnectionOptions,
|
||||
connection: { url: env.REDIS_URL } as ConnectionOptions,
|
||||
concurrency: QUEUE_CONFIGS.maintenance.concurrency,
|
||||
},
|
||||
);
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { Worker, type Job } from 'bullmq';
|
||||
import { env } from '@/lib/env';
|
||||
|
||||
import type { ConnectionOptions } from 'bullmq';
|
||||
import { logger } from '@/lib/logger';
|
||||
@@ -79,7 +80,7 @@ export const notificationsWorker = new Worker(
|
||||
}
|
||||
},
|
||||
{
|
||||
connection: { url: process.env.REDIS_URL! } as ConnectionOptions,
|
||||
connection: { url: env.REDIS_URL } as ConnectionOptions,
|
||||
concurrency: QUEUE_CONFIGS.notifications.concurrency,
|
||||
},
|
||||
);
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { Worker, type Job } from 'bullmq';
|
||||
import { env } from '@/lib/env';
|
||||
|
||||
import type { ConnectionOptions } from 'bullmq';
|
||||
import { logger } from '@/lib/logger';
|
||||
@@ -21,10 +22,7 @@ export const reportsWorker = new Worker(
|
||||
.select()
|
||||
.from(scheduledReports)
|
||||
.where(
|
||||
and(
|
||||
eq(scheduledReports.isActive, true),
|
||||
lte(scheduledReports.nextRunAt, new Date()),
|
||||
),
|
||||
and(eq(scheduledReports.isActive, true), lte(scheduledReports.nextRunAt, new Date())),
|
||||
);
|
||||
|
||||
for (const report of dueReports) {
|
||||
@@ -64,7 +62,7 @@ export const reportsWorker = new Worker(
|
||||
}
|
||||
},
|
||||
{
|
||||
connection: { url: process.env.REDIS_URL! } as ConnectionOptions,
|
||||
connection: { url: env.REDIS_URL } as ConnectionOptions,
|
||||
concurrency: QUEUE_CONFIGS.reports.concurrency,
|
||||
},
|
||||
);
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { Worker, type Job } from 'bullmq';
|
||||
import { env } from '@/lib/env';
|
||||
import { createHmac } from 'node:crypto';
|
||||
import { lookup } from 'node:dns/promises';
|
||||
|
||||
@@ -277,7 +278,7 @@ export const webhooksWorker = new Worker(
|
||||
}
|
||||
},
|
||||
{
|
||||
connection: { url: process.env.REDIS_URL! } as ConnectionOptions,
|
||||
connection: { url: env.REDIS_URL } as ConnectionOptions,
|
||||
concurrency: QUEUE_CONFIGS.webhooks.concurrency,
|
||||
},
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user