From 085b47c4e22bc058fbc8fad50b8a7559e28d3ce6 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Wed, 18 Dec 2024 15:25:23 +0000 Subject: [PATCH] CAPPL-372 update workflow registry to use the same chainreader for initialisation and event reading (#15753) * update workflow registry to use the same chainreader for initialisation and event handling * tidy * tidy --- core/services/chainlink/application.go | 5 - .../workflows/syncer/workflow_syncer_test.go | 42 ++++-- .../workflows/syncer/workflow_registry.go | 142 +++++------------- 3 files changed, 63 insertions(+), 126 deletions(-) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index d8b9777cb5a..4004b86c341 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -307,10 +307,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) { fetcher.Fetch, workflowstore.NewDBStore(opts.DS, lggr, clockwork.NewRealClock()), opts.CapabilitiesRegistry, custmsg.NewLabeler(), clockwork.NewRealClock(), keys[0]) - loader := syncer.NewWorkflowRegistryContractLoader(lggr, cfg.Capabilities().WorkflowRegistry().Address(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { - return relayer.NewContractReader(ctx, bytes) - }, eventHandler) - globalLogger.Debugw("Creating WorkflowRegistrySyncer") wfSyncer := syncer.NewWorkflowRegistry( lggr, @@ -322,7 +318,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) { QueryCount: 100, }, eventHandler, - loader, workflowDonNotifier, ) diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index c7c164803cb..6b3f7c7018d 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -125,9 +125,6 @@ func Test_EventHandlerStateSync(t *testing.T) { } testEventHandler := newTestEvtHandler() - loader := syncer.NewWorkflowRegistryContractLoader(lggr, wfRegistryAddr.Hex(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { - return backendTH.NewContractReader(ctx, t, bytes) - }, testEventHandler) // Create the registry registry := syncer.NewWorkflowRegistry( @@ -140,7 +137,6 @@ func Test_EventHandlerStateSync(t *testing.T) { QueryCount: 20, }, testEventHandler, - loader, &testDonNotifier{ don: capabilities.DON{ ID: donID, @@ -255,9 +251,6 @@ func Test_InitialStateSync(t *testing.T) { } testEventHandler := newTestEvtHandler() - loader := syncer.NewWorkflowRegistryContractLoader(lggr, wfRegistryAddr.Hex(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { - return backendTH.NewContractReader(ctx, t, bytes) - }, testEventHandler) // Create the worker worker := syncer.NewWorkflowRegistry( @@ -270,7 +263,6 @@ func Test_InitialStateSync(t *testing.T) { QueryCount: 20, }, testEventHandler, - loader, &testDonNotifier{ don: capabilities.DON{ ID: donID, @@ -346,8 +338,11 @@ func Test_SecretsWorker(t *testing.T) { require.NoError(t, err) require.Equal(t, contents, giveContents) - handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil, - emitter, clockwork.NewFakeClock(), workflowkey.Key{}) + handler := &testSecretsWorkEventHandler{ + wrappedHandler: syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil, + emitter, clockwork.NewFakeClock(), workflowkey.Key{}), + registeredCh: make(chan syncer.Event, 1), + } worker := syncer.NewWorkflowRegistry( lggr, @@ -357,7 +352,6 @@ func Test_SecretsWorker(t *testing.T) { wfRegistryAddr.Hex(), syncer.WorkflowEventPollerConfig{QueryCount: 20}, handler, - &testWorkflowRegistryContractLoader{}, &testDonNotifier{ don: capabilities.DON{ ID: donID, @@ -374,6 +368,9 @@ func Test_SecretsWorker(t *testing.T) { servicetest.Run(t, worker) + // wait for the workflow to be registered + <-handler.registeredCh + // generate a log event requestForceUpdateSecrets(t, backendTH, wfRegistryC, giveSecretsURL) @@ -434,7 +431,6 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyPaused(t *testing.T) { wfRegistryAddr.Hex(), syncer.WorkflowEventPollerConfig{QueryCount: 20}, handler, - &testWorkflowRegistryContractLoader{}, &testDonNotifier{ don: capabilities.DON{ ID: donID, @@ -543,7 +539,6 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyActivated(t *testing.T) { wfRegistryAddr.Hex(), syncer.WorkflowEventPollerConfig{QueryCount: 20}, handler, - &testWorkflowRegistryContractLoader{}, &testDonNotifier{ don: capabilities.DON{ ID: donID, @@ -708,3 +703,24 @@ func updateWorkflow( th.Backend.Commit() th.Backend.Commit() } + +type evtHandler interface { + Handle(ctx context.Context, event syncer.Event) error +} + +type testSecretsWorkEventHandler struct { + wrappedHandler evtHandler + registeredCh chan syncer.Event +} + +func (m *testSecretsWorkEventHandler) Handle(ctx context.Context, event syncer.Event) error { + switch { + case event.GetEventType() == syncer.ForceUpdateSecretsEvent: + return m.wrappedHandler.Handle(ctx, event) + case event.GetEventType() == syncer.WorkflowRegisteredEvent: + m.registeredCh <- event + return nil + default: + panic(fmt.Sprintf("unexpected event type: %v", event.GetEventType())) + } +} diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index 4809f3563ca..26c23411d67 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -130,14 +130,11 @@ type workflowRegistry struct { newContractReaderFn newContractReaderFn - eventPollerCfg WorkflowEventPollerConfig - eventTypes []WorkflowRegistryEventType - handler evtHandler - initialWorkflowsStateLoader initialWorkflowsStateLoader + eventPollerCfg WorkflowEventPollerConfig + eventTypes []WorkflowRegistryEventType + handler evtHandler workflowDonNotifier donNotifier - - reader ContractReader } // WithTicker allows external callers to provide a ticker to the workflowRegistry. This is useful @@ -152,12 +149,6 @@ type evtHandler interface { Handle(ctx context.Context, event Event) error } -type initialWorkflowsStateLoader interface { - // LoadWorkflows loads all the workflows for the given donID from the contract. Returns the head of the chain as of the - // point in time at which the load occurred. - LoadWorkflows(ctx context.Context, don capabilities.DON) (*types.Head, error) -} - type donNotifier interface { WaitForDon(ctx context.Context) (capabilities.DON, error) } @@ -172,7 +163,6 @@ func NewWorkflowRegistry( addr string, eventPollerConfig WorkflowEventPollerConfig, handler evtHandler, - initialWorkflowsStateLoader initialWorkflowsStateLoader, workflowDonNotifier donNotifier, opts ...func(*workflowRegistry), ) *workflowRegistry { @@ -184,16 +174,16 @@ func NewWorkflowRegistry( WorkflowRegisteredEvent, WorkflowUpdatedEvent, } + wr := &workflowRegistry{ - lggr: lggr, - newContractReaderFn: newContractReaderFn, - workflowRegistryAddress: addr, - eventPollerCfg: eventPollerConfig, - stopCh: make(services.StopChan), - eventTypes: ets, - handler: handler, - initialWorkflowsStateLoader: initialWorkflowsStateLoader, - workflowDonNotifier: workflowDonNotifier, + lggr: lggr, + newContractReaderFn: newContractReaderFn, + workflowRegistryAddress: addr, + eventPollerCfg: eventPollerConfig, + stopCh: make(services.StopChan), + eventTypes: ets, + handler: handler, + workflowDonNotifier: workflowDonNotifier, } for _, opt := range opts { @@ -220,8 +210,14 @@ func (w *workflowRegistry) Start(_ context.Context) error { return } + reader, err := w.newWorkflowRegistryContractReader(ctx) + if err != nil { + w.lggr.Criticalf("contract reader unavailable : %s", err) + return + } + w.lggr.Debugw("Loading initial workflows for DON", "DON", don.ID) - loadWorkflowsHead, err := w.initialWorkflowsStateLoader.LoadWorkflows(ctx, don) + loadWorkflowsHead, err := w.loadWorkflows(ctx, don, reader) if err != nil { // TODO - this is a temporary fix to handle the case where the chainreader errors because the contract // contains no workflows. To track: https://smartcontract-it.atlassian.net/browse/CAPPL-393 @@ -235,12 +231,6 @@ func (w *workflowRegistry) Start(_ context.Context) error { } } - reader, err := w.getContractReader(ctx) - if err != nil { - w.lggr.Criticalf("contract reader unavailable : %s", err) - return - } - w.readRegistryEvents(ctx, reader, loadWorkflowsHead.Height) }() @@ -359,36 +349,19 @@ func (w *workflowRegistry) getTicker() <-chan time.Time { return w.ticker } -// getContractReader initializes a contract reader if needed, otherwise returns the existing -// reader. -func (w *workflowRegistry) getContractReader(ctx context.Context) (ContractReader, error) { - c := types.BoundContract{ - Name: WorkflowRegistryContractName, - Address: w.workflowRegistryAddress, - } - - if w.reader == nil { - reader, err := getWorkflowRegistryEventReader(ctx, w.newContractReaderFn, c) - if err != nil { - return nil, err - } - - w.reader = reader - } - - return w.reader, nil -} - type sequenceWithEventType struct { Sequence types.Sequence EventType WorkflowRegistryEventType } -func getWorkflowRegistryEventReader( +func (w *workflowRegistry) newWorkflowRegistryContractReader( ctx context.Context, - newReaderFn newContractReaderFn, - bc types.BoundContract, ) (ContractReader, error) { + bc := types.BoundContract{ + Name: WorkflowRegistryContractName, + Address: w.workflowRegistryAddress, + } + contractReaderCfg := evmtypes.ChainReaderConfig{ Contracts: map[string]evmtypes.ChainContractReader{ WorkflowRegistryContractName: { @@ -404,6 +377,9 @@ func getWorkflowRegistryEventReader( }, ContractABI: workflow_registry_wrapper.WorkflowRegistryABI, Configs: map[string]*evmtypes.ChainReaderDefinition{ + GetWorkflowMetadataListByDONMethodName: { + ChainSpecificName: GetWorkflowMetadataListByDONMethodName, + }, string(ForceUpdateSecretsEvent): { ChainSpecificName: string(ForceUpdateSecretsEvent), ReadType: evmtypes.Event, @@ -438,7 +414,7 @@ func getWorkflowRegistryEventReader( return nil, err } - reader, err := newReaderFn(ctx, marshalledCfg) + reader, err := w.newContractReaderFn(ctx, marshalledCfg) if err != nil { return nil, err } @@ -468,59 +444,9 @@ func (r workflowAsEvent) GetData() any { return r.Data } -type workflowRegistryContractLoader struct { - lggr logger.Logger - workflowRegistryAddress string - newContractReaderFn newContractReaderFn - handler evtHandler -} - -func NewWorkflowRegistryContractLoader( - lggr logger.Logger, - workflowRegistryAddress string, - newContractReaderFn newContractReaderFn, - handler evtHandler, -) *workflowRegistryContractLoader { - return &workflowRegistryContractLoader{ - lggr: lggr.Named("WorkflowRegistryContractLoader"), - workflowRegistryAddress: workflowRegistryAddress, - newContractReaderFn: newContractReaderFn, - handler: handler, - } -} - -func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context, don capabilities.DON) (*types.Head, error) { - // Build the ContractReader config - contractReaderCfg := evmtypes.ChainReaderConfig{ - Contracts: map[string]evmtypes.ChainContractReader{ - WorkflowRegistryContractName: { - ContractABI: workflow_registry_wrapper.WorkflowRegistryABI, - Configs: map[string]*evmtypes.ChainReaderDefinition{ - GetWorkflowMetadataListByDONMethodName: { - ChainSpecificName: GetWorkflowMetadataListByDONMethodName, - }, - }, - }, - }, - } - - contractReaderCfgBytes, err := json.Marshal(contractReaderCfg) - if err != nil { - return nil, fmt.Errorf("failed to marshal contract reader config: %w", err) - } - - contractReader, err := l.newContractReaderFn(ctx, contractReaderCfgBytes) - if err != nil { - return nil, fmt.Errorf("failed to create contract reader: %w", err) - } - - err = contractReader.Bind(ctx, []types.BoundContract{{Name: WorkflowRegistryContractName, Address: l.workflowRegistryAddress}}) - if err != nil { - return nil, fmt.Errorf("failed to bind contract reader: %w", err) - } - +func (w *workflowRegistry) loadWorkflows(ctx context.Context, don capabilities.DON, contractReader ContractReader) (*types.Head, error) { contractBinding := types.BoundContract{ - Address: l.workflowRegistryAddress, + Address: w.workflowRegistryAddress, Name: WorkflowRegistryContractName, } @@ -540,7 +466,7 @@ func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context, don return nil, fmt.Errorf("failed to get lastest value with head data %w", err) } - l.lggr.Debugw("Rehydrating existing workflows", "len", len(workflows.WorkflowMetadataList)) + w.lggr.Debugw("Rehydrating existing workflows", "len", len(workflows.WorkflowMetadataList)) for _, workflow := range workflows.WorkflowMetadataList { toRegisteredEvent := WorkflowRegistryWorkflowRegisteredV1{ WorkflowID: workflow.WorkflowID, @@ -552,11 +478,11 @@ func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context, don ConfigURL: workflow.ConfigURL, SecretsURL: workflow.SecretsURL, } - if err = l.handler.Handle(ctx, workflowAsEvent{ + if err = w.handler.Handle(ctx, workflowAsEvent{ Data: toRegisteredEvent, EventType: WorkflowRegisteredEvent, }); err != nil { - l.lggr.Errorf("failed to handle workflow registration: %s", err) + w.lggr.Errorf("failed to handle workflow registration: %s", err) } }