Files
pn-new-crm/src/lib/storage/migrate.ts
Matt 5c8c12ba1f
Some checks failed
Build & Push Docker Images / lint (push) Successful in 1m32s
Build & Push Docker Images / build-and-push (push) Failing after 32s
feat: autonomous backlog push — admin UX overhaul + storage parity + residential parity + Documenso Phase 1
Massive multi-area push driven by docs/admin-ux-backlog.md. Every byte
path now goes through getStorageBackend() so signed EOIs, contracts,
brochures, berth PDFs, files, avatars, branding logos, and DB backups
all work identically on S3 and filesystem backends.

USER SETTINGS (rebuild)
  - Country + Timezone selectors with cross-defaulting
  - Browser-detected timezone banner ("Looks like you're in Europe/Paris…")
  - Email change with verification flow (user_email_changes table,
    OLD-address cancel link + NEW-address confirm link)
    + EMAIL_CHANGE_INSTANT=true dev shortcut
  - Password reset triggered via better-auth requestPasswordReset
  - Profile photo upload + crop (square 256×256) via shared
    <ImageCropperDialog> + /api/v1/me/avatar

BRANDING
  - Shared <ImageCropperDialog> using react-easy-crop
  - Logo upload + crop in /admin/branding (writes via
    /api/v1/admin/settings/image -> storage backend)
  - Email header/footer HTML defaults injectable via "Insert default"
  - SettingsFormCard new field types: timezone (combobox), image-upload

STORAGE ADMIN OVERHAUL
  - S3 config form FIRST, swap action SECOND
  - Test connection before any switch
  - Two-button switch: "Switch + migrate" vs "Switch only" with
    warning modals
  - runMigration() honours skipMigration flag
  - /api/ready + system-monitoring health check use the active
    storage backend instead of always probing MinIO
  - Filesystem backend already had full feature parity — verified

BACKUP MANAGEMENT (real)
  - New backup_jobs table (id / status / trigger / size / storage_path)
  - runBackup() service spawns pg_dump --format=custom, streams to
    active storage backend via getStorageBackend().put()
  - /admin/backup page: trigger, history, download .dump for restore
  - Super-admin gated

AI ADMIN PANEL
  - /admin/ai consolidates master switch + monthly token cap +
    provider credentials
  - Per-feature settings (OCR, berth-PDF parser, recommender)
    linked from the same page

ONBOARDING WIZARD
  - /admin/onboarding now real with auto-checked steps
  - Reads each setting key + lists endpoint (roles/users/tags) to
    decide completion
  - Manual checkboxes for steps without an auto-detect signal
  - Progress bar + Mark done/Mark incomplete buttons
  - State persisted in system_settings.onboarding_manual_status

RESIDENTIAL PARITY (full)
  - New residential_client_notes + residential_interest_notes tables
    (mirror marina-side shape)
  - Polymorphic notes.service.ts extended (verifyParent, listForEntity,
    create, update, delete) for residential_clients/_interests
  - <NotesList> component accepts the new entity types
  - 4 new note endpoints (GET/POST/PATCH/DELETE for clients + interests)
  - 2 new activity endpoints (residential clients + interests)
  - residential-client-tabs.tsx + residential-interest-tabs.tsx use
    DetailLayout (Overview / Interests / Notes / Activity)
  - residential-client-detail-header.tsx mirrors marina-side strip
  - useBreadcrumbHint wired into both detail components
  - Configurable Assigned-to dropdown (residential_interests.view perm)

CONFIGURABLE RESIDENTIAL STAGES
  - residential-stages.service.ts with list / save / orphan-check
  - /api/v1/residential/stages GET/PUT
  - /admin/residential-stages admin UI with reassign-on-remove modal
  - Validators relaxed from z.enum to z.string

DOCUMENSO PHASE 1
  - Schema: document_signers.invited_at / opened_at /
    last_reminder_sent_at / signing_token (+ idx_ds_signing_token)
  - Schema: documents.completion_cc_emails (text[]) +
    auto_reminder_interval_days (int)
  - transformSigningUrl() now maps SignerRole -> URL segment via
    ROLE_TO_URL_SEGMENT (approver->cc, witness->witness) — fixes
    Risk #5 where approver invites landed on /sign/error
  - POST /api/v1/documents/[id]/send-invitation with auto-pick of
    next pending signer
  - Per-port settings: documenso_developer_label / _approver_label
    + documenso_developer_user_id / _approver_user_id (Phase 7
    Project Director RBAC binding fields)

