Skip to content

Commit 31c84d4

Browse files
committed
MB-70453: Merge remote-tracking branch 'couchbase/trinity' into morpheus
Change-Id: I8a7fa40bd3656aa33cf2de6105f9b2ecfa53487c
2 parents 9c41656 + b43c56d commit 31c84d4

3 files changed

Lines changed: 119 additions & 58 deletions

File tree

primitives/couchbase/memcached.go

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,18 @@ func (b *Bucket) getConnectionToVBucket(vb uint32, desc *doDescriptor) (*memcach
150150
}
151151

152152
// first part of the retry loop: get a connection and handle errors
153-
func (b *Bucket) getVbConnection(vb uint32, desc *doDescriptor) (conn *memcached.Client, pool *connectionPool, err error) {
153+
// When random is true, the connection comes from an arbitrary node (via getRandomConnection). The collections manifest is a
154+
// bucket-level resource available from any KV node, so its callers pass random=true while still benefiting from the built-in retry and error-handling logic.
155+
func (b *Bucket) getVbConnection(vb uint32, desc *doDescriptor, random bool) (conn *memcached.Client, pool *connectionPool, err error) {
154156
mark := util.Now()
155157
desc.errorString = ""
156158

157-
// if we had a NMVB and have successfully identified the pool for the correct node, use it
158-
// if it doesn't work out, fall back to the old method
159-
if desc.pool != nil {
159+
if random {
160+
desc.pool = nil
161+
conn, pool, err = b.getRandomConnection()
162+
} else if desc.pool != nil {
163+
// if we had a NMVB and have successfully identified the pool for the correct node, use it
164+
// if it doesn't work out, fall back to the old method
160165
pool = desc.pool
161166
desc.pool = nil
162167
conn, err = pool.Get()
@@ -369,6 +374,17 @@ func (b *Bucket) processOpError(vb uint32, lastError error, node string, desc *d
369374
desc.discard = true
370375
desc.backOffAttempts++
371376
desc.retry = backOff(desc.backOffAttempts, desc.maxTries, backOffDuration, true)
377+
} else if isAddrNotAvailable(lastError) {
378+
desc.discard = true
379+
desc.backOffAttempts++
380+
desc.retry = backOff(desc.backOffAttempts, desc.maxTries, backOffDuration, true)
381+
if desc.retry {
382+
b.Refresh()
383+
}
384+
} else if isSeveredConnectionError(lastError) {
385+
desc.discard = true
386+
desc.backOffAttempts++
387+
desc.retry = backOff(desc.backOffAttempts, desc.maxTries, backOffDuration, true)
372388
} else if IsReadTimeOutError(lastError) {
373389
desc.discard = true
374390
desc.retry = true
@@ -478,28 +494,28 @@ func (b *Bucket) backOffRetries() int {
478494
// Note that this automatically handles transient errors by replaying
479495
// your function on a (for example) "not-my-vbucket" error, so don't assume your command will only be executed once.
480496
func (b *Bucket) do(k string, f func(mc *memcached.Client, vb uint16) error) (err error) {
// Thin wrapper over do2: passes deadline=true and useReplicas=false, with b.backOffRetries() as the retry budget.
// The added line also passes random=false, the flag that makes getVbConnection use getRandomConnection when true.
481-
return b.do2(k, f, true, false, b.backOffRetries())
497+
return b.do2(k, f, true, false, false, b.backOffRetries())
482498
}
483499

484-
func (b *Bucket) do2(k string, f func(mc *memcached.Client, vb uint16) error, deadline bool, useReplicas bool,
500+
func (b *Bucket) do2(k string, f func(mc *memcached.Client, vb uint16) error, deadline, useReplicas, random bool,
485501
backOffRetries int) (err error) {
// do2 maps key k to a vbucket with VBHash and runs f for that vbucket through do3.
// deadline, useReplicas, random (new in this change) and backOffRetries are forwarded to do3 untouched.
// Returns whatever do3 returns.
486502

487503
// Hash the key to its vbucket.
vb := b.VBHash(k)
488504

489505
// Time the whole do3 call and add the elapsed time to the bucket's doTime counter.
mark := util.Now()
490-
err = b.do3(uint16(vb), f, deadline, useReplicas, backOffRetries)
506+
err = b.do3(uint16(vb), f, deadline, useReplicas, random, backOffRetries)
491507
atomic.AddUint64(&b.doTime, uint64(util.Since(mark)))
492508
return err
493509
}
494510

495-
func (b *Bucket) do3(vb uint16, f func(mc *memcached.Client, vb uint16) error, deadline bool, useReplicas bool,
511+
func (b *Bucket) do3(vb uint16, f func(mc *memcached.Client, vb uint16) error, deadline, useReplicas, random bool,
496512
backOffRetries int) (err error) {
497513

498514
var lastError error
499515

500516
desc := &doDescriptor{useReplicas: useReplicas, version: b.Version, maxTries: backOffRetries}
501517
for desc.attempts = 0; desc.attempts < desc.maxTries; desc.attempts++ {
502-
conn, pool, err := b.getVbConnection(uint32(vb), desc)
518+
conn, pool, err := b.getVbConnection(uint32(vb), desc, random)
503519
if err != nil {
504520
lastError = err
505521
if desc.retry {
@@ -772,9 +788,6 @@ func isOutOfBoundsError(err error) bool {
772788
return err != nil && strings.Contains(err.Error(), "Out of Bounds error")
773789
}
774790

775-
func isSeveredConnectionError(err error) bool {
776-
return err == io.EOF
777-
}
778791

779792
func isAddrNotAvailable(err error) bool {
780793
if err == nil {
@@ -784,6 +797,20 @@ func isAddrNotAvailable(err error) bool {
784797
return strings.Contains(estr, "cannot assign requested address")
785798
}
786799

800+
func isSeveredConnectionError(err error) bool {
// isSeveredConnectionError reports whether err looks like a connection that was closed or reset.
// It returns false for nil, true when err is exactly io.EOF, and otherwise true when the error text
// contains one of the substrings checked below.
// NOTE(review): the io.EOF check uses ==, so an io.EOF wrapped in another error is not matched
// unless its message also contains one of the substrings — confirm whether that is intended.
801+
if err == nil {
802+
return false
803+
}
804+
if err == io.EOF {
805+
return true
806+
}
807+
// Everything else is matched on the error message text.
estr := err.Error()
808+
return strings.Contains(estr, "use of closed network connection") ||
809+
strings.Contains(estr, "connection closed") ||
810+
strings.Contains(estr, "broken pipe") ||
811+
strings.Contains(estr, "connection reset")
812+
}
813+
787814
// Get the deadline for your connection
788815
func getDeadline(reqDeadline time.Time, kvTimeout time.Duration, duration time.Duration) (time.Time, error) {
789816

@@ -866,7 +893,7 @@ func (b *Bucket) doBulkGet(vb uint16, keys []string, active func() bool, reqDead
866893
// This stack frame exists to ensure we can clean up
867894
// connection at a reasonable time.
868895
err := func() error {
869-
conn, pool, err := b.getVbConnection(uint32(vb), desc)
896+
conn, pool, err := b.getVbConnection(uint32(vb), desc, false)
870897
if err != nil {
871898
if !desc.retry {
872899
if desc.errorString != "" {
@@ -1530,7 +1557,7 @@ func (b *Bucket) GetCollectionCID(scope string, collection string, reqDeadline t
15301557
collUid = binary.BigEndian.Uint32(response.Extras[8:12])
15311558

15321559
return nil
1533-
}, false, false, b.backOffRetries())
1560+
}, false, false, true, b.backOffRetries())
15341561

15351562
return collUid, manifestUid, err
15361563
}
@@ -1567,7 +1594,7 @@ func (b *Bucket) GetsMC(key string, active func() bool, reqDeadline time.Time, k
15671594
return err1
15681595
}
15691596
return nil
1570-
}, false, useReplica, b.backOffRetries())
1597+
}, false, useReplica, false, b.backOffRetries())
15711598
atomic.AddUint64(&b.readCount, 1)
15721599
return response, err
15731600
}
@@ -1596,7 +1623,7 @@ func (b *Bucket) GetsSubDoc(key string, reqDeadline time.Time, kvTimeout time.Du
15961623
response, err1 = mc.GetSubdoc(vb, key, paths, context...)
15971624
atomic.AddUint64(&b.opTime, uint64(util.Since(mark)))
15981625
return err1
1599-
}, false, false, b.backOffRetries())
1626+
}, false, false, false, b.backOffRetries())
16001627
return response, err
16011628
}
16021629

@@ -1623,7 +1650,7 @@ func (b *Bucket) SetsSubDoc(key string, ops []memcached.SubDocOp, context ...*me
16231650
err = processSubdocMultiError(ops, err)
16241651
}
16251652
return err
1626-
}, false, false, b.backOffRetries())
1653+
}, false, false, false, b.backOffRetries())
16271654
if err == nil && err1 != nil {
16281655
err = err1
16291656
}
@@ -1636,7 +1663,7 @@ func (b *Bucket) getRandomConnection() (*memcached.Client, *connectionPool, erro
16361663
var currentPool = 0
16371664
pools := b.getConnPools(false /* not already locked */)
16381665
if len(pools) == 0 {
1639-
return nil, nil, fmt.Errorf("No connection pool found")
1666+
return nil, nil, errNoPool
16401667
} else if len(pools) > 1 { // choose a random connection
16411668
currentPool = rand.Intn(len(pools))
16421669
} // if only one pool, currentPool defaults to 0, i.e., the only pool

primitives/couchbase/memcached_scan.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2025,7 +2025,7 @@ func (cqueue *rswCancelQueue) runWorker() {
20252025
retry: true,
20262026
}
20272027
for desc.attempts = 0; desc.attempts < desc.maxTries; {
2028-
conn, pool, err = b.getVbConnection(uint32(vbucket), desc)
2028+
conn, pool, err = b.getVbConnection(uint32(vbucket), desc, false)
20292029
if err != nil {
20302030
if desc.retry {
20312031
desc.attempts++
@@ -2204,7 +2204,7 @@ func (queue *rswQueue) runWorker() {
22042204
retry: true,
22052205
}
22062206
for desc.attempts = 0; desc.attempts < desc.maxTries; {
2207-
conn, pool, err = b.getVbConnection(uint32(vb), desc)
2207+
conn, pool, err = b.getVbConnection(uint32(vb), desc, false)
22082208
if err != nil {
22092209
if desc.retry {
22102210
desc.attempts++

primitives/couchbase/ns_server.go

Lines changed: 72 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1181,47 +1181,81 @@ func (b *Bucket) GetCollectionsManifest() (*Manifest, error) {
11811181
return nil, fmt.Errorf("Collections not enabled.")
11821182
}
11831183

1184-
b.RLock()
1185-
pools := b.getConnPools(true /* already locked */)
1186-
if len(pools) == 0 {
1187-
b.RUnlock()
1188-
return nil, fmt.Errorf("Unable to get connection to retrieve collections manifest: no connection pool. No collections "+
1189-
"access to bucket %s.", b.Name)
1190-
}
1191-
pool := pools[0] // Any pool will do, so use the first one.
1192-
b.RUnlock()
1193-
client, err := pool.Get()
1194-
if err != nil {
1195-
return nil, fmt.Errorf("Unable to get connection to retrieve collections manifest: %v. No collections access to "+
1196-
"bucket %s.", err, b.Name)
1197-
}
1198-
dl, _ := getDeadline(noDeadline, _NO_TIMEOUT, DefaultTimeout)
1199-
client.SetDeadline(dl)
1200-
1201-
// We need to select the bucket before GetCollectionsManifest()
1202-
// will work. This is sometimes done at startup (see defaultMkConn())
1203-
// but not always, depending on the auth type.
1204-
// Doing this is safe because we collect the connections
1205-
// by bucket, so the bucket being selected will never change.
1206-
_, err = client.SelectBucket(b.Name)
1207-
if err != nil {
1208-
pool.Return(client)
1209-
return nil, fmt.Errorf("Unable to select bucket %s: %v. No collections access to bucket %s.", err, b.Name, b.Name)
1210-
}
1184+
vb := uint32(b.VBHash("DUMMY")) // any key
1185+
desc := &doDescriptor{version: b.Version, maxTries: b.backOffRetries()}
1186+
var res *gomemcached.MCResponse
1187+
var lastError error
1188+
selectFailed := false
1189+
for desc.attempts = 0; desc.attempts < desc.maxTries; desc.attempts++ {
1190+
// The collections manifest is a bucket-level resource available from any KV node,
1191+
// so we use getVbConnection with random=true to pick an arbitrary node while still
1192+
// benefiting from its built-in retry and error-handling logic.
1193+
conn, pool, err := b.getVbConnection(vb, desc, true)
1194+
if err != nil {
1195+
lastError = err
1196+
if desc.retry {
1197+
continue
1198+
}
1199+
if desc.errorString != "" {
1200+
return nil, fmt.Errorf("Unable to get connection to retrieve collections manifest for bucket %s: %v",
1201+
b.Name, fmt.Errorf(desc.errorString, b.Name, err))
1202+
}
1203+
return nil, fmt.Errorf("Unable to get connection to retrieve collections manifest for bucket %s: %v",
1204+
b.Name, err)
1205+
}
12111206

1212-
res, err := client.GetCollectionsManifest()
1213-
if err != nil {
1214-
pool.Return(client)
1215-
return nil, fmt.Errorf("Unable to retrieve collections manifest: %v. No collections access to bucket %s.", err, b.Name)
1207+
dl, _ := getDeadline(noDeadline, _NO_TIMEOUT, DefaultTimeout)
1208+
conn.SetDeadline(dl)
1209+
// We need to select the bucket before GetCollectionsManifest()
1210+
// will work. This is sometimes done at startup (see defaultMkConn())
1211+
// but not always, depending on the auth type.
1212+
// Doing this is safe because we collect the connections
1213+
// by bucket, so the bucket being selected will never change.
1214+
selectFailed = false
1215+
_, lastError = conn.SelectBucket(b.Name)
1216+
if lastError == nil {
1217+
res, lastError = conn.GetCollectionsManifest()
1218+
} else {
1219+
selectFailed = true
1220+
}
1221+
b.processOpError(vb, lastError, pool.Node(), desc)
1222+
if desc.discard {
1223+
pool.Discard(conn)
1224+
} else {
1225+
conn.SetReplica(false)
1226+
pool.Return(conn)
1227+
}
1228+
if lastError == nil {
1229+
mani, err := parseCollectionsManifest(res)
1230+
if err != nil {
1231+
return nil, fmt.Errorf("Unable to parse collections manifest: %v. No collections access to bucket %s.",
1232+
err, b.Name)
1233+
}
1234+
return mani, nil
1235+
} else if !desc.retry {
1236+
if selectFailed {
1237+
return nil, fmt.Errorf("Unable to select bucket %s: %v. No collections access to bucket %s.",
1238+
b.Name, lastError, b.Name)
1239+
}
1240+
1241+
break
1242+
}
12161243
}
1217-
mani, err := parseCollectionsManifest(res)
1218-
if err != nil {
1219-
pool.Return(client)
1220-
return nil, fmt.Errorf("Unable to parse collections manifest: %v. No collections access to bucket %s.", err, b.Name)
1244+
if lastError != nil {
1245+
if selectFailed {
1246+
return nil, fmt.Errorf("Unable to select bucket %s: %v. No collections access to bucket %s.",
1247+
b.Name, lastError, b.Name)
1248+
}
1249+
if desc.errorString != "" {
1250+
return nil, fmt.Errorf("Unable to retrieve collections manifest: %v. No collections access to bucket %s.",
1251+
fmt.Errorf(desc.errorString, b.Name, lastError), b.Name)
1252+
}
1253+
return nil, fmt.Errorf("Unable to retrieve collections manifest: %v after %d attempts. No collections access to bucket %s.", lastError, desc.attempts, b.Name)
12211254
}
1222-
1223-
pool.Return(client)
1224-
return mani, nil
1255+
// The final return here is only reachable when lastError == nil after the loop exits. That can only happen if desc.maxTries is 0 — the loop condition
1256+
// desc.attempts < desc.maxTries is immediately false, the body never executes, and lastError stays at its zero value (nil). This is just a defensive
1257+
// guard for the maxTries == 0 edge case and also satisfies Go's requirement that all code paths return a value.
1258+
return nil, fmt.Errorf("Unable to retrieve collections manifest after %d attempts. No collections access to bucket %s.", desc.attempts, b.Name)
12251259
}
12261260

12271261
func (b *Bucket) RefreshFully() error {

0 commit comments

Comments
 (0)