Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
ef45c20
Add Iceberg Parquet reader API skeleton
suxiaogang223 May 18, 2026
57178c8
Refine Iceberg reader API boundaries
suxiaogang223 May 18, 2026
1676f2e
fix compiling (#63368)
Gabriel39 May 18, 2026
e5d17b8
refactor table reader (#63397)
Gabriel39 May 19, 2026
783e740
Add unit tests for expr (#63415)
Gabriel39 May 20, 2026
dae05ba
Framework to do delete filtering (#63442)
Gabriel39 May 20, 2026
2539b0a
[test](be) Add DeletePredicate unit tests (#63455)
Gabriel39 May 21, 2026
0fb11e4
cast for schema change (#63477)
Gabriel39 May 21, 2026
3dbfc4c
Complete basic parquet reader (#63659)
Gabriel39 May 26, 2026
5dc54d8
[improvement](be) Reuse table reader file block (#63704)
Gabriel39 May 27, 2026
e65a09e
[test](be) Cover parquet conjunct local filter (#63705)
Gabriel39 May 27, 2026
4fe7254
Refactor 0527 (#63712)
Gabriel39 May 27, 2026
aabcbc3
[parquet] Clarify reader lifecycle comments
suxiaogang223 May 27, 2026
e1ed7f2
[parquet] Update new reader design docs
suxiaogang223 May 27, 2026
f19f78a
[feature](be) Build table filters from conjuncts (#63733)
Gabriel39 May 27, 2026
8882d08
[feature](be) Support nested projection in new parquet reader
suxiaogang223 May 27, 2026
ee4a332
[fix](be) Fix VSlotRef constructor build
suxiaogang223 May 27, 2026
472c1cc
[fix](be) Fix parquet batch row cast
suxiaogang223 May 27, 2026
4073912
[feature](be) Support expression filters on file reader (#63748)
Gabriel39 May 27, 2026
5ad0921
[fix](be) Cast localized filter slots for file schema types (#63754)
Gabriel39 May 27, 2026
6f4dac3
[feature](be) Add basic parquet list reader
suxiaogang223 May 28, 2026
ac24a82
[fix](be) Fill partition columns in TableReader (#63773)
Gabriel39 May 28, 2026
321134d
Materialize Iceberg row lineage virtual columns (#63787)
Gabriel39 May 28, 2026
6cb5719
[feature](be) Add basic parquet map reader
suxiaogang223 May 28, 2026
b8a40bd
[feature](be) Support parquet repeated level assembly
suxiaogang223 May 28, 2026
68a29c5
[fix](be) Fix parquet map reader build warnings
suxiaogang223 May 28, 2026
c3c9d3b
[feature](be) Support parquet struct scalar assembly
suxiaogang223 May 28, 2026
2cfd503
[feature](be) Support Iceberg position delete predicates (#63799)
Gabriel39 May 28, 2026
6b3ce8c
[feature](be) Support Iceberg equality deletes in reader (#63852)
Gabriel39 May 28, 2026
2369707
[doc](be) Update parquet complex reader status
suxiaogang223 May 28, 2026
4ca4e21
[feature](be) Add parquet dictionary row group pruning
suxiaogang223 May 29, 2026
97dc0f1
[fix](be) Decode parquet dictionary page directly
suxiaogang223 May 29, 2026
d807514
[fix](be) Match parquet list test schema nullability
suxiaogang223 May 29, 2026
cf14d1e
[fix](be) Read required nested parquet scalars
suxiaogang223 May 29, 2026
aa1381c
[fix](be) Stabilize parquet nested scalar levels
suxiaogang223 May 29, 2026
5ddf3f2
[fix](be) Align nullable parquet struct child slots
suxiaogang223 May 29, 2026
0ae7e1c
[feature](be) Support parquet minmax aggregate pushdown (#63868)
Gabriel39 May 29, 2026
6b7ae2f
[test](be) Add table reader edge case unit tests (#63895)
Gabriel39 May 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,4 @@ compile_commands.json
.github

.worktrees/
.worktree_initialized
29 changes: 29 additions & 0 deletions be/src/core/data_type_serde/data_type_datetimev2_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "core/data_type/data_type_decimal.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/primitive_type.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "core/types.h"
#include "core/value/vdatetime_value.h"
#include "exprs/function/cast/cast_to_datetimev2_impl.hpp"
Expand Down Expand Up @@ -451,6 +452,34 @@ Status DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column,
return Status::OK();
}

Status DataTypeDateTimeV2SerDe::read_column_from_decoded_values(
IColumn& column, const DecodedColumnView& view) const {
if (view.value_kind != DecodedValueKind::INT64) {
return Status::NotSupported("DATETIMEV2 decoded reader expects INT64 source");
}
if (view.values == nullptr && view.row_count > 0) {
return Status::Corruption("Decoded value buffer is null for {}", column.get_name());
}
auto& data = assert_cast<ColumnDateTimeV2&>(column).get_data();
const auto* values = reinterpret_cast<const int64_t*>(view.values);
static const cctz::time_zone utc_time_zone = cctz::utc_time_zone();
const int64_t second_mask = view.time_unit == DecodedTimeUnit::MILLIS ? 1000 : 1000000;
for (int64_t row = 0; row < view.row_count; ++row) {
int64_t epoch_seconds = values[row] / second_mask;
int64_t sub_second = values[row] % second_mask;
if (sub_second < 0) {
sub_second += second_mask;
--epoch_seconds;
}
const int32_t microsecond = static_cast<int32_t>(sub_second * (1000000 / second_mask));
DateV2Value<DateTimeV2ValueType> datetime_value;
datetime_value.from_unixtime(epoch_seconds, utc_time_zone);
datetime_value.set_microsecond(static_cast<uint64_t>(microsecond));
data.push_back(datetime_value);
}
return Status::OK();
}

Status DataTypeDateTimeV2SerDe::write_column_to_mysql_binary(const IColumn& column,
MysqlRowBinaryBuffer& result,
int64_t row_idx, bool col_const,
Expand Down
2 changes: 2 additions & 0 deletions be/src/core/data_type_serde/data_type_datetimev2_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ class DataTypeDateTimeV2SerDe : public DataTypeNumberSerDe<PrimitiveType::TYPE_D
const cctz::time_zone& ctz) const override;
Status read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int64_t start,
int64_t end, const cctz::time_zone& ctz) const override;
Status read_column_from_decoded_values(IColumn& column,
const DecodedColumnView& view) const override;

Status write_column_to_mysql_binary(const IColumn& column, MysqlRowBinaryBuffer& row_buffer,
int64_t row_idx, bool col_const,
Expand Down
19 changes: 19 additions & 0 deletions be/src/core/data_type_serde/data_type_datev2_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "core/data_type/data_type_decimal.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/define_primitive_type.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "core/types.h"
#include "core/value/vdatetime_value.h"
#include "exprs/function/cast/cast_to_datev2_impl.hpp"
Expand Down Expand Up @@ -125,6 +126,24 @@ Status DataTypeDateV2SerDe::read_column_from_arrow(IColumn& column, const arrow:
return Status::OK();
}

Status DataTypeDateV2SerDe::read_column_from_decoded_values(IColumn& column,
const DecodedColumnView& view) const {
if (view.value_kind != DecodedValueKind::INT32) {
return Status::NotSupported("DATEV2 decoded reader expects INT32 source");
}
if (view.values == nullptr && view.row_count > 0) {
return Status::Corruption("Decoded value buffer is null for {}", column.get_name());
}
auto& data = assert_cast<ColumnDateV2&>(column).get_data();
const auto* values = reinterpret_cast<const int32_t*>(view.values);
for (int64_t row = 0; row < view.row_count; ++row) {
DateV2Value<DateV2ValueType> date_v2;
date_v2.get_date_from_daynr(values[row] + date_threshold);
data.push_back(date_v2);
}
return Status::OK();
}

Status DataTypeDateV2SerDe::write_column_to_mysql_binary(const IColumn& column,
MysqlRowBinaryBuffer& result,
int64_t row_idx, bool col_const,
Expand Down
2 changes: 2 additions & 0 deletions be/src/core/data_type_serde/data_type_datev2_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class DataTypeDateV2SerDe : public DataTypeNumberSerDe<PrimitiveType::TYPE_DATEV
const cctz::time_zone& ctz) const override;
Status read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int64_t start,
int64_t end, const cctz::time_zone& ctz) const override;
Status read_column_from_decoded_values(IColumn& column,
const DecodedColumnView& view) const override;
Status write_column_to_mysql_binary(const IColumn& column, MysqlRowBinaryBuffer& row_buffer,
int64_t row_idx, bool col_const,
const FormatOptions& options) const override;
Expand Down
64 changes: 64 additions & 0 deletions be/src/core/data_type_serde/data_type_decimal_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "core/column/column_decimal.h"
#include "core/data_type/data_type_decimal.h"
#include "core/data_type/define_primitive_type.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "core/types.h"
#include "exec/common/arithmetic_overflow.h"
#include "exprs/function/cast/cast_to_decimal.h"
Expand All @@ -43,6 +44,53 @@
#include "util/string_parser.hpp"

namespace doris {
namespace {

template <typename NativeType>
NativeType decode_big_endian_signed_integer(const uint8_t* data, int length) {
using UnsignedNativeType =
std::conditional_t<std::is_same_v<NativeType, Int128>, unsigned __int128,
std::make_unsigned_t<NativeType>>;
UnsignedNativeType value = data != nullptr && length > 0 && (data[0] & 0x80) != 0
? static_cast<UnsignedNativeType>(-1)
: 0;
for (int i = 0; i < length; ++i) {
value = static_cast<UnsignedNativeType>((value << 8) | data[i]);
}
return static_cast<NativeType>(value);
}

template <PrimitiveType T>
typename PrimitiveTypeTraits<T>::CppType read_decimal_decoded_value(const DecodedColumnView& view,
int64_t row) {
using FieldType = typename PrimitiveTypeTraits<T>::CppType;
if (view.value_kind == DecodedValueKind::INT32) {
const auto* values = reinterpret_cast<const int32_t*>(view.values);
return FieldType {static_cast<typename FieldType::NativeType>(values[row])};
}
if (view.value_kind == DecodedValueKind::INT64) {
const auto* values = reinterpret_cast<const int64_t*>(view.values);
return FieldType {static_cast<typename FieldType::NativeType>(values[row])};
}
const auto& value = (*view.binary_values)[row];
const auto length = view.value_kind == DecodedValueKind::FIXED_BINARY
? view.fixed_length
: cast_set<int, size_t, false>(value.size);
return FieldType {
static_cast<typename FieldType::NativeType>(decode_big_endian_signed_integer<Int128>(
reinterpret_cast<const uint8_t*>(value.data), length))};
}

template <PrimitiveType T>
Status read_decimal_decoded_values(IColumn& column, const DecodedColumnView& view) {
auto& data = assert_cast<ColumnDecimal<T>&>(column).get_data();
for (int64_t row = 0; row < view.row_count; ++row) {
data.push_back(read_decimal_decoded_value<T>(view, row));
}
return Status::OK();
}

} // namespace

template <PrimitiveType T>
Status DataTypeDecimalSerDe<T>::from_string_batch(const ColumnString& str, ColumnNullable& column,
Expand Down Expand Up @@ -381,6 +429,22 @@ Status DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
return Status::OK();
}

template <PrimitiveType T>
Status DataTypeDecimalSerDe<T>::read_column_from_decoded_values(
IColumn& column, const DecodedColumnView& view) const {
if constexpr (T == TYPE_DECIMAL32 || T == TYPE_DECIMAL64 || T == TYPE_DECIMAL128I ||
T == TYPE_DECIMAL256) {
if (view.value_kind == DecodedValueKind::INT32 ||
view.value_kind == DecodedValueKind::INT64 ||
view.value_kind == DecodedValueKind::BINARY ||
view.value_kind == DecodedValueKind::FIXED_BINARY) {
return read_decimal_decoded_values<T>(column, view);
}
}
return Status::NotSupported("Unsupported decoded values for {} from source kind {}", get_name(),
static_cast<int>(view.value_kind));
}

template <PrimitiveType T>
Status DataTypeDecimalSerDe<T>::write_column_to_mysql_binary(const IColumn& column,
MysqlRowBinaryBuffer& result,
Expand Down
2 changes: 2 additions & 0 deletions be/src/core/data_type_serde/data_type_decimal_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ class DataTypeDecimalSerDe : public DataTypeSerDe {
const cctz::time_zone& ctz) const override;
Status read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int64_t start,
int64_t end, const cctz::time_zone& ctz) const override;
Status read_column_from_decoded_values(IColumn& column,
const DecodedColumnView& view) const override;
Status write_column_to_mysql_binary(const IColumn& column, MysqlRowBinaryBuffer& row_buffer,
int64_t row_idx, bool col_const,
const FormatOptions& options) const override;
Expand Down
20 changes: 19 additions & 1 deletion be/src/core/data_type_serde/data_type_nullable_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <memory>
#include <vector>

#include "core/assert_cast.h"
#include "core/column/column.h"
Expand All @@ -31,6 +31,7 @@
#include "core/column/column_vector.h"
#include "core/data_type_serde/data_type_serde.h"
#include "core/data_type_serde/data_type_string_serde.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "exprs/function/cast/cast_base.h"
#include "format/transformer/vcsv_transformer.h"
#include "util/jsonb_document.h"
Expand Down Expand Up @@ -350,6 +351,23 @@ Status DataTypeNullableSerDe::read_column_from_arrow(IColumn& column,
ctz);
}

Status DataTypeNullableSerDe::read_column_from_decoded_values(IColumn& column,
const DecodedColumnView& view) const {
auto& nullable_column = assert_cast<ColumnNullable&>(column);
auto& null_map = nullable_column.get_null_map_data();
const auto old_size = null_map.size();
null_map.resize(null_map.size() + view.row_count);
if (view.null_map != nullptr) {
// TODO: skip if no null in map
auto* dst = null_map.data() + old_size;
memcpy(dst, view.null_map, view.row_count);
}
DecodedColumnView nested_view = view;
nested_view.null_map = nullptr;
return nested_serde->read_column_from_decoded_values(nullable_column.get_nested_column(),
nested_view);
}

bool DataTypeNullableSerDe::write_column_to_mysql_text(const IColumn& column, BufferWritable& bw,
int64_t row_idx,
const FormatOptions& options) const {
Expand Down
2 changes: 2 additions & 0 deletions be/src/core/data_type_serde/data_type_nullable_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class DataTypeNullableSerDe : public DataTypeSerDe {
const cctz::time_zone& ctz) const override;
Status read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int64_t start,
int64_t end, const cctz::time_zone& ctz) const override;
Status read_column_from_decoded_values(IColumn& column,
const DecodedColumnView& view) const override;
Status write_column_to_mysql_binary(const IColumn& column, MysqlRowBinaryBuffer& row_buffer,
int64_t row_idx, bool col_const,
const FormatOptions& options) const override;
Expand Down
52 changes: 52 additions & 0 deletions be/src/core/data_type_serde/data_type_number_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "core/data_type/define_primitive_type.h"
#include "core/data_type/primitive_type.h"
#include "core/data_type_serde/data_type_serde.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "core/packed_int128.h"
#include "core/types.h"
#include "core/value/timestamptz_value.h"
Expand All @@ -42,6 +43,29 @@
#include "util/to_string.h"

namespace doris {
namespace {

template <typename NativeType>
const NativeType* decoded_values_as(const DecodedColumnView& view) {
return reinterpret_cast<const NativeType*>(view.values);
}

template <PrimitiveType DorisType, typename SourceType>
Status read_number_decoded_values(IColumn& column, const DecodedColumnView& view) {
if (view.values == nullptr && view.row_count > 0) {
return Status::Corruption("Decoded value buffer is null for {}", column.get_name());
}
auto& data =
assert_cast<typename PrimitiveTypeTraits<DorisType>::ColumnType&>(column).get_data();
const auto* values = decoded_values_as<SourceType>(view);
for (int64_t row = 0; row < view.row_count; ++row) {
using DorisCppType = typename PrimitiveTypeTraits<DorisType>::CppType;
data.push_back(static_cast<DorisCppType>(values[row]));
}
return Status::OK();
}

} // namespace
// Type map的基本结构
template <typename Key, typename Value, typename... Rest>
struct TypeMap {
Expand Down Expand Up @@ -156,6 +180,34 @@ Status DataTypeNumberSerDe<T>::write_column_to_arrow(const IColumn& column, cons
return Status::OK();
}

template <PrimitiveType T>
Status DataTypeNumberSerDe<T>::read_column_from_decoded_values(
IColumn& column, const DecodedColumnView& view) const {
if constexpr (T == TYPE_BOOLEAN) {
if (view.value_kind == DecodedValueKind::BOOL) {
return read_number_decoded_values<TYPE_BOOLEAN, bool>(column, view);
}
} else if constexpr (T == TYPE_INT) {
if (view.value_kind == DecodedValueKind::INT32) {
return read_number_decoded_values<TYPE_INT, int32_t>(column, view);
}
} else if constexpr (T == TYPE_BIGINT) {
if (view.value_kind == DecodedValueKind::INT64) {
return read_number_decoded_values<TYPE_BIGINT, int64_t>(column, view);
}
} else if constexpr (T == TYPE_FLOAT) {
if (view.value_kind == DecodedValueKind::FLOAT) {
return read_number_decoded_values<TYPE_FLOAT, float>(column, view);
}
} else if constexpr (T == TYPE_DOUBLE) {
if (view.value_kind == DecodedValueKind::DOUBLE) {
return read_number_decoded_values<TYPE_DOUBLE, double>(column, view);
}
}
return Status::NotSupported("Unsupported decoded values for {} from source kind {}", get_name(),
static_cast<int>(view.value_kind));
}

template <PrimitiveType T>
Status DataTypeNumberSerDe<T>::deserialize_one_cell_from_json(IColumn& column, Slice& slice,
const FormatOptions& options) const {
Expand Down
3 changes: 3 additions & 0 deletions be/src/core/data_type_serde/data_type_number_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ class DataTypeNumberSerDe : public DataTypeSerDe {
Status read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int64_t start,
int64_t end, const cctz::time_zone& ctz) const override;

Status read_column_from_decoded_values(IColumn& column,
const DecodedColumnView& view) const override;

Status write_column_to_mysql_binary(const IColumn& column, MysqlRowBinaryBuffer& row_buffer,
int64_t row_idx, bool col_const,
const FormatOptions& options) const override;
Expand Down
6 changes: 6 additions & 0 deletions be/src/core/data_type_serde/data_type_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@
namespace doris {
DataTypeSerDe::~DataTypeSerDe() = default;

Status DataTypeSerDe::read_column_from_decoded_values(IColumn& column,
const DecodedColumnView& view) const {
return Status::NotSupported("read_column_from_decoded_values is not supported for {}",
get_name());
}

DataTypeSerDeSPtrs create_data_type_serdes(const DataTypes& types) {
DataTypeSerDeSPtrs serdes;
serdes.reserve(types.size());
Expand Down
7 changes: 7 additions & 0 deletions be/src/core/data_type_serde/data_type_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "common/cast_set.h"
#include "common/status.h"
#include "core/column/column_nullable.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "core/field.h"
#include "core/string_buffer.hpp"
#include "core/types.h"
Expand Down Expand Up @@ -485,6 +486,12 @@ class DataTypeSerDe {
int64_t start, int64_t end,
const cctz::time_zone& ctz) const = 0;

// Read already decoded column values into a Doris column. The input view is format-neutral:
// file readers translate their decoder output into DecodedColumnView, while SerDe owns
// the Doris-type-specific materialization into IColumn.
virtual Status read_column_from_decoded_values(IColumn& column,
const DecodedColumnView& view) const;

// ORC serializer
virtual Status write_column_to_orc(const std::string& timezone, const IColumn& column,
const NullMap* null_map,
Expand Down
Loading
Loading