Skip to content

Commit 4bbd118

Browse files
committed
Always fork in all patterns
Signed-off-by: Cong Wang <cwang@multikernel.io>
1 parent 2150ad1 commit 4bbd118

5 files changed

Lines changed: 169 additions & 131 deletions

File tree

src/branching/agent/patterns.py

Lines changed: 60 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from typing import Callable, Optional, Sequence, TYPE_CHECKING
1111

1212
from ..core.workspace import Workspace
13+
from ..process.runner import run_in_process
1314
from .result import SpeculationResult, SpeculationOutcome
1415

1516
if TYPE_CHECKING:
@@ -90,22 +91,17 @@ def _run_candidate(index: int) -> None:
9091
) as b:
9192
result.branch_path = b.path
9293
try:
93-
if self._resource_limits is not None:
94-
from ..process.runner import run_in_process
95-
96-
def _on_scope(sp: Path, _i: int = index) -> None:
97-
branch_scopes[_i] = sp
98-
99-
ret = run_in_process(
100-
self._task, (b.path, index),
101-
workspace=b.path,
102-
limits=self._resource_limits,
103-
parent_cgroup=root_cgroup,
104-
scope_callback=_on_scope,
105-
)
106-
success, score = ret
107-
else:
108-
success, score = self._task(b.path, index)
94+
def _on_scope(sp: Path, _i: int = index) -> None:
95+
branch_scopes[_i] = sp
96+
97+
ret = run_in_process(
98+
self._task, (b.path, index),
99+
workspace=b.path,
100+
limits=self._resource_limits,
101+
parent_cgroup=root_cgroup,
102+
scope_callback=_on_scope if self._resource_limits else None,
103+
)
104+
success, score = ret
109105
result.success = bool(success)
110106
result.score = score
111107
result.return_value = (success, score)
@@ -254,16 +250,12 @@ def _run(self, workspace: Workspace, root_cgroup: Optional[Path]) -> Speculation
254250
branch_name, on_success=None, on_error="abort"
255251
) as b:
256252
result.branch_path = b.path
257-
if self._resource_limits is not None:
258-
from ..process.runner import run_in_process
259-
success = run_in_process(
260-
self._task, (b.path, attempt, feedback),
261-
workspace=b.path,
262-
limits=self._resource_limits,
263-
parent_cgroup=root_cgroup,
264-
)
265-
else:
266-
success = self._task(b.path, attempt, feedback)
253+
success = run_in_process(
254+
self._task, (b.path, attempt, feedback),
255+
workspace=b.path,
256+
limits=self._resource_limits,
257+
parent_cgroup=root_cgroup,
258+
)
267259
result.success = bool(success)
268260
result.return_value = success
269261

