Skip to content

Commit 23d632e

Browse files
committed
Initial integration of dfreport into dfanalyzer
1 parent 50e6cdd commit 23d632e

4 files changed

Lines changed: 291 additions & 5 deletions

File tree

README.md

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,20 @@
1-
# dfanalyzer
1+
# dfanalyzer
2+
## Report mode
3+
4+
You can now run all of the existing `dfreport.py` reports directly through the main `dfanalyzer` executable:
5+
6+
```bash
7+
# per-node summary:
8+
dfanalyzer --report --node /path/to/COMPACT/
9+
10+
# per-process, highlight the max across processes:
11+
dfanalyzer --report --process --aggregate /path/to/COMPACT/
12+
13+
#### How it works
14+
15+
- **`cli()`** inspects `sys.argv` before Hydra ever sees it.
16+
- If `--report` is present, we strip it out (so `argparse` in `dfreport.py` still works unchanged) and call `dfreport.main()`.
17+
- Otherwise we call the original `main()`, so nothing else in your toolchain is disturbed.
18+
- Finally, by repointing the `dfanalyzer` console script to `cli()`, any `dfanalyzer ...` invocation will first check for `--report`.
19+
20+
This gives you exactly what you asked for — `dfanalyzer --report [dfreport options]` — with minimal changes to the rest of the repo.

dfanalyzer/__main__.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
# File: dfanalyzer/dfanalyzer/__main__.py
2+
3+
import sys
14
import hydra
25
from distributed import Client
36
from hydra.utils import instantiate
@@ -8,9 +11,19 @@
811
from .cluster import ExternalCluster
912
from .types import Rule
1013

11-
1214
init_hydra_config_store()
1315

16+
def cli():
    """
    Console-script entry point: dispatch between report mode and analysis mode.

    If ``--report`` appears anywhere in ``sys.argv``, the flag is stripped
    out (so the ``argparse`` parser inside ``dfreport.py`` sees only its own
    options) and the dfreport CLI is invoked.  Otherwise the original
    Hydra-powered ``main()`` runs unchanged.
    """
    if '--report' in sys.argv:
        # Remove our flag so dfreport.py sees only its own args.
        sys.argv.remove('--report')
        from .utils.dfreport import main as report_main
        report_main()
    else:
        main()
1427

1528
@hydra.main(version_base=None, config_name="config")
1629
def main(cfg: Config) -> None:
@@ -21,6 +34,7 @@ def main(cfg: Config) -> None:
2134
client.restart()
2235
else:
2336
client = Client(cluster)
37+
2438
analyzer: AnalyzerType = instantiate(
2539
cfg.analyzer,
2640
debug=cfg.debug,
@@ -36,11 +50,13 @@ def main(cfg: Config) -> None:
3650
unoverlapped_posix_only=cfg.unoverlapped_posix_only,
3751
view_types=cfg.view_types,
3852
)
53+
3954
output: OutputType = instantiate(cfg.output)
4055
output.handle_result(result=result)
56+
4157
client.close()
4258
cluster.close() # type: ignore
4359

44-
4560
if __name__ == "__main__":
46-
main()
61+
cli()
62+

dfanalyzer/utils/dfreport.py

Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Command-line utility to summarize dftracer .pfw.gz traces.
4+
Summaries are per-node, per-process, per-thread, or individual events.
5+
Use --all-events with --node, --process, or --thread to output event-level summaries instead of grouped categories.
6+
Add --aggregate to highlight the node with the max of each metric across nodes.
7+
8+
Usage examples:
9+
./dfanalyze_v0.06.py --node /path/to/COMPACT/
10+
./dfanalyze_v0.06.py --node --all-events /path/to/COMPACT/
11+
./dfanalyze_v0.06.py --process /path/to/COMPACT/
12+
./dfanalyze_v0.06.py --process --all-events /path/to/COMPACT/
13+
./dfanalyze_v0.06.py --thread /path/to/COMPACT/
14+
./dfanalyze_v0.06.py --thread --all-events /path/to/COMPACT/
15+
./dfanalyze_v0.06.py --events /path/to/COMPACT/
16+
./dfanalyze_v0.06.py --node --aggregate /path/to/COMPACT/
17+
18+
Options are mutually exclusive, one of --node, --process, --thread, or --events is required.
19+
"""
20+
import argparse
21+
import os
22+
import glob
23+
import gzip
24+
import json
25+
from collections import defaultdict
26+
import pandas as pd
27+
from colorama import Fore, Style, init as colorama_init
28+
29+
colorama_init()
30+
31+
#--------------- Load & Summaries ----------------
32+
33+
def assign_group(name: str) -> str:
    """Map a raw trace-event name onto one of the reporting categories."""
    lowered = name.lower()

    # Framework-level prefixes take priority over token matching.
    prefix_groups = (
        ('torchframework', 'TorchFramework'),
        ('pytorchdataloader', 'PytorchDataLoader'),
        ('pytorchcheckpointing', 'PytorchCheckpointing'),
        ('filestorage', 'FileStorage'),
        ('dlio', 'DLIO'),
    )
    for prefix, group in prefix_groups:
        if lowered.startswith(prefix):
            return group

    # Substring-based buckets, checked in priority order.
    if any(token in lowered for token in ('open', 'close', 'start')):
        return 'file_ops'
    if any(token in lowered for token in ('read', 'seek')):
        return 'read_seek'
    if 'loop' in lowered:
        return 'loop'
    if 'stat' in lowered or 'xstat' in lowered:
        return 'attr_checks'
    if 'npz' in lowered:
        return 'npz_ops'
    return 'other'
46+
47+
48+
def load_node_df(node_dir: str) -> pd.DataFrame|None:
49+
records=[]
50+
files=glob.glob(os.path.join(node_dir,'*.pfw.gz'))
51+
compact=os.path.join(node_dir,'COMPACT')
52+
if os.path.isdir(compact):
53+
files+=glob.glob(os.path.join(compact,'*.pfw.gz'))
54+
for p in sorted(files):
55+
with gzip.open(p,'rt') as f:
56+
for raw in f:
57+
line=raw.strip().rstrip(',')
58+
if not line or line in ('[',']'): continue
59+
try:
60+
obj=json.loads(line)
61+
except json.JSONDecodeError:
62+
continue
63+
records.append(obj)
64+
if not records: return None
65+
df=pd.json_normalize(records)
66+
df['name']=df.get('name','').astype(str)
67+
df['dur']=pd.to_numeric(df.get('dur',0),errors='coerce').fillna(0.0)
68+
df['pid']=pd.to_numeric(df.get('pid',0),errors='coerce').fillna(0).astype(int)
69+
df['tid']=pd.to_numeric(df.get('tid',0),errors='coerce').fillna(0).astype(int)
70+
return df[['name','dur','pid','tid']]
71+
72+
73+
def summarize_groups(df):
    """Per-category timing summary, sorted by share of total time."""
    timed = df[df['dur'] > 0].copy()
    timed['group'] = timed['name'].apply(assign_group)
    grand_total = timed['dur'].sum()
    stats = (
        timed.groupby('group')['dur']
        .agg(Total_Time='sum', Num_Instances='count', Average='mean',
             Min='min', Max='max', StdDev='std')
        .reset_index()
    )
    stats['% Total Time'] = (100 * stats['Total_Time'] / grand_total).round(3)
    for col in ('Total_Time', 'Average', 'Min', 'Max', 'StdDev'):
        stats[col] = stats[col].round(3)
    ordered = ['group', '% Total Time', 'Total_Time', 'Num_Instances',
               'Average', 'Min', 'Max', 'StdDev']
    return stats.sort_values('% Total Time', ascending=False)[ordered]
84+
85+
86+
def summarize_events(df):
    """Per-event-name timing summary, sorted by share of total time."""
    timed = df[df['dur'] > 0]
    grand_total = timed['dur'].sum()
    stats = (
        timed.groupby('name')['dur']
        .agg(Total_Time='sum', Num_Instances='count', Average='mean',
             Min='min', Max='max', StdDev='std')
        .reset_index()
    )
    stats['% Total Time'] = (100 * stats['Total_Time'] / grand_total).round(3)
    for col in ('Total_Time', 'Average', 'Min', 'Max', 'StdDev'):
        stats[col] = stats[col].round(3)
    ordered = ['name', '% Total Time', 'Total_Time', 'Num_Instances',
               'Average', 'Min', 'Max', 'StdDev']
    return stats.sort_values('% Total Time', ascending=False)[ordered]
96+
97+
98+
def summarize_process_groups(df):
    """Per-(pid, category) timing summary; %-share is relative to each pid."""
    timed = df[df['dur'] > 0].copy()
    timed['group'] = timed['name'].apply(assign_group)
    stats = (
        timed.groupby(['pid', 'group'])['dur']
        .agg(Total_Time='sum', Num_Instances='count', Average='mean',
             Min='min', Max='max', StdDev='std')
        .reset_index()
    )
    # Each pid's rows are normalized against that pid's own total time.
    per_pid_total = stats.groupby('pid')['Total_Time'].sum().rename('pid_total').reset_index()
    stats = stats.merge(per_pid_total, on='pid')
    stats['% Total Time'] = (100 * stats['Total_Time'] / stats['pid_total']).round(3)
    for col in ('Total_Time', 'Average', 'Min', 'Max', 'StdDev'):
        stats[col] = stats[col].round(3)
    ordered = ['pid', 'group', '% Total Time', 'Total_Time', 'Num_Instances',
               'Average', 'Min', 'Max', 'StdDev']
    return stats.sort_values(['pid', '% Total Time'], ascending=[True, False])[ordered]
110+
111+
112+
def summarize_thread_groups(df):
    """Per-(tid, category) timing summary; %-share is relative to each tid."""
    timed = df[df['dur'] > 0].copy()
    timed['group'] = timed['name'].apply(assign_group)
    stats = (
        timed.groupby(['tid', 'group'])['dur']
        .agg(Total_Time='sum', Num_Instances='count', Average='mean',
             Min='min', Max='max', StdDev='std')
        .reset_index()
    )
    # Each tid's rows are normalized against that tid's own total time.
    per_tid_total = stats.groupby('tid')['Total_Time'].sum().rename('tid_total').reset_index()
    stats = stats.merge(per_tid_total, on='tid')
    stats['% Total Time'] = (100 * stats['Total_Time'] / stats['tid_total']).round(3)
    for col in ('Total_Time', 'Average', 'Min', 'Max', 'StdDev'):
        stats[col] = stats[col].round(3)
    ordered = ['tid', 'group', '% Total Time', 'Total_Time', 'Num_Instances',
               'Average', 'Min', 'Max', 'StdDev']
    return stats.sort_values(['tid', '% Total Time'], ascending=[True, False])[ordered]
124+
125+
126+
def build_group_map(df):
    """Map each category to the sorted list of event names it contains."""
    grouped = defaultdict(set)
    for event_name in df['name'].unique():
        grouped[assign_group(event_name)].add(event_name)
    return {group: sorted(names) for group, names in grouped.items()}
130+
131+
132+
def print_tree_for_node(node, df, group_map, mode, all_ev):
    """
    Print the summary table(s) for one node to stdout.

    ``mode`` selects the grouping ('events', 'node', 'process', 'thread');
    ``all_ev`` switches grouped tables to event-level tables.
    ``group_map`` is accepted for interface compatibility but unused here.
    """
    print(f"\n===== Summary for {node} =====\n")

    if mode == 'events':
        print(summarize_events(df).to_string(index=False))
        return

    if mode == 'node':
        table = summarize_events(df) if all_ev else summarize_groups(df)
        print(table.to_string(index=False))
        return

    if mode == 'process':
        if all_ev:
            for pid, sub in df.groupby('pid'):
                print(f"--- Process {pid} ---")
                print(summarize_events(sub).to_string(index=False))
                print("============")
        else:
            for pid, sub in summarize_process_groups(df).groupby('pid'):
                print(f"--- Process {pid} ---")
                print(sub.drop(columns='pid').to_string(index=False))
                print("============")
        return

    if mode == 'thread':
        if all_ev:
            for tid, sub in df.groupby('tid'):
                print(f"--- Thread {tid} ---")
                print(summarize_events(sub).to_string(index=False))
                print("-------")
        else:
            for tid, sub in summarize_thread_groups(df).groupby('tid'):
                print(f"--- Thread {tid} ---")
                print(sub.drop(columns='tid').to_string(index=False))
                print("-------")
        return
158+
159+
160+
def highlight_across_nodes(per_node, key_col, metrics):
    """
    Compare summary tables across nodes, coloring each metric's max red.

    ``per_node`` maps node name -> summary DataFrame; ``key_col`` is the
    column ('group' or 'name') whose values are matched across nodes.
    NOTE(review): this assumes ``key_col`` values are unique within each
    node's table — duplicate keys would make the Series lookups ambiguous.
    """
    nodes = list(per_node.keys())
    # Index each node's table once instead of re-indexing per metric lookup.
    indexed = {n: per_node[n].set_index(key_col) for n in nodes}

    def metric_value(node, metric, key):
        # Missing metric column or missing key both fall back to 0.
        return indexed[node].get(metric, 0).get(key, 0)

    all_keys = sorted({k for frame in per_node.values() for k in frame[key_col].tolist()})
    for key in all_keys:
        print(f"\n>>> {key}")
        # For each metric, the node holding the maximum value for this key.
        max_holder = {
            m: max(((n, metric_value(n, m, key)) for n in nodes),
                   key=lambda pair: pair[1])[0]
            for m in metrics
        }
        print(" node" + "".join(m.rjust(12) for m in metrics))
        for n in nodes:
            row_text = " " + n.ljust(11)
            for m in metrics:
                color = Fore.RED if n == max_holder[m] else Fore.GREEN
                row_text += color + f"{metric_value(n, m, key):12.3f}" + Style.RESET_ALL
            print(row_text)
177+
178+
179+
def main():
    """
    CLI entry point for the dfreport summaries.

    Exactly one of --node/--process/--thread/--events selects the grouping.
    --all-events switches grouped tables to event-level tables; --aggregate
    adds cross-node max highlighting (or, in process mode, a per-process
    ASCII tree highlighting the pid with the max average per group).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('directory')
    mode_flags = parser.add_mutually_exclusive_group(required=True)
    mode_flags.add_argument('--node', action='store_true')
    mode_flags.add_argument('--process', action='store_true')
    mode_flags.add_argument('--thread', action='store_true')
    mode_flags.add_argument('--events', action='store_true')
    parser.add_argument('--all-events', action='store_true')
    parser.add_argument('--aggregate', action='store_true',
                        help='Highlight max metric across nodes')
    args = parser.parse_args()

    base = args.directory
    if not os.path.isdir(base):
        parser.error(f"{base} not a dir")

    # A directory that directly contains traces is a single node; otherwise
    # each subdirectory is treated as one node.
    if glob.glob(os.path.join(base, '*.pfw.gz')):
        nodes = [base]
    else:
        nodes = [os.path.join(base, d) for d in sorted(os.listdir(base))
                 if os.path.isdir(os.path.join(base, d))]

    raw = {}
    for nd in nodes:
        df = load_node_df(nd)
        if df is None:
            continue
        nm = os.path.basename(nd.rstrip(os.sep))
        # A trailing COMPACT directory is labeled after its parent node.
        raw[nm if nm.lower() != 'compact' else os.path.basename(os.path.dirname(nd))] = df
    if not raw:
        print("No traces")
        return

    group_map = build_group_map(pd.concat(raw.values(), ignore_index=True))
    mode = ('events' if args.events else
            'process' if args.process else
            'thread' if args.thread else 'node')

    for n, df in raw.items():
        if mode == 'process' and args.aggregate:
            # Aggregate per-process: ASCII tree by average duration,
            # highlighting the pid holding each group's max average in red.
            print(f"\n===== Summary for {n} (aggregate per-process) =====\n")
            proc_df = summarize_process_groups(df)
            # For each group, the pid with the maximum average duration.
            max_pid_map = proc_df.loc[
                proc_df.groupby('group')['Average'].idxmax()
            ].set_index('group')['pid'].to_dict()
            for pid, sub in proc_df.groupby('pid'):
                print(f"Process {pid}")
                for _, row in sub.iterrows():
                    print("|")
                    # Backslash escaped to print a literal \___ branch.
                    line = f"\\___ {row['group']} (avg: {row['Average']:.6e})"
                    if max_pid_map.get(row['group']) == pid:
                        print(Fore.RED + line + Style.RESET_ALL)
                    else:
                        print(Fore.GREEN + line + Style.RESET_ALL)
                print()
            continue

        # Standard tree output.
        print_tree_for_node(n, df, group_map, mode, args.all_events)

    # Cross-node aggregate highlighting (process mode was handled above, so
    # the original unreachable `mode == 'process'` branch has been removed).
    if args.aggregate and mode != 'process':
        metrics = ['% Total Time', 'Total_Time', 'Num_Instances',
                   'Average', 'Min', 'Max', 'StdDev']
        summ = {}
        for n, df in raw.items():
            if mode == 'events':
                summ[n] = summarize_events(df)
            elif mode == 'node':
                summ[n] = summarize_events(df) if args.all_events else summarize_groups(df)
            else:  # thread
                summ[n] = summarize_events(df) if args.all_events else summarize_thread_groups(df)
        key_col = 'name' if mode == 'events' else 'group'
        highlight_across_nodes(summ, key_col, metrics)


if __name__ == '__main__':
    main()
251+

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ dependencies = [
4848
darshan = ["darshan>=3.4"]
4949

5050
[project.scripts]
51-
dfanalyzer = "dfanalyzer.__main__:main"
51+
dfanalyzer = "dfanalyzer.__main__:cli"
5252
dfanalyzer-cluster = "dfanalyzer.cluster:main"
5353
dfanalyzer-plot = "dfanalyzer.plots:main"
5454

0 commit comments

Comments
 (0)