import { and, eq, gte, lte, inArray, isNull, sql } from 'drizzle-orm';

import { db } from '@/lib/db';
import { berths, berthTags, berthWaitingList, berthMaintenanceLog } from '@/lib/db/schema/berths';
import { clients } from '@/lib/db/schema/clients';
import { interestBerths, interests } from '@/lib/db/schema/interests';
import { tags } from '@/lib/db/schema/system';
import { PIPELINE_STAGES } from '@/lib/constants';
import { createAuditLog, toAuditJson, type AuditMeta } from '@/lib/audit';
import { activeInterestsWhere } from '@/lib/services/active-interest';
import { diffEntity } from '@/lib/entity-diff';
import { ConflictError, NotFoundError, ValidationError } from '@/lib/errors';
import { buildListQuery } from '@/lib/db/query-builder';
import { emitToRoom } from '@/lib/socket/server';
import { setEntityTags } from '@/lib/services/entity-tags.helper';
import { getPortBerthsDefaultCurrency } from '@/lib/services/port-config';
import { sortByMooring } from '@/lib/utils/mooring-sort';
import type {
  CreateBerthInput,
  UpdateBerthInput,
  UpdateBerthStatusInput,
  ListBerthsQuery,
  AddMaintenanceLogInput,
  UpdateWaitingListInput,
} from '@/lib/validators/berths';

// ─── List ─────────────────────────────────────────────────────────────────────

/**
 * Lists the berths of a port with optional filters, search, sorting and
 * pagination, and decorates every row with its tags and the most-advanced
 * pipeline stage of its linked active interests.
 *
 * Note: when the natural mooring order is used (no `sort`, or
 * `sort === 'mooringNumber'`), `query.order` is not applied — both order
 * fragments are hard-coded `ASC`.
 *
 * @param portId - Port whose berths are listed.
 * @param query - Filter / sort / paging options.
 * @returns `{ data, total }`; `data` rows carry extra `tags` and
 *   `latestInterestStage` properties. Returns `{ data: [], total: 0 }`
 *   without running the list query when a tag filter matches no berth.
 */
