Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 111 additions & 29 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,34 +32,38 @@ jobs:
- name: Version bump check
shell: bash
run: |
set -euo pipefail

raw="${{ inputs.version }}"
NEW_VERSION="${raw#v}" # normalized

# basic semver check: X.Y.Z with optional prerelease/build
# basic semver check: X.Y.Z with optional prerelease, but we only accept bare X.Y.Z for base version
if [[ ! "$NEW_VERSION" =~ ^[0-9]+(\.[0-9]+){2}([\-+][0-9A-Za-z\.-]+)?$ ]]; then
echo "::error::Input must be SemVer like 0.5.0 or 1.2.3-rc.1"
exit 1
fi

LATEST_TAG=$(git describe --tags --abbrev=0 2>/dev/null || echo "v0.0.0")
OLD_VERSION="${LATEST_TAG#v}" # normalized
# latest FINAL tag: strictly vX.Y.Z (ignore all prereleases)
LATEST_FINAL_TAG=$(git tag --list 'v[0-9]*' \
| grep -E '^v[0-9]+\.[0-9]+\.[0-9]+$' \
| sort -V | tail -n1 || true)
LATEST_FINAL_TAG=${LATEST_FINAL_TAG:-v0.0.0}
OLD_FINAL_VERSION=${LATEST_FINAL_TAG#v}

echo "Latest tag: $LATEST_TAG"
echo "Input version: $NEW_VERSION"
echo "Latest FINAL tag: $LATEST_FINAL_TAG"
echo "VERSION file: $NEW_VERSION"

if [[ "$NEW_VERSION" == "$OLD_VERSION" ]]; then
echo "::error title=VERSION check::VERSION not bumped (still $NEW_VERSION)"
# STRICT: VERSION must be > latest final (no equality)
if [[ "$NEW_VERSION" == "$OLD_FINAL_VERSION" ]]; then
echo "::error::VERSION ($NEW_VERSION) must be greater than latest final ($OLD_FINAL_VERSION)"
exit 1
fi

if [[ "$(printf '%s\n' "$OLD_VERSION" "$NEW_VERSION" | sort -V | tail -n1)" != "$OLD_VERSION" ]]; then
: # OK, NEW_VERSION > OLD_VERSION
else
echo "::error::New version ($NEW_VERSION) must be greater than old ($OLD_VERSION)"
if [[ "$(printf '%s\n' "$OLD_FINAL_VERSION" "$NEW_VERSION" | sort -V | tail -n1)" != "$NEW_VERSION" ]]; then
echo "::error::VERSION ($NEW_VERSION) must be greater than latest final ($OLD_FINAL_VERSION)"
exit 1
fi

echo "Version bump OK: $OLD_VERSION → $NEW_VERSION"
echo "VERSION is newer than latest final: $OLD_FINAL_VERSION → $NEW_VERSION"

release:
name: Test, tag & release
Expand All @@ -82,43 +86,121 @@ jobs:
- name: Run tests
run: go test -mod=readonly ./... -count=1 -race

- name: Normalize version
id: ver
- name: Compute tag (final or next -rc.N)
id: compute_tag
shell: bash
run: |
set -euo pipefail

PRERELEASE="${{ inputs.prerelease }}"
raw="${{ inputs.version }}"
ver="${raw#v}"
echo "version=$ver" >> "$GITHUB_OUTPUT"
echo "tag=v$ver" >> "$GITHUB_OUTPUT"
VERSION="${raw#v}" # normalized base: X.Y.Z

# existence helpers
final_exists() { git rev-parse "v${VERSION}" >/dev/null 2>&1; }
any_rc_tags() { git tag --list "v${VERSION}-rc*" | grep -q .; }

# find next rc number (supports both -rc1 and -rc.1 historical patterns)
next_rc_tag() {
local base="$1"
local maxn
maxn=$(git tag --list "v${base}-rc*" \
| sed -E 's/^.*-rc\.?([0-9]+)$/\1/' \
| grep -E '^[0-9]+$' \
| sort -n | tail -n1 || true)
if [[ -z "${maxn:-}" ]]; then
echo "v${base}-rc.1"
else
echo "v${base}-rc.$((maxn+1))"
fi
}

# guard: do not allow prerelease/final if final already exists for this version
if final_exists; then
if [[ "$PRERELEASE" == "true" ]]; then
echo "::error::Cannot create prerelease: final tag v${VERSION} already exists"
else
echo "::error::Final tag v${VERSION} already exists; bump VERSION first"
fi
exit 1
fi

if [[ "$PRERELEASE" == "true" ]]; then
TAG="$(next_rc_tag "$VERSION")"
IS_PRE="true"
else
TAG="v${VERSION}"
IS_PRE="false"
fi

# final guard: avoid accidental retagging if somehow exists
if git rev-parse "$TAG" >/dev/null 2>&1; then
echo "::error::Tag $TAG already exists"
exit 1
fi

echo "version=$VERSION" >> "$GITHUB_OUTPUT"
echo "tag=$TAG" >> "$GITHUB_OUTPUT"
echo "is_prerelease=$IS_PRE" >> "$GITHUB_OUTPUT"

echo "Computed tag: $TAG (prerelease=$IS_PRE)"

- name: Create Git tag
shell: bash
run: |
TAG="${{ steps.ver.outputs.tag }}"
set -euo pipefail
TAG="${{ steps.compute_tag.outputs.tag }}"
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"

if git rev-parse "$TAG" >/dev/null 2>&1; then
echo "::error::Tag $TAG already exists"
exit 1
echo "Tag $TAG already exists; skipping tag creation."
else
git tag -a "$TAG" -m "chore(release): $TAG"
git push origin "$TAG"
fi
git tag -a "$TAG" -m "chore(release): $TAG"
git push origin "$TAG"

- name: Generate changelog for the new tag
id: git-cliff
- name: Find previous final tag (-rc.N excluded)
id: prev_final
shell: bash
run: |
set -euo pipefail
PREV=$(git tag --list 'v[0-9]*' \
| grep -E '^v[0-9]+\.[0-9]+\.[0-9]+$' \
| sort -V | tail -n2 | head -n1 || true)
echo "prev=$PREV" >> "$GITHUB_OUTPUT"

- name: Generate changelog (RC)
id: git-cliff-rc
if: ${{ steps.compute_tag.outputs.is_prerelease == 'true' }}
uses: orhun/git-cliff-action@v4
with:
args: --current --no-exec
env:
GITHUB_REPO: ${{ github.repository }}
GITHUB_TOKEN: ${{ github.token }}

- name: Generate changelog (final release)
id: git-cliff-final
if: ${{ steps.compute_tag.outputs.is_prerelease == 'false' }}
uses: orhun/git-cliff-action@v4
with:
# If no previous final tag, this produces an empty range; optionally
# fall back to the root commit if you want *all* history instead.
args: "${{ steps.prev_final.outputs.prev }}..${{ steps.compute_tag.outputs.tag }} --no-exec"
env:
GITHUB_REPO: ${{ github.repository }}
GITHUB_TOKEN: ${{ github.token }}

- name: Publish GitHub Release
uses: softprops/action-gh-release@v2
with:
tag_name: ${{ steps.ver.outputs.tag }}
name: taskman ${{ steps.ver.outputs.tag }}
body: ${{ steps.git-cliff.outputs.content }}
prerelease: ${{ inputs.prerelease }}
tag_name: ${{ steps.compute_tag.outputs.tag }}
name: taskman ${{ steps.compute_tag.outputs.tag }}
body: ${{ steps.compute_tag.outputs.is_prerelease == 'true'
&& steps.git-cliff-rc.outputs.content
|| steps.git-cliff-final.outputs.content }}
prerelease: ${{ steps.compute_tag.outputs.is_prerelease }}
make_latest: ${{ steps.compute_tag.outputs.is_prerelease == 'false' }} # finals only
env:
GITHUB_TOKEN: ${{ github.token }}
27 changes: 24 additions & 3 deletions worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type workerPool struct {
workersActive atomic.Int32 // Number of active workers
workersRunning atomic.Int32 // Number of running workers
workerCountTarget atomic.Int32 // Target number of workers
stopping atomic.Bool // Indicates whether the pool is shutting down

errorChan chan<- error // Send-only channel for errors
taskExecChan chan time.Duration // Channel to send execution times
Expand All @@ -49,8 +50,9 @@ type workerPool struct {
downScaleMinInterval time.Duration // configurable per pool; default defaultDownScaleMinInterval
maxWorkers int // Maximum number of workers allowed

mu sync.Mutex
wg sync.WaitGroup
mu sync.Mutex
wg sync.WaitGroup
scalerWG sync.WaitGroup
}

// worker represents a worker that executes tasks.
Expand Down Expand Up @@ -83,6 +85,9 @@ func (wp *workerPool) availableWorkers() int32 {

// addWorkers adds to the worker pool by starting new workers.
func (wp *workerPool) addWorkers(nWorkers int) {
if wp.stopping.Load() {
return
}
wp.log.Debug().Msgf("Adding %d new workers to the pool", nWorkers)
wp.wg.Add(nWorkers)
for i := 0; i < nWorkers; i++ {
Expand Down Expand Up @@ -164,6 +169,9 @@ func (wp *workerPool) busyWorkers() []xid.ID {

// enqueueWorkerScaling enqueues a worker count scaling request.
func (wp *workerPool) enqueueWorkerScaling(target int32) {
if wp.stopping.Load() {
return
}
select {
case <-wp.stopPoolChan:
// Worker pool is shutting down, exit
Expand Down Expand Up @@ -195,12 +203,20 @@ func (wp *workerPool) idleWorkers() []xid.ID {
// processWorkerCountScaling listens for worker count requests and adjusts the
// worker count accordingly.
func (wp *workerPool) processWorkerCountScaling() {
defer wp.scalerWG.Done()

for {
select {
case <-wp.stopPoolChan:
// Worker count scaling received stop signal, exiting
return
case newTargetCount := <-wp.workerCountChan:
case newTargetCount, ok := <-wp.workerCountChan:
if !ok {
return
}
if wp.stopping.Load() {
continue
}
wp.mu.Lock()
wp.adjustWorkerCount(newTargetCount)
wp.mu.Unlock()
Expand Down Expand Up @@ -293,9 +309,13 @@ func (wp *workerPool) startWorker(id xid.ID) {

// stop signals the worker pool to stop processing tasks and exit.
func (wp *workerPool) stop() {
wp.stopping.Store(true)
// Signal workers to stop
close(wp.stopPoolChan)

// Wait for scaler goroutine to exit before waiting on workers.
wp.scalerWG.Wait()

// Wait for all workers to finish
wp.wg.Wait()

Expand Down Expand Up @@ -424,6 +444,7 @@ func newWorkerPool(
// Record initial sizing as a scaling event to reflect startup sizing in metrics
pool.workerScalingEvents.Add(1)

pool.scalerWG.Add(1)
go pool.processWorkerCountScaling()

return pool
Expand Down
30 changes: 30 additions & 0 deletions worker_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,36 @@ func TestWorkerPoolStartStop(t *testing.T) {
assert.Equal(t, int32(4), pool.runningWorkers(), "Expected 4 running workers")
}

func TestWorkerPoolStopWhileScaling(t *testing.T) {
pool := getWorkerPool(2)
var scalerWG sync.WaitGroup
stopScaler := make(chan struct{})

scalerWG.Add(1)
go func() {
defer scalerWG.Done()
for {
select {
case <-stopScaler:
return
default:
pool.enqueueWorkerScaling(5)
}
}
}()

// Allow scaler goroutine to enqueue at least one request.
time.Sleep(10 * time.Millisecond)

pool.stop()
close(stopScaler)
scalerWG.Wait()

assert.True(t, pool.stopping.Load(), "Pool should be marked as stopping")
assert.Equal(t, int32(0), pool.runningWorkers(), "Expected no running workers after stop")
assert.Equal(t, int32(0), pool.activeWorkers(), "Expected no active workers after stop")
}

func TestWorkerPoolTaskExecution(t *testing.T) {
errorChan := make(chan error, 1)
taskExecChan := make(chan time.Duration, 1)
Expand Down