Skip to content

Commit c9bf3a7

Browse files
dannykoppingbradfitz
authored andcommitted
memcache: extend connection deadline before each multi-get operation
This cherry-picks grafana#16
1 parent 24af94b commit c9bf3a7

1 file changed

Lines changed: 23 additions & 14 deletions

File tree

memcache/memcache.go

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -363,30 +363,31 @@ func (c *Client) withKeyAddr(key string, fn func(net.Addr) error) (err error) {
363363
return fn(addr)
364364
}
365365

366-
func (c *Client) withAddrRw(addr net.Addr, fn func(*bufio.ReadWriter) error) (err error) {
366+
func (c *Client) withAddrRw(addr net.Addr, fn func(*conn) error) (err error) {
367367
cn, err := c.getConn(addr)
368368
if err != nil {
369369
return err
370370
}
371371
defer cn.condRelease(&err)
372-
return fn(cn.rw)
372+
return fn(cn)
373373
}
374374

375-
func (c *Client) withKeyRw(key string, fn func(*bufio.ReadWriter) error) error {
375+
func (c *Client) withKeyRw(key string, fn func(*conn) error) error {
376376
return c.withKeyAddr(key, func(addr net.Addr) error {
377377
return c.withAddrRw(addr, fn)
378378
})
379379
}
380380

381381
func (c *Client) getFromAddr(addr net.Addr, keys []string, cb func(*Item)) error {
382-
return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error {
382+
return c.withAddrRw(addr, func(conn *conn) error {
383+
rw := conn.rw
383384
if _, err := fmt.Fprintf(rw, "gets %s\r\n", strings.Join(keys, " ")); err != nil {
384385
return err
385386
}
386387
if err := rw.Flush(); err != nil {
387388
return err
388389
}
389-
if err := parseGetResponse(rw.Reader, cb); err != nil {
390+
if err := parseGetResponse(rw.Reader, conn, cb); err != nil {
390391
return err
391392
}
392393
return nil
@@ -395,7 +396,8 @@ func (c *Client) getFromAddr(addr net.Addr, keys []string, cb func(*Item)) error
395396

396397
// flushAllFromAddr send the flush_all command to the given addr
397398
func (c *Client) flushAllFromAddr(addr net.Addr) error {
398-
return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error {
399+
return c.withAddrRw(addr, func(conn *conn) error {
400+
rw := conn.rw
399401
if _, err := fmt.Fprintf(rw, "flush_all\r\n"); err != nil {
400402
return err
401403
}
@@ -418,7 +420,8 @@ func (c *Client) flushAllFromAddr(addr net.Addr) error {
418420

419421
// ping sends the version command to the given addr
420422
func (c *Client) ping(addr net.Addr) error {
421-
return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error {
423+
return c.withAddrRw(addr, func(conn *conn) error {
424+
rw := conn.rw
422425
if _, err := fmt.Fprintf(rw, "version\r\n"); err != nil {
423426
return err
424427
}
@@ -441,7 +444,8 @@ func (c *Client) ping(addr net.Addr) error {
441444
}
442445

443446
func (c *Client) touchFromAddr(addr net.Addr, keys []string, expiration int32) error {
444-
return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error {
447+
return c.withAddrRw(addr, func(conn *conn) error {
448+
rw := conn.rw
445449
for _, key := range keys {
446450
if _, err := fmt.Fprintf(rw, "touch %s %d\r\n", key, expiration); err != nil {
447451
return err
@@ -509,8 +513,12 @@ func (c *Client) GetMulti(keys []string) (map[string]*Item, error) {
509513

510514
// parseGetResponse reads a GET response from r and calls cb for each
511515
// read and allocated Item
512-
func parseGetResponse(r *bufio.Reader, cb func(*Item)) error {
516+
func parseGetResponse(r *bufio.Reader, conn *conn, cb func(*Item)) error {
513517
for {
518+
// extend deadline before each additional call, otherwise all cumulative
519+
// calls use the same overall deadline
520+
conn.extendDeadline()
521+
514522
line, err := r.ReadSlice('\n')
515523
if err != nil {
516524
return err
@@ -694,15 +702,15 @@ func writeExpectf(rw *bufio.ReadWriter, expect []byte, format string, args ...in
694702
// Delete deletes the item with the provided key. The error ErrCacheMiss is
695703
// returned if the item didn't already exist in the cache.
696704
func (c *Client) Delete(key string) error {
697-
return c.withKeyRw(key, func(rw *bufio.ReadWriter) error {
698-
return writeExpectf(rw, resultDeleted, "delete %s\r\n", key)
705+
return c.withKeyRw(key, func(conn *conn) error {
706+
return writeExpectf(conn.rw, resultDeleted, "delete %s\r\n", key)
699707
})
700708
}
701709

702710
// DeleteAll deletes all items in the cache.
703711
func (c *Client) DeleteAll() error {
704-
return c.withKeyRw("", func(rw *bufio.ReadWriter) error {
705-
return writeExpectf(rw, resultDeleted, "flush_all\r\n")
712+
return c.withKeyRw("", func(conn *conn) error {
713+
return writeExpectf(conn.rw, resultDeleted, "flush_all\r\n")
706714
})
707715
}
708716

@@ -733,7 +741,8 @@ func (c *Client) Decrement(key string, delta uint64) (newValue uint64, err error
733741

734742
func (c *Client) incrDecr(verb, key string, delta uint64) (uint64, error) {
735743
var val uint64
736-
err := c.withKeyRw(key, func(rw *bufio.ReadWriter) error {
744+
err := c.withKeyRw(key, func(conn *conn) error {
745+
rw := conn.rw
737746
line, err := writeReadLine(rw, "%s %s %d\r\n", verb, key, delta)
738747
if err != nil {
739748
return err

0 commit comments

Comments
 (0)