@@ -96,9 +96,15 @@
  * # --- Recommended: configurable partition value format ---
  * # iceberg.partition.value.datetime.format is a DateTimeFormatter pattern applied to the output
  * # partition value used in the filter expression.
- * # When CURRENT_DATE is used, the reference datetime is LocalDateTime.now(), so a pattern
- * # with HH will embed the current hour automatically — no separate hour config needed.
  * # When set, it supersedes iceberg.hourly.partition.enabled.
+ * #
+ * # CURRENT_DATE behaviour:
+ * # - iceberg.partition.value.datetime.format set (recommended):
+ * # LocalDateTime.now() — HH embeds the live clock-hour (e.g. 2025-04-01-14).
+ * # - iceberg.partition.value.datetime.format absent — falls back to the legacy flag
+ * # iceberg.hourly.partition.enabled (kept for backward compat only, superseded by the above):
+ * # hourly.partition.enabled=true (default) → LocalDate.now() at midnight → -00 suffix preserved.
+ * # hourly.partition.enabled=false → LocalDate.now() at midnight → yyyy-MM-dd, no hour.
  *
  * # Standard hourly partitions (yyyy-MM-dd-HH) — CURRENT_DATE picks up live hour
  * iceberg.partition.value.datetime.format=yyyy-MM-dd-HH # → "2025-04-01-14" (current hour)
@@ -152,10 +158,26 @@ public class IcebergSource extends FileBasedSource<String, FileAwareInputStream>
   /**
    * Optional {@link DateTimeFormatter} pattern controlling how the partition value is rendered.
    *
-   * <p>When {@code iceberg.filter.date=CURRENT_DATE} the reference datetime is
-   * {@link java.time.LocalDateTime#now()}, so a pattern that includes {@code HH} will embed
-   * the current clock-hour automatically — no separate hour config is needed.
-   * For a specific date (e.g. {@code 2025-04-03}), the time defaults to midnight (00:00).
+   * <p><b>CURRENT_DATE resolution — two paths:</b>
+   * <ul>
+   * <li><b>Recommended:</b> when {@code iceberg.partition.value.datetime.format} is set,
+   * {@code CURRENT_DATE} resolves to {@link java.time.LocalDateTime#now()}, embedding the live
+   * clock-hour when the pattern includes {@code HH} (e.g. {@code 2025-04-01-14}).</li>
+   * <li><b>Legacy fallback:</b> when {@code iceberg.partition.value.datetime.format} is absent,
+   * behaviour is controlled by the legacy flag {@code iceberg.hourly.partition.enabled}
+   * (superseded by this property; kept only for backward compatibility):
+   * <ul>
+   * <li>{@code iceberg.hourly.partition.enabled=true} (default) — {@code CURRENT_DATE}
+   * resolves to {@link java.time.LocalDate#now()} at midnight, so the
+   * {@code yyyy-MM-dd-HH} pattern always produces a {@code -00} suffix.</li>
+   * <li>{@code iceberg.hourly.partition.enabled=false} — {@code CURRENT_DATE} resolves to
+   * {@link java.time.LocalDate#now()} at midnight with a {@code yyyy-MM-dd} pattern;
+   * no hour is rendered.</li>
+   * </ul>
+   * </li>
+   * </ul>
+   * For a static date value (e.g. {@code 2025-04-03}), the time always defaults to midnight (00:00)
+   * regardless of which path is used.
    *
    * <p>Examples:
    * <ul>
@@ -284,28 +306,28 @@ public Extractor<String, FileAwareInputStream> getExtractor(WorkUnitState state)
    * (defaults to {@value #DEFAULT_DATE_PARTITION_COLUMN}). The date value is specified separately via
    * {@code iceberg.filter.date} in standard format ({@code yyyy-MM-dd}).
    *
-   * <p><b>Partition Value Format:</b> Both the input date ({@code iceberg.filter.date}) and the output
-   * partition value use the pattern specified by {@code iceberg.partition.value.datetime.format}
-   * (a standard {@link java.time.format.DateTimeFormatter} pattern). Use {@code CURRENT_DATE} as the
-   * date value to resolve the reference datetime to {@link java.time.LocalDateTime#now()} automatically,
-   * embedding the current hour when the pattern includes {@code HH}. Examples:
+   * <p><b>Partition Value Format:</b> The output partition value format is controlled by
+   * {@code iceberg.partition.value.datetime.format} (a standard {@link java.time.format.DateTimeFormatter}
+   * pattern). When absent, the legacy {@code iceberg.hourly.partition.enabled} flag drives the format.
+   *
+   * <p><b>{@code CURRENT_DATE} resolution:</b>
    * <ul>
-   * <li>{@code yyyy-MM-dd-HH} with date {@code 2025-04-01-05} → {@code 2025-04-01-05}</li>
-   * <li>{@code dd-MM-yyyy-HH} with date {@code 01-04-2025-00} → {@code 01-04-2025-00}</li>
-   * <li>{@code yyyyMMdd} with date {@code 20250401} → {@code 20250401} (compact daily)</li>
+   * <li>With {@code iceberg.partition.value.datetime.format} set → {@link java.time.LocalDateTime#now()},
+   * so a pattern including {@code HH} embeds the live clock-hour (e.g. {@code 2025-04-08-14}).</li>
+   * <li>Without that property (legacy) → {@link java.time.LocalDate#now()} at midnight, so the
+   * hour is always {@code 00} (e.g. {@code 2025-04-08-00}). This preserves the pre-PR
+   * behaviour for tables that store daily data in {@code yyyy-MM-dd-00} partitions.</li>
    * </ul>
-   * When {@code iceberg.partition.value.datetime.format} is set it supersedes
-   * {@code iceberg.hourly.partition.enabled}. When absent, the legacy
-   * {@code iceberg.hourly.partition.enabled} behaviour is preserved for backward compatibility.
+   * Static date values always default to midnight regardless of which path is used.
    *
    * <p><b>Configuration Examples:</b>
    * <ul>
    * <li>Standard daily: {@code iceberg.partition.value.datetime.format=yyyy-MM-dd, iceberg.filter.date=2025-04-03,
    * iceberg.lookback.days=3} → partitions: {@code 2025-04-03, 2025-04-02, 2025-04-01}</li>
-   * <li>Reversed-date hourly: {@code iceberg.partition.value.datetime.format=dd-MM-yyyy-HH,
-   * iceberg.filter.date=CURRENT_DATE} → {@code 03-04-2025-14, 02-04-2025-14, 01-04-2025-14}</li>
-   * <li>Dynamic daily: {@code iceberg.filter.date=CURRENT_DATE, iceberg.lookback.days=1}
-   * → today's partition only (resolved at runtime)</li>
+   * <li>Truly-hourly (live hour): {@code iceberg.partition.value.datetime.format=yyyy-MM-dd-HH,
+   * iceberg.filter.date=CURRENT_DATE} → {@code 2025-04-08-14, 2025-04-07-14, 2025-04-06-14}</li>
+   * <li>Daily-at-midnight (legacy default): {@code iceberg.filter.date=CURRENT_DATE, iceberg.lookback.days=1}
+   * → {@code 2025-04-08-00} (hour always 00, backward compat)</li>
    * </ul>
    *
    * @param state source state containing filter configuration
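The configuration examples in this Javadoc list one partition value per lookback day, newest first. A minimal sketch of that expansion follows, assuming the reference datetime is simply stepped back one day at a time and rendered with the configured pattern. The class and the `partitionValues` helper are hypothetical and are not taken from `IcebergSource`.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class LookbackPartitionSketch {

  /** Hypothetical helper: renders one partition value per lookback day, newest first. */
  static List<String> partitionValues(LocalDateTime start, int lookbackDays, String pattern) {
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
    List<String> values = new ArrayList<>();
    for (int i = 0; i < lookbackDays; i++) {
      // Step back whole days; the time-of-day (and therefore HH) is left unchanged.
      values.add(start.minusDays(i).format(formatter));
    }
    return values;
  }

  public static void main(String[] args) {
    // Static date: time defaults to midnight.
    LocalDateTime staticDate = LocalDateTime.of(2025, 4, 3, 0, 0);
    System.out.println(partitionValues(staticDate, 3, "yyyy-MM-dd"));
    // prints [2025-04-03, 2025-04-02, 2025-04-01]

    // Stand-in for a CURRENT_DATE reading taken at 14:30 with a custom format.
    LocalDateTime liveHour = LocalDateTime.of(2025, 4, 8, 14, 30);
    System.out.println(partitionValues(liveHour, 3, "yyyy-MM-dd-HH"));
    // prints [2025-04-08-14, 2025-04-07-14, 2025-04-06-14]
  }
}
```

Under this assumption the live hour is repeated for every lookback day, which matches the "Truly-hourly" example in the Javadoc.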
@@ -337,13 +359,35 @@ private List<IcebergTable.FilePathWithPartition> discoverPartitionFilePaths(Sour
     DateTimeFormatter partitionFormatter = resolvePartitionFormatter(state);

     // Resolve the reference datetime for the filter.
-    // CURRENT_DATE uses LocalDateTime.now() so a formatter pattern that includes HH will
-    // embed the current clock-hour automatically. For a specific date (yyyy-MM-dd) the time
-    // defaults to midnight (00:00).
+    // For a specific date (yyyy-MM-dd) the time always defaults to midnight (00:00).
+    // For CURRENT_DATE:
+    // - Custom format path (iceberg.partition.value.datetime.format set): LocalDateTime.now() so
+    // a pattern that includes HH will embed the live clock-hour automatically.
+    // - Legacy hourly path (no custom format, hourly.partition.enabled=true): LocalDate.now().atStartOfDay()
+    // to preserve the pre-PR behavior where CURRENT_DATE always produced a -00 suffix.
+    // - Legacy daily path (no custom format, hourly.partition.enabled=false): LocalDate.now().atStartOfDay()
+    // with yyyy-MM-dd pattern; hour is not rendered.
     LocalDateTime startDateTime;
     if (CURRENT_DATE_PLACEHOLDER.equalsIgnoreCase(dateValue)) {
-      startDateTime = LocalDateTime.now();
-      log.info("Resolved {} placeholder to current datetime: {}", CURRENT_DATE_PLACEHOLDER, startDateTime);
+      boolean isCustomFormat = state.contains(ICEBERG_PARTITION_VALUE_DATETIME_FORMAT);
+      if (isCustomFormat) {
+        startDateTime = LocalDateTime.now();
+        log.info("Resolved {} to current datetime with live hour (custom format='{}'): {}",
+            CURRENT_DATE_PLACEHOLDER, state.getProp(ICEBERG_PARTITION_VALUE_DATETIME_FORMAT), startDateTime);
+      } else {
+        boolean isHourly = state.getPropAsBoolean(ICEBERG_HOURLY_PARTITION_ENABLED, DEFAULT_HOURLY_PARTITION_ENABLED);
+        startDateTime = LocalDate.now().atStartOfDay();
+        if (isHourly) {
+          // Legacy hourly backward-compat: midnight ensures yyyy-MM-dd-HH pattern keeps the old -00 suffix.
+          log.info("Resolved {} to current date at midnight (legacy hourly mode, -00 preserved): {}. "
+              + "Set {} to use the live hour.",
+              CURRENT_DATE_PLACEHOLDER, startDateTime, ICEBERG_PARTITION_VALUE_DATETIME_FORMAT);
+        } else {
+          // Legacy daily: pattern is yyyy-MM-dd, hour is not rendered.
+          log.info("Resolved {} to current date at midnight (legacy daily mode): {}.",
+              CURRENT_DATE_PLACEHOLDER, startDateTime);
+        }
+      }
     } else {
       // When iceberg.partition.value.datetime.format is explicitly set, the input date must match
       // that pattern (consistent input/output format). Legacy path keeps accepting yyyy-MM-dd for
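The three `CURRENT_DATE` paths in this hunk can be exercised in isolation. The sketch below is a simplified stand-in, not the PR's code. It assumes a `java.time.Clock` is passed in so the result is deterministic, whereas the diff calls the no-argument `now()` methods. `resolveCurrentDate` and its parameters are hypothetical names; a non-null `customPattern` plays the role of `iceberg.partition.value.datetime.format` being set.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class CurrentDateResolutionSketch {

  /**
   * Hypothetical stand-in for the CURRENT_DATE branch: a non-null customPattern selects the
   * live-hour path; otherwise the legacy flag picks yyyy-MM-dd-HH or yyyy-MM-dd at midnight.
   */
  static String resolveCurrentDate(Clock clock, String customPattern, boolean hourlyEnabled) {
    if (customPattern != null) {
      // Custom format path: keep the time-of-day so HH renders the live hour.
      return LocalDateTime.now(clock).format(DateTimeFormatter.ofPattern(customPattern));
    }
    // Legacy paths: truncate to midnight so an HH field always renders as 00.
    LocalDateTime midnight = LocalDate.now(clock).atStartOfDay();
    String legacyPattern = hourlyEnabled ? "yyyy-MM-dd-HH" : "yyyy-MM-dd";
    return midnight.format(DateTimeFormatter.ofPattern(legacyPattern));
  }

  public static void main(String[] args) {
    // Fixed clock (assumption for the sketch): 2025-04-08 14:30 UTC.
    Clock fixed = Clock.fixed(Instant.parse("2025-04-08T14:30:00Z"), ZoneOffset.UTC);
    System.out.println(resolveCurrentDate(fixed, "yyyy-MM-dd-HH", true)); // prints 2025-04-08-14
    System.out.println(resolveCurrentDate(fixed, null, true));            // prints 2025-04-08-00
    System.out.println(resolveCurrentDate(fixed, null, false));           // prints 2025-04-08
  }
}
```

Injecting the clock is a design choice of the sketch only. It makes each path checkable against an exact value instead of a pattern match.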
@@ -334,8 +334,9 @@ public void testLookbackPeriodLogic() throws Exception {

   @Test
   public void testCurrentDatePlaceholder() throws Exception {
-    // CURRENT_DATE resolves to LocalDateTime.now() so the current hour is embedded automatically.
-    // The default legacy format (hourly.partition.enabled=true) produces yyyy-MM-dd-HH.
+    // Legacy path (no iceberg.partition.value.datetime.format set):
+    // CURRENT_DATE resolves to LocalDate.now() at midnight, so the default yyyy-MM-dd-HH
+    // pattern always produces a -00 suffix — preserving pre-PR backward compat.
     properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
     properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "CURRENT_DATE");
     properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
@@ -351,11 +352,38 @@ public void testCurrentDatePlaceholder() throws Exception {
     m.setAccessible(true);
     m.invoke(icebergSource, sourceState, mockTable);

+    String partitionValues = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertNotNull(partitionValues, "Partition values should be set");
+    String expectedToday = java.time.LocalDate.now().toString() + "-00";
+    Assert.assertEquals(partitionValues, expectedToday,
+        "Legacy CURRENT_DATE should produce today's date with -00 suffix (backward compat)");
+  }
+
+  @Test
+  public void testCurrentDatePlaceholderWithCustomFormat() throws Exception {
+    // New path (iceberg.partition.value.datetime.format set to yyyy-MM-dd-HH):
+    // CURRENT_DATE resolves to LocalDateTime.now() so the live clock-hour is embedded.
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "CURRENT_DATE");
+    properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_DATETIME_FORMAT, "yyyy-MM-dd-HH");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class)))
+        .thenReturn(new java.util.ArrayList<>());
+
+    Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+        SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    m.invoke(icebergSource, sourceState, mockTable);
+
     // Assert format rather than exact value to avoid clock-dependent flakiness
     String partitionValues = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
     Assert.assertNotNull(partitionValues, "Partition values should be set");
     Assert.assertTrue(partitionValues.matches("\\d{4}-\\d{2}-\\d{2}-\\d{2}"),
-        "Should resolve to yyyy-MM-dd-HH format, got: " + partitionValues);
+        "Custom format CURRENT_DATE should produce yyyy-MM-dd-HH with live hour, got: " + partitionValues);
   }

   @Test
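The custom-format test matches a regex so that it does not depend on the clock, which also means any two-digit hour would pass. One possible tightening, sketched below as an assumption and not as part of this PR, is to read the clock before and after the call and accept a value that matches either reading. This tolerates an hour rollover during the test. `LiveHourWindowSketch` and `withinWindow` are hypothetical names.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class LiveHourWindowSketch {

  /** Hypothetical check: the value must equal the formatted clock reading taken before or after the call. */
  static boolean withinWindow(String value, LocalDateTime before, LocalDateTime after, String pattern) {
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
    return value.equals(before.format(formatter)) || value.equals(after.format(formatter));
  }

  public static void main(String[] args) {
    String pattern = "yyyy-MM-dd-HH";
    LocalDateTime before = LocalDateTime.now();
    // Stand-in for the partition value that the code under test would produce.
    String value = LocalDateTime.now().format(DateTimeFormatter.ofPattern(pattern));
    LocalDateTime after = LocalDateTime.now();
    System.out.println(withinWindow(value, before, after, pattern));
  }
}
```

The same bracketing idea would apply to the legacy test, where `LocalDate.now()` is read after the invocation and could differ from the value resolved inside the call if the run straddles midnight.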