Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
342 changes: 329 additions & 13 deletions hypaware-core/plugins-workspace/format-iceberg/src/maintenance.js

Large diffs are not rendered by default.

59 changes: 57 additions & 2 deletions hypaware-core/plugins-workspace/format-iceberg/src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,74 @@ export type BlobIOWriteObserver = (event: BlobIOWriteEvent) => void
/**
 * Numeric knobs governing snapshot retention and the opt-in compaction
 * rewrite of an exported table.
 */
export interface ExportRetentionConfig {
  // NOTE(review): presumably the minimum number of snapshots expiration
  // must leave behind — confirm against maintenance.js.
  min_snapshots_to_keep: number
  // NOTE(review): presumably the age (in hours) beyond which a snapshot
  // becomes eligible for expiration — confirm against maintenance.js.
  max_snapshot_age_hours: number
  /**
   * Live data-file count at which a table is rewritten (the threshold is
   * inclusive: reaching it is enough). Read only by the out-of-band
   * compaction path (LLP 0022).
   */
  compact_file_count: number
  /**
   * Byte ceiling for a rewrite. When the current snapshot's
   * `total-files-size` is larger than this, the rewrite is skipped:
   * icebird's rewrite materializes every live row in memory, so an
   * unbounded table would OOM the manual CLI run.
   */
  compact_max_bytes: number
}

/**
 * Reason a requested compaction ended without committing a rewrite.
 *
 * `no-table`
 *   The table verifiably does not exist (no metadata files).
 * `below-threshold`
 *   The live data-file count is under `compact_file_count`.
 * `above-byte-cap`
 *   `total-files-size` is over `compact_max_bytes`; raise the cap (and the
 *   heap) to rewrite anyway.
 * `conflict`
 *   Another writer's commit was confirmed to have won the race. Staged
 *   files were cleaned up; re-run to retry from fresh metadata.
 * `error`
 *   The metadata load or the rewrite failed (IO, auth, ...); details are in
 *   the accompanying `error` field. A failed commit whose outcome could not
 *   be verified is also reported this way, and its staged files are
 *   deliberately left in place, because deleting them could corrupt the
 *   table if the commit actually landed.
 */
export type ExportCompactionSkipReason =
  'no-table' | 'below-threshold' | 'above-byte-cap' | 'conflict' | 'error'

/**
 * Outcome of one compaction request: either a committed rewrite
 * (`compacted: true`) or a skip/failure described by `reason`.
 */
export interface ExportCompactionResult {
  compacted: boolean
  /** Set exactly when `compacted` is false. */
  reason?: ExportCompactionSkipReason
  /** Error message; accompanies a `reason` of 'conflict' or 'error'. */
  error?: string
  /** The current snapshot's `total-files-size`, reported when the byte cap rejected the rewrite. */
  totalBytes?: number
  // NOTE(review): presumably the live data-file counts observed before and
  // after the rewrite — confirm against maintenance.js.
  dataFilesBefore: number
  dataFilesAfter: number
}

/**
 * Per-dataset entry of a maintenance report.
 *
 * `compactionSupported` is declared exactly once, as the literal `true`.
 * A leftover `compactionSupported: false` declaration next to it would be
 * a conflicting duplicate property and fail type-checking.
 */
export interface ExportMaintenanceDatasetReport {
  dataset: string
  snapshotsExpired: number
  // NOTE(review): presumably the snapshot count observed before expiration
  // ran — confirm against maintenance.js.
  snapshotsBefore: number
  /** icebird >= 0.8.9 exposes `icebergRewrite`; out-of-band only (LLP 0022). */
  compactionSupported: true
  /** True when an opt-in rewrite committed (or would have, under dryRun). */
  compacted: boolean
  /** Present when compaction was requested but did not commit. */
  compactionReason?: ExportCompactionSkipReason
  /** Present when the rewrite conflicted or failed. */
  compactionError?: string
  /** Present only when compaction was requested. */
  dataFilesBefore?: number
  /** Present only when compaction was requested. */
  dataFilesAfter?: number
}

