Skip to content

Commit 61524a0

Browse files
committed
Add hierarchical cgroups and pattern-specific cgroup kill
Signed-off-by: Cong Wang <cwang@multikernel.io>
1 parent 91794de commit 61524a0

11 files changed

Lines changed: 1190 additions & 34 deletions

File tree

src/branching/agent/patterns.py

Lines changed: 230 additions & 7 deletions
Large diffs are not rendered by default.

src/branching/agent/speculate.py

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ def __init__(
3737
isolate_processes: bool = False,
3838
timeout: float | None = None,
3939
resource_limits: ResourceLimits | None = None,
40+
group_limits: ResourceLimits | None = None,
4041
):
4142
"""
4243
Args:
@@ -49,20 +50,56 @@ def __init__(
4950
timeout: Overall timeout in seconds for all candidates.
5051
resource_limits: Optional per-branch resource limits (implies
5152
process isolation).
53+
group_limits: Optional resource limits applied to the root
54+
cgroup that contains all branches.
5255
"""
5356
self._candidates = list(candidates)
5457
self._first_wins = first_wins
5558
self._max_parallel = max_parallel or len(self._candidates)
5659
self._isolate_processes = isolate_processes
5760
self._timeout = timeout
5861
self._resource_limits = resource_limits
62+
self._group_limits = group_limits
5963

6064
def __call__(self, workspace: Workspace) -> SpeculationOutcome:
65+
import os as _os
66+
67+
root_cgroup: Optional[Path] = None
68+
if self._resource_limits is not None and self._group_limits is not None:
69+
try:
70+
from ..process._cgroup import create_group
71+
root_cgroup = create_group(
72+
f"speculate-{_os.getpid()}",
73+
limits=self._group_limits,
74+
)
75+
except OSError:
76+
root_cgroup = None
77+
78+
try:
79+
return self._run(workspace, root_cgroup)
80+
finally:
81+
if root_cgroup is not None:
82+
from ..process._cgroup import kill_scope
83+
kill_scope(root_cgroup)
84+
85+
def _run(self, workspace: Workspace, root_cgroup: Optional[Path]) -> SpeculationOutcome:
6186
results: list[SpeculationResult] = [None] * len(self._candidates) # type: ignore
6287
winner: Optional[SpeculationResult] = None
6388
committed = False
6489
cancel_event = threading.Event()
6590

91+
# Track live cgroup scope paths so we can kill losers immediately.
92+
# Populated by scope_callback from run_in_process; read by
93+
# _kill_scopes. dict ops are GIL-protected in CPython.
94+
branch_scopes: dict[int, Path] = {}
95+
96+
def _kill_scopes(exclude: int = -1) -> None:
97+
"""Kill all tracked cgroup scopes except *exclude*."""
98+
from ..process._cgroup import kill_scope
99+
for idx, scope in list(branch_scopes.items()):
100+
if idx != exclude:
101+
kill_scope(scope)
102+
66103
def _run_candidate(index: int) -> SpeculationResult:
67104
branch_name = f"speculate_{index}"
68105
result = SpeculationResult(branch_index=index, success=False)
@@ -81,7 +118,13 @@ def _run_candidate(index: int) -> SpeculationResult:
81118
return result
82119

83120
if self._resource_limits is not None:
84-
success = self._run_with_limits(b.path, index)
121+
def _on_scope(scope_path: Path, _i: int = index) -> None:
122+
branch_scopes[_i] = scope_path
123+
124+
success = self._run_with_limits(
125+
b.path, index, root_cgroup,
126+
scope_callback=_on_scope,
127+
)
85128
elif self._isolate_processes:
86129
success = self._run_in_process(b.path, index)
87130
else:
@@ -93,6 +136,7 @@ def _run_candidate(index: int) -> SpeculationResult:
93136
if result.success and not cancel_event.is_set():
94137
if self._first_wins:
95138
cancel_event.set()
139+
_kill_scopes(index)
96140
try:
97141
b.commit()
98142
except ConflictError:
@@ -131,6 +175,7 @@ def _run_candidate(index: int) -> SpeculationResult:
131175
except TimeoutError:
132176
# Signal remaining candidates to abort
133177
cancel_event.set()
178+
_kill_scopes()
134179

