Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions dsqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type DSQueue struct {
enqueue chan []byte
clear chan chan<- int
closeTimeout time.Duration
empty chan struct{}
getn chan getRequest
name string
}
Expand All @@ -62,6 +63,7 @@ func New(ds datastore.Batching, name string, options ...Option) *DSQueue {
enqueue: make(chan []byte),
clear: make(chan chan<- int),
closeTimeout: cfg.closeTimeout,
empty: make(chan struct{}),
getn: make(chan getRequest),
name: name,
}
Expand Down Expand Up @@ -94,6 +96,13 @@ func (q *DSQueue) Close() error {
return err
}

// Empty returns a channel that is signaled when the queue is empty. This is
// useful for exiting select when there are currently no more queued items to
// read.
func (q *DSQueue) Empty() <-chan struct{} {
return q.empty
}

// Put puts an item into the queue.
func (q *DSQueue) Put(item []byte) (err error) {
if len(item) == 0 {
Expand All @@ -118,13 +127,13 @@ type getResponse struct {
err error
}

// Deprecated: Use Out and Empty in a select loop to read as many items as
// wanted or until an empty signal returned.
//
// GetN retrieves up to n items that are currently available in the queue. If
// there are no items currently available, then none are returned and GetN does
// not wait for any.
//
// GetN is used to poll the DSQueue for items and return batches of those
// items. This is the most efficient way of fetching currently available items.
//
// GetN and Out can both be used to read items from the DSQueue, but they
// should not be used concurrently as items will be returned by one or the
// other indeterminately.
Expand Down Expand Up @@ -222,6 +231,7 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
var (
commit bool
dsEmpty bool
empty chan struct{}
err error
idle bool
)
Expand Down Expand Up @@ -271,6 +281,9 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
var dequeue chan []byte
if item != nil {
dequeue = q.dequeue
empty = nil
} else {
empty = q.empty
}

select {
Expand Down Expand Up @@ -301,6 +314,8 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
if bufferSize != 0 && inBuf.Len() >= bufferSize {
commit = true
}
case empty <- struct{}{}:

case getRequest := <-q.getn:
n := getRequest.n
rspChan := getRequest.rsp
Expand Down
63 changes: 63 additions & 0 deletions dsqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,69 @@ func TestBasicOperation(t *testing.T) {
}
}

func TestReadUntilEmpty(t *testing.T) {
ds := sync.MutexWrap(datastore.NewMapDatastore())
q := dsqueue.New(ds, dsqName, dsqueue.WithBufferSize(5), dsqueue.WithDedupCacheSize(0))
defer q.Close()

cids := random.Cids(29)
for _, c := range cids {
q.Put(c.Bytes())
}

var i int

loop:
for {
select {
case outItem := <-q.Out():
outCid, err := cid.Parse(outItem)
if err != nil {
t.Fatal(err)
}
if outCid != cids[i] {
t.Fatal("retrieved items out of order")
}
i++
case <-q.Empty():
break loop
}
}

if i != len(cids) {
t.Fatalf("dequeued wrond number of items, expected %d, got %d", len(cids), i)
}

// Check still empty.
select {
case <-q.Empty():
case <-q.Out():
t.Fatal("should not have any more data")
case <-time.After(time.Second):
t.Fatal("did not receive empty signal")
}

q.Put(cids[0].Bytes())

// Check for data item.
select {
case <-q.Out():
case <-q.Empty():
t.Fatal("should not have received empty signal")
case <-time.After(time.Second):
t.Fatal("did not receive any data")
}

// Check for empty.
select {
case <-q.Empty():
case <-q.Out():
t.Fatal("should not have any more data")
case <-time.After(time.Second):
t.Fatal("did not receive empty signal")
}
}

func TestGetN(t *testing.T) {
ds := sync.MutexWrap(datastore.NewMapDatastore())
queue := dsqueue.New(ds, dsqName, dsqueue.WithBufferSize(5), dsqueue.WithDedupCacheSize(0))
Expand Down
Loading