Skip to content

Commit 29f0cd3

Browse files
committed
demand placement finished
1 parent 999285c commit 29f0cd3

5 files changed

Lines changed: 905 additions & 488 deletions

File tree

ngraph/lib/flow_policy.py

Lines changed: 61 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import copy
34
from collections import deque
45
from enum import IntEnum
56
from typing import Any, Callable, Dict, List, Optional, Set, Tuple
@@ -12,7 +13,9 @@
1213

1314

1415
class FlowPolicyConfig(IntEnum):
15-
"""Enumerates supported flow policy configurations."""
16+
"""
17+
Enumerates supported flow policy configurations.
18+
"""
1619

1720
SHORTEST_PATHS_ECMP = 1
1821
SHORTEST_PATHS_UCMP = 2
@@ -25,9 +28,9 @@ class FlowPolicy:
2528
"""
2629
Manages the placement and management of flows (demands) on a network graph.
2730
28-
A FlowPolicy converts a demand into one or more Flow objects subject to capacity
29-
constraints and user-specified configurations such as path selection algorithms
30-
and flow placement methods.
31+
A FlowPolicy converts a demand into one or more Flow objects subject to
32+
capacity constraints and user-specified configurations such as path
33+
selection algorithms and flow placement methods.
3134
"""
3235

