- maintenance worker now expires GDPR export bundles (db row + MinIO object) on the gdpr_exports.expires_at boundary, plus 90-day retention sweep on ai_usage_ledger; both jobs scheduled daily. - portId scoping added to listClientRelationships and listClientExports (defense-in-depth — parent-resource gates already prevent cross-tenant reads, but service layer should enforce on its own). - SELECT FOR UPDATE on parent client/company row inside add/update address transactions to serialize concurrent isPrimary toggles. - public /interests + /residential-inquiries endpoints swap their in-memory ipHits maps for the redis sliding-window limiter via the new rateLimiters.publicForm config (5/hr/IP), so the cap survives restarts and is shared across worker processes. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
266 lines
9.2 KiB
TypeScript
266 lines
9.2 KiB
TypeScript
/**
|
|
* GDPR client-data export orchestration.
|
|
*
|
|
* `requestExport()` creates a row, queues a BullMQ job, and returns. The
|
|
* `processExportJob()` handler builds the bundle, ZIPs JSON+HTML into
|
|
* MinIO, optionally emails the client a download link, and updates the
|
|
* row to status='ready' or 'sent'.
|
|
*
|
|
* Bundles are kept for 30 days then expired by maintenance (the
|
|
* gdpr_exports.expires_at column is the cleanup target).
|
|
*/
|
|
|
|
import archiver from 'archiver';
|
|
import { eq, and } from 'drizzle-orm';
|
|
import { PassThrough } from 'node:stream';
|
|
|
|
import { db } from '@/lib/db';
|
|
import { gdprExports, type GdprExport } from '@/lib/db/schema/gdpr';
|
|
import { clients, clientContacts } from '@/lib/db/schema/clients';
|
|
import { ports } from '@/lib/db/schema/ports';
|
|
import { env } from '@/lib/env';
|
|
import { NotFoundError, ValidationError } from '@/lib/errors';
|
|
import { logger } from '@/lib/logger';
|
|
import { minioClient, getPresignedUrl } from '@/lib/minio';
|
|
import { getQueue } from '@/lib/queue';
|
|
import { createAuditLog } from '@/lib/audit';
|
|
import { buildClientBundle, renderBundleHtml } from '@/lib/services/gdpr-bundle-builder';
|
|
|
|
const EXPIRY_DAYS = 30;
|
|
const PRESIGN_EXPIRY_SECONDS = 7 * 24 * 60 * 60; // 7 days for the email link
|
|
|
|
interface RequestExportInput {
|
|
clientId: string;
|
|
portId: string;
|
|
requestedBy: string;
|
|
/** When true, the bundle is emailed to the client's primary address once ready. */
|
|
emailToClient: boolean;
|
|
/** Override recipient (e.g. lawyer or agent). When set, takes precedence over the client's primary email. */
|
|
emailOverride?: string | null;
|
|
ipAddress: string;
|
|
userAgent: string;
|
|
}
|
|
|
|
export interface RequestExportResult {
|
|
export: GdprExport;
|
|
}
|
|
|
|
export async function requestGdprExport(input: RequestExportInput): Promise<RequestExportResult> {
|
|
const client = await db.query.clients.findFirst({
|
|
where: eq(clients.id, input.clientId),
|
|
});
|
|
if (!client || client.portId !== input.portId) throw new NotFoundError('Client');
|
|
|
|
if (input.emailToClient && !input.emailOverride) {
|
|
const primary = await db.query.clientContacts.findFirst({
|
|
where: and(
|
|
eq(clientContacts.clientId, input.clientId),
|
|
eq(clientContacts.channel, 'email'),
|
|
eq(clientContacts.isPrimary, true),
|
|
),
|
|
});
|
|
if (!primary) {
|
|
throw new ValidationError(
|
|
'Client has no primary email contact — provide an emailOverride or add one before exporting.',
|
|
);
|
|
}
|
|
}
|
|
|
|
const [row] = await db
|
|
.insert(gdprExports)
|
|
.values({
|
|
portId: input.portId,
|
|
clientId: input.clientId,
|
|
requestedBy: input.requestedBy,
|
|
status: 'pending',
|
|
})
|
|
.returning();
|
|
if (!row) throw new Error('Failed to create export row');
|
|
|
|
void createAuditLog({
|
|
userId: input.requestedBy,
|
|
portId: input.portId,
|
|
action: 'request_gdpr_export',
|
|
entityType: 'client',
|
|
entityId: input.clientId,
|
|
metadata: { exportId: row.id, emailToClient: input.emailToClient },
|
|
ipAddress: input.ipAddress,
|
|
userAgent: input.userAgent,
|
|
});
|
|
|
|
await getQueue('export').add('gdpr-export', {
|
|
exportId: row.id,
|
|
portId: input.portId,
|
|
clientId: input.clientId,
|
|
emailToClient: input.emailToClient,
|
|
emailOverride: input.emailOverride ?? null,
|
|
});
|
|
|
|
return { export: row };
|
|
}
|
|
|
|
interface ProcessJobInput {
|
|
exportId: string;
|
|
portId: string;
|
|
clientId: string;
|
|
emailToClient: boolean;
|
|
emailOverride: string | null;
|
|
}
|
|
|
|
/**
|
|
* Worker entry point. Loads the bundle, ZIPs it, uploads to MinIO,
|
|
* (optionally) emails the client. Failures mark the row 'failed' with
|
|
* the truncated error.
|
|
*/
|
|
export async function processGdprExportJob(input: ProcessJobInput): Promise<void> {
|
|
await db
|
|
.update(gdprExports)
|
|
.set({ status: 'building' })
|
|
.where(eq(gdprExports.id, input.exportId));
|
|
|
|
try {
|
|
const bundle = await buildClientBundle(input.clientId, input.portId);
|
|
const json = JSON.stringify(bundle, null, 2);
|
|
const html = renderBundleHtml(bundle);
|
|
|
|
// Stream a ZIP into a buffer. Receipts/contracts are not included
|
|
// here — they live on file rows referenced by the bundle and would
|
|
// bloat the archive. Add them later if Article-15 requests demand.
|
|
const zip = archiver('zip', { zlib: { level: 9 } });
|
|
const sink = new PassThrough();
|
|
const chunks: Buffer[] = [];
|
|
sink.on('data', (c: Buffer) => chunks.push(c));
|
|
const done = new Promise<Buffer>((resolve, reject) => {
|
|
sink.on('end', () => resolve(Buffer.concat(chunks)));
|
|
sink.on('error', reject);
|
|
zip.on('error', reject);
|
|
});
|
|
zip.pipe(sink);
|
|
zip.append(json, { name: 'client.json' });
|
|
zip.append(html, { name: 'client.html' });
|
|
zip.append(
|
|
`Personal data export for client ${input.clientId}\nGenerated ${bundle.meta.generatedAt}\n`,
|
|
{ name: 'README.txt' },
|
|
);
|
|
await zip.finalize();
|
|
const buffer = await done;
|
|
|
|
const port = await db.query.ports.findFirst({ where: eq(ports.id, input.portId) });
|
|
const portSlug = port?.slug ?? 'unknown';
|
|
const storageKey = `${portSlug}/gdpr-exports/${input.clientId}/${input.exportId}.zip`;
|
|
|
|
await minioClient.putObject(env.MINIO_BUCKET, storageKey, buffer, buffer.length, {
|
|
'Content-Type': 'application/zip',
|
|
'Content-Disposition': `attachment; filename="gdpr-export-${input.clientId}.zip"`,
|
|
});
|
|
|
|
const expiresAt = new Date(Date.now() + EXPIRY_DAYS * 24 * 60 * 60 * 1000);
|
|
await db
|
|
.update(gdprExports)
|
|
.set({
|
|
status: 'ready',
|
|
storageKey,
|
|
sizeBytes: buffer.length,
|
|
readyAt: new Date(),
|
|
expiresAt,
|
|
})
|
|
.where(eq(gdprExports.id, input.exportId));
|
|
|
|
if (input.emailToClient) {
|
|
await emailExport(input, storageKey);
|
|
}
|
|
} catch (err) {
|
|
logger.error({ err, exportId: input.exportId }, 'GDPR export job failed');
|
|
await db
|
|
.update(gdprExports)
|
|
.set({
|
|
status: 'failed',
|
|
error: err instanceof Error ? err.message.slice(0, 1000) : 'Unknown error',
|
|
})
|
|
.where(eq(gdprExports.id, input.exportId));
|
|
throw err; // let BullMQ retry per the queue config
|
|
}
|
|
}
|
|
|
|
async function emailExport(input: ProcessJobInput, storageKey: string): Promise<void> {
|
|
// Resolve the recipient: explicit override beats primary contact.
|
|
let recipient = input.emailOverride;
|
|
if (!recipient) {
|
|
const primary = await db.query.clientContacts.findFirst({
|
|
where: and(
|
|
eq(clientContacts.clientId, input.clientId),
|
|
eq(clientContacts.channel, 'email'),
|
|
eq(clientContacts.isPrimary, true),
|
|
),
|
|
});
|
|
recipient = primary?.value ?? null;
|
|
}
|
|
if (!recipient) {
|
|
logger.warn(
|
|
{ exportId: input.exportId, clientId: input.clientId },
|
|
'GDPR export ready but no email recipient — skipping send',
|
|
);
|
|
return;
|
|
}
|
|
|
|
const url = await getPresignedUrl(storageKey, PRESIGN_EXPIRY_SECONDS);
|
|
const client = await db.query.clients.findFirst({ where: eq(clients.id, input.clientId) });
|
|
const name = client?.fullName ?? 'there';
|
|
const expiry = new Date(Date.now() + PRESIGN_EXPIRY_SECONDS * 1000).toUTCString();
|
|
|
|
const subject = 'Your personal data export is ready';
|
|
const html = `
|
|
<p>Hello ${escapeHtml(name)},</p>
|
|
<p>You requested a copy of the personal data we hold about you. The export is ready and contains:</p>
|
|
<ul>
|
|
<li><code>client.json</code> — machine-readable data dump</li>
|
|
<li><code>client.html</code> — same data as a printable web page</li>
|
|
</ul>
|
|
<p><a href="${url}">Download the export (ZIP, expires ${escapeHtml(expiry)})</a></p>
|
|
<p>If you have any questions, reply to this email.</p>
|
|
`;
|
|
const text = `Your personal data export is ready: ${url}\nThe link expires ${expiry}.`;
|
|
|
|
const { sendEmail } = await import('@/lib/email/index');
|
|
await sendEmail(recipient, subject, html, undefined, text, input.portId);
|
|
|
|
await db
|
|
.update(gdprExports)
|
|
.set({ status: 'sent', sentAt: new Date(), sentTo: recipient })
|
|
.where(eq(gdprExports.id, input.exportId));
|
|
}
|
|
|
|
function escapeHtml(s: unknown): string {
|
|
if (s === null || s === undefined) return '';
|
|
return String(s)
|
|
.replace(/&/g, '&')
|
|
.replace(/</g, '<')
|
|
.replace(/>/g, '>')
|
|
.replace(/"/g, '"')
|
|
.replace(/'/g, ''');
|
|
}
|
|
|
|
/** Lists exports for a client (most-recent first) — feeds the admin "history" UI. */
|
|
export async function listClientExports(clientId: string, portId: string) {
|
|
const client = await db.query.clients.findFirst({ where: eq(clients.id, clientId) });
|
|
if (!client || client.portId !== portId) throw new NotFoundError('Client');
|
|
|
|
return db.query.gdprExports.findMany({
|
|
where: and(eq(gdprExports.clientId, clientId), eq(gdprExports.portId, portId)),
|
|
orderBy: (t, { desc }) => [desc(t.createdAt)],
|
|
limit: 25,
|
|
});
|
|
}
|
|
|
|
/** Generates a fresh signed URL for an existing ready/sent export. */
|
|
export async function getExportDownloadUrl(exportId: string, portId: string): Promise<string> {
|
|
const row = await db.query.gdprExports.findFirst({
|
|
where: and(eq(gdprExports.id, exportId), eq(gdprExports.portId, portId)),
|
|
});
|
|
if (!row) throw new NotFoundError('Export');
|
|
if (!row.storageKey || (row.status !== 'ready' && row.status !== 'sent')) {
|
|
throw new ValidationError('Export is not ready to download');
|
|
}
|
|
return getPresignedUrl(row.storageKey, PRESIGN_EXPIRY_SECONDS);
|
|
}
|