export async function listBerths(portId: string, query: ListBerthsQuery) {
  const filters = [];

  if (query.status) {
    filters.push(eq(berths.status, query.status));
  }
  if (query.area) {
    filters.push(eq(berths.area, query.area));
  }
  // Numeric bounds are compared as strings because the comparisons are
  // built against columns that are written as strings elsewhere in this file.
  if (query.minLength !== undefined) {
    filters.push(gte(berths.lengthM, String(query.minLength)));
  }
  if (query.maxLength !== undefined) {
    filters.push(lte(berths.lengthM, String(query.maxLength)));
  }
  if (query.minPrice !== undefined) {
    filters.push(gte(berths.price, String(query.minPrice)));
  }
  if (query.maxPrice !== undefined) {
    filters.push(lte(berths.price, String(query.maxPrice)));
  }
  if (query.tenureType) {
    filters.push(eq(berths.tenureType, query.tenureType));
  }

  // Tag filter: resolve the berth ids carrying ANY of the requested tags,
  // then constrain the main query to those ids.
  if (query.tagIds && query.tagIds.length > 0) {
    const tagIds = query.tagIds;
    const berthsWithTags = await db
      .selectDistinct({ berthId: berthTags.berthId })
      .from(berthTags)
      .where(inArray(berthTags.tagId, tagIds));

    const matchingIds = berthsWithTags.map((r) => r.berthId);
    if (matchingIds.length === 0) {
      return { data: [], total: 0 };
    }
    filters.push(inArray(berths.id, matchingIds));
  }

  // Default ordering is natural alphanumeric on mooring number
  // (A1, A2, A10, B1...) — Postgres' default lexicographic sort
  // would put A10 before A2, which is the wrong order for a marina
  // map. The mooring format is locked at `^[A-Z]+\d+$` so the regexp
  // splits are safe.
  const NATURAL_MOORING_SORT = [
    sql`regexp_replace(${berths.mooringNumber}, '[0-9]+$', '') ASC`,
    sql`(regexp_replace(${berths.mooringNumber}, '^[A-Z]+', ''))::int ASC`,
  ];

  const sortColumn = (() => {
    switch (query.sort) {
      case 'mooringNumber':
        // Honoured via customOrderBy below — caller asked for mooring
        // sort explicitly, give them the natural order.
        return null;
      case 'area':
        return berths.area;
      case 'price':
        return berths.price;
      case 'status':
        return berths.status;
      case 'lengthM':
        return berths.lengthM;
      default:
        // No sort requested → natural mooring order is the friendliest
        // default for the berth grid (groups by pontoon letter).
        return null;
    }
  })();

  const result = await buildListQuery({
    table: berths,
    portIdColumn: berths.portId,
    portId,
    idColumn: berths.id,
    updatedAtColumn: berths.updatedAt,
    filters,
    sort: sortColumn ? { column: sortColumn, direction: query.order } : undefined,
    customOrderBy: sortColumn ? undefined : NATURAL_MOORING_SORT,
    page: query.page,
    pageSize: query.limit,
    searchColumns: [berths.mooringNumber, berths.area],
    searchTerm: query.search,
    // berths.archivedAt + ?includeArchived flag landed in migration 0065.
    // Default the admin list to active-only; an `?includeArchived=true`
    // query string surfaces the archive bin for ops.
    archivedAtColumn: berths.archivedAt,
    includeArchived: Boolean(query.includeArchived),
  });

  // Attach tags for list items. The query is skipped for an empty page.
  const berthIds = (result.data as Array<{ id: string }>).map((b) => b.id);
  const tagRows =
    berthIds.length > 0
      ? await db
          .select({
            berthId: berthTags.berthId,
            id: tags.id,
            name: tags.name,
            color: tags.color,
          })
          .from(berthTags)
          .innerJoin(tags, eq(berthTags.tagId, tags.id))
          .where(inArray(berthTags.berthId, berthIds))
      : [];

  // Derive the tag shape from the query itself so it always matches the
  // schema's column types.
  type BerthTagSummary = Omit<(typeof tagRows)[number], 'berthId'>;
  const tagsByBerthId: Record<string, BerthTagSummary[]> = {};
  for (const row of tagRows) {
    if (!tagsByBerthId[row.berthId]) tagsByBerthId[row.berthId] = [];
    tagsByBerthId[row.berthId]!.push({ id: row.id, name: row.name, color: row.color });
  }

  const latestStageByBerthId = await getLatestInterestStageByBerth(berthIds, portId);

  const data = (result.data as Array<Record<string, unknown>>).map((b) => ({
    ...b,
    tags: tagsByBerthId[b.id as string] ?? [],
    latestInterestStage: latestStageByBerthId[b.id as string] ?? null,
  }));

  return { data, total: result.total };
}

/**
 * For each berth id, returns the most-advanced pipeline stage among its
 * linked active interests (outcome IS NULL, not archived). Used by the
 * berth list + detail to surface the deal furthest along on a berth so
 * reps can see at a glance whether a berth is "Reservation Sent" via
 * its connected interest, even though berth.status only tracks
 * available/under_offer/sold.
 *
 * @param berthIds - Berths to look up; an empty array short-circuits to `{}`.
 * @param portId - Port passed to `activeInterestsWhere` to scope the interests.
 * @returns Map of berth id → stage. Berths without a matching interest are
 *   absent from the map.
 */
async function getLatestInterestStageByBerth(
  berthIds: string[],
  portId: string,
): Promise<Record<string, string>> {
  if (berthIds.length === 0) return {};

  const rows = await db
    .select({
      berthId: interestBerths.berthId,
      pipelineStage: interests.pipelineStage,
    })
    .from(interestBerths)
    .innerJoin(interests, eq(interestBerths.interestId, interests.id))
    .where(and(activeInterestsWhere(portId), inArray(interestBerths.berthId, berthIds)));

  // Pipeline stages are an ordered enum — rank by position in PIPELINE_STAGES
  // so "contract_signed" beats "eoi_sent". Unknown legacy values rank -1
  // (indexOf's "not found"), i.e. below every known stage.
  const rankOf = (stage: string) => (PIPELINE_STAGES as readonly string[]).indexOf(stage);

  const top: Record<string, string> = {};
  for (const row of rows) {
    const current = top[row.berthId];
    if (!current || rankOf(row.pipelineStage) > rankOf(current)) {
      top[row.berthId] = row.pipelineStage;
    }
  }
  return top;
}

