Skip to content

Commit ffd226c

Browse files
tmater authored and Impala Public Jenkins committed
IMPALA-11377: Handle concurrent Iceberg INSERT OVERWRITEs
Iceberg now supports validations in the ReplacePartitions API, an initial snapshot id can be configured for the operation and Iceberg will validate at commit if there were any conflicting change while the ReplacePartitions was running. This commit utilizes this new functionality, the Coordinator sends the initial snapshot id to the Catalog, so the Catalog can validate the operation. Testing: - Added e2e tests. Change-Id: I812d19736c8e563541b038970786de7710b59f31 Reviewed-on: http://gerrit.cloudera.org:8080/18648 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
1 parent 63c0ce0 commit ffd226c

6 files changed

Lines changed: 61 additions & 5 deletions

File tree

be/src/service/client-request-state.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1467,6 +1467,7 @@ Status ClientRequestState::UpdateCatalog() {
14671467
ice_op.__set_iceberg_data_files_fb(
14681468
dml_exec_state->CreateIcebergDataFilesVector());
14691469
ice_op.__set_is_overwrite(finalize_params.is_overwrite);
1470+
ice_op.__set_initial_snapshot_id(finalize_params.initial_snapshot_id);
14701471
}
14711472

14721473
Status cnxn_status;

common/thrift/CatalogService.thrift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,9 @@ struct TIcebergOperationParam {
213213

214214
// Is overwrite operation
215215
3: required bool is_overwrite = false;
216+
217+
// The snapshot id when the operation was started
218+
4: optional i64 initial_snapshot_id;
216219
}
217220

218221
// Per-partion info needed by Catalog to handle an INSERT.

common/thrift/Query.thrift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -816,6 +816,9 @@ struct TFinalizeParams {
816816

817817
// Stores the Iceberg spec id of the partition spec used for this INSERT.
818818
9: optional i32 spec_id;
819+
820+
// Stores the Iceberg snapshot id of the target table for INSERTs.
821+
10: optional i64 initial_snapshot_id;
819822
}
820823

821824
// Result of call to ImpalaPlanService/JniFrontend.CreateQueryRequest()

