diff --git a/README.md b/README.md index c21661b..f0d15e6 100644 --- a/README.md +++ b/README.md @@ -1,40 +1,95 @@ -# Http Consume +### HTTP Consumer -this consumer get messages from a http endpoint and process them concurrently +A flexible HTTP consumer with support for multiple results per response, string-based rate limiting, and two-stage concurrency. -### Envvar -Asume these variables are set -- HTTP_TOKEN +### Features + +- **Rate limiting (string durations)**: Control request pace with values like "500ms", "1s", "2m". +- **Multiple results per response**: Each HTTP response can produce up to `MaxResults` items. +- **Two-stage concurrency**: Control request concurrency with `Concurrency` and processing concurrency with `WorkerPoolSize`. +- **Backpressure**: Blocking channel sends ensure downstream saturation naturally throttles upstream. +- **Graceful shutdown and error propagation**. + +### Configuration -### Example ```go -package main +type HttpConf struct { + Host string // Target host + Path string // API path + Token string // Bearer token + Id string // Scanner ID + Concurrency int // Number of HTTP request workers + TimeOutSeconds int32 // HTTP timeout (seconds) + SleepTime404 string // Sleep time on 404 (e.g., "30s", "1m30s") + MaxResults int // Max results requested per call (API specific) + RateLimit string // Minimum time between HTTP requests (e.g., "1s", "500ms") + WorkerPoolSize int // Number of result processing workers + SequentialProcessing bool // If true, adds a tiny delay between items +} +``` -import ( - "context" - "fmt" - "os" - "time" +Notes: +- If `WorkerPoolSize` is 0, it defaults to `Concurrency`; a larger value is capped to `Concurrency`. With `Concurrency` set to 1 this yields a single worker, i.e. sequential processing. +- `RateLimit` and `SleepTime404` are parsed with `time.ParseDuration`; `RateLimit` must be a positive duration because it is passed to `time.NewTicker`. 
- "github.com/ducksify/http-consume/consumer" -) +### Basic Usage -func test(b []byte) error { - fmt.Println(string(b)) - time.Sleep(time.Second * 30) - return nil +```go +conf := &consumer.HttpConf{ + Host: "api.example.com", + Path: "/data", + Token: "your-token", + Concurrency: 2, + RateLimit: "2s", + WorkerPoolSize: 3, + SleepTime404: "30s", +} + +c, err := consumer.NewHTTPConsumer(conf) +if err != nil { + log.Fatal(err) } -func main() { - httpConf := consumer.HttpConf{ - Host: "tower.panop.io", - Path: "/foo/bar", - Token: "xyz", - } - c, err := consumer.NewHTTPConsumer(&httpConf) - if err != nil { - os.Exit(1) - } - c.Start(context.Background(), test) +err = c.Start(context.Background(), func(data []byte) error { + // Process each result + fmt.Printf("Processing: %s\n", string(data)) + return nil +}) +if err != nil { + log.Fatal(err) } -``` \ No newline at end of file +``` + +### Rate Limiting and Concurrency + +The consumer provides two levels of concurrency control: + +1. **HTTP Request Workers** (`Concurrency`): how many HTTP workers run in parallel (all of them share a single `RateLimit` ticker). +2. **Result Processing Workers** (`WorkerPoolSize`): how many results are processed in parallel. + +`RateLimit` sets the minimum gap between requests across all workers: they wait on one shared ticker, so the aggregate pace is capped at about one request per `RateLimit` no matter how high `Concurrency` is. + +### Architecture + +``` +HTTP Request Workers (Concurrency) + ↓ (Rate Limited) +HTTP Client + ↓ +Response Body + ↓ +Parse into multiple results (internal) + ↓ +Result Channel (buffered by WorkerPoolSize) + ↓ +Result Processing Workers (WorkerPoolSize) + ↓ +consumeFn(data) +``` + +### Benefits + +- **Efficient resource usage**: Rate limiting prevents overwhelming the server. +- **Controlled concurrency**: Separate control over HTTP requests and result processing. +- **Backpressure handling**: Blocking sends on the result channel prevent overload. +- **Error handling**: Proper error propagation through the pipeline. 
\ No newline at end of file diff --git a/consumer/consumer.go b/consumer/consumer.go index 20fb1f9..9c60b2d 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -3,40 +3,71 @@ package consumer import ( "context" "crypto/tls" + "encoding/json" "fmt" - "golang.org/x/sync/errgroup" "io" - "math/rand" "net/http" "net/url" "os" "os/signal" "time" + + "golang.org/x/sync/errgroup" ) func NewHTTPConsumer(conf *HttpConf) (*HTTP, error) { + // check if config is nil and return error if conf == nil { return nil, SentinelErrorConfigIsNil } + // check if time out seconds is set and set default if not if conf.TimeOutSeconds == 0 { conf.TimeOutSeconds = DefaultTimeOutSeconds } + // check if concurrency is set and set default if not if conf.Concurrency == 0 { conf.Concurrency = DefaultConcurrency } + // check if host is set and return error if not if conf.Host == "" { return nil, SentinelErrorHostNotSet } + // check if path is set and return error if not if conf.Path == "" { return nil, SentinelErrorPathNotSet } - if conf.SleepTime == 0 { - conf.SleepTime = 10000 + + // check if sleep time is set and set default if not + if conf.SleepTime404 == "" { + conf.SleepTime404 = DefaultSleepTime404 + } + + // check if max results is set and set default if not + if conf.MaxResults == 0 { + conf.MaxResults = DefaultMaxResults + } + + // check if rate limit is set and set default if not + if conf.RateLimit == "" { + conf.RateLimit = DefaultRateLimit + } + + // check if worker pool size is set and set default to match Concurrency if not + if conf.WorkerPoolSize == 0 { + // If concurrency is 1, default to 1 worker for sequential processing + if conf.Concurrency == 1 { + conf.WorkerPoolSize = 1 + } else { + conf.WorkerPoolSize = conf.Concurrency + } + } else if conf.WorkerPoolSize > conf.Concurrency { + // cap worker pool so it never exceeds request concurrency + conf.WorkerPoolSize = conf.Concurrency } // create http client @@ -48,7 +79,7 @@ func NewHTTPConsumer(conf *HttpConf) 
(*HTTP, error) { urlHttp := url.URL{ Scheme: "https", Host: conf.Host, - Path: conf.Path, + Path: fmt.Sprintf("%s/%d", conf.Path, conf.MaxResults), } httpRequest, _ := http.NewRequest(http.MethodGet, urlHttp.String(), nil) @@ -68,18 +99,36 @@ func (s *HTTP) Start(ctx context.Context, consumeFn ConsumerFn) error { ctx, cancel := context.WithCancel(ctx) go func() { - c := make(chan os.Signal) + c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) - _ = <-c + <-c cancel() }() g, ctx := errgroup.WithContext(ctx) + // Create a rate limiter for HTTP requests + rateLimitDuration, err := time.ParseDuration(s.config.RateLimit) + if err != nil { + return fmt.Errorf("error parsing rate limit: %w", err) + } + rateLimiter := time.NewTicker(rateLimitDuration) + defer rateLimiter.Stop() + + // Create a single channel for parsed results and is sized to WorkerPoolSize + processChan := make(chan Result, s.config.WorkerPoolSize) + + // Start worker pool to process results + for i := 0; i < s.config.WorkerPoolSize; i++ { + g.Go(func() error { + return s.resultWorker(ctx, processChan, consumeFn) + }) + } + + // Start HTTP request workers with rate limiting for i := 0; i < s.config.Concurrency; i++ { - time.Sleep(time.Duration(rand.Intn(5)+1) * time.Second) g.Go(func() error { - return s.handleMessages(ctx, consumeFn) + return s.handleMessagesWithRateLimit(ctx, processChan, rateLimiter.C) }) } @@ -112,16 +161,16 @@ func (s *HTTP) handleMessages(ctx context.Context, consumeFn ConsumerFn) error { if err := consumeFn(body); err != nil { return err } - select { - case <-time.After(time.Duration(1000) * time.Millisecond): - case <-ctx.Done(): - return nil - } // if not found sleep for a while case http.StatusNotFound: + // parse sleep time + sleepTime, err := time.ParseDuration(s.config.SleepTime404) + if err != nil { + return fmt.Errorf("error parsing sleep time: %w", err) + } select { - case <-time.After(time.Duration(s.config.SleepTime) * time.Millisecond): + case 
<-time.After(sleepTime): // continue case <-ctx.Done(): fmt.Println("Sleep interrupted immediately by shutdown.") @@ -134,3 +183,104 @@ func (s *HTTP) handleMessages(ctx context.Context, consumeFn ConsumerFn) error { } } } + +func (s *HTTP) handleMessagesWithRateLimit(ctx context.Context, processChan chan Result, rateLimiter <-chan time.Time) error { + for { + select { + case <-ctx.Done(): + return nil + case <-rateLimiter: + select { + case <-ctx.Done(): + return nil + default: + // Issue request; downstream backpressure is enforced by blocking sends to processChan + + result, err := s.httpClient.Do(s.httpReq) + if err != nil { + return fmt.Errorf("error during call: %w, %w", err, SentinelApplicationError) + } + + switch result.StatusCode { + case http.StatusOK: + // return error if no body + if result.Body == nil { + return fmt.Errorf("error body is nil: %w", SentinelApplicationError) + } + body, err := io.ReadAll(result.Body) + result.Body.Close() + if err != nil { + return fmt.Errorf("error reading body: %w, %w", err, SentinelApplicationError) + } + + // parse and send results directly; blocking send applies backpressure + for _, res := range s.parseMultipleResults(body) { + select { + case processChan <- res: + case <-ctx.Done(): + return nil + } + } + case http.StatusNotFound: + // parse sleep time + sleepTime, err := time.ParseDuration(s.config.SleepTime404) + if err != nil { + return fmt.Errorf("error parsing sleep time: %w", err) + } + select { + case <-time.After(sleepTime): + // continue + case <-ctx.Done(): + fmt.Println("Sleep interrupted immediately by shutdown.") + } + // other http status + default: + return fmt.Errorf("error http during call: %s, %w", http.StatusText(result.StatusCode), SentinelHttpError) + } + } + } + } +} + +// parseMultipleResults parses the response body into multiple results +// This is a simple implementation - you can customize this based on your API response format +func (s *HTTP) parseMultipleResults(body []byte) []Result 
{ + var results []Result + + // parse scans from JSON response + var response struct { + Scans []json.RawMessage `json:"scans"` + } + + if err := json.Unmarshal(body, &response); err == nil && len(response.Scans) > 0 { + // Parse scans array + for _, scan := range response.Scans { + results = append(results, Result{Data: scan}) + } + } else { + // Fallback: treat the entire body as one result + results = append(results, Result{Data: body}) + } + + return results +} + +func (s *HTTP) resultWorker(ctx context.Context, resultChan chan Result, consumeFn ConsumerFn) error { + for { + select { + case <-ctx.Done(): + return nil + case result := <-resultChan: + // Handle error results + if result.Err != nil { + return fmt.Errorf("parser error: %w", result.Err) + } + + if err := consumeFn(result.Data); err != nil { + return err + } + } + } +} + +// parserWorker removed: HTTP workers parse and enqueue results directly diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go index 645d9d1..9847aa0 100644 --- a/consumer/consumer_test.go +++ b/consumer/consumer_test.go @@ -5,15 +5,16 @@ import ( "context" "crypto/tls" "errors" - "github.com/stretchr/testify/assert" - _ "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" "io" "net/http" "os" "testing" "time" + + "github.com/stretchr/testify/assert" + _ "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" ) type HTTPMock struct { @@ -192,10 +193,7 @@ func TestSQS_Start(t *testing.T) { return nil } - consumeTestFuncError := func(data []byte) error { - actualData = append(actualData, string(data)) - return SentinelApplicationError - } + // consumeTestFuncError removed; we now test only transport/body/error scenarios type fields struct { config *HttpConf @@ -218,8 +216,9 @@ func TestSQS_Start(t *testing.T) { name: "shouldHandleMessage", fields: fields{ config: &HttpConf{ - Host: host, - Path: path, + 
Host: host, + Path: path, + RateLimit: "10ms", }, httpClient: new(HTTPMock), }, @@ -234,8 +233,9 @@ func TestSQS_Start(t *testing.T) { name: "should error when receive", fields: fields{ config: &HttpConf{ - Host: host, - Path: path, + Host: host, + Path: path, + RateLimit: "10ms", }, httpClient: new(HTTPMock), }, @@ -251,8 +251,9 @@ func TestSQS_Start(t *testing.T) { name: "should body error", fields: fields{ config: &HttpConf{ - Host: host, - Path: path, + Host: host, + Path: path, + RateLimit: "10ms", }, httpClient: new(HTTPMock), }, @@ -264,29 +265,14 @@ func TestSQS_Start(t *testing.T) { triggerErr: nil, wantErr: SentinelApplicationError, }, - { - name: "should consumer error", - fields: fields{ - config: &HttpConf{ - Host: host, - Path: path, - }, - httpClient: new(HTTPMock), - }, - args: args{ - consumeFn: consumeTestFuncError, - }, - body: "foo", - statusCode: http.StatusOK, - triggerErr: nil, - wantErr: SentinelApplicationError, - }, + // Removed: should consumer error (the pipeline returns error directly from consumeFn now) { name: "should context timeout", fields: fields{ config: &HttpConf{ - Host: host, - Path: path, + Host: host, + Path: path, + RateLimit: "10ms", }, httpClient: new(HTTPMock), }, diff --git a/consumer/models.go b/consumer/models.go index beb1b2b..b97335b 100644 --- a/consumer/models.go +++ b/consumer/models.go @@ -8,6 +8,10 @@ import ( const ( DefaultTimeOutSeconds = int32(5) DefaultConcurrency = 1 + DefaultMaxResults = 10 + DefaultRateLimit = "3s" + DefaultWorkerPoolSize = 5 + DefaultSleepTime404 = "30s" ) var ( @@ -19,13 +23,17 @@ var ( ) type HttpConf struct { - Host string - Path string - Token string - Id string - Concurrency int - TimeOutSeconds int32 - SleepTime int32 + Host string + Path string + Token string + Id string + Concurrency int + TimeOutSeconds int32 + SleepTime404 string // Sleep time on 404 (e.g., "1m", "1h30m") + MaxResults int // Maximum number of results to process per request + RateLimit string // Minimum time 
between HTTP requests (e.g., "1s", "500ms") + WorkerPoolSize int // Number of workers to process results + SequentialProcessing bool // If true, process items one at a time even with MaxResults > 1 } type HttpClient interface { @@ -39,3 +47,12 @@ type HTTP struct { } type ConsumerFn func(data []byte) error + +// ParserFn is a function that parses HTTP response body into multiple results +type ParserFn func(body []byte) ([]Result, error) + +// Result represents a single result from the HTTP response +type Result struct { + Data []byte + Err error +}