@@ -1093,11 +1093,20 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg)
10931093 };
10941094
10951095 std::optional<std::unordered_set<std::pair<uint64_t , std::string>,hash_tuple>> cache_map_to_edges = {};
1096+ // Snapshot the data needed for signal emission while the lock is held.
1097+ // nodes.at(id) must NOT be accessed after the lock is released: a concurrent
1098+ // insert_node_/update_node call on the same id runs nodes[id].write() which
1099+ // calls dk.rmv() (clears dk.ds) followed by dk.add(), leaving a window where
1100+ // read_reg()'s assert(dk.ds.size() >= 1) would fire.
1101+ std::string node_type_snapshot;
1102+ std::vector<std::pair<uint64_t , std::string>> from_edges_snapshot;
10961103 {
10971104 std::unique_lock<std::shared_mutex> lock (_mutex);
10981105 if (!deleted.contains (id)) {
10991106 joined = true ;
1100- maybe_deleted_node = (nodes[id].empty ()) ? std::nullopt : std::make_optional (nodes.at (id).read_reg ());
1107+ if (auto it = nodes.find (id); it != nodes.end () && !it->second .empty ()) {
1108+ maybe_deleted_node = it->second .read_reg ();
1109+ }
11011110 nodes[id].join (std::move (crdt_delta));
11021111 if (nodes.at (id).empty () or d_empty) {
11031112 nodes.erase (id);
@@ -1106,8 +1115,14 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg)
11061115 delete_unprocessed_deltas ();
11071116 } else {
11081117 signal = true ;
1109- update_maps_node_insert (id, nodes.at (id).read_reg ());
1118+ const auto & reg = nodes.at (id).read_reg ();
1119+ update_maps_node_insert (id, reg);
11101120 consume_unprocessed_deltas ();
1121+ // Snapshot type and outgoing edges before the lock is released.
1122+ node_type_snapshot = reg.type ();
1123+ for (const auto &[k, v] : reg.fano ()) {
1124+ from_edges_snapshot.emplace_back (k.first , k.second );
1125+ }
11111126 }
11121127 } else {
11131128 delete_unprocessed_deltas ();
@@ -1116,11 +1131,11 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg)
11161131
11171132 if (joined) {
11181133 if (signal) {
1119- DSR_LOG_DEBUG (" [JOIN_NODE] node inserted/updated:" , id, nodes. at (id). read_reg (). type () );
1120- emitter.update_node_signal (id, nodes. at (id). read_reg (). type () , SignalInfo{ mvreg.agent_id () });
1121- for (const auto &[k, v ] : nodes. at (id). read_reg (). fano () ) {
1122- DSR_LOG_DEBUG (" [JOIN_NODE] add edge FROM:" , id, k. first , k. second );
1123- emitter.update_edge_signal (id, k. first , k. second , SignalInfo{ mvreg.agent_id () });
1134+ DSR_LOG_DEBUG (" [JOIN_NODE] node inserted/updated:" , id, node_type_snapshot );
1135+ emitter.update_node_signal (id, node_type_snapshot , SignalInfo{ mvreg.agent_id () });
1136+ for (const auto &[to_id, edge_type ] : from_edges_snapshot ) {
1137+ DSR_LOG_DEBUG (" [JOIN_NODE] add edge FROM:" , id, to_id, edge_type );
1138+ emitter.update_edge_signal (id, to_id, edge_type , SignalInfo{ mvreg.agent_id () });
11241139 }
11251140
11261141 for (const auto &[k, v]: map_new_to_edges)
@@ -1443,7 +1458,10 @@ std::optional<std::string> DSRGraph::join_delta_edge_attr(IDL::MvregEdgeAttr &&m
14431458
14441459void DSRGraph::join_full_graph (IDL::OrMap &&full_graph)
14451460{
1446- std::vector<std::tuple<bool , uint64_t , std::string, std::optional<CRDTNode>>> updates;
1461+ // 5th element: post-join node snapshot captured inside the lock, used for
1462+ // signal emission after the lock is released to avoid racing with
1463+ // insert_node_/update_node (same pattern as join_delta_node).
1464+ std::vector<std::tuple<bool , uint64_t , std::string, std::optional<CRDTNode>, std::optional<CRDTNode>>> updates;
14471465
14481466 uint64_t id{0 }, timestamp{0 };
14491467 uint32_t agent_id_ch{0 };
@@ -1541,24 +1559,26 @@ void DSRGraph::join_full_graph(IDL::OrMap &&full_graph)
15411559 it->second .join (std::move (mv));
15421560 if (mv_empty or it->second .empty ()) {
15431561 update_maps_node_delete (k, nd);
1544- updates.emplace_back (false , k, " " , std::nullopt );
1562+ updates.emplace_back (false , k, " " , std::nullopt , std:: nullopt );
15451563 delete_unprocessed_deltas ();
15461564 } else {
1547- update_maps_node_insert (k, it->second .read_reg ());
1548- updates.emplace_back (true , k, it->second .read_reg ().type (), nd);
1565+ const auto & reg = it->second .read_reg ();
1566+ update_maps_node_insert (k, reg);
1567+ updates.emplace_back (true , k, reg.type (), nd, reg);
15491568 consume_unprocessed_deltas ();
15501569 }
15511570 }
15521571 }
15531572
15541573 }
1555- for (auto &[signal, id, type, nd] : updates)
1574+ for (auto &[signal, id, type, nd, current_nd ] : updates)
15561575 if (signal) {
1557- // check what change is joined
1558- if (!nd.has_value () || nd->attrs () != nodes[id].read_reg ().attrs ()) {
1559- emitter.update_node_signal (id, nodes[id].read_reg ().type (), SignalInfo{ agent_id_ch });
1560- } else if (nd.value () != nodes[id].read_reg ()) {
1561- auto iter = nodes[id].read_reg ().fano ();
1576+ // check what change is joined — use the snapshot captured inside the lock,
1577+ // not nodes[id], which races with concurrent insert_node_/update_node calls.
1578+ if (!nd.has_value () || nd->attrs () != current_nd->attrs ()) {
1579+ emitter.update_node_signal (id, type, SignalInfo{ agent_id_ch });
1580+ } else if (nd.value () != *current_nd) {
1581+ const auto & iter = current_nd->fano ();
15621582 for (const auto &[k, v] : nd->fano ()) {
15631583 if (!iter.contains (k)) {
15641584 emitter.del_edge_signal (id, k.first , k.second , SignalInfo{ agent_id_ch });
@@ -1908,8 +1928,8 @@ void DSRGraph::fullgraph_server_thread()
19081928
19091929std::pair<bool , bool > DSRGraph::fullgraph_request_thread ()
19101930{
1911- bool sync = false ;
1912- bool repeated = false ;
1931+ std::atomic< bool > sync{ false } ;
1932+ std::atomic< bool > repeated{ false } ;
19131933 auto lambda_request_answer = [&](eprosima::fastdds::dds::DataReader *reader, DSR::DSRGraph *graph)
19141934 {
19151935 while (true )
0 commit comments