// ─── Get By ID ────────────────────────────────────────────────────────────────

/**
 * Loads one berth (with its `mapData` relation), its tags and the
 * most-advanced linked interest stage.
 *
 * @param id - Berth id.
 * @param portId - Port the berth must belong to.
 * @returns The berth row extended with `tags` and `latestInterestStage`
 *   (`null` when no active interest is linked).
 * @throws NotFoundError when no berth with this id exists in the port.
 */
export async function getBerthById(id: string, portId: string) {
  const berth = await db.query.berths.findFirst({
    where: and(eq(berths.id, id), eq(berths.portId, portId)),
    with: {
      mapData: true,
    },
  });

  if (!berth) throw new NotFoundError('Berth');

  // Fetch tags
  const tagRows = await db
    .select({ id: tags.id, name: tags.name, color: tags.color })
    .from(berthTags)
    .innerJoin(tags, eq(berthTags.tagId, tags.id))
    .where(eq(berthTags.berthId, id));

  const latestStageMap = await getLatestInterestStageByBerth([id], portId);

  return {
    ...berth,
    tags: tagRows,
    latestInterestStage: latestStageMap[id] ?? null,
  };
}

// ─── Update ───────────────────────────────────────────────────────────────────

/**
 * Updates the editable attributes of a berth. When `diffEntity` reports no
 * change the existing row is returned untouched and no audit log, socket
 * event or webhook is produced.
 *
 * @param id - Berth id.
 * @param portId - Port the berth must belong to.
 * @param data - Fields to update; `undefined` fields are passed through as
 *   `undefined` to the update.
 * @param meta - Actor / request metadata recorded in the audit log.
 * @returns The updated row (or the existing row when nothing changed).
 * @throws NotFoundError when the berth does not exist in the port.
 */
export async function updateBerth(
  id: string,
  portId: string,
  data: UpdateBerthInput,
  meta: AuditMeta,
) {
  const existing = await db.query.berths.findFirst({
    where: and(eq(berths.id, id), eq(berths.portId, portId)),
  });

  if (!existing) throw new NotFoundError('Berth');

  const { changed, diff } = diffEntity(
    existing as Record<string, unknown>,
    data as Record<string, unknown>,
  );

  if (!changed) return existing;

  // Drizzle numeric columns expect string | null - coerce numbers to strings
  const n = (v: number | undefined) => (v !== undefined ? String(v) : undefined);

  const [updated] = await db
    .update(berths)
    .set({
      area: data.area,
      lengthFt: n(data.lengthFt),
      lengthM: n(data.lengthM),
      widthFt: n(data.widthFt),
      widthM: n(data.widthM),
      draftFt: n(data.draftFt),
      draftM: n(data.draftM),
      widthIsMinimum: data.widthIsMinimum,
      nominalBoatSize: n(data.nominalBoatSize),
      nominalBoatSizeM: n(data.nominalBoatSizeM),
      waterDepth: n(data.waterDepth),
      waterDepthM: n(data.waterDepthM),
      waterDepthIsMinimum: data.waterDepthIsMinimum,
      sidePontoon: data.sidePontoon,
      powerCapacity: n(data.powerCapacity),
      voltage: n(data.voltage),
      mooringType: data.mooringType,
      cleatType: data.cleatType,
      cleatCapacity: data.cleatCapacity,
      bollardType: data.bollardType,
      bollardCapacity: data.bollardCapacity,
      access: data.access,
      price: n(data.price),
      priceCurrency: data.priceCurrency,
      bowFacing: data.bowFacing,
      berthApproved: data.berthApproved,
      tenureType: data.tenureType,
      tenureYears: data.tenureYears,
      tenureStartDate: data.tenureStartDate,
      tenureEndDate: data.tenureEndDate,
      updatedAt: new Date(),
    })
    .where(and(eq(berths.id, id), eq(berths.portId, portId)))
    .returning();

  // Audit log is started but not awaited.
  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'berth',
    entityId: id,
    oldValue: toAuditJson(diff),
    newValue: toAuditJson(data),
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'berth:updated', {
    berthId: id,
    changedFields: Object.keys(diff),
  });

  // Webhook dispatcher is loaded lazily and not awaited.
  void import('@/lib/services/webhook-dispatch').then(({ dispatchWebhookEvent }) =>
    dispatchWebhookEvent(portId, 'berth:updated', { berthId: id }),
  );

  return updated!;
}

