Skip to content

Commit 537bcde

Browse files
author
Flavio Crisciani
authored
Merge pull request moby#2040 from fcrisciani/memberlist_revendor
Memberlist revendor and optimizations
2 parents 5ab4ab8 + 44298c9 commit 537bcde

13 files changed

Lines changed: 262 additions & 175 deletions

File tree

networkdb/delegate.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,16 +165,19 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
165165
}
166166
}
167167
nDB.RUnlock()
168+
168169
if !ok || network.leaving || !nodePresent {
169170
// I'm out of the network OR the event owner is not anymore part of the network so do not propagate
170171
return false
171172
}
172173

174+
nDB.Lock()
173175
e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
174176
if err == nil {
175177
// We have the latest state. Ignore the event
176178
// since it is stale.
177179
if e.ltime >= tEvent.LTime {
180+
nDB.Unlock()
178181
return false
179182
}
180183
}
@@ -195,8 +198,6 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
195198
nDB.config.Hostname, nDB.config.NodeID, tEvent)
196199
e.reapTime = nDB.config.reapEntryInterval
197200
}
198-
199-
nDB.Lock()
200201
nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e)
201202
nDB.Unlock()
202203

networkdb/event_delegate.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,10 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
2626
e.broadcastNodeEvent(mn.Addr, opCreate)
2727
e.nDB.Lock()
2828
defer e.nDB.Unlock()
29+
2930
// In case the node is rejoining after a failure or leave,
30-
// wait until an explicit join message arrives before adding
31-
// it to the nodes just to make sure this is not a stale
32-
// join. If you don't know about this node add it immediately.
33-
_, fOk := e.nDB.failedNodes[mn.Name]
34-
_, lOk := e.nDB.leftNodes[mn.Name]
35-
if fOk || lOk {
31+
// just add the node back to active
32+
if moved, _ := e.nDB.changeNodeState(mn.Name, nodeActiveState); moved {
3633
return
3734
}
3835

networkdb/networkdb.go

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,8 @@ func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
322322
// GetEntry retrieves the value of a table entry in a given (network,
323323
// table, key) tuple
324324
func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
325+
nDB.RLock()
326+
defer nDB.RUnlock()
325327
entry, err := nDB.getEntry(tname, nid, key)
326328
if err != nil {
327329
return nil, err
@@ -331,9 +333,6 @@ func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
331333
}
332334

333335
func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
334-
nDB.RLock()
335-
defer nDB.RUnlock()
336-
337336
e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
338337
if !ok {
339338
return nil, types.NotFoundErrorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
@@ -348,13 +347,10 @@ func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
348347
// entry for the same tuple for which there is already an existing
349348
// entry unless the current entry is deleting state.
350349
func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
350+
nDB.Lock()
351351
oldEntry, err := nDB.getEntry(tname, nid, key)
352-
if err != nil {
353-
if _, ok := err.(types.NotFoundError); !ok {
354-
return fmt.Errorf("cannot create entry in table %s with network id %s and key %s: %v", tname, nid, key, err)
355-
}
356-
}
357-
if oldEntry != nil && !oldEntry.deleting {
352+
if err == nil || (oldEntry != nil && !oldEntry.deleting) {
353+
nDB.Unlock()
358354
return fmt.Errorf("cannot create entry in table %s with network id %s and key %s, already exists", tname, nid, key)
359355
}
360356

@@ -364,14 +360,13 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
364360
value: value,
365361
}
366362

363+
nDB.createOrUpdateEntry(nid, tname, key, entry)
364+
nDB.Unlock()
365+
367366
if err := nDB.sendTableEvent(TableEventTypeCreate, nid, tname, key, entry); err != nil {
368367
return fmt.Errorf("cannot send create event for table %s, %v", tname, err)
369368
}
370369

371-
nDB.Lock()
372-
nDB.createOrUpdateEntry(nid, tname, key, entry)
373-
nDB.Unlock()
374-
375370
return nil
376371
}
377372

@@ -380,7 +375,9 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
380375
// propagates this event to the cluster. It is an error to update a
381376
// non-existent entry.
382377
func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
383-
if _, err := nDB.GetEntry(tname, nid, key); err != nil {
378+
nDB.Lock()
379+
if _, err := nDB.getEntry(tname, nid, key); err != nil {
380+
nDB.Unlock()
384381
return fmt.Errorf("cannot update entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
385382
}
386383

@@ -390,14 +387,13 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
390387
value: value,
391388
}
392389

390+
nDB.createOrUpdateEntry(nid, tname, key, entry)
391+
nDB.Unlock()
392+
393393
if err := nDB.sendTableEvent(TableEventTypeUpdate, nid, tname, key, entry); err != nil {
394394
return fmt.Errorf("cannot send table update event: %v", err)
395395
}
396396

397-
nDB.Lock()
398-
nDB.createOrUpdateEntry(nid, tname, key, entry)
399-
nDB.Unlock()
400-
401397
return nil
402398
}
403399

@@ -427,27 +423,29 @@ func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]*TableElem
427423
// table, key) tuple and if the NetworkDB is part of the cluster
428424
// propagates this event to the cluster.
429425
func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
430-
value, err := nDB.GetEntry(tname, nid, key)
431-
if err != nil {
432-
return fmt.Errorf("cannot delete entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
426+
nDB.Lock()
427+
oldEntry, err := nDB.getEntry(tname, nid, key)
428+
if err != nil || oldEntry == nil || oldEntry.deleting {
429+
nDB.Unlock()
430+
return fmt.Errorf("cannot delete entry %s with network id %s and key %s "+
431+
"does not exist or is already being deleted", tname, nid, key)
433432
}
434433

435434
entry := &entry{
436435
ltime: nDB.tableClock.Increment(),
437436
node: nDB.config.NodeID,
438-
value: value,
437+
value: oldEntry.value,
439438
deleting: true,
440439
reapTime: nDB.config.reapEntryInterval,
441440
}
442441

442+
nDB.createOrUpdateEntry(nid, tname, key, entry)
443+
nDB.Unlock()
444+
443445
if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
444446
return fmt.Errorf("cannot send table delete event: %v", err)
445447
}
446448

447-
nDB.Lock()
448-
nDB.createOrUpdateEntry(nid, tname, key, entry)
449-
nDB.Unlock()
450-
451449
return nil
452450
}
453451