/**
 * Aggregate result of one maintenance run across all datasets.
 *
 * `compactionSupported` is declared exactly once, as the literal `true`.
 * A leftover `compactionSupported: false` declaration next to it would be
 * a conflicting duplicate property and fail type-checking.
 */
export interface ExportMaintenanceReport {
  /** One entry per dataset covered by the run. */
  datasets: ExportMaintenanceDatasetReport[]
  totalSnapshotsExpired: number
  // NOTE(review): presumably the number of `datasets` entries with
  // `compacted: true` — confirm against maintenance.js.
  totalTablesCompacted: number
  /** Always the literal `true` in this report shape. */
  compactionSupported: true
  dryRun: boolean
  // NOTE(review): presumably the run's wall-clock duration in
  // milliseconds — confirm against maintenance.js.
  elapsedMs: number
}
Expand Down
95 changes: 74 additions & 21 deletions hypaware-core/smoke/flows/iceberg_export_local_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import fs from 'node:fs/promises'
import path from 'node:path'
import { fileURLToPath } from 'node:url'
import { fileURLToPath, pathToFileURL } from 'node:url'

import {
icebergRead,
Expand Down Expand Up @@ -188,6 +188,11 @@ export async function run({ harness, expect }) {
(v) => v && v.kind === 'table-format' && v.tableFormat === 'iceberg'
)

// Settle the spool so the fixture's rows are committed to their
// routed `source=...` partition table before discovery — appendRows
// only schedules a background flush at the size threshold.
await kernel.storage.flushAll({ force: true })

// Discover the dataset partition the fixture registered and
// simulate one sink tick by calling `exportBatch` directly. (The
// sink driver auto-tick path is still being wired up — flows in
Expand Down Expand Up @@ -261,7 +266,8 @@ export async function run({ harness, expect }) {
)
expect.that('export2: status=exported', exportResult2.status, (v) => v === 'exported')

// Maintenance: snapshot expiration + compaction status.
// Maintenance, default mode: snapshot expiration only — compaction is
// out-of-band (LLP 0022) and must not run without the explicit opt-in.
const maintainReport = await maintainExportTables({
blobStore,
prefix: 'iceberg/datasets',
Expand All @@ -271,29 +277,74 @@ export async function run({ harness, expect }) {
maintainReport.datasets,
(ds) => Array.isArray(ds) && ds.some((d) => d.dataset === DATASET)
)
expect.that(
'maintain: compactionSupported is false (not run by this sink — out-of-band only, LLP 0022)',
maintainReport.compactionSupported,
(v) => v === false
)
const datasetReport = maintainReport.datasets.find((d) => d.dataset === DATASET)
expect.that(
'maintain: dataset report has snapshotsBefore >= 2',
datasetReport?.snapshotsBefore,
(v) => typeof v === 'number' && v >= 2
)
expect.that(
'maintain: per-dataset compactionSupported is false',
datasetReport?.compactionSupported,
'maintain: default run does not compact (out-of-band only, LLP 0022)',
datasetReport?.compacted,
(v) => v === false
)

// After maintenance the table is still readable.
// Opt-in compaction: the two exported batches left (at least) two data
// files; compact_file_count=2 forces a rewrite so the smoke exercises
// the real icebergRewrite path.
const compactReport = await maintainExportTables({
blobStore,
prefix: 'iceberg/datasets',
config: { compact_file_count: 2 },
compact: true,
})
const compactedDataset = compactReport.datasets.find((d) => d.dataset === DATASET)
expect.that(
'maintain --compact: table was compacted',
compactedDataset?.compacted,
(v) => v === true
)
expect.that(
'maintain --compact: rewrite consolidated data files',
compactedDataset,
(d) => d !== undefined &&
typeof d.dataFilesBefore === 'number' && d.dataFilesBefore >= 2 &&
typeof d.dataFilesAfter === 'number' && d.dataFilesAfter < d.dataFilesBefore
)
expect.that(
'maintain --compact: one table compacted in totals',
compactReport.totalTablesCompacted,
(v) => v === 1
)

// After maintenance the table holds exactly the rows both batches
// committed — batch-1 and batch-2 each appended the same ROW_COUNT
// fixture rows, so every id 0..ROW_COUNT-1 must appear exactly twice.
// An exact check here is the smoke-tier guard against the rewrite's
// central risk (LLP 0022): a compaction that silently drops rows.
const { readTableRows: postMaintainRows } = await readIcebergTable(tableUrl, blobStore)
expect.that(
'maintain: table still readable after maintenance',
'maintain: exact row count survives compaction (2 batches x ROW_COUNT)',
postMaintainRows,
(rows) => Array.isArray(rows) && rows.length === 2 * ROW_COUNT
)
expect.that(
'maintain: every fixture id survives compaction exactly twice',
postMaintainRows,
(rows) => Array.isArray(rows) && rows.length >= ROW_COUNT
(rows) => {
if (!Array.isArray(rows)) return false
/** @type {Map<string, number>} */
const counts = new Map()
for (const row of rows) {
const key = String(row.id)
counts.set(key, (counts.get(key) ?? 0) + 1)
}
if (counts.size !== ROW_COUNT) return false
for (let i = 0; i < ROW_COUNT; i++) {
if (counts.get(String(i)) !== 2) return false
}
return true
}
)

await obs.shutdown()
Expand Down Expand Up @@ -403,8 +454,11 @@ async function writeFixturePlugin(dir) {
}

function fixturePluginSource() {
const partitionModuleUrl = pathToFileURL(
path.resolve(SMOKE_DIR, '../../../src/core/cache/partition.js')
).href
return `// auto-generated by iceberg_export_local_fs smoke; fixture: @hypaware/test-fixture
import path from 'node:path'
import { discoverCachePartitions } from ${JSON.stringify(partitionModuleUrl)}

/**
* @import { ActivePlugin, BlobStore, SinkEncoder, TableFormatProvider } from '../../../collectivus-plugin-kernel-types.d.ts'
Expand All @@ -423,15 +477,14 @@ const dataset = {
plugin: '@hypaware/test-fixture',
schema: { columns: COLUMNS },
primaryTimestampColumn: undefined,
discoverPartitions(ctx) {
// The storage spool routes flushed rows to '<dataset>/source=<client>/table/',
// so partition discovery must go through the kernel's discovery helper
// instead of guessing a static path.
async discoverPartitions(ctx) {
const cacheDir = ctx.cacheDir ?? activatedStorage?.cacheRoot ?? ''
return [
{
dataset: DATASET,
partition: { partition: 'all' },
tablePath: cacheDir ? path.join(cacheDir, 'datasets', DATASET, 'all') : '',
},
]
if (!cacheDir) return []
const discovered = await discoverCachePartitions(cacheDir, { datasets: [DATASET] })
return discovered.map((p) => ({ dataset: DATASET, partition: p.partition, tablePath: p.path }))
},
async createDataSource(partitions, ctx) {
const partition = partitions[0]
Expand Down
27 changes: 17 additions & 10 deletions hypaware-core/smoke/flows/iceberg_export_s3_fixture.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import { Buffer } from 'node:buffer'
import fs from 'node:fs/promises'
import path from 'node:path'
import { fileURLToPath } from 'node:url'
import { fileURLToPath, pathToFileURL } from 'node:url'
import { Readable } from 'node:stream'

import {
Expand Down Expand Up @@ -269,6 +269,11 @@ export async function run({ harness, expect }) {
storage: kernel.storage,
})

// Settle the spool so the fixture's rows are committed to their
// routed `source=...` partition table before discovery — appendRows
// only schedules a background flush at the size threshold.
await kernel.storage.flushAll({ force: true })

const dataset = kernel.query.getDataset(DATASET)
if (!dataset) throw new Error(`fixture dataset ${DATASET} did not register`)
const partitions = await dataset.discoverPartitions({
Expand Down Expand Up @@ -387,8 +392,11 @@ async function writeFixturePlugin(dir) {
}

function fixturePluginSource() {
const partitionModuleUrl = pathToFileURL(
path.resolve(SMOKE_DIR, '../../../src/core/cache/partition.js')
).href
return `// auto-generated by iceberg_export_s3_fixture smoke; fixture: @hypaware/test-fixture
import path from 'node:path'
import { discoverCachePartitions } from ${JSON.stringify(partitionModuleUrl)}

/**
* @import { ActivePlugin, BlobStore, SinkEncoder, TableFormatProvider } from '../../../collectivus-plugin-kernel-types.d.ts'
Expand All @@ -407,15 +415,14 @@ const dataset = {
plugin: '@hypaware/test-fixture',
schema: { columns: COLUMNS },
primaryTimestampColumn: undefined,
discoverPartitions(ctx) {
// The storage spool routes flushed rows to '<dataset>/source=<client>/table/',
// so partition discovery must go through the kernel's discovery helper
// instead of guessing a static path.
async discoverPartitions(ctx) {
const cacheDir = ctx.cacheDir ?? activatedStorage?.cacheRoot ?? ''
return [
{
dataset: DATASET,
partition: { partition: 'all' },
tablePath: cacheDir ? path.join(cacheDir, 'datasets', DATASET, 'all') : '',
},
]
if (!cacheDir) return []
const discovered = await discoverCachePartitions(cacheDir, { datasets: [DATASET] })
return discovered.map((p) => ({ dataset: DATASET, partition: p.partition, tablePath: p.path }))
},
async createDataSource(partitions, ctx) {
const partition = partitions[0]
Expand Down
27 changes: 17 additions & 10 deletions hypaware-core/smoke/flows/iceberg_export_s3_roundtrip.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import fs from 'node:fs/promises'
import path from 'node:path'
import { fileURLToPath } from 'node:url'
import { fileURLToPath, pathToFileURL } from 'node:url'

import {
icebergRead,
Expand Down Expand Up @@ -188,6 +188,11 @@ export async function run({ harness, expect }) {
storage: kernel.storage,
})

// Settle the spool so the fixture's rows are committed to their
// routed `source=...` partition table before discovery — appendRows
// only schedules a background flush at the size threshold.
await kernel.storage.flushAll({ force: true })

const dataset = kernel.query.getDataset(DATASET)
if (!dataset) throw new Error(`fixture dataset ${DATASET} did not register`)
const partitions = await dataset.discoverPartitions({
Expand Down Expand Up @@ -272,8 +277,11 @@ async function writeFixturePlugin(dir) {
}

function fixturePluginSource() {
const partitionModuleUrl = pathToFileURL(
path.resolve(SMOKE_DIR, '../../../src/core/cache/partition.js')
).href
return `// auto-generated by iceberg_export_s3_roundtrip smoke; fixture: @hypaware/test-fixture
import path from 'node:path'
import { discoverCachePartitions } from ${JSON.stringify(partitionModuleUrl)}

/**
* @import { ActivePlugin, BlobStore, SinkEncoder, TableFormatProvider } from '../../../collectivus-plugin-kernel-types.d.ts'
Expand All @@ -292,15 +300,14 @@ const dataset = {
plugin: '@hypaware/test-fixture',
schema: { columns: COLUMNS },
primaryTimestampColumn: undefined,
discoverPartitions(ctx) {
// The storage spool routes flushed rows to '<dataset>/source=<client>/table/',
// so partition discovery must go through the kernel's discovery helper
// instead of guessing a static path.
async discoverPartitions(ctx) {
const cacheDir = ctx.cacheDir ?? activatedStorage?.cacheRoot ?? ''
return [
{
dataset: DATASET,
partition: { partition: 'all' },
tablePath: cacheDir ? path.join(cacheDir, 'datasets', DATASET, 'all') : '',
},
]
if (!cacheDir) return []
const discovered = await discoverCachePartitions(cacheDir, { datasets: [DATASET] })
return discovered.map((p) => ({ dataset: DATASET, partition: p.partition, tablePath: p.path }))
},
async createDataSource(partitions, ctx) {
const partition = partitions[0]
Expand Down
Loading
Loading