@@ -100,21 +100,21 @@ def batch_write_bruker_d(file_name: str, output_path: str, batch_size: int = 100
100100
101101 # Get allowed columns from the schema
102102 allowed_columns = {
103- "Id" : "Id" ,
104- "MsMsType" : "CASE WHEN MsMsType IN (8, 9) THEN 2 WHEN MsMsType = 0 THEN 1 ELSE NULL END" ,
105- "NumPeaks" : "NumPeaks" ,
106- "MaxIntensity" : "MaxIntensity" ,
107- "SummedIntensities" : "SummedIntensities" ,
108- "Time" : "Time" ,
109- "Charge" : "Charge" ,
110- "MonoisotopicMz" : "MonoisotopicMz" ,
103+ "Id" : ( "Id" , SCAN ) ,
104+ "MsMsType" : ( "CASE WHEN MsMsType IN (8, 9) THEN 2 WHEN MsMsType = 0 THEN 1 ELSE NULL END" , MS_LEVEL ) ,
105+ "NumPeaks" : ( "NumPeaks" , NUM_PEAKS ) ,
106+ "MaxIntensity" : ( "MaxIntensity" , BASE_PEAK_INTENSITY ) ,
107+ "SummedIntensities" : ( "SummedIntensities" , SUMMED_PEAK_INTENSITY ) ,
108+ "Time" : ( "Time" , RETENTION_TIME ) ,
109+ "Charge" : ( "Charge" , CHARGE ) ,
110+ "MonoisotopicMz" : ( "MonoisotopicMz" , EXPERIMENTAL_MASS_TO_CHARGE ) ,
111111 }
112112
113113 # Construct safe column list
114114 safe_columns = []
115115 for schema_col_name , sql_expr in allowed_columns .items ():
116116 if schema_col_name in columns or schema_col_name == "Id" :
117- safe_columns .append (sql_expr )
117+ safe_columns .append (sql_expr [ 0 ] )
118118
119119 # Construct the query using safe columns
120120 query = f"""SELECT { ', ' .join (safe_columns )} FROM frames"""
@@ -125,7 +125,10 @@ def batch_write_bruker_d(file_name: str, output_path: str, batch_size: int = 100
125125 ) as parquet_writer :
126126 # Stream data in batches
127127 for chunk in pd .read_sql_query (query , conn , chunksize = batch_size ):
128- chunk ["AcquisitionDateTime" ] = acquisition_date_time
128+ chunk [ACQUISITION_DATETIME ] = acquisition_date_time
129+ # Change column names to match the schema using allowed columns mapping
130+ chunk .rename (columns = {v [0 ]: v [1 ] for v in allowed_columns .values ()}, inplace = True )
131+ chunk [SCAN ] = chunk [SCAN ].astype (str )
129132 for col in schema .names :
130133 if col not in chunk .columns :
131134 chunk [col ] = None
0 commit comments