fe/src/main/java/org/apache/impala/service/Frontend.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2259,6 +2259,7 @@ public static void addFinalizationParamsForInsert(
22592259
} else if (targetTable instanceof FeIcebergTable) {
22602260
FeIcebergTable iceTable = (FeIcebergTable)targetTable;
22612261
finalizeParams.setSpec_id(iceTable.getDefaultPartitionSpecId());
2262+
finalizeParams.setInitial_snapshot_id(iceTable.snapshotId());
22622263
} else {
22632264
// TODO: Currently this flag only controls the removal of the query-level staging
22642265
// directory. HdfsTableSink (that creates the staging dir) calculates the path

fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Map;
2424

2525
import org.apache.iceberg.AppendFiles;
26+
import org.apache.iceberg.BaseReplacePartitions;
2627
import org.apache.iceberg.BaseTable;
2728
import org.apache.iceberg.DataFile;
2829
import org.apache.iceberg.DataFiles;
@@ -38,6 +39,7 @@
3839
import org.apache.iceberg.UpdateProperties;
3940
import org.apache.iceberg.UpdateSchema;
4041
import org.apache.iceberg.catalog.TableIdentifier;
42+
import org.apache.iceberg.exceptions.ValidationException;
4143
import org.apache.iceberg.expressions.Expressions;
4244
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
4345
import org.apache.impala.catalog.FeIcebergTable;
@@ -250,8 +252,10 @@ public void commit() {
250252

251253
private static class DynamicOverwrite implements BatchWrite {
252254
final private ReplacePartitions replace;
253-
public DynamicOverwrite(Transaction txn) {
255+
final long initialSnapshotId;
256+
public DynamicOverwrite(Transaction txn, long initialSnapshotId) {
254257
replace = txn.newReplacePartitions();
258+
this.initialSnapshotId = initialSnapshotId;
255259
}
256260

257261
@Override
@@ -261,6 +265,9 @@ public void addFile(DataFile file) {
261265

262266
@Override
263267
public void commit() {
268+
replace.validateFromSnapshot(initialSnapshotId);
269+
replace.validateNoConflictingData();
270+
replace.validateNoConflictingDeletes();
264271
replace.commit();
265272
}
266273
}
@@ -275,7 +282,7 @@ public static void appendFiles(FeIcebergTable feIcebergTable, Transaction txn,
275282
List<ByteBuffer> dataFilesFb = icebergOp.getIceberg_data_files_fb();
276283
BatchWrite batchWrite;
277284
if (icebergOp.isIs_overwrite()) {
278-
batchWrite = new DynamicOverwrite(txn);
285+
batchWrite = new DynamicOverwrite(txn, icebergOp.getInitial_snapshot_id());
279286
} else {
280287
batchWrite = new Append(txn);
281288
}
@@ -297,7 +304,11 @@ public static void appendFiles(FeIcebergTable feIcebergTable, Transaction txn,
297304
if (partitionData != null) builder.withPartition(partitionData);
298305
batchWrite.addFile(builder.build());
299306
}
300-
batchWrite.commit();
307+
try {
308+
batchWrite.commit();
309+
} catch (ValidationException e) {
310+
throw new ImpalaRuntimeException(e.getMessage(), e);
311+
}
301312
}
302313

303314
private static Metrics buildDataFileMetrics(FbIcebergDataFile dataFile) {

tests/query_test/test_iceberg.py

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from avro.io import DatumReader
2929
import json
3030

31+
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
3132
from tests.common.impala_test_suite import ImpalaTestSuite, LOG
3233
from tests.common.skip import SkipIf
3334

@@ -71,18 +72,54 @@ def test_drop_incomplete_table(self, vector, unique_database):
7172
self.hdfs_client.delete_file_dir(cat_location, True)
7273
self.execute_query_expect_success(self.client, """drop table {0}""".format(tbl_name))
7374

74-
@SkipIf.not_hdfs
7575
def test_insert(self, vector, unique_database):
7676
self.run_test_case('QueryTest/iceberg-insert', vector, use_db=unique_database)
7777

7878
def test_partitioned_insert(self, vector, unique_database):
7979
self.run_test_case('QueryTest/iceberg-partitioned-insert', vector,
8080
use_db=unique_database)
8181

82-
@SkipIf.not_hdfs
8382
def test_insert_overwrite(self, vector, unique_database):
83+
"""Run iceberg-overwrite tests, then test that INSERT INTO/OVERWRITE queries running
84+
concurrently with a long running INSERT OVERWRITE are handled gracefully. query_a is
85+
started before query_b/query_c, but query_b/query_c supposed to finish before query_a.
86+
query_a should fail because the overwrite should not erase query_b/query_c's result.
87+
"""
88+
# Run iceberg-overwrite.test
8489
self.run_test_case('QueryTest/iceberg-overwrite', vector, use_db=unique_database)
8590

91+
# Create test dataset for concurrency tests and warm-up the test table
92+
tbl_name = unique_database + ".overwrite_tbl"
93+
self.client.execute("""create table {0} (i int)
94+
partitioned by spec (truncate(3, i))
95+
stored as iceberg""".format(tbl_name))
96+
self.client.execute("insert into {0} values (1), (2), (3);".format(tbl_name))
97+
98+
# Test queries: 'a' is the long running query while 'b' and 'c' are the short ones
99+
query_a = """insert overwrite {0} select sleep(5000);""".format(tbl_name)
100+
query_b = """insert overwrite {0} select * from {0};""".format(tbl_name)
101+
query_c = """insert into {0} select * from {0};""".format(tbl_name)
102+
103+
# Test concurrent INSERT OVERWRITEs, the exception closes the query handle.
104+
handle = self.client.execute_async(query_a)
105+
time.sleep(1)
106+
self.client.execute(query_b)
107+
try:
108+
self.client.wait_for_finished_timeout(handle, 30)
109+
assert False
110+
except ImpalaBeeswaxException as e:
111+
assert "Found conflicting files" in str(e)
112+
113+
# Test INSERT INTO during INSERT OVERWRITE, the exception closes the query handle.
114+
handle = self.client.execute_async(query_a)
115+
time.sleep(1)
116+
self.client.execute(query_c)
117+
try:
118+
self.client.wait_for_finished_timeout(handle, 30)
119+
assert False
120+
except ImpalaBeeswaxException as e:
121+
assert "Found conflicting files" in str(e)
122+
86123
def test_ctas(self, vector, unique_database):
87124
self.run_test_case('QueryTest/iceberg-ctas', vector, use_db=unique_database)
88125

0 commit comments

Comments
 (0)