Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/format/new_parquet/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ Status ParquetReader::_execute_filter_conjuncts(int64_t batch_rows, Block* file_
if (*selected_rows == 0) {
break;
}
const size_t original_columns = file_block->columns();
int result_column_id = -1;
RETURN_IF_ERROR(delete_conjunct->root()->execute(delete_conjunct.get(), file_block,
&result_column_id));
Expand All @@ -356,7 +357,7 @@ Status ParquetReader::_execute_filter_conjuncts(int64_t batch_rows, Block* file_
keep_filter[row] = !delete_filter[row];
has_kept_row |= keep_filter[row] != 0;
}
file_block->erase(result_column_id);
file_block->erase_tail(original_columns);
*selected_rows =
!has_kept_row ? 0
: _apply_filter_to_selection(keep_filter, selection, *selected_rows);
Expand Down
144 changes: 144 additions & 0 deletions be/test/format/new_parquet/parquet_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,31 @@ void write_dictionary_filter_parquet_file(const std::string& file_path) {
builder.build()));
}

void write_dictionary_edge_parquet_file(const std::string& file_path) {
auto schema = arrow::schema({
arrow::field("id", arrow::int32(), false),
arrow::field("value", arrow::utf8(), false),
});
auto table = arrow::Table::Make(
schema,
{build_int32_array({1, 2, 3, 4, 5, 6, 7, 8}),
build_string_array({"", "same", "other", "long-value", "", "tail", "same", "last"})});

auto file_result = arrow::io::FileOutputStream::Open(file_path);
ASSERT_TRUE(file_result.ok()) << file_result.status();
std::shared_ptr<arrow::io::FileOutputStream> out = *file_result;

::parquet::WriterProperties::Builder builder;
builder.version(::parquet::ParquetVersion::PARQUET_2_6);
builder.data_page_version(::parquet::ParquetDataPageVersion::V2);
builder.compression(::parquet::Compression::UNCOMPRESSED);
builder.enable_dictionary("value");
builder.disable_dictionary("id");
builder.disable_statistics();
PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), out, 2,
builder.build()));
}

