Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,11 @@
* # --- Recommended: configurable partition value format ---
* # iceberg.partition.value.datetime.format is a DateTimeFormatter pattern applied to the output
* # partition value used in the filter expression.
* # When CURRENT_DATE is used, the reference datetime is LocalDateTime.now(), so a pattern
* # with HH will embed the current hour automatically — no separate hour config needed.
* # When set, it supersedes iceberg.hourly.partition.enabled.
* #
* # CURRENT_DATE behaviour:
* # - With this property set → LocalDateTime.now(), so HH embeds the live clock-hour.
* # - Without this property (legacy) → LocalDate.now() at midnight, HH stays -00 (backward compat).
Comment thread
debabhishek53 marked this conversation as resolved.
Outdated
*
* # Standard hourly partitions (yyyy-MM-dd-HH) — CURRENT_DATE picks up live hour
* iceberg.partition.value.datetime.format=yyyy-MM-dd-HH # → "2025-04-01-14" (current hour)
Expand Down Expand Up @@ -152,10 +154,18 @@ public class IcebergSource extends FileBasedSource<String, FileAwareInputStream>
/**
* Optional {@link DateTimeFormatter} pattern controlling how the partition value is rendered.
*
* <p>When {@code iceberg.filter.date=CURRENT_DATE} the reference datetime is
* {@link java.time.LocalDateTime#now()}, so a pattern that includes {@code HH} will embed
* the current clock-hour automatically — no separate hour config is needed.
* For a specific date (e.g. {@code 2025-04-03}), the time defaults to midnight (00:00).
* <p><b>CURRENT_DATE behaviour differs between the two paths:</b>
* <ul>
* <li>When this property <em>is</em> set, {@code CURRENT_DATE} resolves to
Comment thread
debabhishek53 marked this conversation as resolved.
Outdated
* {@link java.time.LocalDateTime#now()}, so a pattern that includes {@code HH} embeds the
* live clock-hour automatically — useful for truly hourly-partitioned tables.</li>
* <li>When this property is <em>absent</em> (legacy path), {@code CURRENT_DATE} resolves to
* {@link java.time.LocalDate#now()} at midnight (00:00), preserving the pre-PR behaviour
Comment thread
debabhishek53 marked this conversation as resolved.
Outdated
* where the hour suffix was always {@code -00}. This is the right choice for tables whose
* partitions are daily but formatted as {@code yyyy-MM-dd-00}.</li>
* </ul>
* For a static date value (e.g. {@code 2025-04-03}), the time always defaults to midnight (00:00)
* regardless of which path is used.
*
* <p>Examples:
* <ul>
Expand Down Expand Up @@ -284,28 +294,28 @@ public Extractor<String, FileAwareInputStream> getExtractor(WorkUnitState state)
* (defaults to {@value #DEFAULT_DATE_PARTITION_COLUMN}). The date value is specified separately via
* {@code iceberg.filter.date} in standard format ({@code yyyy-MM-dd}).
*
* <p><b>Partition Value Format:</b> Both the input date ({@code iceberg.filter.date}) and the output
* partition value use the pattern specified by {@code iceberg.partition.value.datetime.format}
* (a standard {@link java.time.format.DateTimeFormatter} pattern). Use {@code CURRENT_DATE} as the
* date value to resolve the reference datetime to {@link java.time.LocalDateTime#now()} automatically,
* embedding the current hour when the pattern includes {@code HH}. Examples:
* <p><b>Partition Value Format:</b> The output partition value format is controlled by
* {@code iceberg.partition.value.datetime.format} (a standard {@link java.time.format.DateTimeFormatter}
* pattern). When absent, the legacy {@code iceberg.hourly.partition.enabled} flag drives the format.
*
* <p><b>{@code CURRENT_DATE} resolution:</b>
* <ul>
* <li>{@code yyyy-MM-dd-HH} with date {@code 2025-04-01-05} → {@code 2025-04-01-05}</li>
* <li>{@code dd-MM-yyyy-HH} with date {@code 01-04-2025-00} → {@code 01-04-2025-00}</li>
* <li>{@code yyyyMMdd} with date {@code 20250401} → {@code 20250401} (compact daily)</li>
* <li>With {@code iceberg.partition.value.datetime.format} set → {@link java.time.LocalDateTime#now()},
* so a pattern including {@code HH} embeds the live clock-hour (e.g. {@code 2025-04-08-14}).</li>
* <li>Without that property (legacy) → {@link java.time.LocalDate#now()} at midnight, so the
* hour is always {@code 00} (e.g. {@code 2025-04-08-00}). This preserves the pre-PR
* behaviour for tables that store daily data in {@code yyyy-MM-dd-00} partitions.</li>
* </ul>
* When {@code iceberg.partition.value.datetime.format} is set it supersedes
* {@code iceberg.hourly.partition.enabled}. When absent, the legacy
* {@code iceberg.hourly.partition.enabled} behaviour is preserved for backward compatibility.
* Static date values always default to midnight regardless of which path is used.
*
* <p><b>Configuration Examples:</b>
* <ul>
* <li>Standard daily: {@code iceberg.partition.value.datetime.format=yyyy-MM-dd, iceberg.filter.date=2025-04-03,
* iceberg.lookback.days=3} → partitions: {@code 2025-04-03, 2025-04-02, 2025-04-01}</li>
* <li>Reversed-date hourly: {@code iceberg.partition.value.datetime.format=dd-MM-yyyy-HH,
* iceberg.filter.date=CURRENT_DATE} → {@code 03-04-2025-14, 02-04-2025-14, 01-04-2025-14}</li>
* <li>Dynamic daily: {@code iceberg.filter.date=CURRENT_DATE, iceberg.lookback.days=1}
* → today's partition only (resolved at runtime)</li>
* <li>Truly-hourly (live hour): {@code iceberg.partition.value.datetime.format=yyyy-MM-dd-HH,
* iceberg.filter.date=CURRENT_DATE} → {@code 2025-04-08-14, 2025-04-07-14, 2025-04-06-14}</li>
* <li>Daily-at-midnight (legacy default): {@code iceberg.filter.date=CURRENT_DATE, iceberg.lookback.days=1}
* → {@code 2025-04-08-00} (hour always 00, backward compat)</li>
* </ul>
*
* @param state source state containing filter configuration
Expand Down Expand Up @@ -337,13 +347,27 @@ private List<IcebergTable.FilePathWithPartition> discoverPartitionFilePaths(Sour
DateTimeFormatter partitionFormatter = resolvePartitionFormatter(state);

// Resolve the reference datetime for the filter.
// CURRENT_DATE uses LocalDateTime.now() so a formatter pattern that includes HH will
// embed the current clock-hour automatically. For a specific date (yyyy-MM-dd) the time
// defaults to midnight (00:00).
// For a specific date (yyyy-MM-dd) the time always defaults to midnight (00:00).
// For CURRENT_DATE:
// - Custom format path (iceberg.partition.value.datetime.format set): LocalDateTime.now() so
// a pattern that includes HH will embed the live clock-hour automatically.
// - Legacy path (no custom format): LocalDate.now().atStartOfDay() (midnight) to preserve the
// pre-PR behavior where CURRENT_DATE always produced a -00 suffix. Users who genuinely need
// the live hour should migrate to iceberg.partition.value.datetime.format=yyyy-MM-dd-HH.
LocalDateTime startDateTime;
if (CURRENT_DATE_PLACEHOLDER.equalsIgnoreCase(dateValue)) {
startDateTime = LocalDateTime.now();
log.info("Resolved {} placeholder to current datetime: {}", CURRENT_DATE_PLACEHOLDER, startDateTime);
boolean isCustomFormat = state.contains(ICEBERG_PARTITION_VALUE_DATETIME_FORMAT);
if (isCustomFormat) {
startDateTime = LocalDateTime.now();
log.info("Resolved {} to current datetime with live hour (custom format='{}'): {}",
CURRENT_DATE_PLACEHOLDER, state.getProp(ICEBERG_PARTITION_VALUE_DATETIME_FORMAT), startDateTime);
} else {
// Legacy backward-compat: always midnight so the yyyy-MM-dd-HH pattern keeps the old -00 suffix.
startDateTime = LocalDate.now().atStartOfDay();
log.info("Resolved {} to current date at midnight (legacy mode, -00 preserved): {}. "
+ "Set {} to use the live hour.",
CURRENT_DATE_PLACEHOLDER, startDateTime, ICEBERG_PARTITION_VALUE_DATETIME_FORMAT);
Comment thread
debabhishek53 marked this conversation as resolved.
Outdated
}
} else {
// When iceberg.partition.value.datetime.format is explicitly set, the input date must match
// that pattern (consistent input/output format). Legacy path keeps accepting yyyy-MM-dd for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,9 @@ public void testLookbackPeriodLogic() throws Exception {

@Test
public void testCurrentDatePlaceholder() throws Exception {
// CURRENT_DATE resolves to LocalDateTime.now() so the current hour is embedded automatically.
// The default legacy format (hourly.partition.enabled=true) produces yyyy-MM-dd-HH.
// Legacy path (no iceberg.partition.value.datetime.format set):
// CURRENT_DATE resolves to LocalDate.now() at midnight, so the default yyyy-MM-dd-HH
// pattern always produces a -00 suffix — preserving pre-PR backward compat.
properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "CURRENT_DATE");
properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
Expand All @@ -351,11 +352,38 @@ public void testCurrentDatePlaceholder() throws Exception {
m.setAccessible(true);
m.invoke(icebergSource, sourceState, mockTable);

String partitionValues = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
Assert.assertNotNull(partitionValues, "Partition values should be set");
String expectedToday = java.time.LocalDate.now().toString() + "-00";
Assert.assertEquals(partitionValues, expectedToday,
"Legacy CURRENT_DATE should produce today's date with -00 suffix (backward compat)");
}

@Test
public void testCurrentDatePlaceholderWithCustomFormat() throws Exception {
// New path (iceberg.partition.value.datetime.format set to yyyy-MM-dd-HH):
// CURRENT_DATE resolves to LocalDateTime.now() so the live clock-hour is embedded.
properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "CURRENT_DATE");
properties.setProperty(IcebergSource.ICEBERG_PARTITION_VALUE_DATETIME_FORMAT, "yyyy-MM-dd-HH");
properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
sourceState = new SourceState(new State(properties));

TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
when(mockTable.getTableId()).thenReturn(tableId);
when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class)))
.thenReturn(new java.util.ArrayList<>());

Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
SourceState.class, IcebergTable.class);
m.setAccessible(true);
m.invoke(icebergSource, sourceState, mockTable);

// Assert format rather than exact value to avoid clock-dependent flakiness
String partitionValues = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
Assert.assertNotNull(partitionValues, "Partition values should be set");
Assert.assertTrue(partitionValues.matches("\\d{4}-\\d{2}-\\d{2}-\\d{2}"),
"Should resolve to yyyy-MM-dd-HH format, got: " + partitionValues);
"Custom format CURRENT_DATE should produce yyyy-MM-dd-HH with live hour, got: " + partitionValues);
}

@Test
Expand Down
Loading