3336
def __init__(
@@ -61,15 +64,16 @@ def __init__(
6164
min_flow_count: Minimum number of flows to create for a demand.
6265
max_flow_count: Maximum number of flows allowable for a demand.
6366
max_path_cost: Absolute cost limit for allowable paths.
64-
max_path_cost_factor: Relative cost factor limit (multiplied by the best path cost).
67+
max_path_cost_factor: Relative cost factor limit (multiplying the best path cost).
6568
static_paths: Predefined paths to force flows onto, if provided.
6669
edge_select_func: Custom function for edge selection, if needed.
6770
edge_select_value: Additional parameter for certain edge selection strategies.
68-
reoptimize_flows_on_each_placement: If True, re-run path optimization on every placement.
71+
reoptimize_flows_on_each_placement: If True, re-run path optimization after every placement.
6972
7073
Raises:
71-
ValueError: If static_paths length does not match max_flow_count, or if
72-
EQUAL_BALANCED placement is used without a specified max_flow_count.
74+
ValueError: If static_paths length does not match max_flow_count,
75+
or if EQUAL_BALANCED placement is used without a
76+
specified max_flow_count.
7377
"""
7478
self.path_alg: base.PathAlg = path_alg
7579
self.flow_placement: FlowPlacement = flow_placement
@@ -89,7 +93,7 @@ def __init__(
8993
# Dictionary to track all flows by their FlowIndex.
9094
self.flows: Dict[Tuple, Flow] = {}
9195

92-
# Track the best path cost found to enforce maximum cost constraints.
96+
# Track the best path cost found to enforce maximum path cost constraints.
9397
self.best_path_cost: Optional[base.Cost] = None
9498

9599
# Internal flow ID counter.
@@ -108,14 +112,27 @@ def __init__(
108112
):
109113
raise ValueError("max_flow_count must be set for EQUAL_BALANCED placement.")
110114

115+
def deep_copy(self) -> FlowPolicy:
116+
"""
117+
Creates and returns a deep copy of this FlowPolicy, including all flows.
118+
119+
Returns:
120+
A new FlowPolicy object that is a deep copy of the current instance.
121+
"""
122+
return copy.deepcopy(self)
123+
111124
@property
112125
def flow_count(self) -> int:
113-
"""Returns the number of flows currently tracked by the policy."""
126+
"""
127+
Returns the number of flows currently tracked by the policy.
128+
"""
114129
return len(self.flows)
115130

116131
@property
117132
def placed_demand(self) -> float:
118-
"""Returns the sum of all placed flow volumes across flows."""
133+
"""
134+
Returns the sum of all placed flow volumes across flows.
135+
"""
119136
return sum(flow.placed_flow for flow in self.flows.values())
120137

121138
def _get_next_flow_id(self) -> int:
@@ -160,7 +177,8 @@ def _get_path_bundle(
160177
excluded_nodes: Optional[Set[NodeID]] = None,
161178
) -> Optional[PathBundle]:
162179
"""
163-
Finds a path or set of paths from src_node to dst_node, optionally excluding certain edges or nodes.
180+
Finds a path or set of paths from src_node to dst_node, optionally excluding
181+
certain edges or nodes.
164182
165183
Args:
166184
flow_graph: The network graph.
@@ -171,7 +189,8 @@ def _get_path_bundle(
171189
excluded_nodes: Set of nodes to exclude.
172190
173191
Returns:
174-
A valid PathBundle if one is found and it satisfies cost constraints; otherwise, None.
192+
A valid PathBundle if one is found and it satisfies cost constraints;
193+
otherwise, None.
175194
176195
Raises:
177196
ValueError: If the selected path algorithm is not supported.
@@ -200,7 +219,8 @@ def _get_path_bundle(
200219

201220
if dst_node in pred:
202221
dst_cost = cost[dst_node]
203-
if self.best_path_cost is None:
222+
# Update best_path_cost if we found a cheaper path.
223+
if self.best_path_cost is None or dst_cost < self.best_path_cost:
204224
self.best_path_cost = dst_cost
205225

206226
# Enforce maximum path cost constraints, if specified.
@@ -337,8 +357,8 @@ def _reoptimize_flow(
337357
The updated Flow if re-optimization is successful; otherwise, None.
338358
"""
339359
flow = self.flows[flow_index]
340-
flow_volume = flow.placed_flow
341-
new_min_volume = flow_volume + headroom
360+
current_flow_volume = flow.placed_flow
361+
new_min_volume = current_flow_volume + headroom
342362
flow.remove_flow(flow_graph)
343363

344364
path_bundle = self._get_path_bundle(
@@ -349,15 +369,16 @@ def _reoptimize_flow(
349369
flow.excluded_edges,
350370
flow.excluded_nodes,
351371
)
352-
# If no suitable alternative path is found, revert to the original path.
372+
# If no suitable alternative path is found or the new path is the same set of edges,
373+
# revert to the original path.
353374
if not path_bundle or path_bundle.edges == flow.path_bundle.edges:
354-
flow.place_flow(flow_graph, flow_volume, self.flow_placement)
375+
flow.place_flow(flow_graph, current_flow_volume, self.flow_placement)
355376
return None
356377

357378
new_flow = Flow(
358379
path_bundle, flow_index, flow.excluded_edges, flow.excluded_nodes
359380
)
360-
new_flow.place_flow(flow_graph, flow_volume, self.flow_placement)
381+
new_flow.place_flow(flow_graph, current_flow_volume, self.flow_placement)
361382
self.flows[flow_index] = new_flow
362383
return new_flow
363384

@@ -372,8 +393,8 @@ def place_demand(
372393
min_flow: Optional[float] = None,
373394
) -> Tuple[float, float]:
374395
"""
375-
Places the given demand volume on the network graph by splitting or creating flows as needed.
376-
Optionally re-optimizes flows based on the policy configuration.
396+
Places the given demand volume on the network graph by splitting or creating
397+
flows as needed. Optionally re-optimizes flows based on the policy configuration.
377398
378399
Args:
379400
flow_graph: The network graph.
@@ -385,8 +406,11 @@ def place_demand(
385406
min_flow: Minimum flow threshold for path selection.
386407
387408
Returns:
388-
A tuple (placed_flow, remaining_volume) where placed_flow is the total volume
389-
successfully placed and remaining_volume is any unplaced volume.
409+
A tuple (placed_flow, remaining_volume) where placed_flow is the total
410+
volume successfully placed and remaining_volume is any unplaced volume.
411+
412+
Raises:
413+
RuntimeError: If an infinite loop is detected (safety net).
390414
"""
391415
if not self.flows:
392416
self._create_flows(flow_graph, src_node, dst_node, flow_class, min_flow)
@@ -395,9 +419,8 @@ def place_demand(
395419
target_flow_volume = target_flow_volume or volume
396420

397421
total_placed_flow = 0.0
398-
c = 0
422+
iteration_count = 0
399423

400-
# Safety check to prevent infinite loops.
401424
while volume >= base.MIN_FLOW and flow_queue:
402425
flow = flow_queue.popleft()
403426
placed_flow, _ = flow.place_flow(
@@ -409,7 +432,8 @@ def place_demand(
409432
# If the flow can accept more volume, attempt to create or re-optimize.
410433
if (
411434
target_flow_volume - flow.placed_flow >= base.MIN_FLOW
412-
) and not self.static_paths:
435+
and not self.static_paths
436+
):
413437
if not self.max_flow_count or len(self.flows) < self.max_flow_count:
414438
new_flow = self._create_flow(
415439
flow_graph, src_node, dst_node, flow_class
@@ -421,17 +445,14 @@ def place_demand(
421445
if new_flow:
422446
flow_queue.append(new_flow)
423447

424-
c += 1
425-
if c > 10000:
448+
iteration_count += 1
449+
if iteration_count > 10000:
426450
raise RuntimeError("Infinite loop detected in place_demand.")
427451

428452
# For EQUAL_BALANCED placement, rebalance flows to maintain equal volumes.
429-
if (
430-
self.flow_placement == FlowPlacement.EQUAL_BALANCED
431-
and len(self.flows) > 0 # must not rebalance if no flows
432-
):
453+
if self.flow_placement == FlowPlacement.EQUAL_BALANCED and len(self.flows) > 0:
433454
target_flow_volume = self.placed_demand / float(len(self.flows))
434-
# If the flows are not already near balanced
455+
# If flows are not already near balanced, rebalance them.
435456
if any(
436457
abs(target_flow_volume - f.placed_flow) >= base.MIN_FLOW
437458
for f in self.flows.values()
@@ -458,7 +479,8 @@ def rebalance_demand(
458479
) -> Tuple[float, float]:
459480
"""
460481
Rebalances the demand across existing flows so that their volumes are closer
461-
to the target_flow_volume. This is achieved by removing all flows and re-placing the demand.
482+
to the target_flow_volume. This is achieved by removing all flows from
483+
the network graph and re-placing them.
462484
463485
Args:
464486
flow_graph: The network graph.
@@ -468,7 +490,7 @@ def rebalance_demand(
468490
target_flow_volume: The desired volume per flow.
469491
470492
Returns:
471-
A tuple (placed_flow, remaining_volume) similar to place_demand.
493+
A tuple (placed_flow, remaining_volume) similar to place_demand().
472494
"""
473495
volume = self.placed_demand
474496
self.remove_demand(flow_graph)
@@ -479,7 +501,7 @@ def rebalance_demand(
479501
def remove_demand(self, flow_graph: StrictMultiDiGraph) -> None:
480502
"""
481503
Removes all flows from the network graph without clearing internal state.
482-
This enables subsequent re-optimization of flows.
504+
This allows subsequent re-optimization.
483505
484506
Args:
485507
flow_graph: The network graph.
@@ -508,7 +530,8 @@ def get_flow_policy(flow_policy_config: FlowPolicyConfig) -> FlowPolicy:
508530
flow_placement=FlowPlacement.EQUAL_BALANCED,
509531
edge_select=base.EdgeSelect.ALL_MIN_COST,
510532
multipath=True,
511-
max_flow_count=1, # Single flow following shortest paths.
533+
max_flow_count=1, # Single flow from the perspective of the flow object,
534+
# but multipath can create parallel SPF paths.
512535
)
513536
elif flow_policy_config == FlowPolicyConfig.SHORTEST_PATHS_UCMP:
514537
# Hop-by-hop with proportional flow placement (e.g., per-hop UCMP).
@@ -517,7 +540,7 @@ def get_flow_policy(flow_policy_config: FlowPolicyConfig) -> FlowPolicy:
517540
flow_placement=FlowPlacement.PROPORTIONAL,
518541
edge_select=base.EdgeSelect.ALL_MIN_COST,
519542
multipath=True,
520-
max_flow_count=1, # Single flow following shortest paths.
543+
max_flow_count=1,
521544
)
522545
elif flow_policy_config == FlowPolicyConfig.TE_UCMP_UNLIM:
523546
# "Ideal" TE with multiple MPLS LSPs and UCMP flow placement.

ngraph/traffic_demand.py

Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
from __future__ import annotations
2-
31
from dataclasses import dataclass, field
4-
from typing import Any, Dict
2+
from typing import Any, Dict, Optional
53

6-
from ngraph.lib.flow_policy import FlowPolicyConfig
4+
from ngraph.lib.flow_policy import FlowPolicyConfig, FlowPolicy
75
from ngraph.network import new_base64_uuid
86

97

@@ -12,31 +10,16 @@ class TrafficDemand:
1210
"""
1311
Represents a single traffic demand in a network.
1412
15-
This class provides:
16-
- Source and sink regex patterns to match sets of nodes in the network.
17-
- A total demand volume and a priority (lower number = higher priority).
18-
- A flow policy configuration to specify routing/placement logic (if
19-
not supplied, defaults to SHORTEST_PATHS_ECMP).
20-
- A 'mode' that determines how the demand expands into per-node-pair
21-
demands. Supported modes include:
22-
* "node_to_node": default behavior (each (src, dst) pair shares
23-
the demand).
24-
* "combine": combine all matched sources and all matched sinks,
25-
then distribute the demand among the cross-product of nodes.
26-
* "pairwise": for each (src_label, dst_label) pair, split up the
27-
total demand so each label cross-product receives an equal fraction.
28-
* "one_to_one": match src_labels[i] to dst_labels[i], then split
29-
demand among node pairs in those matched labels.
30-
3113
Attributes:
3214
source_path (str): A regex pattern (string) for selecting source nodes.
3315
sink_path (str): A regex pattern (string) for selecting sink nodes.
3416
priority (int): A priority class for this demand (default=0).
3517
demand (float): The total demand volume (default=0.0).
36-
demand_placed (float): The portion of this demand that has been placed
37-
so far (default=0.0). This is updated when flows are placed.
38-
flow_policy_config (FlowPolicyConfig): The routing/placement policy.
39-
mode (str): Expansion mode for generating sub-demands (defaults to "node_to_node").
18+
demand_placed (float): The portion of this demand that has been placed so far.
19+
flow_policy_config ((Optional[FlowPolicyConfig]): The routing/placement policy config.
20+
flow_policy (Optional[FlowPolicy]): A fully constructed FlowPolicy instance.
21+
If provided, it overrides flow_policy_config.
22+
mode (str): Expansion mode for generating sub-demands.
4023
attrs (Dict[str, Any]): Additional arbitrary attributes.
4124
id (str): Unique ID assigned at initialization.
4225
"""
@@ -46,8 +29,9 @@ class TrafficDemand:
4629
priority: int = 0
4730
demand: float = 0.0
4831
demand_placed: float = 0.0
49-
flow_policy_config: FlowPolicyConfig = FlowPolicyConfig.SHORTEST_PATHS_ECMP
50-
mode: str = "node_to_node"
32+
flow_policy_config: Optional[FlowPolicyConfig] = None
33+
flow_policy: Optional[FlowPolicy] = None
34+
mode: str = "combine"
5135
attrs: Dict[str, Any] = field(default_factory=dict)
5236
id: str = field(init=False)
5337

0 commit comments

Comments
 (0)