Skip to content

Commit b43c56d

Browse files
committed
MB-70453: Add random connection support and improve error handling
- Add random bool parameter to getVbConnection() to support random connection selection for bucket-level operations that don't require vbucket-specific routing
- Enhance processOpError() to handle isAddrNotAvailable and isSeveredConnectionError with proper retry logic and connection discarding
- Add isSeveredConnectionError() helper function to detect closed/severed connection errors
- Refactor GetCollectionsManifest() to use getVbConnection() with random=true and leverage existing retry infrastructure via processOpError()
- Update all getVbConnection() call sites to pass random parameter:
  - GetCollectionCID: random=true (bucket-level operation)
  - GetCollectionsManifest: random=true (bucket-level operation)
  - All other operations: random=false (vbucket-specific)
- Fix getRandomConnection() to return errNoPool instead of formatted error for consistency with error handling

Change-Id: I06d474603aea1360d4fb2e286e6ecd65693f0b3d
Reviewed-on: https://review.couchbase.org/c/query/+/240776
Well-Formed: Restriction Checker
Reviewed-by: Dhanya Gowrish <dhanya.gowrish@couchbase.com>
Tested-by: Sitaram Vemulapalli <sitaram.vemulapalli@couchbase.com>
1 parent d624372 commit b43c56d

3 files changed

Lines changed: 120 additions & 55 deletions

File tree

primitives/couchbase/memcached.go

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -150,12 +150,17 @@ func (b *Bucket) getConnectionToVBucket(vb uint32, desc *doDescriptor) (*memcach
150150
}
151151