// ─── Update Status ────────────────────────────────────────────────────────────

/**
 * Changes the status of a berth, recording who changed it and why, and
 * optionally links an interest as the primary holder of the berth.
 *
 * NOTE(review): the audit log, socket event and webhook are triggered before
 * `setPrimaryBerth` runs, so a failure in that helper surfaces as an error
 * after the status change has already been written and announced.
 *
 * @param id - Berth id.
 * @param portId - Port the berth must belong to.
 * @param data - New status, reason and optional `interestId`.
 * @param meta - Actor / request metadata recorded in the audit log.
 * @returns The updated berth row.
 * @throws NotFoundError when the berth does not exist in the port.
 */
export async function updateBerthStatus(
  id: string,
  portId: string,
  data: UpdateBerthStatusInput,
  meta: AuditMeta,
) {
  const existing = await db.query.berths.findFirst({
    where: and(eq(berths.id, id), eq(berths.portId, portId)),
  });

  if (!existing) throw new NotFoundError('Berth');

  const [updated] = await db
    .update(berths)
    .set({
      status: data.status,
      statusLastChangedBy: meta.userId,
      statusLastChangedReason: data.reason,
      statusLastModified: new Date(),
      updatedAt: new Date(),
    })
    .where(and(eq(berths.id, id), eq(berths.portId, portId)))
    .returning();

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'berth',
    entityId: id,
    oldValue: { status: existing.status },
    newValue: { status: data.status, reason: data.reason },
    metadata: { type: 'status_change', reason: data.reason },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'berth:statusChanged', {
    berthId: id,
    oldStatus: existing.status,
    newStatus: data.status,
    triggeredBy: meta.userId,
  });

  void import('@/lib/services/webhook-dispatch').then(({ dispatchWebhookEvent }) =>
    dispatchWebhookEvent(portId, 'berth:statusChanged', {
      berthId: id,
      oldStatus: existing.status,
      newStatus: data.status,
    }),
  );

  // Optional: link the chosen interest as the primary holder of this
  // berth. Cross-port checks live inside the helper so a malicious
  // interestId from another port can't slip past the status PATCH.
  // NOTE(review): portId is not passed to the helper here — confirm it
  // derives the port on its own.
  if (data.interestId) {
    const { setPrimaryBerth } = await import('@/lib/services/interest-berths.service');
    await setPrimaryBerth(data.interestId, id);
  }

  return updated!;
}

// ─── Set Tags ─────────────────────────────────────────────────────────────────

/**
 * Sets the tags of a berth by delegating to the shared `setEntityTags` helper
 * after verifying the berth belongs to the port.
 *
 * @param id - Berth id.
 * @param portId - Port the berth must belong to.
 * @param tagIds - Tag ids forwarded to the helper.
 * @param meta - Actor / request metadata forwarded to the helper.
 * @returns `{ berthId, tagIds }` as reported by the helper.
 * @throws NotFoundError when the berth does not exist in the port.
 */
export async function setBerthTags(id: string, portId: string, tagIds: string[], meta: AuditMeta) {
  const existing = await db.query.berths.findFirst({
    where: and(eq(berths.id, id), eq(berths.portId, portId)),
  });

  if (!existing) throw new NotFoundError('Berth');

  const result = await setEntityTags({
    joinTable: berthTags,
    entityColumn: berthTags.berthId,
    tagColumn: berthTags.tagId,
    entityId: id,
    portId,
    tagIds,
    meta,
    entityType: 'berth',
  });

  return { berthId: result.entityId, tagIds: result.tagIds };
}

// ─── Add Maintenance Log ──────────────────────────────────────────────────────

