/**
 * Storage backend migration core. The CLI in `scripts/migrate-storage.ts` and
 * the admin API at `/api/v1/admin/storage/migrate` both call `runMigration()`
 * here, so behaviour is identical regardless of trigger.
 *
 * See docs/berth-recommender-and-pdf-plan.md §4.7a + §14.9a for the contract.
 */

import { createHash } from 'node:crypto';
import { statfs } from 'node:fs/promises';
import { Readable } from 'node:stream';

import { and, eq, isNull, sql } from 'drizzle-orm';

import { db } from '@/lib/db';
import { systemSettings } from '@/lib/db/schema/system';

import { FilesystemBackend } from './filesystem';
import { resetStorageBackendCache, type StorageBackend, type StorageBackendName } from './index';
import { S3Backend } from './s3';

// ─── tables to walk ─────────────────────────────────────────────────────────

/** Describes one table whose rows reference a stored blob. */
export interface StorageKeyTable {
  table: string;
  /** Column name holding the storage key (always `storage_key` going forward). */
  keyColumn: string;
  /** Primary-key column for per-row progress markers. */
  pkColumn: string;
  /** Optional content-type column (lets the target backend persist Content-Type). */
  contentTypeColumn?: string;
}

/**
 * Tables that hold blob references the migration script must walk.
 *
 * Column naming is intentionally inconsistent across the schema for historical
 * reasons:
 *   - `files.storage_path` (oldest table, named before §4.7a rename)
 *   - `berth_pdf_versions.storage_key` (Phase 6b - followed the new convention)
 *   - `brochure_versions.storage_key` (Phase 6b)
 *   - `gdpr_exports.storage_key` (worker-uploaded export bundle)
 *
 * None of these tables carry a per-row content-type column today
 * (`files.mime_type` exists but isn't the same semantics - it's the
 * original-upload mime, not the stored object's Content-Type header). The
 * migration falls back to `application/octet-stream` when
 * `contentTypeColumn` is omitted; the byte stream is what matters for the
 * sha256-verified round-trip and the original Content-Type is already
 * persisted on the source object's S3 metadata.
 *
 * The `report_snapshots` table called out in the audit does not exist yet.
 * Add it here when it lands.
 */
export const TABLES_WITH_STORAGE_KEYS: StorageKeyTable[] = [
  { table: 'files', keyColumn: 'storage_path', pkColumn: 'id' },
  { table: 'berth_pdf_versions', keyColumn: 'storage_key', pkColumn: 'id' },
  { table: 'brochure_versions', keyColumn: 'storage_key', pkColumn: 'id' },
  { table: 'gdpr_exports', keyColumn: 'storage_key', pkColumn: 'id' },
  // Last-resort recovery: pg_dump artefacts from the BackupService. The
  // audit caught that these were missing - flipping the storage backend used
  // to silently orphan every backup, breaking the last-resort recovery path.
  { table: 'backup_jobs', keyColumn: 'storage_path', pkColumn: 'id' },
];

/**
 * Key passed to `pg_try_advisory_lock` / `pg_advisory_unlock` so only one
 * migration runs at a time.
 */
const ADVISORY_LOCK_KEY = 0xc7000a01;

// ─── helpers ────────────────────────────────────────────────────────────────

/** Parsed command-line options for the migration CLI. */
interface CliArgs {
  from: StorageBackendName;
  to: StorageBackendName;
  dryRun: boolean;
}

/**
 * Parse `--from`, `--to` and `--dry-run` out of an argv-style array.
 * Unrecognised arguments are ignored.
 *
 * @param argv Arguments to scan (flag values are read from the following slot).
 * @returns The validated options.
 * @throws Error when `--from`/`--to` are missing, not `s3`/`filesystem`, or equal.
 */
export function parseArgs(argv: string[]): CliArgs {
  const args: Partial<CliArgs> = { dryRun: false };
  for (let i = 0; i < argv.length; i++) {
    const a = argv[i];
    if (a === '--dry-run') args.dryRun = true;
    else if (a === '--from') args.from = argv[++i] as StorageBackendName;
    else if (a === '--to') args.to = argv[++i] as StorageBackendName;
  }
  if (!args.from || !args.to || (args.from !== 's3' && args.from !== 'filesystem')) {
    throw new Error('Usage: --from s3|filesystem --to s3|filesystem [--dry-run]');
  }
  if (args.to !== 's3' && args.to !== 'filesystem') {
    throw new Error('--to must be s3 or filesystem');
  }
  if (args.from === args.to) {
    throw new Error('--from and --to must differ');
  }
  return args as CliArgs;
}

