-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhybrid_mwc.py
More file actions
143 lines (121 loc) · 5.58 KB
/
hybrid_mwc.py
File metadata and controls
143 lines (121 loc) · 5.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
"""Hybrid Minimum Weight Cycle (MWC) framework
------------------------------------------------
This module integrates pluggable SSSP strategies (Dijkstra, BMSSP-lite, …)
with pluggable LCA back-ends (binary lifting, Euler tour). It provides a
common driver to experiment with algorithmic variants.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Dict, Tuple, Any, Iterable, Set
import networkx as nx
# ------------------------------------------------------------------
# SSSP strategy interface + baseline implementations
# ------------------------------------------------------------------
class SSSPStrategy(ABC):
    """Interface every pluggable single-source shortest-path back-end implements."""

    @abstractmethod
    def run(self, G: nx.Graph, source: int | Set[int], bound: float):
        """Run the shortest-path computation and report its result.

        Args:
            G: graph to search.
            source: a single start node, or a set of start nodes.
            bound: numeric limit handed to the strategy; how it is used is
                up to the concrete implementation.

        Returns:
            Concrete strategies return a ``(distances, predecessors, depth)``
            triple of dictionaries.
        """
from shortest_cycle import dijkstra_base # re-use proven code
class DijkstraStrategy(SSSPStrategy):
    """SSSP strategy that delegates to ``dijkstra_base`` from ``shortest_cycle``."""

    def __init__(self, use_heapdict: bool = False):
        # Forwarded unchanged to dijkstra_base on every run() call.
        self.use_heapdict = use_heapdict

    def run(self, G, source, bound):  # type: ignore[override]
        """Run ``dijkstra_base`` from one start node.

        When ``source`` is a ``set`` only one (arbitrary) member is used as
        the start node; any other value is passed through as the start node.
        ``dijkstra_base`` receives ``bound * 2`` as its limit.
        NOTE(review): presumably dijkstra_base expects the full cycle bound
        rather than the half-bound callers pass in -- confirm.

        Returns:
            ``(distances, preds, depth)``; the fourth value returned by
            ``dijkstra_base`` is discarded.
        """
        if isinstance(source, set):
            start = next(iter(source))
        else:
            start = source
        outcome = dijkstra_base(
            G,
            start,
            bound * 2,
            use_heapdict=self.use_heapdict,
        )
        distances, preds, depth, _ = outcome
        return distances, preds, depth
# Optional BMSSP back-ends. Each module is probed on its own so that a
# missing one only disables its own strategy: a single shared try block would
# skip the bmssp_full import whenever bmssp_lite is absent, and would reset an
# already-imported BMSSPLiteStrategy to None whenever bmssp_full is absent.
# A strategy name is None when its module is not installed.
try:
    from bmssp_lite import BMSSPLiteStrategy  # noqa: F401
except ModuleNotFoundError:
    BMSSPLiteStrategy = None  # type: ignore
try:
    from bmssp_full import BMSSPFullStrategy  # noqa: F401
except ModuleNotFoundError:
    BMSSPFullStrategy = None  # type: ignore
# ------------------------------------------------------------------
# LCA back-end selection
# ------------------------------------------------------------------
from shortest_cycle import LCATree # binary lifting
try:
from lca_euler import EulerLCA
except ModuleNotFoundError:
EulerLCA = None # type: ignore
class LCAStrategy(ABC):
    """Interface every pluggable lowest-common-ancestor back-end implements."""

    @abstractmethod
    def build(self, parents: Dict[int, int], depth: Dict[int, int], nodes: list[int]):
        """Build an LCA query structure from a tree description.

        Args:
            parents: mapping from node to its parent in the tree.
            depth: mapping from node to its depth in the tree.
            nodes: list of graph nodes.

        Returns:
            The base implementation returns ``None``; concrete back-ends
            return their query structure.
        """
class BinaryLCAStrategy(LCAStrategy):
    """LCA back-end backed by ``LCATree`` (binary lifting) from ``shortest_cycle``."""

    def build(self, parents, depth, nodes):  # type: ignore[override]
        """Construct and return an ``LCATree`` from the given arguments."""
        tree = LCATree(parents, depth, nodes)
        return tree
class EulerLCAStrategy(LCAStrategy):
    """LCA back-end backed by ``EulerLCA`` from the optional ``lca_euler`` module."""

    def build(self, parents, depth, nodes):  # type: ignore[override]
        """Construct and return an ``EulerLCA`` from the given arguments.

        Raises:
            ImportError: if ``lca_euler`` could not be imported, in which
                case the module-level ``EulerLCA`` name is ``None``.
        """
        if EulerLCA is not None:
            return EulerLCA(parents, depth, nodes)
        raise ImportError("lca_euler module not found")
# ------------------------------------------------------------------
# Hybrid driver
# ------------------------------------------------------------------
class HybridMWC:
    """Minimum weight cycle search driven by pluggable SSSP and LCA back-ends.

    For every source node the SSSP strategy supplies distances, predecessors
    and depths; the LCA back-end is built from them, and every graph edge
    ``(u, v)`` whose endpoints meet at a third node ``p`` closes a candidate
    cycle of weight ``dist[u] + dist[v] + w(u, v) - 2 * dist[p]``.

    Attributes:
        G: the undirected graph being searched.
        sssp: object exposing ``run(G, source, bound)``.
        lca_backend: object exposing ``build(parents, depth, nodes)``.
        gamma: weight of the best cycle found so far (``inf`` initially).
            It is never reset, so it carries over between calls.
    """

    def __init__(
        self,
        G: nx.Graph,
        *,
        sssp: SSSPStrategy | None = None,
        lca: LCAStrategy | None = None,
    ):
        """Store the graph and back-ends.

        Args:
            G: graph to search.
            sssp: SSSP strategy; ``DijkstraStrategy()`` when omitted.
            lca: LCA strategy; ``BinaryLCAStrategy()`` when omitted.

        Raises:
            ValueError: if ``G.is_directed()`` is true.
        """
        if G.is_directed():
            raise ValueError("Graph must be undirected")
        self.G = G
        self.sssp = sssp if sssp is not None else DijkstraStrategy()
        self.lca_backend = lca if lca is not None else BinaryLCAStrategy()
        self.gamma = float("inf")
        # Raw edge list of the cycle that produced ``gamma``. Kept on the
        # instance, next to ``gamma``, so a later call can still report it.
        self._best_cycle = None

    @staticmethod
    def _cycle_hops(u, v, p, preds):
        """Return the unsorted edge list of the cycle closed by edge ``(u, v)``."""
        hops = []
        for start in (u, v):
            node = start
            while node != p:
                parent = preds.get(node)
                if parent is None:
                    # Predecessor chain ended before reaching ``p``:
                    # best effort, keep what was collected so far.
                    break
                hops.append((node, parent))
                node = parent
        hops.append((u, v))
        return hops

    def _reconstruct_cycle(self, u: int, v: int, p: int, preds: Dict[int, int]):
        """Return the set of edges of the cycle u -> ... -> p -> ... -> v -> u.

        Both ``u`` and ``v`` are walked up through ``preds`` until ``p`` is
        reached (or the predecessor chain runs out). Every edge is returned
        as a sorted 2-tuple, so orientation does not matter.
        """
        return {tuple(sorted(e)) for e in self._cycle_hops(u, v, p, preds)}

    def minimum_weight_cycle(self, *, bound: float | None = None, sources: set[int] | None = None, return_edges: bool = False):
        """Search for the lightest cycle visible from the chosen sources.

        Args:
            bound: when given, ``bound / 2`` is passed to the SSSP strategy
                as its limit; otherwise ``self.gamma / 2`` is used.
            sources: nodes to run the SSSP strategy from; all graph nodes
                when empty or ``None``.
            return_edges: also return the edge set of the best cycle.

        Returns:
            ``None`` if the graph has at most one node, has no edges, or no
            cycle has been found. Otherwise ``self.gamma``, or
            ``(edges, self.gamma)`` when ``return_edges`` is true. ``edges``
            is ``None`` only if ``gamma`` was not set by this method.
        """
        if len(self.G) <= 1 or self.G.number_of_edges() == 0:
            return None
        inf = float("inf")
        nodes = list(self.G.nodes())
        min_edge_w = min(w for *_, w in self.G.edges(data="weight", default=1.0))
        iter_nodes = sources if sources else nodes
        for s in iter_nodes:
            limit = bound / 2 if bound is not None else self.gamma / 2
            dist, preds, depth = self.sssp.run(self.G, s, limit)
            lca_struct = self.lca_backend.build(preds, depth, nodes)
            improved = None
            for u, v, attr in self.G.edges(data=True):
                # A strategy may report unreached nodes as inf or simply
                # leave them out of ``dist``; treat both the same way.
                if dist.get(u, inf) == inf or dist.get(v, inf) == inf:
                    continue
                p = lca_struct.lca(u, v)
                # Compare against None explicitly: a truthiness test would
                # discard node 0 (or any falsy node id) as the meeting point.
                # NOTE(review): presumably lca() returns None when there is
                # no common ancestor -- confirm against the back-ends.
                if p is None or p == u or p == v:
                    continue
                cycle_len = dist[u] + dist[v] + attr.get("weight", 1.0) - 2 * dist[p]
                if cycle_len < self.gamma:
                    self.gamma = cycle_len
                    improved = (u, v, p)
            if improved is not None:
                # Snapshot the cycle while this source's ``preds`` is current.
                self._best_cycle = self._cycle_hops(*improved, preds)
            # NOTE(review): stops once the best cycle is within four times
            # the lightest edge; a lighter cycle seen only from a later
            # source would be missed -- confirm this cut-off is intended.
            if self.gamma <= 4 * min_edge_w:
                break
        if self.gamma == inf:
            return None
        if not return_edges:
            return self.gamma
        best_edges = None
        if self._best_cycle is not None:
            best_edges = {tuple(sorted(e)) for e in self._best_cycle}
        return best_edges, self.gamma
def hybrid_mwc_length(G: nx.Graph, *, use_bmssp_lite: bool = False, use_euler_lca: bool = False, bound: float | None = None, source_seeds: set[int] | None = None, return_edges: bool=False):
    """Build a ``HybridMWC`` from the flags and run ``minimum_weight_cycle`` once.

    Args:
        G: graph to search.
        use_bmssp_lite: use ``BMSSPLiteStrategy`` (imported from
            ``bmssp_lite`` at call time). When false, ``BMSSPFullStrategy``
            is used if it was importable; otherwise no strategy is passed
            and ``HybridMWC`` picks its own default.
        use_euler_lca: choose ``EulerLCAStrategy`` over ``BinaryLCAStrategy``.
        bound: forwarded as ``bound``.
        source_seeds: forwarded as ``sources``.
        return_edges: forwarded as ``return_edges``.

    Returns:
        Whatever ``HybridMWC.minimum_weight_cycle`` returns.
    """
    sssp: SSSPStrategy | None
    if use_bmssp_lite:
        from bmssp_lite import BMSSPLiteStrategy  # local import
        sssp = BMSSPLiteStrategy()
    elif BMSSPFullStrategy is not None:
        sssp = BMSSPFullStrategy()
    else:
        sssp = None

    if use_euler_lca:
        lca_strat = EulerLCAStrategy()
    else:
        lca_strat = BinaryLCAStrategy()

    solver = HybridMWC(G, sssp=sssp, lca=lca_strat)
    return solver.minimum_weight_cycle(
        bound=bound,
        sources=source_seeds,
        return_edges=return_edges,
    )