/**
 * Inserts a maintenance log entry for a berth, then audits the creation and
 * broadcasts it to the port's socket room.
 *
 * @param id - Berth id.
 * @param portId - Port the berth must belong to.
 * @param data - Maintenance entry fields; `cost` is stored as a string.
 * @param meta - Actor / request metadata; `userId` becomes `createdBy`.
 * @returns The inserted log row.
 * @throws NotFoundError when the berth does not exist in the port.
 */
export async function addMaintenanceLog(
  id: string,
  portId: string,
  data: AddMaintenanceLogInput,
  meta: AuditMeta,
) {
  const existing = await db.query.berths.findFirst({
    where: and(eq(berths.id, id), eq(berths.portId, portId)),
  });

  if (!existing) throw new NotFoundError('Berth');

  const rows = await db
    .insert(berthMaintenanceLog)
    .values({
      berthId: id,
      portId,
      category: data.category,
      description: data.description,
      cost: data.cost !== undefined ? String(data.cost) : undefined,
      costCurrency: data.costCurrency,
      responsibleParty: data.responsibleParty,
      performedDate: data.performedDate,
      photoFileIds: data.photoFileIds,
      createdBy: meta.userId,
    })
    .returning();

  const log = rows[0]!;

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'create',
    entityType: 'berth_maintenance_log',
    entityId: log.id,
    metadata: { berthId: id, category: data.category },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'berth:maintenanceAdded', {
    berthId: id,
    logEntry: log,
  });

  return log;
}

// ─── Get Maintenance Logs ─────────────────────────────────────────────────────

/**
 * Returns the maintenance log entries of a berth ordered by `performedDate`.
 *
 * @param id - Berth id.
 * @param portId - Port the berth (and the log rows) must belong to.
 * @throws NotFoundError when the berth does not exist in the port.
 */
export async function getMaintenanceLogs(id: string, portId: string) {
  const existing = await db.query.berths.findFirst({
    where: and(eq(berths.id, id), eq(berths.portId, portId)),
  });

  if (!existing) throw new NotFoundError('Berth');

  return db
    .select()
    .from(berthMaintenanceLog)
    .where(and(eq(berthMaintenanceLog.berthId, id), eq(berthMaintenanceLog.portId, portId)))
    .orderBy(berthMaintenanceLog.performedDate);
}

// ─── Get Waiting List ─────────────────────────────────────────────────────────

/**
 * Returns the waiting list entries of a berth ordered by `position`.
 * Port scoping is enforced through the berth lookup; the waiting-list query
 * itself filters on berth id only.
 *
 * @param id - Berth id.
 * @param portId - Port the berth must belong to.
 * @throws NotFoundError when the berth does not exist in the port.
 */
export async function getWaitingList(id: string, portId: string) {
  const existing = await db.query.berths.findFirst({
    where: and(eq(berths.id, id), eq(berths.portId, portId)),
  });

  if (!existing) throw new NotFoundError('Berth');

  return db
    .select()
    .from(berthWaitingList)
    .where(eq(berthWaitingList.berthId, id))
    .orderBy(berthWaitingList.position);
}

// ─── Update Waiting List ──────────────────────────────────────────────────────

/**
 * Replaces the whole waiting list of a berth with the supplied entries.
 * The delete and the re-insert run inside one transaction so a failed insert
 * cannot leave the berth with an emptied waiting list.
 *
 * @param id - Berth id.
 * @param portId - Port the berth and every referenced client must belong to.
 * @param data - New entries; `priority` defaults to `'normal'` and
 *   `notifyPref` to `'email'`.
 * @param meta - Actor / request metadata recorded in the audit log.
 * @returns The entries as supplied by the caller.
 * @throws NotFoundError when the berth does not exist in the port.
 * @throws ValidationError when any client id is not a client of this port.
 */
