Skip to content

Commit 0c0f77e

Browse files
authored
sharrow thread limits and cache clear script (#1032)
* sharrow cache clear script * prevent multithreading unless --fast * log environment variables
1 parent 83613b2 commit 0c0f77e

3 files changed

Lines changed: 85 additions & 2 deletions

File tree

activitysim/cli/main.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,18 @@ def prog():
4444

4545

4646
def main():
47-
# set all these before we import numpy or any other math library
47+
prevent_multithreading = False
4848
if len(sys.argv) > 1 and sys.argv[1] == "benchmark":
49+
# We cannot use multiple threads for benchmarking, as it can foil timing
50+
prevent_multithreading = True
51+
if "--fast" not in sys.argv:
52+
# If we are not in fast mode, we want to prevent multithreading to avoid
53+
# issues with some libraries crashing. Fast mode is not stable enough
54+
# at scale to use by default, but can be used for testing and development.
55+
prevent_multithreading = True
56+
57+
# set all these before we import numpy or any other math library
58+
if prevent_multithreading:
4959
os.environ["MKL_NUM_THREADS"] = "1"
5060
os.environ["OMP_NUM_THREADS"] = "1"
5161
os.environ["OPENBLAS_NUM_THREADS"] = "1"

activitysim/core/mp_tasks.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
from activitysim.core import config, mem, tracing, util, workflow
2121
from activitysim.core.configuration import FileSystem, Settings
22+
from activitysim.core.exceptions import *
2223
from activitysim.core.run_id import RunId
2324
from activitysim.core.workflow.checkpoint import (
2425
CHECKPOINT_NAME,
@@ -27,7 +28,6 @@
2728
NON_TABLE_COLUMNS,
2829
ParquetStore,
2930
)
30-
from activitysim.core.exceptions import *
3131

3232
logger = logging.getLogger(__name__)
3333

@@ -257,6 +257,21 @@ def exception(state: workflow.State, msg, write_to_log_file=True):
257257
logger.log(logging.ERROR, f"\n---\n{traceback.format_exc()}---\n")
258258

259259

260+
def log_environment_info(state: workflow.State):
261+
"""log environment info for debugging purposes."""
262+
if os.environ.get("ASIM_LOG_ENVIRON", False):
263+
process_name = multiprocessing.current_process().name
264+
environ_summary = f"OS.ENVIRON in process {process_name}:"
265+
for k, v in os.environ.items():
266+
environ_summary += f"\n--- {k}: {v}"
267+
info(state, environ_summary)
268+
else:
269+
info(
270+
state,
271+
"ASIM_LOG_ENVIRON not set, skipping logging of environment variables. Set ASIM_LOG_ENVIRON=1 to log environment variables.",
272+
)
273+
274+
260275
"""
261276
### child process methods (called within sub process)
262277
"""
@@ -1097,6 +1112,7 @@ def mp_run_simulation(
10971112
state,
10981113
f"mp_run_simulation {step_info['name']} locutor={state.get_injectable('locutor', False)} ",
10991114
)
1115+
log_environment_info(state)
11001116

11011117
try:
11021118
if step_info["num_processes"] > 1:
@@ -1417,6 +1433,7 @@ def check_proc_status(state: workflow.State):
14171433
completed = set(previously_completed)
14181434
failed = set([]) # so we can log process failure first time it happens
14191435
drop_breadcrumb(state, step_name, "completed", list(completed))
1436+
log_environment_info(state)
14201437

14211438
for i, process_name in enumerate(process_names):
14221439
q = multiprocessing.Queue()
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#!/usr/bin/env -S uv run --script --no-project
2+
#
3+
# /// script
4+
# requires-python = ">=3.10,<3.12"
5+
# dependencies = [
6+
# "platformdirs",
7+
# ]
8+
# ///
9+
10+
from __future__ import annotations
11+
12+
import shutil
13+
from datetime import datetime
14+
from pathlib import Path
15+
16+
import platformdirs
17+
18+
19+
def clear_sharrow_cache(archive: bool = True):
20+
"""Disable all files in the sharrow cache directory.
21+
22+
Parameters
23+
----------
24+
archive : bool, optional
25+
If True, move the files to an 'archive' subdirectory instead of deleting them,
26+
by default True.
27+
"""
28+
main_dir = Path(platformdirs.user_cache_dir(appname="ActivitySim"))
29+
print(f"Scanning ActivitySim cache directory for sharrow files: {main_dir}")
30+
sharrow_cache_dirs = main_dir.glob("sharrow-*-numba-*")
31+
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
32+
33+
# define the archive path if archiving is enabled
34+
if archive:
35+
archive_path = Path(
36+
platformdirs.user_cache_dir(appname="ActivitySim")
37+
).joinpath(f"archive-{timestamp}")
38+
else:
39+
archive_path = None
40+
41+
for sharrow_cache_dir in sharrow_cache_dirs:
42+
if sharrow_cache_dir.is_dir():
43+
print(f"Clearing sharrow cache directory: {sharrow_cache_dir}")
44+
if archive:
45+
archive_path.mkdir(parents=True, exist_ok=True)
46+
shutil.move(
47+
str(sharrow_cache_dir), str(archive_path / sharrow_cache_dir.name)
48+
)
49+
else:
50+
shutil.rmtree(str(sharrow_cache_dir))
51+
else:
52+
print("No sharrow cache directories found.")
53+
54+
55+
if __name__ == "__main__":
56+
clear_sharrow_cache(archive=True)

0 commit comments

Comments
 (0)