|
| 1 | +import dask |
| 2 | +import warnings |
| 3 | +from dataclasses import dataclass |
| 4 | +from distributed import Client |
| 5 | +from hydra import compose, initialize |
| 6 | +from hydra.core.hydra_config import DictConfig, HydraConfig |
| 7 | +from hydra.utils import instantiate |
| 8 | +from omegaconf import OmegaConf |
| 9 | +from typing import List, Union, Optional |
| 10 | + |
| 11 | +from .analyzer import Analyzer |
| 12 | +from .cluster import ClusterType, ExternalCluster |
| 13 | +from .config import init_hydra_config_store |
| 14 | +from .dftracer import DFTracerAnalyzer |
| 15 | +from .output import ConsoleOutput, CSVOutput, SQLiteOutput |
| 16 | +from .recorder import RecorderAnalyzer |
| 17 | +from .types import ViewType |
| 18 | + |
| 19 | +try: |
| 20 | + from .darshan import DarshanAnalyzer |
| 21 | +except ModuleNotFoundError: |
| 22 | + DarshanAnalyzer = Analyzer |
| 23 | + |
| 24 | +AnalyzerType = Union[DarshanAnalyzer, DFTracerAnalyzer, RecorderAnalyzer] |
| 25 | +OutputType = Union[ConsoleOutput, CSVOutput, SQLiteOutput] |
| 26 | + |
| 27 | +# Suppress Dask warnings that are not relevant to the user |
| 28 | +dask.config.set({"dataframe.query-planning-warning": False}) |
| 29 | + |
| 30 | +# Suppress FutureWarnings related to pandas grouper |
| 31 | +warnings.filterwarnings( |
| 32 | + action="ignore", |
| 33 | + message=".*grouper", |
| 34 | + category=FutureWarning, |
| 35 | +) |
| 36 | + |
| 37 | + |
| 38 | +@dataclass |
| 39 | +class DFAnalyzerInstance: |
| 40 | + analyzer: Analyzer |
| 41 | + client: Client |
| 42 | + cluster: ClusterType |
| 43 | + hydra_config: DictConfig |
| 44 | + output: OutputType |
| 45 | + |
| 46 | + def analyze_trace( |
| 47 | + self, |
| 48 | + percentile: Optional[float] = None, |
| 49 | + view_types: Optional[List[ViewType]] = None, |
| 50 | + ): |
| 51 | + return self.analyzer.analyze_trace( |
| 52 | + exclude_characteristics=self.hydra_config.exclude_characteristics, |
| 53 | + logical_view_types=self.hydra_config.logical_view_types, |
| 54 | + metric_boundaries=OmegaConf.to_object(self.hydra_config.metric_boundaries), |
| 55 | + percentile=self.hydra_config.percentile if not percentile else percentile, |
| 56 | + time_view_type=self.hydra_config.time_view_type, |
| 57 | + trace_path=self.hydra_config.trace_path, |
| 58 | + unoverlapped_posix_only=self.hydra_config.unoverlapped_posix_only, |
| 59 | + view_types=self.hydra_config.view_types if not view_types else view_types, |
| 60 | + ) |
| 61 | + |
| 62 | + |
| 63 | +def init_with_hydra(hydra_overrides: List[str]): |
| 64 | + with initialize(version_base=None, config_path=None): |
| 65 | + init_hydra_config_store() |
| 66 | + hydra_config = compose( |
| 67 | + config_name="config", |
| 68 | + overrides=hydra_overrides, |
| 69 | + return_hydra_config=True, |
| 70 | + ) |
| 71 | + HydraConfig.instance().set_config(hydra_config) |
| 72 | + cluster = instantiate(hydra_config.cluster) |
| 73 | + if isinstance(cluster, ExternalCluster): |
| 74 | + client = Client(cluster.scheduler_address) |
| 75 | + else: |
| 76 | + client = Client(cluster) |
| 77 | + analyzer = instantiate( |
| 78 | + hydra_config.analyzer, |
| 79 | + debug=hydra_config.debug, |
| 80 | + verbose=hydra_config.verbose, |
| 81 | + ) |
| 82 | + output = instantiate(hydra_config.output) |
| 83 | + return DFAnalyzerInstance( |
| 84 | + analyzer=analyzer, |
| 85 | + client=client, |
| 86 | + cluster=cluster, |
| 87 | + hydra_config=hydra_config, |
| 88 | + output=output, |
| 89 | + ) |
0 commit comments