/**
 * Create the per-row progress table if it does not exist yet.
 *
 * NOTE(review): rows are keyed by (table_name, row_pk) only - the migration
 * direction is not recorded and nothing in this file clears the table, so a
 * later migration in the opposite direction would treat previously migrated
 * rows as already done. Confirm whether the table is reset elsewhere.
 */
async function ensureProgressTable(): Promise<void> {
  await db.execute(sql`
    CREATE TABLE IF NOT EXISTS _storage_migration_progress (
      table_name text NOT NULL,
      row_pk text NOT NULL,
      storage_key text NOT NULL,
      sha256 text NOT NULL,
      size_bytes bigint NOT NULL,
      migrated_at timestamptz NOT NULL DEFAULT now(),
      PRIMARY KEY (table_name, row_pk)
    )
  `);
}

/**
 * Normalise a `db.execute()` result to a plain row array: accepts either an
 * array or an object with a `rows` property; anything else yields `[]`.
 */
function rowsOf(result: unknown): unknown[] {
  if (Array.isArray(result)) return result;
  const r = result as { rows?: unknown[] } | null;
  return r?.rows ?? [];
}

/** True when a progress marker exists for the given table/primary-key pair. */
async function isRowMigrated(tableName: string, pk: string): Promise<boolean> {
  const res = await db.execute(sql`
    SELECT 1 FROM _storage_migration_progress
    WHERE table_name = ${tableName} AND row_pk = ${pk}
    LIMIT 1
  `);
  return rowsOf(res).length > 0;
}

/**
 * Record a progress marker for a copied row. An existing marker for the same
 * table/primary-key pair is left untouched (`ON CONFLICT ... DO NOTHING`).
 */
async function markRowMigrated(
  tableName: string,
  pk: string,
  key: string,
  sha256: string,
  sizeBytes: number,
): Promise<void> {
  await db.execute(sql`
    INSERT INTO _storage_migration_progress (table_name, row_pk, storage_key, sha256, size_bytes)
    VALUES (${tableName}, ${pk}, ${key}, ${sha256}, ${sizeBytes})
    ON CONFLICT (table_name, row_pk) DO NOTHING
  `);
}

/** One blob reference: the owning row plus the storage key it points at. */
export interface RowRef {
  tableName: string;
  pk: string;
  key: string;
  contentType: string;
}

/**
 * List every non-null storage key in one table.
 *
 * Identifiers are interpolated with `sql.raw`; within this file they only
 * ever come from `TABLES_WITH_STORAGE_KEYS`. A missing or empty content type
 * falls back to `application/octet-stream`.
 */
async function listKeysFor(tbl: StorageKeyTable): Promise<RowRef[]> {
  const ctSelect = tbl.contentTypeColumn ? `, ${tbl.contentTypeColumn} as content_type` : '';
  const result = await db.execute(
    sql.raw(
      `SELECT ${tbl.pkColumn} as pk, ${tbl.keyColumn} as key${ctSelect} FROM ${tbl.table} WHERE ${tbl.keyColumn} IS NOT NULL`,
    ),
  );
  const rows = rowsOf(result) as Array<{ pk: unknown; key: unknown; content_type?: unknown }>;
  return rows.map((r) => ({
    tableName: tbl.table,
    pk: String(r.pk),
    key: String(r.key),
    contentType:
      typeof r.content_type === 'string' && r.content_type.length > 0
        ? r.content_type
        : 'application/octet-stream',
  }));
}

/**
 * Inventory every blob reference across all blob-bearing tables. Used by the
 * full-backup exporter (Phase 4a) to enumerate what to bundle. `excludeTables`
 * lets the exporter drop `backup_jobs` so a full export doesn't recursively
 * include prior backup artefacts.
 */
export async function collectStorageRefs(opts?: { excludeTables?: string[] }): Promise<RowRef[]> {
  const exclude = new Set(opts?.excludeTables ?? []);
  const all: RowRef[] = [];
  for (const tbl of TABLES_WITH_STORAGE_KEYS) {
    if (exclude.has(tbl.table)) continue;
    all.push(...(await listKeysFor(tbl)));
  }
  return all;
}

// ─── copy + sha256 verify ───────────────────────────────────────────────────

/** Coerce a stream chunk to a Buffer (non-Buffer chunks go through `Buffer.from`). */
function asBuffer(chunk: unknown): Buffer {
  return Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk as string);
}

