diff --git a/hypaware-core/plugins-workspace/format-iceberg/src/maintenance.js b/hypaware-core/plugins-workspace/format-iceberg/src/maintenance.js index d03efaa..abd5f08 100644 --- a/hypaware-core/plugins-workspace/format-iceberg/src/maintenance.js +++ b/hypaware-core/plugins-workspace/format-iceberg/src/maintenance.js @@ -5,6 +5,8 @@ import { icebergExpireSnapshots, loadLatestFileCatalogMetadata, } from 'icebird' +import { fileCatalogCommit } from 'icebird/src/write/commit.js' +import { icebergStageRewrite } from 'icebird/src/write/rewrite.js' import { SNAPSHOT_RETENTION_DEFAULTS } from '../../../../src/core/cache/maintenance.js' import { createBlobStoreIO, tableUrlForBlobPrefix } from './blob-io.js' @@ -12,11 +14,24 @@ import { createBlobStoreIO, tableUrlForBlobPrefix } from './blob-io.js' /** * @import { BlobStore } from '../../../../collectivus-plugin-kernel-types.d.ts' * @import { Resolver, Lister, TableMetadata } from 'icebird/src/types.js' - * @import { ExportRetentionConfig, ExportMaintenanceDatasetReport, ExportMaintenanceReport } from './types.d.ts' + * @import { ExportCompactionResult, ExportRetentionConfig, ExportMaintenanceDatasetReport, ExportMaintenanceReport } from './types.d.ts' */ /** @type {ExportRetentionConfig} */ -const DEFAULTS = SNAPSHOT_RETENTION_DEFAULTS +const DEFAULTS = Object.freeze({ + ...SNAPSHOT_RETENTION_DEFAULTS, + // Mirrors the local-cache `compact_file_count` trigger: rewrite once a + // table's live data-file count crosses this threshold. + compact_file_count: 32, + // icebird's rewrite materializes every live row in memory before writing + // (no streaming path yet), so an unbounded table would re-create the + // parquet-encoder OOM this repo already hit twice (#82 local cache, + // #90 parquet exports). Skip the rewrite when the current snapshot's + // `total-files-size` exceeds this. 
128 MB matches the cache's + // `target_file_bytes`: compressed parquet expands ~10x into JS objects, + // keeping the rewrite around a gigabyte of heap under a default node heap. + compact_max_bytes: 128 * 1024 * 1024, +}) /** * @param {Partial | undefined} config @@ -26,6 +41,8 @@ export function normalizeExportRetentionConfig(config) { return { min_snapshots_to_keep: config?.min_snapshots_to_keep ?? DEFAULTS.min_snapshots_to_keep, max_snapshot_age_hours: config?.max_snapshot_age_hours ?? DEFAULTS.max_snapshot_age_hours, + compact_file_count: config?.compact_file_count ?? DEFAULTS.compact_file_count, + compact_max_bytes: config?.compact_max_bytes ?? DEFAULTS.compact_max_bytes, } } @@ -113,22 +130,299 @@ export async function discoverExportDatasets(blobStore, prefix) { return Array.from(datasets).sort() } +/** + * Compact a blob-store-backed Iceberg export table by rewriting its live + * rows into consolidated, sorted data files (icebird `icebergStageRewrite` + * followed by `fileCatalogCommit`; 0.8.10 preserves v3 row lineage across + * the rewrite, which matters because export tables are `formatVersion: 3`). + * + * @ref LLP 0022#compaction — this is the *out-of-band* rewrite the spec + * reserves: it must only run from an explicit, manual invocation + * (`hyp sink maintain --compact`), never from the daemon loop or the + * sink tick, because a full read-rewrite in the daemon process is the + * OOM/blocking failure mode already seen with the parquet encoder. + * + * The rewrite is skipped while the table's live data-file count is below + * `compactFileCount` — for a day-partitioned archive the files are + * already large, so most tables never reach the threshold — and when the + * current snapshot's `total-files-size` exceeds `compactMaxBytes` + * (icebird's rewrite holds every live row in memory; see DEFAULTS). 
+ * + * A rewrite commit is intentionally NOT retried on a concurrent-commit + * conflict: it only rewrote the rows it read, so a blind retry could drop + * rows another writer appended in the meantime. On a failed commit the + * latest metadata is re-loaded BEFORE any cleanup: a timeout after the + * conditional PUT durably landed, or an SDK-internal retry of its own + * successful write surfacing 412, both leave the staged rewrite as the + * table's current snapshot — deleting its data files then would corrupt + * the export. Only when the reload confirms the staged snapshot did not + * land is a conflict's staged output deleted best-effort (icebird's + * `icebergRewrite` leaves it orphaned, which is why this stages and + * commits explicitly); an unverifiable outcome leaves the bounded + * orphans in place and says so in the error. + * + * Every non-compaction outcome is discriminated by `reason` so the CLI + * can tell an idle table from a failed rewrite (a swallowed failure here + * would misreport as "below threshold" — the one manual compaction tool + * misdiagnosing itself). A metadata *load* failure is only reported as + * `no-table` when the table verifiably does not exist; auth/IO failures + * surface as `error` so the CLI exits nonzero instead of printing an + * idle-table skip. 
+ * + * @param {{ + * tableUrl: string + * resolver: Resolver + * lister: Lister + * compactFileCount: number + * compactMaxBytes?: number + * dryRun?: boolean + * }} opts + * @returns {Promise} + */ +export async function compactExportTable({ tableUrl, resolver, lister, compactFileCount, compactMaxBytes, dryRun }) { + /** @type {Awaited>} */ + let loaded + try { + loaded = await loadLatestFileCatalogMetadata({ tableUrl, resolver, lister }) + } catch (err) { + if (isMissingTableError(err)) { + return { compacted: false, reason: 'no-table', dataFilesBefore: 0, dataFilesAfter: 0 } + } + return { + compacted: false, + reason: 'error', + error: describeError(err), + dataFilesBefore: 0, + dataFilesAfter: 0, + } + } + const metadata = loaded.metadata + + const dataFilesBefore = currentDataFileCount(metadata) + if (dataFilesBefore < compactFileCount) { + return { compacted: false, reason: 'below-threshold', dataFilesBefore, dataFilesAfter: dataFilesBefore } + } + const totalBytes = currentSummaryNumber(metadata, 'total-files-size') + if (compactMaxBytes !== undefined && totalBytes !== undefined && totalBytes > compactMaxBytes) { + return { + compacted: false, + reason: 'above-byte-cap', + totalBytes, + dataFilesBefore, + dataFilesAfter: dataFilesBefore, + } + } + if (dryRun) { + return { compacted: true, dataFilesBefore, dataFilesAfter: dataFilesBefore } + } + + // Stage and commit explicitly (the same load → stage → single-attempt + // commit `icebergRewrite` performs for a conditional-commit file + // catalog) so a failed commit can clean up `staged.writtenFiles` + // instead of leaving a full rewritten copy of the table orphaned in + // the blob store on every lost race. + // + // The stage phase itself writes data files, a manifest, and a + // manifest list one by one, and icebird only reports `writtenFiles` + // on a StagedUpdate that completed — so a failure after the first + // write would leak everything written before it. 
Track every path + // whose write finished through a wrapped resolver and reclaim them + // when staging dies mid-flight. + /** @type {string[]} */ + const stagedPaths = [] + /** @type {Resolver} */ + const trackingResolver = { + ...resolver, + writer(url, options) { + const writer = resolver.writer(url, options) + const finish = writer.finish.bind(writer) + writer.finish = async () => { + await finish() + stagedPaths.push(url) + } + return writer + }, + } + /** @type {Awaited>} */ + let staged + try { + staged = await icebergStageRewrite({ tableUrl, metadata, resolver: trackingResolver }) + } catch (err) { + await deleteFilesBestEffort(resolver, stagedPaths) + return { + compacted: false, + reason: 'error', + error: describeError(err), + dataFilesBefore, + dataFilesAfter: dataFilesBefore, + } + } + try { + const post = await fileCatalogCommit({ + tableUrl, + metadata, + metadataFileName: loaded.metadataFileName, + currentVersion: loaded.version, + staged, + resolver, + conditionalCommits: true, + }) + return { compacted: true, dataFilesBefore, dataFilesAfter: currentDataFileCount(post) } + } catch (err) { + // The thrown error alone cannot prove the commit missed: a timeout + // after the conditional PUT durably landed, or an SDK-internal + // retry of its own successful write surfacing 412, both leave the + // staged rewrite committed and referencing the files this catch + // would delete. Re-load and check before any cleanup. + /** @type {'landed' | 'lost' | 'unknown'} */ + let outcome = 'unknown' + /** @type {TableMetadata | undefined} */ + let postMetadata + try { + const reloaded = await loadLatestFileCatalogMetadata({ tableUrl, resolver, lister }) + const stagedId = staged.snapshot['snapshot-id'] + const present = (reloaded.metadata.snapshots ?? []).some( + (s) => String(s['snapshot-id']) === String(stagedId) + ) + outcome = present ? 'landed' : 'lost' + postMetadata = reloaded.metadata + } catch { + // Reload failed: the commit outcome is unverifiable. 
Fall through + // to the conservative no-delete path below. + } + if (outcome === 'landed' && postMetadata) { + return { compacted: true, dataFilesBefore, dataFilesAfter: currentDataFileCount(postMetadata) } + } + // A 412 means the conditional write was rejected, so once the reload + // confirms the staged snapshot is absent the staged files are safe + // to reclaim. Any other shape (network error, reload failure) could + // still be an in-flight commit — leave the bounded orphans rather + // than risk deleting data files a landed commit references. + if (outcome === 'lost' && isCommitConflict(err)) { + await deleteFilesBestEffort(resolver, staged.writtenFiles) + return { + compacted: false, + reason: 'conflict', + error: describeError(err), + dataFilesBefore, + dataFilesAfter: dataFilesBefore, + } + } + return { + compacted: false, + reason: 'error', + error: + `${describeError(err)}; commit outcome unverified — ` + + `${staged.writtenFiles.length} staged rewrite file(s) left under the table ` + + '(deleting them could corrupt the table if the commit actually landed)', + dataFilesBefore, + dataFilesAfter: dataFilesBefore, + } + } +} + +/** + * Best-effort reclamation of staged rewrite output. Failures are + * swallowed: the caller is already on an error path and a stuck delete + * must not mask the original failure. + * + * @param {Resolver} resolver + * @param {string[]} paths + */ +async function deleteFilesBestEffort(resolver, paths) { + if (!resolver.deleter || paths.length === 0) return + const { deleter } = resolver + await Promise.allSettled(paths.map((p) => deleter(p))) +} + +/** + * True when a metadata-load failure means "this table does not exist" + * rather than "the load itself failed". Local-fs surfaces a missing + * table as ENOENT; a blob lister returns an empty listing for a + * missing prefix, which icebird reports as 'no metadata files found'. 
+ * Anything else (auth, corrupt metadata, transient IO) must NOT fold + * into `no-table`: the CLI exits 0 for a missing table but nonzero + * for a failed load. + * + * @param {unknown} err + * @returns {boolean} + */ +function isMissingTableError(err) { + if (!err || typeof err !== 'object') return false + const record = /** @type {Record} */ (err) + if (record.code === 'ENOENT' || record.code === 'NoSuchKey') return true + return err instanceof Error && err.message.includes('no metadata files found') +} + +/** + * Live data-file count from the current snapshot's summary. + * + * @param {TableMetadata} metadata + * @returns {number} + */ +function currentDataFileCount(metadata) { + return currentSummaryNumber(metadata, 'total-data-files') ?? 0 +} + +/** + * Numeric field from the current snapshot's summary, or undefined when + * there is no current snapshot or the field is absent/non-numeric. + * + * @param {TableMetadata} metadata + * @param {string} key + * @returns {number | undefined} + */ +function currentSummaryNumber(metadata, key) { + const currentId = metadata['current-snapshot-id'] + if (currentId === undefined) return undefined + const snapshot = metadata.snapshots?.find((s) => String(s['snapshot-id']) === String(currentId)) + const raw = snapshot?.summary?.[key] + if (raw === undefined) return undefined + const value = Number(raw) + return Number.isFinite(value) ? value : undefined +} + +/** + * Concurrent-commit conflict detection, mirroring icebird's internal + * `isCommitConflict`: the blob-io writer surfaces a conditional-write + * collision as a 412 (and tags it `iceberg_commit_conflict`); REST-shaped + * catalogs use 409. + * + * @param {unknown} err + * @returns {boolean} + */ +function isCommitConflict(err) { + if (!err || typeof err !== 'object') return false + const record = /** @type {Record} */ (err) + if (record.hypErrorKind === 'iceberg_commit_conflict') return true + const status = record.status ?? 
record.statusCode + return status === 412 || status === 409 +} + +/** + * @param {unknown} err + * @returns {string} + */ +function describeError(err) { + if (err instanceof Error) return err.message + return String(err) +} + /** * Run export maintenance on all datasets under a prefix: snapshot - * expiration per dataset, plus a compaction status report. + * expiration per dataset, plus — only when `compact` is set — the + * out-of-band data-file rewrite ({@link compactExportTable}). * - * @ref LLP 0022#compaction — icebird now exposes `icebergRewrite` - * (read-rewrite compaction), but the export deliberately does not run it: - * day-grain partitioning already yields large files, and an in-daemon - * read-rewrite risks the OOM/blocking failure seen with the parquet sink. - * `compactionSupported: false` here means "not run by this sink" (out-of-band - * only), not "impossible". + * @ref LLP 0022#compaction — `compact` defaults to false; the daemon + * and the sink tick never set it. Only the manual CLI path + * (`hyp sink maintain --compact`) opts in. * * @param {{ * blobStore: BlobStore * prefix: string * datasets?: string[] * config?: Partial + * compact?: boolean * dryRun?: boolean * }} opts * @returns {Promise} @@ -137,6 +431,7 @@ export async function maintainExportTables(opts) { const startMs = Date.now() const cfg = normalizeExportRetentionConfig(opts.config) const dryRun = opts.dryRun ?? false + const compact = opts.compact ?? false const datasets = opts.datasets ?? 
await discoverExportDatasets(opts.blobStore, opts.prefix) const { resolver, lister } = await createBlobStoreIO(opts.blobStore) @@ -144,6 +439,7 @@ export async function maintainExportTables(opts) { /** @type {ExportMaintenanceDatasetReport[]} */ const reports = [] let totalSnapshotsExpired = 0 + let totalTablesCompacted = 0 for (const dataset of datasets) { const blobPrefix = joinKeys(stripSlashes(opts.prefix), dataset) @@ -156,18 +452,38 @@ export async function maintainExportTables(opts) { dryRun, }) totalSnapshotsExpired += result.expired - reports.push({ + /** @type {ExportMaintenanceDatasetReport} */ + const report = { dataset, snapshotsExpired: result.expired, snapshotsBefore: result.snapshotsBefore, - compactionSupported: false, - }) + compactionSupported: true, + compacted: false, + } + if (compact) { + const compaction = await compactExportTable({ + tableUrl, + resolver, + lister, + compactFileCount: cfg.compact_file_count, + compactMaxBytes: cfg.compact_max_bytes, + dryRun, + }) + report.compacted = compaction.compacted + report.compactionReason = compaction.reason + report.compactionError = compaction.error + report.dataFilesBefore = compaction.dataFilesBefore + report.dataFilesAfter = compaction.dataFilesAfter + if (compaction.compacted) totalTablesCompacted += 1 + } + reports.push(report) } return { datasets: reports, totalSnapshotsExpired, - compactionSupported: false, + totalTablesCompacted, + compactionSupported: true, dryRun, elapsedMs: Date.now() - startMs, } diff --git a/hypaware-core/plugins-workspace/format-iceberg/src/types.d.ts b/hypaware-core/plugins-workspace/format-iceberg/src/types.d.ts index 8986e39..7fe557e 100644 --- a/hypaware-core/plugins-workspace/format-iceberg/src/types.d.ts +++ b/hypaware-core/plugins-workspace/format-iceberg/src/types.d.ts @@ -84,19 +84,74 @@ export type BlobIOWriteObserver = (event: BlobIOWriteEvent) => void export interface ExportRetentionConfig { min_snapshots_to_keep: number max_snapshot_age_hours: number + 
/** + * Rewrite a table once its live data-file count reaches this threshold. + * Only consulted by the out-of-band compaction path (LLP 0022). + */ + compact_file_count: number + /** + * Skip the rewrite when the current snapshot's `total-files-size` + * exceeds this many bytes: icebird's rewrite materializes every live + * row in memory, so an unbounded table would OOM the manual CLI run. + */ + compact_max_bytes: number +} + +/** + * Why a requested compaction did not commit a rewrite. + * - `no-table`: the table verifiably does not exist (no metadata files). + * - `below-threshold`: live data-file count under `compact_file_count`. + * - `above-byte-cap`: `total-files-size` over `compact_max_bytes`; raise the + * cap (and the heap) to rewrite anyway. + * - `conflict`: another writer's commit was confirmed to have won the race; + * staged files were deleted best-effort, re-run to retry from fresh metadata. + * - `error`: the metadata load or the rewrite failed (IO, auth, ...); see + * the result's `error` message. A failed commit whose outcome could not be + * verified also lands here, with its staged files deliberately left in + * place (deleting them could corrupt the table if the commit landed). + */ +export type ExportCompactionSkipReason = + | 'no-table' + | 'below-threshold' + | 'above-byte-cap' + | 'conflict' + | 'error' + +export interface ExportCompactionResult { + compacted: boolean + /** Present iff `compacted` is false. */ + reason?: ExportCompactionSkipReason + /** Error message when `reason` is 'conflict' or 'error'. */ + error?: string + /** Current snapshot's `total-files-size`, when the byte cap rejected it. */ + totalBytes?: number + dataFilesBefore: number + dataFilesAfter: number } export interface ExportMaintenanceDatasetReport { dataset: string snapshotsExpired: number snapshotsBefore: number - compactionSupported: false + /** Rewrite needs icebird >= 0.8.10 (v3 row lineage); out-of-band only (LLP 0022). 
*/ + compactionSupported: true + /** True when an opt-in rewrite committed (or would have, under dryRun). */ + compacted: boolean + /** Present when compaction was requested but did not commit. */ + compactionReason?: ExportCompactionSkipReason + /** Present when the rewrite conflicted or failed. */ + compactionError?: string + /** Present only when compaction was requested. */ + dataFilesBefore?: number + /** Present only when compaction was requested. */ + dataFilesAfter?: number } export interface ExportMaintenanceReport { datasets: ExportMaintenanceDatasetReport[] totalSnapshotsExpired: number - compactionSupported: false + totalTablesCompacted: number + compactionSupported: true dryRun: boolean elapsedMs: number } diff --git a/hypaware-core/smoke/flows/iceberg_export_local_fs.js b/hypaware-core/smoke/flows/iceberg_export_local_fs.js index 79a018b..9127719 100644 --- a/hypaware-core/smoke/flows/iceberg_export_local_fs.js +++ b/hypaware-core/smoke/flows/iceberg_export_local_fs.js @@ -2,7 +2,7 @@ import fs from 'node:fs/promises' import path from 'node:path' -import { fileURLToPath } from 'node:url' +import { fileURLToPath, pathToFileURL } from 'node:url' import { icebergRead, @@ -188,6 +188,11 @@ export async function run({ harness, expect }) { (v) => v && v.kind === 'table-format' && v.tableFormat === 'iceberg' ) + // Settle the spool so the fixture's rows are committed to their + // routed `source=...` partition table before discovery — appendRows + // only schedules a background flush at the size threshold. + await kernel.storage.flushAll({ force: true }) + // Discover the dataset partition the fixture registered and // simulate one sink tick by calling `exportBatch` directly. (The // sink driver auto-tick path is still being wired up — flows in @@ -261,7 +266,8 @@ export async function run({ harness, expect }) { ) expect.that('export2: status=exported', exportResult2.status, (v) => v === 'exported') - // Maintenance: snapshot expiration + compaction status. 
+ // Maintenance, default mode: snapshot expiration only — compaction is + // out-of-band (LLP 0022) and must not run without the explicit opt-in. const maintainReport = await maintainExportTables({ blobStore, prefix: 'iceberg/datasets', @@ -271,11 +277,6 @@ export async function run({ harness, expect }) { maintainReport.datasets, (ds) => Array.isArray(ds) && ds.some((d) => d.dataset === DATASET) ) - expect.that( - 'maintain: compactionSupported is false (not run by this sink — out-of-band only, LLP 0022)', - maintainReport.compactionSupported, - (v) => v === false - ) const datasetReport = maintainReport.datasets.find((d) => d.dataset === DATASET) expect.that( 'maintain: dataset report has snapshotsBefore >= 2', @@ -283,17 +284,67 @@ export async function run({ harness, expect }) { (v) => typeof v === 'number' && v >= 2 ) expect.that( - 'maintain: per-dataset compactionSupported is false', - datasetReport?.compactionSupported, + 'maintain: default run does not compact (out-of-band only, LLP 0022)', + datasetReport?.compacted, (v) => v === false ) - // After maintenance the table is still readable. + // Opt-in compaction: the two exported batches left (at least) two data + // files; compact_file_count=2 forces a rewrite so the smoke exercises + // the real stage-and-commit rewrite path (`compactExportTable`). 
+ const compactReport = await maintainExportTables({ + blobStore, + prefix: 'iceberg/datasets', + config: { compact_file_count: 2 }, + compact: true, + }) + const compactedDataset = compactReport.datasets.find((d) => d.dataset === DATASET) + expect.that( + 'maintain --compact: table was compacted', + compactedDataset?.compacted, + (v) => v === true + ) + expect.that( + 'maintain --compact: rewrite consolidated data files', + compactedDataset, + (d) => d !== undefined && + typeof d.dataFilesBefore === 'number' && d.dataFilesBefore >= 2 && + typeof d.dataFilesAfter === 'number' && d.dataFilesAfter < d.dataFilesBefore + ) + expect.that( + 'maintain --compact: one table compacted in totals', + compactReport.totalTablesCompacted, + (v) => v === 1 + ) + + // After maintenance the table holds exactly the rows both batches + // committed — batch-1 and batch-2 each appended the same ROW_COUNT + // fixture rows, so every id 0..ROW_COUNT-1 must appear exactly twice. + // An exact check here is the smoke-tier guard against the rewrite's + // central risk (LLP 0022): a compaction that silently drops rows. const { readTableRows: postMaintainRows } = await readIcebergTable(tableUrl, blobStore) expect.that( - 'maintain: table still readable after maintenance', + 'maintain: exact row count survives compaction (2 batches x ROW_COUNT)', + postMaintainRows, + (rows) => Array.isArray(rows) && rows.length === 2 * ROW_COUNT + ) + expect.that( + 'maintain: every fixture id survives compaction exactly twice', postMaintainRows, - (rows) => Array.isArray(rows) && rows.length >= ROW_COUNT + (rows) => { + if (!Array.isArray(rows)) return false + /** @type {Map} */ + const counts = new Map() + for (const row of rows) { + const key = String(row.id) + counts.set(key, (counts.get(key) ?? 
0) + 1) + } + if (counts.size !== ROW_COUNT) return false + for (let i = 0; i < ROW_COUNT; i++) { + if (counts.get(String(i)) !== 2) return false + } + return true + } ) await obs.shutdown() @@ -403,8 +454,11 @@ async function writeFixturePlugin(dir) { } function fixturePluginSource() { + const partitionModuleUrl = pathToFileURL( + path.resolve(SMOKE_DIR, '../../../src/core/cache/partition.js') + ).href return `// auto-generated by iceberg_export_local_fs smoke; fixture: @hypaware/test-fixture -import path from 'node:path' +import { discoverCachePartitions } from ${JSON.stringify(partitionModuleUrl)} /** * @import { ActivePlugin, BlobStore, SinkEncoder, TableFormatProvider } from '../../../collectivus-plugin-kernel-types.d.ts' @@ -423,15 +477,14 @@ const dataset = { plugin: '@hypaware/test-fixture', schema: { columns: COLUMNS }, primaryTimestampColumn: undefined, - discoverPartitions(ctx) { + // The storage spool routes flushed rows to '/source=/table/', + // so partition discovery must go through the kernel's discovery helper + // instead of guessing a static path. + async discoverPartitions(ctx) { const cacheDir = ctx.cacheDir ?? activatedStorage?.cacheRoot ?? '' - return [ - { - dataset: DATASET, - partition: { partition: 'all' }, - tablePath: cacheDir ? 
path.join(cacheDir, 'datasets', DATASET, 'all') : '', - }, - ] + if (!cacheDir) return [] + const discovered = await discoverCachePartitions(cacheDir, { datasets: [DATASET] }) + return discovered.map((p) => ({ dataset: DATASET, partition: p.partition, tablePath: p.path })) }, async createDataSource(partitions, ctx) { const partition = partitions[0] diff --git a/hypaware-core/smoke/flows/iceberg_export_s3_fixture.js b/hypaware-core/smoke/flows/iceberg_export_s3_fixture.js index 6d04562..48cc0c1 100644 --- a/hypaware-core/smoke/flows/iceberg_export_s3_fixture.js +++ b/hypaware-core/smoke/flows/iceberg_export_s3_fixture.js @@ -3,7 +3,7 @@ import { Buffer } from 'node:buffer' import fs from 'node:fs/promises' import path from 'node:path' -import { fileURLToPath } from 'node:url' +import { fileURLToPath, pathToFileURL } from 'node:url' import { Readable } from 'node:stream' import { @@ -269,6 +269,11 @@ export async function run({ harness, expect }) { storage: kernel.storage, }) + // Settle the spool so the fixture's rows are committed to their + // routed `source=...` partition table before discovery — appendRows + // only schedules a background flush at the size threshold. 
+ await kernel.storage.flushAll({ force: true }) + const dataset = kernel.query.getDataset(DATASET) if (!dataset) throw new Error(`fixture dataset ${DATASET} did not register`) const partitions = await dataset.discoverPartitions({ @@ -387,8 +392,11 @@ async function writeFixturePlugin(dir) { } function fixturePluginSource() { + const partitionModuleUrl = pathToFileURL( + path.resolve(SMOKE_DIR, '../../../src/core/cache/partition.js') + ).href return `// auto-generated by iceberg_export_s3_fixture smoke; fixture: @hypaware/test-fixture -import path from 'node:path' +import { discoverCachePartitions } from ${JSON.stringify(partitionModuleUrl)} /** * @import { ActivePlugin, BlobStore, SinkEncoder, TableFormatProvider } from '../../../collectivus-plugin-kernel-types.d.ts' @@ -407,15 +415,14 @@ const dataset = { plugin: '@hypaware/test-fixture', schema: { columns: COLUMNS }, primaryTimestampColumn: undefined, - discoverPartitions(ctx) { + // The storage spool routes flushed rows to '/source=/table/', + // so partition discovery must go through the kernel's discovery helper + // instead of guessing a static path. + async discoverPartitions(ctx) { const cacheDir = ctx.cacheDir ?? activatedStorage?.cacheRoot ?? '' - return [ - { - dataset: DATASET, - partition: { partition: 'all' }, - tablePath: cacheDir ? 
path.join(cacheDir, 'datasets', DATASET, 'all') : '', - }, - ] + if (!cacheDir) return [] + const discovered = await discoverCachePartitions(cacheDir, { datasets: [DATASET] }) + return discovered.map((p) => ({ dataset: DATASET, partition: p.partition, tablePath: p.path })) }, async createDataSource(partitions, ctx) { const partition = partitions[0] diff --git a/hypaware-core/smoke/flows/iceberg_export_s3_roundtrip.js b/hypaware-core/smoke/flows/iceberg_export_s3_roundtrip.js index a2706d8..d132b5b 100644 --- a/hypaware-core/smoke/flows/iceberg_export_s3_roundtrip.js +++ b/hypaware-core/smoke/flows/iceberg_export_s3_roundtrip.js @@ -2,7 +2,7 @@ import fs from 'node:fs/promises' import path from 'node:path' -import { fileURLToPath } from 'node:url' +import { fileURLToPath, pathToFileURL } from 'node:url' import { icebergRead, @@ -188,6 +188,11 @@ export async function run({ harness, expect }) { storage: kernel.storage, }) + // Settle the spool so the fixture's rows are committed to their + // routed `source=...` partition table before discovery — appendRows + // only schedules a background flush at the size threshold. 
+ await kernel.storage.flushAll({ force: true }) + const dataset = kernel.query.getDataset(DATASET) if (!dataset) throw new Error(`fixture dataset ${DATASET} did not register`) const partitions = await dataset.discoverPartitions({ @@ -272,8 +277,11 @@ async function writeFixturePlugin(dir) { } function fixturePluginSource() { + const partitionModuleUrl = pathToFileURL( + path.resolve(SMOKE_DIR, '../../../src/core/cache/partition.js') + ).href return `// auto-generated by iceberg_export_s3_roundtrip smoke; fixture: @hypaware/test-fixture -import path from 'node:path' +import { discoverCachePartitions } from ${JSON.stringify(partitionModuleUrl)} /** * @import { ActivePlugin, BlobStore, SinkEncoder, TableFormatProvider } from '../../../collectivus-plugin-kernel-types.d.ts' @@ -292,15 +300,14 @@ const dataset = { plugin: '@hypaware/test-fixture', schema: { columns: COLUMNS }, primaryTimestampColumn: undefined, - discoverPartitions(ctx) { + // The storage spool routes flushed rows to '/source=/table/', + // so partition discovery must go through the kernel's discovery helper + // instead of guessing a static path. + async discoverPartitions(ctx) { const cacheDir = ctx.cacheDir ?? activatedStorage?.cacheRoot ?? '' - return [ - { - dataset: DATASET, - partition: { partition: 'all' }, - tablePath: cacheDir ? path.join(cacheDir, 'datasets', DATASET, 'all') : '', - }, - ] + if (!cacheDir) return [] + const discovered = await discoverCachePartitions(cacheDir, { datasets: [DATASET] }) + return discovered.map((p) => ({ dataset: DATASET, partition: p.partition, tablePath: p.path })) }, async createDataSource(partitions, ctx) { const partition = partitions[0] diff --git a/llp/0022-iceberg-export-partitioning.spec.md b/llp/0022-iceberg-export-partitioning.spec.md index 899238e..f90d0db 100644 --- a/llp/0022-iceberg-export-partitioning.spec.md +++ b/llp/0022-iceberg-export-partitioning.spec.md @@ -130,8 +130,9 @@ and time-pruneable. 
**Files are locally, not globally, sorted.** Each append sorts only its own rows, so a day partition touched by N export batches holds N internally-sorted files whose `conversation_id` ranges may overlap. Row-group pruning still skips -most of them; a tighter global sort is available out-of-band via `icebergRewrite` -([§Compaction](#compaction)), not run in V1. +most of them; a tighter global sort is available out-of-band via +`hyp sink maintain --compact` ([§Compaction](#compaction)) — never from the +daemon loop or the sink tick. Sort order is mutable table metadata, **not** partition spec, so introducing or evolving it later is not partition-spec drift @@ -203,13 +204,58 @@ for free — a bonus beyond this spec's scope. icebird now exposes `icebergRewrite` (reads live rows, sorts globally under the target spec, writes consolidated files, commits a replace snapshot). So compaction is **available**, reframing the prior -`format-iceberg/src/maintenance.js:120` "blocked by icebird" status. But for a +`format-iceberg/src/maintenance.js` "blocked by icebird" status. But for a day-partitioned archive it is **not needed** (partitions already hold large files), and it is **not run in the daemon** — a full read-rewrite risks the OOM/blocking failure already seen with the parquet sink (the encoder OOMed/blocked -the daemon; exports run manually with a large heap). V1 leaves it as an out-of-band tool that -would tighten the local-vs-global sort, nothing more. The maintenance report -should say "not needed / out-of-band," not "blocked." +the daemon; exports run manually with a large heap). 
+ +The out-of-band tool now exists: `hyp sink maintain --compact` runs +`compactExportTable` from the manual CLI process, gated on a +`compact_file_count` threshold (default 32, via the sink's `maintenance` +config) **and** a `compact_max_bytes` cap (default 128 MB): icebird's rewrite +materializes every live row in memory, so a table whose current snapshot's +`total-files-size` exceeds the cap is skipped with `above-byte-cap` rather +than re-creating the parquet-encoder OOM in the manual process — raise the +cap and the heap together to rewrite a bigger table. The flag is the only +path to a rewrite — `maintain` without it, the daemon loop, and the sink +tick never compact. + +`compactExportTable` stages and commits explicitly (icebird's +`icebergStageRewrite` + `fileCatalogCommit`, the same single-attempt sequence +`icebergRewrite` performs) rather than calling `icebergRewrite`, for two +reasons. First, a rewrite is **not retried** on a concurrent-commit conflict +(it only rewrote the rows it read; a blind retry could drop rows another +writer appended); the next manual run starts from fresh metadata. Second, +because the daemon keeps appending concurrently, lost races are *expected* — +and `icebergRewrite` leaves the staged files (a full rewritten copy of the +table) orphaned in the blob store on a failed commit. Holding the +`StagedUpdate` lets a failed commit reclaim its staged output, the same +orphan-leak class the local cache fixed in #82. A mid-stage failure is +covered too: a tracking resolver records every write that finished so a +rewrite that dies between its first data file and the manifest list still +cleans up. + +Cleanup is **verify-then-delete**, not delete-on-error. 
A failed commit's +error alone cannot prove the commit missed — a timeout after the conditional +PUT durably landed, or an SDK-internal retry of its own successful write +surfacing 412, both leave the staged rewrite as the table's current snapshot, +and deleting its data files then would corrupt the export. So the catch +re-loads the latest metadata first: if the staged snapshot landed, the run +reports success; only a confirmed-lost conflict deletes the staged files; an +unverifiable outcome leaves the bounded orphans in place and says so in the +error message. + +Every non-compaction outcome carries a `reason` discriminant +(`below-threshold` / `above-byte-cap` / `no-table` / `conflict` / `error`) so +the CLI reports the real cause instead of folding failures into the threshold +skip. `no-table` is reserved for a verifiably absent table (no metadata +files); a metadata *load* failure (auth, corrupt metadata, transient IO) +reports `error`. Unexpected rewrite errors exit nonzero; a lost race exits 0. + +The icebird pin sits at `0.8.10`, which preserves v3 row lineage across +rewrites — export tables are `formatVersion: 3`, so the earlier `0.8.9` +rewrite was not safe for them. ## Observability @@ -222,9 +268,9 @@ Emit the resolved partition spec and sort order on the iceberg sink's ## icebird dependency -All three enablers are implemented in icebird `master` (commit `3edb15b`, -"Scan pruning and sort-on-write (#20, #21, #22)"), **as yet unpublished** (npm -tops out at `0.8.5` pinned / `0.8.8` latest): +The committed `package.json` pin is **`0.8.10`**. All three enablers landed +in icebird commit `3edb15b` ("Scan pruning and sort-on-write (#20, #21, +#22)"), first published as `0.8.9`: - **#20** data-file pruning via partition values + manifest bounds — `prune.js` `partitionMightMatch` / `fileMightMatch`. 
@@ -233,13 +279,17 @@ tops out at `0.8.5` pinned / `0.8.8` latest): - **#22** sort-on-append — `prepareAppend` sorts each partition group by the table's `default-sort-order-id`; `icebergRewrite` for global compaction. -**Landing requirement:** this work depends on a published icebird containing -`3edb15b` (e.g. `0.8.9`/`0.9.0`); the committed `package.json` pin moves from -`0.8.5` to that version. The bump is a **shared-engine** change — the cache rides -the same icebird — so the cache's tests and hermetic smokes must be re-run to -confirm no regression across the changed `create.js` / `commit.js` / `read.js` / -`transform.js`. During development this worktree builds against a local checkout -of icebird `master`; the pin is updated to the published version before merge. +`0.8.10` additionally preserves v3 row lineage across rewrites; export tables +are `formatVersion: 3`, so `0.8.9`'s rewrite was not safe for them and the +pin must not regress below `0.8.10` while compaction exists +([§Compaction](#compaction)). + +*(Historical: this spec originally landed while `3edb15b` was unpublished — +npm topped out at `0.8.5` pinned / `0.8.8` latest — and development built +against a local icebird `master` checkout.)* The bump was a **shared-engine** +change — the cache rides the same icebird — so any future pin move re-runs +the cache's tests and hermetic smokes to confirm no regression across the +shared `create.js` / `commit.js` / `read.js` / `transform.js`. ## Out of scope @@ -266,6 +316,7 @@ of icebird `master`; the pin is updated to the published version before merge. (`partitionSpecForDeclaration`, `validatePartitionSpecStability`), `format-iceberg/src/commit.js:81-107` (export create+append), `format-iceberg/src/table-format.js:184` (per-dataset `exportDataset`), - `format-iceberg/src/maintenance.js:120` (compaction framing). + `format-iceberg/src/maintenance.js` (`compactExportTable` / + `maintainExportTables`, the out-of-band compaction path). 
- icebird `master` `3edb15b` — `src/write/sort.js`, `src/write/stage.js`, `src/prune.js`, `src/write/rewrite.js`. diff --git a/package.json b/package.json index 666ab14..9b3b2f3 100644 --- a/package.json +++ b/package.json @@ -38,7 +38,7 @@ "@aws-sdk/credential-provider-ini": "3.972.43", "hyparquet": "1.26.0", "hyparquet-compressors": "1.1.1", - "icebird": "0.8.9", + "icebird": "0.8.10", "squirreling": "0.12.23" }, "optionalDependencies": { diff --git a/src/core/cli/core_commands.js b/src/core/cli/core_commands.js index 85f4118..55a2eea 100644 --- a/src/core/cli/core_commands.js +++ b/src/core/cli/core_commands.js @@ -43,6 +43,7 @@ import { SCAFFOLD_KINDS, scaffoldPlugin } from '../plugin_doctor/scaffold.js' * @import { ExtendedQueryStorageService } from '../cache/types.d.ts' * @import { PluginMetadata } from '../config/types.d.ts' * @import { DaemonInstallOptions, HypAwareStatusReport, ServiceState } from '../daemon/types.d.ts' + * @import { ExportMaintenanceDatasetReport } from '../../../hypaware-core/plugins-workspace/format-iceberg/src/types.d.ts' * @import { ConfirmInstall } from '../plugin_install/types.d.ts' * @import { QueryFormat, RefreshMode } from '../query/types.d.ts' * @import { ExtendedSinkRegistry, ExtendedSourceRegistry } from '../registry/types.d.ts' @@ -308,8 +309,8 @@ function buildCoreCommands() { }, { name: 'sink maintain', - summary: 'Run export maintenance (snapshot expiration) on table-format sinks', - usage: 'hyp sink maintain [instance] [--dry-run]', + summary: 'Run export maintenance (snapshot expiration; data-file compaction with --compact) on table-format sinks', + usage: 'hyp sink maintain [instance] [--compact] [--dry-run]', run: runSinkMaintain, }, { @@ -2253,13 +2254,15 @@ async function runSinkForce(argv, ctx) { } /** - * `hyp sink maintain [instance] [--dry-run]` + * `hyp sink maintain [instance] [--compact] [--dry-run]` * * Runs export maintenance on table-format (Iceberg) sink instances: - * snapshot expiration on exported 
tables. Data-file compaction is not - * run by this sink — icebird exposes it via `icebergRewrite`, but rewrites - * are out-of-band only (LLP 0022); the command reports - * `compaction_out_of_band` for each dataset. + * snapshot expiration on exported tables, and — only with `--compact` — + * a data-file rewrite via icebird's `icebergStageRewrite` + `fileCatalogCommit`. + * + * @ref LLP 0022#compaction — rewrites are out-of-band only: this manual + * CLI invocation is the one place they may run. The daemon loop and the + * sink tick never compact. * * @param {string[]} argv * @param {CommandRunContext} ctx @@ -2267,10 +2270,12 @@ async function runSinkForce(argv, ctx) { async function runSinkMaintain(argv, ctx) { let instance = /** @type {string | undefined} */ (undefined) let dryRun = false + let compact = false for (const arg of argv) { if (arg === '--dry-run') { dryRun = true; continue } + if (arg === '--compact') { compact = true; continue } if (arg === '--help' || arg === '-h') { - ctx.stdout.write('usage: hyp sink maintain [instance] [--dry-run]\n') + ctx.stdout.write('usage: hyp sink maintain [instance] [--compact] [--dry-run]\n') return 0 } if (arg.startsWith('--')) { @@ -2316,6 +2321,8 @@ async function runSinkMaintain(argv, ctx) { if (dryRun) ctx.stdout.write('[dry-run]\n') let totalExpired = 0 + let totalCompacted = 0 + let rewriteErrors = 0 for (const handle of targets) { const config = handle.config ?? {} const prefix = typeof config.prefix === 'string' && config.prefix.length > 0 @@ -2326,16 +2333,21 @@ async function runSinkMaintain(argv, ctx) { blobStore: handle.blobStore, prefix, config: typeof config.maintenance === 'object' ? 
config.maintenance : undefined, + compact, dryRun, }) for (const d of report.datasets) { const actions = [] if (d.snapshotsExpired > 0) actions.push(`expired ${d.snapshotsExpired} snapshots (was ${d.snapshotsBefore})`) - if (!d.compactionSupported) actions.push('compaction_out_of_band') + if (d.compacted) actions.push(`compacted ${d.dataFilesBefore} -> ${d.dataFilesAfter} data files`) + else if (compact) actions.push(describeCompactionSkip(d)) + if (actions.length === 0) actions.push('nothing to do') ctx.stdout.write(` ${handle.instanceName}/${d.dataset}: ${actions.join(', ')}\n`) + if (d.compactionReason === 'error') rewriteErrors += 1 } totalExpired += report.totalSnapshotsExpired + totalCompacted += report.totalTablesCompacted if (report.datasets.length === 0) { ctx.stdout.write(` ${handle.instanceName}: no exported datasets found\n`) @@ -2343,12 +2355,43 @@ async function runSinkMaintain(argv, ctx) { } ctx.stdout.write( - `sink maintain: ${totalExpired} snapshots expired` + - ' (compaction not run by this sink — out-of-band only, see LLP 0022)\n' + compact + ? `sink maintain: ${totalExpired} snapshots expired, ${totalCompacted} tables compacted\n` + : `sink maintain: ${totalExpired} snapshots expired` + + ' (data-file compaction is out-of-band: re-run with --compact, see LLP 0022)\n' ) + if (rewriteErrors > 0) { + ctx.stderr.write(`sink maintain: ${rewriteErrors} rewrite(s) failed\n`) + return 1 + } return 0 } +/** + * Render the precise reason a requested compaction did not commit, so the + * operator can tell an idle table from a failed rewrite (LLP 0022). The + * `compactionReason` discriminant comes from `compactExportTable`. 
+ * + * @param {ExportMaintenanceDatasetReport} d + * @returns {string} + */ +function describeCompactionSkip(d) { + switch (d.compactionReason) { + case 'below-threshold': + return 'compaction_skipped (below compact_file_count)' + case 'above-byte-cap': + return 'compaction_skipped (table exceeds compact_max_bytes; raise the cap and the heap to rewrite)' + case 'no-table': + return 'compaction_skipped (no table metadata)' + case 'conflict': + return 'compaction_conflict (concurrent commit won the race; staged files cleaned up — re-run to retry from fresh metadata)' + case 'error': + return `compaction_failed (${d.compactionError ?? 'unknown error'})` + default: + return 'compaction_skipped' + } +} + /* ---------- misc ---------- */ /** diff --git a/test/core/sink-maintain-command.test.js b/test/core/sink-maintain-command.test.js new file mode 100644 index 0000000..cf9474e --- /dev/null +++ b/test/core/sink-maintain-command.test.js @@ -0,0 +1,246 @@ +// @ts-check + +import assert from 'node:assert/strict' +import test from 'node:test' + +import { + fileCatalog, + icebergAppend, + icebergCreateTable, + icebergRead, + loadLatestFileCatalogMetadata, +} from 'icebird' + +import { + createBlobStoreIO, + tableUrlForBlobPrefix, +} from '../../hypaware-core/plugins-workspace/format-iceberg/src/blob-io.js' +import { registerCoreCommands } from '../../src/core/cli/core_commands.js' +import { createCommandRegistry } from '../../src/core/registry/commands.js' + +/** + * @import { BlobStore } from '../../collectivus-plugin-kernel-types.d.ts' + */ + +const DATASET = 'maintain_rows' +const PREFIX = 'iceberg/datasets' + +/** + * In-memory BlobStore speaking the same conditional-write protocol the + * iceberg blob-io adapter expects: a conflicting `ifNoneMatch: '*'` put + * throws `errorKind: 'blob_precondition_failed'`. 
+ * + * @returns {BlobStore & { objects: Map }} + */ +function makeMemoryBlobStore() { + /** @type {Map} */ + const objects = new Map() + return { + kind: 'memory', + objects, + async putObject(input) { + if (input.ifNoneMatch === '*' && objects.has(input.key)) { + const err = /** @type {Error & { errorKind: string }} */ ( + new Error(`object already exists at '${input.key}'`) + ) + err.errorKind = 'blob_precondition_failed' + throw err + } + objects.set(input.key, input.body) + return { key: input.key } + }, + async getObject(input) { + const bytes = objects.get(input.key) + if (!bytes) return null + return { body: bytes, contentLength: bytes.byteLength } + }, + listObjects(input) { + const prefix = input?.prefix ?? '' + const keys = Array.from(objects.keys()) + .filter((key) => key.startsWith(prefix)) + .sort() + return { + async *[Symbol.asyncIterator]() { + for (const key of keys) { + const bytes = objects.get(key) + yield { key, size: bytes?.byteLength ?? 0 } + } + }, + } + }, + async deleteObject(input) { + objects.delete(input.key) + }, + } +} + +/** + * Create a real 2-data-file iceberg table in the memory blob store at + * `PREFIX/DATASET` so `hyp sink maintain` discovers and (with + * --compact + compact_file_count=2) rewrites it. 
+ * + * @param {BlobStore} blobStore + */ +async function seedTable(blobStore) { + const tableUrl = tableUrlForBlobPrefix(`${PREFIX}/${DATASET}`) + const { resolver, lister } = await createBlobStoreIO(blobStore) + const catalog = fileCatalog({ resolver, lister, conditionalCommits: true }) + const schema = { + type: /** @type {const} */ ('struct'), + 'schema-id': 0, + fields: [ + { id: 1, name: 'id', required: true, type: /** @type {const} */ ('long') }, + { id: 2, name: 'value', required: false, type: /** @type {const} */ ('string') }, + ], + } + await icebergCreateTable({ catalog, tableUrl, schema, formatVersion: 3 }) + await icebergAppend({ catalog, tableUrl, records: [{ id: 1n, value: 'a' }, { id: 2n, value: 'b' }] }) + await icebergAppend({ catalog, tableUrl, records: [{ id: 3n, value: 'c' }] }) + return { tableUrl, resolver, lister } +} + +/** + * @param {BlobStore} blobStore + * @param {{ compact_file_count?: number }} [maintenance] + */ +function makeHandle(blobStore, maintenance) { + return { + kind: 'table-format', + tableFormat: 'iceberg', + instanceName: 'lake', + blobStore, + config: { prefix: PREFIX, maintenance: { compact_file_count: 2, ...maintenance } }, + } +} + +/** + * Run the registered `sink maintain` command body the same way dispatch + * would, against a fake CommandRunContext carrying the sink handles. 
+ * + * @param {string[]} argv + * @param {object[]} handles + */ +async function runMaintain(argv, handles) { + const registry = createCommandRegistry() + registerCoreCommands(registry) + const command = registry.get('sink maintain') + assert.ok(command, 'sink maintain is registered') + const stdout = makeBuf() + const stderr = makeBuf() + const ctx = /** @type {any} */ ({ + stdout, + stderr, + sinks: { listHandles: () => handles }, + }) + const code = await command.run(argv, ctx) + return { code, stdout: stdout.text(), stderr: stderr.text() } +} + +function makeBuf() { + let value = '' + return { + /** @param {unknown} chunk */ + write(chunk) { + value += String(chunk) + return true + }, + text() { + return value + }, + } +} + +/** + * @param {{ tableUrl: string, resolver: any, lister: any }} io + */ +async function dataFileCount({ tableUrl, resolver, lister }) { + const { metadata } = await loadLatestFileCatalogMetadata({ tableUrl, resolver, lister }) + const currentId = metadata['current-snapshot-id'] + const snapshot = metadata.snapshots?.find((s) => String(s['snapshot-id']) === String(currentId)) + return Number(snapshot?.summary?.['total-data-files'] ?? 
NaN) +} + +test('sink maintain without --compact never rewrites and exits 0', async () => { + const blobStore = makeMemoryBlobStore() + const io = await seedTable(blobStore) + + const { code, stdout } = await runMaintain([], [makeHandle(blobStore)]) + assert.equal(code, 0) + assert.match(stdout, /data-file compaction is out-of-band: re-run with --compact/) + assert.doesNotMatch(stdout, /compacted \d/) + assert.equal(await dataFileCount(io), 2, 'table untouched by the default maintain path') +}) + +test('sink maintain --compact rewrites past the configured threshold and exits 0', async () => { + const blobStore = makeMemoryBlobStore() + const io = await seedTable(blobStore) + + const { code, stdout } = await runMaintain(['--compact'], [makeHandle(blobStore)]) + assert.equal(code, 0) + assert.match(stdout, /lake\/maintain_rows: .*compacted 2 -> 1 data files/) + assert.match(stdout, /1 tables compacted/) + assert.equal(await dataFileCount(io), 1, 'rewrite consolidated the data files') + + const { metadata } = await loadLatestFileCatalogMetadata(io) + const rows = await icebergRead({ tableUrl: io.tableUrl, metadata, resolver: io.resolver }) + assert.deepEqual( + rows.map((r) => ({ id: r.id, value: r.value })).sort((a, b) => Number(a.id - b.id)), + [{ id: 1n, value: 'a' }, { id: 2n, value: 'b' }, { id: 3n, value: 'c' }], + 'every row survives the CLI-driven rewrite' + ) +}) + +test('sink maintain --compact reports a commit conflict and exits 0', async () => { + const blobStore = makeMemoryBlobStore() + await seedTable(blobStore) + + // Lose the race on the rewrite's conditional metadata commit: the + // dataset's table already has v1..v3, so the rewrite targets v4. 
+ let armed = true + const conflicting = /** @type {BlobStore} */ ({ + ...blobStore, + async putObject(input) { + if (armed && input.ifNoneMatch === '*' && /metadata\/v\d+\.metadata\.json$/.test(input.key)) { + armed = false + const err = /** @type {Error & { errorKind: string }} */ ( + new Error(`object already exists at '${input.key}'`) + ) + err.errorKind = 'blob_precondition_failed' + throw err + } + return blobStore.putObject(input) + }, + }) + + const { code, stdout, stderr } = await runMaintain(['--compact'], [makeHandle(conflicting)]) + assert.equal(code, 0, 'a lost race is an expected outcome, not a failure') + assert.match(stdout, /compaction_conflict/) + assert.equal(stderr, '') +}) + +test('sink maintain --compact exits 1 when the rewrite fails', async () => { + const blobStore = makeMemoryBlobStore() + await seedTable(blobStore) + + const failing = /** @type {BlobStore} */ ({ + ...blobStore, + async putObject(input) { + if (input.ifNoneMatch === '*' && /metadata\/v\d+\.metadata\.json$/.test(input.key)) { + throw new Error('s3 access denied') + } + return blobStore.putObject(input) + }, + }) + + const { code, stdout, stderr } = await runMaintain(['--compact'], [makeHandle(failing)]) + assert.equal(code, 1, 'unexpected rewrite errors exit nonzero (LLP 0022)') + assert.match(stdout, /compaction_failed/) + assert.match(stdout, /access denied/) + assert.match(stderr, /1 rewrite\(s\) failed/) +}) + +test('sink maintain rejects unknown flags with exit 2', async () => { + const { code, stderr } = await runMaintain(['--frobnicate'], []) + assert.equal(code, 2) + assert.match(stderr, /unknown flag '--frobnicate'/) +}) diff --git a/test/plugins/iceberg-maintenance.test.js b/test/plugins/iceberg-maintenance.test.js new file mode 100644 index 0000000..0328f65 --- /dev/null +++ b/test/plugins/iceberg-maintenance.test.js @@ -0,0 +1,340 @@ +// @ts-check + +import assert from 'node:assert/strict' +import fs from 'node:fs/promises' +import os from 'node:os' +import path 
from 'node:path' +import test from 'node:test' + +import { + fileCatalog, + icebergAppend, + icebergCreateTable, + icebergRead, + loadLatestFileCatalogMetadata, +} from 'icebird' + +import { + compactExportTable, + normalizeExportRetentionConfig, +} from '../../hypaware-core/plugins-workspace/format-iceberg/src/maintenance.js' +import { createLocalIcebergIO, tableUrlForDir } from '../../src/core/cache/iceberg/resolver.js' + +test('normalizeExportRetentionConfig fills defaults', () => { + const cfg = normalizeExportRetentionConfig(undefined) + assert.equal(cfg.min_snapshots_to_keep, 10) + assert.equal(cfg.max_snapshot_age_hours, 24) + assert.equal(cfg.compact_file_count, 32) + assert.equal(cfg.compact_max_bytes, 128 * 1024 * 1024) +}) + +test('normalizeExportRetentionConfig honors overrides', () => { + const cfg = normalizeExportRetentionConfig({ compact_file_count: 2, min_snapshots_to_keep: 1 }) + assert.equal(cfg.compact_file_count, 2) + assert.equal(cfg.min_snapshots_to_keep, 1) + assert.equal(cfg.max_snapshot_age_hours, 24) +}) + +test('compactExportTable reports no compaction for a missing table', async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), 'hyp-maint-')) + const { resolver, lister } = await createLocalIcebergIO() + const result = await compactExportTable({ + tableUrl: tableUrlForDir(path.join(dir, 'missing')), + resolver, + lister, + compactFileCount: 2, + }) + assert.deepEqual(result, { compacted: false, reason: 'no-table', dataFilesBefore: 0, dataFilesAfter: 0 }) + await fs.rm(dir, { recursive: true, force: true }) +}) + +test('compactExportTable reports a metadata load failure as reason=error, not no-table', async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), 'hyp-maint-')) + const { resolver } = await createLocalIcebergIO() + // An auth/IO-shaped listing failure (not ENOENT, not an empty listing) + // must not be folded into the idle-table skip: the CLI exits 0 for + // 'no-table' but must exit nonzero for a load 
that actually failed. + const failingLister = async () => { + throw new Error('access denied: s3:ListBucket') + } + const result = await compactExportTable({ + tableUrl: tableUrlForDir(path.join(dir, 'missing')), + resolver, + lister: failingLister, + compactFileCount: 2, + }) + assert.equal(result.compacted, false) + assert.equal(result.reason, 'error') + assert.match(result.error ?? '', /access denied/) + await fs.rm(dir, { recursive: true, force: true }) +}) + +/** + * Create a 2-data-file, 3-row v3 table for the compaction tests. + * + * @param {string} dir + * @returns {Promise<{ tableUrl: string, resolver: any, lister: any, rows: { id: bigint, value: string }[] }>} + */ +async function createTwoFileTable(dir) { + const tableUrl = tableUrlForDir(path.join(dir, 'rows')) + const { resolver, lister } = await createLocalIcebergIO() + const catalog = fileCatalog({ resolver, lister, conditionalCommits: true }) + + const schema = { + type: /** @type {const} */ ('struct'), + 'schema-id': 0, + fields: [ + { id: 1, name: 'id', required: true, type: /** @type {const} */ ('long') }, + { id: 2, name: 'value', required: false, type: /** @type {const} */ ('string') }, + ], + } + await icebergCreateTable({ catalog, tableUrl, schema, formatVersion: 3 }) + // Two appends -> two data files. + await icebergAppend({ catalog, tableUrl, records: [{ id: 1n, value: 'a' }, { id: 2n, value: 'b' }] }) + await icebergAppend({ catalog, tableUrl, records: [{ id: 3n, value: 'c' }] }) + return { + tableUrl, + resolver, + lister, + rows: [{ id: 1n, value: 'a' }, { id: 2n, value: 'b' }, { id: 3n, value: 'c' }], + } +} + +/** + * Read the table back and project to user columns, sorted by id, so a + * rewrite that dropped or mangled rows fails a deep-equal (v3 reads also + * carry `_row_id` / `_last_updated_sequence_number` lineage columns). 
+ * + * @param {{ tableUrl: string, resolver: any, lister: any }} opts + * @returns {Promise<{ id: bigint, value: string }[]>} + */ +async function readUserRows({ tableUrl, resolver, lister }) { + const { metadata } = await loadLatestFileCatalogMetadata({ tableUrl, resolver, lister }) + const rows = await icebergRead({ tableUrl, metadata, resolver }) + return rows + .map((r) => ({ id: r.id, value: r.value })) + .sort((a, b) => Number(a.id - b.id)) +} + +test('compactExportTable rewrites a v3 table once the data-file threshold is reached', async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), 'hyp-maint-')) + const { tableUrl, resolver, lister, rows } = await createTwoFileTable(dir) + + const below = await compactExportTable({ tableUrl, resolver, lister, compactFileCount: 3 }) + assert.equal(below.compacted, false, 'threshold not reached -> no rewrite') + assert.equal(below.reason, 'below-threshold') + assert.equal(below.dataFilesBefore, 2) + + const dry = await compactExportTable({ tableUrl, resolver, lister, compactFileCount: 2, dryRun: true }) + assert.equal(dry.compacted, true, 'dryRun reports the rewrite without committing') + assert.equal(dry.dataFilesAfter, dry.dataFilesBefore, 'dryRun does not rewrite') + + const result = await compactExportTable({ tableUrl, resolver, lister, compactFileCount: 2 }) + assert.equal(result.compacted, true) + assert.equal(result.reason, undefined) + assert.equal(result.dataFilesBefore, 2) + assert.equal(result.dataFilesAfter, 1, 'live rows consolidated into one data file') + + const { metadata } = await loadLatestFileCatalogMetadata({ tableUrl, resolver, lister }) + assert.equal(metadata['format-version'], 3, 'rewrite preserves format-version 3') + assert.deepEqual( + await readUserRows({ tableUrl, resolver, lister }), + rows, + 'every appended row survives the rewrite byte-for-byte' + ) + + await fs.rm(dir, { recursive: true, force: true }) +}) + +test('compactExportTable skips a table whose total-files-size 
exceeds compact_max_bytes', async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), 'hyp-maint-')) + const { tableUrl, resolver, lister } = await createTwoFileTable(dir) + + const result = await compactExportTable({ + tableUrl, resolver, lister, compactFileCount: 2, compactMaxBytes: 1, + }) + assert.equal(result.compacted, false) + assert.equal(result.reason, 'above-byte-cap') + assert.ok(typeof result.totalBytes === 'number' && result.totalBytes > 1, 'reports the offending size') + assert.equal(result.dataFilesAfter, 2, 'table untouched') + + await fs.rm(dir, { recursive: true, force: true }) +}) + +test('compactExportTable reports a commit conflict and cleans up the staged files', async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), 'hyp-maint-')) + const { tableUrl, resolver, lister, rows } = await createTwoFileTable(dir) + const before = await loadLatestFileCatalogMetadata({ tableUrl, resolver, lister }) + + // Fail the conditional metadata write (the commit point) the way a lost + // race does, while data/manifest staging goes through the real resolver. 
+ /** @type {string[]} */ + const deleted = [] + const conflictResolver = { + ...resolver, + /** @param {string} url @param {{ ifNoneMatch?: string }} [options] */ + writer(url, options) { + if (options?.ifNoneMatch === '*' && url.endsWith('.metadata.json')) { + const err = /** @type {Error & { statusCode: number }} */ (new Error('conditional write collision')) + err.statusCode = 412 + throw err + } + return resolver.writer(url, options) + }, + /** @param {string} url */ + async deleter(url) { + deleted.push(url) + await resolver.deleter(url) + }, + } + + const result = await compactExportTable({ + tableUrl, resolver: conflictResolver, lister, compactFileCount: 2, + }) + assert.equal(result.compacted, false) + assert.equal(result.reason, 'conflict') + assert.ok(result.error, 'conflict carries the underlying error message') + + // Staged data file + manifest + manifest list were all reclaimed. + assert.ok(deleted.length >= 3, `staged files deleted (got ${deleted.length})`) + for (const url of deleted) { + await assert.rejects(fs.access(new URL(url).pathname), 'deleted file is gone from disk') + } + + const after = await loadLatestFileCatalogMetadata({ tableUrl, resolver, lister }) + assert.equal(after.version, before.version, 'failed commit leaves table metadata untouched') + assert.deepEqual(await readUserRows({ tableUrl, resolver, lister }), rows, 'original rows intact') + + await fs.rm(dir, { recursive: true, force: true }) +}) + +test('compactExportTable reports a non-conflict commit failure as reason=error and leaves staged files', async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), 'hyp-maint-')) + const { tableUrl, resolver, lister } = await createTwoFileTable(dir) + + /** @type {string[]} */ + const deleted = [] + const failingResolver = { + ...resolver, + /** @param {string} url @param {{ ifNoneMatch?: string }} [options] */ + writer(url, options) { + if (options?.ifNoneMatch === '*' && url.endsWith('.metadata.json')) { + throw new 
Error('disk full') + } + return resolver.writer(url, options) + }, + /** @param {string} url */ + async deleter(url) { + deleted.push(url) + await resolver.deleter(url) + }, + } + + const result = await compactExportTable({ + tableUrl, resolver: failingResolver, lister, compactFileCount: 2, + }) + assert.equal(result.compacted, false) + assert.equal(result.reason, 'error') + assert.match(result.error ?? '', /disk full/) + // A non-conflict failure cannot prove the commit missed (e.g. a + // timeout after the PUT landed), so staged files must NOT be deleted. + assert.deepEqual(deleted, [], 'no staged files reclaimed on an unverifiable commit failure') + assert.match(result.error ?? '', /left under the table/, 'error explains the deliberate orphans') + + await fs.rm(dir, { recursive: true, force: true }) +}) + +test('compactExportTable cleans up partial output when staging fails mid-flight', async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), 'hyp-maint-')) + const { tableUrl, resolver, lister, rows } = await createTwoFileTable(dir) + + // Fail the manifest write — by then the consolidated data file(s) + // already landed, so without tracked cleanup they would leak (icebird + // only reports writtenFiles on a StagedUpdate that completed). 
+ /** @type {string[]} */ + const written = [] + /** @type {string[]} */ + const deleted = [] + const midStageFailingResolver = { + ...resolver, + /** @param {string} url @param {{ ifNoneMatch?: string }} [options] */ + writer(url, options) { + if (url.endsWith('-m0.avro')) throw new Error('stage IO failure') + const writer = resolver.writer(url, options) + const finish = writer.finish.bind(writer) + writer.finish = async () => { + await finish() + written.push(url) + } + return writer + }, + /** @param {string} url */ + async deleter(url) { + deleted.push(url) + await resolver.deleter(url) + }, + } + + const result = await compactExportTable({ + tableUrl, resolver: midStageFailingResolver, lister, compactFileCount: 2, + }) + assert.equal(result.compacted, false) + assert.equal(result.reason, 'error') + assert.match(result.error ?? '', /stage IO failure/) + + const stagedDataFiles = written.filter((url) => url.includes('/data/')) + assert.ok(stagedDataFiles.length >= 1, 'staging wrote at least one data file before failing') + for (const url of stagedDataFiles) { + assert.ok(deleted.includes(url), `partial stage output reclaimed: ${url}`) + await assert.rejects(fs.access(new URL(url).pathname), 'deleted file is gone from disk') + } + assert.deepEqual(await readUserRows({ tableUrl, resolver, lister }), rows, 'original rows intact') + + await fs.rm(dir, { recursive: true, force: true }) +}) + +test('compactExportTable reports success when the commit landed despite a thrown 412', async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), 'hyp-maint-')) + const { tableUrl, resolver, lister, rows } = await createTwoFileTable(dir) + + // The S3 conditional-write hazard: the SDK retries its own successful + // PUT and surfaces PreconditionFailed even though the new metadata version is ours. + // Cleanup keyed on the error alone would delete the data files the + // landed commit references, corrupting the table. 
+ /** @type {string[]} */ + const deleted = [] + const landedButThrowingResolver = { + ...resolver, + /** @param {string} url @param {{ ifNoneMatch?: string }} [options] */ + writer(url, options) { + const writer = resolver.writer(url, options) + if (options?.ifNoneMatch === '*' && url.endsWith('.metadata.json')) { + const finish = writer.finish.bind(writer) + writer.finish = async () => { + await finish() + const err = /** @type {Error & { statusCode: number }} */ ( + new Error('PreconditionFailed: retried own successful write') + ) + err.statusCode = 412 + throw err + } + } + return writer + }, + /** @param {string} url */ + async deleter(url) { + deleted.push(url) + await resolver.deleter(url) + }, + } + + const result = await compactExportTable({ + tableUrl, resolver: landedButThrowingResolver, lister, compactFileCount: 2, + }) + assert.equal(result.compacted, true, 'landed commit reported as success, not conflict') + assert.equal(result.dataFilesAfter, 1, 'post-commit metadata reflects the rewrite') + assert.deepEqual(deleted, [], 'no files of the landed commit were deleted') + assert.deepEqual(await readUserRows({ tableUrl, resolver, lister }), rows, 'rows readable after the rewrite') + + await fs.rm(dir, { recursive: true, force: true }) +})