feat(documents): importer for organized S3/filesystem buckets

One-shot script that walks an existing organized bucket tree, builds
matching document_folders rows mirroring the path, then inserts
documents + files rows pointing at the existing storage keys verbatim
— no path rewrite. For migrating from a legacy MinIO bucket whose
folder structure is already the source of truth.

Idempotency:
  • Folders: sibling-name unique index swallows duplicate creates;
    we reuse the row on ConflictError.
  • Documents: skipped when (port_id, fileStoragePath) already exists.

Adds StorageBackend.listByPrefix (recursive readdir on filesystem;
listObjectsV2 stream-drain on s3) — the first one-shot caller, not
a hot path. Pure parseImportPath helper extracted to its own module
and unit-tested for trailing slashes, empty intermediate segments,
prefix mismatch, and special-character folder names (8 tests).

Audit log per imported doc carries source='organized-bucket-importer',
storageKey, and folderSegments so the documents inspector can filter
on imports later.

CLI:
  pnpm tsx scripts/import-organized-documents.ts \\
      --port-slug <slug> \\
      --bucket-prefix "legacy-imports/" \\
      (--dry-run | --apply) [--uploaded-by <userId>]

Folds in Prettier post-hook drift on documents.service.ts +
download handler — same lint-staged formatting the earlier commits
already absorbed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-10 16:53:51 +02:00
parent e790ff708b
commit ef63e86fde
8 changed files with 495 additions and 9 deletions

View File