135180
# Wait briefly for in-flight branches to finish cleanup
136181
for f in futures:
@@ -173,7 +218,10 @@ def target(workspace: Path) -> None:
173218
except ProcessBranchError:
174219
return False
175220

176-
def _run_with_limits(self, path: Path, index: int) -> bool:
221+
def _run_with_limits(
222+
self, path: Path, index: int, parent_cgroup: Path | None = None,
223+
scope_callback=None,
224+
) -> bool:
177225
"""Run a candidate in a forked child with resource limits."""
178226
from ..process.runner import run_in_process
179227
from ..exceptions import ProcessBranchError
@@ -191,6 +239,8 @@ def _run_with_limits(self, path: Path, index: int) -> bool:
191239
workspace=path,
192240
limits=self._resource_limits,
193241
timeout=per_candidate,
242+
parent_cgroup=parent_cgroup,
243+
scope_callback=scope_callback,
194244
)
195245
return bool(result)
196246
except ProcessBranchError:

src/branching/process/_cgroup.py

Lines changed: 90 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,28 +33,89 @@ def _own_cgroup() -> Path:
3333
raise OSError("No cgroup v2 entry found in /proc/self/cgroup")
3434

3535

36-
def create_scope(name: str) -> Path:
37-
"""Create a child cgroup under our own cgroup for process grouping.
36+
def _enable_subtree_controllers(cgroup_dir: Path) -> None:
37+
"""Enable memory and cpu controllers for children of *cgroup_dir*.
3838
39-
Creates under the current process's cgroup, which is compatible
40-
with systemd's delegated hierarchy — we never write into a level
41-
that systemd manages directly.
39+
Reads ``cgroup.controllers`` to discover available controllers, then
40+
writes ``+memory +cpu`` (intersection with available) to
41+
``cgroup.subtree_control``. Best-effort — silently ignores errors.
42+
"""
43+
try:
44+
available = (cgroup_dir / "cgroup.controllers").read_text().split()
45+
except OSError:
46+
return
47+
wanted = [c for c in ("memory", "cpu") if c in available]
48+
if not wanted:
49+
return
50+
payload = " ".join(f"+{c}" for c in wanted)
51+
try:
52+
(cgroup_dir / "cgroup.subtree_control").write_text(payload)
53+
except OSError:
54+
pass
55+
56+
57+
def create_scope(name: str, *, parent: Path | None = None) -> Path:
58+
"""Create a child cgroup under *parent* (or our own cgroup) for process grouping.
59+
60+
When *parent* is given the scope is created as a child of that
61+
directory and ``_enable_subtree_controllers`` is called on the parent
62+
first so that memory/cpu controllers are available in the child.
4263
4364
Args:
4465
name: Scope name suffix (e.g. PID).
66+
parent: Optional parent cgroup directory. Defaults to
67+
``_own_cgroup()`` when ``None``.
4568
4669
Returns:
4770
Path to the cgroup directory.
4871
4972
Raises:
5073
OSError: If cgroup creation fails.
5174
"""
52-
parent = _own_cgroup()
53-
scope_dir = parent / f"branching-{name}.scope"
75+
if parent is not None:
76+
_enable_subtree_controllers(parent)
77+
parent_dir = parent
78+
else:
79+
parent_dir = _own_cgroup()
80+
scope_dir = parent_dir / f"branching-{name}.scope"
5481
scope_dir.mkdir(exist_ok=True)
5582
return scope_dir
5683

5784

85+
def create_group(
86+
name: str,
87+
*,
88+
parent: Path | None = None,
89+
limits: "ResourceLimits | None" = None,
90+
) -> Path:
91+
"""Create an intermediate cgroup for nesting (never holds PIDs directly).
92+
93+
Calls ``_enable_subtree_controllers`` on the new directory so that
94+
child scopes can use memory/cpu controllers. Applies optional
95+
group-level limits (total budget for all children).
96+
97+
Args:
98+
name: Group name suffix.
99+
parent: Optional parent cgroup directory. Defaults to
100+
``_own_cgroup()`` when ``None``.
101+
limits: Optional resource limits applied to the group itself.
102+
103+
Returns:
104+
Path to the new group cgroup directory.
105+
"""
106+
if parent is not None:
107+
_enable_subtree_controllers(parent)
108+
parent_dir = parent
109+
else:
110+
parent_dir = _own_cgroup()
111+
group_dir = parent_dir / f"branching-{name}.scope"
112+
group_dir.mkdir(exist_ok=True)
113+
_enable_subtree_controllers(group_dir)
114+
if limits is not None:
115+
set_limits(group_dir, limits)
116+
return group_dir
117+
118+
58119
def add_pid(scope: Path, pid: int) -> None:
59120
"""Add a PID to a cgroup scope.
60121
@@ -68,7 +129,7 @@ def add_pid(scope: Path, pid: int) -> None:
68129
(scope / "cgroup.procs").write_text(str(pid))
69130

