Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 85 additions & 30 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,40 +1,95 @@
# Http Consume
### HTTP Consumer

This consumer gets messages from an HTTP endpoint and processes them concurrently.
A flexible HTTP consumer with support for multiple results per response, string-based rate limiting, and two-stage concurrency.

### Envvar
Assume these variables are set:
- HTTP_TOKEN
### Features

- **Rate limiting (string durations)**: Control request pace with values like "500ms", "1s", "2m".
- **Multiple results per response**: Each HTTP response can produce up to `MaxResults` items.
- **Two-stage concurrency**: Control request concurrency with `Concurrency` and processing concurrency with `WorkerPoolSize`.
- **Backpressure**: Blocking channel sends ensure downstream saturation naturally throttles upstream.
- **Graceful shutdown and error propagation**.

### Configuration

### Example
```go
package main
// HttpConf holds the settings accepted by consumer.NewHTTPConsumer.
type HttpConf struct {
Host string // Target host (required); requests are sent over https
Path string // API path (required); MaxResults is appended as a trailing path segment
Token string // Bearer token
Id string // Scanner ID
Concurrency int // Number of HTTP request workers
TimeOutSeconds int32 // HTTP timeout (seconds)
SleepTime404 string // Sleep time on 404, parsed with time.ParseDuration (e.g., "30s", "1m30s")
MaxResults int // Max results requested per call (API specific)
RateLimit string // Minimum time between HTTP requests, parsed with time.ParseDuration (e.g., "1s", "500ms")
WorkerPoolSize int // Number of result processing workers; 0 defaults to Concurrency, larger values are capped to Concurrency
SequentialProcessing bool // If true, adds a tiny delay between items
}
```

import (
"context"
"fmt"
"os"
"time"
Notes:
- If `WorkerPoolSize` is 0, it defaults to `Concurrency` (so with `Concurrency` set to 1, results are processed sequentially by a single worker). If it is set higher than `Concurrency`, it is capped at `Concurrency`.
- `RateLimit` is parsed with `time.ParseDuration`.

"github.com/ducksify/http-consume/consumer"
)
### Basic Usage

func test(b []byte) error {
fmt.Println(string(b))
time.Sleep(time.Second * 30)
return nil
```go
conf := &consumer.HttpConf{
Host: "api.example.com",
Path: "/data",
Token: "your-token",
Concurrency: 2,
RateLimit: "2s",
WorkerPoolSize: 3,
SleepTime404: "30s",
}

c, err := consumer.NewHTTPConsumer(conf)
if err != nil {
log.Fatal(err)
}

func main() {
httpConf := consumer.HttpConf{
Host: "tower.panop.io",
Path: "/foo/bar",
Token: "xyz",
}
c, err := consumer.NewHTTPConsumer(&httpConf)
if err != nil {
os.Exit(1)
}
c.Start(context.Background(), test)
err = c.Start(context.Background(), func(data []byte) error {
// Process each result
fmt.Printf("Processing: %s\n", string(data))
return nil
})
if err != nil {
log.Fatal(err)
}
```
```

### Rate Limiting and Concurrency

The consumer provides two levels of concurrency control:

1. **HTTP Request Workers** (`Concurrency`): how many HTTP workers run in parallel (all workers draw from one shared `RateLimit` ticker).
2. **Result Processing Workers** (`WorkerPoolSize`): how many results are processed in parallel.

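`RateLimit` ensures a minimum gap between requests. The ticker is shared by all HTTP workers, so the limit applies to the consumer as a whole rather than to each worker separately.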

### Architecture

```
HTTP Request Workers (Concurrency)
↓ (Rate Limited)
HTTP Client
Response Body
Parse into multiple results (internal)
Result Channel (buffered by WorkerPoolSize)
Result Processing Workers (WorkerPoolSize)
consumeFn(data)
```

### Benefits

- **Efficient resource usage**: Rate limiting prevents overwhelming the server.
- **Controlled concurrency**: Separate control over HTTP requests and result processing.
- **Backpressure handling**: Blocking sends on the result channel prevent overload.
- **Error handling**: Proper error propagation through the pipeline.
180 changes: 165 additions & 15 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,71 @@ package consumer
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"golang.org/x/sync/errgroup"
"io"
"math/rand"
"net/http"
"net/url"
"os"
"os/signal"
"time"

"golang.org/x/sync/errgroup"
)

func NewHTTPConsumer(conf *HttpConf) (*HTTP, error) {

// check if config is nil and return error
if conf == nil {
return nil, SentinelErrorConfigIsNil
}

// check if time out seconds is set and set default if not
if conf.TimeOutSeconds == 0 {
conf.TimeOutSeconds = DefaultTimeOutSeconds
}

// check if concurrency is set and set default if not
if conf.Concurrency == 0 {
conf.Concurrency = DefaultConcurrency
}

// check if host is set and return error if not
if conf.Host == "" {
return nil, SentinelErrorHostNotSet
}

// check if path is set and return error if not
if conf.Path == "" {
return nil, SentinelErrorPathNotSet
}
if conf.SleepTime == 0 {
conf.SleepTime = 10000

// check if sleep time is set and set default if not
if conf.SleepTime404 == "" {
conf.SleepTime404 = DefaultSleepTime404
}

// check if max results is set and set default if not
if conf.MaxResults == 0 {
conf.MaxResults = DefaultMaxResults
}

// check if rate limit is set and set default if not
if conf.RateLimit == "" {
conf.RateLimit = DefaultRateLimit
}

// check if worker pool size is set and set default to match Concurrency if not
if conf.WorkerPoolSize == 0 {
// If concurrency is 1, default to 1 worker for sequential processing
if conf.Concurrency == 1 {
conf.WorkerPoolSize = 1
} else {
conf.WorkerPoolSize = conf.Concurrency
}
} else if conf.WorkerPoolSize > conf.Concurrency {
// cap worker pool so it never exceeds request concurrency
conf.WorkerPoolSize = conf.Concurrency
}

// create http client
Expand All @@ -48,7 +79,7 @@ func NewHTTPConsumer(conf *HttpConf) (*HTTP, error) {
urlHttp := url.URL{
Scheme: "https",
Host: conf.Host,
Path: conf.Path,
Path: fmt.Sprintf("%s/%d", conf.Path, conf.MaxResults),
}

httpRequest, _ := http.NewRequest(http.MethodGet, urlHttp.String(), nil)
Expand All @@ -68,18 +99,36 @@ func (s *HTTP) Start(ctx context.Context, consumeFn ConsumerFn) error {
ctx, cancel := context.WithCancel(ctx)

go func() {
c := make(chan os.Signal)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
_ = <-c
<-c
cancel()
}()

g, ctx := errgroup.WithContext(ctx)

// Create a rate limiter for HTTP requests
rateLimitDuration, err := time.ParseDuration(s.config.RateLimit)
if err != nil {
return fmt.Errorf("error parsing rate limit: %w", err)
}
rateLimiter := time.NewTicker(rateLimitDuration)
defer rateLimiter.Stop()

// Create a single channel for parsed results and is sized to WorkerPoolSize
processChan := make(chan Result, s.config.WorkerPoolSize)

// Start worker pool to process results
for i := 0; i < s.config.WorkerPoolSize; i++ {
g.Go(func() error {
return s.resultWorker(ctx, processChan, consumeFn)
})
}

// Start HTTP request workers with rate limiting
for i := 0; i < s.config.Concurrency; i++ {
time.Sleep(time.Duration(rand.Intn(5)+1) * time.Second)
g.Go(func() error {
return s.handleMessages(ctx, consumeFn)
return s.handleMessagesWithRateLimit(ctx, processChan, rateLimiter.C)
})
}

Expand Down Expand Up @@ -112,16 +161,16 @@ func (s *HTTP) handleMessages(ctx context.Context, consumeFn ConsumerFn) error {
if err := consumeFn(body); err != nil {
return err
}
select {
case <-time.After(time.Duration(1000) * time.Millisecond):
case <-ctx.Done():
return nil
}

// if not found sleep for a while
case http.StatusNotFound:
// parse sleep time
sleepTime, err := time.ParseDuration(s.config.SleepTime404)
if err != nil {
return fmt.Errorf("error parsing sleep time: %w", err)
}
select {
case <-time.After(time.Duration(s.config.SleepTime) * time.Millisecond):
case <-time.After(sleepTime):
// continue
case <-ctx.Done():
fmt.Println("Sleep interrupted immediately by shutdown.")
Expand All @@ -134,3 +183,104 @@ func (s *HTTP) handleMessages(ctx context.Context, consumeFn ConsumerFn) error {
}
}
}

// handleMessagesWithRateLimit runs one HTTP request worker loop.
//
// On every tick received from rateLimiter it issues s.httpReq through
// s.httpClient and reacts to the status code:
//   - 200: the body is read, split with parseMultipleResults, and each result
//     is sent on processChan. The send blocks, so a saturated worker pool
//     throttles this loop (backpressure).
//   - 404: the worker sleeps for the duration configured in SleepTime404, or
//     until ctx is cancelled, then goes back to waiting for the next tick.
//   - anything else: the loop stops with an error wrapping SentinelHttpError.
//
// It returns nil when ctx is cancelled, and a non-nil error when the request
// fails, the body cannot be read, SleepTime404 cannot be parsed, or an
// unexpected status code is received.
func (s *HTTP) handleMessagesWithRateLimit(ctx context.Context, processChan chan Result, rateLimiter <-chan time.Time) error {
	// Upper bound on how much of a non-200 body is drained before closing it.
	const maxDrainBytes = 64 << 10

	for {
		// Wait for the next rate-limit tick, or stop on shutdown.
		select {
		case <-ctx.Done():
			return nil
		case <-rateLimiter:
		}

		// select picks at random when both channels are ready, so re-check
		// for cancellation before spending a request.
		if ctx.Err() != nil {
			return nil
		}

		// Issue request; downstream backpressure is enforced by blocking sends to processChan
		result, err := s.httpClient.Do(s.httpReq)
		if err != nil {
			return fmt.Errorf("error during call: %w, %w", err, SentinelApplicationError)
		}

		// Non-200 responses still carry a body that must be closed, otherwise
		// the underlying connection is leaked. A bounded drain lets the
		// transport reuse the connection for the next request.
		if result.StatusCode != http.StatusOK && result.Body != nil {
			// Best effort: a failure while discarding an unused body is not actionable.
			_, _ = io.Copy(io.Discard, io.LimitReader(result.Body, maxDrainBytes))
			result.Body.Close()
		}

		switch result.StatusCode {
		case http.StatusOK:
			// return error if no body
			if result.Body == nil {
				return fmt.Errorf("error body is nil: %w", SentinelApplicationError)
			}
			body, err := io.ReadAll(result.Body)
			result.Body.Close()
			if err != nil {
				return fmt.Errorf("error reading body: %w, %w", err, SentinelApplicationError)
			}

			// parse and send results directly; blocking send applies backpressure
			for _, res := range s.parseMultipleResults(body) {
				select {
				case processChan <- res:
				case <-ctx.Done():
					return nil
				}
			}

		// if not found sleep for a while
		case http.StatusNotFound:
			// parse sleep time
			sleepTime, err := time.ParseDuration(s.config.SleepTime404)
			if err != nil {
				return fmt.Errorf("error parsing sleep time: %w", err)
			}
			select {
			case <-time.After(sleepTime):
				// continue
			case <-ctx.Done():
				// The next loop iteration observes the cancelled context and returns.
				fmt.Println("Sleep interrupted immediately by shutdown.")
			}

		// other http status
		default:
			return fmt.Errorf("error http during call: %s, %w", http.StatusText(result.StatusCode), SentinelHttpError)
		}
	}
}

// parseMultipleResults splits a response body into individual results.
//
// The body is decoded as a JSON object with a "scans" array; every element of
// that array becomes one Result carrying the element's raw JSON. When the body
// is not valid JSON for that shape, or the array is missing or empty, the
// whole body is returned as a single Result instead.
// Adapt the envelope struct below if the API response format differs.
func (s *HTTP) parseMultipleResults(body []byte) []Result {
	var envelope struct {
		Scans []json.RawMessage `json:"scans"`
	}

	// Fallback: anything that does not yield at least one scan is passed through untouched.
	if err := json.Unmarshal(body, &envelope); err != nil || len(envelope.Scans) == 0 {
		return []Result{{Data: body}}
	}

	// One result per scan, in the order they appear in the response.
	out := make([]Result, 0, len(envelope.Scans))
	for i := range envelope.Scans {
		out = append(out, Result{Data: envelope.Scans[i]})
	}
	return out
}

// resultWorker runs one result-processing worker loop.
//
// It takes results from resultChan one at a time and hands each result's Data
// to consumeFn. It returns nil once ctx is cancelled. It returns an error,
// ending the loop, when a result carries a non-nil Err (wrapped as a parser
// error) or when consumeFn itself fails.
func (s *HTTP) resultWorker(ctx context.Context, resultChan chan Result, consumeFn ConsumerFn) error {
	done := ctx.Done()

	for {
		// Block until either shutdown is requested or a result is available.
		var item Result
		select {
		case <-done:
			return nil
		case item = <-resultChan:
		}

		// A result flagged with an error stops this worker.
		if item.Err != nil {
			return fmt.Errorf("parser error: %w", item.Err)
		}

		if consumeErr := consumeFn(item.Data); consumeErr != nil {
			return consumeErr
		}
	}
}

// parserWorker removed: HTTP workers parse and enqueue results directly
Loading
Loading