@@ -420,21 +412,16 @@ def _run(index: int) -> None:
420412
) as b:
421413
result.branch_path = b.path
422414
try:
423-
if self._resource_limits is not None:
424-
from ..process.runner import run_in_process
425-
426-
def _on_scope(sp: Path, _i: int = index) -> None:
427-
branch_scopes[_i] = sp
428-
429-
ret = run_in_process(
430-
strategies[index], (b.path,),
431-
workspace=b.path,
432-
limits=self._resource_limits,
433-
parent_cgroup=parent_cgroup,
434-
scope_callback=_on_scope,
435-
)
436-
else:
437-
ret = strategies[index](b.path)
415+
def _on_scope(sp: Path, _i: int = index) -> None:
416+
branch_scopes[_i] = sp
417+
418+
ret = run_in_process(
419+
strategies[index], (b.path,),
420+
workspace=b.path,
421+
limits=self._resource_limits,
422+
parent_cgroup=parent_cgroup,
423+
scope_callback=_on_scope if self._resource_limits else None,
424+
)
438425
if isinstance(ret, tuple):
439426
success, score = ret
440427
else:
@@ -702,21 +689,16 @@ def _beam_worker(index: int) -> None:
702689
result.branch_path = b.path
703690
beam_branches[index] = b
704691
try:
705-
if self._resource_limits is not None:
706-
from ..process.runner import run_in_process
707-
708-
def _on_scope(sp: Path, _i: int = index) -> None:
709-
beam_task_scopes[_i] = sp
710-
711-
ret = run_in_process(
712-
self._strategies[index], (b.path,),
713-
workspace=b.path,
714-
limits=self._resource_limits,
715-
parent_cgroup=beam_cgroups[index],
716-
scope_callback=_on_scope,
717-
)
718-
else:
719-
ret = self._strategies[index](b.path)
692+
def _on_scope(sp: Path, _i: int = index) -> None:
693+
beam_task_scopes[_i] = sp
694+
695+
ret = run_in_process(
696+
self._strategies[index], (b.path,),
697+
workspace=b.path,
698+
limits=self._resource_limits,
699+
parent_cgroup=beam_cgroups[index],
700+
scope_callback=_on_scope if self._resource_limits else None,
701+
)
720702
result.success, result.score = self._score(
721703
ret, b.path
722704
)
@@ -813,23 +795,18 @@ def _sub_worker(idx: int, _d: int = _depth) -> None:
813795
) as sb:
814796
result.branch_path = sb.path
815797
try:
816-
if self._resource_limits is not None:
817-
from ..process.runner import run_in_process
818-
819-
def _on_sub_scope(
820-
sp: Path, _j: int = idx,
821-
) -> None:
822-
sub_scopes[_j] = sp
823-
824-
ret = run_in_process(
825-
strategy, (sb.path,),
826-
workspace=sb.path,
827-
limits=self._resource_limits,
828-
parent_cgroup=beam_cgroups[beam_idx],
829-
scope_callback=_on_sub_scope,
830-
)
831-
else:
832-
ret = strategy(sb.path)
798+
def _on_sub_scope(
799+
sp: Path, _j: int = idx,
800+
) -> None:
801+
sub_scopes[_j] = sp
802+
803+
ret = run_in_process(
804+
strategy, (sb.path,),
805+
workspace=sb.path,
806+
limits=self._resource_limits,
807+
parent_cgroup=beam_cgroups[beam_idx],
808+
scope_callback=_on_sub_scope if self._resource_limits else None,
809+
)
833810
result.success, result.score = self._score(
834811
ret, sb.path
835812
)
@@ -1038,21 +1015,16 @@ def _run_candidate(index: int) -> None:
10381015
result.branch_path = b.path
10391016
branch_paths[index] = b.path
10401017
try:
1041-
if self._resource_limits is not None:
1042-
from ..process.runner import run_in_process
1043-
1044-
def _on_scope(sp: Path, _i: int = index) -> None:
1045-
branch_scopes[_i] = sp
1046-
1047-
success = run_in_process(
1048-
self._task, (b.path, index),
1049-
workspace=b.path,
1050-
limits=self._resource_limits,
1051-
parent_cgroup=root_cgroup,
1052-
scope_callback=_on_scope,
1053-
)
1054-
else:
1055-
success = self._task(b.path, index)
1018+
def _on_scope(sp: Path, _i: int = index) -> None:
1019+
branch_scopes[_i] = sp
1020+
1021+
success = run_in_process(
1022+
self._task, (b.path, index),
1023+
workspace=b.path,
1024+
limits=self._resource_limits,
1025+
parent_cgroup=root_cgroup,
1026+
scope_callback=_on_scope if self._resource_limits else None,
1027+
)
10561028
result.success = bool(success)
10571029
result.return_value = success
10581030
except Exception as e:

src/branching/agent/speculate.py

