-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparallel.py
More file actions
138 lines (112 loc) · 4.48 KB
/
parallel.py
File metadata and controls
138 lines (112 loc) · 4.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""
A context manager for parallel and distributed processing using
multiprocessing.Manager to share state across processes.
"""
from collections.abc import Callable
from collections.abc import Iterator
import multiprocessing as mp
from multiprocessing.managers import DictProxy
import threading
from threading import Lock
from typing import Any
from typing import TypeVar
from laygo.context.types import IContextHandle
from laygo.context.types import IContextManager
# Return type of the callable passed to ParallelContextManager._execute_locked;
# that helper returns whatever the wrapped operation returns.
R = TypeVar("R")
class ParallelContextHandle(IContextHandle):
    """
    Picklable carrier for the shared state of a parallel context.

    The handle stores only the shared dictionary proxy and the lock, so it
    can be passed to worker processes, which then wrap those objects in
    their own context manager via ``create_proxy``.
    """

    def __init__(self, shared_dict: DictProxy, lock: Lock):
        """
        Keep references to the shared objects; nothing is copied.

        Args:
            shared_dict: The dictionary proxy holding the shared context.
            lock: The lock guarding access to ``shared_dict``.
        """
        self._lock = lock
        self._shared_dict = shared_dict

    def create_proxy(self) -> "IContextManager":
        """
        Build a ParallelContextManager around the objects held by this handle.

        Returns:
            A new ParallelContextManager constructed with ``handle=self``.
        """
        proxy_manager = ParallelContextManager(handle=self)
        return proxy_manager
class ParallelContextManager(IContextManager):
    """
    A context manager that enables state sharing across processes.

    It operates in two modes:

    1. Main mode: created without a handle, it starts a
       ``multiprocessing.Manager`` and creates a shared dictionary and lock
       on it.
    2. Proxy mode: created from a ``ParallelContextHandle``, it wraps the
       dictionary and lock carried by that handle and does not own a manager.

    Locking contract:

    * ``ctx[key] = value``, ``del ctx[key]``, iteration, ``len(ctx)`` and
      ``to_dict()`` each run while holding the shared lock.
    * ``ctx[key]`` (a single-key read) does not take the lock.
    * ``with ctx:`` holds the lock for the whole block. Holds are counted per
      thread, so nested ``with ctx:`` blocks and locked operations inside a
      ``with`` block neither re-acquire the lock nor release it early; the
      lock is released only when the outermost hold ends.
    """

    def __init__(self, initial_context: dict[str, Any] | None = None, handle: ParallelContextHandle | None = None):
        """
        Initialize the manager in main mode or proxy mode.

        Args:
            initial_context: Initial contents of the shared dictionary. Only
                used in main mode; ignored when ``handle`` is given.
            handle: If provided, wrap the shared objects it carries (proxy
                mode) instead of starting a new manager.
        """
        if handle is not None:
            # --- PROXY MODE ---
            # This instance is a client of objects created elsewhere, so it
            # owns no manager.
            self._manager = None
            self._shared_dict = handle._shared_dict
            self._lock = handle._lock
        else:
            # --- MAIN MODE ---
            # This instance owns the manager and the shared objects on it.
            self._manager = mp.Manager()
            self._shared_dict = self._manager.dict(initial_context or {})
            self._lock = self._manager.Lock()

        # Per-thread nesting depth of lock holds. A depth counter (rather than
        # a boolean flag) lets nested holds unwind without the inner one
        # releasing the lock out from under the outer one.
        self._local = threading.local()

    def _lock_context(self) -> None:
        """Take one hold on the lock, acquiring it only for the outermost hold."""
        depth = getattr(self._local, "lock_depth", 0)
        if depth == 0:
            self._lock.acquire()
        # Updated only after a successful acquire so a failed acquire leaves
        # the bookkeeping untouched.
        self._local.lock_depth = depth + 1

    def _unlock_context(self) -> None:
        """Drop one hold on the lock, releasing it when the last hold ends."""
        depth = getattr(self._local, "lock_depth", 0)
        if depth == 0:
            # Unbalanced unlock from a thread that holds nothing: no-op.
            return
        if depth == 1:
            self._lock.release()
        self._local.lock_depth = depth - 1

    def _execute_locked(self, operation: Callable[[], R]) -> R:
        """
        Run ``operation`` while holding the lock and return its result.

        If the calling thread already holds the lock, the operation runs
        under that existing hold and the lock stays held afterwards.
        """
        self._lock_context()
        try:
            return operation()
        finally:
            self._unlock_context()

    def get_handle(self) -> ParallelContextHandle:
        """
        Return a handle containing the shared dict and lock.

        Raises:
            TypeError: If this instance is in proxy mode; only the instance
                that owns the manager can generate handles.
        """
        if self._manager is None:
            raise TypeError("Cannot get a handle from a proxy context instance.")
        return ParallelContextHandle(self._shared_dict, self._lock)

    def shutdown(self) -> None:
        """
        Shut down the manager owned by this instance.

        This is a no-op for proxy instances.
        """
        if self._manager is not None:
            self._manager.shutdown()

    def __enter__(self) -> "ParallelContextManager":
        """Hold the lock for the duration of a ``with`` block."""
        self._lock_context()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Drop the hold taken by ``__enter__``; exceptions are not suppressed."""
        self._unlock_context()

    def __getitem__(self, key: str) -> Any:
        """Return the value stored under ``key`` (read without taking the lock)."""
        return self._shared_dict[key]

    def __setitem__(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key`` while holding the lock."""
        self._execute_locked(lambda: self._shared_dict.__setitem__(key, value))

    def __delitem__(self, key: str) -> None:
        """Remove ``key`` while holding the lock."""
        self._execute_locked(lambda: self._shared_dict.__delitem__(key))

    def __iter__(self) -> Iterator[str]:
        """Iterate over a snapshot of the keys taken while holding the lock."""
        # Iteration needs to copy the keys to be safe across processes.
        return self._execute_locked(lambda: iter(list(self._shared_dict.keys())))

    def __len__(self) -> int:
        """Return the number of entries, read while holding the lock."""
        return self._execute_locked(lambda: len(self._shared_dict))

    def to_dict(self) -> dict[str, Any]:
        """Return a plain ``dict`` copy of the shared state, taken under the lock."""
        return self._execute_locked(lambda: dict(self._shared_dict))