diff --git a/be/src/udf/python/python_server.py b/be/src/udf/python/python_server.py index ecdbef691b9fac..c33883cadfc7c6 100644 --- a/be/src/udf/python/python_server.py +++ b/be/src/udf/python/python_server.py @@ -252,11 +252,6 @@ def convert_arrow_field_to_python(field, column_metadata=None): if field is None: return None - if pa.types.is_map(field.type): - # pyarrow.lib.MapScalar's as_py() returns a list of tuples, convert to dict - list_of_tuples = field.as_py() - return dict(list_of_tuples) if list_of_tuples is not None else None - # Check if we should apply special IP type conversion based on metadata if column_metadata: # Arrow metadata keys can be either bytes or str depending on how they were created @@ -300,8 +295,44 @@ def convert_arrow_field_to_python(field, column_metadata=None): ) return value return None - - return field.as_py() + + return convert_arrow_value_to_python(field.as_py(), field.type) + + +def convert_arrow_value_to_python(value, arrow_type): + """ + Recursively convert Arrow nested values to Doris Python UDF values. + + PyArrow exposes MapScalar.as_py() as a list of key/value tuples. If the map is + nested under ARRAY or STRUCT, the top-level scalar is no longer MapScalar, so + field.as_py() alone would leak list-of-tuples to user UDF code. + """ + if value is None: + return None + + if pa.types.is_map(arrow_type): + key_type = arrow_type.key_type + item_type = arrow_type.item_type + return { + convert_arrow_value_to_python(k, key_type): convert_arrow_value_to_python( + v, item_type + ) + for k, v in value + } + + if pa.types.is_list(arrow_type) or pa.types.is_large_list(arrow_type): + element_type = arrow_type.value_type + return [convert_arrow_value_to_python(v, element_type) for v in value] + + if pa.types.is_struct(arrow_type): + return { + arrow_type[i].name: convert_arrow_value_to_python( + value.get(arrow_type[i].name), arrow_type[i].type + ) + for i in range(len(arrow_type)) + } + + return value def convert_python_to_arrow_value(value, output_type=None): @@ -563,9 +594,14 @@ def _cast_arrow_to_vector(arrow_array: pa.Array, vec_type: VectorType): Convert a pa.Array to an instance of the specified VectorType. """ if vec_type == VectorType.LIST: - return arrow_array.to_pylist() + return [ + convert_arrow_value_to_python(value, arrow_array.type) + for value in arrow_array.to_pylist() + ] elif vec_type == VectorType.PANDAS_SERIES: - return arrow_array.to_pandas() + return arrow_array.to_pandas().apply( + lambda value: convert_arrow_value_to_python(value, arrow_array.type) + ) else: raise ValueError(f"Unsupported vector type: {vec_type}") @@ -665,7 +701,7 @@ def _vectorized_call(self, record_batch: pa.RecordBatch) -> pa.Array: # instead of converting to list pylist = arrow_col.to_pylist() if len(pylist) > 0: - converted = pylist[0] + converted = convert_arrow_value_to_python(pylist[0], arrow_col.type) logging.info( "Converted %s to scalar (first value): %s", param.name, diff --git a/regression-test/data/pythonudf_p0/test_pythonudf_nested_complex_type.out b/regression-test/data/pythonudf_p0/test_pythonudf_nested_complex_type.out new file mode 100644 index 00000000000000..4fe7cb82953f5d --- /dev/null +++ b/regression-test/data/pythonudf_p0/test_pythonudf_nested_complex_type.out @@ -0,0 +1,20 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !scalar_constant_nested_complex -- +[{a:1,b:2},{c:3}]|{left:[{x:10}],right:[{y:20},{z:30}]}|(const,[{s:7}],{empty:[],nums:[1,2]})|{const_key:(constTag,[{cm:11}])} +[]|{empty:[]}|(empty,[],{none:NULL})|{empty:(emptyTag,[])} +NULL|NULL|NULL|NULL + +-- !vector_list_nested_complex -- +[{a:1,b:2},{c:3}]|{left:[{x:10}],right:[{y:20},{z:30}]}|(row1,[{s:7},{t:8}],{empty:[],nums:[1,2]})|{first:(tagA,[{m:1},{n:2}]),second:(tagB,[])} +[]|{empty:[]}|(row2,[],{none:NULL})|{empty:(tagEmpty,[])} +NULL|NULL|NULL|NULL + +-- !vector_series_nested_complex -- +[{a:1,b:2},{c:3}]|{left:[{x:10}],right:[{y:20},{z:30}]}|(row1,[{s:7},{t:8}],{empty:[],nums:[1,2]})|{first:(tagA,[{m:1},{n:2}]),second:(tagB,[])} +[]|{empty:[]}|(row2,[],{none:NULL})|{empty:(tagEmpty,[])} +NULL|NULL|NULL|NULL + +-- !vector_mixed_scalar_nested_complex -- +1|{first:(tagA,[{m:1},{n:2}]),second:(tagB,[])} +2|{first:(tagA,[{m:1},{n:2}]),second:(tagB,[])} +3|{first:(tagA,[{m:1},{n:2}]),second:(tagB,[])} diff --git a/regression-test/suites/pythonudf_p0/test_pythonudf_nested_complex_type.groovy b/regression-test/suites/pythonudf_p0/test_pythonudf_nested_complex_type.groovy new file mode 100644 index 00000000000000..a50c2ffc571de9 --- /dev/null +++ b/regression-test/suites/pythonudf_p0/test_pythonudf_nested_complex_type.groovy @@ -0,0 +1,445 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_pythonudf_nested_complex_type") { + def runtime_version = getPythonUdfRuntimeVersion() + + try { + sql """ DROP TABLE IF EXISTS test_pythonudf_nested_complex_type; """ + sql """ + CREATE TABLE test_pythonudf_nested_complex_type ( + id INT, + array_map ARRAY>, + map_array_map MAP>>, + struct_nested STRUCT< + label: STRING, + maps: ARRAY>, + attrs: MAP> + >, + map_struct_nested MAP> + >> + ) ENGINE=OLAP + DUPLICATE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES("replication_num" = "1"); + """ + + sql """ + INSERT INTO test_pythonudf_nested_complex_type VALUES + ( + 1, + [{'a': 1, 'b': 2}, {'c': 3}], + {'left': [{'x': 10}], 'right': [{'y': 20}, {'z': 30}]}, + {'row1', [{'s': 7}, {'t': 8}], {'nums': [1, 2], 'empty': []}}, + {'first': {'tagA', [{'m': 1}, {'n': 2}]}, 'second': {'tagB', []}} + ), + ( + 2, + [], + {'empty': []}, + {'row2', [], {'none': NULL}}, + {'empty': {'tagEmpty', []}} + ), + ( + 3, + NULL, + NULL, + NULL, + NULL + ); + """ + + sql """ + DROP FUNCTION IF EXISTS py_nested_complex_scalar( + ARRAY>, + MAP>>, + STRUCT>, attrs: MAP>>, + MAP>>> + ); + """ + sql """ + CREATE FUNCTION py_nested_complex_scalar( + ARRAY>, + MAP>>, + STRUCT>, attrs: MAP>>, + MAP>>> + ) + RETURNS STRING + PROPERTIES ( + "type" = "PYTHON_UDF", + "symbol" = "evaluate", + "runtime_version" = "${runtime_version}", + "always_nullable" = "true" + ) + AS \$\$ +def format_map(m): + if m is None: + return 'NULL' + if not isinstance(m, dict): + return 'BAD_MAP:' + type(m).__name__ + return '{' + ','.join(f'{k}:{m[k]}' for k in sorted(m)) + '}' + +def format_array_map(arr): + if arr is None: + return 'NULL' + return '[' + ','.join(format_map(item) for item in arr) + ']' + +def format_map_array_map(m): + if m is None: + return 'NULL' + if not isinstance(m, dict): + return 'BAD_MAP_ARRAY_MAP:' + type(m).__name__ + return '{' + ','.join(f'{k}:{format_array_map(m[k])}' for k in sorted(m)) + '}' + +def format_attrs(attrs): + if attrs is None: + return 'NULL' + if not isinstance(attrs, dict): + return 'BAD_ATTRS:' + type(attrs).__name__ + parts = [] + for key in sorted(attrs): + val = attrs[key] + if val is None: + parts.append(f'{key}:NULL') + else: + parts.append(f'{key}:[' + ','.join(str(x) for x in val) + ']') + return '{' + ','.join(parts) + '}' + +def format_struct(s): + if s is None: + return 'NULL' + if not isinstance(s, dict): + return 'BAD_STRUCT:' + type(s).__name__ + return '(' + str(s.get('label')) + ',' + format_array_map(s.get('maps')) + ',' + format_attrs(s.get('attrs')) + ')' + +def format_map_struct_nested(m): + if m is None: + return 'NULL' + if not isinstance(m, dict): + return 'BAD_MAP_STRUCT:' + type(m).__name__ + parts = [] + for key in sorted(m): + val = m[key] + if val is None: + parts.append(f'{key}:NULL') + elif not isinstance(val, dict): + parts.append(f'{key}:BAD_STRUCT:' + type(val).__name__) + else: + parts.append(f'{key}:(' + str(val.get('tag')) + ',' + format_array_map(val.get('metrics')) + ')') + return '{' + ','.join(parts) + '}' + +def evaluate(array_map, map_array_map, struct_nested, map_struct_nested): + return '|'.join([ + format_array_map(array_map), + format_map_array_map(map_array_map), + format_struct(struct_nested), + format_map_struct_nested(map_struct_nested), + ]) +\$\$; + """ + + qt_scalar_constant_nested_complex """ + SELECT py_nested_complex_scalar( + CAST([{'a': 1, 'b': 2}, {'c': 3}] AS ARRAY>), + CAST({'left': [{'x': 10}], 'right': [{'y': 20}, {'z': 30}]} AS MAP>>), + CAST({'const', [{'s': 7}], {'nums': [1, 2], 'empty': []}} AS STRUCT>, attrs: MAP>>), + CAST({'const_key': {'constTag', [{'cm': 11}]}} AS MAP>>>) + ) + UNION ALL + SELECT py_nested_complex_scalar( + CAST([] AS ARRAY>), + CAST({'empty': []} AS MAP>>), + CAST({'empty', [], {'none': NULL}} AS STRUCT>, attrs: MAP>>), + CAST({'empty': {'emptyTag', []}} AS MAP>>>) + ) + UNION ALL + SELECT py_nested_complex_scalar( + CAST(NULL AS ARRAY>), + CAST(NULL AS MAP>>), + CAST(NULL AS STRUCT>, attrs: MAP>>), + CAST(NULL AS MAP>>>) + ); + """ + + sql """ + DROP FUNCTION IF EXISTS py_nested_complex_vector_list( + ARRAY>, + MAP>>, + STRUCT>, attrs: MAP>>, + MAP>>> + ); + """ + sql """ + CREATE FUNCTION py_nested_complex_vector_list( + ARRAY>, + MAP>>, + STRUCT>, attrs: MAP>>, + MAP>>> + ) + RETURNS STRING + PROPERTIES ( + "type" = "PYTHON_UDF", + "symbol" = "evaluate", + "runtime_version" = "${runtime_version}", + "always_nullable" = "true" + ) + AS \$\$ +def format_map(m): + if m is None: + return 'NULL' + if not isinstance(m, dict): + return 'BAD_MAP:' + type(m).__name__ + return '{' + ','.join(f'{k}:{m[k]}' for k in sorted(m)) + '}' + +def format_array_map(arr): + if arr is None: + return 'NULL' + return '[' + ','.join(format_map(item) for item in arr) + ']' + +def format_map_array_map(m): + if m is None: + return 'NULL' + if not isinstance(m, dict): + return 'BAD_MAP_ARRAY_MAP:' + type(m).__name__ + return '{' + ','.join(f'{k}:{format_array_map(m[k])}' for k in sorted(m)) + '}' + +def format_attrs(attrs): + if attrs is None: + return 'NULL' + if not isinstance(attrs, dict): + return 'BAD_ATTRS:' + type(attrs).__name__ + parts = [] + for key in sorted(attrs): + val = attrs[key] + if val is None: + parts.append(f'{key}:NULL') + else: + parts.append(f'{key}:[' + ','.join(str(x) for x in val) + ']') + return '{' + ','.join(parts) + '}' + +def format_struct(s): + if s is None: + return 'NULL' + if not isinstance(s, dict): + return 'BAD_STRUCT:' + type(s).__name__ + return '(' + str(s.get('label')) + ',' + format_array_map(s.get('maps')) + ',' + format_attrs(s.get('attrs')) + ')' + +def format_map_struct_nested(m): + if m is None: + return 'NULL' + if not isinstance(m, dict): + return 'BAD_MAP_STRUCT:' + type(m).__name__ + parts = [] + for key in sorted(m): + val = m[key] + if val is None: + parts.append(f'{key}:NULL') + elif not isinstance(val, dict): + parts.append(f'{key}:BAD_STRUCT:' + type(val).__name__) + else: + parts.append(f'{key}:(' + str(val.get('tag')) + ',' + format_array_map(val.get('metrics')) + ')') + return '{' + ','.join(parts) + '}' + +def evaluate(array_maps: list, map_array_maps: list, struct_nesteds: list, map_struct_nesteds: list): + result = [] + for array_map, map_array_map, struct_nested, map_struct_nested in zip(array_maps, map_array_maps, struct_nesteds, map_struct_nesteds): + result.append('|'.join([ + format_array_map(array_map), + format_map_array_map(map_array_map), + format_struct(struct_nested), + format_map_struct_nested(map_struct_nested), + ])) + return result +\$\$; + """ + + qt_vector_list_nested_complex """ + SELECT py_nested_complex_vector_list(array_map, map_array_map, struct_nested, map_struct_nested) AS result + FROM test_pythonudf_nested_complex_type + ORDER BY id; + """ + + sql """ + DROP FUNCTION IF EXISTS py_nested_complex_vector_series( + ARRAY>, + MAP>>, + STRUCT>, attrs: MAP>>, + MAP>>> + ); + """ + sql """ + CREATE FUNCTION py_nested_complex_vector_series( + ARRAY>, + MAP>>, + STRUCT>, attrs: MAP>>, + MAP>>> + ) + RETURNS STRING + PROPERTIES ( + "type" = "PYTHON_UDF", + "symbol" = "evaluate", + "runtime_version" = "${runtime_version}", + "always_nullable" = "true" + ) + AS \$\$ +import pandas as pd + +def format_map(m): + if m is None: + return 'NULL' + if not isinstance(m, dict): + return 'BAD_MAP:' + type(m).__name__ + return '{' + ','.join(f'{k}:{m[k]}' for k in sorted(m)) + '}' + +def format_array_map(arr): + if arr is None: + return 'NULL' + return '[' + ','.join(format_map(item) for item in arr) + ']' + +def format_map_array_map(m): + if m is None: + return 'NULL' + if not isinstance(m, dict): + return 'BAD_MAP_ARRAY_MAP:' + type(m).__name__ + return '{' + ','.join(f'{k}:{format_array_map(m[k])}' for k in sorted(m)) + '}' + +def format_attrs(attrs): + if attrs is None: + return 'NULL' + if not isinstance(attrs, dict): + return 'BAD_ATTRS:' + type(attrs).__name__ + parts = [] + for key in sorted(attrs): + val = attrs[key] + if val is None: + parts.append(f'{key}:NULL') + else: + parts.append(f'{key}:[' + ','.join(str(x) for x in val) + ']') + return '{' + ','.join(parts) + '}' + +def format_struct(s): + if s is None: + return 'NULL' + if not isinstance(s, dict): + return 'BAD_STRUCT:' + type(s).__name__ + return '(' + str(s.get('label')) + ',' + format_array_map(s.get('maps')) + ',' + format_attrs(s.get('attrs')) + ')' + +def format_map_struct_nested(m): + if m is None: + return 'NULL' + if not isinstance(m, dict): + return 'BAD_MAP_STRUCT:' + type(m).__name__ + parts = [] + for key in sorted(m): + val = m[key] + if val is None: + parts.append(f'{key}:NULL') + elif not isinstance(val, dict): + parts.append(f'{key}:BAD_STRUCT:' + type(val).__name__) + else: + parts.append(f'{key}:(' + str(val.get('tag')) + ',' + format_array_map(val.get('metrics')) + ')') + return '{' + ','.join(parts) + '}' + +def evaluate(array_maps: pd.Series, map_array_maps: pd.Series, struct_nesteds: pd.Series, map_struct_nesteds: pd.Series) -> pd.Series: + if not all(isinstance(arg, pd.Series) for arg in [array_maps, map_array_maps, struct_nesteds, map_struct_nesteds]): + return pd.Series(['BAD_VECTOR_ARGS'] * len(array_maps)) + result = [] + for array_map, map_array_map, struct_nested, map_struct_nested in zip(array_maps, map_array_maps, struct_nesteds, map_struct_nesteds): + result.append('|'.join([ + format_array_map(array_map), + format_map_array_map(map_array_map), + format_struct(struct_nested), + format_map_struct_nested(map_struct_nested), + ])) + return pd.Series(result) +\$\$; + """ + + qt_vector_series_nested_complex """ + SELECT py_nested_complex_vector_series(array_map, map_array_map, struct_nested, map_struct_nested) AS result + FROM test_pythonudf_nested_complex_type + ORDER BY id; + """ + + sql """ + DROP FUNCTION IF EXISTS py_nested_complex_vector_mixed( + INT, + MAP>>> + ); + """ + sql """ + CREATE FUNCTION py_nested_complex_vector_mixed( + INT, + MAP>>> + ) + RETURNS STRING + PROPERTIES ( + "type" = "PYTHON_UDF", + "symbol" = "evaluate", + "runtime_version" = "${runtime_version}", + "always_nullable" = "true" + ) + AS \$\$ +import pandas as pd + +def format_map(m): + if m is None: + return 'NULL' + if not isinstance(m, dict): + return 'BAD_MAP:' + type(m).__name__ + return '{' + ','.join(f'{k}:{m[k]}' for k in sorted(m)) + '}' + +def format_array_map(arr): + if arr is None: + return 'NULL' + return '[' + ','.join(format_map(item) for item in arr) + ']' + +def format_map_struct_nested(m): + if m is None: + return 'NULL' + if not isinstance(m, dict): + return 'BAD_MIXED_SCALAR:' + type(m).__name__ + parts = [] + for key in sorted(m): + val = m[key] + if val is None: + parts.append(f'{key}:NULL') + elif not isinstance(val, dict): + parts.append(f'{key}:BAD_STRUCT:' + type(val).__name__) + else: + parts.append(f'{key}:(' + str(val.get('tag')) + ',' + format_array_map(val.get('metrics')) + ')') + return '{' + ','.join(parts) + '}' + +def evaluate(ids: pd.Series, mixed_map_struct_nested) -> pd.Series: + if not isinstance(ids, pd.Series): + return pd.Series(['BAD_VECTOR_ARG']) + formatted = format_map_struct_nested(mixed_map_struct_nested) + return pd.Series([str(id_value) + '|' + formatted for id_value in ids]) +\$\$; + """ + + qt_vector_mixed_scalar_nested_complex """ + SELECT py_nested_complex_vector_mixed(id, map_struct_nested) AS result + FROM test_pythonudf_nested_complex_type + ORDER BY id; + """ + } finally { + try_sql("DROP TABLE IF EXISTS test_pythonudf_nested_complex_type;") + } +}