networkdb/networkdb_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -735,3 +735,64 @@ func TestNodeReincarnation(t *testing.T) {
735735

736736
closeNetworkDBInstances(dbs)
737737
}
738+
739+
func TestParallelCreate(t *testing.T) {
740+
dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
741+
742+
startCh := make(chan int)
743+
doneCh := make(chan error)
744+
var success int32
745+
for i := 0; i < 20; i++ {
746+
go func() {
747+
<-startCh
748+
err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
749+
if err == nil {
750+
atomic.AddInt32(&success, 1)
751+
}
752+
doneCh <- err
753+
}()
754+
}
755+
756+
close(startCh)
757+
758+
for i := 0; i < 20; i++ {
759+
<-doneCh
760+
}
761+
close(doneCh)
762+
// Only 1 write should have succeeded
763+
assert.Equal(t, int32(1), success)
764+
765+
closeNetworkDBInstances(dbs)
766+
}
767+
768+
func TestParallelDelete(t *testing.T) {
769+
dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
770+
771+
err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
772+
assert.NoError(t, err)
773+
774+
startCh := make(chan int)
775+
doneCh := make(chan error)
776+
var success int32
777+
for i := 0; i < 20; i++ {
778+
go func() {
779+
<-startCh
780+
err := dbs[0].DeleteEntry("testTable", "testNetwork", "key")
781+
if err == nil {
782+
atomic.AddInt32(&success, 1)
783+
}
784+
doneCh <- err
785+
}()
786+
}
787+
788+
close(startCh)
789+
790+
for i := 0; i < 20; i++ {
791+
<-doneCh
792+
}
793+
close(doneCh)
794+
// Only 1 write should have succeeded
795+
assert.Equal(t, int32(1), success)
796+
797+
closeNetworkDBInstances(dbs)
798+
}

vendor.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ github.com/gorilla/mux v1.1
2727
github.com/hashicorp/consul v0.5.2
2828
github.com/hashicorp/go-msgpack 71c2886f5a673a35f909803f38ece5810165097b
2929
github.com/hashicorp/go-multierror fcdddc395df1ddf4247c69bd436e84cfa0733f7e
30-
github.com/hashicorp/memberlist v0.1.0
30+
github.com/hashicorp/memberlist 3d8438da9589e7b608a83ffac1ef8211486bcb7c
3131
github.com/sean-/seed e2103e2c35297fb7e17febb81e49b312087a2372
3232
github.com/hashicorp/go-sockaddr acd314c5781ea706c710d9ea70069fd2e110d61d
3333
github.com/hashicorp/serf 598c54895cc5a7b1a24a398d635e8c0ea0959870

vendor/github.com/hashicorp/memberlist/README.md

Lines changed: 7 additions & 76 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/hashicorp/memberlist/config.go

Lines changed: 16 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)