export async function updateWaitingList(
  id: string,
  portId: string,
  data: UpdateWaitingListInput,
  meta: AuditMeta,
) {
  const existing = await db.query.berths.findFirst({
    where: and(eq(berths.id, id), eq(berths.portId, portId)),
  });

  if (!existing) throw new NotFoundError('Berth');

  // Validate every supplied clientId belongs to portId. Without this
  // check, a port-A admin could insert port-B clientIds into the
  // waiting list - corrupting reportable data and creating a join
  // surface that hydrates foreign-tenant client rows.
  if (data.entries.length > 0) {
    const clientIds = [...new Set(data.entries.map((e) => e.clientId))];
    const validClients = await db
      .select({ id: clients.id })
      .from(clients)
      .where(and(inArray(clients.id, clientIds), eq(clients.portId, portId)));
    if (validClients.length !== clientIds.length) {
      throw new ValidationError('One or more clients are not in this port');
    }
  }

  // Replace entire waiting list atomically.
  await db.transaction(async (tx) => {
    await tx.delete(berthWaitingList).where(eq(berthWaitingList.berthId, id));

    if (data.entries.length > 0) {
      await tx.insert(berthWaitingList).values(
        data.entries.map((entry) => ({
          berthId: id,
          clientId: entry.clientId,
          position: entry.position,
          priority: entry.priority ?? 'normal',
          notifyPref: entry.notifyPref ?? 'email',
          notes: entry.notes,
        })),
      );
    }
  });

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'berth',
    entityId: id,
    metadata: { type: 'waiting_list_updated', count: data.entries.length },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'berth:waitingListChanged', {
    berthId: id,
    action: 'replaced',
    entry: data.entries,
  });

  return data.entries;
}

// ─── Create ──────────────────────────────────────────────────────────────────

/**
 * Creates a berth in a port. `status` defaults to `'available'`,
 * `tenureType` to `'permanent'`, and `priceCurrency` to the port default
 * when the caller does not supply one.
 *
 * @param portId - Port the berth is created in.
 * @param data - Berth attributes; numeric dimensions are stored as strings.
 * @param meta - Actor / request metadata recorded in the audit log.
 * @returns The inserted berth row.
 * @throws ConflictError when the mooring number already exists in the port.
 */
export async function createBerth(portId: string, data: CreateBerthInput, meta: AuditMeta) {
  // Check mooring number uniqueness within port
  const existing = await db.query.berths.findFirst({
    where: and(eq(berths.portId, portId), eq(berths.mooringNumber, data.mooringNumber)),
  });
  if (existing) {
    throw new ConflictError(`Berth "${data.mooringNumber}" already exists in this port`);
  }

  // Caller-specified currency wins; otherwise inherit the port's admin-
  // configured default (system_settings.berths_default_currency, USD if
  // unset). Lets a multi-currency portfolio be modelled cleanly without
  // forcing reps to pick a currency on every new-berth form.
  const resolvedCurrency = data.priceCurrency ?? (await getPortBerthsDefaultCurrency(portId));

  const [berth] = await db
    .insert(berths)
    .values({
      portId,
      mooringNumber: data.mooringNumber,
      area: data.area,
      status: data.status ?? 'available',
      lengthFt: data.lengthFt?.toString(),
      lengthM: data.lengthM?.toString(),
      widthFt: data.widthFt?.toString(),
      widthM: data.widthM?.toString(),
      draftFt: data.draftFt?.toString(),
      draftM: data.draftM?.toString(),
      price: data.price?.toString(),
      priceCurrency: resolvedCurrency,
      tenureType: data.tenureType ?? 'permanent',
      mooringType: data.mooringType,
      powerCapacity: data.powerCapacity?.toString(),
      voltage: data.voltage?.toString(),
      access: data.access,
      bowFacing: data.bowFacing,
      sidePontoon: data.sidePontoon,
    })
    .returning();

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'create',
    entityType: 'berth',
    entityId: berth!.id,
    newValue: { mooringNumber: berth!.mooringNumber, area: berth!.area },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'system:alert', {
    alertType: 'berth:created',
    message: `Berth "${berth!.mooringNumber}" created`,
    severity: 'info',
  });

  return berth!;
}

// ─── Bulk add ───────────────────────────────────────────────────────────────

/**
 * Inserts several berths in one statement after checking for duplicate
 * mooring numbers, both within the input and against the port's existing
 * berths. A single audit entry (`entityId: 'bulk'`) covers the whole batch.
 *
 * @param portId - Port the berths are created in.
 * @param inputs - Berths to create. An empty array is a no-op that returns
 *   `{ inserted: 0, ids: [] }` without touching the database, the audit log
 *   or the socket room.
 * @param meta - Actor / request metadata recorded in the audit log.
 * @returns Number of inserted rows and their ids.
 * @throws ConflictError on a duplicate mooring number in the input or in the port.
 */
