|
16 | 16 | #include <algorithm> |
17 | 17 | #include <cmath> |
18 | 18 | #include <cstdint> |
| 19 | +#include <cstdlib> |
| 20 | +#include <future> |
19 | 21 | #include <limits> |
20 | 22 | #include <memory> |
21 | 23 | #include <optional> |
22 | 24 | #include <queue> |
| 25 | +#include <thread> |
23 | 26 | #include <unordered_map> |
24 | 27 | #include <utility> |
25 | 28 | #include <vector> |
26 | 29 |
|
27 | 30 | namespace netgraph::core { |
28 | 31 |
|
| 32 | +namespace { |
| 33 | + |
// Decide how many worker threads to use for sensitivity analysis.
//
// Resolution order:
//   1. candidate_count <= 1           -> always serial (returns 1).
//   2. NGRAPH_CORE_SENSITIVITY_THREADS, when set to a valid non-negative
//      integer: 0 means "force serial"; any other value is capped at
//      candidate_count (there is no point spawning more threads than
//      candidates). Malformed values — trailing garbage such as "8abc",
//      or a negative number — are ignored and we fall through to the
//      hardware default. (The previous version silently accepted both:
//      "8abc" parsed as 8, and "-1" wrapped through strtoul to a huge
//      value.) Overflowed values saturate to ULONG_MAX, which is then
//      harmlessly capped at candidate_count.
//   3. std::thread::hardware_concurrency(), capped at candidate_count;
//      a 0 report from the runtime (unknown) is treated as 1.
std::size_t sensitivity_thread_budget(std::size_t candidate_count) {
  if (candidate_count <= 1) {
    return 1;
  }

  if (const char* env = std::getenv("NGRAPH_CORE_SENSITIVITY_THREADS");
      env != nullptr && env[0] != '\0') {
    char* end = nullptr;
    const unsigned long parsed = std::strtoul(env, &end, 10);
    // Accept only a fully-consumed, non-negative number.
    const bool fully_parsed = (end != env) && (*end == '\0');
    const bool non_negative = (env[0] != '-');
    if (fully_parsed && non_negative) {
      if (parsed == 0ul) {
        return 1;  // explicit opt-out of threading
      }
      return std::min<std::size_t>(static_cast<std::size_t>(parsed),
                                   candidate_count);
    }
    // Malformed override: ignore it and use the hardware default below.
  }

  const auto hw = std::thread::hardware_concurrency();
  const auto budget = hw > 0 ? static_cast<std::size_t>(hw) : std::size_t{1};
  return std::min<std::size_t>(budget, candidate_count);
}
| 55 | + |
| 56 | +std::optional<std::pair<EdgeId, Flow>> |
| 57 | +evaluate_sensitivity_candidate(const StrictMultiDiGraph& g, |
| 58 | + NodeId src, |
| 59 | + NodeId dst, |
| 60 | + FlowPlacement placement, |
| 61 | + bool shortest_path, |
| 62 | + bool require_capacity, |
| 63 | + Flow baseline_flow, |
| 64 | + std::span<const bool> node_mask, |
| 65 | + EdgeId eid, |
| 66 | + bool* local_mask, |
| 67 | + std::size_t mask_size) { |
| 68 | + local_mask[static_cast<std::size_t>(eid)] = false; |
| 69 | + |
| 70 | + auto [new_flow, _] = calc_max_flow( |
| 71 | + g, src, dst, placement, |
| 72 | + shortest_path, |
| 73 | + require_capacity, |
| 74 | + /*with_edge_flows=*/false, |
| 75 | + /*with_reachable=*/false, |
| 76 | + /*with_residuals=*/false, |
| 77 | + node_mask, std::span<const bool>(local_mask, mask_size)); |
| 78 | + |
| 79 | + local_mask[static_cast<std::size_t>(eid)] = true; |
| 80 | + |
| 81 | + double delta = baseline_flow - new_flow; |
| 82 | + if (delta > kMinFlow) { |
| 83 | + return std::pair<EdgeId, Flow>{eid, delta}; |
| 84 | + } |
| 85 | + return std::nullopt; |
| 86 | +} |
| 87 | + |
| 88 | +} // namespace |
| 89 | + |
29 | 90 | std::pair<Flow, FlowSummary> |
30 | 91 | calc_max_flow(const StrictMultiDiGraph& g, NodeId src, NodeId dst, |
31 | 92 | FlowPlacement placement, bool shortest_path, |
@@ -247,33 +308,58 @@ sensitivity_analysis(const StrictMultiDiGraph& g, NodeId src, NodeId dst, |
247 | 308 | } else { |
248 | 309 | std::copy(edge_mask.begin(), edge_mask.end(), test_mask_buf.get()); |
249 | 310 | } |
250 | | - // View for passing to calc_max_flow |
251 | | - std::span<const bool> test_mask_span(test_mask_buf.get(), N); |
252 | | - |
253 | 311 | std::vector<std::pair<EdgeId, Flow>> results; |
254 | 312 | results.reserve(candidates.size()); |
255 | 313 |
|
256 | 314 | // Step 2: Iterate candidates, testing flow reduction when each is removed |
257 | | - for (EdgeId eid : candidates) { |
258 | | - // Mask out the edge |
259 | | - test_mask_buf[eid] = false; |
260 | | - |
261 | | - auto [new_flow, _] = calc_max_flow( |
262 | | - g, src, dst, placement, |
263 | | - shortest_path, |
264 | | - require_capacity, |
265 | | - /*with_edge_flows=*/false, |
266 | | - /*with_reachable=*/false, |
267 | | - /*with_residuals=*/false, |
268 | | - node_mask, test_mask_span); |
269 | | - |
270 | | - double delta = baseline_flow - new_flow; |
271 | | - if (delta > kMinFlow) { |
272 | | - results.emplace_back(eid, delta); |
| 315 | + const auto thread_budget = sensitivity_thread_budget(candidates.size()); |
| 316 | + if (thread_budget <= 1) { |
| 317 | + for (EdgeId eid : candidates) { |
| 318 | + auto maybe_result = evaluate_sensitivity_candidate( |
| 319 | + g, src, dst, placement, |
| 320 | + shortest_path, require_capacity, |
| 321 | + baseline_flow, node_mask, |
| 322 | + eid, test_mask_buf.get(), N); |
| 323 | + if (maybe_result.has_value()) { |
| 324 | + results.push_back(*maybe_result); |
| 325 | + } |
273 | 326 | } |
| 327 | + return results; |
| 328 | + } |
| 329 | + |
| 330 | + const auto chunk_size = (candidates.size() + thread_budget - 1) / thread_budget; |
| 331 | + std::vector<std::future<std::vector<std::pair<EdgeId, Flow>>>> futures; |
| 332 | + futures.reserve(thread_budget); |
| 333 | + |
| 334 | + for (std::size_t begin = 0; begin < candidates.size(); begin += chunk_size) { |
| 335 | + const std::size_t end = std::min<std::size_t>(begin + chunk_size, candidates.size()); |
| 336 | + futures.emplace_back(std::async(std::launch::async, [&, begin, end]() { |
| 337 | + std::unique_ptr<bool[]> local_mask_buf(new bool[N]); |
| 338 | + if (!edge_mask.empty()) { |
| 339 | + std::copy(edge_mask.begin(), edge_mask.end(), local_mask_buf.get()); |
| 340 | + } else { |
| 341 | + std::fill(local_mask_buf.get(), local_mask_buf.get() + N, true); |
| 342 | + } |
| 343 | + |
| 344 | + std::vector<std::pair<EdgeId, Flow>> local_results; |
| 345 | + local_results.reserve(end - begin); |
| 346 | + for (std::size_t idx = begin; idx < end; ++idx) { |
| 347 | + auto maybe_result = evaluate_sensitivity_candidate( |
| 348 | + g, src, dst, placement, |
| 349 | + shortest_path, require_capacity, |
| 350 | + baseline_flow, node_mask, |
| 351 | + candidates[idx], local_mask_buf.get(), N); |
| 352 | + if (maybe_result.has_value()) { |
| 353 | + local_results.push_back(*maybe_result); |
| 354 | + } |
| 355 | + } |
| 356 | + return local_results; |
| 357 | + })); |
| 358 | + } |
274 | 359 |
|
275 | | - // Restore mask |
276 | | - test_mask_buf[eid] = true; |
| 360 | + for (auto& future : futures) { |
| 361 | + auto local_results = future.get(); |
| 362 | + results.insert(results.end(), local_results.begin(), local_results.end()); |
277 | 363 | } |
278 | 364 |
|
279 | 365 | return results; |
|
0 commit comments