Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ job := Job{

err := manager.ScheduleJob(job)
// Handle the error

// Limit a job to execute only three times before self-removing
_, err = manager.ScheduleTask(
SomeStruct{ID: "single-task"},
15*time.Second,
WithExecLimit(3),
)
// Handle the error
```

### Logging
Expand Down
5 changes: 2 additions & 3 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@

## TODO v0.6.0

- Add a functional option to execute a job a select amount of times, e.g. a "one-hit" or "multi-hit" job, either with immediate or delayed execution
- Add a function to instantly execute a job already in the queue, even though it has some time until next execution
- use heap.Fix to reposition the job in the heap, https://cs.opensource.google/go/go/+/refs/tags/go1.23.4:src/container/heap/heap.go;l=83
- Add Task Execution Timeouts: Implement per-task timeouts to prevent indefinite hangs, using context.WithTimeout in worker goroutines and propagating deadline exceeded errors.

## Future ideas

- Add a function to instantly execute a job already in the queue, even though it has some time until next execution
- use heap.Fix to reposition the job in the heap, https://cs.opensource.google/go/go/+/refs/tags/go1.23.4:src/container/heap/heap.go;l=83
- Task control
- Make tasks within grouped jobs have ID:s + add an option to remove a task from a job based on its ID
- Attach a context to a task, so that it can be cancelled and controlled in other ways
Expand Down
120 changes: 96 additions & 24 deletions executor_distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type distributedExecutor struct {
maxPar int // parallelism limit per job (0 = unlimited)
}

// pausedRunner represents a paused job runner.
type pausedRunner struct {
job Job
remaining time.Duration
Expand Down Expand Up @@ -97,6 +98,7 @@ func (e *distributedExecutor) Remove(jobID string) error {
return nil
}

// Pause pauses a job.
func (e *distributedExecutor) Pause(jobID string) error {
select {
case <-e.ctx.Done():
Expand Down Expand Up @@ -136,6 +138,7 @@ func (e *distributedExecutor) Pause(jobID string) error {
return nil
}

// Resume resumes a paused job.
func (e *distributedExecutor) Resume(jobID string) error {
select {
case <-e.ctx.Done():
Expand All @@ -154,6 +157,7 @@ func (e *distributedExecutor) Resume(jobID string) error {

job := paused.job
job.NextExec = time.Now().Add(paused.remaining)
job.initializeExecLimit()

rCtx, rCancel := context.WithCancel(e.ctx)
runner := &jobRunner{
Expand All @@ -163,11 +167,16 @@ func (e *distributedExecutor) Resume(jobID string) error {
parallel: e.parallel,
maxPar: e.maxPar,
catchUpMax: e.catchUpMax,
exec: e,
}

// Register the runner before starting its goroutine to avoid a race where the
// runner completes immediately and attempts to remove itself before it is
// present in the runners map.
e.runners.Set(job.ID, runner)
runner.wg.Add(1)
go runner.loop(e.errCh, e.taskExecChan, e.jobExecChan)

e.runners.Set(job.ID, runner)
return nil
}

Expand All @@ -186,6 +195,7 @@ func (e *distributedExecutor) Replace(job Job) error {
runner.mu.Lock()
prev := runner.job
job.NextExec = prev.NextExec
job.inheritExecLimit(&prev)
runner.job = job
runner.mu.Unlock()

Expand All @@ -206,6 +216,8 @@ func (e *distributedExecutor) Schedule(job Job) error {
return fmt.Errorf("invalid job: %w", err)
}

job.initializeExecLimit()

// Check executor context state
select {
case <-e.ctx.Done():
Expand Down Expand Up @@ -243,11 +255,16 @@ func (e *distributedExecutor) Schedule(job Job) error {
parallel: e.parallel,
maxPar: e.maxPar,
catchUpMax: e.catchUpMax,
exec: e,
}

// Register the runner before starting its goroutine to avoid a race where the
// runner completes immediately and attempts to remove itself before it is
// present in the runners map.
e.runners.Set(job.ID, runner)
runner.wg.Add(1)
go runner.loop(e.errCh, e.taskExecChan, e.jobExecChan)

e.runners.Set(job.ID, runner)
e.metrics.updateMetrics(1, len(job.Tasks), job.Cadence)

return nil
Expand Down Expand Up @@ -291,6 +308,18 @@ func (e *distributedExecutor) Stop() {
e.metrics.cancel()
}

// completeRunner removes a job runner from the executor and updates metrics.
func (e *distributedExecutor) completeRunner(r *jobRunner) {
if r == nil {
return
}
job := r.snapshotJob()
r.cancel()
if _, ok := e.runners.LoadAndDelete(job.ID); ok {
e.metrics.updateMetrics(-1, -len(job.Tasks), job.Cadence)
}
}

// jobRunner is an implementation of job runner that runs tasks in a separate goroutine.
type jobRunner struct {
mu sync.RWMutex
Expand All @@ -299,12 +328,14 @@ type jobRunner struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
exec *distributedExecutor

parallel bool // run tasks in parallel within a job
maxPar int // 0 = unlimited
catchUpMax int // max immediate catch-ups per tick when behind
}

// execute runs the job runner.
func (r *jobRunner) execute(errCh chan<- error, taskExecChan chan<- time.Duration) {
if r.parallel {
r.runParallel(errCh, taskExecChan)
Expand All @@ -313,6 +344,13 @@ func (r *jobRunner) execute(errCh chan<- error, taskExecChan chan<- time.Duratio
r.runSequential(errCh, taskExecChan)
}

// loop runs the job runner.
//
// It uses a one-shot timer that always points at the job's next scheduled execution time
// (stored in the local "next" variable). After each run, "next" is advanced by the
// cadence. If the runner fell behind schedule (e.g., the previous run took longer than
// one cadence), we allow it to "catch up" by at most catchUpMax skipped periods to avoid
// a long backlog of immediate executions.
// loop runs the job runner.
//
// It uses a one-shot timer that always points at the job's next scheduled execution time
Expand Down Expand Up @@ -349,33 +387,14 @@ func (r *jobRunner) loop(
case <-r.ctx.Done():
return
case <-timer.C:
// Execute tasks for this job tick.
r.execute(errCh, taskExecChan)

// Meter one job execution
jobExecChan <- struct{}{}

// Advance "next" forward by whole cadences until it lands in the future,
// but cap the number of immediate catch-ups to catchUpMax.
skips := 0
r.mu.RLock()
cadence := r.job.Cadence
r.mu.RUnlock()
for {
next = next.Add(cadence)
if skips >= catchUpMax || next.After(time.Now()) {
break
}
skips++
// Handle the tick in a helper to keep cognitive complexity low.
if r.handleTimerTick(errCh, taskExecChan, jobExecChan, &next, catchUpMax) {
return
}

// Reset the timer to fire at the upcoming "next" (never negative duration).
duration := max(time.Until(next), 0)

r.mu.Lock()
r.job.NextExec = next
r.mu.Unlock()

// Drain the timer channel if needed before resetting to avoid spurious wakeups.
if !timer.Stop() {
select {
Expand All @@ -388,6 +407,53 @@ func (r *jobRunner) loop(
}
}

// handleTimerTick performs the work that used to be in the timer.C case of loop.
// It returns true when the runner should stop (job exhausted or context canceled).
func (r *jobRunner) handleTimerTick(
errCh chan<- error,
taskExecChan chan<- time.Duration,
jobExecChan chan<- struct{},
next *time.Time,
catchUpMax int,
) bool {
// Execute tasks for this job tick.
r.execute(errCh, taskExecChan)

// Meter one job execution
jobExecChan <- struct{}{}

// Consume one run and check if the job is done.
r.mu.Lock()
cadence := r.job.Cadence
jobDone := r.job.consumeRun()
r.mu.Unlock()
if jobDone {
if r.exec != nil {
r.exec.completeRunner(r)
}
return true
}

// Advance "next" forward by whole cadences until it lands in the future,
// but cap the number of immediate catch-ups to catchUpMax.
skips := 0
now := time.Now()
for {
*next = (*next).Add(cadence)
if skips >= catchUpMax || (*next).After(now) {
break
}
skips++
}

// Persist next to the job under lock.
r.mu.Lock()
r.job.NextExec = *next
r.mu.Unlock()

return false
}

// runSequential runs the job sequentially.
func (r *jobRunner) runSequential(errCh chan<- error, taskExecChan chan<- time.Duration) {
r.mu.RLock()
Expand Down Expand Up @@ -434,6 +500,12 @@ func (r *jobRunner) runParallel(errCh chan<- error, taskExecChan chan<- time.Dur
wg.Wait()
}

func (r *jobRunner) snapshotJob() Job {
r.mu.RLock()
defer r.mu.RUnlock()
return r.job
}

// equalRunners returns true if the two runners have the same job ID.
func equalRunners(r1, r2 *jobRunner) bool {
r1.mu.RLock()
Expand Down
78 changes: 54 additions & 24 deletions executor_on_demand.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (e *onDemandExecutor) Remove(jobID string) error {
if err != nil {
if paused, ok := e.pausedJobs[jobID]; ok {
delete(e.pausedJobs, jobID)
e.metrics.updateMetrics(-1, -len(paused.job.Tasks), paused.job.Cadence)
e.finalizeJobRemovalLocked(paused.job)
return nil
}
return fmt.Errorf("job with ID %s not found", jobID)
Expand All @@ -100,8 +100,7 @@ func (e *onDemandExecutor) Remove(jobID string) error {
return err
}

// Update metrics
e.metrics.updateMetrics(-1, -len(job.Tasks), job.Cadence)
e.finalizeJobRemovalLocked(job)

return nil
}
Expand Down Expand Up @@ -208,6 +207,7 @@ func (e *onDemandExecutor) Replace(job Job) error {
oldJob := e.jobQueue[jobIndex]
job.NextExec = oldJob.NextExec
job.index = oldJob.index
job.inheritExecLimit(oldJob)
e.jobQueue[jobIndex] = &job

// Preserve heap invariants if ordering-related fields ever change
Expand All @@ -233,6 +233,8 @@ func (e *onDemandExecutor) Schedule(job Job) error {
return fmt.Errorf("invalid job: %w", err)
}

job.initializeExecLimit()

e.mu.Lock()
defer e.mu.Unlock()

Expand Down Expand Up @@ -415,28 +417,9 @@ func (e *onDemandExecutor) run() {
e.jobExecChan <- struct{}{}
}()

// Reschedule the job under lock, with catch-up
// Reschedule the job under lock, with catch-up, or retire if run limit reached
e.mu.Lock()
if jobPtr != nil &&
jobPtr.index < len(e.jobQueue) &&
e.jobQueue[jobPtr.index].ID == jobID {
// Advance "next" forward by whole cadences until it lands in the future,
// but cap the number of immediate catch-ups to catchUpMax.
skips := 0
catchUpMax := e.catchUpMax
if catchUpMax <= 0 {
catchUpMax = 1
}
for skips < catchUpMax {
nextExec = nextExec.Add(cadence)
if nextExec.After(now) {
break
}
skips++
}
e.jobQueue[jobPtr.index].NextExec = nextExec
heap.Fix(&e.jobQueue, jobPtr.index)
}
e.rescheduleOrRemoveAtLocked(jobPtr, nextExec, cadence, now)
e.mu.Unlock()
continue
}
Expand Down Expand Up @@ -488,6 +471,53 @@ func (e *onDemandExecutor) notifyQueueUpdate() {
}
}

// rescheduleOrRemoveAtLocked handles rescheduling or removing a job entry for the
// on-demand executor. It assumes `e.mu` is already held.
func (e *onDemandExecutor) rescheduleOrRemoveAtLocked(
jobPtr *Job,
nextExec time.Time,
cadence time.Duration,
now time.Time,
) {
if jobPtr.index < len(e.jobQueue) && e.jobQueue[jobPtr.index].ID == jobPtr.ID {
entry := e.jobQueue[jobPtr.index]
if entry.consumeRun() {
removed := heap.Remove(&e.jobQueue, jobPtr.index)
if removedJob, ok := removed.(*Job); ok {
e.finalizeJobRemovalLocked(removedJob)
}
return
}

// Advance "next" forward by whole cadences until it lands in the future,
// but cap the number of immediate catch-ups to catchUpMax.
n := nextExec
skips := 0
catchUpMax := e.catchUpMax
if catchUpMax <= 0 {
catchUpMax = 1
}
for skips < catchUpMax {
n = n.Add(cadence)
if n.After(now) {
break
}
skips++
}
e.jobQueue[jobPtr.index].NextExec = n
heap.Fix(&e.jobQueue, jobPtr.index)
}
}

// finalizeJobRemovalLocked finalizes the removal of a job from the queue. Assumes the executor lock
// is already held when called.
func (e *onDemandExecutor) finalizeJobRemovalLocked(job *Job) {
if job == nil {
return
}
e.metrics.updateMetrics(-1, -len(job.Tasks), job.Cadence)
}

// revive:enable:cyclomatic
// revive:enable:function-length
// revive:enable:cognitive-complexity
Expand Down
Loading