From fdc38b103eb7e89ba8a8f1befb85aa1507bfb8d1 Mon Sep 17 00:00:00 2001 From: Jakob Ersson <11717405+jakobilobi@users.noreply.github.com> Date: Wed, 29 Oct 2025 13:35:12 +0100 Subject: [PATCH 1/2] fix: guard against race condition on poolScaler shutdown --- worker_pool.go | 27 ++++++++++++++++++++++++--- worker_pool_test.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/worker_pool.go b/worker_pool.go index 25ad19f..2ad4483 100644 --- a/worker_pool.go +++ b/worker_pool.go @@ -35,6 +35,7 @@ type workerPool struct { workersActive atomic.Int32 // Number of active workers workersRunning atomic.Int32 // Number of running workers workerCountTarget atomic.Int32 // Target number of workers + stopping atomic.Bool // Indicates whether the pool is shutting down errorChan chan<- error // Send-only channel for errors taskExecChan chan time.Duration // Channel to send execution times @@ -49,8 +50,9 @@ type workerPool struct { downScaleMinInterval time.Duration // configurable per pool; default defaultDownScaleMinInterval maxWorkers int // Maximum number of workers allowed - mu sync.Mutex - wg sync.WaitGroup + mu sync.Mutex + wg sync.WaitGroup + scalerWG sync.WaitGroup } // worker represents a worker that executes tasks. @@ -83,6 +85,9 @@ func (wp *workerPool) availableWorkers() int32 { // addWorkers adds to the worker pool by starting new workers. func (wp *workerPool) addWorkers(nWorkers int) { + if wp.stopping.Load() { + return + } wp.log.Debug().Msgf("Adding %d new workers to the pool", nWorkers) wp.wg.Add(nWorkers) for i := 0; i < nWorkers; i++ { @@ -164,6 +169,9 @@ func (wp *workerPool) busyWorkers() []xid.ID { // enqueueWorkerScaling enqueues a worker count scaling request. func (wp *workerPool) enqueueWorkerScaling(target int32) { + if wp.stopping.Load() { + return + } select { case <-wp.stopPoolChan: // Worker pool is shutting down, exit @@ -195,12 +203,20 @@ func (wp *workerPool) idleWorkers() []xid.ID { // processWorkerCountScaling listens for worker count requests and adjusts the // worker count accordingly. func (wp *workerPool) processWorkerCountScaling() { + defer wp.scalerWG.Done() + for { select { case <-wp.stopPoolChan: // Worker count scaling received stop signal, exiting return - case newTargetCount := <-wp.workerCountChan: + case newTargetCount, ok := <-wp.workerCountChan: + if !ok { + return + } + if wp.stopping.Load() { + continue + } wp.mu.Lock() wp.adjustWorkerCount(newTargetCount) wp.mu.Unlock() @@ -293,9 +309,13 @@ func (wp *workerPool) startWorker(id xid.ID) { // stop signals the worker pool to stop processing tasks and exit. func (wp *workerPool) stop() { + wp.stopping.Store(true) // Signal workers to stop close(wp.stopPoolChan) + // Wait for scaler goroutine to exit before waiting on workers. + wp.scalerWG.Wait() + // Wait for all workers to finish wp.wg.Wait() @@ -424,6 +444,7 @@ func newWorkerPool( // Record initial sizing as a scaling event to reflect startup sizing in metrics pool.workerScalingEvents.Add(1) + pool.scalerWG.Add(1) go pool.processWorkerCountScaling() return pool diff --git a/worker_pool_test.go b/worker_pool_test.go index a9775bf..dcc44e0 100644 --- a/worker_pool_test.go +++ b/worker_pool_test.go @@ -55,6 +55,36 @@ func TestWorkerPoolStartStop(t *testing.T) { assert.Equal(t, int32(4), pool.runningWorkers(), "Expected 4 running workers") } +func TestWorkerPoolStopWhileScaling(t *testing.T) { + pool := getWorkerPool(2) + var scalerWG sync.WaitGroup + stopScaler := make(chan struct{}) + + scalerWG.Add(1) + go func() { + defer scalerWG.Done() + for { + select { + case <-stopScaler: + return + default: + pool.enqueueWorkerScaling(5) + } + } + }() + + // Allow scaler goroutine to enqueue at least one request. + time.Sleep(10 * time.Millisecond) + + pool.stop() + close(stopScaler) + scalerWG.Wait() + + assert.True(t, pool.stopping.Load(), "Pool should be marked as stopping") + assert.Equal(t, int32(0), pool.runningWorkers(), "Expected no running workers after stop") + assert.Equal(t, int32(0), pool.activeWorkers(), "Expected no active workers after stop") +} + func TestWorkerPoolTaskExecution(t *testing.T) { errorChan := make(chan error, 1) taskExecChan := make(chan time.Duration, 1) From 3fc747c8a6e358b219bc42fac94551278348a6df Mon Sep 17 00:00:00 2001 From: Jakob Ersson <11717405+jakobilobi@users.noreply.github.com> Date: Wed, 29 Oct 2025 13:49:08 +0100 Subject: [PATCH 2/2] ci: update release job --- .github/workflows/release.yml | 140 +++++++++++++++++++++++++++------- 1 file changed, 111 insertions(+), 29 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 601c762..686d5d9 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -32,34 +32,38 @@ jobs: - name: Version bump check shell: bash run: | + set -euo pipefail + raw="${{ inputs.version }}" NEW_VERSION="${raw#v}" # normalized - # basic semver check: X.Y.Z with optional prerelease/build + # basic semver check: X.Y.Z with optional prerelease, but we only accept bare X.Y.Z for base version if [[ ! "$NEW_VERSION" =~ ^[0-9]+(\.[0-9]+){2}([\-+][0-9A-Za-z\.-]+)?$ ]]; then echo "::error::Input must be SemVer like 0.5.0 or 1.2.3-rc.1" exit 1 fi - LATEST_TAG=$(git describe --tags --abbrev=0 2>/dev/null || echo "v0.0.0") - OLD_VERSION="${LATEST_TAG#v}" # normalized + # latest FINAL tag: strictly vX.Y.Z (ignore all prereleases) + LATEST_FINAL_TAG=$(git tag --list 'v[0-9]*' \ + | grep -E '^v[0-9]+\.[0-9]+\.[0-9]+$' \ + | sort -V | tail -n1 || true) + LATEST_FINAL_TAG=${LATEST_FINAL_TAG:-v0.0.0} + OLD_FINAL_VERSION=${LATEST_FINAL_TAG#v} - echo "Latest tag: $LATEST_TAG" - echo "Input version: $NEW_VERSION" + echo "Latest FINAL tag: $LATEST_FINAL_TAG" + echo "VERSION file: $NEW_VERSION" - if [[ "$NEW_VERSION" == "$OLD_VERSION" ]]; then - echo "::error title=VERSION check::VERSION not bumped (still $NEW_VERSION)" + # STRICT: VERSION must be > latest final (no equality) + if [[ "$NEW_VERSION" == "$OLD_FINAL_VERSION" ]]; then + echo "::error::VERSION ($NEW_VERSION) must be greater than latest final ($OLD_FINAL_VERSION)" exit 1 fi - - if [[ "$(printf '%s\n' "$OLD_VERSION" "$NEW_VERSION" | sort -V | tail -n1)" != "$OLD_VERSION" ]]; then - : # OK, NEW_VERSION > OLD_VERSION - else - echo "::error::New version ($NEW_VERSION) must be greater than old ($OLD_VERSION)" + if [[ "$(printf '%s\n' "$OLD_FINAL_VERSION" "$NEW_VERSION" | sort -V | tail -n1)" != "$NEW_VERSION" ]]; then + echo "::error::VERSION ($NEW_VERSION) must be greater than latest final ($OLD_FINAL_VERSION)" exit 1 fi - echo "Version bump OK: $OLD_VERSION → $NEW_VERSION" + echo "VERSION is newer than latest final: $OLD_FINAL_VERSION → $NEW_VERSION" release: name: Test, tag & release @@ -82,30 +86,93 @@ jobs: - name: Run tests run: go test -mod=readonly ./... -count=1 -race - - name: Normalize version - id: ver + - name: Compute tag (final or next -rc.N) + id: compute_tag shell: bash run: | + set -euo pipefail + + PRERELEASE="${{ inputs.prerelease }}" raw="${{ inputs.version }}" - ver="${raw#v}" - echo "version=$ver" >> "$GITHUB_OUTPUT" - echo "tag=v$ver" >> "$GITHUB_OUTPUT" + VERSION="${raw#v}" # normalized base: X.Y.Z + + # existence helpers + final_exists() { git rev-parse "v${VERSION}" >/dev/null 2>&1; } + any_rc_tags() { git tag --list "v${VERSION}-rc*" | grep -q .; } + + # find next rc number (supports both -rc1 and -rc.1 historical patterns) + next_rc_tag() { + local base="$1" + local maxn + maxn=$(git tag --list "v${base}-rc*" \ + | sed -E 's/^.*-rc\.?([0-9]+)$/\1/' \ + | grep -E '^[0-9]+$' \ + | sort -n | tail -n1 || true) + if [[ -z "${maxn:-}" ]]; then + echo "v${base}-rc.1" + else + echo "v${base}-rc.$((maxn+1))" + fi + } + + # guard: do not allow prerelease/final if final already exists for this version + if final_exists; then + if [[ "$PRERELEASE" == "true" ]]; then + echo "::error::Cannot create prerelease: final tag v${VERSION} already exists" + else + echo "::error::Final tag v${VERSION} already exists; bump VERSION first" + fi + exit 1 + fi + + if [[ "$PRERELEASE" == "true" ]]; then + TAG="$(next_rc_tag "$VERSION")" + IS_PRE="true" + else + TAG="v${VERSION}" + IS_PRE="false" + fi + + # final guard: avoid accidental retagging if somehow exists + if git rev-parse "$TAG" >/dev/null 2>&1; then + echo "::error::Tag $TAG already exists" + exit 1 + fi + + echo "version=$VERSION" >> "$GITHUB_OUTPUT" + echo "tag=$TAG" >> "$GITHUB_OUTPUT" + echo "is_prerelease=$IS_PRE" >> "$GITHUB_OUTPUT" + + echo "Computed tag: $TAG (prerelease=$IS_PRE)" - name: Create Git tag shell: bash run: | - TAG="${{ steps.ver.outputs.tag }}" + set -euo pipefail + TAG="${{ steps.compute_tag.outputs.tag }}" git config user.name "github-actions[bot]" git config user.email "github-actions[bot]@users.noreply.github.com" + if git rev-parse "$TAG" >/dev/null 2>&1; then - echo "::error::Tag $TAG already exists" - exit 1 + echo "Tag $TAG already exists; skipping tag creation." + else + git tag -a "$TAG" -m "chore(release): $TAG" + git push origin "$TAG" fi - git tag -a "$TAG" -m "chore(release): $TAG" - git push origin "$TAG" - - name: Generate changelog for the new tag - id: git-cliff + - name: Find previous final tag (-rc.N excluded) + id: prev_final + shell: bash + run: | + set -euo pipefail + PREV=$(git tag --list 'v[0-9]*' \ + | grep -E '^v[0-9]+\.[0-9]+\.[0-9]+$' \ + | sort -V | tail -n2 | head -n1 || true) + echo "prev=$PREV" >> "$GITHUB_OUTPUT" + + - name: Generate changelog (RC) + id: git-cliff-rc + if: ${{ steps.compute_tag.outputs.is_prerelease == 'true' }} uses: orhun/git-cliff-action@v4 with: args: --current --no-exec @@ -113,12 +180,27 @@ jobs: GITHUB_REPO: ${{ github.repository }} GITHUB_TOKEN: ${{ github.token }} + - name: Generate changelog (final release) + id: git-cliff-final + if: ${{ steps.compute_tag.outputs.is_prerelease == 'false' }} + uses: orhun/git-cliff-action@v4 + with: + # If no previous final tag, this produces an empty range; optionally + # fall back to the root commit if you want *all* history instead. + args: "${{ steps.prev_final.outputs.prev }}..${{ steps.compute_tag.outputs.tag }} --no-exec" + env: + GITHUB_REPO: ${{ github.repository }} + GITHUB_TOKEN: ${{ github.token }} + - name: Publish GitHub Release uses: softprops/action-gh-release@v2 with: - tag_name: ${{ steps.ver.outputs.tag }} - name: taskman ${{ steps.ver.outputs.tag }} - body: ${{ steps.git-cliff.outputs.content }} - prerelease: ${{ inputs.prerelease }} + tag_name: ${{ steps.compute_tag.outputs.tag }} + name: taskman ${{ steps.compute_tag.outputs.tag }} + body: ${{ steps.compute_tag.outputs.is_prerelease == 'true' + && steps.git-cliff-rc.outputs.content + || steps.git-cliff-final.outputs.content }} + prerelease: ${{ steps.compute_tag.outputs.is_prerelease }} + make_latest: ${{ steps.compute_tag.outputs.is_prerelease == 'false' }} # finals only env: GITHUB_TOKEN: ${{ github.token }}