1+ use crate :: assumptions:: assumption_set:: AssumptionSet ;
12use crate :: model_points:: { ModelPoint , convert_model_points_df_to_vector} ;
23use crate :: projections:: projection_mp:: project_single_model_point;
3- use crate :: projections:: projection_structs:: { MultipleRunResult , RunResult , RunSetup } ;
44use polars:: prelude:: * ;
55use rayon:: prelude:: * ;
6+ use std:: fs:: { File , read_to_string, write} ;
7+ use std:: path:: Path ;
8+
9+ use crate :: helpers:: create_parent_folder;
10+
11+ //---------------------------------------------------------------------------------------------------------
12+ // STRUCTS
13+ //---------------------------------------------------------------------------------------------------------
14+ #[ derive( Clone , Debug ) ]
15+ pub struct RunSetup {
16+ pub description : String , // Optional description for the run
17+ pub model_points_df : DataFrame ,
18+ pub assumptions : AssumptionSet ,
19+ }
20+
21+ impl RunSetup {
22+ pub fn export ( & self , folder_path_str : & str ) -> PolarsResult < ( ) > {
23+ // Create the folder for the run result - if not given a name, use a UUID
24+ let path = Path :: new ( & folder_path_str) ;
25+
26+ // Create the folder if it does not exist
27+ create_parent_folder ( & path) ;
28+
29+ // Export description
30+ let description_path = path. join ( "description.md" ) ;
31+ let description_content = format ! ( "{}" , self . description) ;
32+ write ( & description_path, description_content) ?;
33+
34+ // Export model points DataFrame
35+ let model_points_path = path. join ( "model_points.parquet" ) ;
36+ let mut model_points_file = File :: create ( model_points_path) ?;
37+ let mut model_points_df = self . model_points_df . clone ( ) ;
38+ ParquetWriter :: new ( & mut model_points_file) . finish ( & mut model_points_df) ?;
39+
40+ // Export assumptions DataFrame
41+ let assumptions_path = path. join ( "assumptions.parquet" ) ;
42+ self . assumptions
43+ . export ( assumptions_path. to_str ( ) . unwrap ( ) ) ?;
44+
45+ Ok ( ( ) )
46+ }
47+
48+ pub fn import ( folder_path_str : & str ) -> PolarsResult < Self > {
49+ let path = Path :: new ( folder_path_str) ;
50+
51+ // Check if the folder exists
52+ if !path. exists ( ) || !path. is_dir ( ) {
53+ return Err ( PolarsError :: ComputeError (
54+ format ! ( "Folder does not exist: {}" , folder_path_str) . into ( ) ,
55+ ) ) ;
56+ }
57+
58+ // Import description
59+ let description_path = path. join ( "description.md" ) ;
60+ let description = read_to_string ( description_path) ?. trim ( ) . to_string ( ) ;
61+
62+ // Import model points DataFrame
63+ let model_points_path = path. join ( "model_points.parquet" ) ;
64+ let mut model_points_file = File :: open ( model_points_path) ?;
65+ let model_points_df = ParquetReader :: new ( & mut model_points_file) . finish ( ) ?;
66+
67+ // Import assumptions DataFrame
68+ let assumptions_path = path. join ( "assumptions.parquet" ) ;
69+ let assumptions = AssumptionSet :: import ( assumptions_path. to_str ( ) . unwrap ( ) ) ?;
70+
71+ // Create the RunSetup instance
72+ let result = RunSetup {
73+ description,
74+ model_points_df,
75+ assumptions,
76+ } ;
77+
78+ Ok ( result)
79+ }
80+ }
81+
82+ #[ derive( Clone , Debug ) ]
83+ pub struct RunResult {
84+ pub setup : RunSetup ,
85+ pub projected_df : DataFrame ,
86+ }
87+
88+ impl RunResult {
89+ pub fn export ( & self , folder_path_str : & str ) -> PolarsResult < ( ) > {
90+ // Create the folder for the run result - if not given a name, use a UUID
91+ let path = Path :: new ( & folder_path_str) ;
92+
93+ // Create the folder if it does not exist
94+ create_parent_folder ( & path) ;
95+
96+ // Export run_setup
97+ let setup_path = path. join ( "run_setup" ) ;
98+ self . setup . export ( setup_path. to_str ( ) . unwrap ( ) ) ?;
99+
100+ // Export projected DataFrame
101+ let projected_df_path = path. join ( "projected_df.parquet" ) ;
102+ let mut projected_df_file = File :: create ( projected_df_path) ?;
103+ let mut projected_df = self . projected_df . clone ( ) ;
104+ ParquetWriter :: new ( & mut projected_df_file) . finish ( & mut projected_df) ?;
105+
106+ Ok ( ( ) )
107+ }
108+
109+ pub fn import ( folder_path_str : & str ) -> PolarsResult < Self > {
110+ let path = Path :: new ( folder_path_str) ;
111+
112+ // Check if the folder exists
113+ if !path. exists ( ) || !path. is_dir ( ) {
114+ return Err ( PolarsError :: ComputeError (
115+ format ! ( "Folder does not exist: {}" , folder_path_str) . into ( ) ,
116+ ) ) ;
117+ }
118+
119+ // Import run setup
120+ let setup_path = path. join ( "run_setup" ) ;
121+ let setup = RunSetup :: import ( setup_path. to_str ( ) . unwrap ( ) ) ?;
122+
123+ // Import projected DataFrame
124+ let projected_df_path = path. join ( "projected_df.parquet" ) ;
125+ let mut projected_df_file = File :: open ( projected_df_path) ?;
126+ let projected_df = ParquetReader :: new ( & mut projected_df_file) . finish ( ) ?;
127+
128+ let result = RunResult {
129+ setup : setup,
130+ projected_df : projected_df,
131+ } ;
132+
133+ Ok ( result)
134+ }
135+ }
6136
7137//---------------------------------------------------------------------------------------------------------
8138// PRIVATE
9139//---------------------------------------------------------------------------------------------------------
10- fn _project_single_run ( run_setup : & RunSetup ) -> PolarsResult < RunResult > {
140+ fn _project_single_run ( setup : & RunSetup ) -> PolarsResult < RunResult > {
11141 // Convert model points DataFrame to vector
12- let model_points_vec = convert_model_points_df_to_vector ( & run_setup . model_points_df ) ?;
142+ let model_points_vec = convert_model_points_df_to_vector ( & setup . model_points_df ) ?;
13143
14144 let mut all_lfs = Vec :: with_capacity ( model_points_vec. len ( ) ) ;
15145
16146 for ( _, mp) in model_points_vec. iter ( ) . enumerate ( ) {
17- let lf = project_single_model_point ( mp, & run_setup . assumptions ) ?;
147+ let lf = project_single_model_point ( mp, & setup . assumptions ) ?;
18148 all_lfs. push ( lf) ;
19149 }
20150
@@ -24,7 +154,7 @@ fn _project_single_run(run_setup: &RunSetup) -> PolarsResult<RunResult> {
24154
25155 // Return the result with run setup and projected DataFrame
26156 let result = RunResult {
27- run_setup : run_setup . clone ( ) ,
157+ setup : setup . clone ( ) ,
28158 projected_df : final_df,
29159 } ;
30160
@@ -37,9 +167,9 @@ fn _project_single_run(run_setup: &RunSetup) -> PolarsResult<RunResult> {
37167// Process data in chunks to avoid stack overflow
38168const CHUNK_SIZE : usize = 100 ;
39169
40- fn _project_single_run_parallel ( run_setup : & RunSetup ) -> PolarsResult < RunResult > {
170+ fn _project_single_run_parallel ( setup : & RunSetup ) -> PolarsResult < RunResult > {
41171 // Convert model points DataFrame to vector
42- let model_points_vec = convert_model_points_df_to_vector ( & run_setup . model_points_df ) ?;
172+ let model_points_vec = convert_model_points_df_to_vector ( & setup . model_points_df ) ?;
43173
44174 // Process chunks of model points in parallel with limited threads
45175 let chunks = model_points_vec
@@ -52,7 +182,7 @@ fn _project_single_run_parallel(run_setup: &RunSetup) -> PolarsResult<RunResult>
52182 // Process each chunk sequentially (no nested parallelism)
53183 let all_lfs = chunk
54184 . iter ( )
55- . map ( |mp| project_single_model_point ( mp, & run_setup . assumptions ) )
185+ . map ( |mp| project_single_model_point ( mp, & setup . assumptions ) )
56186 . collect :: < PolarsResult < Vec < LazyFrame > > > ( ) ?;
57187
58188 // Concatenate LazyFrames within the chunk and collect to DataFrame
@@ -68,7 +198,7 @@ fn _project_single_run_parallel(run_setup: &RunSetup) -> PolarsResult<RunResult>
68198
69199 // Return the result with run setup and projected DataFrame
70200 let result = RunResult {
71- run_setup : run_setup . clone ( ) ,
201+ setup : setup . clone ( ) ,
72202 projected_df : final_df,
73203 } ;
74204
@@ -78,20 +208,15 @@ fn _project_single_run_parallel(run_setup: &RunSetup) -> PolarsResult<RunResult>
78208//---------------------------------------------------------------------------------------------------------
79209// PUBLIC
80210//---------------------------------------------------------------------------------------------------------
81- pub fn project_runs ( run_setups : & Vec < RunSetup > ) -> PolarsResult < MultipleRunResult > {
82- let mut run_results : Vec < RunResult > = Vec :: with_capacity ( run_setups . len ( ) ) ; // To collect run results
211+ pub fn project_runs ( setups : & Vec < RunSetup > ) -> PolarsResult < Vec < RunResult > > {
212+ let mut results : Vec < RunResult > = Vec :: with_capacity ( setups . len ( ) ) ; // To collect run results
83213
84- for ( _, setup) in run_setups . iter ( ) . enumerate ( ) {
214+ for ( _, setup) in setups . iter ( ) . enumerate ( ) {
85215 let single_run_result = _project_single_run ( setup) ?;
86- run_results . push ( single_run_result) ;
216+ results . push ( single_run_result) ;
87217 }
88218
89- let result = MultipleRunResult {
90- run_setups : run_setups. clone ( ) ,
91- run_results : run_results,
92- } ;
93-
94- Ok ( result)
219+ Ok ( results)
95220}
96221
97222//----------------------------------------
@@ -102,32 +227,22 @@ Using the below command to run the code in parallel with limited threads finish
102227The test is not exhaustive, but it shows that parallel processing can significantly speed up the projection of model points.
103228$env:RAYON_NUM_THREADS = 8; $env:RUST_MIN_STACK = 33554432; cargo run
104229*/
105- pub fn project_runs_parallel ( run_setups : & Vec < RunSetup > ) -> PolarsResult < MultipleRunResult > {
106- let mut run_results : Vec < RunResult > = Vec :: with_capacity ( run_setups . len ( ) ) ; // To collect run results
230+ pub fn project_runs_parallel ( setups : & Vec < RunSetup > ) -> PolarsResult < Vec < RunResult > > {
231+ let mut results : Vec < RunResult > = Vec :: with_capacity ( setups . len ( ) ) ; // To collect run results
107232
108- for ( _, setup) in run_setups . iter ( ) . enumerate ( ) {
109- let single_run_result = _project_single_run_parallel ( setup) ?;
110- run_results . push ( single_run_result ) ;
233+ for ( _, setup) in setups . iter ( ) . enumerate ( ) {
234+ let result = _project_single_run_parallel ( setup) ?;
235+ results . push ( result ) ;
111236 }
112237
113- let result = MultipleRunResult {
114- run_setups : run_setups. clone ( ) ,
115- run_results : run_results,
116- } ;
117-
118- Ok ( result)
238+ Ok ( results)
119239}
120240
121- pub fn project_runs_parallel_x2 ( run_setups : & Vec < RunSetup > ) -> PolarsResult < MultipleRunResult > {
122- let run_results = run_setups
241+ pub fn project_runs_parallel_x2 ( setups : & Vec < RunSetup > ) -> PolarsResult < Vec < RunResult > > {
242+ let results = setups
123243 . par_iter ( )
124244 . map ( |setup| _project_single_run_parallel ( setup) )
125245 . collect :: < PolarsResult < Vec < RunResult > > > ( ) ?;
126246
127- let result = MultipleRunResult {
128- run_setups : run_setups. clone ( ) ,
129- run_results : run_results,
130- } ;
131-
132- Ok ( result)
247+ Ok ( results)
133248}
0 commit comments