@@ -0,0 +1,323 @@
/**
* Importer for an organized S3 / filesystem bucket whose folder structure
* already represents real organisation. Walks every key under `--bucket-prefix`,
* builds matching `document_folders` rows mirroring the path, then inserts
* `documents` + `files` rows pointing at the existing storage keys verbatim
* — no path rewrite. Use when migrating from a legacy MinIO bucket whose
* tree is the source of truth.
*
* Usage:
* pnpm tsx scripts/import-organized-documents.ts --port-slug <slug> \
* --bucket-prefix "legacy-imports/" --dry-run
* pnpm tsx scripts/import-organized-documents.ts --port-slug <slug> \
* --bucket-prefix "legacy-imports/" --apply
*
* Idempotency:
* - Folders: sibling-name unique index swallows duplicate creates and we
* reuse the existing row.
* - Documents: skipped when a row with `(port_id, fileStoragePath)` already
* exists — the storage key is the natural identity for this importer.
*/
import 'dotenv/config';
import path from 'node:path';
import { and, eq } from 'drizzle-orm';
import { db } from '@/lib/db';
import { ports } from '@/lib/db/schema/ports';
import { documents, documentFolders, files } from '@/lib/db/schema/documents';
import { user } from '@/lib/db/schema/users';
import { createAuditLog } from '@/lib/audit';
import { ConflictError } from '@/lib/errors';
import { createFolder } from '@/lib/services/document-folders.service';
import { parseImportPath } from '@/lib/services/document-import';
import { getStorageBackend } from '@/lib/storage';
interface CliArgs {
portSlug: string;
bucketPrefix: string;
dryRun: boolean;
apply: boolean;
uploadedByUserId: string | null;
}
function parseArgs(argv: string[]): CliArgs {
const args: CliArgs = {
portSlug: '',
bucketPrefix: '',
dryRun: false,
apply: false,
uploadedByUserId: null,
};
for (let i = 0; i < argv.length; i += 1) {
const a = argv[i]!;
if (a === '--port-slug') args.portSlug = argv[++i] ?? '';
else if (a === '--bucket-prefix') args.bucketPrefix = argv[++i] ?? '';
else if (a === '--uploaded-by') args.uploadedByUserId = argv[++i] ?? null;
else if (a === '--dry-run') args.dryRun = true;
else if (a === '--apply') args.apply = true;
else if (a === '-h' || a === '--help') {
printHelp();
process.exit(0);
} else {
console.error(`Unknown argument: ${a}`);
printHelp();
process.exit(1);
}
}
if (!args.portSlug) {
console.error('Missing required --port-slug');
process.exit(1);
}
if (!args.dryRun && !args.apply) {
console.error('Must specify either --dry-run or --apply.');
process.exit(1);
}
if (args.dryRun && args.apply) {
console.error('--dry-run and --apply are mutually exclusive.');
process.exit(1);
}
return args;
}
function printHelp(): void {
console.log(`Usage:
pnpm tsx scripts/import-organized-documents.ts \\
--port-slug <slug> \\
--bucket-prefix <prefix> \\
(--dry-run | --apply) \\
[--uploaded-by <userId>]
`);
}
interface PlannedDoc {
key: string;
folderSegments: string[];
filename: string;
bytes: number | null;
contentType: string;
alreadyImported: boolean;
}
const CONTENT_TYPE_BY_EXT: Record<string, string> = {
'.pdf': 'application/pdf',
'.png': 'image/png',
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg',
'.gif': 'image/gif',
'.webp': 'image/webp',
'.txt': 'text/plain',
'.csv': 'text/csv',
'.doc': 'application/msword',
'.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'.xls': 'application/vnd.ms-excel',
'.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
};
function guessContentType(filename: string): string {
const ext = path.extname(filename).toLowerCase();
return CONTENT_TYPE_BY_EXT[ext] ?? 'application/octet-stream';
}
async function main(): Promise<void> {
const args = parseArgs(process.argv.slice(2));
const port = await db.query.ports.findFirst({ where: eq(ports.slug, args.portSlug) });
if (!port) {
console.error(`Port not found: ${args.portSlug}`);
process.exit(1);
}
let uploadedById = args.uploadedByUserId;
if (!uploadedById) {
const [u] = await db.select({ id: user.id }).from(user).limit(1);
if (!u) {
console.error(
'No user rows exist; pass --uploaded-by <userId> or seed at least one user before running.',
);
process.exit(1);
}
uploadedById = u.id;
console.log(`No --uploaded-by provided; falling back to first user: ${uploadedById}`);
}
const backend = await getStorageBackend();
console.log(`Listing keys under prefix "${args.bucketPrefix}" via ${backend.name} backend …`);
const keys = await backend.listByPrefix(args.bucketPrefix);
console.log(`Found ${keys.length} candidate keys.`);
const plan: PlannedDoc[] = [];
for (const key of keys) {
const parsed = parseImportPath(args.bucketPrefix, key);
if (!parsed.filename) continue;
const head = await backend.head(key);
const existing = await db.query.files.findFirst({
where: and(eq(files.portId, port.id), eq(files.storagePath, key)),
columns: { id: true },
});
plan.push({
key,
folderSegments: parsed.folderSegments,
filename: parsed.filename,
bytes: head?.sizeBytes ?? null,
contentType: head?.contentType ?? guessContentType(parsed.filename),
alreadyImported: !!existing,
});
}
printPlan(plan);
if (args.dryRun) {
console.log('\nDry-run complete. No changes written.');
return;
}
const folderIdByPath = new Map<string, string | null>();
folderIdByPath.set('', null);
let createdCount = 0;
let skippedCount = 0;
for (const entry of plan) {
if (entry.alreadyImported) {
skippedCount += 1;
continue;
}
const folderId = await ensureFolderChain(
port.id,
uploadedById,
entry.folderSegments,
folderIdByPath,
);
await db.transaction(async (tx) => {
const [fileRow] = await tx
.insert(files)
.values({
portId: port.id,
filename: entry.filename,
originalName: entry.filename,
mimeType: entry.contentType,
sizeBytes: entry.bytes !== null ? String(entry.bytes) : null,
storagePath: entry.key,
uploadedBy: uploadedById,
category: 'misc',
})
.returning();
const [docRow] = await tx
.insert(documents)
.values({
portId: port.id,
documentType: 'other',
title: entry.filename,
createdBy: uploadedById,
folderId,
fileId: fileRow!.id,
status: 'completed',
isManualUpload: true,
})
.returning();
void createAuditLog({
userId: uploadedById,
portId: port.id,
action: 'create',
entityType: 'document',
entityId: docRow!.id,
metadata: {
source: 'organized-bucket-importer',
storageKey: entry.key,
folderSegments: entry.folderSegments,
},
});
});
createdCount += 1;
console.log(`✓ Imported ${entry.key}`);
}
console.log(`\nDone. Created ${createdCount} documents, skipped ${skippedCount} (already imported).`);
}
async function ensureFolderChain(
portId: string,
userId: string,
segments: string[],
cache: Map<string, string | null>,
): Promise<string | null> {
if (segments.length === 0) return null;
let parentId: string | null = null;
for (let i = 0; i < segments.length; i += 1) {
const pathKey = segments.slice(0, i + 1).join('/');
const cached = cache.get(pathKey);
if (cached !== undefined) {
parentId = cached;
continue;
}
const name = segments[i]!;
parentId = await createOrFindFolder(portId, userId, name, parentId);
cache.set(pathKey, parentId);
}
return parentId;
}
async function createOrFindFolder(
portId: string,
userId: string,
name: string,
parentId: string | null,
): Promise<string> {
try {
const created = await createFolder(portId, userId, { name, parentId });
return created.id;
} catch (err) {
if (!(err instanceof ConflictError)) throw err;
// Sibling-name unique index hit — fetch the existing row so the import
// remains idempotent across re-runs.
const trimmed = name.trim();
const candidates = await db.query.documentFolders.findMany({
where: parentId
? and(eq(documentFolders.portId, portId), eq(documentFolders.parentId, parentId))
: eq(documentFolders.portId, portId),
});
const existing = candidates.find(
(row) =>
(parentId ? row.parentId === parentId : row.parentId === null) &&
row.name.toLowerCase() === trimmed.toLowerCase(),
);
if (!existing) throw err;
return existing.id;
}
}
function printPlan(plan: PlannedDoc[]): void {
const grouped = new Map<string, PlannedDoc[]>();
for (const entry of plan) {
const folder = entry.folderSegments.join('/') || '(root)';
if (!grouped.has(folder)) grouped.set(folder, []);
grouped.get(folder)!.push(entry);
}
const folderNames = Array.from(grouped.keys()).sort();
console.log('\nPlan:');
for (const folder of folderNames) {
console.log(` ${folder}/`);
for (const entry of grouped.get(folder)!) {
const flag = entry.alreadyImported ? '·' : '+';
const size = entry.bytes !== null ? ` (${entry.bytes}B)` : '';
console.log(` ${flag} ${entry.filename}${size}`);
}
}
const newCount = plan.filter((p) => !p.alreadyImported).length;
const dupCount = plan.length - newCount;
console.log(`\nTotal: ${plan.length} keys → ${newCount} new, ${dupCount} already imported.`);
}
main()
.then(() => process.exit(0))
.catch((err) => {
console.error(err);
process.exit(1);
});

