Skip to content

Commit bf3d9cc

Browse files
author
Santhosh Manohar
authored
Merge pull request #1467 from mrjana/networkdb
Purge stale nodes with same prefix and IP
2 parents 378d556 + 520a4c5 commit bf3d9cc

3 files changed

Lines changed: 49 additions & 18 deletions

File tree

networkdb/cluster.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -237,13 +237,6 @@ func (nDB *NetworkDB) reconnectNode() {
237237
}
238238
nDB.RUnlock()
239239

240-
// Update all the local state to a new time to force update on
241-
// the node we are trying to rejoin, just in case that node
242-
// has these in leaving/deleting state still. This is
243-
// facilitate fast convergence after recovering from a gossip
244-
// failure.
245-
nDB.updateLocalStateTime()
246-
247240
node := nodes[randomOffset(len(nodes))]
248241
addr := net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
249242

@@ -256,6 +249,13 @@ func (nDB *NetworkDB) reconnectNode() {
256249
return
257250
}
258251

252+
// Update all the local table state to a new time to
253+
// force update on the node we are trying to rejoin, just in
254+
// case that node has these in deleting state still. This is
255+
// facilitate fast convergence after recovering from a gossip
256+
// failure.
257+
nDB.updateLocalTableTime()
258+
259259
logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name)
260260
nDB.bulkSync([]string{node.Name}, true)
261261
}

networkdb/delegate.go

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package networkdb
33
import (
44
"fmt"
55
"net"
6+
"strings"
67
"time"
78

89
"github.com/Sirupsen/logrus"
@@ -31,24 +32,44 @@ func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
3132
return nil
3233
}
3334

34-
delete(nDB.failedNodes, n.Name)
35+
delete(nodes, n.Name)
3536
return n
3637
}
3738
}
3839

3940
return nil
4041
}
4142

42-
func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
43-
// Update our local clock if the received messages has newer
44-
// time.
45-
nDB.networkClock.Witness(nEvent.LTime)
43+
func (nDB *NetworkDB) purgeSameNode(n *node) {
44+
nDB.Lock()
45+
defer nDB.Unlock()
4646

47+
prefix := strings.Split(n.Name, "-")[0]
48+
for _, nodes := range []map[string]*node{
49+
nDB.failedNodes,
50+
nDB.leftNodes,
51+
nDB.nodes,
52+
} {
53+
var nodeNames []string
54+
for name, node := range nodes {
55+
if strings.HasPrefix(name, prefix) && n.Addr.Equal(node.Addr) {
56+
nodeNames = append(nodeNames, name)
57+
}
58+
}
59+
60+
for _, name := range nodeNames {
61+
delete(nodes, name)
62+
}
63+
}
64+
}
65+
66+
func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
4767
n := nDB.checkAndGetNode(nEvent)
4868
if n == nil {
4969
return false
5070
}
5171

72+
nDB.purgeSameNode(n)
5273
n.ltime = nEvent.LTime
5374

5475
switch nEvent.Type {
@@ -357,6 +378,15 @@ func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
357378
}
358379

359380
func (d *delegate) LocalState(join bool) []byte {
381+
if join {
382+
// Update all the local node/network state to a new time to
383+
// force update on the node we are trying to rejoin, just in
384+
// case that node has these in leaving state still. This is
385+
// facilitate fast convergence after recovering from a gossip
386+
// failure.
387+
d.nDB.updateLocalNetworkTime()
388+
}
389+
360390
d.nDB.RLock()
361391
defer d.nDB.RUnlock()
362392

@@ -408,10 +438,6 @@ func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
408438
return
409439
}
410440

411-
if pp.LTime > 0 {
412-
d.nDB.networkClock.Witness(pp.LTime)
413-
}
414-
415441
nodeEvent := &NodeEvent{
416442
LTime: pp.LTime,
417443
NodeName: pp.NodeName,

networkdb/networkdb.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -524,16 +524,21 @@ func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
524524
return networks
525525
}
526526

527-
func (nDB *NetworkDB) updateLocalStateTime() {
527+
func (nDB *NetworkDB) updateLocalNetworkTime() {
528528
nDB.Lock()
529529
defer nDB.Unlock()
530530

531531
ltime := nDB.networkClock.Increment()
532532
for _, n := range nDB.networks[nDB.config.NodeName] {
533533
n.ltime = ltime
534534
}
535+
}
536+
537+
func (nDB *NetworkDB) updateLocalTableTime() {
538+
nDB.Lock()
539+
defer nDB.Unlock()
535540

536-
ltime = nDB.tableClock.Increment()
541+
ltime := nDB.tableClock.Increment()
537542
nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
538543
entry := v.(*entry)
539544
if entry.node != nDB.config.NodeName {

0 commit comments

Comments
 (0)