diff --git a/be/src/format/new_parquet/parquet_reader.cpp b/be/src/format/new_parquet/parquet_reader.cpp index 26093575c1194c..c38d8b810a9b2d 100644 --- a/be/src/format/new_parquet/parquet_reader.cpp +++ b/be/src/format/new_parquet/parquet_reader.cpp @@ -341,6 +341,7 @@ Status ParquetReader::_execute_filter_conjuncts(int64_t batch_rows, Block* file_ if (*selected_rows == 0) { break; } + const size_t original_columns = file_block->columns(); int result_column_id = -1; RETURN_IF_ERROR(delete_conjunct->root()->execute(delete_conjunct.get(), file_block, &result_column_id)); @@ -356,7 +357,7 @@ Status ParquetReader::_execute_filter_conjuncts(int64_t batch_rows, Block* file_ keep_filter[row] = !delete_filter[row]; has_kept_row |= keep_filter[row] != 0; } - file_block->erase(result_column_id); + file_block->erase_tail(original_columns); *selected_rows = !has_kept_row ? 0 : _apply_filter_to_selection(keep_filter, selection, *selected_rows); diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp b/be/test/format/new_parquet/parquet_reader_test.cpp index 6d0156af9ce2db..fb6c6d8ab35707 100644 --- a/be/test/format/new_parquet/parquet_reader_test.cpp +++ b/be/test/format/new_parquet/parquet_reader_test.cpp @@ -227,6 +227,31 @@ void write_dictionary_filter_parquet_file(const std::string& file_path) { builder.build())); } +void write_dictionary_edge_parquet_file(const std::string& file_path) { + auto schema = arrow::schema({ + arrow::field("id", arrow::int32(), false), + arrow::field("value", arrow::utf8(), false), + }); + auto table = arrow::Table::Make( + schema, + {build_int32_array({1, 2, 3, 4, 5, 6, 7, 8}), + build_string_array({"", "same", "other", "long-value", "", "tail", "same", "last"})}); + + auto file_result = arrow::io::FileOutputStream::Open(file_path); + ASSERT_TRUE(file_result.ok()) << file_result.status(); + std::shared_ptr out = *file_result; + + ::parquet::WriterProperties::Builder builder; + builder.version(::parquet::ParquetVersion::PARQUET_2_6); + builder.data_page_version(::parquet::ParquetDataPageVersion::V2); + builder.compression(::parquet::Compression::UNCOMPRESSED); + builder.enable_dictionary("value"); + builder.disable_dictionary("id"); + builder.disable_statistics(); + PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), out, 2, + builder.build())); +} + Block build_file_block(const std::vector& schema) { Block block; for (const auto& field : schema) { @@ -284,6 +309,10 @@ class TestFileReader final : public reader::FileReader { bool has_request() const { return _request != nullptr; } bool eof() const { return _eof; } + + bool has_io_context() const { return _io_ctx != nullptr; } + + long io_context_use_count() const { return _io_ctx.use_count(); } }; TEST(FileReaderTest, OpenStoresRequestAndCloseClearsState) { @@ -304,6 +333,25 @@ TEST(FileReaderTest, OpenStoresRequestAndCloseClearsState) { EXPECT_TRUE(reader.eof()); } +TEST(FileReaderTest, CloseReleasesSharedIOContext) { + auto system_properties = std::make_shared(); + system_properties->system_type = TFileType::FILE_LOCAL; + auto file_description = std::make_unique(); + auto io_ctx = std::make_shared(); + std::weak_ptr weak_io_ctx = io_ctx; + TestFileReader reader(system_properties, file_description, io_ctx); + + EXPECT_TRUE(reader.has_io_context()); + EXPECT_EQ(reader.io_context_use_count(), 2); + io_ctx.reset(); + EXPECT_FALSE(weak_io_ctx.expired()); + EXPECT_EQ(reader.io_context_use_count(), 1); + + ASSERT_TRUE(reader.close().ok()); + EXPECT_FALSE(reader.has_io_context()); + EXPECT_TRUE(weak_io_ctx.expired()); +} + TEST(TableColumnMapperTest, CreatesComplexProjectionForStructChildren) { reader::SchemaField struct_field; struct_field.id = 0; @@ -691,6 +739,102 @@ TEST_F(NewParquetReaderTest, InPredicateFiltersRowGroupsByDictionary) { EXPECT_EQ(values, std::vector({"az", "za"})); } +TEST_F(NewParquetReaderTest, DictionaryPageV2StringEdgesSurviveSelection) { + write_dictionary_edge_parquet_file(_file_path); + auto parquet_file_reader = ::parquet::ParquetFileReader::OpenFile(_file_path, false); + ASSERT_EQ(parquet_file_reader->metadata()->num_row_groups(), 4); + for (int row_group_idx = 0; row_group_idx < 4; ++row_group_idx) { + auto row_group = parquet_file_reader->metadata()->RowGroup(row_group_idx); + ASSERT_NE(row_group, nullptr); + ASSERT_TRUE(row_group->ColumnChunk(1)->has_dictionary_page()); + } + + auto reader = create_reader(); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + auto request = std::make_unique(); + request->predicate_columns = {1}; + request->non_predicate_columns = {0}; + auto set = build_set(); + set->insert(const_cast(""), 0); + set->insert(const_cast("same"), 4); + reader::FileColumnPredicateFilter column_filter; + column_filter.file_column_id = 1; + column_filter.predicates.push_back(create_in_list_predicate( + 1, "value", schema[1].type, set, false)); + request->column_predicate_filters.push_back(std::move(column_filter)); + ASSERT_TRUE(reader->open(request).ok()); + + std::vector ids; + std::vector values; + bool eof = false; + while (!eof) { + Block block = build_file_block(schema); + size_t rows = 0; + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + if (rows == 0) { + continue; + } + const auto& id_column = assert_cast(*block.get_by_position(0).column); + const auto& value_column = + assert_cast(*block.get_by_position(1).column); + for (size_t row = 0; row < rows; ++row) { + ids.push_back(id_column.get_element(row)); + values.push_back(value_column.get_data_at(row).to_string()); + } + } + + EXPECT_EQ(ids, std::vector({1, 2, 5, 6, 7, 8})); + EXPECT_EQ(values, std::vector({"", "same", "", "tail", "same", "last"})); +} + +TEST_F(NewParquetReaderTest, StatisticsPruningSkipsPrefixRowGroupsAndReadsLaterGroups) { + write_parquet_file(_file_path, 1); + auto parquet_file_reader = ::parquet::ParquetFileReader::OpenFile(_file_path, false); + ASSERT_EQ(parquet_file_reader->metadata()->num_row_groups(), 5); + + auto reader = create_reader(); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + auto request = std::make_unique(); + request->predicate_columns = {0}; + request->non_predicate_columns = {1}; + reader::FileColumnPredicateFilter column_filter; + column_filter.file_column_id = 0; + column_filter.predicates.push_back(create_comparison_predicate( + 0, "id", schema[0].type, Field::create_field(4), false)); + request->column_predicate_filters.push_back(std::move(column_filter)); + ASSERT_TRUE(reader->open(request).ok()); + + std::vector ids; + std::vector values; + bool eof = false; + while (!eof) { + Block block = build_file_block(schema); + size_t rows = 0; + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + if (rows == 0) { + continue; + } + const auto& id_column = assert_cast(*block.get_by_position(0).column); + const auto& value_column = + assert_cast(*block.get_by_position(1).column); + for (size_t row = 0; row < rows; ++row) { + ids.push_back(id_column.get_element(row)); + values.push_back(value_column.get_data_at(row).to_string()); + } + } + + EXPECT_EQ(ids, std::vector({4, 5})); + EXPECT_EQ(values, std::vector({"four", "five"})); +} + TEST_F(NewParquetReaderTest, RowPositionReaderReturnsFileLocalPositions) { write_parquet_file(_file_path, 2); auto parquet_file_reader = ::parquet::ParquetFileReader::OpenFile(_file_path, false); diff --git a/be/test/format/reader/table_reader_test.cpp b/be/test/format/reader/table_reader_test.cpp index 1bb6eaf26be6fb..c5efa0512e603f 100644 --- a/be/test/format/reader/table_reader_test.cpp +++ b/be/test/format/reader/table_reader_test.cpp @@ -289,6 +289,27 @@ void write_iceberg_equality_delete_parquet_file(const std::string& file_path, in builder.build())); } +void write_iceberg_equality_delete_bigint_parquet_file(const std::string& file_path, + int32_t field_id, int64_t value) { + const auto metadata = + arrow::key_value_metadata({"PARQUET:field_id"}, {std::to_string(field_id)}); + auto schema = arrow::schema({ + arrow::field("id", arrow::int64(), false)->WithMetadata(metadata), + }); + auto table = arrow::Table::Make(schema, {build_int64_array({value})}); + + auto file_result = arrow::io::FileOutputStream::Open(file_path); + ASSERT_TRUE(file_result.ok()) << file_result.status(); + std::shared_ptr out = *file_result; + + ::parquet::WriterProperties::Builder builder; + builder.version(::parquet::ParquetVersion::PARQUET_2_6); + builder.data_page_version(::parquet::ParquetDataPageVersion::V2); + builder.compression(::parquet::Compression::UNCOMPRESSED); + PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), out, 1, + builder.build())); +} + void write_int_pair_parquet_file(const std::string& file_path, const std::vector& ids, const std::vector& scores, const std::vector& values, @@ -722,6 +743,90 @@ TEST(TableReaderTest, PushDownMinMaxCastsFileValueToTableType) { std::filesystem::remove_all(test_dir); } +TEST(TableReaderTest, PushDownMinMaxOnlyUsesSelectedRowGroupInFileRange) { + const auto test_dir = + std::filesystem::temp_directory_path() / "doris_table_reader_minmax_range_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + write_int_pair_parquet_file(file_path, {10, 1, 100}, {100, 10, 1000}, {"ten", "one", "hundred"}, + 1); + + std::vector projected_columns; + projected_columns.push_back(make_table_column(0, "id", std::make_shared())); + + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + TableReader reader; + ASSERT_TRUE(reader.init({ + .projected_columns = projected_columns, + .column_predicates = {}, + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = nullptr, + .io_ctx = nullptr, + .runtime_state = &state, + .scanner_profile = nullptr, + .allow_missing_columns = true, + .push_down_agg_type = TPushAggOp::type::MINMAX, + .profile = nullptr, + }) + .ok()); + ASSERT_TRUE(reader.prepare_split(build_split_options_for_row_group_mid(file_path, 1)).ok()); + + Block block = build_table_block(projected_columns); + bool eos = false; + ASSERT_TRUE(reader.get_block(&block, &eos).ok()); + ASSERT_FALSE(eos); + ASSERT_EQ(block.rows(), 2); + const auto& id_column = assert_cast(*block.get_by_position(0).column); + EXPECT_EQ(id_column.get_element(0), 1); + EXPECT_EQ(id_column.get_element(1), 1); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + +TEST(TableReaderTest, PushDownCountOnlyUsesSelectedRowGroupInFileRange) { + const auto test_dir = + std::filesystem::temp_directory_path() / "doris_table_reader_count_range_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}, 1); + + std::vector projected_columns; + projected_columns.push_back(make_table_column(0, "id", std::make_shared())); + + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + TableReader reader; + ASSERT_TRUE(reader.init({ + .projected_columns = projected_columns, + .column_predicates = {}, + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = nullptr, + .io_ctx = nullptr, + .runtime_state = &state, + .scanner_profile = nullptr, + .allow_missing_columns = true, + .push_down_agg_type = TPushAggOp::type::COUNT, + .profile = nullptr, + }) + .ok()); + ASSERT_TRUE(reader.prepare_split(build_split_options_for_row_group_mid(file_path, 2)).ok()); + + Block block = build_table_block(projected_columns); + bool eos = false; + ASSERT_TRUE(reader.get_block(&block, &eos).ok()); + ASSERT_FALSE(eos); + ASSERT_EQ(block.rows(), 1); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + TEST(TableReaderTest, PushDownCountFallsBackWithTableConjunct) { const auto test_dir = std::filesystem::temp_directory_path() / "doris_table_reader_count_conjunct_test"; @@ -1827,6 +1932,163 @@ TEST(TableReaderTest, IcebergTableReaderDoesNotPushDownAggregateWithEqualityDele std::filesystem::remove_all(test_dir); } +TEST(TableReaderTest, IcebergEqualityDeleteCastsDataColumnToDeleteKeyType) { + const auto test_dir = + std::filesystem::temp_directory_path() / "doris_iceberg_equality_delete_cast_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + const auto delete_file_path = (test_dir / "equality-delete.parquet").string(); + write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}); + write_iceberg_equality_delete_bigint_parquet_file(delete_file_path, 0, 2); + + std::vector projected_columns; + projected_columns.push_back(make_table_column(0, "id", std::make_shared())); + + RuntimeProfile profile("test_profile"); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + auto scan_params = make_local_parquet_scan_params(); + io::FileReaderStats file_reader_stats; + io::FileCacheStatistics file_cache_stats; + auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats); + ShardedKVCache cache(1); + doris::iceberg::IcebergTableReader reader; + ASSERT_TRUE(reader.init({ + .projected_columns = projected_columns, + .column_predicates = {}, + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = &scan_params, + .io_ctx = io_ctx, + .runtime_state = &state, + .scanner_profile = &profile, + .allow_missing_columns = true, + .profile = make_table_read_profile(&profile), + }) + .ok()); + + auto split_options = build_split_options(file_path); + split_options.cache = &cache; + split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc( + file_path, {make_iceberg_equality_delete_file(delete_file_path, {0})})); + ASSERT_TRUE(reader.prepare_split(split_options).ok()); + + EXPECT_EQ(read_iceberg_ids(&reader, projected_columns), std::vector({1, 3})); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + +TEST(TableReaderTest, IcebergPositionDeleteOnlyMatchesOriginalDataFilePath) { + const auto test_dir = std::filesystem::temp_directory_path() / + "doris_iceberg_position_delete_path_match_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + const auto other_file_path = (test_dir / "other.parquet").string(); + const auto delete_file_path = (test_dir / "position-delete.parquet").string(); + write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}); + write_position_delete_parquet_file(delete_file_path, {other_file_path, file_path}, {0, 1}); + + std::vector projected_columns; + projected_columns.push_back(make_table_column(0, "id", std::make_shared())); + + RuntimeProfile profile("test_profile"); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + auto scan_params = make_local_parquet_scan_params(); + io::FileReaderStats file_reader_stats; + io::FileCacheStatistics file_cache_stats; + auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats); + ShardedKVCache cache(1); + doris::iceberg::IcebergTableReader reader; + ASSERT_TRUE(reader.init({ + .projected_columns = projected_columns, + .column_predicates = {}, + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = &scan_params, + .io_ctx = io_ctx, + .runtime_state = &state, + .scanner_profile = &profile, + .allow_missing_columns = true, + .profile = make_table_read_profile(&profile), + }) + .ok()); + + auto split_options = build_split_options(file_path); + split_options.cache = &cache; + split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc( + file_path, {make_iceberg_position_delete_file(delete_file_path)})); + ASSERT_TRUE(reader.prepare_split(split_options).ok()); + + EXPECT_EQ(read_iceberg_ids(&reader, projected_columns), std::vector({1, 3})); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + +TEST(TableReaderTest, IcebergRowLineageRemainsFileLocalAfterDeleteFiltering) { + const auto test_dir = + std::filesystem::temp_directory_path() / "doris_iceberg_row_lineage_delete_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + const auto delete_file_path = (test_dir / "position-delete.parquet").string(); + write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}); + write_position_delete_parquet_file(delete_file_path, {file_path}, {1}); + + std::vector projected_columns; + projected_columns.push_back( + make_table_column(100, "_row_id", make_nullable(std::make_shared()))); + projected_columns.push_back(make_table_column(0, "id", std::make_shared())); + + RuntimeProfile profile("test_profile"); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + auto scan_params = make_local_parquet_scan_params(); + io::FileReaderStats file_reader_stats; + io::FileCacheStatistics file_cache_stats; + auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats); + ShardedKVCache cache(1); + doris::iceberg::IcebergTableReader reader; + ASSERT_TRUE(reader.init({ + .projected_columns = projected_columns, + .column_predicates = {}, + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = &scan_params, + .io_ctx = io_ctx, + .runtime_state = &state, + .scanner_profile = &profile, + .allow_missing_columns = true, + .profile = make_table_read_profile(&profile), + }) + .ok()); + + auto split_options = build_split_options(file_path); + split_options.cache = &cache; + TTableFormatFileDesc table_format_params = make_iceberg_table_format_desc( + file_path, {make_iceberg_position_delete_file(delete_file_path)}); + table_format_params.iceberg_params.__set_first_row_id(1000); + split_options.current_range.__set_table_format_params(table_format_params); + ASSERT_TRUE(reader.prepare_split(split_options).ok()); + + Block block = build_table_block(projected_columns); + bool eos = false; + ASSERT_TRUE(reader.get_block(&block, &eos).ok()); + ASSERT_FALSE(eos); + ASSERT_EQ(block.rows(), 2); + expect_nullable_int64_column_values(*block.get_by_position(0).column, {1000, 1002}); + const auto& id_column = assert_cast(*block.get_by_position(1).column); + EXPECT_EQ(id_column.get_element(0), 1); + EXPECT_EQ(id_column.get_element(1), 3); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + TEST(TableReaderTest, IcebergTableReaderAppliesPositionDeleteFile) { const auto test_dir = std::filesystem::temp_directory_path() / "doris_iceberg_position_delete_file_test";