152152
// first part of the retry loop: get a connection and handle errors
153-
func (b *Bucket) getVbConnection(vb uint32, desc *doDescriptor) (conn *memcached.Client, pool *connectionPool, err error) {
153+
// When random is true, any pool remembered in desc is dropped and the connection is taken from getRandomConnection().
154+
// This suits bucket-level requests such as the collections manifest, which any KV node can serve, while still benefiting from the built-in retry and error-handling logic.
155+
func (b *Bucket) getVbConnection(vb uint32, desc *doDescriptor, random bool) (conn *memcached.Client, pool *connectionPool, err error) {
154156
desc.errorString = ""
155157

156-
// if we had a NMVB and have successfully identified the pool for the correct node, use it
157-
// if it doesn't work out, fall back to the old method
158-
if desc.pool != nil {
158+
if random {
159+
desc.pool = nil
160+
conn, pool, err = b.getRandomConnection()
161+
} else if desc.pool != nil {
162+
// if we had a NMVB and have successfully identified the pool for the correct node, use it
163+
// if it doesn't work out, fall back to the old method
159164
pool = desc.pool
160165
desc.pool = nil
161166
conn, err = pool.Get()
@@ -360,6 +365,17 @@ func (b *Bucket) processOpError(vb uint32, lastError error, node string, desc *d
360365
desc.discard = true
361366
desc.backOffAttempts++
362367
desc.retry = backOff(desc.backOffAttempts, desc.maxTries, backOffDuration, true)
368+
} else if isAddrNotAvailable(lastError) {
369+
desc.discard = true
370+
desc.backOffAttempts++
371+
desc.retry = backOff(desc.backOffAttempts, desc.maxTries, backOffDuration, true)
372+
if desc.retry {
373+
b.Refresh()
374+
}
375+
} else if isSeveredConnectionError(lastError) {
376+
desc.discard = true
377+
desc.backOffAttempts++
378+
desc.retry = backOff(desc.backOffAttempts, desc.maxTries, backOffDuration, true)
363379
} else if IsReadTimeOutError(lastError) {
364380
desc.discard = true
365381
desc.retry = true
@@ -460,25 +476,26 @@ func (b *Bucket) backOffRetries() int {
460476
// Note that this automatically handles transient errors by replaying
461477
// your function on a (for example) "not-my-vbucket" error, so don't assume your command will only be executed once.
462478
func (b *Bucket) do(k string, f func(mc *memcached.Client, vb uint16) error) (err error) {
463-
return b.do2(k, f, true, false, b.backOffRetries())
479+
return b.do2(k, f, true, false, false, b.backOffRetries())
464480
}
465481

466-
func (b *Bucket) do2(k string, f func(mc *memcached.Client, vb uint16) error, deadline bool, useReplicas bool,
482+
func (b *Bucket) do2(k string, f func(mc *memcached.Client, vb uint16) error, deadline, useReplicas, random bool,
467483
backOffRetries int) (err error) {
468484

469485
vb := b.VBHash(k)
470486

471-
return b.do3(uint16(vb), f, deadline, useReplicas, backOffRetries)
487+
err = b.do3(uint16(vb), f, deadline, useReplicas, random, backOffRetries)
488+
return err
472489
}
473490

474-
func (b *Bucket) do3(vb uint16, f func(mc *memcached.Client, vb uint16) error, deadline bool, useReplicas bool,
491+
func (b *Bucket) do3(vb uint16, f func(mc *memcached.Client, vb uint16) error, deadline, useReplicas, random bool,
475492
backOffRetries int) (err error) {
476493

477494
var lastError error
478495

479496
desc := &doDescriptor{useReplicas: useReplicas, version: b.Version, maxTries: backOffRetries}
480497
for desc.attempts = 0; desc.attempts < desc.maxTries; desc.attempts++ {
481-
conn, pool, err := b.getVbConnection(uint32(vb), desc)
498+
conn, pool, err := b.getVbConnection(uint32(vb), desc, random)
482499
if err != nil {
483500
if desc.retry {
484501
continue
@@ -751,6 +768,20 @@ func isAddrNotAvailable(err error) bool {
751768
return strings.Contains(estr, "cannot assign requested address")
752769
}
753770

771+
func isSeveredConnectionError(err error) bool {
772+
if err == nil {
773+
return false
774+
}
775+
if err == io.EOF {
776+
return true
777+
}
778+
estr := err.Error()
779+
return strings.Contains(estr, "use of closed network connection") ||
780+
strings.Contains(estr, "connection closed") ||
781+
strings.Contains(estr, "broken pipe") ||
782+
strings.Contains(estr, "connection reset")
783+
}
784+
754785
// Get the deadline for your connection
755786
func getDeadline(reqDeadline time.Time, kvTimeout time.Duration, duration time.Duration) (time.Time, error) {
756787

@@ -833,7 +864,7 @@ func (b *Bucket) doBulkGet(vb uint16, keys []string, active func() bool, reqDead
833864
// This stack frame exists to ensure we can clean up
834865
// connection at a reasonable time.
835866
err := func() error {
836-
conn, pool, err := b.getVbConnection(uint32(vb), desc)
867+
conn, pool, err := b.getVbConnection(uint32(vb), desc, false)
837868
if err != nil {
838869
if !desc.retry {
839870
if desc.errorString != "" {
@@ -1405,7 +1436,7 @@ func (b *Bucket) GetCollectionCID(scope string, collection string, reqDeadline t
14051436
collUid = binary.BigEndian.Uint32(response.Extras[8:12])
14061437

14071438
return nil
1408-
}, false, false, b.backOffRetries())
1439+
}, false, false, true, b.backOffRetries())
14091440

14101441
return collUid, manifestUid, err
14111442
}
@@ -1440,7 +1471,7 @@ func (b *Bucket) GetsMC(key string, active func() bool, reqDeadline time.Time, k
14401471
return err1
14411472
}
14421473
return nil
1443-
}, false, useReplica, b.backOffRetries())
1474+
}, false, useReplica, false, b.backOffRetries())
14441475
atomic.AddUint64(&b.readCount, 1)
14451476
return response, err
14461477
}
@@ -1467,7 +1498,7 @@ func (b *Bucket) GetsSubDoc(key string, reqDeadline time.Time, kvTimeout time.Du
14671498
mc.SetDeadline(dl)
14681499
response, err1 = mc.GetSubdoc(vb, key, paths, context...)
14691500
return err1
1470-
}, false, false, b.backOffRetries())
1501+
}, false, false, false, b.backOffRetries())
14711502
return response, err
14721503
}
14731504

@@ -1487,7 +1518,7 @@ func (b *Bucket) SetsSubDoc(key string, ops []memcached.SubDocOp, context ...*me
14871518
mc.SetDeadline(noDeadline)
14881519
response, err = mc.SetSubdoc(vb, key, ops, context...)
14891520
return err
1490-
}, false, false, b.backOffRetries())
1521+
}, false, false, false, b.backOffRetries())
14911522
if err == nil && err1 != nil {
14921523
err = err1
14931524
}
@@ -1500,7 +1531,7 @@ func (b *Bucket) getRandomConnection() (*memcached.Client, *connectionPool, erro
15001531
var currentPool = 0
15011532
pools := b.getConnPools(false /* not already locked */)
15021533
if len(pools) == 0 {
1503-
return nil, nil, fmt.Errorf("No connection pool found")
1534+
return nil, nil, errNoPool
15041535
} else if len(pools) > 1 { // choose a random connection
15051536
currentPool = rand.Intn(len(pools))
15061537
} // if only one pool, currentPool defaults to 0, i.e., the only pool

primitives/couchbase/memcached_scan.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1738,7 +1738,7 @@ func (cqueue *rswCancelQueue) runWorker() {
17381738
vbucket = cr[i].vbucket
17391739
desc := &doDescriptor{useReplicas: true, version: b.Version, maxTries: b.backOffRetries(), retry: true}
17401740
for desc.attempts = 0; desc.attempts < desc.maxTries; {
1741-
conn, pool, err = b.getVbConnection(uint32(vbucket), desc)
1741+
conn, pool, err = b.getVbConnection(uint32(vbucket), desc, false)
17421742
if err != nil {
17431743
if desc.retry {
17441744
desc.attempts++
@@ -1914,7 +1914,7 @@ func (queue *rswQueue) runWorker() {
19141914
desc := &doDescriptor{useReplicas: vbscan.scan.useReplica, version: b.Version, maxTries: b.backOffRetries(),
19151915
retry: true}
19161916
for desc.attempts = 0; desc.attempts < desc.maxTries; {
1917-
conn, pool, err = b.getVbConnection(uint32(vb), desc)
1917+
conn, pool, err = b.getVbConnection(uint32(vb), desc, false)
19181918
if err != nil {
19191919
if desc.retry {
19201920
desc.attempts++

primitives/couchbase/ns_server.go

Lines changed: 72 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1108,47 +1108,81 @@ func (b *Bucket) GetCollectionsManifest() (*Manifest, error) {
11081108
return nil, fmt.Errorf("Collections not enabled.")
11091109
}
11101110

1111-
b.RLock()
1112-
pools := b.getConnPools(true /* already locked */)
1113-
if len(pools) == 0 {
1114-
b.RUnlock()
1115-
return nil, fmt.Errorf("Unable to get connection to retrieve collections manifest: no connection pool. No collections "+
1116-
"access to bucket %s.", b.Name)
1117-
}
1118-
pool := pools[0] // Any pool will do, so use the first one.
1119-
b.RUnlock()
1120-
client, err := pool.Get()
1121-
if err != nil {
1122-
return nil, fmt.Errorf("Unable to get connection to retrieve collections manifest: %v. No collections access to "+
1123-
"bucket %s.", err, b.Name)
1124-
}
1125-
dl, _ := getDeadline(noDeadline, _NO_TIMEOUT, DefaultTimeout)
1126-
client.SetDeadline(dl)
1127-
1128-
// We need to select the bucket before GetCollectionsManifest()
1129-
// will work. This is sometimes done at startup (see defaultMkConn())
1130-
// but not always, depending on the auth type.
1131-
// Doing this is safe because we collect the connections
1132-
// by bucket, so the bucket being selected will never change.
1133-
_, err = client.SelectBucket(b.Name)
1134-
if err != nil {
1135-
pool.Return(client)
1136-
return nil, fmt.Errorf("Unable to select bucket %s: %v. No collections access to bucket %s.", err, b.Name, b.Name)
1137-
}
1111+
vb := uint32(b.VBHash("DUMMY")) // any key
1112+
desc := &doDescriptor{version: b.Version, maxTries: b.backOffRetries()}
1113+
var res *gomemcached.MCResponse
1114+
var lastError error
1115+
selectFailed := false
1116+
for desc.attempts = 0; desc.attempts < desc.maxTries; desc.attempts++ {
1117+
// The collections manifest is a bucket-level resource available from any KV node,
1118+
// so we use getVbConnection with random=true to pick an arbitrary node while still
1119+
// benefiting from its built-in retry and error-handling logic.
1120+
conn, pool, err := b.getVbConnection(vb, desc, true)
1121+
if err != nil {
1122+
lastError = err
1123+
if desc.retry {
1124+
continue
1125+
}
1126+
if desc.errorString != "" {
1127+
return nil, fmt.Errorf("Unable to get connection to retrieve collections manifest for bucket %s: %v",
1128+
b.Name, fmt.Errorf(desc.errorString, b.Name, err))
1129+
}
1130+
return nil, fmt.Errorf("Unable to get connection to retrieve collections manifest for bucket %s: %v",
1131+
b.Name, err)
1132+
}
11381133

1139-
res, err := client.GetCollectionsManifest()
1140-
if err != nil {
1141-
pool.Return(client)
1142-
return nil, fmt.Errorf("Unable to retrieve collections manifest: %v. No collections access to bucket %s.", err, b.Name)
1134+
dl, _ := getDeadline(noDeadline, _NO_TIMEOUT, DefaultTimeout)
1135+
conn.SetDeadline(dl)
1136+
// We need to select the bucket before GetCollectionsManifest()
1137+
// will work. This is sometimes done at startup (see defaultMkConn())
1138+
// but not always, depending on the auth type.
1139+
// Doing this is safe because we collect the connections
1140+
// by bucket, so the bucket being selected will never change.
1141+
selectFailed = false
1142+
_, lastError = conn.SelectBucket(b.Name)
1143+
if lastError == nil {
1144+
res, lastError = conn.GetCollectionsManifest()
1145+
} else {
1146+
selectFailed = true
1147+
}
1148+
b.processOpError(vb, lastError, pool.Node(), desc)
1149+
if desc.discard {
1150+
pool.Discard(conn)
1151+
} else {
1152+
conn.SetReplica(false)
1153+
pool.Return(conn)
1154+
}
1155+
if lastError == nil {
1156+
mani, err := parseCollectionsManifest(res)
1157+
if err != nil {
1158+
return nil, fmt.Errorf("Unable to parse collections manifest: %v. No collections access to bucket %s.",
1159+
err, b.Name)
1160+
}
1161+
return mani, nil
1162+
} else if !desc.retry {
1163+
if selectFailed {
1164+
return nil, fmt.Errorf("Unable to select bucket %s: %v. No collections access to bucket %s.",
1165+
b.Name, lastError, b.Name)
1166+
}
1167+
1168+
break
1169+
}
11431170
}
1144-
mani, err := parseCollectionsManifest(res)
1145-
if err != nil {
1146-
pool.Return(client)
1147-
return nil, fmt.Errorf("Unable to parse collections manifest: %v. No collections access to bucket %s.", err, b.Name)
1171+
if lastError != nil {
1172+
if selectFailed {
1173+
return nil, fmt.Errorf("Unable to select bucket %s: %v. No collections access to bucket %s.",
1174+
b.Name, lastError, b.Name)
1175+
}
1176+
if desc.errorString != "" {
1177+
return nil, fmt.Errorf("Unable to retrieve collections manifest: %v. No collections access to bucket %s.",
1178+
fmt.Errorf(desc.errorString, b.Name, lastError), b.Name)
1179+
}
1180+
return nil, fmt.Errorf("Unable to retrieve collections manifest: %v after %d attempts. No collections access to bucket %s.", lastError, desc.attempts, b.Name)
11481181
}
1149-
1150-
pool.Return(client)
1151-
return mani, nil
1182+
// The final return here is only reachable when lastError == nil after the loop exits. That can only happen if desc.maxTries is 0 — the loop condition
1183+
// desc.attempts < desc.maxTries is immediately false, the body never executes, and lastError stays at its zero value (nil). This is just a defensive
1184+
// guard for the maxTries == 0 edge case and also satisfies Go's requirement that all code paths return a value.
1185+
return nil, fmt.Errorf("Unable to retrieve collections manifest after %d attempts. No collections access to bucket %s.", desc.attempts, b.Name)
11521186
}
11531187

11541188
func (b *Bucket) RefreshFully() error {

0 commit comments

Comments
 (0)