Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 49 additions & 22 deletions core/capabilities/fakes/gateway/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,64 +6,90 @@ import (
"errors"
"fmt"
"io"
"net"
"net/http"
"strconv"
"time"

httptypedapi "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/triggers/http"
)

type JSONRPCRequest struct {
const (
triggerPath = "/trigger"
maxRequestBytes = 1 << 20 // 1 MiB
readHeaderTimeout = time.Second
shutdownTimeout = time.Second
)

type triggerRequest struct {
Input json.RawMessage `json:"input"`
}

// Config holds settings for a LocalGateway test server.
type Config struct {
Port uint16
}

// LocalGateway is a minimal HTTP server that accepts a single trigger POST
// and returns the parsed payload to the caller.
type LocalGateway struct {
config Config
}

// NewLocalGateway returns a LocalGateway bound to the port in config.
func NewLocalGateway(config Config) *LocalGateway {
return &LocalGateway{config: config}
}

// ListenForTriggerPayload starts an HTTP server on the configured port and
// blocks until a POST /trigger request arrives or ctx is cancelled.
func (g *LocalGateway) ListenForTriggerPayload(ctx context.Context) (*httptypedapi.Payload, error) {
payloadCh := make(chan *httptypedapi.Payload, 1)
errorCh := make(chan error, 1)
type result struct {
payload *httptypedapi.Payload
err error
}
resultCh := make(chan result, 1)

mux := http.NewServeMux()
mux.HandleFunc("/trigger", func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc(triggerPath, func(w http.ResponseWriter, r *http.Request) {
input, err := parseRequest(r)
if err != nil {
http.Error(w, fmt.Sprintf("error processing request: %v", err), http.StatusBadRequest)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

payloadCh <- &httptypedapi.Payload{
Input: input,
select {
case resultCh <- result{payload: &httptypedapi.Payload{Input: input}}:
w.WriteHeader(http.StatusOK)
case <-r.Context().Done():
http.Error(w, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable)
}
w.WriteHeader(http.StatusOK)
})

server := &http.Server{
Addr: fmt.Sprintf(":%d", g.config.Port),
Addr: net.JoinHostPort("", strconv.Itoa(int(g.config.Port))),
Handler: mux,
ReadHeaderTimeout: time.Second,
ReadHeaderTimeout: readHeaderTimeout,
BaseContext: func(net.Listener) context.Context { return ctx },
}
defer server.Close()

go func() {
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
errorCh <- err
select {
case resultCh <- result{err: err}:
default:
}
}
}()
defer func() {
shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer cancel()
_ = server.Shutdown(shutdownCtx)
}()

select {
case payload := <-payloadCh:
return payload, nil
case err := <-errorCh:
return nil, err
case res := <-resultCh:
return res.payload, res.err
case <-ctx.Done():
return nil, ctx.Err()
}
Expand All @@ -73,16 +99,17 @@ func parseRequest(req *http.Request) ([]byte, error) {
if req.Method != http.MethodPost {
return nil, errors.New("gateway expects POST request")
}
defer req.Body.Close()

body, err := io.ReadAll(req.Body)
body, err := io.ReadAll(http.MaxBytesReader(nil, req.Body, maxRequestBytes))
if err != nil {
return nil, fmt.Errorf("failed to read request body: %w", err)
return nil, fmt.Errorf("read request body: %w", err)
}

var rpcRequest JSONRPCRequest
if err := json.Unmarshal(body, &rpcRequest); err != nil {
return nil, fmt.Errorf("failed to parse request body: %w", err)
var request triggerRequest
if err := json.Unmarshal(body, &request); err != nil {
return nil, fmt.Errorf("parse request body: %w", err)
}

return rpcRequest.Input, nil
return request.Input, nil
}
24 changes: 12 additions & 12 deletions core/capabilities/fakes/gateway/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,29 @@ import (
"fmt"
"net"
"net/http"
"strconv"
"testing"
"time"

"github.com/smartcontractkit/freeport"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

httptypedapi "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/triggers/http"
)

// waitForPort polls until the TCP port is reachable or the deadline passes.
// waitForPort polls until the TCP port is reachable or timeout passes.
func waitForPort(t *testing.T, port uint16, timeout time.Duration) {
t.Helper()
addr := fmt.Sprintf("127.0.0.1:%d", port)
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
dialer := &net.Dialer{Timeout: 50 * time.Millisecond}
conn, err := dialer.DialContext(context.Background(), "tcp", addr)
if err == nil {
addr := net.JoinHostPort("127.0.0.1", strconv.Itoa(int(port)))
dialer := &net.Dialer{Timeout: 50 * time.Millisecond}

require.EventuallyWithT(t, func(c *assert.CollectT) {
conn, err := dialer.DialContext(t.Context(), "tcp", addr)
if assert.NoError(c, err) {
_ = conn.Close()
return
}
time.Sleep(10 * time.Millisecond)
}
t.Fatalf("server on port %d did not become ready within %s", port, timeout)
}, timeout, 10*time.Millisecond, "server on port %d did not become ready", port)
}

// TestListenForTriggerPayload_HappyPath is an integration test that verifies
Expand All @@ -39,7 +38,8 @@ func waitForPort(t *testing.T, port uint16, timeout time.Duration) {
// 2. A valid POST request carrying a signed JWT and a JSON-RPC body is sent.
// 3. The method returns a Payload whose Input and Key match the request.
func TestListenForTriggerPayload_HappyPath(t *testing.T) {
var port uint16 = 30123
t.Parallel()
port := uint16(freeport.GetOne(t)) //nolint:gosec // G115: freeport returns valid port range
gw := NewLocalGateway(Config{Port: port})

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down
2 changes: 1 addition & 1 deletion core/internal/testutils/pgtest/pgtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ SET statement_timeout = '30s';`)
}

func MustExec(t *testing.T, ds sqlutil.DataSource, stmt string, args ...any) {
ctx := testutils.Context(t)
ctx := t.Context()
require.NoError(t, utils.JustError(ds.ExecContext(ctx, stmt, args...)))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/durableemitter"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)

Expand Down Expand Up @@ -74,8 +73,8 @@ func TestPgDurableEventStore_ListPending_RespectsLimit(t *testing.T) {
ctx := t.Context()
store := durableemitter.NewPgDurableEventStore(db)

for i := 0; i < 20; i++ {
_, err := store.Insert(ctx, []byte(fmt.Sprintf("event-%d", i)))
for i := range 20 {
_, err := store.Insert(ctx, fmt.Appendf(nil, "event-%d", i))
require.NoError(t, err)
}

Expand Down Expand Up @@ -107,7 +106,7 @@ func TestPgDurableEventStore_DeleteExpired(t *testing.T) {
func TestPgDurableEventStore_ObserveDurableQueue(t *testing.T) {
db := pgtest.NewSqlxDB(t)
truncateChipDurableEvents(t, db)
ctx := testutils.Context(t)
ctx := t.Context()
store := durableemitter.NewPgDurableEventStore(db)

st, err := store.ObserveDurableQueue(ctx, time.Hour, time.Minute)
Expand All @@ -126,7 +125,7 @@ func TestPgDurableEventStore_ObserveDurableQueue(t *testing.T) {
func TestPgDurableEventStore_MarkDeliveredAndPurgeDelivered(t *testing.T) {
db := pgtest.NewSqlxDB(t)
truncateChipDurableEvents(t, db)
ctx := testutils.Context(t)
ctx := t.Context()
store := durableemitter.NewPgDurableEventStore(db)

id, err := store.Insert(ctx, []byte("payload"))
Expand Down
6 changes: 2 additions & 4 deletions core/services/llo/observation/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@ func NewCache(cleanupInterval time.Duration) *Cache {
go c.updateMetrics()

if cleanupInterval > 0 {
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.wg.Go(func() {
ticker := time.NewTicker(cleanupInterval)
defer ticker.Stop()
for {
Expand All @@ -117,7 +115,7 @@ func NewCache(cleanupInterval time.Duration) *Cache {
return
}
}
}()
})
}

return c
Expand Down
14 changes: 7 additions & 7 deletions core/services/llo/observation/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func TestCache_UpdateStreamValues(t *testing.T) {
})
}

func TestCache_UpdateStreamValues_RecordsHitEntryAge(t *testing.T) {
func TestCache_UpdateStreamValues_RecordsHitEntryAge(t *testing.T) { //nolint:paralleltest // resets package-level prometheus metrics
promCacheHitEntryAgeMs.Reset()
promCacheHitCount.Reset()

Expand Down Expand Up @@ -334,7 +334,7 @@ func TestCache_ConcurrentAccess(t *testing.T) {
defer wg.Done()
for j := range numOperations {
streamID := id*numOperations + j
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, time.Second)
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id % 256)}}, time.Second)
}
}(i)
}
Expand All @@ -345,7 +345,7 @@ func TestCache_ConcurrentAccess(t *testing.T) {
for j := range numOperations {
streamID := i*numOperations + j
val, _ := cache.Get(streamID)
assert.Equal(t, &mockStreamValue{value: []byte{byte(i)}}, val)
assert.Equal(t, &mockStreamValue{value: []byte{byte(i % 256)}}, val)
}
}
}
Expand All @@ -366,7 +366,7 @@ func TestCache_ConcurrentReadWrite(t *testing.T) {
defer wg.Done()
for j := range numOperations {
streamID := id*numOperations + j
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, time.Second)
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id % 256)}}, time.Second)
}
}(i)
}
Expand Down Expand Up @@ -401,7 +401,7 @@ func TestCache_ConcurrentAddGet(t *testing.T) {
defer wg.Done()
for j := range numOperations {
streamID := id*numOperations + j
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, time.Second)
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id % 256)}}, time.Second)
}
}(i)
}
Expand Down Expand Up @@ -438,7 +438,7 @@ func TestCache_ConcurrentAddMany(t *testing.T) {
batch := make(map[llotypes.StreamID]llo.StreamValue, batchSize)
for j := range batchSize {
streamID := id*numBatches*batchSize + b*batchSize + j
batch[streamID] = &mockStreamValue{value: []byte{byte(id)}}
batch[streamID] = &mockStreamValue{value: []byte{byte(id % 256)}}
}
cache.AddMany(batch, time.Second)
}
Expand All @@ -451,7 +451,7 @@ func TestCache_ConcurrentAddMany(t *testing.T) {
for j := range batchSize {
streamID := i*numBatches*batchSize + b*batchSize + j
val, _ := cache.Get(streamID)
assert.Equal(t, &mockStreamValue{value: []byte{byte(i)}}, val)
assert.Equal(t, &mockStreamValue{value: []byte{byte(i % 256)}}, val)
}
}
}
Expand Down
Loading
Loading