/**
 * Copy one object from `source` to `target` and verify it by sha256.
 *
 * The source object is read fully into memory, hashed, and written to the
 * target under the same key. The target object is then re-fetched and hashed
 * a second time to catch storage-side corruption; that second pass hashes
 * chunk by chunk so only one copy of the object is held in memory.
 *
 * @returns The sha256 (hex) and byte length of the copied object.
 * @throws Error when the hash reported by `put` or the re-fetched hash differs.
 */
export async function copyAndVerify(
  source: StorageBackend,
  target: StorageBackend,
  ref: RowRef,
): Promise<{ sha256: string; sizeBytes: number }> {
  const stream = await source.get(ref.key);
  const chunks: Buffer[] = [];
  for await (const chunk of stream as Readable) {
    chunks.push(asBuffer(chunk));
  }
  const buffer = Buffer.concat(chunks);
  const sha256 = createHash('sha256').update(buffer).digest('hex');

  const putResult = await target.put(ref.key, buffer, {
    contentType: ref.contentType,
    sha256,
    sizeBytes: buffer.length,
  });
  if (putResult.sha256 !== sha256) {
    throw new Error(`sha256 mismatch on put for ${ref.tableName}/${ref.pk}`);
  }

  // Re-fetch from the target and verify a second time, hashing incrementally
  // so the object is not buffered twice.
  const verifyStream = await target.get(ref.key);
  const verifyHash = createHash('sha256');
  for await (const chunk of verifyStream as Readable) {
    verifyHash.update(asBuffer(chunk));
  }
  const verifySha = verifyHash.digest('hex');
  if (verifySha !== sha256) {
    throw new Error(`sha256 mismatch after round-trip for ${ref.tableName}/${ref.pk} (${ref.key})`);
  }

  return { sha256, sizeBytes: buffer.length };
}

// ─── pre-flight ─────────────────────────────────────────────────────────────

/** Root directory handed to the filesystem backend built by this module. */
function migrationFilesystemRoot(): string {
  return process.env.STORAGE_FILESYSTEM_ROOT ?? './storage';
}

/** Bytes available on the filesystem that contains `rootPath`. */
async function freeBytesAt(rootPath: string): Promise<number> {
  const s = await statfs(rootPath);
  return Number(s.bavail) * Number(s.bsize);
}

/**
 * Free bytes on the volume the filesystem target writes to.
 *
 * Probes the configured storage root, since that is where the blobs land and
 * it may sit on a different mount than the working directory. Falls back to
 * the working directory when the root cannot be statted (for example it does
 * not exist yet) or when the caller injected its own target, whose root this
 * module cannot know.
 */
async function freeBytesForFilesystemTarget(targetWasInjected: boolean): Promise<number> {
  if (!targetWasInjected) {
    try {
      return await freeBytesAt(migrationFilesystemRoot());
    } catch {
      // Root not statable - fall through to the working-directory probe.
    }
  }
  return freeBytesAt(process.cwd());
}

/**
 * Point the global (`portId IS NULL`) `storage_backend` setting at `target`,
 * updating the existing row or inserting one, then reset the cached backend.
 */
async function flipBackendSetting(target: StorageBackendName, userId: string): Promise<void> {
  const existing = await db.query.systemSettings.findFirst({
    where: and(eq(systemSettings.key, 'storage_backend'), isNull(systemSettings.portId)),
  });
  if (existing) {
    await db
      .update(systemSettings)
      .set({ value: target, updatedBy: userId, updatedAt: new Date() })
      .where(and(eq(systemSettings.key, 'storage_backend'), isNull(systemSettings.portId)));
  } else {
    await db.insert(systemSettings).values({
      key: 'storage_backend',
      value: target,
      portId: null,
      updatedBy: userId,
    });
  }
  resetStorageBackendCache();
}

// ─── main ───────────────────────────────────────────────────────────────────

export interface MigrationOptions {
  from: StorageBackendName;
  to: StorageBackendName;
  dryRun: boolean;
  /** Skip the file copy and just flip the active backend pointer.
   *  Existing files become inaccessible until they're migrated later
   *  or the backend is reverted. Rare - surfaced in the admin UI as
   *  a clearly-warned alternative to switch + migrate. */
  skipMigration?: boolean;
  /** Override for tests. */
  source?: StorageBackend;
  target?: StorageBackend;
  /** Audit user id. */
  userId?: string;
}

export interface MigrationResult {
  rowsConsidered: number;
  rowsMigrated: number;
  rowsSkippedAlreadyDone: number;
  totalBytes: number;
  flipped: boolean;
  dryRun: boolean;
}