Lines changed: 7 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ def __init__(
3434
*,
3535
first_wins: bool = True,
3636
max_parallel: int | None = None,
37-
isolate_processes: bool = False,
3837
timeout: float | None = None,
3938
resource_limits: ResourceLimits | None = None,
4039
group_limits: ResourceLimits | None = None,
@@ -46,17 +45,14 @@ def __init__(
4645
first_wins: If True, commit the first successful candidate and
4746
abort siblings. If False, run all and commit the first success.
4847
max_parallel: Maximum parallel workers (default: len(candidates)).
49-
isolate_processes: Run each candidate in a forked process.
5048
timeout: Overall timeout in seconds for all candidates.
51-
resource_limits: Optional per-branch resource limits (implies
52-
process isolation).
49+
resource_limits: Optional per-branch resource limits.
5350
group_limits: Optional resource limits applied to the root
5451
cgroup that contains all branches.
5552
"""
5653
self._candidates = list(candidates)
5754
self._first_wins = first_wins
5855
self._max_parallel = max_parallel or len(self._candidates)
59-
self._isolate_processes = isolate_processes
6056
self._timeout = timeout
6157
self._resource_limits = resource_limits
6258
self._group_limits = group_limits
@@ -117,18 +113,13 @@ def _run_candidate(index: int) -> SpeculationResult:
117113
b.abort()
118114
return result
119115

120-
if self._resource_limits is not None:
121-
def _on_scope(scope_path: Path, _i: int = index) -> None:
122-
branch_scopes[_i] = scope_path
116+
def _on_scope(scope_path: Path, _i: int = index) -> None:
117+
branch_scopes[_i] = scope_path
123118

124-
success = self._run_with_limits(
125-
b.path, index, root_cgroup,
126-
scope_callback=_on_scope,
127-
)
128-
elif self._isolate_processes:
129-
success = self._run_in_process(b.path, index)
130-
else:
131-
success = self._candidates[index](b.path)
119+
success = self._run_with_limits(
120+
b.path, index, root_cgroup,
121+
scope_callback=_on_scope if self._resource_limits else None,
122+
)
132123

133124
result.success = bool(success)
134125
result.return_value = success
@@ -196,28 +187,6 @@ def _on_scope(scope_path: Path, _i: int = index) -> None:
196187
committed=committed,
197188
)
198189

199-
def _run_in_process(self, path: Path, index: int) -> bool:
200-
"""Run a candidate in a forked child process."""
201-
from ..process.context import BranchContext
202-
from ..exceptions import ProcessBranchError
203-
204-
def target(workspace: Path) -> None:
205-
if not self._candidates[index](workspace):
206-
raise ProcessBranchError("Candidate returned failure")
207-
208-
per_candidate = (
209-
self._timeout / len(self._candidates)
210-
if self._timeout is not None
211-
else None
212-
)
213-
214-
with BranchContext(target, workspace=path) as pb:
215-
try:
216-
pb.wait(timeout=per_candidate)
217-
return True
218-
except ProcessBranchError:
219-
return False
220-
221190
def _run_with_limits(
222191
self, path: Path, index: int, parent_cgroup: Path | None = None,
223192
scope_callback=None,

tests/test_fork.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
"""Tests verifying actual fork behavior via run_in_process."""
3+
4+
import os
5+
import subprocess
6+
import tempfile
7+
from pathlib import Path
8+
9+
import pytest
10+
11+
from branching.process.runner import run_in_process
12+
13+
14+
def _can_unshare_userns() -> bool:
15+
"""Check if the system supports unprivileged user namespaces."""
16+
try:
17+
result = subprocess.run(
18+
["unshare", "--user", "--map-root-user", "true"],
19+
capture_output=True, timeout=5,
20+
)
21+
return result.returncode == 0
22+
except (FileNotFoundError, subprocess.TimeoutExpired):
23+
return False
24+
25+
26+
# Skip marker for tests that call run_in_process: applied as a decorator, it
# skips the test when the unshare probe above reports no support. The probe
# is evaluated once, when this module is imported.
needs_userns = pytest.mark.skipif(
    not _can_unshare_userns(),
    reason="Kernel does not support unprivileged user namespaces",
)
30+
31+
32+
@needs_userns
def test_fork_runs_in_child_process():
    """The task executes in a process whose PID differs from the caller's."""
    caller_pid = os.getpid()

    def task(workspace):
        # Report the PID of whichever process runs the task by writing it
        # into the workspace, where the caller can read it back.
        (workspace / "child_pid").write_text(str(os.getpid()))
        return True

    with tempfile.TemporaryDirectory() as tmp_dir:
        root = Path(tmp_dir)
        run_in_process(task, (root,), workspace=root)
        recorded = (root / "child_pid").read_text()
        assert int(recorded) != caller_pid
47+
48+
49+
@needs_userns
def test_fork_inherits_parent_memory():
    """Forked child inherits parent's global state.

    The parent sets an attribute on this test module before calling
    ``run_in_process``; the task reads that attribute and writes it to the
    workspace, and the parent checks the written value matches.
    """
    import test_fork as _self_module

    def task(workspace):
        import test_fork as mod
        # Fall back to None so a missing attribute shows up as the string
        # "None" in the file instead of raising inside the task.
        val = getattr(mod, "_inherited_value", None)
        (workspace / "inherited").write_text(str(val))
        return True

    _self_module._inherited_value = "hello from parent"
    try:
        with tempfile.TemporaryDirectory() as ws:
            ws_path = Path(ws)
            run_in_process(task, (ws_path,), workspace=ws_path)
            result = (ws_path / "inherited").read_text()
            assert result == "hello from parent"
    finally:
        # Clean up even when the run or the assertion fails, so the marker
        # attribute never leaks into other tests in this module.
        del _self_module._inherited_value
70+
71+
72+
@needs_userns
def test_fork_without_resource_limits():
    """run_in_process hands back the task's return value when limits=None."""

    def task(workspace):
        return 42

    with tempfile.TemporaryDirectory() as tmp_dir:
        root = Path(tmp_dir)
        outcome = run_in_process(
            task, (root,), workspace=root, limits=None,
        )
        assert outcome == 42

0 commit comments

Comments
 (0)