Skip to content

Commit 967490f

Browse files
hr-alebelmatthewmcneely
authored andcommitted
Added a backpressure mechanism to shortestPath
The backpressure mechanism should prevent dropping possible paths from the priority queue, by not filling the queue when there is too much in it.
1 parent da18664 commit 967490f

1 file changed

Lines changed: 19 additions & 2 deletions

File tree

query/shortest.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,17 @@ var errFacet = errors.Errorf("Skip the edge")
5353

5454
type priorityQueue []*queueItem
5555

56+
func computeLowWatermark(maxFrontierSize int64) int64 {
57+
if maxFrontierSize <= 0 {
58+
return 1
59+
}
60+
lowWatermark := (maxFrontierSize * 60) / 100
61+
if lowWatermark < 1 {
62+
lowWatermark = 1
63+
}
64+
return lowWatermark
65+
}
66+
5667
func (r *route) indexOf(uid uint64) int {
5768
for i, val := range *r.route {
5869
if val.uid == uid {
@@ -311,6 +322,8 @@ func runKShortestPaths(ctx context.Context, sg *SubGraph) ([]*SubGraph, error) {
311322
return nil, nil
312323
}
313324

325+
lowWatermark := computeLowWatermark(sg.Params.MaxFrontierSize)
326+
314327
minWeight := sg.Params.MinWeight
315328
maxWeight := sg.Params.MaxWeight
316329
next := make(chan bool, 2)
@@ -344,7 +357,8 @@ func runKShortestPaths(ctx context.Context, sg *SubGraph) ([]*SubGraph, error) {
344357
break
345358
}
346359
}
347-
if item.hop > numHops-1 && numHops < maxHops {
360+
shouldExpand := int64(pq.Len()) < lowWatermark
361+
if item.hop > numHops-1 && numHops < maxHops && shouldExpand {
348362
// Explore the next level by calling processGraph and add them
349363
// to the queue.
350364
if !stopExpansion {
@@ -490,6 +504,8 @@ func shortestPath(ctx context.Context, sg *SubGraph) ([]*SubGraph, error) {
490504
return nil, nil
491505
}
492506

507+
lowWatermark := computeLowWatermark(sg.Params.MaxFrontierSize)
508+
493509
// next is a channel on to which we send a signal so as to perform another level of expansion.
494510
next := make(chan bool, 2)
495511
expandErr := make(chan error, 2)
@@ -521,7 +537,8 @@ func shortestPath(ctx context.Context, sg *SubGraph) ([]*SubGraph, error) {
521537
break
522538
}
523539

524-
if numHops < maxHops && item.hop > numHops-1 {
540+
shouldExpand := int64(pq.Len()) < lowWatermark
541+
if numHops < maxHops && item.hop > numHops-1 && shouldExpand {
525542
// Explore the next level by calling processGraph and add them to the queue.
526543
if !stopExpansion {
527544
next <- true

0 commit comments

Comments
 (0)