/**
 * Run (or dry-run) a storage backend migration.
 *
 * Takes the advisory lock, walks every table in `TABLES_WITH_STORAGE_KEYS`,
 * copies and verifies each row that has no progress marker, and finally flips
 * the active-backend setting. A dry run only sums object sizes via `head()`
 * and never copies or flips.
 *
 * NOTE(review): `pg_try_advisory_lock` / `pg_advisory_unlock` are
 * session-level. If `db` is backed by a connection pool, the lock and unlock
 * statements may run on different connections - confirm against the driver.
 *
 * @throws Error when the lock is held elsewhere, when the filesystem target
 *   lacks free space (object bytes + 20% margin), or when a copy fails its
 *   sha256 verification.
 */
export async function runMigration(opts: MigrationOptions): Promise<MigrationResult> {
  const lockResult = await db.execute(sql`SELECT pg_try_advisory_lock(${ADVISORY_LOCK_KEY}) as ok`);
  const lockRows = rowsOf(lockResult) as Array<{ ok: boolean }>;
  if (!lockRows[0]?.ok) {
    throw new Error('Could not acquire storage migration advisory lock');
  }

  try {
    await ensureProgressTable();

    // Skip-migration shortcut: don't touch storage at all, just flip
    // the active-backend pointer. Existing files become unreachable
    // until a future migration. Surfaced as a clearly-warned option
    // in the admin UI; almost never the right choice.
    if (opts.skipMigration && !opts.dryRun) {
      await flipBackendSetting(opts.to, opts.userId ?? 'cli:migrate-storage');
      return {
        rowsConsidered: 0,
        rowsMigrated: 0,
        rowsSkippedAlreadyDone: 0,
        totalBytes: 0,
        flipped: true,
        dryRun: false,
      };
    }

    let rowsConsidered = 0;
    let rowsMigrated = 0;
    let rowsSkippedAlreadyDone = 0;
    let totalBytes = 0;

    const source = opts.source ?? (await buildBackendForMigration(opts.from));
    const target = opts.target ?? (await buildBackendForMigration(opts.to));

    for (const tbl of TABLES_WITH_STORAGE_KEYS) {
      const refs = await listKeysFor(tbl);
      rowsConsidered += refs.length;

      // Pre-flight free-disk check when target is filesystem.
      if (opts.to === 'filesystem' && target instanceof FilesystemBackend) {
        const heads = await Promise.all(
          refs.map(async (r) => (await source.head(r.key))?.sizeBytes ?? 0),
        );
        const sumBytes = heads.reduce((a, b) => a + b, 0);
        // Measure the volume the blobs are written to, not the working directory.
        const free = await freeBytesForFilesystemTarget(opts.target !== undefined);
        if (free < sumBytes * 1.2) {
          throw new Error(
            `Insufficient disk: need ${Math.round(sumBytes / 1e6)}MB + 20% margin, have ${Math.round(free / 1e6)}MB free`,
          );
        }
      }

      for (const ref of refs) {
        if (await isRowMigrated(ref.tableName, ref.pk)) {
          rowsSkippedAlreadyDone += 1;
          continue;
        }
        if (opts.dryRun) {
          // Dry run: account for the bytes that would be copied, nothing more.
          const head = await source.head(ref.key);
          totalBytes += head?.sizeBytes ?? 0;
          continue;
        }
        const { sha256, sizeBytes } = await copyAndVerify(source, target, ref);
        await markRowMigrated(ref.tableName, ref.pk, ref.key, sha256, sizeBytes);
        rowsMigrated += 1;
        totalBytes += sizeBytes;
      }
    }

    let flipped = false;
    if (!opts.dryRun) {
      await flipBackendSetting(opts.to, opts.userId ?? 'cli:migrate-storage');
      flipped = true;
    }

    return {
      rowsConsidered,
      rowsMigrated,
      rowsSkippedAlreadyDone,
      totalBytes,
      flipped,
      dryRun: opts.dryRun,
    };
  } finally {
    await db.execute(sql`SELECT pg_advisory_unlock(${ADVISORY_LOCK_KEY})`);
  }
}

/**
 * Build a backend instance for one side of the migration: the filesystem
 * backend rooted at `STORAGE_FILESYSTEM_ROOT` (default `./storage`), or the
 * S3 backend created with an empty options object.
 */
async function buildBackendForMigration(name: StorageBackendName): Promise<StorageBackend> {
  if (name === 'filesystem') {
    return FilesystemBackend.create({
      root: migrationFilesystemRoot(),
      proxyHmacSecretEncrypted: null,
    });
  }
  return S3Backend.create({});
}