Skip to content

Commit 4a9b2f9

Browse files
authored
Merge pull request #52 from izzet/feature/set-layer-metrics-batch
Optimize `set_layer_metrics` batching
2 parents 9bd31b0 + 8ccc85d commit 4a9b2f9

7 files changed

Lines changed: 205 additions & 9 deletions

File tree

python/dftracer/analyzer/analyzer.py

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import os
88
import pandas as pd
99
import structlog
10-
from betterset import BetterSet as S
1110
from dask import compute, persist
1211
from dask.distributed import fire_and_forget, get_client, wait
1312
from omegaconf import OmegaConf
@@ -45,12 +44,14 @@
4544
ViewType,
4645
Views,
4746
)
47+
from .utils.collection_utils import is_set_like_series
4848
from .utils.dask_agg import quantile_stats, unique_set, unique_set_flatten
4949
from .utils.dask_utils import flatten_column_names
5050
from .utils.expr_utils import extract_numerator_and_denominators
5151
from .utils.file_utils import ensure_dir
5252
from .utils.json_encoders import NpEncoder
5353
from .utils.log_utils import console_block, log_block
54+
from .utils.pandas_utils import to_nullable_numeric
5455

5556

5657
CHECKPOINT_FLAT_VIEW = "_flat_view"
@@ -700,19 +701,42 @@ def set_layer_metrics(
700701
hlm = hlm.copy()
701702
hlm_columns = list(hlm.columns)
702703
size_derived_metric_set = set(size_derived_metrics or [])
704+
is_size_col = {col: (col == "size" or "size_bin" in col) for col in hlm_columns}
705+
col_kinds = {}
706+
numeric_cols = {}
707+
708+
for col in hlm_columns:
709+
series = hlm[col]
710+
if is_size_col[col] or pd.api.types.is_numeric_dtype(series.dtype):
711+
col_kinds[col] = "numeric"
712+
numeric_cols[col] = to_nullable_numeric(series)
713+
elif pd.api.types.is_string_dtype(series.dtype):
714+
col_kinds[col] = "string"
715+
elif is_set_like_series(series):
716+
col_kinds[col] = "set_like"
717+
else:
718+
raise TypeError(
719+
f"Unsupported data type '{series.dtype}' for column '{col}'. "
720+
"Developer must add explicit handling for this data type in set_layer_metrics."
721+
)
722+
723+
# Build derived columns in-memory and append once to avoid repeated fragmentation.
724+
derived_cols: Dict[str, pd.Series] = {}
703725
for metric, condition in derived_metrics.items():
726+
metric_mask = hlm.eval(condition)
704727
is_size_metric = metric in size_derived_metric_set
705728
for col in hlm_columns:
706-
is_size_col = col == "size" or "size_bin" in col
707-
if not is_size_metric and is_size_col:
729+
if not is_size_metric and is_size_col[col]:
708730
continue
709731
metric_col = f"{metric}_{col}"
710-
hlm[metric_col] = pd.NA
711-
if pd.api.types.is_string_dtype(hlm.dtypes[col]) and not is_size_col:
712-
hlm[metric_col] = hlm[metric_col].map(lambda x: S())
713-
hlm[metric_col] = hlm[metric_col].mask(hlm.eval(condition), hlm[col])
714-
if not pd.api.types.is_string_dtype(hlm.dtypes[col]):
715-
hlm[metric_col] = pd.to_numeric(hlm[metric_col], errors="coerce")
732+
if col_kinds[col] in {"string", "set_like"}:
733+
# unique_set_flatten skips None for set-like columns downstream.
734+
derived_cols[metric_col] = hlm[col].where(metric_mask, None)
735+
else:
736+
derived_cols[metric_col] = numeric_cols[col].where(metric_mask)
737+
738+
if derived_cols:
739+
hlm = pd.concat([hlm, pd.DataFrame(derived_cols, index=hlm.index)], axis=1)
716740
return hlm
717741

718742
@staticmethod

python/dftracer/analyzer/utils/collection_utils.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import pandas as pd
22
from typing import Iterable, List
3+
from betterset import BetterSet as S
34

45

56
def deepflatten(collection, ignore_types=(bytes, str)):
@@ -42,3 +43,10 @@ def join_with_and(values: List[str]):
4243
return ' and '.join(values)
4344
else:
4445
return ', '.join(values[:-1]) + ', and ' + values[-1]
46+
47+
48+
def is_set_like_series(series: pd.Series) -> bool:
    """Return True if *series* holds BetterSet values.

    Scans until the first non-missing value and classifies the series based
    on that value's type, so the check is O(1) for typical columns. An empty
    or all-missing series is reported as not set-like.

    Missing markers skipped: ``None``, ``pd.NA``, and float NaN — object
    columns frequently carry NaN (not None) for gaps, and the previous
    implementation would misclassify ``[nan, BetterSet(...)]`` as not
    set-like because it decided on the NaN itself.
    """
    for value in series.array:
        if value is None or value is pd.NA:
            continue
        # NaN is the only float unequal to itself; the isinstance guard keeps
        # this safe for values (like sets) with element-wise __ne__ semantics.
        if isinstance(value, float) and value != value:
            continue
        # First real value decides the verdict.
        return isinstance(value, S)
    return False

python/dftracer/analyzer/utils/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ py.install_sources(
1212
'json_encoders.py',
1313
'log_utils.py',
1414
'notebook_utils.py',
15+
'pandas_utils.py',
1516
'warning_utils.py',
1617
],
1718
subdir: 'dftracer/analyzer/utils',
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import pandas as pd
2+
3+
4+
def to_nullable_numeric(series: pd.Series) -> pd.Series:
    """Coerce *series* to numbers backed by pandas' nullable extension dtypes.

    Values that cannot be parsed become missing (``errors="coerce"``).
    Integer results are upgraded to ``Int64`` and float results to
    ``Float64`` so later masking can introduce ``pd.NA`` without silently
    casting integers to floats; any other numeric result is returned as-is.
    """
    coerced = pd.to_numeric(series, errors="coerce")
    if pd.api.types.is_integer_dtype(coerced.dtype):
        nullable_dtype = "Int64"
    elif pd.api.types.is_float_dtype(coerced.dtype):
        nullable_dtype = "Float64"
    else:
        return coerced
    return coerced.astype(nullable_dtype)

tests/test_set_layer_metrics.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
import numpy as np
2+
import pandas as pd
3+
import pytest
4+
from betterset import BetterSet as S
5+
6+
from dftracer.analyzer.analyzer import Analyzer
7+
from dftracer.analyzer.utils.dask_agg import unique_set_flatten
8+
9+
pytestmark = [pytest.mark.smoke, pytest.mark.full]
10+
11+
12+
# Metric name -> pandas `eval` condition selecting the rows that belong to
# that metric (io_cat values: 1=read, 2=write, 3=metadata per the fixture).
DERIVED_METRICS = {
    "read": "io_cat == 1",
    "write": "io_cat == 2",
    "metadata": "io_cat == 3",
}

# Metrics for which size/size-bin derived columns should also be produced;
# "metadata" is deliberately excluded to exercise the skip path.
SIZE_DERIVED_METRICS = ["read", "write"]
19+
20+
21+
def _build_hlm_df(n_rows: int = 30_000) -> pd.DataFrame:
22+
io_cat = np.tile(np.array([1, 2, 3, 1, 2], dtype=np.int64), int(np.ceil(n_rows / 5)))[:n_rows]
23+
idx = np.arange(n_rows, dtype=np.int64)
24+
return pd.DataFrame(
25+
{
26+
"io_cat": io_cat,
27+
"count": (idx % 17) + 1,
28+
"time": ((idx % 23) + 1).astype(float),
29+
"size": ((idx % 101) + 1) * 4096,
30+
"size_bin_0_4kb": (idx % 2).astype(np.int64),
31+
"func_name": np.where(io_cat == 1, "read", np.where(io_cat == 2, "write", "metadata")),
32+
}
33+
)
34+
35+
36+
def test_set_layer_metrics_correctness() -> None:
    """set_layer_metrics emits per-metric columns with correct values and dtypes."""
    hlm = _build_hlm_df(n_rows=2_000)
    result = Analyzer.set_layer_metrics(
        hlm=hlm,
        derived_metrics=DERIVED_METRICS,
        size_derived_metrics=SIZE_DERIVED_METRICS,
    )

    # Size columns exist only for metrics listed in size_derived_metrics.
    for expected_col in ("read_size", "write_size"):
        assert expected_col in result.columns
    assert "metadata_size" not in result.columns

    masks = {cat: hlm["io_cat"] == cat for cat in (1, 2, 3)}

    def values_match(mask: pd.Series, derived_col: str, source_col: str) -> bool:
        # Compare in float space so nullable and numpy dtypes line up.
        expected = pd.to_numeric(hlm.loc[mask, source_col], errors="coerce").astype(float)
        actual = result.loc[mask, derived_col].astype(float)
        return bool(np.allclose(actual, expected, equal_nan=True))

    assert values_match(masks[1], "read_count", "count")
    assert result.loc[~masks[1], "read_count"].isna().all()
    assert str(result["read_count"].dtype) == "Int64"

    assert values_match(masks[2], "write_time", "time")
    assert result.loc[~masks[2], "write_time"].isna().all()
    assert str(result["write_time"].dtype) == "Float64"

    # String-derived columns keep original values on matching rows and are
    # missing elsewhere; downstream unique_set_flatten skips missing values.
    assert (result.loc[masks[1], "read_func_name"] == hlm.loc[masks[1], "func_name"]).all()
    assert result.loc[~masks[1], "read_func_name"].isna().all()
    assert (result.loc[masks[3], "metadata_func_name"] == hlm.loc[masks[3], "func_name"]).all()
74+
75+
76+
def test_set_layer_metrics_preserves_betterset_columns() -> None:
    """BetterSet-valued columns survive metric derivation and still aggregate."""
    source = pd.DataFrame(
        {
            "group": ["g0", "g0", "g1", "g1"],
            "io_cat": pd.Series([1, 2, 1, 3], dtype="Int64"),
            "count": pd.Series([1, 2, 3, 4], dtype="Int64"),
            "file_name": pd.Series(
                [S(["a"]), S(["b"]), S(["c"]), S(["d"])],
                dtype="object",
            ),
        }
    )
    derived = Analyzer.set_layer_metrics(
        hlm=source,
        derived_metrics=DERIVED_METRICS,
        size_derived_metrics=SIZE_DERIVED_METRICS,
    )

    is_read = source["io_cat"] == 1
    # Matching rows carry the original BetterSet objects through unchanged.
    for row in source.index[is_read]:
        assert derived.at[row, "read_file_name"] == source.at[row, "file_name"]
    assert derived.loc[~is_read, "read_file_name"].isna().all()

    # The derived column must still feed the Dask-style set aggregation:
    # chunk per-group, then combine chunk results per-group.
    flatten = unique_set_flatten()
    partials = flatten.chunk(derived.groupby("group")["read_file_name"])
    combined = flatten.agg(partials.groupby(level=0))
    assert set(combined.loc["g0"]) == {"a"}
    assert set(combined.loc["g1"]) == {"c"}
104+
105+
106+
def test_set_layer_metrics_perf_smoke() -> None:
    """Repeated invocations on a larger frame stay functional.

    Guards the batched-concat implementation: repeated calls must not
    degrade into per-column DataFrame fragmentation or lose results.
    """
    frame = _build_hlm_df(n_rows=50_000)
    latest = None
    for _ in range(8):
        latest = Analyzer.set_layer_metrics(
            hlm=frame,
            derived_metrics=DERIVED_METRICS,
            size_derived_metrics=SIZE_DERIVED_METRICS,
        )
    assert latest is not None
    assert int(latest["read_count"].notna().sum()) > 0
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import pandas as pd
2+
import pytest
3+
from betterset import BetterSet as S
4+
5+
from dftracer.analyzer.utils.collection_utils import is_set_like_series
6+
7+
pytestmark = [pytest.mark.smoke, pytest.mark.full]
8+
9+
10+
def test_is_set_like_series_detects_betterset_values() -> None:
    """A column whose first real value is a BetterSet is classified set-like."""
    # Leading None exercises the missing-value skip before the first set.
    values = pd.Series([None, S(["a"]), S(["b"])], dtype="object")
    assert is_set_like_series(values) is True
13+
14+
15+
def test_is_set_like_series_ignores_plain_strings() -> None:
    """Plain string columns must not be mistaken for set-like columns."""
    values = pd.Series(["a", "b", None], dtype="object")
    assert is_set_like_series(values) is False

tests/utils/test_pandas_utils.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import pandas as pd
2+
import pytest
3+
4+
from dftracer.analyzer.utils.pandas_utils import to_nullable_numeric
5+
6+
pytestmark = [pytest.mark.smoke, pytest.mark.full]
7+
8+
9+
def test_to_nullable_numeric_preserves_integer_nullability() -> None:
    """Int64 output keeps integer values while allowing missing entries."""
    converted = to_nullable_numeric(pd.Series([1, 2, 3], dtype="int64"))
    # Masking introduces a gap; a plain int64 series would be forced to float.
    masked = converted.where(pd.Series([True, False, True]))
    assert str(masked.dtype) == "Int64"
    assert masked.tolist() == [1, pd.NA, 3]
14+
15+
16+
def test_to_nullable_numeric_preserves_float_nullability() -> None:
    """Float64 output represents gaps as pd.NA rather than NaN."""
    converted = to_nullable_numeric(pd.Series([1.5, 2.5, 3.5], dtype="float64"))
    masked = converted.where(pd.Series([True, False, True]))
    assert str(masked.dtype) == "Float64"
    assert masked.tolist() == [1.5, pd.NA, 3.5]

0 commit comments

Comments
 (0)