Skip to content

Commit e761c22

Browse files
authored
Refactor and optimise the task queue (#57)
1 parent 15f9347 commit e761c22

1 file changed

Lines changed: 18 additions & 14 deletions

File tree

scraper/taskQueue.go

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package scraper
22

33
import (
44
"bdo-rest-api/utils"
5-
"slices"
65
"sync"
76
"time"
87
)
@@ -15,7 +14,8 @@ type Task struct {
1514

1615
type TaskQueue struct {
1716
clientIPs map[string]int
18-
hashes []string
17+
cond *sync.Cond
18+
hashSet map[string]struct{}
1919
mutex sync.Mutex
2020
paused bool
2121
processFunc func(Task)
@@ -25,9 +25,11 @@ type TaskQueue struct {
2525
func NewTaskQueue(bufferSize int) *TaskQueue {
2626
queue := &TaskQueue{
2727
clientIPs: make(map[string]int),
28-
paused: false,
28+
hashSet: make(map[string]struct{}),
2929
tasks: make(chan Task, bufferSize),
3030
}
31+
queue.cond = sync.NewCond(&queue.mutex)
32+
3133
go queue.run()
3234
return queue
3335
}
@@ -39,35 +41,35 @@ func (q *TaskQueue) AddTask(clientIP, hash, url string) (added bool) {
3941
})
4042

4143
q.mutex.Lock()
42-
if duplicate := slices.Contains(q.hashes, hash); duplicate {
44+
if _, exists := q.hashSet[hash]; exists {
4345
q.mutex.Unlock()
4446
return false
4547
}
48+
49+
q.hashSet[hash] = struct{}{}
4650
q.clientIPs[clientIP]++
47-
q.hashes = append(q.hashes, hash)
4851
q.mutex.Unlock()
4952

5053
q.tasks <- Task{
5154
ClientIP: clientIP,
5255
Hash: hash,
5356
URL: fullURL,
5457
}
55-
5658
return true
5759
}
5860

5961
func (q *TaskQueue) run() {
6062
for task := range q.tasks {
6163
q.mutex.Lock()
6264
for q.paused {
63-
q.mutex.Unlock()
64-
// FIXME: This is probably inefficient af
65-
time.Sleep(time.Second)
66-
q.mutex.Lock()
65+
q.cond.Wait()
6766
}
67+
process := q.processFunc
6868
q.mutex.Unlock()
6969

70-
q.processFunc(task)
70+
if process != nil {
71+
process(task)
72+
}
7173
}
7274
}
7375

@@ -87,6 +89,8 @@ func (q *TaskQueue) Pause(t time.Duration) {
8789
q.mutex.Lock()
8890
q.paused = false
8991
q.mutex.Unlock()
92+
93+
q.cond.Broadcast()
9094
}
9195

9296
func (q *TaskQueue) CountQueuedTasksForClient(clientIP string) (count int) {
@@ -99,9 +103,9 @@ func (q *TaskQueue) CountQueuedTasksForClient(clientIP string) (count int) {
99103

100104
func (q *TaskQueue) ConfirmTaskCompletion(clientIP string, hash string) {
101105
q.mutex.Lock()
102-
q.clientIPs[clientIP] = max(0, q.clientIPs[clientIP]-1)
103-
for i := slices.Index(q.hashes, hash); i != -1; i = slices.Index(q.hashes, hash) {
104-
q.hashes = slices.Delete(q.hashes, i, i+1)
106+
if q.clientIPs[clientIP] > 0 {
107+
q.clientIPs[clientIP]--
105108
}
109+
delete(q.hashSet, hash)
106110
q.mutex.Unlock()
107111
}

0 commit comments

Comments
 (0)