70131

71-
def set_limits(scope: Path, limits: ResourceLimits) -> None:
132+
def set_limits(scope: Path, limits: "ResourceLimits") -> None:
72133
"""Apply resource limits to a cgroup scope.
73134
74135
Best-effort — skips ``None`` fields and ignores write errors so that
@@ -79,6 +140,16 @@ def set_limits(scope: Path, limits: ResourceLimits) -> None:
79140
(scope / "memory.max").write_text(str(limits.memory))
80141
except OSError:
81142
pass
143+
if limits.memory_high is not None:
144+
try:
145+
(scope / "memory.high").write_text(str(limits.memory_high))
146+
except OSError:
147+
pass
148+
if limits.oom_group:
149+
try:
150+
(scope / "memory.oom.group").write_text("1")
151+
except OSError:
152+
pass
82153
if limits.cpu is not None:
83154
period = 100_000 # 100 ms
84155
quota = int(limits.cpu * period)
@@ -89,11 +160,21 @@ def set_limits(scope: Path, limits: ResourceLimits) -> None:
89160

90161

91162
def kill_scope(scope: Path) -> None:
92-
"""Kill all processes in a cgroup scope and remove it.
163+
"""Kill all processes in a cgroup scope and remove it recursively.
164+
165+
Iterates child directories bottom-up, kills each, then kills and
166+
removes self. Handles the cgroup v2 requirement that ``rmdir``
167+
needs no child cgroups.
93168
94169
Best-effort cleanup — ignores errors.
95170
"""
96171
try:
172+
# Recurse into children first (bottom-up)
173+
if scope.is_dir():
174+
for child in sorted(scope.iterdir()):
175+
if child.is_dir():
176+
kill_scope(child)
177+
# Kill processes in this scope
97178
kill_file = scope / "cgroup.kill"
98179
if kill_file.exists():
99180
kill_file.write_text("1")

src/branching/process/context.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ def __init__(
4242
isolate: bool = False,
4343
close_fds: bool = False,
4444
limits: ResourceLimits | None = None,
45+
parent_cgroup: Path | None = None,
4546
):
4647
"""
4748
Args:
@@ -52,12 +53,16 @@ def __init__(
5253
since each child needs its own user ns for bind-mount).
5354
close_fds: BR_CLOSE_FDS — close inherited fds (3+) in child.
5455
limits: Optional resource limits applied via cgroup v2.
56+
parent_cgroup: Optional parent cgroup directory. When given,
57+
the child's scope is created under this directory instead
58+
of the process's own cgroup, enabling hierarchical nesting.
5559
"""
5660
self._target = target
5761
self._workspace = workspace
5862
self._isolate = isolate
5963
self._close_fds = close_fds
6064
self._limits = limits
65+
self._parent_cgroup = parent_cgroup
6166
self._pid: Optional[int] = None
6267
self._exited = False
6368
self._cgroup_scope: Optional[Path] = None
@@ -68,6 +73,11 @@ def pid(self) -> int:
6873
raise ProcessBranchError("Process not started")
6974
return self._pid
7075

