Skip to content

Commit 520a4c5

Browse files
committed
Purge stale nodes with same prefix and IP
Since the node name randomization fix, we need to make sure that we purge the old node with the same prefix and same IP from the nodes database if it still present. This causes unnecessary reconnect attempts. Also added a change to avoid unnecessary update of local lamport time and only do it of we are ready to do a push pull on a join. Join should happen only when the node is bootstrapped or when trying to reconnect with a failed node. Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
1 parent 378d556 commit 520a4c5

3 files changed

Lines changed: 49 additions & 18 deletions

File tree

networkdb/cluster.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -237,13 +237,6 @@ func (nDB *NetworkDB) reconnectNode() {
237237
}
238238
nDB.RUnlock()
239239

240-
// Update all the local state to a new time to force update on
241-
// the node we are trying to rejoin, just in case that node
242-
// has these in leaving/deleting state still. This is
243-
// facilitate fast convergence after recovering from a gossip
244-
// failure.
245-
nDB.updateLocalStateTime()
246-
247240
node := nodes[randomOffset(len(nodes))]
248241
addr := net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
249242

@@ -256,6 +249,13 @@ func (nDB *NetworkDB) reconnectNode() {
256249
return
257250
}
258251

252+
// Update all the local table state to a new time to
253+
// force update on the node we are trying to rejoin, just in
254+
// case that node has these in deleting state still. This is
255+
// facilitate fast convergence after recovering from a gossip
256+
// failure.
257+
nDB.updateLocalTableTime()
258+
259259
logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name)
260260
nDB.bulkSync([]string{node.Name}, true)
261261
}

networkdb/delegate.go

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package networkdb
33
import (
44
"fmt"
55
"net"
6+
"strings"
67
"time"
78

89
"github.com/Sirupsen/logrus"
@@ -31,24 +32,44 @@ func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
3132
return nil
3233
}
3334

34-
delete(nDB.failedNodes, n.Name)
35+
delete(nodes, n.Name)
3536
return n
3637
}
3738
}
3839

3940
return nil
4041
}
4142

42-
func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
43-
// Update our local clock if the received messages has newer
44-
// time.
45-
nDB.networkClock.Witness(nEvent.LTime)
43+
func (nDB *NetworkDB) purgeSameNode(n *node) {
44+
nDB.Lock()
45+
defer nDB.Unlock()
4646

47+
prefix := strings.Split(n.Name, "-")[0]
48+
for _, nodes := range []map[string]*node{
49+
nDB.failedNodes,
50+
nDB.leftNodes,
51+
nDB.nodes,
52+
} {
53+
var nodeNames []string
54+
for name, node := range nodes {
55+
if strings.HasPrefix(name, prefix) && n.Addr.Equal(node.Addr) {
56+
nodeNames = append(nodeNames, name)
57+
}
58+
}
59+
60+
for _, name := range nodeNames {
61+
delete(nodes, name)
62+
}
63+
}
64+
}
65+
66+
func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
4767
n := nDB.checkAndGetNode(nEvent)
4868
if n == nil {
4969
return false
5070
}
5171

72+
nDB.purgeSameNode(n)
5273
n.ltime = nEvent.LTime
5374

5475
switch nEvent.Type {
@@ -357,6 +378,15 @@ func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
357378
}
358379

359380
func (d *delegate) LocalState(join bool) []byte {
381+
if join {
382+
// Update all the local node/network state to a new time to
383+
// force update on the node we are trying to rejoin, just in
384+
// case that node has these in leaving state still. This is
385+
// facilitate fast convergence after recovering from a gossip
386+
// failure.
387+
d.nDB.updateLocalNetworkTime()
388+
}
389+
360390
d.nDB.RLock()
361391
defer d.nDB.RUnlock()
362392

@@ -408,10 +438,6 @@ func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
408438
return
409439
}
410440

411-
if pp.LTime > 0 {
412-
d.nDB.networkClock.Witness(pp.LTime)
413-
}
414-
415441
nodeEvent := &NodeEvent{
416442
LTime: pp.LTime,
417443
NodeName: pp.NodeName,

networkdb/networkdb.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -524,16 +524,21 @@ func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
524524
return networks
525525
}
526526

527-
func (nDB *NetworkDB) updateLocalStateTime() {
527+
func (nDB *NetworkDB) updateLocalNetworkTime() {
528528
nDB.Lock()
529529
defer nDB.Unlock()
530530

531531
ltime := nDB.networkClock.Increment()
532532
for _, n := range nDB.networks[nDB.config.NodeName] {
533533
n.ltime = ltime
534534
}
535+
}
536+
537+
func (nDB *NetworkDB) updateLocalTableTime() {
538+
nDB.Lock()
539+
defer nDB.Unlock()
535540

536-
ltime = nDB.tableClock.Increment()
541+
ltime := nDB.tableClock.Increment()
537542
nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
538543
entry := v.(*entry)
539544
if entry.node != nDB.config.NodeName {

0 commit comments

Comments
 (0)