Block build_file_block(const std::vector<reader::SchemaField>& schema) {
Block block;
for (const auto& field : schema) {
Expand Down Expand Up @@ -284,6 +309,10 @@ class TestFileReader final : public reader::FileReader {
bool has_request() const { return _request != nullptr; }

bool eof() const { return _eof; }

bool has_io_context() const { return _io_ctx != nullptr; }

long io_context_use_count() const { return _io_ctx.use_count(); }
};

TEST(FileReaderTest, OpenStoresRequestAndCloseClearsState) {
Expand All @@ -304,6 +333,25 @@ TEST(FileReaderTest, OpenStoresRequestAndCloseClearsState) {
EXPECT_TRUE(reader.eof());
}

TEST(FileReaderTest, CloseReleasesSharedIOContext) {
auto system_properties = std::make_shared<io::FileSystemProperties>();
system_properties->system_type = TFileType::FILE_LOCAL;
auto file_description = std::make_unique<io::FileDescription>();
auto io_ctx = std::make_shared<io::IOContext>();
std::weak_ptr<io::IOContext> weak_io_ctx = io_ctx;
TestFileReader reader(system_properties, file_description, io_ctx);

EXPECT_TRUE(reader.has_io_context());
EXPECT_EQ(reader.io_context_use_count(), 2);
io_ctx.reset();
EXPECT_FALSE(weak_io_ctx.expired());
EXPECT_EQ(reader.io_context_use_count(), 1);

ASSERT_TRUE(reader.close().ok());
EXPECT_FALSE(reader.has_io_context());
EXPECT_TRUE(weak_io_ctx.expired());
}

TEST(TableColumnMapperTest, CreatesComplexProjectionForStructChildren) {
reader::SchemaField struct_field;
struct_field.id = 0;
Expand Down Expand Up @@ -691,6 +739,102 @@ TEST_F(NewParquetReaderTest, InPredicateFiltersRowGroupsByDictionary) {
EXPECT_EQ(values, std::vector<std::string>({"az", "za"}));
}

TEST_F(NewParquetReaderTest, DictionaryPageV2StringEdgesSurviveSelection) {
write_dictionary_edge_parquet_file(_file_path);
auto parquet_file_reader = ::parquet::ParquetFileReader::OpenFile(_file_path, false);
ASSERT_EQ(parquet_file_reader->metadata()->num_row_groups(), 4);
for (int row_group_idx = 0; row_group_idx < 4; ++row_group_idx) {
auto row_group = parquet_file_reader->metadata()->RowGroup(row_group_idx);
ASSERT_NE(row_group, nullptr);
ASSERT_TRUE(row_group->ColumnChunk(1)->has_dictionary_page());
}

auto reader = create_reader();
RuntimeState state {TQueryOptions(), TQueryGlobals()};
ASSERT_TRUE(reader->init(&state).ok());

std::vector<reader::SchemaField> schema;
ASSERT_TRUE(reader->get_schema(&schema).ok());
auto request = std::make_unique<reader::FileScanRequest>();
request->predicate_columns = {1};
request->non_predicate_columns = {0};
auto set = build_set<TYPE_STRING>();
set->insert(const_cast<char*>(""), 0);
set->insert(const_cast<char*>("same"), 4);
reader::FileColumnPredicateFilter column_filter;
column_filter.file_column_id = 1;
column_filter.predicates.push_back(create_in_list_predicate<PredicateType::IN_LIST>(
1, "value", schema[1].type, set, false));
request->column_predicate_filters.push_back(std::move(column_filter));
ASSERT_TRUE(reader->open(request).ok());

std::vector<int32_t> ids;
std::vector<std::string> values;
bool eof = false;
while (!eof) {
Block block = build_file_block(schema);
size_t rows = 0;
ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok());
if (rows == 0) {
continue;
}
const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(0).column);
const auto& value_column =
assert_cast<const ColumnString&>(*block.get_by_position(1).column);
for (size_t row = 0; row < rows; ++row) {
ids.push_back(id_column.get_element(row));
values.push_back(value_column.get_data_at(row).to_string());
}
}

EXPECT_EQ(ids, std::vector<int32_t>({1, 2, 5, 6, 7, 8}));
EXPECT_EQ(values, std::vector<std::string>({"", "same", "", "tail", "same", "last"}));
}

TEST_F(NewParquetReaderTest, StatisticsPruningSkipsPrefixRowGroupsAndReadsLaterGroups) {
write_parquet_file(_file_path, 1);
auto parquet_file_reader = ::parquet::ParquetFileReader::OpenFile(_file_path, false);
ASSERT_EQ(parquet_file_reader->metadata()->num_row_groups(), 5);

auto reader = create_reader();
RuntimeState state {TQueryOptions(), TQueryGlobals()};
ASSERT_TRUE(reader->init(&state).ok());

std::vector<reader::SchemaField> schema;
ASSERT_TRUE(reader->get_schema(&schema).ok());
auto request = std::make_unique<reader::FileScanRequest>();
request->predicate_columns = {0};
request->non_predicate_columns = {1};
reader::FileColumnPredicateFilter column_filter;
column_filter.file_column_id = 0;
column_filter.predicates.push_back(create_comparison_predicate<PredicateType::GE>(
0, "id", schema[0].type, Field::create_field<TYPE_INT>(4), false));
request->column_predicate_filters.push_back(std::move(column_filter));
ASSERT_TRUE(reader->open(request).ok());

std::vector<int32_t> ids;
std::vector<std::string> values;
bool eof = false;
while (!eof) {
Block block = build_file_block(schema);
size_t rows = 0;
ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok());
if (rows == 0) {
continue;
}
const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(0).column);
const auto& value_column =
assert_cast<const ColumnString&>(*block.get_by_position(1).column);
for (size_t row = 0; row < rows; ++row) {
ids.push_back(id_column.get_element(row));
values.push_back(value_column.get_data_at(row).to_string());
}
}

EXPECT_EQ(ids, std::vector<int32_t>({4, 5}));
EXPECT_EQ(values, std::vector<std::string>({"four", "five"}));
}

TEST_F(NewParquetReaderTest, RowPositionReaderReturnsFileLocalPositions) {
write_parquet_file(_file_path, 2);
auto parquet_file_reader = ::parquet::ParquetFileReader::OpenFile(_file_path, false);
Expand Down
Loading
Loading