From aae36db39b407ff0d0ad9790635fd51656a3d6e7 Mon Sep 17 00:00:00 2001 From: "fegger@ducksify.com" Date: Sun, 24 May 2026 12:57:50 +0200 Subject: [PATCH 1/2] chore(go): bump Gochain to1.25.7 --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index e039dd5..6bae953 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ducksify/http-consume -go 1.24.2 +go 1.25.7 require ( github.com/stretchr/testify v1.10.0 From 23bff68ab45adf897c6dfd280da0e5b9672e2a4b Mon Sep 17 00:00:00 2001 From: "fegger@ducksify.com" Date: Sun, 24 May 2026 13:02:13 +0200 Subject: [PATCH 2/2] feat(consumer): HTTP pulls dynamic and thread-safeAdjust HTTP consumer to build requests per-poll and respect availableconcurrency and requested batch sizes. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit What changed Poll when any slot is free (availableSlots > 0) Request size = min(availableSlots, MaxResults) per poll via a new pullRequest(batchSize) method (path .../path/{batchSize}) Removed the static httpReq field — each poll builds a request with the right batch size and headers --- consumer/consumer.go | 53 ++++++++++++++++++++++++++++---------------- consumer/models.go | 1 - 2 files changed, 34 insertions(+), 20 deletions(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index 4e2f13d..c55b000 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -76,26 +76,34 @@ func NewHTTPConsumer(conf *HttpConf) (*HTTP, error) { } httpClient := &http.Client{Transport: tr, Timeout: time.Second * 3} + return &HTTP{config: conf, + httpClient: httpClient, + semaphore: make(chan struct{}, conf.Concurrency)}, nil +} + +func (s *HTTP) pullRequest(batchSize int) (*http.Request, error) { + if batchSize > s.config.MaxResults { + batchSize = s.config.MaxResults + } + if batchSize < 1 { + batchSize = 1 + } urlHttp := url.URL{ Scheme: "https", - Host: conf.Host, - Path: fmt.Sprintf("%s/%d", conf.Path, conf.MaxResults), + Host: s.config.Host, + Path: fmt.Sprintf("%s/%d", s.config.Path, batchSize), } - - httpRequest, _ := http.NewRequest(http.MethodGet, urlHttp.String(), nil) - // inject Bearer if token is set - if conf.Token != "" { - httpRequest.Header.Set("Authorization", fmt.Sprintf("Bearer %s", conf.Token)) + req, err := http.NewRequest(http.MethodGet, urlHttp.String(), nil) + if err != nil { + return nil, err } - // inject Scanner ID if set - if conf.Id != "" { - httpRequest.Header.Set("X-Panop-Scanner", conf.Id) + if s.config.Token != "" { + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.config.Token)) } - - return &HTTP{config: conf, - httpClient: httpClient, - httpReq: httpRequest, - semaphore: make(chan struct{}, conf.Concurrency)}, nil + if s.config.Id != "" { + req.Header.Set("X-Panop-Scanner", s.config.Id) + } + return req, nil } func (s *HTTP) Start(ctx context.Context, consumeFn ConsumerFn) error { @@ -116,11 +124,18 @@ func (s *HTTP) Start(ctx context.Context, consumeFn ConsumerFn) error { return nil default: - // Only poll SQS when we have semaphore capacity availableSlots := cap(s.semaphore) - len(s.semaphore) - if availableSlots >= s.config.MaxResults { - slog.Info("Polling messages from Tower, available slots : ", slog.Int("availableSlots", availableSlots)) - result, err := s.httpClient.Do(s.httpReq) + if availableSlots > 0 { + batchSize := availableSlots + if batchSize > s.config.MaxResults { + batchSize = s.config.MaxResults + } + req, err := s.pullRequest(batchSize) + if err != nil { + return err + } + slog.Info("Polling messages from Tower", slog.Int("availableSlots", availableSlots), slog.Int("batchSize", batchSize)) + result, err := s.httpClient.Do(req) if err != nil { slog.Error("error during call", "error", err, "sentinel", SentinelApplicationError) if err := s.sleepOnError(ctx); err != nil { diff --git a/consumer/models.go b/consumer/models.go index 2b7d424..e0d7e27 100644 --- a/consumer/models.go +++ b/consumer/models.go @@ -46,7 +46,6 @@ type HttpClient interface { type HTTP struct { config *HttpConf httpClient HttpClient - httpReq *http.Request semaphore chan struct{} errorCount int // Counter for consecutive errors to implement logarithmic backoff }