33import dask
44import dataclasses as dc
55import inflect
6+ import json
67import numpy as np
78import pandas as pd
9+ from hydra .core .hydra_config import HydraConfig
10+ from pathlib import Path
811from rich .console import Console
912from rich .table import Table
1013from typing import Dict , List , Optional
1518 RawStats ,
1619 ViewKey ,
1720 humanized_view_name ,
21+ view_name ,
1822)
1923
2024
@@ -63,11 +67,16 @@ def __init__(
    def handle_result(self, result: AnalyzerResultType):
        """Consume an analyzer result and emit it to this output's sink.

        Abstract: subclasses (console, JSON, CSV) must override this; the
        base class provides no default behavior.
        """
        raise NotImplementedError
6569
66- def _create_summary (self , result : AnalyzerResultType , view_key : ViewKey ) -> OutputSummary :
67- flat_view = result .flat_views [view_key ]
70+ def _compute_raw_stats (self , result : AnalyzerResultType ) -> RawStats :
6871 raw_stats = dask .compute (result .raw_stats )[0 ]
6972 if isinstance (raw_stats , dict ):
7073 raw_stats = RawStats (** raw_stats )
74+ return raw_stats
75+
76+ def _create_summary (self , result : AnalyzerResultType , view_key : ViewKey , raw_stats : Optional [RawStats ] = None ) -> OutputSummary :
77+ flat_view = result .flat_views [view_key ]
78+ if raw_stats is None :
79+ raw_stats = self ._compute_raw_stats (result )
7180 summary = OutputSummary (
7281 job_time = float (raw_stats .job_time ),
7382 layer_metrics = {},
@@ -132,6 +141,19 @@ def _humanized_layer_name(self, name: str) -> str:
132141 .replace ('Ssd' , '(SSD)' )
133142 )
134143
144+ @staticmethod
145+ def _additional_metric_scale_and_unit (metric : str ):
146+ metric_lower = metric .lower ()
147+ if metric_lower .endswith ('_gbps' ):
148+ return GiB , 'GB/s'
149+ if metric_lower .endswith ('_mbps' ):
150+ return MiB , 'MB/s'
151+ if metric_lower .endswith ('_gb' ):
152+ return GiB , 'GB'
153+ if metric_lower .endswith ('_mb' ):
154+ return MiB , 'MB'
155+ return 1.0 , '-'
156+
135157
136158class ConsoleOutput (Output ):
137159 def __init__ (
@@ -148,11 +170,12 @@ def __init__(
148170 self .show_header = show_header
149171
150172 def handle_result (self , result : AnalyzerResultType ):
173+ raw_stats = self ._compute_raw_stats (result )
151174 print_objects = []
152175 for view_key in result .flat_views :
153176 if view_key [- 1 ] not in result .view_types :
154177 continue
155- summary = self ._create_summary (result = result , view_key = view_key )
178+ summary = self ._create_summary (result = result , view_key = view_key , raw_stats = raw_stats )
156179 summary_table = self ._create_summary_table (summary = summary , view_key = view_key )
157180 layer_breakdown_table = self ._create_layer_breakdown_table (summary = summary , view_key = view_key )
158181 print_objects .append (summary_table )
@@ -280,19 +303,6 @@ def _create_additional_metrics_table(self, result: AnalyzerResultType, view_key:
280303 return None
281304 return additional_table
282305
283- @staticmethod
284- def _additional_metric_scale_and_unit (metric : str ):
285- metric_lower = metric .lower ()
286- if metric_lower .endswith ('_gbps' ):
287- return GiB , 'GB/s'
288- if metric_lower .endswith ('_mbps' ):
289- return MiB , 'MB/s'
290- if metric_lower .endswith ('_gb' ):
291- return GiB , 'GB'
292- if metric_lower .endswith ('_mb' ):
293- return MiB , 'MB'
294- return 1.0 , '-'
295-
296306 def _format_val (self , value : float , fmt_int = False ) -> str :
297307 if value is None or value == 0 :
298308 return '-'
@@ -324,6 +334,130 @@ def _percentage_color(self, percentage: float) -> str:
324334 return f"#{ int (r * 255 ):02x} { int (g * 255 ):02x} { int (b * 255 ):02x} "
325335
326336
class JSONOutput(Output):
    """Writes analyzer results to a JSON file.

    The payload contains a schema version, the job-level raw stats, and one
    entry per selected view with its summary and additional-metric stats.
    All NaN/inf values are converted to ``None`` so the emitted document is
    strictly valid JSON.
    """

    def __init__(
        self,
        compact: bool = False,
        file_path: str = "",
        name: str = "",
        root_only: bool = False,
        view_names: Optional[List[str]] = None,
    ):
        """Initialize the JSON output.

        Args:
            compact: Forwarded to the base ``Output``.
            file_path: Destination file; when empty, a default path under the
                Hydra run directory (or the CWD) is used.
            name: Forwarded to the base ``Output``.
            root_only: Forwarded to the base ``Output``.
            view_names: Forwarded to the base ``Output``; ``None`` means an
                empty selection (previously expressed as a mutable ``[]``
                default, which is a shared-state pitfall).
        """
        # Normalize None -> fresh list to avoid the mutable-default-argument bug.
        super().__init__(compact, name, root_only, [] if view_names is None else view_names)
        self.file_path = file_path

    def handle_result(self, result: AnalyzerResultType):
        """Serialize ``result`` to JSON and write it to the resolved path."""
        # Compute raw stats once and share them across every per-view summary.
        raw_stats = self._compute_raw_stats(result)
        output = {
            "schema_version": "1",
            "raw_stats": self._create_raw_stats(raw_stats=raw_stats),
            "views": {},
        }
        for view_key in result.flat_views:
            # Emit only views whose type was requested on the analyzer result.
            if view_key[-1] not in result.view_types:
                continue
            summary = self._create_summary(result=result, view_key=view_key, raw_stats=raw_stats)
            output["views"][view_name(view_key, separator="/")] = {
                "summary": self._create_summary_payload(summary=summary),
                "additional_metrics": self._create_additional_metrics_payload(result=result, view_key=view_key),
            }

        output_path = self._resolve_output_path()
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with output_path.open("w", encoding="utf-8") as f:
            # allow_nan=False enforces strict JSON; the payload helpers have
            # already mapped NaN/inf to None.
            json.dump(output, f, indent=2, allow_nan=False)
            f.write("\n")

    def _resolve_output_path(self) -> Path:
        """Return the target path: explicit ``file_path``, else Hydra's run dir."""
        if self.file_path:
            return Path(self.file_path)
        try:
            output_dir = HydraConfig.get().runtime.output_dir
        except Exception:
            # Hydra may not be initialized (e.g. library usage); fall back to CWD.
            output_dir = "."
        return Path(output_dir) / "dfanalyzer_output.json"

    @staticmethod
    def _to_int_or_none(value):
        """Coerce a scalar to int, mapping None/NaN to None for JSON output."""
        if value is None or pd.isna(value):
            return None
        return int(value)

    @staticmethod
    def _to_float_or_none(value):
        """Coerce a scalar to float, mapping None/NaN to None for JSON output."""
        if value is None or pd.isna(value):
            return None
        return float(value)

    def _create_raw_stats(self, raw_stats: RawStats):
        """Build the job-level stats section; key names carry unit suffixes."""
        return {
            "job_time_s": self._to_float_or_none(raw_stats.job_time),
            "time_granularity_s": self._to_float_or_none(raw_stats.time_granularity),
            "time_resolution_ns": self._to_int_or_none(raw_stats.time_resolution),
            "total_event_count": self._to_int_or_none(raw_stats.total_event_count),
            "unique_file_count": self._to_int_or_none(raw_stats.unique_file_count),
            "unique_host_count": self._to_int_or_none(raw_stats.unique_host_count),
            "unique_process_count": self._to_int_or_none(raw_stats.unique_process_count),
        }

    def _create_summary_payload(self, summary: OutputSummary):
        """Build the per-view summary section, including per-layer metrics."""
        summary_payload = {
            "job_time_s": self._to_float_or_none(summary.job_time),
            "total_event_count": self._to_int_or_none(summary.total_event_count),
            "unique_file_count": self._to_int_or_none(summary.unique_file_count),
            "unique_host_count": self._to_int_or_none(summary.unique_host_count),
            "unique_process_count": self._to_int_or_none(summary.unique_process_count),
            "time_granularity_s": self._to_float_or_none(summary.time_granularity),
            "time_resolution_ns": self._to_int_or_none(summary.time_resolution),
            "layers": {},
        }
        for layer in summary.layers:
            metrics = summary.layer_metrics[layer]
            summary_payload["layers"][layer] = {
                "time_s": self._to_float_or_none(metrics.time),
                "count": self._to_int_or_none(metrics.count),
                "size_bytes": self._to_float_or_none(metrics.size),
                "ops_per_s": self._to_float_or_none(metrics.ops),
                "bandwidth_bps": self._to_float_or_none(metrics.bandwidth),
                "num_files": self._to_int_or_none(metrics.num_files),
                "num_processes": self._to_int_or_none(metrics.num_processes),
                "u_time_s": self._to_float_or_none(metrics.u_time),
                "u_count": self._to_int_or_none(metrics.u_count),
                "u_size_bytes": self._to_float_or_none(metrics.u_size),
            }
        return summary_payload

    def _create_additional_metrics_payload(self, result: AnalyzerResultType, view_key: ViewKey):
        """Summarize (min/mean/max) each configured additional-metric column."""
        payload = {}
        flat_view = result.flat_views[view_key]
        view_type = view_key[-1]
        view_additional_metrics = result.additional_metrics.get(view_type, [])
        for metric in view_additional_metrics:
            if metric not in flat_view.columns:
                continue
            # Non-numeric entries and +/-inf become NaN so they drop out of the stats.
            metric_series = pd.to_numeric(flat_view[metric], errors='coerce').replace([np.inf, -np.inf], np.nan)
            scale, unit = self._additional_metric_scale_and_unit(metric)
            metric_series = metric_series / scale
            non_null = int(metric_series.notna().sum())
            metric_payload = {
                "unit": unit,
                "non_null": non_null,
                "min": None,
                "mean": None,
                "max": None,
            }
            if non_null > 0:
                metric_payload.update(
                    {
                        "min": float(metric_series.min()),
                        "mean": float(metric_series.mean()),
                        "max": float(metric_series.max()),
                    }
                )
            payload[metric] = metric_payload
        return payload
460+
class CSVOutput(Output):
    """Placeholder for CSV export of analyzer results; not yet implemented."""

    def handle_result(self, result: AnalyzerResultType):
        """Always raises: CSV output has not been implemented."""
        raise NotImplementedError("CSVOutput is not implemented yet.")
0 commit comments