Skip to content

Commit 08fde10

Browse files
committed
Add same-host DDS verification harness
1 parent c19d60b commit 08fde10

6 files changed

Lines changed: 419 additions & 1 deletion

File tree

.gitattributes

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
* text=auto eol=lf
2+
3+
*.png binary
4+
*.jpg binary
5+
*.jpeg binary
6+
*.gif binary
7+
*.pdf binary
8+
*.so binary
9+
*.dll binary
10+
*.exe binary

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ bench/
77
experiment/
88
**/results
99
**.pyc
10-
**/_pychache__
10+
**/_pychache__
11+
.artifacts/

tests/synchronization/graph_synchronization.cpp

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,57 @@ TEST_CASE("Connect and receive the graph from other agent", "[SYNCHRONIZATION][G
4444

4545
}
4646

47+
TEST_CASE("Same-process agents discover each other and exchange updates", "[SYNCHRONIZATION][GRAPH][REGRESSION][DDS]")
48+
{
49+
const auto same_host = GENERATE(true, false);
50+
auto ctx = make_edge_config_file();
51+
auto id1 = static_cast<uint32_t>(rand() % 1000 + 1000);
52+
auto id2 = id1 + 1;
53+
54+
DSRGraph loader(random_string(10), id1, ctx, same_host);
55+
DSRGraph follower(random_string(11), id2, std::string{}, same_host);
56+
57+
auto wait_until = [](auto&& predicate, std::chrono::milliseconds timeout = 2000ms)
58+
{
59+
const auto deadline = std::chrono::steady_clock::now() + timeout;
60+
while (std::chrono::steady_clock::now() < deadline)
61+
{
62+
if (predicate())
63+
return true;
64+
std::this_thread::sleep_for(50ms);
65+
}
66+
return predicate();
67+
};
68+
69+
REQUIRE(wait_until([&] { return follower.size() == loader.size(); }));
70+
REQUIRE(wait_until([&] { return !loader.get_connected_agents().empty(); }));
71+
REQUIRE(wait_until([&] { return !follower.get_connected_agents().empty(); }));
72+
73+
auto root_loader = loader.get_node("root");
74+
REQUIRE(root_loader.has_value());
75+
root_loader->attrs()["same_process_loader_" + std::to_string(same_host)] =
76+
Attribute(std::string("loader"), get_unix_timestamp(), loader.get_agent_id());
77+
REQUIRE(loader.update_node(root_loader.value()));
78+
79+
REQUIRE(wait_until([&] {
80+
auto root_follower = follower.get_node("root");
81+
return root_follower.has_value() &&
82+
root_follower->attrs().contains("same_process_loader_" + std::to_string(same_host));
83+
}));
84+
85+
auto root_follower = follower.get_node("root");
86+
REQUIRE(root_follower.has_value());
87+
root_follower->attrs()["same_process_follower_" + std::to_string(same_host)] =
88+
Attribute(std::string("follower"), get_unix_timestamp(), follower.get_agent_id());
89+
REQUIRE(follower.update_node(root_follower.value()));
90+
91+
REQUIRE(wait_until([&] {
92+
auto updated_root_loader = loader.get_node("root");
93+
return updated_root_loader.has_value() &&
94+
updated_root_loader->attrs().contains("same_process_follower_" + std::to_string(same_host));
95+
}));
96+
}
97+
4798
TEST_CASE("Full graph join does not leave empty node registers after local deletion", "[SYNCHRONIZATION][GRAPH][REGRESSION]")
4899
{
49100
auto ctx = make_empty_config_file();
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
#!/usr/bin/env python3
2+
import argparse
3+
import json
4+
import os
5+
import sys
6+
import time
7+
from pathlib import Path
8+
9+
10+
def parse_args() -> argparse.Namespace:
11+
parser = argparse.ArgumentParser(description="Run one DSR agent worker process")
12+
parser.add_argument("--agent-name", required=True)
13+
parser.add_argument("--agent-id", required=True, type=int)
14+
parser.add_argument("--domain-id", required=True, type=int)
15+
parser.add_argument("--same-host", required=True, choices=("true", "false"))
16+
parser.add_argument("--graph-file", default="")
17+
parser.add_argument("--artifacts-dir", required=True)
18+
parser.add_argument("--local-attr", required=True)
19+
parser.add_argument("--local-value", required=True)
20+
parser.add_argument("--remote-attr", required=True)
21+
parser.add_argument("--remote-value", required=True)
22+
parser.add_argument("--startup-delay", default=0.0, type=float)
23+
parser.add_argument("--sync-timeout", default=30.0, type=float)
24+
parser.add_argument("--hold-seconds", default=0.0, type=float)
25+
return parser.parse_args()
26+
27+
28+
def wait_for(predicate, timeout_s: float, interval_s: float = 0.1, error: str = "timeout"):
29+
deadline = time.monotonic() + timeout_s
30+
while time.monotonic() < deadline:
31+
value = predicate()
32+
if value:
33+
return value
34+
time.sleep(interval_s)
35+
raise TimeoutError(error)
36+
37+
38+
def read_root_attr(graph, attr_name: str):
39+
root = graph.get_node("root")
40+
if root is None:
41+
return None
42+
if attr_name not in root.attrs:
43+
return None
44+
return root.attrs[attr_name].value
45+
46+
47+
def main() -> int:
48+
args = parse_args()
49+
artifacts_dir = Path(args.artifacts_dir)
50+
artifacts_dir.mkdir(parents=True, exist_ok=True)
51+
result_path = artifacts_dir / f"{args.agent_name}.json"
52+
53+
build_python_wrapper = Path(__file__).resolve().parents[2] / "build" / "python-wrapper"
54+
sys.path.insert(0, str(build_python_wrapper))
55+
56+
import pydsr
57+
58+
time.sleep(args.startup_delay)
59+
60+
graph = pydsr.DSRGraph(
61+
0,
62+
args.agent_name,
63+
args.agent_id,
64+
args.graph_file,
65+
args.same_host == "true",
66+
args.domain_id,
67+
)
68+
69+
result = {
70+
"agent_name": args.agent_name,
71+
"agent_id": args.agent_id,
72+
"domain_id": args.domain_id,
73+
"same_host": args.same_host == "true",
74+
"graph_file_loaded": bool(args.graph_file),
75+
}
76+
77+
try:
78+
initial_nodes = wait_for(
79+
lambda: len(graph.get_nodes()) if graph.get_node("root") is not None else 0,
80+
timeout_s=args.sync_timeout,
81+
error="graph root never became available",
82+
)
83+
result["initial_node_count"] = initial_nodes
84+
85+
root = wait_for(
86+
lambda: graph.get_node("root"),
87+
timeout_s=args.sync_timeout,
88+
error="root node not available",
89+
)
90+
root.attrs[args.local_attr] = pydsr.Attribute(args.local_value)
91+
update_ok = graph.update_node(root)
92+
if not update_ok:
93+
raise RuntimeError(f"failed to update root with {args.local_attr}")
94+
95+
observed_remote = wait_for(
96+
lambda: read_root_attr(graph, args.remote_attr),
97+
timeout_s=args.sync_timeout,
98+
error=f"remote attribute {args.remote_attr} not observed",
99+
)
100+
if observed_remote != args.remote_value:
101+
raise RuntimeError(
102+
f"unexpected value for {args.remote_attr}: {observed_remote!r} != {args.remote_value!r}"
103+
)
104+
105+
final_root = graph.get_node("root")
106+
result["final_node_count"] = len(graph.get_nodes())
107+
result["local_attr_value"] = final_root.attrs[args.local_attr].value
108+
result["remote_attr_value"] = final_root.attrs[args.remote_attr].value
109+
if args.hold_seconds > 0:
110+
time.sleep(args.hold_seconds)
111+
result["status"] = "ok"
112+
except Exception as exc:
113+
result["status"] = "error"
114+
result["error"] = str(exc)
115+
finally:
116+
result_path.write_text(json.dumps(result, indent=2), encoding="utf-8")
117+
118+
return 0 if result["status"] == "ok" else 1
119+
120+
121+
if __name__ == "__main__":
122+
raise SystemExit(main())
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
#!/usr/bin/env bash
2+
set -euo pipefail
3+
4+
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
5+
ARTIFACT_ROOT="${ROOT_DIR}/.artifacts/same_host_smoke"
6+
GRAPH_FILE="${ROOT_DIR}/python-wrapper/etc/autonomyLab_objects.simscene.json"
7+
WORKER="${ROOT_DIR}/tools/same_host_smoke/agent_worker.py"
8+
9+
export PYTHONPATH="${ROOT_DIR}/build/python-wrapper${PYTHONPATH:+:${PYTHONPATH}}"
10+
export LD_LIBRARY_PATH="${ROOT_DIR}/build/api:${ROOT_DIR}/build/core${LD_LIBRARY_PATH:+:${LD_LIBRARY_PATH}}"
11+
12+
mkdir -p "${ARTIFACT_ROOT}"
13+
14+
run_case() {
15+
local same_host="$1"
16+
local domain_id="$2"
17+
local case_dir="${ARTIFACT_ROOT}/same_host_${same_host}"
18+
19+
rm -rf "${case_dir}"
20+
mkdir -p "${case_dir}"
21+
22+
python3 "${WORKER}" \
23+
--agent-name "same_host_${same_host}_loader" \
24+
--agent-id $((domain_id * 10 + 1)) \
25+
--domain-id "${domain_id}" \
26+
--same-host "${same_host}" \
27+
--graph-file "${GRAPH_FILE}" \
28+
--artifacts-dir "${case_dir}" \
29+
--local-attr "sync_from_loader_${same_host}" \
30+
--local-value "loader_${same_host}" \
31+
--remote-attr "sync_from_follower_${same_host}" \
32+
--remote-value "follower_${same_host}" \
33+
> "${case_dir}/loader.log" 2>&1 &
34+
local pid_a=$!
35+
36+
python3 "${WORKER}" \
37+
--agent-name "same_host_${same_host}_follower" \
38+
--agent-id $((domain_id * 10 + 2)) \
39+
--domain-id "${domain_id}" \
40+
--same-host "${same_host}" \
41+
--artifacts-dir "${case_dir}" \
42+
--local-attr "sync_from_follower_${same_host}" \
43+
--local-value "follower_${same_host}" \
44+
--remote-attr "sync_from_loader_${same_host}" \
45+
--remote-value "loader_${same_host}" \
46+
--startup-delay 1.0 \
47+
> "${case_dir}/follower.log" 2>&1 &
48+
local pid_b=$!
49+
50+
local rc=0
51+
wait "${pid_a}" || rc=1
52+
wait "${pid_b}" || rc=1
53+
54+
if [[ "${rc}" -ne 0 ]]; then
55+
echo "Scenario same_host=${same_host} failed. See ${case_dir}" >&2
56+
return "${rc}"
57+
fi
58+
59+
python3 - "${case_dir}" "${same_host}" <<'PY'
60+
import json
61+
import sys
62+
from pathlib import Path
63+
64+
case_dir = Path(sys.argv[1])
65+
same_host = sys.argv[2]
66+
loader = json.loads((case_dir / f"same_host_{same_host}_loader.json").read_text(encoding="utf-8"))
67+
follower = json.loads((case_dir / f"same_host_{same_host}_follower.json").read_text(encoding="utf-8"))
68+
69+
for result in (loader, follower):
70+
if result["status"] != "ok":
71+
raise SystemExit(f"{result['agent_name']} failed: {result.get('error', 'unknown error')}")
72+
73+
if follower["initial_node_count"] <= 0:
74+
raise SystemExit("Follower did not receive the initial graph")
75+
76+
if loader["remote_attr_value"] != f"follower_{same_host}":
77+
raise SystemExit("Loader did not observe follower mutation")
78+
79+
if follower["remote_attr_value"] != f"loader_{same_host}":
80+
raise SystemExit("Follower did not observe loader mutation")
81+
82+
print(f"same_host={same_host}: PASS")
83+
PY
84+
}
85+
86+
run_case true 41
87+
run_case false 42
88+
89+
echo "Artifacts written to ${ARTIFACT_ROOT}"

0 commit comments

Comments
 (0)