View File

@@ -73,11 +73,7 @@ export const downloadHandler: RouteHandler<DownloadParams> = async (_req, ctx, p
} }
}; };
function buildExpectedSlug( function buildExpectedSlug(folderId: string | null, filename: string, tree: FolderNode[]): string {
folderId: string | null,
filename: string,
tree: FolderNode[],
): string {
const segments: string[] = []; const segments: string[] = [];
if (folderId) { if (folderId) {
const path = findFolderPath(tree, folderId); const path = findFolderPath(tree, folderId);

View File

@@ -0,0 +1,48 @@
/**
* Pure helpers for the organized-bucket document importer
* (`scripts/import-organized-documents.ts`).
*
* The script walks an existing storage prefix that already represents real
* organisation (e.g. `legacy-imports/Deals 2026/Q1/contract.pdf`) and
* materialises matching `document_folders` + `documents` rows in the CRM
* without rewriting the storage keys. Splitting these helpers out of the
* script body makes the path-parser unit-testable in isolation.
*/
export interface ParsedImportPath {
/** Folder names from outermost to innermost; empty when the file is at the prefix root. */
folderSegments: string[];
/** Filename only, never empty. */
filename: string;
}
/**
* Decompose a storage key into folder segments + filename relative to the
* importer prefix. Both `prefix` and `key` use POSIX separators (the
* filesystem backend's `listByPrefix` already normalises Windows paths).
*
* Edge cases:
* - Trailing slashes on prefix are tolerated (`legacy/` ≡ `legacy`).
* - Empty intermediate segments (`a//b`) collapse to `[a, b]`.
* - Leading-prefix mismatch throws — the caller should never feed in keys
* outside the prefix it asked the backend to list.
* - A key that ends in `/` (directory placeholder) yields an empty
* filename — the caller must filter those out before invoking.
*/
export function parseImportPath(prefix: string, key: string): ParsedImportPath {
const normalizedPrefix = prefix.replace(/\/+$/, '');
let relative = key;
if (normalizedPrefix.length > 0) {
if (!key.startsWith(`${normalizedPrefix}/`)) {
throw new Error(`Key "${key}" is not under prefix "${prefix}"`);
}
relative = key.slice(normalizedPrefix.length + 1);
}
const parts = relative.split('/').filter((segment) => segment.length > 0);
if (parts.length === 0) {
throw new Error(`Key "${key}" has no filename after stripping prefix`);
}
const filename = parts.pop()!;
return { folderSegments: parts, filename };
}

View File

@@ -307,10 +307,7 @@ async function hydrateDocumentsWithDownloadUrl(
const filename = row.fileId ? (filenameById.get(row.fileId) ?? null) : null; const filename = row.fileId ? (filenameById.get(row.fileId) ?? null) : null;
return { return {
...row, ...row,
downloadUrl: buildDocumentDownloadUrl( downloadUrl: buildDocumentDownloadUrl({ id: row.id, folderId: row.folderId, filename }, tree),
{ id: row.id, folderId: row.folderId, filename },
tree,
),
}; };
}); });
} }

