diff --git a/dsqueue.go b/dsqueue.go index 8aef441..b4db352 100644 --- a/dsqueue.go +++ b/dsqueue.go @@ -44,6 +44,7 @@ type DSQueue struct { enqueue chan []byte clear chan chan<- int closeTimeout time.Duration + empty chan struct{} getn chan getRequest name string } @@ -62,6 +63,7 @@ func New(ds datastore.Batching, name string, options ...Option) *DSQueue { enqueue: make(chan []byte), clear: make(chan chan<- int), closeTimeout: cfg.closeTimeout, + empty: make(chan struct{}), getn: make(chan getRequest), name: name, } @@ -94,6 +96,13 @@ func (q *DSQueue) Close() error { return err } +// Empty returns a channel that is signaled when the queue is empty. This is +// useful for exiting select when there are currently no more queued items to +// read. +func (q *DSQueue) Empty() <-chan struct{} { + return q.empty +} + // Put puts an item into the queue. func (q *DSQueue) Put(item []byte) (err error) { if len(item) == 0 { @@ -118,13 +127,13 @@ type getResponse struct { err error } +// Deprecated: Use Out and Empty in a select loop to read as many items as +// wanted or until an empty signal returned. +// // GetN retrieves up to n items that are currently available in the queue. If // there are no items currently available, then none are returned and GetN does // not wait for any. // -// GetN is used to poll the DSQueue for items and return batches of those -// items. This is the most efficient way of fetching currently available items. -// // GetN and Out can both be used to read items from the DSQueue, but they // should not be used concurrently as items will be returned by one or the // other indeterminately. @@ -222,6 +231,7 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id var ( commit bool dsEmpty bool + empty chan struct{} err error idle bool ) @@ -271,6 +281,9 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id var dequeue chan []byte if item != nil { dequeue = q.dequeue + empty = nil + } else { + empty = q.empty } select { @@ -301,6 +314,8 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id if bufferSize != 0 && inBuf.Len() >= bufferSize { commit = true } + case empty <- struct{}{}: + case getRequest := <-q.getn: n := getRequest.n rspChan := getRequest.rsp diff --git a/dsqueue_test.go b/dsqueue_test.go index a6f3768..c0abfbc 100644 --- a/dsqueue_test.go +++ b/dsqueue_test.go @@ -103,6 +103,69 @@ func TestBasicOperation(t *testing.T) { } } +func TestReadUntilEmpty(t *testing.T) { + ds := sync.MutexWrap(datastore.NewMapDatastore()) + q := dsqueue.New(ds, dsqName, dsqueue.WithBufferSize(5), dsqueue.WithDedupCacheSize(0)) + defer q.Close() + + cids := random.Cids(29) + for _, c := range cids { + q.Put(c.Bytes()) + } + + var i int + +loop: + for { + select { + case outItem := <-q.Out(): + outCid, err := cid.Parse(outItem) + if err != nil { + t.Fatal(err) + } + if outCid != cids[i] { + t.Fatal("retrieved items out of order") + } + i++ + case <-q.Empty(): + break loop + } + } + + if i != len(cids) { + t.Fatalf("dequeued wrond number of items, expected %d, got %d", len(cids), i) + } + + // Check still empty. + select { + case <-q.Empty(): + case <-q.Out(): + t.Fatal("should not have any more data") + case <-time.After(time.Second): + t.Fatal("did not receive empty signal") + } + + q.Put(cids[0].Bytes()) + + // Check for data item. + select { + case <-q.Out(): + case <-q.Empty(): + t.Fatal("should not have received empty signal") + case <-time.After(time.Second): + t.Fatal("did not receive any data") + } + + // Check for empty. + select { + case <-q.Empty(): + case <-q.Out(): + t.Fatal("should not have any more data") + case <-time.After(time.Second): + t.Fatal("did not receive empty signal") + } +} + func TestGetN(t *testing.T) { ds := sync.MutexWrap(datastore.NewMapDatastore()) queue := dsqueue.New(ds, dsqName, dsqueue.WithBufferSize(5), dsqueue.WithDedupCacheSize(0))