@@ -1051,43 +1051,54 @@ def convert_to_parquet(self, output_dir: str | Path, batch_size: int = 10000):
         )

     def _flush_buffer_to_parquet(self, name, buffer, writers, output_dir):
+        import pandas as pd
         import pyarrow as pa
         import pyarrow.parquet as pq

         if not buffer:
             return

         try:
-            # PyArrow's from_pylist is robust but might need explicit schema if types vary.
-            # We let it infer for now.
-            table = pa.Table.from_pylist(buffer)
+            # Flatten nested dicts into dot-separated columns (e.g. measurement.data.steps).
+            # This avoids PyArrow inferring empty struct types (like "metadata": {}),
+            # which Parquet cannot represent, and also eliminates schema drift caused
+            # by nested fields appearing/disappearing across batches.
+            df = pd.json_normalize(buffer)
+            table = pa.Table.from_pandas(df)
         except Exception as e:
             console.print(f"[red]Error converting batch for {name}: {e}[/red]")
             return

         if name not in writers:
             file_path = output_dir / f"{name}.parquet"
-            # Use the schema from the first batch
             writers[name] = pq.ParquetWriter(file_path, table.schema)

         try:
-            # If the new batch has a different schema (e.g. missing fields or new fields),
-            # write_table might fail or produce a file with multiple schemas (which is bad).
-            # Ideally we should unify schemas, but that requires reading all data first.
-            # For now, we assume schema consistency or that PyArrow handles minor diffs.
-            # If strict schema validation fails, we might need to cast.
-
-            # Check if schema matches writer's schema
             if not table.schema.equals(writers[name].schema):
-                # Try to cast to the writer's schema
-                # This handles cases where a field is missing (null) or type promotion is needed
-                try:
-                    table = table.cast(writers[name].schema)
-                except Exception:
-                    # If casting fails, we might have a problem.
-                    # For now, log and skip or try to write anyway (which might fail)
-                    # console.print(f"[yellow]Schema mismatch for {name}. Attempting cast... {cast_error}[/yellow]")
-                    pass
+                # Unify schemas so columns present in either the writer or the
+                # new batch are kept; the other side gets nulls for them.
+                merged = pa.unify_schemas(
+                    [writers[name].schema, table.schema], promote_options="permissive"
+                )
+
+                # Table.cast cannot add columns, so pad a table with null
+                # columns for any merged fields it lacks.
+                def _pad(t):
+                    cols = [
+                        t[f.name].cast(f.type) if f.name in t.column_names
+                        else pa.nulls(len(t), type=f.type)
+                        for f in merged
+                    ]
+                    return pa.table(cols, schema=merged)
+
+                # Reopening a ParquetWriter truncates the file, so re-emit the
+                # rows already written under the wider schema.
+                writers[name].close()
+                file_path = output_dir / f"{name}.parquet"
+                existing = pq.read_table(file_path)
+                writers[name] = pq.ParquetWriter(file_path, merged)
+                writers[name].write_table(_pad(existing))
+                table = _pad(table)

             writers[name].write_table(table)
         except Exception as e:
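
For reference, a minimal standalone sketch of the flatten-then-unify approach the diff takes, assuming pandas and pyarrow >= 14.0 (`promote_options` was added in 14.0). The batch contents, the output file name `drifting.parquet`, and the `pad` helper are illustrative only, not part of the patch:

```python
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Two batches whose nested fields drift: batch 2 gains "measurement.temp"
# and drops "metadata.tag".
batch1 = [{"id": 1, "metadata": {"tag": "a"}}]
batch2 = [{"id": 2, "measurement": {"temp": 21.5}}]

t1 = pa.Table.from_pandas(pd.json_normalize(batch1))  # columns: id, metadata.tag
t2 = pa.Table.from_pandas(pd.json_normalize(batch2))  # columns: id, measurement.temp

# Union of both schemas; "permissive" also promotes types (e.g. int64 -> float64).
merged = pa.unify_schemas([t1.schema, t2.schema], promote_options="permissive")

def pad(table, schema):
    """Add null columns for any schema fields the table lacks, in schema order."""
    cols = [
        table[f.name].cast(f.type) if f.name in table.column_names
        else pa.nulls(len(table), type=f.type)
        for f in schema
    ]
    return pa.table(cols, schema=schema)

with pq.ParquetWriter("drifting.parquet", merged) as writer:
    writer.write_table(pad(t1, merged))
    writer.write_table(pad(t2, merged))

print(pq.read_table("drifting.parquet").to_pandas())
#    id  metadata.tag  measurement.temp
#     1             a               NaN
#     2          None              21.5
```

Note that rewriting the file when the schema widens costs a full read-back, which is acceptable as long as drift is rare relative to the number of batches; if every batch drifts, buffering all rows and writing once would be cheaper.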