Skip to content

Commit

Permalink
CAPPL-372 update workflow registry to use the same chainreader for in…
Browse files Browse the repository at this point in the history
…itialisation and event reading (#15753)

* update workflow registry to use the same chainreader for initialisation and event handling

* tidy

* tidy
  • Loading branch information
ettec authored Dec 18, 2024
1 parent ec35d77 commit 085b47c
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 126 deletions.
5 changes: 0 additions & 5 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
fetcher.Fetch, workflowstore.NewDBStore(opts.DS, lggr, clockwork.NewRealClock()), opts.CapabilitiesRegistry,
custmsg.NewLabeler(), clockwork.NewRealClock(), keys[0])

loader := syncer.NewWorkflowRegistryContractLoader(lggr, cfg.Capabilities().WorkflowRegistry().Address(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return relayer.NewContractReader(ctx, bytes)
}, eventHandler)

globalLogger.Debugw("Creating WorkflowRegistrySyncer")
wfSyncer := syncer.NewWorkflowRegistry(
lggr,
Expand All @@ -322,7 +318,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
QueryCount: 100,
},
eventHandler,
loader,
workflowDonNotifier,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ func Test_EventHandlerStateSync(t *testing.T) {
}

testEventHandler := newTestEvtHandler()
loader := syncer.NewWorkflowRegistryContractLoader(lggr, wfRegistryAddr.Hex(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return backendTH.NewContractReader(ctx, t, bytes)
}, testEventHandler)

