-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcli.py
More file actions
98 lines (78 loc) · 3.05 KB
/
cli.py
File metadata and controls
98 lines (78 loc) · 3.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from __future__ import annotations
import argparse
import logging
import os
import pathlib
import uuid
from src.pipeline import ingest, cluster, estimate
from src.pipeline.report import run as report_run
from src.utils.config import load_config
from src.utils.metrics import log_metrics, utc_now_iso
def _ensure_dirs(cfg: dict) -> None:
for key, path in cfg["paths"].items():
p = pathlib.Path(path)
if key.endswith("_dir") or key in {"data_raw", "data_derived", "output"}:
p.mkdir(parents=True, exist_ok=True)
else:
p.parent.mkdir(parents=True, exist_ok=True)
def _get_git_commit() -> str | None:
import subprocess
try:
out = subprocess.check_output(["git", "rev-parse", "HEAD"], stderr=subprocess.DEVNULL)
return out.decode().strip()
except Exception:
return None
def run_pipeline(cfg: dict) -> None:
    """Run the four pipeline stages (ingest, cluster, estimate, report) and log metrics.

    Each stage returns a metrics dict; the estimate stage is skipped when the
    config has no ``params.regression`` section. A single consolidated metrics
    entry is appended to ``paths.metrics_path`` at the end.

    Parameters
    ----------
    cfg : dict
        Loaded configuration with ``paths`` and ``params`` sections.
    """
    # Only the per-stage metrics dicts are consumed here; the returned
    # dataframes are written to disk by the stages themselves, so the
    # first tuple element is deliberately discarded.
    logging.info("Step 1: ingest")
    _, m_ingest = ingest.run(cfg)

    logging.info("Step 2: cluster")
    _, m_cluster = cluster.run(cfg)

    m_reg: dict = {}
    if "regression" in cfg["params"]:
        logging.info("Step 3: estimate")
        _, m_reg = estimate.run(cfg)
    else:
        logging.info("Step 3: estimate (skipped — no regression spec in config)")

    logging.info("Step 4: report")
    report_run(cfg)

    # NOTE(review): the clustering fields below are read unconditionally even
    # when clustering is disabled — assumes the config always carries
    # params.clustering.{enabled,k}; confirm against load_config defaults.
    metrics_entry = {
        "run_id": str(uuid.uuid4()),
        "timestamp": utc_now_iso(),
        "git_commit": _get_git_commit(),
        "seed": int(cfg["params"]["seed"]),
        "datasets": {
            "panel_rows": m_ingest.get("rows"),
            "n_individuals": m_ingest.get("n_individuals"),
            "n_waves": m_ingest.get("n_waves"),
            "clustered_rows": m_cluster.get("rows"),
        },
        "frailty": {
            "mean": m_ingest.get("frailty_mean"),
            "median": m_ingest.get("frailty_median"),
            "death_count": m_ingest.get("death_count"),
        },
        "clustering": {
            "enabled": bool(cfg["params"]["clustering"]["enabled"]),
            "k": int(cfg["params"]["clustering"]["k"]),
            "silhouette": m_cluster.get("silhouette"),
            "inertia": m_cluster.get("inertia"),
            "cluster_counts": m_cluster.get("cluster_counts"),
        },
        "regression": m_reg,
    }
    log_metrics(metrics_entry, cfg["paths"]["metrics_path"])
def main() -> None:
    """CLI entry point: parse arguments, set up logging, and run the pipeline."""
    parser = argparse.ArgumentParser(description="Run the UKHLS health-types pipeline")
    parser.add_argument("--config", default="configs/config.yaml")
    args = parser.parse_args()

    log_format = "[%(levelname)s] %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)

    cfg = load_config(args.config)
    _ensure_dirs(cfg)

    # Mirror console output into a log file; the logs directory was just
    # created by _ensure_dirs, so the handler can open the file safely.
    handler = logging.FileHandler(pathlib.Path(cfg["paths"]["logs_dir"]) / "pipeline.log")
    handler.setFormatter(logging.Formatter(log_format))
    logging.getLogger().addHandler(handler)

    run_pipeline(cfg)
    logging.info("Pipeline complete. Outputs in %s", cfg["paths"]["output"])
# Allow running the pipeline directly via `python cli.py`.
if __name__ == "__main__":
    main()