Skip to content

Commit b4e4d4a

Browse files
authored
[GOBBLIN-2163] Add Iceberg-Distcp table metadata validation (for partition copy) (#4064)
1 parent 4b639f6 commit b4e4d4a

3 files changed

Lines changed: 308 additions & 1 deletion

File tree

gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838
public class IcebergPartitionDatasetFinder extends IcebergDatasetFinder {
3939
public static final String ICEBERG_PARTITION_NAME_KEY = "partition.name";
4040
public static final String ICEBERG_PARTITION_VALUE_KEY = "partition.value";
41+
public static final String ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY = ICEBERG_DATASET_PREFIX + "partition.validate.strict.equality";
42+
// when true, requires equality of the partition specs' specId as well as of each partition field's fieldId
43+
public static final String DEFAULT_ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY= "true";
4144

4245
public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) {
4346
super(sourceFs, properties);
@@ -46,7 +49,12 @@ public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties)
4649
@Override
4750
protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable,
4851
Properties properties, FileSystem fs, boolean shouldIncludeMetadataPath) throws IOException {
49-
// TODO: Add Validator for source and destination tables later
52+
53+
boolean validateStrictPartitionEquality = Boolean.parseBoolean(properties.getProperty(ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY,
54+
DEFAULT_ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY));
55+
56+
IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
57+
srcIcebergTable.accessTableMetadata(), destIcebergTable.accessTableMetadata(), validateStrictPartitionEquality);
5058

