@@ -729,20 +729,18 @@ def store_extra_data(data: Tuple[Dict], data_path: str):
729729 return json .dump (data [0 ], f , cls = NpEncoder )
730730
def store_flat_views(self, flat_views: Dict[ViewKey, pd.DataFrame]):
    """Persist every flat view that does not already have a checkpoint.

    For each entry in ``flat_views`` a checkpoint name is built from
    ``CHECKPOINT_FLAT_VIEW`` plus the components of the view key, and the
    matching checkpoint path is resolved. Entries for which
    ``has_checkpoint`` reports an existing checkpoint are skipped; every
    other view is handed to ``_save_flat_view`` synchronously.

    Args:
        flat_views: Mapping from view key to the pandas DataFrame to store.

    Returns:
        Always an empty list; the saves run inline, so no task handles are
        produced.
    """
    for key, frame in flat_views.items():
        checkpoint_name = self.get_checkpoint_name(CHECKPOINT_FLAT_VIEW, *key)
        checkpoint_path = self.get_checkpoint_path(name=checkpoint_name)
        if not self.has_checkpoint(name=checkpoint_name):
            # Write the local pandas frame directly rather than submitting
            # it to Dask, so the large payload is not shipped through a
            # task graph.
            self._save_flat_view(view=frame, view_path=checkpoint_path)
    return []
746744
747745 def store_view (self , name : str , view : dd .DataFrame , partition_size = "64MB" ):
748746 """Stores a Dask DataFrame view to a Parquet checkpoint.
0 commit comments