export async function bulkAddBerths(
  portId: string,
  inputs: CreateBerthInput[],
  meta: AuditMeta,
): Promise<{ inserted: number; ids: string[] }> {
  // Nothing to insert: avoid building an IN () filter and an insert with
  // no values, neither of which is a meaningful statement.
  if (inputs.length === 0) {
    return { inserted: 0, ids: [] };
  }

  // Input-level dedup: catch fat-finger duplicates in the wizard before
  // hitting the unique index.
  const seenMoorings = new Set<string>();
  for (const row of inputs) {
    if (seenMoorings.has(row.mooringNumber)) {
      throw new ConflictError(`Duplicate mooring number "${row.mooringNumber}" in input`);
    }
    seenMoorings.add(row.mooringNumber);
  }

  const moorings = inputs.map((r) => r.mooringNumber);
  const existing = await db
    .select({ mooringNumber: berths.mooringNumber })
    .from(berths)
    .where(and(eq(berths.portId, portId), inArray(berths.mooringNumber, moorings)));
  if (existing.length > 0) {
    throw new ConflictError(
      `Mooring numbers already exist in this port: ${existing.map((r) => r.mooringNumber).join(', ')}`,
    );
  }

  // Resolved once for the batch; rows with their own currency keep it.
  const defaultCurrency = await getPortBerthsDefaultCurrency(portId);

  const values = inputs.map((row) => ({
    portId,
    mooringNumber: row.mooringNumber,
    area: row.area,
    status: row.status ?? 'available',
    lengthFt: row.lengthFt?.toString(),
    lengthM: row.lengthM?.toString(),
    widthFt: row.widthFt?.toString(),
    widthM: row.widthM?.toString(),
    draftFt: row.draftFt?.toString(),
    draftM: row.draftM?.toString(),
    price: row.price?.toString(),
    priceCurrency: row.priceCurrency ?? defaultCurrency,
    tenureType: row.tenureType ?? 'permanent',
    mooringType: row.mooringType,
    powerCapacity: row.powerCapacity?.toString(),
    voltage: row.voltage?.toString(),
    access: row.access,
    bowFacing: row.bowFacing,
    sidePontoon: row.sidePontoon,
  }));

  const inserted = await db.insert(berths).values(values).returning({ id: berths.id });

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'create',
    entityType: 'berth',
    entityId: 'bulk',
    newValue: { count: inserted.length, mooringNumbers: moorings },
    metadata: { type: 'bulk_add' },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'system:alert', {
    alertType: 'berth:bulk_created',
    message: `${inserted.length} berths added`,
    severity: 'info',
  });

  return { inserted: inserted.length, ids: inserted.map((r) => r.id) };
}

// ─── Archive / Restore ─────────────────────────────────────────────────────

/**
 * Post-audit F5: soft-archive replaces hard-delete. The previous
 * `db.delete()` permanently dropped the berth row + cascade-vanished
 * interest_berths links + broke historical audit references. Now the
 * row stays; `archived_at` shields it from default queries.
 *
 * Reasoning chain:
 *   1. Block if there's an active (non-archived, no-outcome) interest
 *      still linked — archiving with deals in flight breaks reports.
 *   2. Stamp archived_at + archived_by + archive_reason in a single update.
 *   3. Audit log captures the reason so /admin/audit shows the why.
 *   4. Emit a socket alert so any open berth-detail page bounces.
 *
 * @param id - Berth id.
 * @param portId - Port the berth must belong to.
 * @param input - Archive reason, stored on the row and in the audit log.
 * @param meta - Actor / request metadata; `userId` becomes `archivedBy`.
 * @throws NotFoundError when the berth does not exist in the port.
 * @throws ConflictError when the berth is already archived or still has an
 *   active interest linked.
 */