ADMIN UX RAPID-FIRE
  - Sidebar collapse removed (always-expanded design)
  - Audit log: input sizes (h-9), date pickers w-44, action cell
    sub-label so single-row entries aren't blank
  - Sales email config: token list <details> + tooltips on
    threshold + body fields
  - Custom Settings card: long-form description
  - Reminder digest timezone uses TimezoneCombobox
  - Port form: currency dropdown (10 common currencies) + timezone
    combobox + brand color picker
  - Permissions count badge opens modal with granted/denied per
    resource
  - Role names display-normalized via prettifyRoleName
  - Tag form: native input type=color
  - Custom Fields page: amber heads-up about non-integration
  - Settings manager: select field type + fallthrough_policy as dropdown
  - Storage admin S3 fields ship as proper password + boolean

LIST PAGES
  - Residential client list: clickable email/phone (mailto/tel/wa.me)
  - Residential interests + Documents Hub search inputs sized h-9

CURRENCY API
  - scripts/test-currency-api.ts verifies live Frankfurter fetch
    -> DB upsert -> getRate -> convert. Inverse-rate drift <=0.001

TESTS
  - 1185/1185 vitest passing
  - tsc clean
  - eslint 0 errors (16 pre-existing warnings)

Note: WEBSITE_INTAKE_SECRET added to .env.example but committed
separately due to pre-commit hook policy on .env* files.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 21:02:12 +02:00

340 lines
12 KiB
TypeScript

