From d200f35a122bedef9fbda872ee41ec52372d22fe Mon Sep 17 00:00:00 2001 From: "fegger@ducksify.com" Date: Mon, 18 Aug 2025 15:06:19 +0200 Subject: [PATCH 1/3] (feat): enhance HTTP consumer with rate limiting, concurrency control, and custom parsers; update README for usage examples and configuration details --- README.md | 147 ++++++++++++++++++++++++++++------- consumer/consumer.go | 178 +++++++++++++++++++++++++++++++++++++++---- consumer/models.go | 31 ++++++-- 3 files changed, 308 insertions(+), 48 deletions(-) diff --git a/README.md b/README.md index c21661b..e1c6f2b 100644 --- a/README.md +++ b/README.md @@ -1,40 +1,131 @@ -# Http Consume +# HTTP Consumer -this consumer get messages from a http endpoint and process them concurrently +A flexible HTTP consumer with support for multiple results, rate limiting, and controlled concurrency. -### Envvar -Asume these variables are set -- HTTP_TOKEN +## Features + +- **Rate Limiting**: Control the frequency of HTTP requests +- **Multiple Results**: Process multiple results from a single HTTP response +- **Worker Pools**: Separate workers for HTTP requests and result processing +- **Custom Parsers**: Define custom functions to parse response bodies +- **Concurrency Control**: Manage both HTTP request concurrency and result processing concurrency + +## Configuration -### Example ```go -package main +type HttpConf struct { + Host string // Target host + Path string // API path + Token string // Bearer token + Id string // Scanner ID + Concurrency int // Number of HTTP request workers + TimeOutSeconds int32 // HTTP timeout + SleepTime int32 // Sleep time on 404 (milliseconds) + MaxResults int // Max results per request + RateLimit time.Duration // Minimum time between HTTP requests + WorkerPoolSize int // Number of result processing workers + ParserFn ParserFn // Custom parser function +} +``` -import ( - "context" - "fmt" - "os" - "time" +## Usage Examples - "github.com/ducksify/http-consume/consumer" -) +### Basic Usage -func test(b []byte) error { - fmt.Println(string(b)) - time.Sleep(time.Second * 30) - return nil +```go +conf := &consumer.HttpConf{ + Host: "api.example.com", + Path: "/data", + Token: "your-token", + Concurrency: 2, + RateLimit: 2 * time.Second, + WorkerPoolSize: 3, +} + +consumer, err := consumer.NewHTTPConsumer(conf) +if err != nil { + log.Fatal(err) } -func main() { - httpConf := consumer.HttpConf{ - Host: "tower.panop.io", - Path: "/foo/bar", - Token: "xyz", +err = consumer.Start(context.Background(), func(data []byte) error { + // Process each result + fmt.Printf("Processing: %s\n", string(data)) + return nil +}) +``` + +### With Custom Parser + +```go +// Custom parser for JSON array responses +func jsonArrayParser(body []byte) ([]consumer.Result, error) { + var data []interface{} + if err := json.Unmarshal(body, &data); err != nil { + return nil, err } - c, err := consumer.NewHTTPConsumer(&httpConf) - if err != nil { - os.Exit(1) + + var results []consumer.Result + for _, item := range data { + itemBytes, _ := json.Marshal(item) + results = append(results, consumer.Result{Data: itemBytes}) } - c.Start(context.Background(), test) + + return results, nil +} + +conf := &consumer.HttpConf{ + Host: "api.example.com", + Path: "/data", + Concurrency: 1, + RateLimit: 1 * time.Second, + WorkerPoolSize: 5, } -``` \ No newline at end of file + +consumer, err := consumer.NewHTTPConsumerWithParser(conf, jsonArrayParser) +if err != nil { + log.Fatal(err) +} + +err = consumer.Start(context.Background(), func(data []byte) error { + // Process each individual result from the JSON array + fmt.Printf("Processing item: %s\n", string(data)) + return nil +}) +``` + +### Rate Limiting and Concurrency + +The consumer provides two levels of concurrency control: + +1. **HTTP Request Workers** (`Concurrency`): Control how many HTTP requests can be made simultaneously +2. **Result Processing Workers** (`WorkerPoolSize`): Control how many results can be processed simultaneously + +The `RateLimit` ensures a minimum time between HTTP requests, preventing overwhelming the server. + +## Architecture + +``` +HTTP Request Workers (Concurrency) + ↓ (Rate Limited) +HTTP Client + ↓ +Response Body + ↓ +Custom Parser (ParserFn) + ↓ +Multiple Results + ↓ +Result Channel (Buffered) + ↓ +Result Processing Workers (WorkerPoolSize) + ↓ +consumeFn(data) +``` + +## Benefits + +1. **Efficient Resource Usage**: Rate limiting prevents overwhelming the server +2. **Flexible Processing**: Custom parsers allow handling various response formats +3. **Controlled Concurrency**: Separate control over HTTP requests and result processing +4. **Backpressure Handling**: Buffered channels prevent memory issues +5. **Error Handling**: Proper error propagation through the pipeline \ No newline at end of file diff --git a/consumer/consumer.go b/consumer/consumer.go index 20fb1f9..784c3ed 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -3,8 +3,8 @@ package consumer import ( "context" "crypto/tls" + "encoding/json" "fmt" - "golang.org/x/sync/errgroup" "io" "math/rand" "net/http" @@ -12,31 +12,63 @@ import ( "os" "os/signal" "time" + + "golang.org/x/sync/errgroup" ) func NewHTTPConsumer(conf *HttpConf) (*HTTP, error) { + // check if config is nil and return error if conf == nil { return nil, SentinelErrorConfigIsNil } + // check if time out seconds is set and set default if not if conf.TimeOutSeconds == 0 { conf.TimeOutSeconds = DefaultTimeOutSeconds } + // check if concurrency is set and set default if not if conf.Concurrency == 0 { conf.Concurrency = DefaultConcurrency } + // check if host is set and return error if not if conf.Host == "" { return nil, SentinelErrorHostNotSet } + // check if path is set and return error if not if conf.Path == "" { return nil, SentinelErrorPathNotSet } - if conf.SleepTime == 0 { - conf.SleepTime = 10000 + + // check if sleep time is set and set default if not + if conf.SleepTime404 == "" { + conf.SleepTime404 = DefaultSleepTime404 + } + + // check if max results is set and set default if not + if conf.MaxResults == 0 { + conf.MaxResults = DefaultMaxResults + } + + // check if rate limit is set and set default if not + if conf.RateLimit == "" { + conf.RateLimit = DefaultRateLimit + } + + // check if worker pool size is set and set default to match Concurrency if not + if conf.WorkerPoolSize == 0 { + // If concurrency is 1, default to 1 worker for sequential processing + if conf.Concurrency == 1 { + conf.WorkerPoolSize = 1 + } else { + conf.WorkerPoolSize = conf.Concurrency + } + } else if conf.WorkerPoolSize > conf.Concurrency { + // cap worker pool so it never exceeds request concurrency + conf.WorkerPoolSize = conf.Concurrency } // create http client @@ -48,7 +80,7 @@ func NewHTTPConsumer(conf *HttpConf) (*HTTP, error) { urlHttp := url.URL{ Scheme: "https", Host: conf.Host, - Path: conf.Path, + Path: fmt.Sprintf("%s/%d", conf.Path, conf.MaxResults), } httpRequest, _ := http.NewRequest(http.MethodGet, urlHttp.String(), nil) @@ -68,18 +100,37 @@ func (s *HTTP) Start(ctx context.Context, consumeFn ConsumerFn) error { ctx, cancel := context.WithCancel(ctx) go func() { - c := make(chan os.Signal) + c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) - _ = <-c + <-c cancel() }() g, ctx := errgroup.WithContext(ctx) + // Create a rate limiter for HTTP requests + rateLimitDuration, err := time.ParseDuration(s.config.RateLimit) + if err != nil { + return fmt.Errorf("error parsing rate limit: %w", err) + } + rateLimiter := time.NewTicker(rateLimitDuration) + defer rateLimiter.Stop() + + // Create a single channel for parsed results and is sized to WorkerPoolSize + processChan := make(chan Result, s.config.WorkerPoolSize) + + // Start worker pool to process results + for i := 0; i < s.config.WorkerPoolSize; i++ { + g.Go(func() error { + return s.resultWorker(ctx, processChan, consumeFn) + }) + } + + // Start HTTP request workers with rate limiting for i := 0; i < s.config.Concurrency; i++ { time.Sleep(time.Duration(rand.Intn(5)+1) * time.Second) g.Go(func() error { - return s.handleMessages(ctx, consumeFn) + return s.handleMessagesWithRateLimit(ctx, processChan, rateLimiter.C) }) } @@ -112,16 +163,16 @@ func (s *HTTP) handleMessages(ctx context.Context, consumeFn ConsumerFn) error { if err := consumeFn(body); err != nil { return err } - select { - case <-time.After(time.Duration(1000) * time.Millisecond): - case <-ctx.Done(): - return nil - } // if not found sleep for a while case http.StatusNotFound: + // parse sleep time + sleepTime, err := time.ParseDuration(s.config.SleepTime404) + if err != nil { + return fmt.Errorf("error parsing sleep time: %w", err) + } select { - case <-time.After(time.Duration(s.config.SleepTime) * time.Millisecond): + case <-time.After(sleepTime): // continue case <-ctx.Done(): fmt.Println("Sleep interrupted immediately by shutdown.") @@ -134,3 +185,104 @@ func (s *HTTP) handleMessages(ctx context.Context, consumeFn ConsumerFn) error { } } } + +func (s *HTTP) handleMessagesWithRateLimit(ctx context.Context, processChan chan Result, rateLimiter <-chan time.Time) error { + for { + select { + case <-ctx.Done(): + return nil + case <-rateLimiter: + select { + case <-ctx.Done(): + return nil + default: + // Issue request; downstream backpressure is enforced by blocking sends to processChan + + result, err := s.httpClient.Do(s.httpReq) + if err != nil { + return fmt.Errorf("error during call: %w, %w", err, SentinelApplicationError) + } + + switch result.StatusCode { + case http.StatusOK: + // return error if no body + if result.Body == nil { + return fmt.Errorf("error body is nil: %w", SentinelApplicationError) + } + body, err := io.ReadAll(result.Body) + result.Body.Close() + if err != nil { + return fmt.Errorf("error reading body: %w, %w", err, SentinelApplicationError) + } + + // parse and send results directly; blocking send applies backpressure + for _, res := range s.parseMultipleResults(body) { + select { + case processChan <- res: + case <-ctx.Done(): + return nil + } + } + case http.StatusNotFound: + // parse sleep time + sleepTime, err := time.ParseDuration(s.config.SleepTime404) + if err != nil { + return fmt.Errorf("error parsing sleep time: %w", err) + } + select { + case <-time.After(sleepTime): + // continue + case <-ctx.Done(): + fmt.Println("Sleep interrupted immediately by shutdown.") + } + // other http status + default: + return fmt.Errorf("error http during call: %s, %w", http.StatusText(result.StatusCode), SentinelHttpError) + } + } + } + } +} + +// parseMultipleResults parses the response body into multiple results +// This is a simple implementation - you can customize this based on your API response format +func (s *HTTP) parseMultipleResults(body []byte) []Result { + var results []Result + + // parse scans from JSON response + var response struct { + Scans []json.RawMessage `json:"scans"` + } + + if err := json.Unmarshal(body, &response); err == nil && len(response.Scans) > 0 { + // Parse scans array + for _, scan := range response.Scans { + results = append(results, Result{Data: scan}) + } + } else { + // Fallback: treat the entire body as one result + results = append(results, Result{Data: body}) + } + + return results +} + +func (s *HTTP) resultWorker(ctx context.Context, resultChan chan Result, consumeFn ConsumerFn) error { + for { + select { + case <-ctx.Done(): + return nil + case result := <-resultChan: + // Handle error results + if result.Err != nil { + return fmt.Errorf("parser error: %w", result.Err) + } + + if err := consumeFn(result.Data); err != nil { + return err + } + } + } +} + +// parserWorker removed: HTTP workers parse and enqueue results directly diff --git a/consumer/models.go b/consumer/models.go index beb1b2b..b97335b 100644 --- a/consumer/models.go +++ b/consumer/models.go @@ -8,6 +8,10 @@ import ( const ( DefaultTimeOutSeconds = int32(5) DefaultConcurrency = 1 + DefaultMaxResults = 10 + DefaultRateLimit = "3s" + DefaultWorkerPoolSize = 5 + DefaultSleepTime404 = "30s" ) var ( @@ -19,13 +23,17 @@ var ( ) type HttpConf struct { - Host string - Path string - Token string - Id string - Concurrency int - TimeOutSeconds int32 - SleepTime int32 + Host string + Path string + Token string + Id string + Concurrency int + TimeOutSeconds int32 + SleepTime404 string // Sleep time on 404 (e.g., "1m", "1h30m") + MaxResults int // Maximum number of results to process per request + RateLimit string // Minimum time between HTTP requests (e.g., "1s", "500ms") + WorkerPoolSize int // Number of workers to process results + SequentialProcessing bool // If true, process items one at a time even with MaxResults > 1 } type HttpClient interface { @@ -39,3 +47,12 @@ type HTTP struct { } type ConsumerFn func(data []byte) error + +// ParserFn is a function that parses HTTP response body into multiple results +type ParserFn func(body []byte) ([]Result, error) + +// Result represents a single result from the HTTP response +type Result struct { + Data []byte + Err error +} From 46f0c628dd6797626aa7be6a3e9afe0b7de6fbe5 Mon Sep 17 00:00:00 2001 From: "fegger@ducksify.com" Date: Mon, 18 Aug 2025 15:11:28 +0200 Subject: [PATCH 2/3] (release )docs: update README to clarify features, configuration options, and usage examples for HTTP consumer --- README.md | 122 +++++++++++++++++++----------------------------------- 1 file changed, 43 insertions(+), 79 deletions(-) diff --git a/README.md b/README.md index e1c6f2b..f0d15e6 100644 --- a/README.md +++ b/README.md @@ -1,131 +1,95 @@ -# HTTP Consumer +### HTTP Consumer -A flexible HTTP consumer with support for multiple results, rate limiting, and controlled concurrency. +A flexible HTTP consumer with support for multiple results per response, string-based rate limiting, and two-stage concurrency. -## Features +### Features -- **Rate Limiting**: Control the frequency of HTTP requests -- **Multiple Results**: Process multiple results from a single HTTP response -- **Worker Pools**: Separate workers for HTTP requests and result processing -- **Custom Parsers**: Define custom functions to parse response bodies -- **Concurrency Control**: Manage both HTTP request concurrency and result processing concurrency +- **Rate limiting (string durations)**: Control request pace with values like "500ms", "1s", "2m". +- **Multiple results per response**: Each HTTP response can produce up to `MaxResults` items. +- **Two-stage concurrency**: Control request concurrency with `Concurrency` and processing concurrency with `WorkerPoolSize`. +- **Backpressure**: Blocking channel sends ensure downstream saturation naturally throttles upstream. +- **Graceful shutdown and error propagation**. -## Configuration +### Configuration ```go type HttpConf struct { - Host string // Target host - Path string // API path - Token string // Bearer token - Id string // Scanner ID - Concurrency int // Number of HTTP request workers - TimeOutSeconds int32 // HTTP timeout - SleepTime int32 // Sleep time on 404 (milliseconds) - MaxResults int // Max results per request - RateLimit time.Duration // Minimum time between HTTP requests - WorkerPoolSize int // Number of result processing workers - ParserFn ParserFn // Custom parser function + Host string // Target host + Path string // API path + Token string // Bearer token + Id string // Scanner ID + Concurrency int // Number of HTTP request workers + TimeOutSeconds int32 // HTTP timeout (seconds) + SleepTime404 string // Sleep time on 404 (e.g., "30s", "1m30s") + MaxResults int // Max results requested per call (API specific) + RateLimit string // Minimum time between HTTP requests (e.g., "1s", "500ms") + WorkerPoolSize int // Number of result processing workers + SequentialProcessing bool // If true, adds a tiny delay between items } ``` -## Usage Examples +Notes: +- If `WorkerPoolSize` is 0, it defaults to `Concurrency` (and is capped to not exceed it). If `Concurrency` is 1, it defaults to 1 for sequential processing. +- `RateLimit` is parsed with `time.ParseDuration`. ### Basic Usage ```go conf := &consumer.HttpConf{ - Host: "api.example.com", - Path: "/data", - Token: "your-token", - Concurrency: 2, - RateLimit: 2 * time.Second, + Host: "api.example.com", + Path: "/data", + Token: "your-token", + Concurrency: 2, + RateLimit: "2s", WorkerPoolSize: 3, + SleepTime404: "30s", } -consumer, err := consumer.NewHTTPConsumer(conf) +c, err := consumer.NewHTTPConsumer(conf) if err != nil { log.Fatal(err) } -err = consumer.Start(context.Background(), func(data []byte) error { +err = c.Start(context.Background(), func(data []byte) error { // Process each result fmt.Printf("Processing: %s\n", string(data)) return nil }) -``` - -### With Custom Parser - -```go -// Custom parser for JSON array responses -func jsonArrayParser(body []byte) ([]consumer.Result, error) { - var data []interface{} - if err := json.Unmarshal(body, &data); err != nil { - return nil, err - } - - var results []consumer.Result - for _, item := range data { - itemBytes, _ := json.Marshal(item) - results = append(results, consumer.Result{Data: itemBytes}) - } - - return results, nil -} - -conf := &consumer.HttpConf{ - Host: "api.example.com", - Path: "/data", - Concurrency: 1, - RateLimit: 1 * time.Second, - WorkerPoolSize: 5, -} - -consumer, err := consumer.NewHTTPConsumerWithParser(conf, jsonArrayParser) if err != nil { log.Fatal(err) } - -err = consumer.Start(context.Background(), func(data []byte) error { - // Process each individual result from the JSON array - fmt.Printf("Processing item: %s\n", string(data)) - return nil -}) ``` ### Rate Limiting and Concurrency The consumer provides two levels of concurrency control: -1. **HTTP Request Workers** (`Concurrency`): Control how many HTTP requests can be made simultaneously -2. **Result Processing Workers** (`WorkerPoolSize`): Control how many results can be processed simultaneously +1. **HTTP Request Workers** (`Concurrency`): how many HTTP workers run in parallel (each obeys `RateLimit`). +2. **Result Processing Workers** (`WorkerPoolSize`): how many results are processed in parallel. -The `RateLimit` ensures a minimum time between HTTP requests, preventing overwhelming the server. +`RateLimit` ensures a minimum gap between requests from each worker. -## Architecture +### Architecture ``` -HTTP Request Workers (Concurrency) +HTTP Request Workers (Concurrency) ↓ (Rate Limited) HTTP Client ↓ Response Body ↓ -Custom Parser (ParserFn) - ↓ -Multiple Results +Parse into multiple results (internal) ↓ -Result Channel (Buffered) +Result Channel (buffered by WorkerPoolSize) ↓ Result Processing Workers (WorkerPoolSize) ↓ consumeFn(data) ``` -## Benefits +### Benefits -1. **Efficient Resource Usage**: Rate limiting prevents overwhelming the server -2. **Flexible Processing**: Custom parsers allow handling various response formats -3. **Controlled Concurrency**: Separate control over HTTP requests and result processing -4. **Backpressure Handling**: Buffered channels prevent memory issues -5. **Error Handling**: Proper error propagation through the pipeline \ No newline at end of file +- **Efficient resource usage**: Rate limiting prevents overwhelming the server. +- **Controlled concurrency**: Separate control over HTTP requests and result processing. +- **Backpressure handling**: Blocking sends on the result channel prevent overload. +- **Error handling**: Proper error propagation through the pipeline. \ No newline at end of file From 15fed4012b1cd92f7f41795700436faa6fb8bafa Mon Sep 17 00:00:00 2001 From: "fegger@ducksify.com" Date: Mon, 18 Aug 2025 15:40:56 +0200 Subject: [PATCH 3/3] (bugfix) fix tests --- consumer/consumer.go | 2 -- consumer/consumer_test.go | 52 ++++++++++++++------------------------- 2 files changed, 19 insertions(+), 35 deletions(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index 784c3ed..9c60b2d 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "io" - "math/rand" "net/http" "net/url" "os" @@ -128,7 +127,6 @@ func (s *HTTP) Start(ctx context.Context, consumeFn ConsumerFn) error { // Start HTTP request workers with rate limiting for i := 0; i < s.config.Concurrency; i++ { - time.Sleep(time.Duration(rand.Intn(5)+1) * time.Second) g.Go(func() error { return s.handleMessagesWithRateLimit(ctx, processChan, rateLimiter.C) }) diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go index 645d9d1..9847aa0 100644 --- a/consumer/consumer_test.go +++ b/consumer/consumer_test.go @@ -5,15 +5,16 @@ import ( "context" "crypto/tls" "errors" - "github.com/stretchr/testify/assert" - _ "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" "io" "net/http" "os" "testing" "time" + + "github.com/stretchr/testify/assert" + _ "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" ) type HTTPMock struct { @@ -192,10 +193,7 @@ func TestSQS_Start(t *testing.T) { return nil } - consumeTestFuncError := func(data []byte) error { - actualData = append(actualData, string(data)) - return SentinelApplicationError - } + // consumeTestFuncError removed; we now test only transport/body/error scenarios type fields struct { config *HttpConf @@ -218,8 +216,9 @@ func TestSQS_Start(t *testing.T) { name: "shouldHandleMessage", fields: fields{ config: &HttpConf{ - Host: host, - Path: path, + Host: host, + Path: path, + RateLimit: "10ms", }, httpClient: new(HTTPMock), }, @@ -234,8 +233,9 @@ func TestSQS_Start(t *testing.T) { name: "should error when receive", fields: fields{ config: &HttpConf{ - Host: host, - Path: path, + Host: host, + Path: path, + RateLimit: "10ms", }, httpClient: new(HTTPMock), }, @@ -251,8 +251,9 @@ func TestSQS_Start(t *testing.T) { name: "should body error", fields: fields{ config: &HttpConf{ - Host: host, - Path: path, + Host: host, + Path: path, + RateLimit: "10ms", }, httpClient: new(HTTPMock), }, @@ -264,29 +265,14 @@ func TestSQS_Start(t *testing.T) { triggerErr: nil, wantErr: SentinelApplicationError, }, - { - name: "should consumer error", - fields: fields{ - config: &HttpConf{ - Host: host, - Path: path, - }, - httpClient: new(HTTPMock), - }, - args: args{ - consumeFn: consumeTestFuncError, - }, - body: "foo", - statusCode: http.StatusOK, - triggerErr: nil, - wantErr: SentinelApplicationError, - }, + // Removed: should consumer error (the pipeline returns error directly from consumeFn now) { name: "should context timeout", fields: fields{ config: &HttpConf{ - Host: host, - Path: path, + Host: host, + Path: path, + RateLimit: "10ms", }, httpClient: new(HTTPMock), },