export async function archiveBerth(
  id: string,
  portId: string,
  input: { reason: string },
  meta: AuditMeta,
) {
  const berth = await db.query.berths.findFirst({
    where: and(eq(berths.id, id), eq(berths.portId, portId)),
  });
  if (!berth) throw new NotFoundError('Berth');
  if (berth.archivedAt) {
    throw new ConflictError('Berth is already archived');
  }

  // Block archive when an active interest still depends on the berth —
  // forces the rep to resolve the deal first instead of orphaning it.
  const activeLink = await db
    .select({ interestId: interestBerths.interestId })
    .from(interestBerths)
    .innerJoin(interests, eq(interests.id, interestBerths.interestId))
    .where(
      and(eq(interestBerths.berthId, id), isNull(interests.archivedAt), isNull(interests.outcome)),
    )
    .limit(1);
  if (activeLink.length > 0) {
    throw new ConflictError(
      'Cannot archive a berth with an active interest. Resolve or archive the interest first.',
    );
  }

  await db
    .update(berths)
    .set({
      archivedAt: new Date(),
      archivedBy: meta.userId,
      archiveReason: input.reason,
      updatedAt: new Date(),
    })
    .where(and(eq(berths.id, id), eq(berths.portId, portId)));

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'archive',
    entityType: 'berth',
    entityId: id,
    oldValue: { mooringNumber: berth.mooringNumber, area: berth.area, status: berth.status },
    newValue: { reason: input.reason },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'system:alert', {
    alertType: 'berth:archived',
    message: `Berth "${berth.mooringNumber}" archived: ${input.reason}`,
    severity: 'info',
  });
}

/**
 * Un-archive. Available to anyone with `berths:edit`. Audit-logged.
 * Clears `archivedAt`, `archivedBy` and `archiveReason` on the row.
 *
 * @param id - Berth id.
 * @param portId - Port the berth must belong to.
 * @param meta - Actor / request metadata recorded in the audit log.
 * @throws NotFoundError when the berth does not exist in the port.
 * @throws ConflictError when the berth is not archived.
 */
export async function restoreBerth(id: string, portId: string, meta: AuditMeta) {
  const berth = await db.query.berths.findFirst({
    where: and(eq(berths.id, id), eq(berths.portId, portId)),
  });
  if (!berth) throw new NotFoundError('Berth');
  if (!berth.archivedAt) {
    throw new ConflictError('Berth is not archived');
  }

  await db
    .update(berths)
    .set({
      archivedAt: null,
      archivedBy: null,
      archiveReason: null,
      updatedAt: new Date(),
    })
    .where(and(eq(berths.id, id), eq(berths.portId, portId)));

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'restore',
    entityType: 'berth',
    entityId: id,
    oldValue: { archivedAt: berth.archivedAt, archiveReason: berth.archiveReason },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'system:alert', {
    alertType: 'berth:restored',
    message: `Berth "${berth.mooringNumber}" restored`,
    severity: 'info',
  });
}

/**
 * @deprecated Use `archiveBerth` instead. Kept temporarily for callers
 * that haven't migrated. Calls archiveBerth under the hood — the
 * "hard delete" name is now a lie but we don't break the import sites
 * in a single PR.
 */
export async function deleteBerth(id: string, portId: string, meta: AuditMeta) {
  return archiveBerth(id, portId, { reason: 'Deleted via legacy delete path' }, meta);
}

// ─── Options ──────────────────────────────────────────────────────────────────

/**
 * Returns the non-archived berths of a port as lightweight option rows
 * (`id`, `mooringNumber`, `area`, `status`) in natural mooring order.
 *
 * @param portId - Port whose berths are listed.
 */
export async function getBerthOptions(portId: string) {
  // DB-side `ORDER BY mooring_number` is lexicographic (A1, A10, A11, A2…).
  // Natural-sort in JS so dropdowns surface them as reps read them: A1, A2,
  // …, A10, A11. See compareMooringNumbers for the prefix/index split.
  const rows = await db
    .select({
      id: berths.id,
      mooringNumber: berths.mooringNumber,
      area: berths.area,
      status: berths.status,
    })
    .from(berths)
    // F5: hide archived berths from option pickers; otherwise a dead berth
    // appears in the New Interest combobox and re-links itself to a deal.
    .where(and(eq(berths.portId, portId), isNull(berths.archivedAt)));
  return sortByMooring(rows, (r) => r.mooringNumber);
}