76+
@property
77+
def cgroup_scope(self) -> Optional[Path]:
78+
"""The cgroup v2 scope directory, or ``None`` if unavailable."""
79+
return self._cgroup_scope
80+
7181
@property
7282
def alive(self) -> bool:
7383
if self._pid is None or self._exited:
@@ -175,7 +185,9 @@ def __enter__(self) -> "BranchContext":
175185
# (required for per-branch limits).
176186
scope_name = f"{os.getpid()}-{uuid.uuid4().hex[:8]}"
177187
try:
178-
self._cgroup_scope = _cgroup.create_scope(scope_name)
188+
self._cgroup_scope = _cgroup.create_scope(
189+
scope_name, parent=self._parent_cgroup,
190+
)
179191
except OSError:
180192
self._cgroup_scope = None
181193

@@ -253,6 +265,7 @@ def create(
253265
isolate: bool = False,
254266
close_fds: bool = False,
255267
limits: ResourceLimits | None = None,
268+
parent_cgroup: Path | None = None,
256269
) -> Iterator[list["BranchContext"]]:
257270
"""Create N branch contexts, mirroring branch(BR_CREATE, n_branches=N).
258271
@@ -264,6 +277,7 @@ def create(
264277
isolate: BR_ISOLATE — separate user ns per child.
265278
close_fds: BR_CLOSE_FDS — close inherited fds in children.
266279
limits: Optional resource limits applied via cgroup v2.
280+
parent_cgroup: Optional parent cgroup for hierarchical nesting.
267281
268282
Yields:
269283
List of entered BranchContext instances (already forked).
@@ -276,7 +290,7 @@ def create(
276290
for target, workspace in zip(targets, workspaces):
277291
ctx = BranchContext(
278292
target, workspace, isolate=isolate, close_fds=close_fds,
279-
limits=limits,
293+
limits=limits, parent_cgroup=parent_cgroup,
280294
)
281295
ctx.__enter__()
282296
contexts.append(ctx)

src/branching/process/limits.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,17 @@ class ResourceLimits:
5252

5353
cpu: float | None = None
5454
"""Fraction of one CPU (0.5 = 50%). Written to ``cpu.max``."""
55+
56+
memory_high: int | None = None
57+
"""Soft memory throttle in bytes (written to ``memory.high``).
58+
59+
When usage exceeds this value the kernel reclaims aggressively but
60+
does *not* OOM-kill the process.
61+
"""
62+
63+
oom_group: bool = False
64+
"""Atomic OOM termination (written to ``memory.oom.group``).
65+
66+
When ``True``, all processes in the cgroup are killed together on
67+
OOM rather than picking a single victim.
68+
"""

src/branching/process/runner.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ def run_in_process(
2121
*,
2222
limits: ResourceLimits | None = None,
2323
timeout: float | None = None,
24+
parent_cgroup: Path | None = None,
25+
scope_callback: Callable[[Path], None] | None = None,
2426
) -> Any:
2527
"""Run *fn(*args)* in a forked child process, optionally with cgroup limits.
2628
@@ -34,6 +36,10 @@ def run_in_process(
3436
BranchContext workspace).
3537
limits: Optional resource limits applied to the child's cgroup.
3638
timeout: Maximum seconds to wait for the child.
39+
parent_cgroup: Optional parent cgroup for hierarchical nesting.
40+
scope_callback: Optional callback invoked with the cgroup scope path
41+
after the child's scope is created. Allows callers to track live
42+
cgroup paths for external kill/throttle.
3743
3844
Returns:
3945
Whatever *fn* returned.
@@ -61,7 +67,12 @@ def _target(ws_path: Path) -> None:
6167
pass
6268
raise
6369

64-
with BranchContext(_target, workspace=workspace, limits=limits) as ctx:
70+
with BranchContext(
71+
_target, workspace=workspace, limits=limits,
72+
parent_cgroup=parent_cgroup,
73+
) as ctx:
74+
if scope_callback is not None and ctx.cgroup_scope is not None:
75+
scope_callback(ctx.cgroup_scope)
6576
try:
6677
ctx.wait(timeout=timeout)
6778
except ProcessBranchError:

0 commit comments

Comments
 (0)