Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 46 additions & 10 deletions be/src/udf/python/python_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,6 @@ def convert_arrow_field_to_python(field, column_metadata=None):
if field is None:
return None

if pa.types.is_map(field.type):
# pyarrow.lib.MapScalar's as_py() returns a list of tuples, convert to dict
list_of_tuples = field.as_py()
return dict(list_of_tuples) if list_of_tuples is not None else None

# Check if we should apply special IP type conversion based on metadata
if column_metadata:
# Arrow metadata keys can be either bytes or str depending on how they were created
Expand Down Expand Up @@ -300,8 +295,44 @@ def convert_arrow_field_to_python(field, column_metadata=None):
)
return value
return None

return field.as_py()

return convert_arrow_value_to_python(field.as_py(), field.type)


def convert_arrow_value_to_python(value, arrow_type):
"""
Recursively convert Arrow nested values to Doris Python UDF values.

PyArrow exposes MapScalar.as_py() as a list of key/value tuples. If the map is
nested under ARRAY or STRUCT, the top-level scalar is no longer MapScalar, so
field.as_py() alone would leak list-of-tuples to user UDF code.
"""
if value is None:
return None

if pa.types.is_map(arrow_type):
key_type = arrow_type.key_type
item_type = arrow_type.item_type
return {
convert_arrow_value_to_python(k, key_type): convert_arrow_value_to_python(
v, item_type
)
for k, v in value
}

if pa.types.is_list(arrow_type) or pa.types.is_large_list(arrow_type):
element_type = arrow_type.value_type
return [convert_arrow_value_to_python(v, element_type) for v in value]

if pa.types.is_struct(arrow_type):
return {
arrow_type[i].name: convert_arrow_value_to_python(
value.get(arrow_type[i].name), arrow_type[i].type
)
for i in range(len(arrow_type))
}

return value


def convert_python_to_arrow_value(value, output_type=None):
Expand Down Expand Up @@ -563,9 +594,14 @@ def _cast_arrow_to_vector(arrow_array: pa.Array, vec_type: VectorType):
Convert a pa.Array to an instance of the specified VectorType.
"""
if vec_type == VectorType.LIST:
return arrow_array.to_pylist()
return [
convert_arrow_value_to_python(value, arrow_array.type)
for value in arrow_array.to_pylist()
]
elif vec_type == VectorType.PANDAS_SERIES:
return arrow_array.to_pandas()
return arrow_array.to_pandas().apply(
lambda value: convert_arrow_value_to_python(value, arrow_array.type)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This now runs a Python-level recursive conversion for every element of every vectorized UDF argument, even when arrow_array.type is a primitive type that cannot contain a nested MAP. For pd.Series this replaces the previous arrow_array.to_pandas() fast path with .apply(lambda ...) over the whole column, so existing vectorized Python UDFs on primitive columns regress even though they do not need this fix. Please gate the recursive conversion to Arrow types that can actually contain nested values needing normalization, and keep the old direct to_pandas()/to_pylist() path for primitive/non-nested inputs.

)
else:
raise ValueError(f"Unsupported vector type: {vec_type}")

Expand Down Expand Up @@ -665,7 +701,7 @@ def _vectorized_call(self, record_batch: pa.RecordBatch) -> pa.Array:
# instead of converting to list
pylist = arrow_col.to_pylist()
if len(pylist) > 0:
converted = pylist[0]
converted = convert_arrow_value_to_python(pylist[0], arrow_col.type)
logging.info(
"Converted %s to scalar (first value): %s",
param.name,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !scalar_constant_nested_complex --
[{a:1,b:2},{c:3}]|{left:[{x:10}],right:[{y:20},{z:30}]}|(const,[{s:7}],{empty:[],nums:[1,2]})|{const_key:(constTag,[{cm:11}])}
[]|{empty:[]}|(empty,[],{none:NULL})|{empty:(emptyTag,[])}
NULL|NULL|NULL|NULL

-- !vector_list_nested_complex --
[{a:1,b:2},{c:3}]|{left:[{x:10}],right:[{y:20},{z:30}]}|(row1,[{s:7},{t:8}],{empty:[],nums:[1,2]})|{first:(tagA,[{m:1},{n:2}]),second:(tagB,[])}
[]|{empty:[]}|(row2,[],{none:NULL})|{empty:(tagEmpty,[])}
NULL|NULL|NULL|NULL

-- !vector_series_nested_complex --
[{a:1,b:2},{c:3}]|{left:[{x:10}],right:[{y:20},{z:30}]}|(row1,[{s:7},{t:8}],{empty:[],nums:[1,2]})|{first:(tagA,[{m:1},{n:2}]),second:(tagB,[])}
[]|{empty:[]}|(row2,[],{none:NULL})|{empty:(tagEmpty,[])}
NULL|NULL|NULL|NULL

-- !vector_mixed_scalar_nested_complex --
1|{first:(tagA,[{m:1},{n:2}]),second:(tagB,[])}
2|{first:(tagA,[{m:1},{n:2}]),second:(tagB,[])}
3|{first:(tagA,[{m:1},{n:2}]),second:(tagB,[])}
Loading
Loading