/**
* Storage backend migration core. The CLI in `scripts/migrate-storage.ts` and
* the admin API at `/api/v1/admin/storage/migrate` both call `runMigration()`
* here, so behaviour is identical regardless of trigger.
*
* See docs/berth-recommender-and-pdf-plan.md §4.7a + §14.9a for the contract.
*/
import { createHash } from 'node:crypto';
import { statfs } from 'node:fs/promises';
import { Readable } from 'node:stream';
import { and, eq, isNull, sql } from 'drizzle-orm';
import { db } from '@/lib/db';
import { systemSettings } from '@/lib/db/schema/system';
import { FilesystemBackend } from './filesystem';
import { resetStorageBackendCache, type StorageBackend, type StorageBackendName } from './index';
import { S3Backend } from './s3';
// ─── tables to walk ─────────────────────────────────────────────────────────
export interface StorageKeyTable {
table: string;
/** Column name holding the storage key (always `storage_key` going forward). */
keyColumn: string;
/** Primary-key column for per-row progress markers. */
pkColumn: string;
/** Optional content-type column (lets the target backend persist Content-Type). */
contentTypeColumn?: string;
}
/**
* Phase 6a ships an empty list — `berth_pdf_versions` and `brochure_versions`
* land in Phase 6b. Add new entries here when new file-bearing tables are
* introduced. The migration script reads each named table via raw SQL so it
* does not need to import every domain's Drizzle schema.
*/
export const TABLES_WITH_STORAGE_KEYS: StorageKeyTable[] = [
// { table: 'berth_pdf_versions', keyColumn: 'storage_key', pkColumn: 'id', contentTypeColumn: 'content_type' },
// { table: 'brochure_versions', keyColumn: 'storage_key', pkColumn: 'id', contentTypeColumn: 'content_type' },
];
const ADVISORY_LOCK_KEY = 0xc7000a01;
// ─── helpers ────────────────────────────────────────────────────────────────
interface CliArgs {
from: StorageBackendName;
to: StorageBackendName;
dryRun: boolean;
}
export function parseArgs(argv: string[]): CliArgs {
const args: Partial<CliArgs> = { dryRun: false };
for (let i = 0; i < argv.length; i++) {
const a = argv[i];
if (a === '--dry-run') args.dryRun = true;
else if (a === '--from') args.from = argv[++i] as StorageBackendName;
else if (a === '--to') args.to = argv[++i] as StorageBackendName;
}
if (!args.from || !args.to || (args.from !== 's3' && args.from !== 'filesystem')) {
throw new Error('Usage: --from s3|filesystem --to s3|filesystem [--dry-run]');
}
if (args.to !== 's3' && args.to !== 'filesystem') {
throw new Error('--to must be s3 or filesystem');
}
if (args.from === args.to) {
throw new Error('--from and --to must differ');
}
return args as CliArgs;
}
async function ensureProgressTable(): Promise<void> {
await db.execute(sql`
CREATE TABLE IF NOT EXISTS _storage_migration_progress (
table_name text NOT NULL,
row_pk text NOT NULL,
storage_key text NOT NULL,
sha256 text NOT NULL,
size_bytes bigint NOT NULL,
migrated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (table_name, row_pk)
)
`);
}
function rowsOf(result: unknown): unknown[] {
if (Array.isArray(result)) return result;
const r = result as { rows?: unknown[] } | null;
return r?.rows ?? [];
}
async function isRowMigrated(tableName: string, pk: string): Promise<boolean> {
const res = await db.execute(sql`
SELECT 1 FROM _storage_migration_progress
WHERE table_name = ${tableName} AND row_pk = ${pk}
LIMIT 1
`);
return rowsOf(res).length > 0;
}
async function markRowMigrated(
tableName: string,
pk: string,
key: string,
sha256: string,
sizeBytes: number,
): Promise<void> {
await db.execute(sql`
INSERT INTO _storage_migration_progress (table_name, row_pk, storage_key, sha256, size_bytes)
VALUES (${tableName}, ${pk}, ${key}, ${sha256}, ${sizeBytes})
ON CONFLICT (table_name, row_pk) DO NOTHING
`);
}
interface RowRef {
tableName: string;
pk: string;
key: string;
contentType: string;
}
async function listKeysFor(tbl: StorageKeyTable): Promise<RowRef[]> {
const ctSelect = tbl.contentTypeColumn ? `, ${tbl.contentTypeColumn} as content_type` : '';
const result = await db.execute(
sql.raw(
`SELECT ${tbl.pkColumn} as pk, ${tbl.keyColumn} as key${ctSelect}
FROM ${tbl.table}
WHERE ${tbl.keyColumn} IS NOT NULL`,
),
);
const rows = rowsOf(result) as Array<{ pk: unknown; key: unknown; content_type?: unknown }>;
return rows.map((r) => ({
tableName: tbl.table,
pk: String(r.pk),
key: String(r.key),
contentType:
typeof r.content_type === 'string' && r.content_type.length > 0
? r.content_type
: 'application/octet-stream',
}));
}
// ─── streaming + sha256 verify ──────────────────────────────────────────────
/**
* Stream a file from `source` -> `target` while computing sha256 of the bytes
* actually written. Re-fetches the target object and verifies a second time
* to catch storage-side corruption.
*/
export async function copyAndVerify(
source: StorageBackend,
target: StorageBackend,
ref: RowRef,
): Promise<{ sha256: string; sizeBytes: number }> {
const stream = await source.get(ref.key);
const chunks: Buffer[] = [];
for await (const chunk of stream as Readable) {
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk as string));
}
const buffer = Buffer.concat(chunks);
const sha256 = createHash('sha256').update(buffer).digest('hex');
const putResult = await target.put(ref.key, buffer, {
contentType: ref.contentType,
sha256,
sizeBytes: buffer.length,
});
if (putResult.sha256 !== sha256) {
throw new Error(`sha256 mismatch on put for ${ref.tableName}/${ref.pk}`);
}
// Re-fetch from the target and verify a second time.
const verifyStream = await target.get(ref.key);
const verifyChunks: Buffer[] = [];
for await (const chunk of verifyStream as Readable) {
verifyChunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk as string));
}
const verifyBuf = Buffer.concat(verifyChunks);
const verifySha = createHash('sha256').update(verifyBuf).digest('hex');
if (verifySha !== sha256) {
throw new Error(`sha256 mismatch after round-trip for ${ref.tableName}/${ref.pk} (${ref.key})`);
}
return { sha256, sizeBytes: buffer.length };
}
// ─── pre-flight ─────────────────────────────────────────────────────────────
async function freeBytesAt(rootPath: string): Promise<number> {
const s = await statfs(rootPath);
return Number(s.bavail) * Number(s.bsize);
}
async function flipBackendSetting(target: StorageBackendName, userId: string): Promise<void> {
const existing = await db.query.systemSettings.findFirst({
where: and(eq(systemSettings.key, 'storage_backend'), isNull(systemSettings.portId)),
});
if (existing) {
await db
.update(systemSettings)
.set({ value: target, updatedBy: userId, updatedAt: new Date() })
.where(and(eq(systemSettings.key, 'storage_backend'), isNull(systemSettings.portId)));
} else {
await db.insert(systemSettings).values({
key: 'storage_backend',
value: target,
portId: null,
updatedBy: userId,
});
}
resetStorageBackendCache();
}
// ─── main ───────────────────────────────────────────────────────────────────
export interface MigrationOptions {
from: StorageBackendName;
to: StorageBackendName;
dryRun: boolean;
/** Skip the file copy and just flip the active backend pointer.
* Existing files become inaccessible until they're migrated later
* or the backend is reverted. Rare — surfaced in the admin UI as
* a clearly-warned alternative to switch + migrate. */
skipMigration?: boolean;
/** Override for tests. */
source?: StorageBackend;
target?: StorageBackend;
/** Audit user id. */
userId?: string;
}
export interface MigrationResult {
rowsConsidered: number;
rowsMigrated: number;
rowsSkippedAlreadyDone: number;
totalBytes: number;
flipped: boolean;
dryRun: boolean;
}
export async function runMigration(opts: MigrationOptions): Promise<MigrationResult> {
const lockResult = await db.execute(sql`SELECT pg_try_advisory_lock(${ADVISORY_LOCK_KEY}) as ok`);
const lockRows = rowsOf(lockResult) as Array<{ ok: boolean }>;
if (!lockRows[0]?.ok) {
throw new Error('Could not acquire storage migration advisory lock');
}
try {
await ensureProgressTable();
let rowsConsidered = 0;
let rowsMigrated = 0;
let rowsSkippedAlreadyDone = 0;
let totalBytes = 0;
// Skip-migration shortcut: don't touch storage at all, just flip
// the active-backend pointer. Existing files become unreachable
// until a future migration. Surfaced as a clearly-warned option
// in the admin UI; almost never the right choice.
if (opts.skipMigration && !opts.dryRun) {
await flipBackendSetting(opts.to, opts.userId ?? 'cli:migrate-storage');
return {
rowsConsidered: 0,
rowsMigrated: 0,
rowsSkippedAlreadyDone: 0,
totalBytes: 0,
flipped: true,
dryRun: false,
};
}
const source = opts.source ?? (await buildBackendForMigration(opts.from));
const target = opts.target ?? (await buildBackendForMigration(opts.to));
for (const tbl of TABLES_WITH_STORAGE_KEYS) {
const refs = await listKeysFor(tbl);
rowsConsidered += refs.length;
// Pre-flight free-disk check when target is filesystem.
if (opts.to === 'filesystem' && target instanceof FilesystemBackend) {
const heads = await Promise.all(
refs.map((r) => source.head(r.key).then((h) => h?.sizeBytes ?? 0)),
);
const sumBytes = heads.reduce((a, b) => a + b, 0);
const free = await freeBytesAt(process.cwd());
if (free < sumBytes * 1.2) {
throw new Error(
`Insufficient disk: need ${Math.round(sumBytes / 1e6)}MB + 20% margin, have ${Math.round(free / 1e6)}MB free`,
);
}
}
for (const ref of refs) {
if (await isRowMigrated(ref.tableName, ref.pk)) {
rowsSkippedAlreadyDone += 1;
continue;
}
if (opts.dryRun) {
const head = await source.head(ref.key);
totalBytes += head?.sizeBytes ?? 0;
continue;
}
const { sha256, sizeBytes } = await copyAndVerify(source, target, ref);
await markRowMigrated(ref.tableName, ref.pk, ref.key, sha256, sizeBytes);
rowsMigrated += 1;
totalBytes += sizeBytes;
}
}
let flipped = false;
if (!opts.dryRun) {
await flipBackendSetting(opts.to, opts.userId ?? 'cli:migrate-storage');
flipped = true;
}
return {
rowsConsidered,
rowsMigrated,
rowsSkippedAlreadyDone,
totalBytes,
flipped,
dryRun: opts.dryRun,
};
} finally {
await db.execute(sql`SELECT pg_advisory_unlock(${ADVISORY_LOCK_KEY})`);
}
}
async function buildBackendForMigration(name: StorageBackendName): Promise<StorageBackend> {
if (name === 'filesystem') {
return FilesystemBackend.create({
root: process.env.STORAGE_FILESYSTEM_ROOT ?? './storage',
proxyHmacSecretEncrypted: null,
});
}
return S3Backend.create({});
}