Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/settings/limits"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/localcapmgr"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
Expand Down Expand Up @@ -58,6 +59,7 @@ type launcher struct {
p2pStreamConfig p2ptypes.StreamConfig
metrics *launcherMetrics
localCapMgr localcapmgr.LocalCapabilityManager
triggerRegistrationStatusUpdateTimeout limits.TimeLimiter

muSubServices sync.Mutex
subServices []services.Service
Expand Down Expand Up @@ -89,6 +91,7 @@ func NewLauncher(
dispatcher remotetypes.Dispatcher,
registry *Registry,
workflowDonNotifier DonNotifier,
triggerRegistrationStatusUpdateTimeout limits.TimeLimiter,
) (*launcher, error) {
p2pStreamConfig := defaultStreamConfig
if streamConfig != nil {
Expand Down Expand Up @@ -122,8 +125,9 @@ func NewLauncher(
registry: registry,
workflowDonNotifier: workflowDonNotifier,
don2donSharedPeer: don2donSharedPeer,
p2pStreamConfig: p2pStreamConfig,
metrics: metrics,
p2pStreamConfig: p2pStreamConfig,
metrics: metrics,
triggerRegistrationStatusUpdateTimeout: triggerRegistrationStatusUpdateTimeout,
}, nil
}

Expand Down Expand Up @@ -566,6 +570,7 @@ func (w *launcher) addRemoteCapability(ctx context.Context, cid string, capabili
"", // empty method name for v1
w.dispatcher,
w.lggr,
w.triggerRegistrationStatusUpdateTimeout,
)
w.cachedShims.triggerSubscribers[shimKey] = triggerCap
}
Expand Down Expand Up @@ -953,7 +958,7 @@ func (w *launcher) addRemoteCapabilityV2(ctx context.Context, capID string, meth
if config.RemoteTriggerConfig != nil { // trigger
sub, alreadyExists := w.cachedShims.triggerSubscribers[shimKey]
if !alreadyExists {
sub = remote.NewTriggerSubscriber(capID, method, w.dispatcher, w.lggr)
sub = remote.NewTriggerSubscriber(capID, method, w.dispatcher, w.lggr, w.triggerRegistrationStatusUpdateTimeout)
cc.SetTriggerSubscriber(method, sub)
// add to cachedShims later, only after startNewShim succeeds
}
Expand Down
21 changes: 18 additions & 3 deletions core/capabilities/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/smartcontractkit/chainlink-protos/cre/go/values"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/settings/limits"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
remoteMocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types/mocks"
Expand Down Expand Up @@ -132,6 +133,7 @@ func TestLauncher(t *testing.T) {
dispatcher,
registry,
&mockDonNotifier{},
nil,
)
require.NoError(t, err)
require.NoError(t, launcher.Start(t.Context()))
Expand Down Expand Up @@ -181,6 +183,7 @@ func TestLauncher(t *testing.T) {
dispatcher,
registry,
&mockDonNotifier{},
nil,
)
require.NoError(t, err)
require.NoError(t, launcher.Start(t.Context()))
Expand Down Expand Up @@ -225,6 +228,7 @@ func TestLauncher(t *testing.T) {
dispatcher,
registry,
&mockDonNotifier{},
nil,
)
require.NoError(t, err)
require.NoError(t, launcher.Start(t.Context()))
Expand All @@ -248,6 +252,7 @@ func TestLauncher(t *testing.T) {
dispatcher,
registry,
&mockDonNotifier{},
nil,
)
require.NoError(t, err)
require.NoError(t, launcher.Start(t.Context()))
Expand Down Expand Up @@ -329,6 +334,7 @@ func TestLauncher_RemoteTriggerModeAggregatorShim(t *testing.T) {
dispatcher,
registry,
&mockDonNotifier{},
limits.NewTimeLimiter(time.Millisecond),
)
require.NoError(t, err)
require.NoError(t, launcher.Start(t.Context()))
Expand Down Expand Up @@ -372,7 +378,7 @@ func TestLauncher_RemoteTriggerModeAggregatorShim(t *testing.T) {
},
}
triggerEventCallbackCh, err := remoteTriggerSubscriber.RegisterTrigger(ctx, req)
require.NoError(t, err)
require.ErrorIs(t, err, capabilities.ErrUnableToDetermineRegistrationStatus)
<-awaitRegistrationMessageCh

// Receive trigger event
Expand Down Expand Up @@ -425,6 +431,7 @@ func TestSyncer_IgnoresCapabilitiesForPrivateDON(t *testing.T) {
dispatcher,
registry,
&mockDonNotifier{},
nil,
)
require.NoError(t, err)
require.NoError(t, launcher.Start(t.Context()))
Expand Down Expand Up @@ -483,6 +490,7 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDON(t *testing.T) {
dispatcher,
registry,
&mockDonNotifier{},
nil,
)
require.NoError(t, err)
require.NoError(t, launcher.Start(t.Context()))
Expand Down Expand Up @@ -544,6 +552,7 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDONButIgnoresPrivateCapabilitie
dispatcher,
registry,
&mockDonNotifier{},
nil,
)
require.NoError(t, err)
require.NoError(t, launcher.Start(t.Context()))
Expand Down Expand Up @@ -614,6 +623,7 @@ func TestLauncher_SucceedsEvenIfDispatcherAlreadyHasReceiver(t *testing.T) {
dispatcher,
registry,
&mockDonNotifier{},
nil,
)
require.NoError(t, err)
require.NoError(t, launcher.Start(t.Context()))
Expand Down Expand Up @@ -677,6 +687,7 @@ func TestLauncher_SuccessfullyFilterDon2Don(t *testing.T) {
dispatcher,
registry,
&mockDonNotifier{},
nil,
)
require.NoError(t, err)
require.NoError(t, launcher.Start(t.Context()))
Expand Down Expand Up @@ -735,7 +746,7 @@ func TestLauncher_DonPairsToUpdate(t *testing.T) {
tt := NewTestTopology(pid, 4, 4)
wfDONID, capDONID, mixedDONID := registrysyncer.DonID(7), registrysyncer.DonID(12), registrysyncer.DonID(33)
localRegistry := tt.MakeLocalRegistry(uint32(wfDONID), uint32(capDONID), uint32(mixedDONID), RandomUTF8BytesWord(), fullTriggerCapID)
launcher, err := NewLauncher(logger.Test(t), nil, sharedPeer, nil, dispatcher, registry, &mockDonNotifier{})
launcher, err := NewLauncher(logger.Test(t), nil, sharedPeer, nil, dispatcher, registry, &mockDonNotifier{}, nil)
require.NoError(t, err)

sharedPeer.On("IsBootstrap").Return(false).Times(3)
Expand Down Expand Up @@ -817,7 +828,7 @@ func TestLauncher_DonPairsToUpdate_SkipsDifferentFamilies(t *testing.T) {
addDON(localRegistry, capDONZoneBID, uint32(0), uint8(1), true, false, capabilityDonNodesZoneB, []string{"zone-b"}, 1, [][32]byte{triggerCapID})
addCapabilityToDON(localRegistry, capDONZoneBID, fullTriggerCapID, capabilities.CapabilityTypeTrigger, nil)

launcher, err := NewLauncher(logger.Test(t), nil, sharedPeer, nil, dispatcher, registry, &mockDonNotifier{})
launcher, err := NewLauncher(logger.Test(t), nil, sharedPeer, nil, dispatcher, registry, &mockDonNotifier{}, nil)
require.NoError(t, err)

sharedPeer.On("IsBootstrap").Return(false).Once()
Expand Down Expand Up @@ -920,6 +931,7 @@ func TestLauncher_V2CapabilitiesAddViaCombinedClient(t *testing.T) {
dispatcher,
registry,
&mockDonNotifier{},
nil,
)
require.NoError(t, err)
launcher.p2pStreamConfig = customStreamConfig
Expand Down Expand Up @@ -1074,6 +1086,7 @@ func TestLauncher_V2CapabilitiesExposeRemotely(t *testing.T) {
dispatcher,
registry,
&mockDonNotifier{},
nil,
)
require.NoError(t, err)
require.NoError(t, launcher.Start(t.Context()))
Expand Down Expand Up @@ -1190,6 +1203,7 @@ func TestLauncher_OnNewRegistry_CallsLocalCapabilityManagerReconcile(t *testing.
dispatcher,
registry,
&mockDonNotifier{},
nil,
)
require.NoError(t, err)
launcher.SetLocalCapabilityManager(mockLCM)
Expand Down Expand Up @@ -1233,6 +1247,7 @@ func TestLauncher_OnNewRegistry_NilLocalCapabilityManager(t *testing.T) {
dispatcher,
registry,
&mockDonNotifier{},
nil,
)
require.NoError(t, err)
require.NoError(t, launcher.Start(t.Context()))
Expand Down
16 changes: 9 additions & 7 deletions core/capabilities/remote/messagecache/message_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,31 +43,33 @@ func (c *MessageCache[EventID, PeerID]) Insert(eventID EventID, peerID PeerID, t
// received more recently than <minTimestamp>.
// Return all messages that satisfy the above condition.
// Ready() will return true at most once per event if <once> is true.
func (c *MessageCache[EventID, PeerID]) Ready(eventID EventID, minCount uint32, minTimestamp int64, once bool) (bool, [][]byte) {
func (c *MessageCache[EventID, PeerID]) Ready(eventID EventID, minCount uint32, minTimestamp int64, once bool) (bool, []PeerID, [][]byte) {
ev, ok := c.events[eventID]
if !ok {
return false, nil
return false, nil, nil
}
if ev.wasReady && once {
return false, nil
return false, nil, nil
}
//nolint:gosec // G115
if uint32(len(ev.peerMsgs)) < minCount {
return false, nil
return false, nil, nil
}
countAboveMinTimestamp := uint32(0)
var peers []PeerID
accPayloads := [][]byte{}
for _, msg := range ev.peerMsgs {
for peer, msg := range ev.peerMsgs {
if msg.timestamp >= minTimestamp {
countAboveMinTimestamp++
accPayloads = append(accPayloads, msg.payload)
peers = append(peers, peer)
}
}
if countAboveMinTimestamp >= minCount {
ev.wasReady = true
return true, accPayloads
return true, peers, accPayloads
}
return false, nil
return false, nil, nil
}

func (c *MessageCache[EventID, PeerID]) Delete(eventID EventID) {
Expand Down
8 changes: 4 additions & 4 deletions core/capabilities/remote/messagecache/message_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,23 @@ func TestMessageCache_InsertReady(t *testing.T) {
// not ready with one message
ts := cache.Insert(eventID1, peerID1, 100, []byte(payloadA))
require.Equal(t, int64(100), ts)
ready, _ := cache.Ready(eventID1, 2, 100, true)
ready, _, _ := cache.Ready(eventID1, 2, 100, true)
require.False(t, ready)

// not ready with two messages but only one fresh enough
ts = cache.Insert(eventID1, peerID2, 200, []byte(payloadA))
require.Equal(t, int64(100), ts)
ready, _ = cache.Ready(eventID1, 2, 150, true)
ready, _, _ = cache.Ready(eventID1, 2, 150, true)
require.False(t, ready)

// ready with two messages (once only)
ready, messages := cache.Ready(eventID1, 2, 100, true)
ready, _, messages := cache.Ready(eventID1, 2, 100, true)
require.True(t, ready)
require.Equal(t, []byte(payloadA), messages[0])
require.Equal(t, []byte(payloadA), messages[1])

// not ready again for the same event ID
ready, _ = cache.Ready(eventID1, 2, 100, true)
ready, _, _ = cache.Ready(eventID1, 2, 100, true)
require.False(t, ready)
}

Expand Down
Loading
Loading