// Create the registry
registry := syncer.NewWorkflowRegistry(
Expand All @@ -140,7 +137,6 @@ func Test_EventHandlerStateSync(t *testing.T) {
QueryCount: 20,
},
testEventHandler,
loader,
&testDonNotifier{
don: capabilities.DON{
ID: donID,
Expand Down Expand Up @@ -255,9 +251,6 @@ func Test_InitialStateSync(t *testing.T) {
}

testEventHandler := newTestEvtHandler()
loader := syncer.NewWorkflowRegistryContractLoader(lggr, wfRegistryAddr.Hex(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return backendTH.NewContractReader(ctx, t, bytes)
}, testEventHandler)

// Create the worker
worker := syncer.NewWorkflowRegistry(
Expand All @@ -270,7 +263,6 @@ func Test_InitialStateSync(t *testing.T) {
QueryCount: 20,
},
testEventHandler,
loader,
&testDonNotifier{
don: capabilities.DON{
ID: donID,
Expand Down Expand Up @@ -346,8 +338,11 @@ func Test_SecretsWorker(t *testing.T) {
require.NoError(t, err)
require.Equal(t, contents, giveContents)

handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil,
emitter, clockwork.NewFakeClock(), workflowkey.Key{})
handler := &testSecretsWorkEventHandler{
wrappedHandler: syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil,
emitter, clockwork.NewFakeClock(), workflowkey.Key{}),
registeredCh: make(chan syncer.Event, 1),
}

worker := syncer.NewWorkflowRegistry(
lggr,
Expand All @@ -357,7 +352,6 @@ func Test_SecretsWorker(t *testing.T) {
wfRegistryAddr.Hex(),
syncer.WorkflowEventPollerConfig{QueryCount: 20},
handler,
&testWorkflowRegistryContractLoader{},
&testDonNotifier{
don: capabilities.DON{
ID: donID,
Expand All @@ -374,6 +368,9 @@ func Test_SecretsWorker(t *testing.T) {

servicetest.Run(t, worker)

// wait for the workflow to be registered
<-handler.registeredCh

// generate a log event
requestForceUpdateSecrets(t, backendTH, wfRegistryC, giveSecretsURL)

Expand Down Expand Up @@ -434,7 +431,6 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyPaused(t *testing.T) {
wfRegistryAddr.Hex(),
syncer.WorkflowEventPollerConfig{QueryCount: 20},
handler,
&testWorkflowRegistryContractLoader{},
&testDonNotifier{
don: capabilities.DON{
ID: donID,
Expand Down Expand Up @@ -543,7 +539,6 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyActivated(t *testing.T) {
wfRegistryAddr.Hex(),
syncer.WorkflowEventPollerConfig{QueryCount: 20},
handler,
&testWorkflowRegistryContractLoader{},
&testDonNotifier{
don: capabilities.DON{
ID: donID,
Expand Down Expand Up @@ -708,3 +703,24 @@ func updateWorkflow(
th.Backend.Commit()
th.Backend.Commit()
}

type evtHandler interface {
Handle(ctx context.Context, event syncer.Event) error
}

type testSecretsWorkEventHandler struct {
wrappedHandler evtHandler
registeredCh chan syncer.Event
}

func (m *testSecretsWorkEventHandler) Handle(ctx context.Context, event syncer.Event) error {
switch {
case event.GetEventType() == syncer.ForceUpdateSecretsEvent:
return m.wrappedHandler.Handle(ctx, event)
case event.GetEventType() == syncer.WorkflowRegisteredEvent:
m.registeredCh <- event
return nil
default:
panic(fmt.Sprintf("unexpected event type: %v", event.GetEventType()))
}
}
142 changes: 34 additions & 108 deletions core/services/workflows/syncer/workflow_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,11 @@ type workflowRegistry struct {

newContractReaderFn newContractReaderFn

eventPollerCfg WorkflowEventPollerConfig
eventTypes []WorkflowRegistryEventType
handler evtHandler
initialWorkflowsStateLoader initialWorkflowsStateLoader
eventPollerCfg WorkflowEventPollerConfig
eventTypes []WorkflowRegistryEventType
handler evtHandler

workflowDonNotifier donNotifier

reader ContractReader
}

// WithTicker allows external callers to provide a ticker to the workflowRegistry. This is useful
Expand All @@ -152,12 +149,6 @@ type evtHandler interface {
Handle(ctx context.Context, event Event) error
}

type initialWorkflowsStateLoader interface {
// LoadWorkflows loads all the workflows for the given donID from the contract. Returns the head of the chain as of the
// point in time at which the load occurred.
LoadWorkflows(ctx context.Context, don capabilities.DON) (*types.Head, error)
}

type donNotifier interface {
WaitForDon(ctx context.Context) (capabilities.DON, error)
}
Expand All @@ -172,7 +163,6 @@ func NewWorkflowRegistry(
addr string,
eventPollerConfig WorkflowEventPollerConfig,
handler evtHandler,
initialWorkflowsStateLoader initialWorkflowsStateLoader,
workflowDonNotifier donNotifier,
opts ...func(*workflowRegistry),
) *workflowRegistry {
Expand All @@ -184,16 +174,16 @@ func NewWorkflowRegistry(
WorkflowRegisteredEvent,
WorkflowUpdatedEvent,
}

wr := &workflowRegistry{
lggr: lggr,
newContractReaderFn: newContractReaderFn,
workflowRegistryAddress: addr,
eventPollerCfg: eventPollerConfig,
stopCh: make(services.StopChan),
eventTypes: ets,
handler: handler,
initialWorkflowsStateLoader: initialWorkflowsStateLoader,
workflowDonNotifier: workflowDonNotifier,
lggr: lggr,
newContractReaderFn: newContractReaderFn,
workflowRegistryAddress: addr,
eventPollerCfg: eventPollerConfig,
stopCh: make(services.StopChan),
eventTypes: ets,
handler: handler,
workflowDonNotifier: workflowDonNotifier,
}

for _, opt := range opts {
Expand All @@ -220,8 +210,14 @@ func (w *workflowRegistry) Start(_ context.Context) error {
return
}

reader, err := w.newWorkflowRegistryContractReader(ctx)
if err != nil {
w.lggr.Criticalf("contract reader unavailable : %s", err)
return
}

w.lggr.Debugw("Loading initial workflows for DON", "DON", don.ID)
loadWorkflowsHead, err := w.initialWorkflowsStateLoader.LoadWorkflows(ctx, don)
loadWorkflowsHead, err := w.loadWorkflows(ctx, don, reader)
if err != nil {
// TODO - this is a temporary fix to handle the case where the chainreader errors because the contract
// contains no workflows. To track: https://smartcontract-it.atlassian.net/browse/CAPPL-393
Expand All @@ -235,12 +231,6 @@ func (w *workflowRegistry) Start(_ context.Context) error {
}
}

reader, err := w.getContractReader(ctx)
if err != nil {
w.lggr.Criticalf("contract reader unavailable : %s", err)
return
}

w.readRegistryEvents(ctx, reader, loadWorkflowsHead.Height)
}()

Expand Down Expand Up @@ -359,36 +349,19 @@ func (w *workflowRegistry) getTicker() <-chan time.Time {
return w.ticker
}

// getContractReader initializes a contract reader if needed, otherwise returns the existing
// reader.
func (w *workflowRegistry) getContractReader(ctx context.Context) (ContractReader, error) {
c := types.BoundContract{
Name: WorkflowRegistryContractName,
Address: w.workflowRegistryAddress,
}

if w.reader == nil {
reader, err := getWorkflowRegistryEventReader(ctx, w.newContractReaderFn, c)
if err != nil {
return nil, err
}

w.reader = reader
}

return w.reader, nil
}

type sequenceWithEventType struct {
Sequence types.Sequence
EventType WorkflowRegistryEventType
}

func getWorkflowRegistryEventReader(
func (w *workflowRegistry) newWorkflowRegistryContractReader(
ctx context.Context,
newReaderFn newContractReaderFn,
bc types.BoundContract,
) (ContractReader, error) {
bc := types.BoundContract{
Name: WorkflowRegistryContractName,
Address: w.workflowRegistryAddress,
}

contractReaderCfg := evmtypes.ChainReaderConfig{
Contracts: map[string]evmtypes.ChainContractReader{
WorkflowRegistryContractName: {
Expand All @@ -404,6 +377,9 @@ func getWorkflowRegistryEventReader(
},
ContractABI: workflow_registry_wrapper.WorkflowRegistryABI,
Configs: map[string]*evmtypes.ChainReaderDefinition{
GetWorkflowMetadataListByDONMethodName: {
ChainSpecificName: GetWorkflowMetadataListByDONMethodName,
},
string(ForceUpdateSecretsEvent): {
ChainSpecificName: string(ForceUpdateSecretsEvent),
ReadType: evmtypes.Event,
Expand Down Expand Up @@ -438,7 +414,7 @@ func getWorkflowRegistryEventReader(
return nil, err
}

reader, err := newReaderFn(ctx, marshalledCfg)
reader, err := w.newContractReaderFn(ctx, marshalledCfg)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -468,59 +444,9 @@ func (r workflowAsEvent) GetData() any {
return r.Data
}

type workflowRegistryContractLoader struct {
lggr logger.Logger
workflowRegistryAddress string
newContractReaderFn newContractReaderFn
handler evtHandler
}

func NewWorkflowRegistryContractLoader(
lggr logger.Logger,
workflowRegistryAddress string,
newContractReaderFn newContractReaderFn,
handler evtHandler,
) *workflowRegistryContractLoader {
return &workflowRegistryContractLoader{
lggr: lggr.Named("WorkflowRegistryContractLoader"),
workflowRegistryAddress: workflowRegistryAddress,
newContractReaderFn: newContractReaderFn,
handler: handler,
}
}

func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context, don capabilities.DON) (*types.Head, error) {
// Build the ContractReader config
contractReaderCfg := evmtypes.ChainReaderConfig{
Contracts: map[string]evmtypes.ChainContractReader{
WorkflowRegistryContractName: {
ContractABI: workflow_registry_wrapper.WorkflowRegistryABI,
Configs: map[string]*evmtypes.ChainReaderDefinition{
GetWorkflowMetadataListByDONMethodName: {
ChainSpecificName: GetWorkflowMetadataListByDONMethodName,
},
},
},
},
}

contractReaderCfgBytes, err := json.Marshal(contractReaderCfg)
if err != nil {
return nil, fmt.Errorf("failed to marshal contract reader config: %w", err)
}

contractReader, err := l.newContractReaderFn(ctx, contractReaderCfgBytes)
if err != nil {
return nil, fmt.Errorf("failed to create contract reader: %w", err)
}

err = contractReader.Bind(ctx, []types.BoundContract{{Name: WorkflowRegistryContractName, Address: l.workflowRegistryAddress}})
if err != nil {
return nil, fmt.Errorf("failed to bind contract reader: %w", err)
}

func (w *workflowRegistry) loadWorkflows(ctx context.Context, don capabilities.DON, contractReader ContractReader) (*types.Head, error) {
contractBinding := types.BoundContract{
Address: l.workflowRegistryAddress,
Address: w.workflowRegistryAddress,
Name: WorkflowRegistryContractName,
}

Expand All @@ -540,7 +466,7 @@ func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context, don
return nil, fmt.Errorf("failed to get lastest value with head data %w", err)
}

l.lggr.Debugw("Rehydrating existing workflows", "len", len(workflows.WorkflowMetadataList))
w.lggr.Debugw("Rehydrating existing workflows", "len", len(workflows.WorkflowMetadataList))
for _, workflow := range workflows.WorkflowMetadataList {
toRegisteredEvent := WorkflowRegistryWorkflowRegisteredV1{
WorkflowID: workflow.WorkflowID,
Expand All @@ -552,11 +478,11 @@ func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context, don
ConfigURL: workflow.ConfigURL,
SecretsURL: workflow.SecretsURL,
}
if err = l.handler.Handle(ctx, workflowAsEvent{
if err = w.handler.Handle(ctx, workflowAsEvent{
Data: toRegisteredEvent,
EventType: WorkflowRegisteredEvent,
}); err != nil {
l.lggr.Errorf("failed to handle workflow registration: %s", err)
w.lggr.Errorf("failed to handle workflow registration: %s", err)
}
}

Expand Down

0 comments on commit 085b47c

Please sign in to comment.