feat(import): data model for the bulk CSV/XLSX importer
First increment of the importer (docs/superpowers/specs/2026-06-01-bulk-import-design.md): the three port-scoped tables, no changes to entity tables. - import_batches — one row per run: entity_type, filename, storage_key, status, conflict_policy, mapping_json, live counts, created_by, timestamps. - import_batch_rows — per-row action ledger (inserted/updated/skipped/errored) with entity_id + error; partial index on inserted rows powers Undo. - import_mappings — saved column mappings, unique per (port, entity, name). Migration 0090 applied via psql; schema re-exported from the index. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
50
src/lib/db/migrations/0090_bulk_import.sql
Normal file
50
src/lib/db/migrations/0090_bulk_import.sql
Normal file
@@ -0,0 +1,50 @@
|
||||
-- 0090_bulk_import.sql
|
||||
--
|
||||
-- Bulk CSV/XLSX importer (docs/superpowers/specs/2026-06-01-bulk-import-design.md).
|
||||
-- Three new tables, no changes to entity tables: a per-import batch header,
|
||||
-- a per-row action/undo ledger, and saved column mappings. All port-scoped.
|
||||
-- IDs are app-generated (crypto.randomUUID via Drizzle $defaultFn), so no DB
|
||||
-- default on the PK columns — matches the rest of the schema.
|
||||
|
||||
CREATE TABLE import_batches (
|
||||
id text PRIMARY KEY,
|
||||
port_id text NOT NULL REFERENCES ports(id) ON DELETE CASCADE,
|
||||
entity_type text NOT NULL,
|
||||
filename text NOT NULL,
|
||||
storage_key text,
|
||||
status text NOT NULL DEFAULT 'uploaded',
|
||||
conflict_policy text NOT NULL DEFAULT 'skip-matches',
|
||||
mapping_json jsonb,
|
||||
total_rows integer NOT NULL DEFAULT 0,
|
||||
inserted integer NOT NULL DEFAULT 0,
|
||||
updated integer NOT NULL DEFAULT 0,
|
||||
skipped integer NOT NULL DEFAULT 0,
|
||||
errored integer NOT NULL DEFAULT 0,
|
||||
created_by text NOT NULL,
|
||||
created_at timestamptz NOT NULL DEFAULT now(),
|
||||
completed_at timestamptz
|
||||
);
|
||||
CREATE INDEX idx_import_batches_port ON import_batches (port_id, created_at DESC);
|
||||
|
||||
CREATE TABLE import_batch_rows (
|
||||
id text PRIMARY KEY,
|
||||
batch_id text NOT NULL REFERENCES import_batches(id) ON DELETE CASCADE,
|
||||
row_number integer NOT NULL,
|
||||
action text NOT NULL,
|
||||
entity_id text,
|
||||
error text
|
||||
);
|
||||
CREATE INDEX idx_import_batch_rows_batch ON import_batch_rows (batch_id, row_number);
|
||||
-- Undo walks only the rows a batch INSERTED; partial index keeps that scan tight.
|
||||
CREATE INDEX idx_import_batch_rows_inserted ON import_batch_rows (batch_id) WHERE action = 'inserted';
|
||||
|
||||
CREATE TABLE import_mappings (
|
||||
id text PRIMARY KEY,
|
||||
port_id text NOT NULL REFERENCES ports(id) ON DELETE CASCADE,
|
||||
entity_type text NOT NULL,
|
||||
name text NOT NULL,
|
||||
mapping_json jsonb NOT NULL,
|
||||
created_by text NOT NULL,
|
||||
created_at timestamptz NOT NULL DEFAULT now()
|
||||
);
|
||||
CREATE UNIQUE INDEX uniq_import_mappings_name ON import_mappings (port_id, entity_type, name);
|
||||
90
src/lib/db/schema/imports.ts
Normal file
90
src/lib/db/schema/imports.ts
Normal file
@@ -0,0 +1,90 @@
|
||||
/**
|
||||
* Bulk CSV/XLSX importer tables.
|
||||
*
|
||||
* See docs/superpowers/specs/2026-06-01-bulk-import-design.md. Three tables,
|
||||
* no changes to entity tables:
|
||||
* - `import_batches` — one row per import run (header + live counts).
|
||||
* - `import_batch_rows` — per-row action ledger; powers the error report
|
||||
* and inserts-only Undo.
|
||||
* - `import_mappings` — saved column mappings, reusable across runs.
|
||||
*/
|
||||
import { index, integer, jsonb, pgTable, text, timestamp, uniqueIndex } from 'drizzle-orm/pg-core';
|
||||
|
||||
import { ports } from './ports';
|
||||
|
||||
/** A saved or in-flight column mapping: target field key → source header. */
|
||||
export type ImportMappingJson = Record<string, string>;
|
||||
|
||||
export const importBatches = pgTable(
|
||||
'import_batches',
|
||||
{
|
||||
id: text('id')
|
||||
.primaryKey()
|
||||
.$defaultFn(() => crypto.randomUUID()),
|
||||
portId: text('port_id')
|
||||
.notNull()
|
||||
.references(() => ports.id, { onDelete: 'cascade' }),
|
||||
/** Which adapter this batch targets (ImportEntityKey). */
|
||||
entityType: text('entity_type').notNull(),
|
||||
filename: text('filename').notNull(),
|
||||
/** getStorageBackend() key of the uploaded file (re-read by the worker). */
|
||||
storageKey: text('storage_key'),
|
||||
/** uploaded | dry_run | committing | completed | failed | undone */
|
||||
status: text('status').notNull().default('uploaded'),
|
||||
/** skip-matches | update-matches | error-on-match */
|
||||
conflictPolicy: text('conflict_policy').notNull().default('skip-matches'),
|
||||
mappingJson: jsonb('mapping_json').$type<ImportMappingJson>(),
|
||||
totalRows: integer('total_rows').notNull().default(0),
|
||||
inserted: integer('inserted').notNull().default(0),
|
||||
updated: integer('updated').notNull().default(0),
|
||||
skipped: integer('skipped').notNull().default(0),
|
||||
errored: integer('errored').notNull().default(0),
|
||||
createdBy: text('created_by').notNull(),
|
||||
createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(),
|
||||
completedAt: timestamp('completed_at', { withTimezone: true }),
|
||||
},
|
||||
(t) => [index('idx_import_batches_port').on(t.portId, t.createdAt)],
|
||||
);
|
||||
|
||||
export const importBatchRows = pgTable(
|
||||
'import_batch_rows',
|
||||
{
|
||||
id: text('id')
|
||||
.primaryKey()
|
||||
.$defaultFn(() => crypto.randomUUID()),
|
||||
batchId: text('batch_id')
|
||||
.notNull()
|
||||
.references(() => importBatches.id, { onDelete: 'cascade' }),
|
||||
/** 1-based source row number (after the header). */
|
||||
rowNumber: integer('row_number').notNull(),
|
||||
/** inserted | updated | skipped | errored */
|
||||
action: text('action').notNull(),
|
||||
/** Created/updated entity id (null for skipped/errored). */
|
||||
entityId: text('entity_id'),
|
||||
error: text('error'),
|
||||
},
|
||||
(t) => [index('idx_import_batch_rows_batch').on(t.batchId, t.rowNumber)],
|
||||
);
|
||||
|
||||
export const importMappings = pgTable(
|
||||
'import_mappings',
|
||||
{
|
||||
id: text('id')
|
||||
.primaryKey()
|
||||
.$defaultFn(() => crypto.randomUUID()),
|
||||
portId: text('port_id')
|
||||
.notNull()
|
||||
.references(() => ports.id, { onDelete: 'cascade' }),
|
||||
entityType: text('entity_type').notNull(),
|
||||
name: text('name').notNull(),
|
||||
mappingJson: jsonb('mapping_json').$type<ImportMappingJson>().notNull(),
|
||||
createdBy: text('created_by').notNull(),
|
||||
createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(),
|
||||
},
|
||||
(t) => [uniqueIndex('uniq_import_mappings_name').on(t.portId, t.entityType, t.name)],
|
||||
);
|
||||
|
||||
export type ImportBatch = typeof importBatches.$inferSelect;
|
||||
export type NewImportBatch = typeof importBatches.$inferInsert;
|
||||
export type ImportBatchRow = typeof importBatchRows.$inferSelect;
|
||||
export type ImportMapping = typeof importMappings.$inferSelect;
|
||||
@@ -65,6 +65,9 @@ export * from './gdpr';
|
||||
// Migration ledger (one-shot scripts - NocoDB import etc.)
|
||||
export * from './migration';
|
||||
|
||||
// Bulk CSV/XLSX importer (batches, per-row ledger, saved mappings)
|
||||
export * from './imports';
|
||||
|
||||
// Website submissions (dual-write capture from the marketing site)
|
||||
export * from './website-submissions';
|
||||
|
||||
|
||||
Reference in New Issue
Block a user