diff --git a/core/capabilities/launcher.go b/core/capabilities/launcher.go index bca0a2acf6c..b9de37eba4e 100644 --- a/core/capabilities/launcher.go +++ b/core/capabilities/launcher.go @@ -18,6 +18,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" "github.com/smartcontractkit/chainlink/v2/core/capabilities/localcapmgr" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" @@ -58,6 +59,7 @@ type launcher struct { p2pStreamConfig p2ptypes.StreamConfig metrics *launcherMetrics localCapMgr localcapmgr.LocalCapabilityManager + triggerRegistrationStatusUpdateTimeout limits.TimeLimiter muSubServices sync.Mutex subServices []services.Service @@ -89,6 +91,7 @@ func NewLauncher( dispatcher remotetypes.Dispatcher, registry *Registry, workflowDonNotifier DonNotifier, + triggerRegistrationStatusUpdateTimeout limits.TimeLimiter, ) (*launcher, error) { p2pStreamConfig := defaultStreamConfig if streamConfig != nil { @@ -122,8 +125,9 @@ func NewLauncher( registry: registry, workflowDonNotifier: workflowDonNotifier, don2donSharedPeer: don2donSharedPeer, - p2pStreamConfig: p2pStreamConfig, - metrics: metrics, + p2pStreamConfig: p2pStreamConfig, + metrics: metrics, + triggerRegistrationStatusUpdateTimeout: triggerRegistrationStatusUpdateTimeout, }, nil } @@ -566,6 +570,7 @@ func (w *launcher) addRemoteCapability(ctx context.Context, cid string, capabili "", // empty method name for v1 w.dispatcher, w.lggr, + w.triggerRegistrationStatusUpdateTimeout, ) w.cachedShims.triggerSubscribers[shimKey] = triggerCap } @@ -953,7 +958,7 @@ func (w *launcher) addRemoteCapabilityV2(ctx context.Context, capID string, meth if config.RemoteTriggerConfig != nil { // trigger sub, alreadyExists := w.cachedShims.triggerSubscribers[shimKey] if !alreadyExists { - sub = remote.NewTriggerSubscriber(capID, method, w.dispatcher, w.lggr) + sub = remote.NewTriggerSubscriber(capID, method, w.dispatcher, w.lggr, w.triggerRegistrationStatusUpdateTimeout) cc.SetTriggerSubscriber(method, sub) // add to cachedShims later, only after startNewShim succeeds } diff --git a/core/capabilities/launcher_test.go b/core/capabilities/launcher_test.go index 29ec22e2fe3..32a252fce92 100644 --- a/core/capabilities/launcher_test.go +++ b/core/capabilities/launcher_test.go @@ -22,6 +22,7 @@ import ( "github.com/smartcontractkit/chainlink-protos/cre/go/values" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" remoteMocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types/mocks" @@ -132,6 +133,7 @@ func TestLauncher(t *testing.T) { dispatcher, registry, &mockDonNotifier{}, + nil, ) require.NoError(t, err) require.NoError(t, launcher.Start(t.Context())) @@ -181,6 +183,7 @@ func TestLauncher(t *testing.T) { dispatcher, registry, &mockDonNotifier{}, + nil, ) require.NoError(t, err) require.NoError(t, launcher.Start(t.Context())) @@ -225,6 +228,7 @@ func TestLauncher(t *testing.T) { dispatcher, registry, &mockDonNotifier{}, + nil, ) require.NoError(t, err) require.NoError(t, launcher.Start(t.Context())) @@ -248,6 +252,7 @@ func TestLauncher(t *testing.T) { dispatcher, registry, &mockDonNotifier{}, + nil, ) require.NoError(t, err) require.NoError(t, launcher.Start(t.Context())) @@ -329,6 +334,7 @@ func TestLauncher_RemoteTriggerModeAggregatorShim(t *testing.T) { dispatcher, registry, &mockDonNotifier{}, + limits.NewTimeLimiter(time.Millisecond), ) require.NoError(t, err) require.NoError(t, launcher.Start(t.Context())) @@ -372,7 +378,7 @@ func TestLauncher_RemoteTriggerModeAggregatorShim(t *testing.T) { }, } triggerEventCallbackCh, err := remoteTriggerSubscriber.RegisterTrigger(ctx, req) - require.NoError(t, err) + require.ErrorIs(t, err, capabilities.ErrUnableToDetermineRegistrationStatus) <-awaitRegistrationMessageCh // Receive trigger event @@ -425,6 +431,7 @@ func TestSyncer_IgnoresCapabilitiesForPrivateDON(t *testing.T) { dispatcher, registry, &mockDonNotifier{}, + nil, ) require.NoError(t, err) require.NoError(t, launcher.Start(t.Context())) @@ -483,6 +490,7 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDON(t *testing.T) { dispatcher, registry, &mockDonNotifier{}, + nil, ) require.NoError(t, err) require.NoError(t, launcher.Start(t.Context())) @@ -544,6 +552,7 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDONButIgnoresPrivateCapabilitie dispatcher, registry, &mockDonNotifier{}, + nil, ) require.NoError(t, err) require.NoError(t, launcher.Start(t.Context())) @@ -614,6 +623,7 @@ func TestLauncher_SucceedsEvenIfDispatcherAlreadyHasReceiver(t *testing.T) { dispatcher, registry, &mockDonNotifier{}, + nil, ) require.NoError(t, err) require.NoError(t, launcher.Start(t.Context())) @@ -677,6 +687,7 @@ func TestLauncher_SuccessfullyFilterDon2Don(t *testing.T) { dispatcher, registry, &mockDonNotifier{}, + nil, ) require.NoError(t, err) require.NoError(t, launcher.Start(t.Context())) @@ -735,7 +746,7 @@ func TestLauncher_DonPairsToUpdate(t *testing.T) { tt := NewTestTopology(pid, 4, 4) wfDONID, capDONID, mixedDONID := registrysyncer.DonID(7), registrysyncer.DonID(12), registrysyncer.DonID(33) localRegistry := tt.MakeLocalRegistry(uint32(wfDONID), uint32(capDONID), uint32(mixedDONID), RandomUTF8BytesWord(), fullTriggerCapID) - launcher, err := NewLauncher(logger.Test(t), nil, sharedPeer, nil, dispatcher, registry, &mockDonNotifier{}) + launcher, err := NewLauncher(logger.Test(t), nil, sharedPeer, nil, dispatcher, registry, &mockDonNotifier{}, nil) require.NoError(t, err) sharedPeer.On("IsBootstrap").Return(false).Times(3) @@ -817,7 +828,7 @@ func TestLauncher_DonPairsToUpdate_SkipsDifferentFamilies(t *testing.T) { addDON(localRegistry, capDONZoneBID, uint32(0), uint8(1), true, false, capabilityDonNodesZoneB, []string{"zone-b"}, 1, [][32]byte{triggerCapID}) addCapabilityToDON(localRegistry, capDONZoneBID, fullTriggerCapID, capabilities.CapabilityTypeTrigger, nil) - launcher, err := NewLauncher(logger.Test(t), nil, sharedPeer, nil, dispatcher, registry, &mockDonNotifier{}) + launcher, err := NewLauncher(logger.Test(t), nil, sharedPeer, nil, dispatcher, registry, &mockDonNotifier{}, nil) require.NoError(t, err) sharedPeer.On("IsBootstrap").Return(false).Once() @@ -920,6 +931,7 @@ func TestLauncher_V2CapabilitiesAddViaCombinedClient(t *testing.T) { dispatcher, registry, &mockDonNotifier{}, + nil, ) require.NoError(t, err) launcher.p2pStreamConfig = customStreamConfig @@ -1074,6 +1086,7 @@ func TestLauncher_V2CapabilitiesExposeRemotely(t *testing.T) { dispatcher, registry, &mockDonNotifier{}, + nil, ) require.NoError(t, err) require.NoError(t, launcher.Start(t.Context())) @@ -1190,6 +1203,7 @@ func TestLauncher_OnNewRegistry_CallsLocalCapabilityManagerReconcile(t *testing. dispatcher, registry, &mockDonNotifier{}, + nil, ) require.NoError(t, err) launcher.SetLocalCapabilityManager(mockLCM) @@ -1233,6 +1247,7 @@ func TestLauncher_OnNewRegistry_NilLocalCapabilityManager(t *testing.T) { dispatcher, registry, &mockDonNotifier{}, + nil, ) require.NoError(t, err) require.NoError(t, launcher.Start(t.Context())) diff --git a/core/capabilities/remote/messagecache/message_cache.go b/core/capabilities/remote/messagecache/message_cache.go index ea36ff4271e..cd7ffa27549 100644 --- a/core/capabilities/remote/messagecache/message_cache.go +++ b/core/capabilities/remote/messagecache/message_cache.go @@ -43,31 +43,33 @@ func (c *MessageCache[EventID, PeerID]) Insert(eventID EventID, peerID PeerID, t // received more recently than . // Return all messages that satisfy the above condition. // Ready() will return true at most once per event if is true. -func (c *MessageCache[EventID, PeerID]) Ready(eventID EventID, minCount uint32, minTimestamp int64, once bool) (bool, [][]byte) { +func (c *MessageCache[EventID, PeerID]) Ready(eventID EventID, minCount uint32, minTimestamp int64, once bool) (bool, []PeerID, [][]byte) { ev, ok := c.events[eventID] if !ok { - return false, nil + return false, nil, nil } if ev.wasReady && once { - return false, nil + return false, nil, nil } //nolint:gosec // G115 if uint32(len(ev.peerMsgs)) < minCount { - return false, nil + return false, nil, nil } countAboveMinTimestamp := uint32(0) + var peers []PeerID accPayloads := [][]byte{} - for _, msg := range ev.peerMsgs { + for peer, msg := range ev.peerMsgs { if msg.timestamp >= minTimestamp { countAboveMinTimestamp++ accPayloads = append(accPayloads, msg.payload) + peers = append(peers, peer) } } if countAboveMinTimestamp >= minCount { ev.wasReady = true - return true, accPayloads + return true, peers, accPayloads } - return false, nil + return false, nil, nil } func (c *MessageCache[EventID, PeerID]) Delete(eventID EventID) { diff --git a/core/capabilities/remote/messagecache/message_cache_test.go b/core/capabilities/remote/messagecache/message_cache_test.go index 2d059adef32..04d411a9aa1 100644 --- a/core/capabilities/remote/messagecache/message_cache_test.go +++ b/core/capabilities/remote/messagecache/message_cache_test.go @@ -22,23 +22,23 @@ func TestMessageCache_InsertReady(t *testing.T) { // not ready with one message ts := cache.Insert(eventID1, peerID1, 100, []byte(payloadA)) require.Equal(t, int64(100), ts) - ready, _ := cache.Ready(eventID1, 2, 100, true) + ready, _, _ := cache.Ready(eventID1, 2, 100, true) require.False(t, ready) // not ready with two messages but only one fresh enough ts = cache.Insert(eventID1, peerID2, 200, []byte(payloadA)) require.Equal(t, int64(100), ts) - ready, _ = cache.Ready(eventID1, 2, 150, true) + ready, _, _ = cache.Ready(eventID1, 2, 150, true) require.False(t, ready) // ready with two messages (once only) - ready, messages := cache.Ready(eventID1, 2, 100, true) + ready, _, messages := cache.Ready(eventID1, 2, 100, true) require.True(t, ready) require.Equal(t, []byte(payloadA), messages[0]) require.Equal(t, []byte(payloadA), messages[1]) // not ready again for the same event ID - ready, _ = cache.Ready(eventID1, 2, 100, true) + ready, _, _ = cache.Ready(eventID1, 2, 100, true) require.False(t, ready) } diff --git a/core/capabilities/remote/trigger_publisher.go b/core/capabilities/remote/trigger_publisher.go index 7ee06945473..4ee745d406c 100644 --- a/core/capabilities/remote/trigger_publisher.go +++ b/core/capabilities/remote/trigger_publisher.go @@ -82,10 +82,11 @@ type ackKey struct { } type pubRegState struct { - callback <-chan commoncap.TriggerResponse - request commoncap.TriggerRegistrationRequest - cancel context.CancelFunc - registrationErr error // non-nil if RegisterTrigger returned an error; used to suppress retries + callback <-chan commoncap.TriggerResponse + request commoncap.TriggerRegistrationRequest + cancel context.CancelFunc + registrationStatus types.RegistrationStatus + registrationErrorMessage string } type batchedResponse struct { @@ -256,17 +257,21 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) { defer p.mu.Unlock() p.messageCache.Insert(key, sender, nowMs, msg.Payload) if existing, exists := p.registrations[key]; exists { - if existing.registrationErr != nil { - p.lggr.Debugw("skipping trigger registration; previous attempt failed with user error", - "workflowId", req.Metadata.WorkflowID, "triggerID", req.TriggerID, "err", existing.registrationErr) - } else { + if existing.registrationStatus == types.RegistrationStatus_REGISTRATION_ERROR { + p.lggr.Debugw("skipping trigger registration; previous attempt failed", + "workflowId", req.Metadata.WorkflowID, "triggerID", req.TriggerID) + p.sendRegistrationStatus(sender, key, existing) + return + } + if existing.registrationStatus == types.RegistrationStatus_REGISTERED { p.lggr.Debugw("trigger registration already exists", "workflowId", req.Metadata.WorkflowID, "triggerID", req.TriggerID) + p.sendRegistrationStatus(sender, key, existing) + return } - return } // NOTE: require 2F+1 by default, introduce different strategies later (KS-76) minRequired := uint32(2*callerDon.F + 1) - ready, payloads := p.messageCache.Ready(key, minRequired, nowMs-cfg.remoteConfig.RegistrationExpiry.Milliseconds(), false) + ready, requestingPeers, payloads := p.messageCache.Ready(key, minRequired, nowMs-cfg.remoteConfig.RegistrationExpiry.Milliseconds(), false) if !ready { p.lggr.Debugw("not ready to aggregate yet", "workflowId", req.Metadata.WorkflowID, "triggerID", req.TriggerID, "minRequired", minRequired) return @@ -287,22 +292,30 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) { attribute.String("capabilityID", p.capabilityID), attribute.String("callerDonID", strconv.FormatUint(uint64(key.callerDonID), 10)), ) + regState := &pubRegState{request: unmarshaled, cancel: cancel} + if err == nil { p.metrics.registerTriggerCounter.Add(ctx, 1, capAttrs, metric.WithAttributes(attribute.String("outcome", "success"))) - p.registrations[key] = &pubRegState{ - callback: callbackCh, - request: unmarshaled, - cancel: cancel, - } + regState.callback = callbackCh + regState.registrationStatus = types.RegistrationStatus_REGISTERED + p.registrations[key] = regState p.wg.Add(1) go p.triggerEventLoop(callbackCh, key) p.lggr.Debugw("updated trigger registration", "workflowId", req.Metadata.WorkflowID, "triggerID", req.TriggerID) } else { p.metrics.registerTriggerCounter.Add(ctx, 1, capAttrs, metric.WithAttributes(attribute.String("outcome", "error"))) cancel() + regState.registrationStatus = types.RegistrationStatus_REGISTRATION_ERROR var capErr caperrors.Error - if errors.As(err, &capErr) && capErr.Origin() == caperrors.OriginUser { - p.registrations[key] = &pubRegState{registrationErr: err} + if errors.As(err, &capErr) { + regState.registrationErrorMessage = capErr.SerializeToRemoteString() + } else { + regState.registrationErrorMessage = caperrors.NewPublicSystemError(err, caperrors.Unknown).SerializeToRemoteString() + } + + isUserError := errors.As(err, &capErr) && capErr.Origin() == caperrors.OriginUser + if isUserError { + p.registrations[key] = regState p.lggr.Errorw("trigger registration failed with user error; will not retry", "workflowId", req.Metadata.WorkflowID, "triggerID", req.TriggerID, "err", err) } else { @@ -310,6 +323,10 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) { "workflowId", req.Metadata.WorkflowID, "triggerID", req.TriggerID, "err", err) } } + + for _, peerID := range requestingPeers { + p.sendRegistrationStatus(peerID, key, regState) + } case types.MethodUnregisterTrigger: meta := msg.GetTriggerEventMetadata() if meta == nil { @@ -346,7 +363,7 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) { nowMs := time.Now().UnixMilli() minRequired := uint32(2*callerDon.F + 1) p.unregisterCache.Insert(key, sender, nowMs, nil) - ready, _ := p.unregisterCache.Ready(key, minRequired, nowMs-cfg.remoteConfig.RegistrationExpiry.Milliseconds(), true) + ready, _, _ := p.unregisterCache.Ready(key, minRequired, nowMs-cfg.remoteConfig.RegistrationExpiry.Milliseconds(), true) if !ready { p.mu.Unlock() p.lggr.Debugw("unregister quorum not reached yet", "workflowID", key.workflowID, "triggerID", key.triggerID, "sender", sender, "minRequired", minRequired) @@ -402,7 +419,7 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) { nowMs := time.Now().UnixMilli() p.ackCache.Insert(key, sender, nowMs, msg.Payload) minRequired := uint32(2*callerDon.F + 1) - ready, _ := p.ackCache.Ready(key, minRequired, 0, false) + ready, _, _ := p.ackCache.Ready(key, minRequired, 0, false) if !ready { ackCount := len(p.ackCache.Peers(key)) p.lggr.Debugw("not ready to ACK trigger event yet", @@ -746,6 +763,32 @@ func (p *triggerPublisher) batchingLoop() { } } +func (p *triggerPublisher) sendRegistrationStatus(peerID p2ptypes.PeerID, key registrationKey, reg *pubRegState) { + cfg := p.cfg.Load() + if cfg == nil { + return + } + + msg := &types.MessageBody{ + CapabilityId: p.capabilityID, + CapabilityDonId: cfg.capDonInfo.ID, + CallerDonId: key.callerDonID, + Method: types.MethodTriggerRegistrationStatus, + CapabilityMethod: p.capMethodName, + Metadata: &types.MessageBody_TriggerRegistrationMetadata{ + TriggerRegistrationMetadata: &types.TriggerRegistrationMetadata{ + WorkflowId: key.workflowID, + TriggerId: key.triggerID, + Status: reg.registrationStatus, + RegistrationError: reg.registrationErrorMessage, + }, + }, + } + if err := p.dispatcher.Send(peerID, msg); err != nil { + p.lggr.Errorw("failed to send trigger registration status", "peerID", peerID, "err", err) + } +} + func (p *triggerPublisher) Close() error { close(p.stopCh) p.wg.Wait() diff --git a/core/capabilities/remote/trigger_publisher_test.go b/core/capabilities/remote/trigger_publisher_test.go index a65204ead4b..1613fdb4fc8 100644 --- a/core/capabilities/remote/trigger_publisher_test.go +++ b/core/capabilities/remote/trigger_publisher_test.go @@ -269,7 +269,8 @@ func newServices(t *testing.T, capabilityDONID uint32, workflowDONID uint32, max func allowRegistrationChecks(dispatcher *mocks.Dispatcher) { dispatcher.On("Send", mock.Anything, mock.MatchedBy(func(m *remotetypes.MessageBody) bool { - return m.Method == remotetypes.MethodTriggerRegistrationCheck + return m.Method == remotetypes.MethodTriggerRegistrationCheck || + m.Method == remotetypes.MethodTriggerRegistrationStatus })).Return(nil).Maybe() } @@ -540,6 +541,9 @@ func TestTriggerPublisher_SendsRegistrationChecks(t *testing.T) { require.NoError(t, publisher.SetConfig(config, underlying, capDonInfo, workflowDONs)) checkReceived := make(chan *remotetypes.MessageBody, 10) + dispatcher.On("Send", mock.Anything, mock.MatchedBy(func(m *remotetypes.MessageBody) bool { + return m.Method == remotetypes.MethodTriggerRegistrationStatus + })).Return(nil).Maybe() dispatcher.On("Send", mock.Anything, mock.MatchedBy(func(m *remotetypes.MessageBody) bool { return m.Method == remotetypes.MethodTriggerRegistrationCheck })).Run(func(args mock.Arguments) { @@ -603,6 +607,9 @@ func TestTriggerPublisher_RegistrationChecksChunkByMaxBatchSize(t *testing.T) { var mu sync.Mutex var chunkLens []int + dispatcher.On("Send", peers[1], mock.MatchedBy(func(m *remotetypes.MessageBody) bool { + return m.Method == remotetypes.MethodTriggerRegistrationStatus + })).Return(nil).Maybe() dispatcher.On("Send", peers[1], mock.MatchedBy(func(m *remotetypes.MessageBody) bool { return m.Method == remotetypes.MethodTriggerRegistrationCheck })).Run(func(args mock.Arguments) { @@ -1094,6 +1101,7 @@ func TestTriggerPublisher_RegisterTrigger_FailureShortCircuit(t *testing.T) { underlying := &errTrigger{info: capInfo, err: userErr} dispatcher := mocks.NewDispatcher(t) + dispatcher.On("Send", mock.Anything, mock.Anything).Return(nil).Maybe() config := &commoncap.RemoteTriggerConfig{ RegistrationRefresh: 100 * time.Millisecond, RegistrationExpiry: 100 * time.Second, @@ -1147,6 +1155,7 @@ func TestTriggerPublisher_RegisterTrigger_FailureShortCircuit(t *testing.T) { underlying := &errTrigger{info: capInfo, err: errors.New("transient system failure")} dispatcher := mocks.NewDispatcher(t) + dispatcher.On("Send", mock.Anything, mock.Anything).Return(nil).Maybe() config := &commoncap.RemoteTriggerConfig{ RegistrationRefresh: 100 * time.Millisecond, RegistrationExpiry: 100 * time.Second, diff --git a/core/capabilities/remote/trigger_registration_load_test.go b/core/capabilities/remote/trigger_registration_load_test.go index 5d38bb09629..86cfd6e43f0 100644 --- a/core/capabilities/remote/trigger_registration_load_test.go +++ b/core/capabilities/remote/trigger_registration_load_test.go @@ -16,10 +16,10 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/aggregation" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -199,26 +199,26 @@ func TestRegistrationTrafficVolume(t *testing.T) { MessageExpiry: time.Hour, } - subscriber := remote.NewTriggerSubscriber(capInfo.ID, "LogTrigger", dispatcher, lggr) + subscriber := remote.NewTriggerSubscriber(capInfo.ID, "LogTrigger", dispatcher, lggr, limits.NewTimeLimiter(time.Millisecond)) agg := aggregation.NewDefaultModeAggregator(cfg.MinResponsesToAggregate) require.NoError(t, subscriber.SetConfig(cfg, capInfo, workflowDon.ID, capDon, agg)) - // Register all triggers before Start — RegisterTrigger does not - // send to the cap DON until registrationLoop runs after Start. + require.NoError(t, subscriber.Start(t.Context())) + t.Cleanup(func() { subscriber.Close() }) + + // Without a publisher, RegisterTrigger times out but leaves registrations + // active for the refresh loop to re-send on each tick. for i := range tc.nRegistrations { req := commoncap.TriggerRegistrationRequest{ TriggerID: fmt.Sprintf("trigger_%d", i), Metadata: commoncap.RequestMetadata{WorkflowID: generateWorkflowID(i)}, } - _, err := subscriber.RegisterTrigger(testutils.Context(t), req) - require.NoError(t, err) + _, err := subscriber.RegisterTrigger(t.Context(), req) + require.ErrorIs(t, err, commoncap.ErrUnableToDetermineRegistrationStatus) } - // Reset counters after initial registration sends, then start - // the loop so the first tick generates a clean measurement. + // Reset counters after registration burst so the next tick is a clean measurement. dispatcher.Reset() - require.NoError(t, subscriber.Start(testutils.Context(t))) - t.Cleanup(func() { subscriber.Close() }) // Wait for exactly one tick (500ms refresh, wait 700ms to give time) expectedSends := int64(tc.nRegistrations * tc.capDonSize) @@ -264,7 +264,7 @@ func TestRegistrationCheckTrafficVolume(t *testing.T) { for _, tc := range cases { t.Run(fmt.Sprintf("N=%d", tc.nRegistrations), func(t *testing.T) { - ctx := testutils.Context(t) + ctx := t.Context() lggr := logger.Test(t) dispatcher := newCountingDispatcher() @@ -466,7 +466,7 @@ func TestRegistrationLoopLockDuration(t *testing.T) { MessageExpiry: time.Hour, } - subscriber := remote.NewTriggerSubscriber(capInfo.ID, "LogTrigger", dispatcher, lggr) + subscriber := remote.NewTriggerSubscriber(capInfo.ID, "LogTrigger", dispatcher, lggr, limits.NewTimeLimiter(time.Millisecond)) agg := aggregation.NewDefaultModeAggregator(cfg.MinResponsesToAggregate) require.NoError(t, subscriber.SetConfig(cfg, capInfo, workflowDon.ID, capDon, agg)) @@ -475,15 +475,15 @@ func TestRegistrationLoopLockDuration(t *testing.T) { TriggerID: fmt.Sprintf("trigger_%d", i), Metadata: commoncap.RequestMetadata{WorkflowID: generateWorkflowID(i)}, } - _, err := subscriber.RegisterTrigger(testutils.Context(t), req) - require.NoError(t, err) + _, err := subscriber.RegisterTrigger(t.Context(), req) + require.ErrorIs(t, err, commoncap.ErrUnableToDetermineRegistrationStatus) } dispatcher.Reset() expectedSends := int64(n * 4) // n registrations * 4 cap DON members - require.NoError(t, subscriber.Start(testutils.Context(t))) + require.NoError(t, subscriber.Start(t.Context())) t.Cleanup(func() { subscriber.Close() }) start := time.Now() @@ -519,7 +519,7 @@ func TestRegistrationLoopLockDuration(t *testing.T) { // DON peers, workflow F=2 → quorum 5). This does not simulate dispatcher drops // or shared-channel saturation; see test log for that limitation. func TestTrafficAttribution_RegisterLoopVsChecksVsEventsAndAcks(t *testing.T) { - ctx := testutils.Context(t) + ctx := t.Context() lggr := logger.Test(t) const ( @@ -550,20 +550,21 @@ func TestTrafficAttribution_RegisterLoopVsChecksVsEventsAndAcks(t *testing.T) { MinResponsesToAggregate: 1, MessageExpiry: time.Hour, } - sub := remote.NewTriggerSubscriber(capInfo.ID, "LogTrigger", subRegDisp, lggr) + regStatusTimeout := limits.NewTimeLimiter(time.Millisecond) + sub := remote.NewTriggerSubscriber(capInfo.ID, "LogTrigger", subRegDisp, lggr, regStatusTimeout) agg := aggregation.NewDefaultModeAggregator(subCfg.MinResponsesToAggregate) require.NoError(t, sub.SetConfig(subCfg, capInfo, wfDon.ID, capDon, agg)) + require.NoError(t, sub.Start(ctx)) + t.Cleanup(func() { require.NoError(t, sub.Close()) }) for i := range nRegistrations { req := commoncap.TriggerRegistrationRequest{ TriggerID: fmt.Sprintf("trigger_%d", i), Metadata: commoncap.RequestMetadata{WorkflowID: generateWorkflowID(i)}, } _, err := sub.RegisterTrigger(ctx, req) - require.NoError(t, err) + require.ErrorIs(t, err, commoncap.ErrUnableToDetermineRegistrationStatus) } subRegDisp.Reset() - require.NoError(t, sub.Start(ctx)) - t.Cleanup(func() { require.NoError(t, sub.Close()) }) perTickReg := int64(nRegistrations * capDonPeerCount) deadline := time.Now().Add(3 * time.Second) @@ -579,7 +580,7 @@ func TestTrafficAttribution_RegisterLoopVsChecksVsEventsAndAcks(t *testing.T) { // --- Phase 2: subscriber — one engine round-trip: deliver event + AckEvent (ACK fan-out to cap DON) --- subAckDisp := newCountingDispatcher() - sub2 := remote.NewTriggerSubscriber(capInfo.ID, "LogTrigger", subAckDisp, lggr) + sub2 := remote.NewTriggerSubscriber(capInfo.ID, "LogTrigger", subAckDisp, lggr, regStatusTimeout) require.NoError(t, sub2.SetConfig(subCfg, capInfo, wfDon.ID, capDon, agg)) require.NoError(t, sub2.Start(ctx)) t.Cleanup(func() { require.NoError(t, sub2.Close()) }) @@ -590,7 +591,7 @@ func TestTrafficAttribution_RegisterLoopVsChecksVsEventsAndAcks(t *testing.T) { Metadata: commoncap.RequestMetadata{WorkflowID: workflowID1}, } _, err := sub2.RegisterTrigger(ctx, regReq) - require.NoError(t, err) + require.ErrorIs(t, err, commoncap.ErrUnableToDetermineRegistrationStatus) subAckDisp.Reset() ev := buildTriggerEventWithTriggerID(t, capPeers[0][:], workflowID1, soloTrig, "event-attr-1") diff --git a/core/capabilities/remote/trigger_registration_state.go b/core/capabilities/remote/trigger_registration_state.go new file mode 100644 index 00000000000..0011b44bc89 --- /dev/null +++ b/core/capabilities/remote/trigger_registration_state.go @@ -0,0 +1,182 @@ +package remote + +import ( + "context" + "fmt" + "sort" + "sync" + "time" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + caperrors "github.com/smartcontractkit/chainlink-common/pkg/capabilities/errors" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/messagecache" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +const registrationStatusUpdateCacheKey = "subscriber_registration" + +type triggerRegistrationState struct { + lggr logger.Logger + triggerResponseCh chan commoncap.TriggerResponse + rawRequest []byte + + statusUpdateCache *messagecache.MessageCache[string, p2ptypes.PeerID] + + statusMu sync.RWMutex + registrationStatus types.RegistrationStatus + registrationError caperrors.Error + registrationFinalized bool + initialRegistrationResponseChan chan struct{} +} + +func newTriggerRegistrationState(lggr logger.Logger, rawRequest []byte, workflowID, triggerID string) *triggerRegistrationState { + return &triggerRegistrationState{ + lggr: logger.With(lggr, "workflowID", workflowID, "triggerID", triggerID), + triggerResponseCh: make(chan commoncap.TriggerResponse, sendChannelBufferSize), + rawRequest: rawRequest, + statusUpdateCache: messagecache.NewMessageCache[string, p2ptypes.PeerID](), + initialRegistrationResponseChan: make(chan struct{}), + } +} + +func (s *triggerRegistrationState) handleRegistrationStatusUpdate( + sender p2ptypes.PeerID, + msg *types.MessageBody, + minResponsesToAggregate uint32, + registrationExpiry time.Duration, + publisherNodeCount int, +) { + nowMs := time.Now().UnixMilli() + s.statusUpdateCache.DeleteOlderThan(nowMs - registrationExpiry.Milliseconds()) + + meta := msg.GetTriggerRegistrationMetadata() + if meta == nil { + s.lggr.Errorw("received trigger registration status with nil metadata", "sender", sender) + return + } + + if meta.Status == types.RegistrationStatus_REGISTRATION_ERROR { + s.statusUpdateCache.Insert(registrationStatusUpdateCacheKey, sender, nowMs, []byte(meta.RegistrationError)) + } else { + s.statusUpdateCache.Insert(registrationStatusUpdateCacheKey, sender, nowMs, nil) + } + + ready, _, registrationResponses := s.statusUpdateCache.Ready( + registrationStatusUpdateCacheKey, + minResponsesToAggregate, + nowMs-registrationExpiry.Milliseconds(), + false, + ) + if !ready { + return + } + + var successfulRegistrationCount uint32 + totalErrorCount := 0 + errorToCount := map[string]uint32{} + for _, responseError := range registrationResponses { + if len(responseError) > 0 { + errorStr := string(responseError) + errorToCount[errorStr]++ + totalErrorCount++ + } else { + successfulRegistrationCount++ + } + } + + if successfulRegistrationCount >= minResponsesToAggregate { + s.lggr.Infow("successful remote trigger registration", "sender", sender) + s.setRegistrationStatus(types.RegistrationStatus_REGISTERED, nil) + return + } + + errStrs := make([]string, 0, len(errorToCount)) + for errStr := range errorToCount { + errStrs = append(errStrs, errStr) + } + sort.Strings(errStrs) + + lastErr := "" + for _, errStr := range errStrs { + count := errorToCount[errStr] + if count >= minResponsesToAggregate { + capErr := caperrors.DeserializeErrorFromString(errStr) + s.setRegistrationStatus(types.RegistrationStatus_REGISTRATION_ERROR, capErr) + return + } + lastErr = errStr + } + + if totalErrorCount >= publisherNodeCount-int(minResponsesToAggregate)+1 { + s.setRegistrationStatus( + types.RegistrationStatus_REGISTRATION_ERROR, + caperrors.NewPublicSystemError( + fmt.Errorf("received %d errors, last error: %s", totalErrorCount, SanitizeLogString(lastErr)), + caperrors.ConsensusFailed, + ), + ) + s.lggr.Warnw("failed to achieve consensus on trigger registration errors", "errors", errStrs) + } +} + +func (s *triggerRegistrationState) awaitRegistration(ctx context.Context) error { + registrationStatus, registrationErr := s.getRegistrationStatus() + switch registrationStatus { + case types.RegistrationStatus_REGISTERED: + return nil + case types.RegistrationStatus_REGISTRATION_ERROR: + return registrationErr + default: + select { + case <-ctx.Done(): + return commoncap.ErrUnableToDetermineRegistrationStatus + case <-s.initialRegistrationResponseChan: + _, registrationErr = s.getRegistrationStatus() + return registrationErr + } + } +} + +func (s *triggerRegistrationState) setRegistrationStatus(status types.RegistrationStatus, registrationError caperrors.Error) { + s.statusMu.Lock() + defer s.statusMu.Unlock() + if s.registrationStatus == types.RegistrationStatus_UNREGISTERED && status != types.RegistrationStatus_UNREGISTERED { + close(s.initialRegistrationResponseChan) + } + s.registrationStatus = status + s.registrationError = registrationError + if status == types.RegistrationStatus_REGISTERED || status == types.RegistrationStatus_REGISTRATION_ERROR { + s.registrationFinalized = true + } +} + +func (s *triggerRegistrationState) getRegistrationStatus() (types.RegistrationStatus, caperrors.Error) { + s.statusMu.RLock() + defer s.statusMu.RUnlock() + return s.registrationStatus, s.registrationError +} + +func (s *triggerRegistrationState) isRegistrationFinalized() bool { + s.statusMu.RLock() + defer s.statusMu.RUnlock() + return s.registrationFinalized +} + +func (s *triggerRegistrationState) updateRegistrationRequest(rawRequest []byte) { + s.rawRequest = rawRequest +} + +func (s *triggerRegistrationState) getRawRequest() []byte { + return s.rawRequest +} + +func (s *triggerRegistrationState) getTriggerResponseChannel() chan commoncap.TriggerResponse { + return s.triggerResponseCh +} + +func (s *triggerRegistrationState) close() { + close(s.triggerResponseCh) +} diff --git a/core/capabilities/remote/trigger_subscriber.go b/core/capabilities/remote/trigger_subscriber.go index 11eba607f53..3d489239935 100644 --- a/core/capabilities/remote/trigger_subscriber.go +++ b/core/capabilities/remote/trigger_subscriber.go @@ -12,6 +12,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/messagecache" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" @@ -37,11 +38,13 @@ type triggerSubscriber struct { // - Better logging and debugging. // - Easier migration. // workflowID -> triggerID -> subRegState - registeredWorkflows map[string]map[string]*subRegState + registeredWorkflows map[string]map[string]*triggerRegistrationState // Records that the workflow engine already issued ACK fan-out for this logical event // (registration TriggerID + TriggerEventId). Used to replay ACK on duplicate Receive without // re-aggregating or delivering to the engine again. ackReplayCache map[ackReplayKey]int64 + initialRegistrationRequestCh chan *triggerRegistrationState + registrationStatusUpdateTimeout limits.TimeLimiter mu sync.RWMutex // protects registeredWorkflows, messageCache, and ackReplayCache stopCh services.StopChan wg sync.WaitGroup @@ -68,11 +71,6 @@ type ackReplayKey struct { triggerEventID string } -type subRegState struct { - callback chan commoncap.TriggerResponse - rawRequest []byte -} - type TriggerSubscriber interface { commoncap.TriggerCapability Receive(ctx context.Context, msg *types.MessageBody) @@ -89,16 +87,21 @@ const ( maxBatchedWorkflowIDs = 1000 ) -func NewTriggerSubscriber(capabilityID string, capMethodName string, dispatcher types.Dispatcher, lggr logger.Logger) *triggerSubscriber { +func NewTriggerSubscriber(capabilityID string, capMethodName string, dispatcher types.Dispatcher, lggr logger.Logger, registrationStatusUpdateTimeout limits.TimeLimiter) *triggerSubscriber { + if registrationStatusUpdateTimeout == nil { + registrationStatusUpdateTimeout = limits.NewTimeLimiter(30 * time.Second) + } return &triggerSubscriber{ - capabilityID: capabilityID, - capMethodName: capMethodName, - dispatcher: dispatcher, - messageCache: messagecache.NewMessageCache[triggerEventKey, p2ptypes.PeerID](), - ackReplayCache: make(map[ackReplayKey]int64), - registeredWorkflows: make(map[string]map[string]*subRegState), - stopCh: make(services.StopChan), - lggr: logger.With(logger.Named(lggr, "TriggerSubscriber"), "capabilityID", capabilityID, "capMethodName", capMethodName), + capabilityID: capabilityID, + capMethodName: capMethodName, + dispatcher: dispatcher, + messageCache: messagecache.NewMessageCache[triggerEventKey, p2ptypes.PeerID](), + ackReplayCache: make(map[ackReplayKey]int64), + registeredWorkflows: make(map[string]map[string]*triggerRegistrationState), + initialRegistrationRequestCh: make(chan *triggerRegistrationState, 1), + registrationStatusUpdateTimeout: registrationStatusUpdateTimeout, + stopCh: make(services.StopChan), + lggr: logger.With(logger.Named(lggr, "TriggerSubscriber"), "capabilityID", capabilityID, "capMethodName", capMethodName), } } @@ -189,26 +192,58 @@ func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, request commonc } s.mu.Lock() - defer s.mu.Unlock() s.lggr.Infow("RegisterTrigger called", "donId", cfg.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID, "triggerID", request.TriggerID) triggerMap, ok := s.registeredWorkflows[request.Metadata.WorkflowID] if !ok { - triggerMap = make(map[string]*subRegState) + triggerMap = make(map[string]*triggerRegistrationState) s.registeredWorkflows[request.Metadata.WorkflowID] = triggerMap } - regState, ok := triggerMap[request.TriggerID] - if !ok { - regState = &subRegState{ - callback: make(chan commoncap.TriggerResponse, sendChannelBufferSize), - rawRequest: rawRequest, - } - triggerMap[request.TriggerID] = regState + reg, existingRegistration := triggerMap[request.TriggerID] + if !existingRegistration { + reg = newTriggerRegistrationState(s.lggr, rawRequest, request.Metadata.WorkflowID, request.TriggerID) + triggerMap[request.TriggerID] = reg } else { - regState.rawRequest = rawRequest + reg.updateRegistrationRequest(rawRequest) s.lggr.Warnw("RegisterTrigger re-registering trigger", "donId", cfg.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID, "triggerID", request.TriggerID) } + s.mu.Unlock() + + if existingRegistration { + return reg.getTriggerResponseChannel(), nil + } + + select { + case s.initialRegistrationRequestCh <- reg: + default: + s.sendRegistrationRequestToCapabilityDon(cfg, reg) + } + + subCtx, subCancel, err := s.registrationStatusUpdateTimeout.WithTimeout(ctx) + if err != nil { + s.lggr.Errorw("failed to create timeout context for trigger registration status update", "err", err) + return reg.getTriggerResponseChannel(), commoncap.ErrUnableToDetermineRegistrationStatus + } + defer subCancel() + + regErr := reg.awaitRegistration(subCtx) + if regErr == nil { + return reg.getTriggerResponseChannel(), nil + } + if errors.Is(regErr, commoncap.ErrUnableToDetermineRegistrationStatus) { + s.lggr.Warnw("unable to determine registration status", "donId", cfg.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID, "triggerID", request.TriggerID) + return reg.getTriggerResponseChannel(), regErr + } + + s.mu.Lock() + delete(triggerMap, request.TriggerID) + if len(triggerMap) == 0 { + delete(s.registeredWorkflows, request.Metadata.WorkflowID) + } + s.mu.Unlock() + reg.close() - return regState.callback, nil + s.lggr.Errorw("registration error occurred", "donId", cfg.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID, "triggerID", request.TriggerID, "error", regErr) + return nil, regErr } func (s *triggerSubscriber) registrationLoop() { @@ -221,6 +256,9 @@ func (s *triggerSubscriber) registrationLoop() { select { case <-s.stopCh: return + case reg := <-s.initialRegistrationRequestCh: + cfg := s.cfg.Load() + s.sendRegistrationRequestToCapabilityDon(cfg, reg) case <-ticker.C: cfg := s.cfg.Load() if cfg.remoteConfig.RegistrationRefresh != tickerDuration { @@ -230,42 +268,32 @@ func (s *triggerSubscriber) registrationLoop() { s.mu.RLock() s.lggr.Infow("register trigger for remote capability", "donId", cfg.capDonInfo.ID, "nMembers", len(cfg.capDonInfo.Members), "nWorkflows", len(s.registeredWorkflows)) - var totalRegistrations, totalP2PSends, totalSendErrors int - for _, regMap := range s.registeredWorkflows { - totalRegistrations += len(regMap) - } - s.lggr.Infow("registrationLoop tick: sending registrations", - "donId", cfg.capDonInfo.ID, - "nCapDonMembers", len(cfg.capDonInfo.Members), - "nWorkflows", len(s.registeredWorkflows), - "nRegistrations", totalRegistrations, - "expectedP2PSends", totalRegistrations*len(cfg.capDonInfo.Members)) for _, regMap := range s.registeredWorkflows { - for _, registration := range regMap { - for _, peerID := range cfg.capDonInfo.Members { - m := &types.MessageBody{ - CapabilityId: cfg.capInfo.ID, - CapabilityDonId: cfg.capDonInfo.ID, - CallerDonId: cfg.localDonID, - Method: types.MethodRegisterTrigger, - Payload: registration.rawRequest, // triggerID is in the raw request - CapabilityMethod: s.capMethodName, - } - err := s.dispatcher.Send(peerID, m) - if err != nil { - totalSendErrors++ - s.lggr.Errorw("failed to send message", "donId", cfg.capDonInfo.ID, "peerId", peerID, "err", err) - } else { - totalP2PSends++ - } + for _, reg := range regMap { + if reg.isRegistrationFinalized() { + continue } + s.sendRegistrationRequestToCapabilityDon(cfg, reg) } } s.mu.RUnlock() - s.lggr.Infow("registrationLoop tick: completed", - "donId", cfg.capDonInfo.ID, - "p2pSendsSent", totalP2PSends, - "p2pSendErrors", totalSendErrors) + } + } +} + +func (s *triggerSubscriber) sendRegistrationRequestToCapabilityDon(cfg *dynamicConfig, reg *triggerRegistrationState) { + for _, peerID := range cfg.capDonInfo.Members { + m := &types.MessageBody{ + CapabilityId: cfg.capInfo.ID, + CapabilityDonId: cfg.capDonInfo.ID, + CallerDonId: cfg.localDonID, + Method: types.MethodRegisterTrigger, + Payload: reg.getRawRequest(), + CapabilityMethod: s.capMethodName, + } + err := s.dispatcher.Send(peerID, m) + if err != nil { + s.lggr.Errorw("failed to send message", "donId", cfg.capDonInfo.ID, "peerId", peerID, "err", err) } } } @@ -278,16 +306,14 @@ func (s *triggerSubscriber) UnregisterTrigger(ctx context.Context, request commo if !ok { return nil } - state := triggerMap[request.TriggerID] - if state != nil && state.callback != nil { - close(state.callback) + reg := triggerMap[request.TriggerID] + if reg != nil { + reg.close() } delete(triggerMap, request.TriggerID) if len(triggerMap) == 0 { delete(s.registeredWorkflows, request.Metadata.WorkflowID) } - // Registrations will quickly expire on all remote nodes. - // Alternatively, we could send UnregisterTrigger messages right away. return nil } @@ -325,16 +351,14 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) { } s.mu.RLock() triggerMap, found := s.registeredWorkflows[workflowID] - var registration *subRegState + var registration *triggerRegistrationState if found { if triggerID != "" { - // received a message from updated publisher, which provided a triggerID registration = triggerMap[triggerID] } else { - // legacy flow, expect there to be only a single trigger of each type per workflow for tid, reg := range triggerMap { registration = reg - triggerID = tid // canonical registration id for caches and AckEvent replay + triggerID = tid break } if len(triggerMap) > 1 { @@ -356,7 +380,6 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) { rk := ackReplayKey{triggerID: triggerID, triggerEventID: meta.TriggerEventId} s.mu.Lock() if _, ok := s.ackReplayCache[rk]; ok { - // Event has already been ACKd by engine, so we don't need to re-deliver s.mu.Unlock() ctx, cancel := s.stopCh.NewCtx() err := s.AckEvent(ctx, triggerID, meta.TriggerEventId, s.capMethodName) @@ -372,7 +395,7 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) { nowMs := time.Now().UnixMilli() creationTs := s.messageCache.Insert(key, sender, nowMs, msg.Payload) - ready, payloads := s.messageCache.Ready(key, cfg.remoteConfig.MinResponsesToAggregate, nowMs-cfg.remoteConfig.MessageExpiry.Milliseconds(), true) + ready, _, payloads := s.messageCache.Ready(key, cfg.remoteConfig.MinResponsesToAggregate, nowMs-cfg.remoteConfig.MessageExpiry.Milliseconds(), true) s.mu.Unlock() s.lggr.Debugw("trigger event received", "triggerEventId", meta.TriggerEventId, "workflowId", workflowID, "triggerID", triggerID, "sender", sender, "ready", ready, "nowTs", nowMs, "creationTs", creationTs, "minResponsesToAggregate", cfg.remoteConfig.MinResponsesToAggregate) if ready { @@ -382,9 +405,35 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) { continue } s.lggr.Infow("remote trigger event aggregated", "triggerEventID", meta.TriggerEventId, "workflowId", workflowID, "triggerID", triggerID) - registration.callback <- aggregatedResponse + registration.getTriggerResponseChannel() <- aggregatedResponse } } + case types.MethodTriggerRegistrationStatus: + meta := msg.GetTriggerRegistrationMetadata() + if meta == nil { + s.lggr.Errorw("received trigger registration status with nil metadata", "sender", sender) + return + } + + s.mu.RLock() + triggerMap, found := s.registeredWorkflows[meta.WorkflowId] + var reg *triggerRegistrationState + if found { + reg = triggerMap[meta.TriggerId] + } + s.mu.RUnlock() + if reg == nil { + s.lggr.Debugw("received registration status for unknown registration", "workflowID", meta.WorkflowId, "triggerID", meta.TriggerId, "sender", sender) + return + } + + reg.handleRegistrationStatusUpdate( + sender, + msg, + cfg.remoteConfig.MinResponsesToAggregate, + cfg.remoteConfig.MessageExpiry, + len(cfg.capDonInfo.Members), + ) case types.MethodTriggerRegistrationCheck: meta := msg.GetTriggerEventMetadata() if meta == nil { diff --git a/core/capabilities/remote/trigger_subscriber_test.go b/core/capabilities/remote/trigger_subscriber_test.go index 0ce8cae9107..a8c1265e505 100644 --- a/core/capabilities/remote/trigger_subscriber_test.go +++ b/core/capabilities/remote/trigger_subscriber_test.go @@ -12,6 +12,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" "github.com/smartcontractkit/chainlink-protos/cre/go/values" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/aggregation" @@ -46,7 +47,7 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) { MinResponsesToAggregate: 1, MessageExpiry: 100 * time.Second, } - subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr) + subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr, limits.NewTimeLimiter(time.Millisecond)) agg := aggregation.NewDefaultModeAggregator(config.MinResponsesToAggregate) require.NoError(t, subscriber.SetConfig(config, capInfo, workflowDon.ID, capDon, agg)) require.NoError(t, subscriber.Start(t.Context())) @@ -57,7 +58,7 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) { }, } triggerEventCallbackCh, err := subscriber.RegisterTrigger(t.Context(), req) - require.NoError(t, err) + require.ErrorIs(t, err, commoncap.ErrUnableToDetermineRegistrationStatus) t.Cleanup(func() { require.NoError(t, subscriber.UnregisterTrigger(t.Context(), req)) // calling UnregisterTrigger repeatedly is safe @@ -88,7 +89,7 @@ func TestTriggerSubscriber_CorrectEventExpiryCheck(t *testing.T) { MinResponsesToAggregate: 2, MessageExpiry: 10 * time.Second, } - subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr) + subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr, limits.NewTimeLimiter(time.Millisecond)) agg := aggregation.NewDefaultModeAggregator(config.MinResponsesToAggregate) require.NoError(t, subscriber.SetConfig(config, capInfo, workflowDon.ID, capDon, agg)) @@ -99,7 +100,7 @@ func TestTriggerSubscriber_CorrectEventExpiryCheck(t *testing.T) { }, } triggerEventCallbackCh, err := subscriber.RegisterTrigger(t.Context(), regReq) - require.NoError(t, err) + require.ErrorIs(t, err, commoncap.ErrUnableToDetermineRegistrationStatus) t.Cleanup(func() { require.NoError(t, subscriber.UnregisterTrigger(t.Context(), regReq)) require.NoError(t, subscriber.Close()) @@ -140,7 +141,7 @@ func TestTriggerSubscriber_SetConfig_Basic(t *testing.T) { t.Run("returns error when capability info ID doesn't match subscriber's ID", func(t *testing.T) { dispatcher := remoteMocks.NewDispatcher(t) - subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr) + subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr, limits.NewTimeLimiter(time.Millisecond)) config := &commoncap.RemoteTriggerConfig{} mismatchedCapInfo := commoncap.CapabilityInfo{ID: "different_id", CapabilityType: commoncap.CapabilityTypeTrigger} err := subscriber.SetConfig(config, mismatchedCapInfo, workflowDon.ID, capDon, agg) @@ -152,7 +153,7 @@ func TestTriggerSubscriber_SetConfig_Basic(t *testing.T) { t.Run("returns error when aggregator is nil", func(t *testing.T) { dispatcher := remoteMocks.NewDispatcher(t) - subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr) + subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr, limits.NewTimeLimiter(time.Millisecond)) config := &commoncap.RemoteTriggerConfig{} err := subscriber.SetConfig(config, capInfo, workflowDon.ID, capDon, nil) require.Error(t, err) @@ -161,7 +162,7 @@ func TestTriggerSubscriber_SetConfig_Basic(t *testing.T) { t.Run("updates existing config", func(t *testing.T) { dispatcher := remoteMocks.NewDispatcher(t) - subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr) + subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr, limits.NewTimeLimiter(time.Millisecond)) // Set initial config initialConfig := &commoncap.RemoteTriggerConfig{ RegistrationRefresh: 100 * time.Millisecond, @@ -186,7 +187,7 @@ func TestTriggerSubscriber_SetConfig_Basic(t *testing.T) { }) t.Run("handles nil initial config", func(t *testing.T) { dispatcher := remoteMocks.NewDispatcher(t) - subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr) + subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr, limits.NewTimeLimiter(time.Millisecond)) // Set initial config as nil err := subscriber.SetConfig(nil, capInfo, workflowDon.ID, capDon, agg) require.NoError(t, err) @@ -210,7 +211,7 @@ func TestTriggerSubscriber_MultipleTriggersSameWorkflow(t *testing.T) { MinResponsesToAggregate: 1, MessageExpiry: 100 * time.Second, } - subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr) + subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr, limits.NewTimeLimiter(time.Millisecond)) agg := aggregation.NewDefaultModeAggregator(config.MinResponsesToAggregate) require.NoError(t, subscriber.SetConfig(config, capInfo, workflowDon.ID, capDon, agg)) require.NoError(t, subscriber.Start(t.Context())) @@ -230,9 +231,9 @@ func TestTriggerSubscriber_MultipleTriggersSameWorkflow(t *testing.T) { } callbackCh1, err := subscriber.RegisterTrigger(t.Context(), req1) - require.NoError(t, err) + require.ErrorIs(t, err, commoncap.ErrUnableToDetermineRegistrationStatus) callbackCh2, err := subscriber.RegisterTrigger(t.Context(), req2) - require.NoError(t, err) + require.ErrorIs(t, err, commoncap.ErrUnableToDetermineRegistrationStatus) t.Cleanup(func() { require.NoError(t, subscriber.UnregisterTrigger(t.Context(), req1)) @@ -282,7 +283,7 @@ func TestTriggerSubscriber_LegacyMessageWithoutTriggerID(t *testing.T) { MinResponsesToAggregate: 1, MessageExpiry: 100 * time.Second, } - subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr) + subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr, limits.NewTimeLimiter(time.Millisecond)) agg := aggregation.NewDefaultModeAggregator(config.MinResponsesToAggregate) require.NoError(t, subscriber.SetConfig(config, capInfo, workflowDon.ID, capDon, agg)) require.NoError(t, subscriber.Start(t.Context())) @@ -296,7 +297,7 @@ func TestTriggerSubscriber_LegacyMessageWithoutTriggerID(t *testing.T) { } callbackCh, err := subscriber.RegisterTrigger(t.Context(), req) - require.NoError(t, err) + require.ErrorIs(t, err, commoncap.ErrUnableToDetermineRegistrationStatus) t.Cleanup(func() { require.NoError(t, subscriber.UnregisterTrigger(t.Context(), req)) @@ -324,7 +325,7 @@ func TestTriggerSubscriber_AckReplayOnDuplicateReceive(t *testing.T) { MinResponsesToAggregate: 1, MessageExpiry: 100 * time.Second, } - subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr) + subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr, limits.NewTimeLimiter(time.Millisecond)) agg := aggregation.NewDefaultModeAggregator(config.MinResponsesToAggregate) require.NoError(t, subscriber.SetConfig(config, capInfo, workflowDon.ID, capDon, agg)) require.NoError(t, subscriber.Start(t.Context())) @@ -337,7 +338,7 @@ func TestTriggerSubscriber_AckReplayOnDuplicateReceive(t *testing.T) { }, } callbackCh, err := subscriber.RegisterTrigger(t.Context(), req) - require.NoError(t, err) + require.ErrorIs(t, err, commoncap.ErrUnableToDetermineRegistrationStatus) t.Cleanup(func() { require.NoError(t, subscriber.UnregisterTrigger(t.Context(), req)) require.NoError(t, subscriber.Close()) @@ -385,7 +386,7 @@ func TestTriggerSubscriber_UnregisterOneTriggerKeepsOther(t *testing.T) { MinResponsesToAggregate: 1, MessageExpiry: 100 * time.Second, } - subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr) + subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr, limits.NewTimeLimiter(time.Millisecond)) agg := aggregation.NewDefaultModeAggregator(config.MinResponsesToAggregate) require.NoError(t, subscriber.SetConfig(config, capInfo, workflowDon.ID, capDon, agg)) require.NoError(t, subscriber.Start(t.Context())) @@ -405,9 +406,9 @@ func TestTriggerSubscriber_UnregisterOneTriggerKeepsOther(t *testing.T) { } _, err := subscriber.RegisterTrigger(t.Context(), req1) - require.NoError(t, err) + require.ErrorIs(t, err, commoncap.ErrUnableToDetermineRegistrationStatus) callbackCh2, err := subscriber.RegisterTrigger(t.Context(), req2) - require.NoError(t, err) + require.ErrorIs(t, err, commoncap.ErrUnableToDetermineRegistrationStatus) t.Cleanup(func() { require.NoError(t, subscriber.UnregisterTrigger(t.Context(), req2)) @@ -446,7 +447,7 @@ func TestTriggerSubscriber_RegistrationCheck(t *testing.T) { MessageExpiry: time.Minute, } - sub := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr) + sub := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr, limits.NewTimeLimiter(time.Millisecond)) agg := aggregation.NewDefaultModeAggregator(1) require.NoError(t, sub.SetConfig(cfg, capInfo, workflowDon.ID, capDon, agg)) @@ -482,7 +483,7 @@ func TestTriggerSubscriber_RegistrationCheck(t *testing.T) { WorkflowID: workflowID1, }, }) - require.NoError(t, err) + require.ErrorIs(t, err, commoncap.ErrUnableToDetermineRegistrationStatus) dispatcher.Calls = nil sub.Receive(t.Context(), buildCheckMsg(workflowID1, "triggerA")) @@ -539,7 +540,7 @@ func TestTriggerSubscriber_RegistrationCheck(t *testing.T) { }, } _, err := sub.RegisterTrigger(t.Context(), req) - require.NoError(t, err) + require.ErrorIs(t, err, commoncap.ErrUnableToDetermineRegistrationStatus) // While registered, a check should NOT send anything (no resend). dispatcher.Calls = nil diff --git a/core/capabilities/remote/types/messages.pb.go b/core/capabilities/remote/types/messages.pb.go index 3eec0ed5a95..822dad20219 100644 --- a/core/capabilities/remote/types/messages.pb.go +++ b/core/capabilities/remote/types/messages.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.11 +// protoc-gen-go v1.36.10 // protoc v5.29.3 // source: core/capabilities/remote/types/messages.proto @@ -79,6 +79,55 @@ func (Error) EnumDescriptor() ([]byte, []int) { return file_core_capabilities_remote_types_messages_proto_rawDescGZIP(), []int{0} } +type RegistrationStatus int32 + +const ( + RegistrationStatus_UNREGISTERED RegistrationStatus = 0 + RegistrationStatus_REGISTERED RegistrationStatus = 1 + RegistrationStatus_REGISTRATION_ERROR RegistrationStatus = 2 +) + +// Enum value maps for RegistrationStatus. +var ( + RegistrationStatus_name = map[int32]string{ + 0: "UNREGISTERED", + 1: "REGISTERED", + 2: "REGISTRATION_ERROR", + } + RegistrationStatus_value = map[string]int32{ + "UNREGISTERED": 0, + "REGISTERED": 1, + "REGISTRATION_ERROR": 2, + } +) + +func (x RegistrationStatus) Enum() *RegistrationStatus { + p := new(RegistrationStatus) + *p = x + return p +} + +func (x RegistrationStatus) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (RegistrationStatus) Descriptor() protoreflect.EnumDescriptor { + return file_core_capabilities_remote_types_messages_proto_enumTypes[1].Descriptor() +} + +func (RegistrationStatus) Type() protoreflect.EnumType { + return &file_core_capabilities_remote_types_messages_proto_enumTypes[1] +} + +func (x RegistrationStatus) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use RegistrationStatus.Descriptor instead. +func (RegistrationStatus) EnumDescriptor() ([]byte, []int) { + return file_core_capabilities_remote_types_messages_proto_rawDescGZIP(), []int{1} +} + type Message struct { state protoimpl.MessageState `protogen:"open.v1"` Signature []byte `protobuf:"bytes,1,opt,name=signature,proto3" json:"signature,omitempty"` @@ -321,6 +370,10 @@ func (*MessageBody_TriggerEventMetadata) isMessageBody_Metadata() {} type TriggerRegistrationMetadata struct { state protoimpl.MessageState `protogen:"open.v1"` LastReceivedEventId string `protobuf:"bytes,1,opt,name=last_received_event_id,json=lastReceivedEventId,proto3" json:"last_received_event_id,omitempty"` + TriggerId string `protobuf:"bytes,2,opt,name=trigger_id,json=triggerId,proto3" json:"trigger_id,omitempty"` + WorkflowId string `protobuf:"bytes,3,opt,name=workflow_id,json=workflowId,proto3" json:"workflow_id,omitempty"` + Status RegistrationStatus `protobuf:"varint,4,opt,name=status,proto3,enum=remote.RegistrationStatus" json:"status,omitempty"` + RegistrationError string `protobuf:"bytes,5,opt,name=registration_error,json=registrationError,proto3" json:"registration_error,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -362,6 +415,34 @@ func (x *TriggerRegistrationMetadata) GetLastReceivedEventId() string { return "" } +func (x *TriggerRegistrationMetadata) GetTriggerId() string { + if x != nil { + return x.TriggerId + } + return "" +} + +func (x *TriggerRegistrationMetadata) GetWorkflowId() string { + if x != nil { + return x.WorkflowId + } + return "" +} + +func (x *TriggerRegistrationMetadata) GetStatus() RegistrationStatus { + if x != nil { + return x.Status + } + return RegistrationStatus_UNREGISTERED +} + +func (x *TriggerRegistrationMetadata) GetRegistrationError() string { + if x != nil { + return x.RegistrationError + } + return "" +} + type TriggerEventMetadata struct { state protoimpl.MessageState `protogen:"open.v1"` TriggerEventId string `protobuf:"bytes,1,opt,name=trigger_event_id,json=triggerEventId,proto3" json:"trigger_event_id,omitempty"` @@ -449,9 +530,15 @@ const file_core_capabilities_remote_types_messages_proto_rawDesc = "" + "\rcaller_don_id\x18\x10 \x01(\rR\vcallerDonId\x12+\n" + "\x11capability_method\x18\x11 \x01(\tR\x10capabilityMethodB\n" + "\n" + - "\bmetadataJ\x04\b\a\x10\bJ\x04\b\b\x10\t\"R\n" + + "\bmetadataJ\x04\b\a\x10\bJ\x04\b\b\x10\t\"\xf5\x01\n" + "\x1bTriggerRegistrationMetadata\x123\n" + - "\x16last_received_event_id\x18\x01 \x01(\tR\x13lastReceivedEventId\"\x84\x01\n" + + "\x16last_received_event_id\x18\x01 \x01(\tR\x13lastReceivedEventId\x12\x1d\n" + + "\n" + + "trigger_id\x18\x02 \x01(\tR\ttriggerId\x12\x1f\n" + + "\vworkflow_id\x18\x03 \x01(\tR\n" + + "workflowId\x122\n" + + "\x06status\x18\x04 \x01(\x0e2\x1a.remote.RegistrationStatusR\x06status\x12-\n" + + "\x12registration_error\x18\x05 \x01(\tR\x11registrationError\"\x84\x01\n" + "\x14TriggerEventMetadata\x12(\n" + "\x10trigger_event_id\x18\x01 \x01(\tR\x0etriggerEventId\x12!\n" + "\fworkflow_ids\x18\x02 \x03(\tR\vworkflowIds\x12\x1f\n" + @@ -463,7 +550,12 @@ const file_core_capabilities_remote_types_messages_proto_rawDesc = "" + "\x14CAPABILITY_NOT_FOUND\x10\x02\x12\x13\n" + "\x0fINVALID_REQUEST\x10\x03\x12\v\n" + "\aTIMEOUT\x10\x04\x12\x12\n" + - "\x0eINTERNAL_ERROR\x10\x05B Z\x1ecore/capabilities/remote/typesb\x06proto3" + "\x0eINTERNAL_ERROR\x10\x05*N\n" + + "\x12RegistrationStatus\x12\x10\n" + + "\fUNREGISTERED\x10\x00\x12\x0e\n" + + "\n" + + "REGISTERED\x10\x01\x12\x16\n" + + "\x12REGISTRATION_ERROR\x10\x02B Z\x1ecore/capabilities/remote/typesb\x06proto3" var ( file_core_capabilities_remote_types_messages_proto_rawDescOnce sync.Once @@ -477,24 +569,26 @@ func file_core_capabilities_remote_types_messages_proto_rawDescGZIP() []byte { return file_core_capabilities_remote_types_messages_proto_rawDescData } -var file_core_capabilities_remote_types_messages_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_core_capabilities_remote_types_messages_proto_enumTypes = make([]protoimpl.EnumInfo, 2) var file_core_capabilities_remote_types_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_core_capabilities_remote_types_messages_proto_goTypes = []any{ (Error)(0), // 0: remote.Error - (*Message)(nil), // 1: remote.Message - (*MessageBody)(nil), // 2: remote.MessageBody - (*TriggerRegistrationMetadata)(nil), // 3: remote.TriggerRegistrationMetadata - (*TriggerEventMetadata)(nil), // 4: remote.TriggerEventMetadata + (RegistrationStatus)(0), // 1: remote.RegistrationStatus + (*Message)(nil), // 2: remote.Message + (*MessageBody)(nil), // 3: remote.MessageBody + (*TriggerRegistrationMetadata)(nil), // 4: remote.TriggerRegistrationMetadata + (*TriggerEventMetadata)(nil), // 5: remote.TriggerEventMetadata } var file_core_capabilities_remote_types_messages_proto_depIdxs = []int32{ 0, // 0: remote.MessageBody.error:type_name -> remote.Error - 3, // 1: remote.MessageBody.trigger_registration_metadata:type_name -> remote.TriggerRegistrationMetadata - 4, // 2: remote.MessageBody.trigger_event_metadata:type_name -> remote.TriggerEventMetadata - 3, // [3:3] is the sub-list for method output_type - 3, // [3:3] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 4, // 1: remote.MessageBody.trigger_registration_metadata:type_name -> remote.TriggerRegistrationMetadata + 5, // 2: remote.MessageBody.trigger_event_metadata:type_name -> remote.TriggerEventMetadata + 1, // 3: remote.TriggerRegistrationMetadata.status:type_name -> remote.RegistrationStatus + 4, // [4:4] is the sub-list for method output_type + 4, // [4:4] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name } func init() { file_core_capabilities_remote_types_messages_proto_init() } @@ -511,7 +605,7 @@ func file_core_capabilities_remote_types_messages_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_core_capabilities_remote_types_messages_proto_rawDesc), len(file_core_capabilities_remote_types_messages_proto_rawDesc)), - NumEnums: 1, + NumEnums: 2, NumMessages: 4, NumExtensions: 0, NumServices: 0, diff --git a/core/capabilities/remote/types/messages.proto b/core/capabilities/remote/types/messages.proto index 6b942d59f84..11cbd2d14fd 100644 --- a/core/capabilities/remote/types/messages.proto +++ b/core/capabilities/remote/types/messages.proto @@ -44,6 +44,16 @@ message MessageBody { message TriggerRegistrationMetadata { string last_received_event_id = 1; + string trigger_id = 2; + string workflow_id = 3; + RegistrationStatus status = 4; + string registration_error = 5; +} + +enum RegistrationStatus { + UNREGISTERED = 0; + REGISTERED = 1; + REGISTRATION_ERROR = 2; } message TriggerEventMetadata { diff --git a/core/capabilities/remote/types/types.go b/core/capabilities/remote/types/types.go index c40a59c0b55..93025942e70 100644 --- a/core/capabilities/remote/types/types.go +++ b/core/capabilities/remote/types/types.go @@ -19,6 +19,7 @@ const ( MethodTriggerEvent = "TriggerEvent" MethodExecute = "Execute" MethodTriggerEventAck = "TriggerEventACK" + MethodTriggerRegistrationStatus = "TriggerRegistrationStatus" ) type Dispatcher interface { diff --git a/core/capabilities/streams/trigger_test.go b/core/capabilities/streams/trigger_test.go index 0e529eeff17..9b65ac3db5b 100644 --- a/core/capabilities/streams/trigger_test.go +++ b/core/capabilities/streams/trigger_test.go @@ -21,6 +21,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" "github.com/smartcontractkit/chainlink-protos/cre/go/values" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" @@ -91,7 +92,7 @@ func TestStreamsTrigger(t *testing.T) { config := &capabilities.RemoteTriggerConfig{ MinResponsesToAggregate: uint32(F + 1), } - subscriber := remote.NewTriggerSubscriber(triggerID, "method", nil, lggr) + subscriber := remote.NewTriggerSubscriber(triggerID, "method", nil, lggr, limits.NewTimeLimiter(time.Millisecond)) require.NoError(t, subscriber.SetConfig(config, capInfo, 1, capDonInfo, agg)) // register trigger @@ -101,7 +102,7 @@ func TestStreamsTrigger(t *testing.T) { }, } triggerEventCallbackCh, err := subscriber.RegisterTrigger(ctx, req) - require.NoError(t, err) + require.ErrorIs(t, err, capabilities.ErrUnableToDetermineRegistrationStatus) // send and process all trigger events startTs := time.Now().UnixMilli() diff --git a/core/scripts/cre/environment/configs/workflow-gateway-capabilities-don.toml b/core/scripts/cre/environment/configs/workflow-gateway-capabilities-don.toml index 7d050b80dce..865b2c388bd 100644 --- a/core/scripts/cre/environment/configs/workflow-gateway-capabilities-don.toml +++ b/core/scripts/cre/environment/configs/workflow-gateway-capabilities-don.toml @@ -46,7 +46,7 @@ # because bootstrap job for capability DON will be created on the boostrap node from this DON supported_evm_chains = [1337, 2337] - env_vars = { CL_EVM_CMD = "", OTEL_SERVICE_NAME = "chainlink-node", CL_CRE_SETTINGS = '{"global":{"PerOrg":{"BaseTriggerRetransmitEnabled":"true"}}}' } + env_vars = { CL_EVM_CMD = "", OTEL_SERVICE_NAME = "chainlink-node", CL_CRE_SETTINGS = '{"global":{"PerOrg":{"BaseTriggerRetransmitEnabled":"true"},"TriggerRegistrationStatusUpdateTimeout":"30s","PerWorkflow":{"TriggerRegistrationsTimeout":"60s"}}}' } capabilities = ["cron", "http-action", "http-trigger", "consensus", "don-time", "evm-1337"] registry_based_launch_allowlist = ["cron-trigger@1.0.0"] diff --git a/core/services/cre/cre.go b/core/services/cre/cre.go index bf3d74cc9cb..7451bec9d06 100644 --- a/core/services/cre/cre.go +++ b/core/services/cre/cre.go @@ -7,6 +7,7 @@ import ( "math/big" "strconv" "strings" + "time" "github.com/Masterminds/semver/v3" "github.com/google/uuid" @@ -26,6 +27,7 @@ import ( nodeauthjwt "github.com/smartcontractkit/chainlink-common/pkg/nodeauth/jwt" commonsrv "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/services/orgresolver" + "github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings" "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink-common/pkg/storage" @@ -467,6 +469,11 @@ func (s *Services) newRegistrySyncer( return nil, nil, err } + triggerRegistrationStatusUpdateTimeout := limits.NewTimeLimiter(30 * time.Second) + if timeout, timeoutErr := cresettings.Default.TriggerRegistrationStatusUpdateTimeout.GetOrDefault(context.Background(), nil); timeoutErr == nil && timeout > 0 { + triggerRegistrationStatusUpdateTimeout = limits.NewTimeLimiter(timeout) + } + wfLauncher, err := capabilities.NewLauncher( lggr, dispatcherWrapper.externalPeerWrapper, @@ -475,6 +482,7 @@ func (s *Services) newRegistrySyncer( dispatcherWrapper.dispatcher, opts.CapabilitiesRegistry, donNotifier, + triggerRegistrationStatusUpdateTimeout, ) if err != nil { return nil, nil, fmt.Errorf("could not create workflow launcher: %w", err) diff --git a/core/services/workflows/v2/engine.go b/core/services/workflows/v2/engine.go index 153f2c7c9fb..02a978fc889 100644 --- a/core/services/workflows/v2/engine.go +++ b/core/services/workflows/v2/engine.go @@ -24,6 +24,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/aggregation" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + caperrors "github.com/smartcontractkit/chainlink-common/pkg/capabilities/errors" "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/contexts" "github.com/smartcontractkit/chainlink-common/pkg/custmsg" @@ -535,9 +536,22 @@ func (e *Engine) runTriggerSubscriptionPhase(ctx context.Context) error { // no Config needed - NoDAG uses Payload }) if regErr != nil { - e.logger().Errorw("Trigger registration failed", "triggerID", sub.Id, "err", regErr) - e.metrics.With(platform.KeyTriggerID, sub.Id).IncrementRegisterTriggerFailureCounter(gCtx) - return fmt.Errorf("failed to register trigger %s: %w", sub.Id, regErr) + if !errors.Is(regErr, capabilities.ErrUnableToDetermineRegistrationStatus) { + var capErr caperrors.Error + if errors.As(regErr, &capErr) { + if capErr.Origin() == caperrors.OriginUser { + e.logger().Errorw("Trigger registration failed due to user error", "triggerID", sub.Id, "userErr", regErr) + } else { + e.logger().Errorw("Trigger registration failed due to system error", "triggerID", sub.Id, "systemErr", regErr) + } + e.metrics.With(platform.KeyTriggerID, sub.Id, platform.KeyCapabilityErrorCode, capErr.Code().String()).IncrementRegisterTriggerFailureCounter(gCtx) + } else { + e.logger().Errorw("Trigger registration failed", "triggerID", sub.Id, "err", regErr) + e.metrics.With(platform.KeyTriggerID, sub.Id).IncrementRegisterTriggerFailureCounter(gCtx) + } + return fmt.Errorf("failed to register trigger %s: %w", sub.Id, regErr) + } + e.logger().Warnw("unable to determine trigger registration status for trigger registration request", "triggerID", sub.Id) } // Send successful result resultsCh <- triggerRegResult{ diff --git a/system-tests/tests/smoke/cre/cre_suite_test.go b/system-tests/tests/smoke/cre/cre_suite_test.go index 784768f6d47..ec29478d98a 100644 --- a/system-tests/tests/smoke/cre/cre_suite_test.go +++ b/system-tests/tests/smoke/cre/cre_suite_test.go @@ -202,6 +202,14 @@ func Test_CRE_V2_EVM_Write_LogTrigger(t *testing.T) { testEnv := t_helpers.SetupTestEnvironmentWithPerTestKeys(t, t_helpers.GetDefaultTestConfig(t)) ExecuteEVMLogTriggerTest(t, testEnv) }) + + t.Run("[v2] EVM LogTrigger User Error - "+topology, func(t *testing.T) { + if parallelEnabled { + t.Parallel() + } + testEnv := t_helpers.SetupTestEnvironmentWithPerTestKeys(t, t_helpers.GetDefaultTestConfig(t)) + ExecuteEVMLogTriggerUserErrorTest(t, testEnv) + }) } func Test_CRE_V2_EVM_Read_HeavyCalls(t *testing.T) { diff --git a/system-tests/tests/smoke/cre/evm_capability_test.go b/system-tests/tests/smoke/cre/evm_capability_test.go index 8d250785b6f..c7e70193ee6 100644 --- a/system-tests/tests/smoke/cre/evm_capability_test.go +++ b/system-tests/tests/smoke/cre/evm_capability_test.go @@ -313,6 +313,62 @@ func configureEVMLogTriggerWorkflow(t *testing.T, lggr zerolog.Logger, chain blo }, msgEmitter } +// ExecuteEVMLogTriggerUserErrorTest deploys a log trigger workflow with invalid config (no addresses) +// and verifies the workflow engine surfaces the user error via beholder logs. +func ExecuteEVMLogTriggerUserErrorTest(t *testing.T, testEnv *ttypes.TestEnvironment) { + const workflowFileLocation = "./evm/logtrigger/main.go" + lggr := framework.L + + userLogsCh := make(chan *workflowevents.UserLogs, 1000) + baseMessageCh := make(chan *commonevents.BaseMessage, 1000) + + server := t_helpers.StartChipTestSink(t, t_helpers.GetLoggingPublishFn(lggr, userLogsCh, baseMessageCh, "./logs/evm_log_trigger_user_err.log")) + + t.Cleanup(func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + t_helpers.ShutdownChipSinkWithDrain(ctx, server, userLogsCh, baseMessageCh) + }) + + enabledChains := t_helpers.GetEVMEnabledChains(t, testEnv) + chainsToTest := make(map[string]blockchains.Blockchain) + + for _, bcOutput := range testEnv.CreEnvironment.Blockchains { + chainID := bcOutput.CtfOutput().ChainID + if _, ok := enabledChains[chainID]; !ok { + lggr.Info().Msgf("Skipping chain %s as it is not enabled for EVM LogTrigger user error test", chainID) + continue + } + chainsToTest[chainID] = bcOutput + } + require.NotEmpty(t, chainsToTest, "no enabled EVM chains for log trigger user error test") + + ctx, cancel := context.WithTimeout(t.Context(), 4*time.Minute) + defer cancel() + + const expectedUserError = "[3]InvalidArgument: no valid addresses provided (at least one address is required)" + for chainID, bcOutput := range chainsToTest { + lggr.Info().Msgf("Creating invalid EVM LogTrigger workflow configuration for chain %s", chainID) + workflowConfig, _ := configureEVMLogTriggerWorkflow(t, lggr, bcOutput) + workflowConfig.Addresses = nil + + workflowName := fmt.Sprintf("evm-logTrigger-user-error-%s-%04d", chainID, rand.Intn(10000)) + lggr.Info().Msgf("About to deploy Workflow %s on chain %s", workflowName, chainID) + t_helpers.CompileAndDeployWorkflow(t, testEnv, lggr, workflowName, &workflowConfig, workflowFileLocation) + + _, err := t_helpers.WaitForBaseMessage( + ctx, + lggr, + baseMessageCh, + "Trigger registration failed due to user error", + t_helpers.WithBaseMessageLabelContains("userErr", expectedUserError), + ) + require.NoErrorf(t, err, "failed to find expected user error beholder log for chain %s", chainID) + } + + lggr.Info().Msg("EVM LogTrigger user error test completed successfully") +} + func ExecuteEVMLogTriggerTest(t *testing.T, testEnv *ttypes.TestEnvironment) { const workflowFileLocation = "./evm/logtrigger/main.go" lggr := framework.L