Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 34 additions & 19 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,26 +76,34 @@ func NewHTTPConsumer(conf *HttpConf) (*HTTP, error) {
}
httpClient := &http.Client{Transport: tr, Timeout: time.Second * 3}

return &HTTP{config: conf,
httpClient: httpClient,
semaphore: make(chan struct{}, conf.Concurrency)}, nil
}

func (s *HTTP) pullRequest(batchSize int) (*http.Request, error) {
if batchSize > s.config.MaxResults {
batchSize = s.config.MaxResults
}
if batchSize < 1 {
batchSize = 1
}
urlHttp := url.URL{
Scheme: "https",
Host: conf.Host,
Path: fmt.Sprintf("%s/%d", conf.Path, conf.MaxResults),
Host: s.config.Host,
Path: fmt.Sprintf("%s/%d", s.config.Path, batchSize),
}

httpRequest, _ := http.NewRequest(http.MethodGet, urlHttp.String(), nil)
// inject Bearer if token is set
if conf.Token != "" {
httpRequest.Header.Set("Authorization", fmt.Sprintf("Bearer %s", conf.Token))
req, err := http.NewRequest(http.MethodGet, urlHttp.String(), nil)
if err != nil {
return nil, err
}
// inject Scanner ID if set
if conf.Id != "" {
httpRequest.Header.Set("X-Panop-Scanner", conf.Id)
if s.config.Token != "" {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.config.Token))
}

return &HTTP{config: conf,
httpClient: httpClient,
httpReq: httpRequest,
semaphore: make(chan struct{}, conf.Concurrency)}, nil
if s.config.Id != "" {
req.Header.Set("X-Panop-Scanner", s.config.Id)
}
return req, nil
}

func (s *HTTP) Start(ctx context.Context, consumeFn ConsumerFn) error {
Expand All @@ -116,11 +124,18 @@ func (s *HTTP) Start(ctx context.Context, consumeFn ConsumerFn) error {
return nil

default:
// Only poll SQS when we have semaphore capacity
availableSlots := cap(s.semaphore) - len(s.semaphore)
if availableSlots >= s.config.MaxResults {
slog.Info("Polling messages from Tower, available slots : ", slog.Int("availableSlots", availableSlots))
result, err := s.httpClient.Do(s.httpReq)
if availableSlots > 0 {
batchSize := availableSlots
if batchSize > s.config.MaxResults {
batchSize = s.config.MaxResults
}
req, err := s.pullRequest(batchSize)
if err != nil {
return err
}
slog.Info("Polling messages from Tower", slog.Int("availableSlots", availableSlots), slog.Int("batchSize", batchSize))
result, err := s.httpClient.Do(req)
if err != nil {
slog.Error("error during call", "error", err, "sentinel", SentinelApplicationError)
if err := s.sleepOnError(ctx); err != nil {
Expand Down
1 change: 0 additions & 1 deletion consumer/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ type HttpClient interface {
type HTTP struct {
config *HttpConf
httpClient HttpClient
httpReq *http.Request
semaphore chan struct{}
errorCount int // Counter for consecutive errors to implement logarithmic backoff
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/ducksify/http-consume

go 1.24.2
go 1.25.7

require (
github.com/stretchr/testify v1.10.0
Expand Down
Loading