View File

@@ -361,6 +361,43 @@ export class FilesystemBackend implements StorageBackend {
}; };
} }
/**
* Recursive readdir under `${root}/${prefix}`. Returns relative-to-root
* keys using POSIX separators, sorted alphabetically. Empty prefix lists
* every file in the storage root. Used by one-shot importers; not a hot
* path. We tolerate ENOENT (prefix doesn't exist) by returning [] so the
* caller doesn't have to special-case empty trees.
*/
async listByPrefix(prefix: string): Promise<string[]> {
const startAbs = prefix
? this.resolveKey(prefix.replace(/\/+$/, ''))
: this.rootResolved;
const out: string[] = [];
async function walk(dir: string): Promise<void> {
let entries: import('node:fs').Dirent[];
try {
entries = await fs.readdir(dir, { withFileTypes: true });
} catch (err) {
if ((err as NodeJS.ErrnoException).code === 'ENOENT') return;
throw err;
}
for (const entry of entries) {
const child = path.join(dir, entry.name);
if (entry.isDirectory()) {
await walk(child);
} else if (entry.isFile()) {
out.push(child);
}
}
}
await walk(startAbs);
return out
.map((abs) => path.relative(this.rootResolved, abs).split(path.sep).join('/'))
.sort();
}
/** Used by the proxy route — returns the validated absolute path. */ /** Used by the proxy route — returns the validated absolute path. */
resolveKeyForProxy(key: string): string { resolveKeyForProxy(key: string): string {
return this.resolveKey(key); return this.resolveKey(key);

View File

@@ -72,6 +72,15 @@ export interface StorageBackend {
/** Generate a short-lived URL the browser can GET from. */ /** Generate a short-lived URL the browser can GET from. */
presignDownload(key: string, opts: PresignOpts): Promise<{ url: string; expiresAt: Date }>; presignDownload(key: string, opts: PresignOpts): Promise<{ url: string; expiresAt: Date }>;
/**
* Recursively list keys under `prefix`. Returns the relative key for each
* object, sorted alphabetically. Empty prefix means "the entire bucket /
* storage root". Used by one-shot importers (e.g. organized-bucket
* document import) that need to walk a flat key namespace; not meant for
* runtime hot paths.
*/
listByPrefix(prefix: string): Promise<string[]>;
readonly name: StorageBackendName; readonly name: StorageBackendName;
} }

View File

@@ -211,6 +211,22 @@ export class S3Backend implements StorageBackend {
return { url, expiresAt: new Date(Date.now() + expiry * 1000) }; return { url, expiresAt: new Date(Date.now() + expiry * 1000) };
} }
/**
* Recursive listObjectsV2 walk under `prefix`. The minio-js stream emits
* one entry per object; we drain it into a flat key array sorted
* alphabetically. Used by one-shot importers; not a hot path. Object
* "directories" (zero-byte placeholders ending in `/`) are filtered out.
*/
async listByPrefix(prefix: string): Promise<string[]> {
const stream = this.client.listObjectsV2(this.bucket, prefix, true);
const keys: string[] = [];
for await (const obj of stream as AsyncIterable<{ name?: string }>) {
if (obj.name && !obj.name.endsWith('/')) keys.push(obj.name);
}
keys.sort();
return keys;
}
/** Used by the admin UI's "Test connection" button. */ /** Used by the admin UI's "Test connection" button. */
async healthCheck(): Promise<{ ok: true } | { ok: false; error: string }> { async healthCheck(): Promise<{ ok: true } | { ok: false; error: string }> {
const sentinelKey = `_health/${Date.now()}.txt`; const sentinelKey = `_health/${Date.now()}.txt`;

View File

@@ -0,0 +1,60 @@
import { describe, expect, it } from 'vitest';
import { parseImportPath } from '@/lib/services/document-import';
describe('parseImportPath', () => {
it('splits a nested key into folders + filename', () => {
expect(parseImportPath('legacy', 'legacy/Deals 2026/Q1/contract.pdf')).toEqual({
folderSegments: ['Deals 2026', 'Q1'],
filename: 'contract.pdf',
});
});
it('returns empty folderSegments for a file at the prefix root', () => {
expect(parseImportPath('legacy', 'legacy/index.pdf')).toEqual({
folderSegments: [],
filename: 'index.pdf',
});
});
it('tolerates trailing slashes on the prefix', () => {
expect(parseImportPath('legacy/', 'legacy/Deals/x.pdf')).toEqual({
folderSegments: ['Deals'],
filename: 'x.pdf',
});
expect(parseImportPath('legacy///', 'legacy/Deals/x.pdf')).toEqual({
folderSegments: ['Deals'],
filename: 'x.pdf',
});
});
it('collapses empty intermediate segments', () => {
expect(parseImportPath('legacy', 'legacy/a//b/c.pdf')).toEqual({
folderSegments: ['a', 'b'],
filename: 'c.pdf',
});
});
it('handles an empty prefix as "list-the-whole-bucket"', () => {
expect(parseImportPath('', 'Folder/file.pdf')).toEqual({
folderSegments: ['Folder'],
filename: 'file.pdf',
});
});
it('preserves special characters in folder names', () => {
expect(parseImportPath('', "Q1 — Year's End/contract & rider.pdf")).toEqual({
folderSegments: ["Q1 — Year's End"],
filename: 'contract & rider.pdf',
});
});
it('throws when the key is not under the prefix', () => {
expect(() => parseImportPath('legacy', 'other/x.pdf')).toThrow(/not under prefix/);
});
it('throws when the relative path has no filename', () => {
expect(() => parseImportPath('legacy', 'legacy/')).toThrow(/no filename/);
expect(() => parseImportPath('', '')).toThrow(/no filename/);
});
});