5159
String partitionColumnName = getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE,
5260
ICEBERG_PARTITION_NAME_KEY);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.data.management.copy.iceberg;
19+
20+
import java.io.IOException;
21+
22+
import org.apache.iceberg.PartitionSpec;
23+
import org.apache.iceberg.Schema;
24+
import org.apache.iceberg.TableMetadata;
25+
26+
import lombok.extern.slf4j.Slf4j;
27+
28+
/**
29+
* Validator for Iceberg table metadata, ensuring that the given tables metadata have same schema and partition spec.
30+
*/
31+
@Slf4j
32+
public class IcebergTableMetadataValidatorUtils {
33+
34+
private IcebergTableMetadataValidatorUtils() {
35+
// Do not instantiate
36+
}
37+
38+
/**
39+
* Compares the metadata of the given two iceberg tables.
40+
* <ul>
41+
* <li>First compares the schema of the metadata.</li>
42+
* <li>Then compares the partition spec of the metadata.</li>
43+
* </ul>
44+
* @param tableMetadataA the metadata of the first table
45+
* @param tableMetadataB the metadata of the second table
46+
* @param validateStrictPartitionEquality boolean value to control strictness of partition spec comparison
47+
* @throws IOException if the schemas or partition spec do not match
48+
*/
49+
public static void failUnlessCompatibleStructure(TableMetadata tableMetadataA,
50+
TableMetadata tableMetadataB, boolean validateStrictPartitionEquality) throws IOException {
51+
log.info("Starting comparison between iceberg tables with metadata file location : {} and {}",
52+
tableMetadataA.metadataFileLocation(),
53+
tableMetadataB.metadataFileLocation());
54+
55+
Schema schemaA = tableMetadataA.schema();
56+
Schema schemaB = tableMetadataB.schema();
57+
// TODO: Need to add support for schema evolution
58+
// This check needs to be broken down into multiple checks to support schema evolution
59+
// Possible cases - schemaA == schemaB,
60+
// - schemaA is subset of schemaB [ schemaB Evolved ],
61+
// - schemaA is superset of schemaB [ schemaA Evolved ],
62+
// - Other cases?
63+
// Also consider using Strategy or any other design pattern for this to make it a better solution
64+
if (!schemaA.sameSchema(schemaB)) {
65+
String errMsg = String.format(
66+
"Schema Mismatch between Metadata{%s} - SchemaId{%d} and Metadata{%s} - SchemaId{%d}",
67+
tableMetadataA.metadataFileLocation(),
68+
schemaA.schemaId(),
69+
tableMetadataB.metadataFileLocation(),
70+
schemaB.schemaId()
71+
);
72+
log.error(errMsg);
73+
throw new IOException(errMsg);
74+
}
75+
76+
PartitionSpec partitionSpecA = tableMetadataA.spec();
77+
PartitionSpec partitionSpecB = tableMetadataB.spec();
78+
// .compatibleWith() doesn't match for specId of partition spec and fieldId of partition fields while .equals() does
79+
boolean partitionSpecMatch = validateStrictPartitionEquality ? partitionSpecA.equals(partitionSpecB)
80+
: partitionSpecA.compatibleWith(partitionSpecB);
81+
if (!partitionSpecMatch) {
82+
String errMsg = String.format(
83+
"Partition Spec Mismatch between Metadata{%s} - PartitionSpecId{%d} and Metadata{%s} - PartitionSpecId{%d}",
84+
tableMetadataA.metadataFileLocation(),
85+
partitionSpecA.specId(),
86+
tableMetadataB.metadataFileLocation(),
87+
partitionSpecB.specId()
88+
);
89+
log.error(errMsg);
90+
throw new IOException(errMsg);
91+
}
92+
93+
log.info("Comparison completed successfully between iceberg tables with metadata file location : {} and {}",
94+
tableMetadataA.metadataFileLocation(),
95+
tableMetadataB.metadataFileLocation());
96+
}
97+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.data.management.copy.iceberg;
19+
20+
import java.io.IOException;
21+
import java.util.HashMap;
22+
23+
import org.apache.iceberg.PartitionSpec;
24+
import org.apache.iceberg.TableMetadata;
25+
import org.apache.iceberg.avro.AvroSchemaUtil;
26+
import org.apache.iceberg.Schema;
27+
import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
28+
import org.testng.Assert;
29+
import org.testng.annotations.Test;
30+
31+
public class IcebergTableMetadataValidatorUtilsTest {
32+
private static final PartitionSpec unpartitionedPartitionSpec = PartitionSpec.unpartitioned();
33+
private static final Schema schema1 = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema1")
34+
.fields()
35+
.requiredString("field1")
36+
.requiredString("field2")
37+
.endRecord());
38+
private static final Schema schema2IsNotSchema1Compat = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema2")
39+
.fields()
40+
.requiredString("field2")
41+
.requiredString("field1")
42+
.endRecord());
43+
private static final Schema schema3 = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema3")
44+
.fields()
45+
.requiredString("field1")
46+
.requiredString("field2")
47+
.requiredInt("field3")
48+
.endRecord());
49+
private static final Schema schema4IsNotSchema3Compat = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema4")
50+
.fields()
51+
.requiredInt("field1")
52+
.requiredString("field2")
53+
.requiredInt("field3")
54+
.endRecord());
55+
private static final PartitionSpec partitionSpec1 = PartitionSpec.builderFor(schema1)
56+
.identity("field1")
57+
.build();
58+
private static final TableMetadata tableMetadataWithSchema1AndUnpartitionedSpec = TableMetadata.newTableMetadata(
59+
schema1, unpartitionedPartitionSpec, "tableLocationForSchema1WithUnpartitionedSpec", new HashMap<>());
60+
private static final TableMetadata tableMetadataWithSchema1AndPartitionSpec1 = TableMetadata.newTableMetadata(
61+
schema1, partitionSpec1, "tableLocationForSchema1WithPartitionSpec1", new HashMap<>());
62+
private static final TableMetadata tableMetadataWithSchema3AndUnpartitionedSpec = TableMetadata.newTableMetadata(
63+
schema3, unpartitionedPartitionSpec, "tableLocationForSchema3WithUnpartitionedSpec", new HashMap<>());
64+
private static final String SCHEMA_MISMATCH_EXCEPTION = "Schema Mismatch between Metadata";
65+
private static final String PARTITION_SPEC_MISMATCH_EXCEPTION = "Partition Spec Mismatch between Metadata";
66+
private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_TRUE = true;
67+
private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_FALSE = false;
68+
@Test
69+
public void testValidateSameSchema() throws IOException {
70+
IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
71+
tableMetadataWithSchema1AndUnpartitionedSpec, tableMetadataWithSchema1AndUnpartitionedSpec,
72+
VALIDATE_STRICT_PARTITION_EQUALITY_TRUE
73+
);
74+
Assert.assertTrue(true);
75+
}
76+
77+
@Test
78+
public void testValidateDifferentSchemaFails() {
79+
// Schema 1 and Schema 2 have different field order
80+
81+
TableMetadata tableMetadataWithSchema2AndUnpartitionedSpec = TableMetadata.newTableMetadata(schema2IsNotSchema1Compat,
82+
unpartitionedPartitionSpec, "tableLocationForSchema2WithUnpartitionedSpec", new HashMap<>());
83+
84+
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
85+
tableMetadataWithSchema2AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
86+
}
87+
88+
@Test
89+
public void testValidateSchemaWithDifferentTypesFails() {
90+
// schema 3 and schema 4 have different field types for field1
91+
92+
TableMetadata tableMetadataWithSchema4AndUnpartitionedSpec = TableMetadata.newTableMetadata(schema4IsNotSchema3Compat,
93+
unpartitionedPartitionSpec, "tableLocationForSchema4WithUnpartitionedSpec", new HashMap<>());
94+
95+
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec,
96+
tableMetadataWithSchema4AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
97+
}
98+
99+
@Test
100+
public void testValidateSchemaWithEvolvedSchemaIFails() {
101+
// Schema 3 has one more extra field as compared to Schema 1
102+
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
103+
tableMetadataWithSchema3AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
104+
}
105+
106+
@Test
107+
public void testValidateSchemaWithEvolvedSchemaIIFails() {
108+
// TODO: This test should pass in the future when we support schema evolution
109+
// Schema 3 has one more extra field as compared to Schema 1
110+
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec,
111+
tableMetadataWithSchema1AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
112+
}
113+
114+
@Test
115+
public void testValidateOneSchemaEvolvedFromIntToLongTypeFails() {
116+
// Adding this test as to verify that partition copy doesn't proceed further for this case
117+
// as while doing poc and testing had seen final commit gets fail if there is mismatch in field type
118+
// specially from int to long
119+
Schema schema5EvolvedFromSchema4 = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema5")
120+
.fields()
121+
.requiredLong("field1")
122+
.requiredString("field2")
123+
.requiredInt("field3")
124+
.endRecord());
125+
PartitionSpec partitionSpec = PartitionSpec.builderFor(schema5EvolvedFromSchema4)
126+
.identity("field1")
127+
.build();
128+
TableMetadata tableMetadataWithSchema5AndPartitionSpec = TableMetadata.newTableMetadata(schema5EvolvedFromSchema4,
129+
partitionSpec, "tableLocationForSchema5WithPartitionSpec", new HashMap<>());
130+
131+
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
132+
tableMetadataWithSchema5AndPartitionSpec, SCHEMA_MISMATCH_EXCEPTION);
133+
}
134+
135+
@Test
136+
public void testValidateSamePartitionSpec() throws IOException {
137+
IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
138+
tableMetadataWithSchema1AndPartitionSpec1, tableMetadataWithSchema1AndPartitionSpec1,
139+
VALIDATE_STRICT_PARTITION_EQUALITY_TRUE
140+
);
141+
Assert.assertTrue(true);
142+
}
143+
144+
@Test
145+
public void testValidatePartitionSpecWithDiffNameFails() {
146+
PartitionSpec partitionSpec12 = PartitionSpec.builderFor(schema1)
147+
.identity("field2")
148+
.build();
149+
TableMetadata tableMetadataWithSchema1AndPartitionSpec12 = TableMetadata.newTableMetadata(schema1, partitionSpec12,
150+
"tableLocationForSchema1WithPartitionSpec12", new HashMap<>());
151+
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndPartitionSpec1,
152+
tableMetadataWithSchema1AndPartitionSpec12, PARTITION_SPEC_MISMATCH_EXCEPTION);
153+
}
154+
155+
@Test
156+
public void testValidatePartitionSpecWithUnpartitionedFails() {
157+
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
158+
tableMetadataWithSchema1AndPartitionSpec1, PARTITION_SPEC_MISMATCH_EXCEPTION);
159+
}
160+
161+
@Test
162+
public void testPartitionSpecWithDifferentTransformFails() {
163+
PartitionSpec partitionSpec12 = PartitionSpec.builderFor(schema1)
164+
.truncate("field1", 4)
165+
.build();
166+
TableMetadata tableMetadataWithSchema1AndPartitionSpec12 = TableMetadata.newTableMetadata(schema1, partitionSpec12,
167+
"tableLocationForSchema1WithPartitionSpec12", new HashMap<>());
168+
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndPartitionSpec1,
169+
tableMetadataWithSchema1AndPartitionSpec12, PARTITION_SPEC_MISMATCH_EXCEPTION);
170+
}
171+
172+
@Test
173+
public void testStrictPartitionSpecEqualityOffVsOn() throws IOException {
174+
PartitionSpec partitionSpecWithTwoCols = PartitionSpec.builderFor(schema1)
175+
.identity("field1")
176+
.identity("field2")
177+
.build();
178+
179+
TableMetadata tableMetadataWithSchema1AndPartitionSpecWithTwoCols = TableMetadata.newTableMetadata(schema1,
180+
partitionSpecWithTwoCols, "tableLocationForSchema1WithPartitionSpecWithTwoCols", new HashMap<>());
181+
TableMetadata updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1 = tableMetadataWithSchema1AndPartitionSpec1
182+
.updatePartitionSpec(tableMetadataWithSchema1AndPartitionSpecWithTwoCols.spec());
183+
184+
IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
185+
tableMetadataWithSchema1AndPartitionSpecWithTwoCols,
186+
updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1,
187+
VALIDATE_STRICT_PARTITION_EQUALITY_FALSE);
188+
Assert.assertTrue(true); // passes w/ non-strict equality...
189+
// but fails when strict equality
190+
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndPartitionSpecWithTwoCols,
191+
updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1, PARTITION_SPEC_MISMATCH_EXCEPTION);
192+
}
193+
194+
private void verifyStrictFailUnlessCompatibleStructureThrows(TableMetadata tableAMetadata,
195+
TableMetadata tableBMetadata, String expectedMessage) {
196+
IOException exception = Assert.expectThrows(IOException.class, () -> {
197+
IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(tableAMetadata, tableBMetadata,
198+
VALIDATE_STRICT_PARTITION_EQUALITY_TRUE);
199+
});
200+
Assert.assertTrue(exception.getMessage().startsWith(expectedMessage));
201+
}
202+
}

0 commit comments

Comments
 (0)