@@ -498,6 +498,10 @@ def __init__(self, system, config):
498498 # Check the output directories and create names of output files.
499499 self ._filenames = self ._prepare_output ()
500500
501+ # Per-window cache of the last saved energy-components time (ns),
502+ # used to skip duplicate rows on restart.
503+ self ._last_ec_time = {}
504+
501505 # Store the current system as a reference.
502506 self ._reference_system = self ._system .clone ()
503507
@@ -1194,7 +1198,7 @@ def increment_filename(base_filename, suffix):
11941198 filenames ["trajectory" ] = str (output_directory / f"traj_{ lam } .dcd" )
11951199 filenames ["trajectory_chunk" ] = str (output_directory / f"traj_{ lam } _" )
11961200 filenames ["energy_components" ] = str (
1197- output_directory / f"energy_components_{ lam } .csv "
1201+ output_directory / f"energy_components_{ lam } .parquet "
11981202 )
11991203 filenames ["gcmc_ghosts" ] = str (output_directory / f"gcmc_ghosts_{ lam } .txt" )
12001204 filenames ["sampler_stats" ] = str (output_directory / f"sampler_stats_{ lam } .pkl" )
@@ -2020,12 +2024,23 @@ def _backup_checkpoint(self, index):
20202024 except Exception as e :
20212025 return index , e
20222026
2027+ try :
2028+ # Backup the existing energy components file, if it exists.
2029+ path = _Path (self ._filenames [index ]["energy_components" ])
2030+ if path .exists () and path .stat ().st_size > 0 :
2031+ _copyfile (
2032+ self ._filenames [index ]["energy_components" ],
2033+ str (self ._filenames [index ]["energy_components" ]) + ".bak" ,
2034+ )
2035+ except Exception as e :
2036+ return index , e
2037+
20232038 return index , None
20242039
20252040 def _save_energy_components (self , index , context , time_ns ):
20262041 """
20272042 Internal function to save the energy components for each force group to a
2028- CSV file.
2043+ Parquet file.
20292044
20302045 Parameters
20312046 ----------
@@ -2040,11 +2055,28 @@ def _save_energy_components(self, index, context, time_ns):
20402055 The current simulation time in nanoseconds.
20412056 """
20422057
2043- import csv as _csv
2058+ import json as _json
20442059 import openmm
2060+ import pandas as _pd
2061+ import pyarrow as _pa
2062+ import pyarrow .parquet as _pq_local
20452063
20462064 filepath = self ._filenames [index ]["energy_components" ]
2047- file_exists = _Path (filepath ).exists ()
2065+
2066+ # Lazy-initialise the last saved time for restart deduplication.
2067+ # On the first call for this window, read the existing file (if any)
2068+ # to find the maximum time already written.
2069+ if index not in self ._last_ec_time :
2070+ path = _Path (filepath )
2071+ if path .exists () and path .stat ().st_size > 0 :
2072+ existing = _pq_local .read_table (filepath ).to_pandas ()
2073+ self ._last_ec_time [index ] = float (existing ["time" ].max ())
2074+ else :
2075+ self ._last_ec_time [index ] = - 1.0
2076+
2077+ # Skip rows that have already been written (restart deduplication).
2078+ if time_ns <= self ._last_ec_time [index ]:
2079+ return
20482080
20492081 # Use the named force groups already assigned by sire_to_openmm_system,
20502082 # sorted alphabetically for a consistent column order across runs.
@@ -2055,18 +2087,25 @@ def _save_energy_components(self, index, context, time_ns):
20552087 openmm .unit .kilocalories_per_mole
20562088 )
20572089
2058- columns = ["time" ] + list (energies .keys ())
2059- row = {"time" : round (time_ns , 6 )} | {
2060- name : round (nrg , 4 ) for name , nrg in energies .items ()
2061- }
2090+ row = {"time" : round (time_ns , 6 )} | energies
2091+ df = _pd .DataFrame ([row ])
2092+
2093+ path = _Path (filepath )
2094+ if path .exists () and path .stat ().st_size > 0 :
2095+ _parquet_append (filepath , df )
2096+ else :
2097+ # First write: embed units as schema metadata under the "somd2" key,
2098+ # consistent with how the energy trajectory parquet files are written.
2099+ table = _pa .Table .from_pandas (df )
2100+ meta = _json .dumps (
2101+ {"time_units" : "ns" , "energy_units" : "kcal/mol" }
2102+ ).encode ()
2103+ table = table .replace_schema_metadata (
2104+ {b"somd2" : meta , ** table .schema .metadata }
2105+ )
2106+ _pq_local .write_table (table , filepath )
20622107
2063- with open (filepath , "a" , newline = "" ) as f :
2064- writer = _csv .DictWriter (f , fieldnames = columns )
2065- if not file_exists :
2066- # Write a comment line with units before the header.
2067- f .write ("# time: ns, energy: kcal/mol\n " )
2068- writer .writeheader ()
2069- writer .writerow (row )
2108+ self ._last_ec_time [index ] = time_ns
20702109
20712110 def _restore_backup_files (self ):
20722111 """