diff --git a/.changeset/brave-ads-explode.md b/.changeset/brave-ads-explode.md new file mode 100644 index 00000000000..4be608e2d1d --- /dev/null +++ b/.changeset/brave-ads-explode.md @@ -0,0 +1,9 @@ +--- +"chainlink": patch +--- + +Remove finality depth as the default value for minConfirmation for tx jobs. +Update the sql query for fetching pending callback transactions: +if minConfirmation is not null, we check difference if the current block - tx block > minConfirmation +else we check if the tx block is <= finalizedBlock +#updated diff --git a/.changeset/chilled-months-bow.md b/.changeset/chilled-months-bow.md new file mode 100644 index 00000000000..d3bbf7f97e3 --- /dev/null +++ b/.changeset/chilled-months-bow.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#added oracle support in standard capabilities diff --git a/common/txmgr/confirmer.go b/common/txmgr/confirmer.go index 4eaa6739d58..d2d491d794c 100644 --- a/common/txmgr/confirmer.go +++ b/common/txmgr/confirmer.go @@ -347,7 +347,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pro if ec.resumeCallback != nil { mark = time.Now() - if err := ec.ResumePendingTaskRuns(ctx, head); err != nil { + if err := ec.ResumePendingTaskRuns(ctx, head.BlockNumber(), latestFinalizedHead.BlockNumber()); err != nil { return fmt.Errorf("ResumePendingTaskRuns failed: %w", err) } @@ -1259,8 +1259,8 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) sen } // ResumePendingTaskRuns issues callbacks to task runs that are pending waiting for receipts -func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, head types.Head[BLOCK_HASH]) error { - receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, head.BlockNumber(), ec.chainID) +func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, latest, finalized int64) error { + receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, latest, finalized, ec.chainID) if err != nil { return err diff --git a/common/txmgr/types/mocks/tx_store.go b/common/txmgr/types/mocks/tx_store.go index 4467729e167..279a7252f1a 100644 --- a/common/txmgr/types/mocks/tx_store.go +++ b/common/txmgr/types/mocks/tx_store.go @@ -1096,9 +1096,9 @@ func (_c *TxStore_FindTxesByMetaFieldAndStates_Call[ADDR, CHAIN_ID, TX_HASH, BLO return _c } -// FindTxesPendingCallback provides a mock function with given fields: ctx, blockNum, chainID -func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error) { - ret := _m.Called(ctx, blockNum, chainID) +// FindTxesPendingCallback provides a mock function with given fields: ctx, latest, finalized, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPendingCallback(ctx context.Context, latest int64, finalized int64, chainID CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error) { + ret := _m.Called(ctx, latest, finalized, chainID) if len(ret) == 0 { panic("no return value specified for FindTxesPendingCallback") @@ -1106,19 +1106,19 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPen var r0 []txmgrtypes.ReceiptPlus[R] var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64, CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error)); ok { - return rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error)); ok { + return rf(ctx, latest, finalized, chainID) } - if rf, ok := ret.Get(0).(func(context.Context, int64, CHAIN_ID) []txmgrtypes.ReceiptPlus[R]); ok { - r0 = rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, CHAIN_ID) []txmgrtypes.ReceiptPlus[R]); ok { + r0 = rf(ctx, latest, finalized, chainID) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]txmgrtypes.ReceiptPlus[R]) } } - if rf, ok := ret.Get(1).(func(context.Context, int64, CHAIN_ID) error); ok { - r1 = rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(1).(func(context.Context, int64, int64, CHAIN_ID) error); ok { + r1 = rf(ctx, latest, finalized, chainID) } else { r1 = ret.Error(1) } @@ -1133,15 +1133,16 @@ type TxStore_FindTxesPendingCallback_Call[ADDR types.Hashable, CHAIN_ID types.ID // FindTxesPendingCallback is a helper method to define mock.On call // - ctx context.Context -// - blockNum int64 +// - latest int64 +// - finalized int64 // - chainID CHAIN_ID -func (_e *TxStore_Expecter[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPendingCallback(ctx interface{}, blockNum interface{}, chainID interface{}) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { - return &TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{Call: _e.mock.On("FindTxesPendingCallback", ctx, blockNum, chainID)} +func (_e *TxStore_Expecter[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPendingCallback(ctx interface{}, latest interface{}, finalized interface{}, chainID interface{}) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { + return &TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{Call: _e.mock.On("FindTxesPendingCallback", ctx, latest, finalized, chainID)} } -func (_c *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Run(run func(ctx context.Context, blockNum int64, chainID CHAIN_ID)) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { +func (_c *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Run(run func(ctx context.Context, latest int64, finalized int64, chainID CHAIN_ID)) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64), args[2].(CHAIN_ID)) + run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(CHAIN_ID)) }) return _c } @@ -1151,7 +1152,7 @@ func (_c *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HA return _c } -func (_c *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RunAndReturn(run func(context.Context, int64, CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error)) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { +func (_c *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RunAndReturn(run func(context.Context, int64, int64, CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error)) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { _c.Call.Return(run) return _c } diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index 3d874cc4366..668b8db2049 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -34,7 +34,7 @@ type TxStore[ TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE] // Find confirmed txes beyond the minConfirmations param that require callback but have not yet been signaled - FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error) + FindTxesPendingCallback(ctx context.Context, latest, finalized int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error) // Update tx to mark that its callback has been signaled UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error SaveFetchedReceipts(ctx context.Context, r []R, state TxState, errorMsg *string, chainID CHAIN_ID) error diff --git a/core/chains/evm/txmgr/confirmer_test.go b/core/chains/evm/txmgr/confirmer_test.go index a5e19cda277..a75b7709787 100644 --- a/core/chains/evm/txmgr/confirmer_test.go +++ b/core/chains/evm/txmgr/confirmer_test.go @@ -3055,7 +3055,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { // It would only be in a state past suspended if the resume callback was called and callback_completed was set to TRUE pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE, callback_completed = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID) - err := ec.ResumePendingTaskRuns(tests.Context(t), &head) + err := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0) require.NoError(t, err) }) @@ -3073,7 +3073,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID) - err := ec.ResumePendingTaskRuns(tests.Context(t), &head) + err := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0) require.NoError(t, err) }) @@ -3101,7 +3101,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { t.Cleanup(func() { <-done }) go func() { defer close(done) - err2 := ec.ResumePendingTaskRuns(tests.Context(t), &head) + err2 := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0) if !assert.NoError(t, err2) { return } @@ -3155,7 +3155,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { t.Cleanup(func() { <-done }) go func() { defer close(done) - err2 := ec.ResumePendingTaskRuns(tests.Context(t), &head) + err2 := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0) if !assert.NoError(t, err2) { return } @@ -3192,7 +3192,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash) pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID) - err := ec.ResumePendingTaskRuns(tests.Context(t), &head) + err := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0) require.Error(t, err) // Retrieve Tx to check if callback completed flag was left unchanged diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index f56e8fab3fd..b75533e8d05 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -1055,7 +1055,7 @@ WHERE evm.tx_attempts.state = 'in_progress' AND evm.txes.from_address = $1 AND e } // Find confirmed txes requiring callback but have not yet been signaled -func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) { +func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, latest, finalized int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) { var rs []dbReceiptPlus var cancel context.CancelFunc @@ -1066,8 +1066,12 @@ func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64 INNER JOIN evm.tx_attempts ON evm.txes.id = evm.tx_attempts.eth_tx_id INNER JOIN evm.receipts ON evm.tx_attempts.hash = evm.receipts.tx_hash WHERE evm.txes.pipeline_task_run_id IS NOT NULL AND evm.txes.signal_callback = TRUE AND evm.txes.callback_completed = FALSE - AND evm.receipts.block_number <= ($1 - evm.txes.min_confirmations) AND evm.txes.evm_chain_id = $2 - `, blockNum, chainID.String()) + AND ( + (evm.txes.min_confirmations IS NOT NULL AND evm.receipts.block_number <= ($1 - evm.txes.min_confirmations)) + OR (evm.txes.min_confirmations IS NULL AND evm.receipts.block_number <= $2) + ) + AND evm.txes.evm_chain_id = $3 + `, latest, finalized, chainID.String()) if err != nil { return nil, fmt.Errorf("failed to retrieve transactions pending pipeline resume callback: %w", err) } diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index ffea13def04..9e1f135e0b2 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -652,7 +652,7 @@ func TestORM_FindTxesPendingCallback(t *testing.T) { etx1 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 3, 1, fromAddress) pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": true}'`) attempt1 := etx1.TxAttempts[0] - mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, attempt1.Hash) + etxBlockNum := mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, attempt1.Hash).BlockNumber pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr1.ID, minConfirmations, etx1.ID) // Callback to pipeline service completed. Should be ignored @@ -685,10 +685,26 @@ func TestORM_FindTxesPendingCallback(t *testing.T) { pgtest.MustExec(t, db, `UPDATE evm.txes SET min_confirmations = $1 WHERE id = $2`, minConfirmations, etx5.ID) // Search evm.txes table for tx requiring callback - receiptsPlus, err := txStore.FindTxesPendingCallback(tests.Context(t), head.Number, ethClient.ConfiguredChainID()) + receiptsPlus, err := txStore.FindTxesPendingCallback(tests.Context(t), head.Number, 0, ethClient.ConfiguredChainID()) require.NoError(t, err) - assert.Len(t, receiptsPlus, 1) - assert.Equal(t, tr1.ID, receiptsPlus[0].ID) + if assert.Len(t, receiptsPlus, 1) { + assert.Equal(t, tr1.ID, receiptsPlus[0].ID) + } + + // Clear min_confirmations + pgtest.MustExec(t, db, `UPDATE evm.txes SET min_confirmations = NULL WHERE id = $1`, etx1.ID) + + // Search evm.txes table for tx requiring callback + receiptsPlus, err = txStore.FindTxesPendingCallback(tests.Context(t), head.Number, 0, ethClient.ConfiguredChainID()) + require.NoError(t, err) + assert.Empty(t, receiptsPlus) + + // Search evm.txes table for tx requiring callback, with block 1 finalized + receiptsPlus, err = txStore.FindTxesPendingCallback(tests.Context(t), head.Number, etxBlockNum, ethClient.ConfiguredChainID()) + require.NoError(t, err) + if assert.Len(t, receiptsPlus, 1) { + assert.Equal(t, tr1.ID, receiptsPlus[0].ID) + } } func Test_FindTxWithIdempotencyKey(t *testing.T) { diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go index a9a175e3d94..7800b26e47a 100644 --- a/core/chains/evm/txmgr/mocks/evm_tx_store.go +++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go @@ -1395,9 +1395,9 @@ func (_c *EvmTxStore_FindTxesByMetaFieldAndStates_Call) RunAndReturn(run func(co return _c } -// FindTxesPendingCallback provides a mock function with given fields: ctx, blockNum, chainID -func (_m *EvmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error) { - ret := _m.Called(ctx, blockNum, chainID) +// FindTxesPendingCallback provides a mock function with given fields: ctx, latest, finalized, chainID +func (_m *EvmTxStore) FindTxesPendingCallback(ctx context.Context, latest int64, finalized int64, chainID *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error) { + ret := _m.Called(ctx, latest, finalized, chainID) if len(ret) == 0 { panic("no return value specified for FindTxesPendingCallback") @@ -1405,19 +1405,19 @@ func (_m *EvmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int6 var r0 []types.ReceiptPlus[*evmtypes.Receipt] var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64, *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error)); ok { - return rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error)); ok { + return rf(ctx, latest, finalized, chainID) } - if rf, ok := ret.Get(0).(func(context.Context, int64, *big.Int) []types.ReceiptPlus[*evmtypes.Receipt]); ok { - r0 = rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, *big.Int) []types.ReceiptPlus[*evmtypes.Receipt]); ok { + r0 = rf(ctx, latest, finalized, chainID) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]types.ReceiptPlus[*evmtypes.Receipt]) } } - if rf, ok := ret.Get(1).(func(context.Context, int64, *big.Int) error); ok { - r1 = rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(1).(func(context.Context, int64, int64, *big.Int) error); ok { + r1 = rf(ctx, latest, finalized, chainID) } else { r1 = ret.Error(1) } @@ -1432,15 +1432,16 @@ type EvmTxStore_FindTxesPendingCallback_Call struct { // FindTxesPendingCallback is a helper method to define mock.On call // - ctx context.Context -// - blockNum int64 +// - latest int64 +// - finalized int64 // - chainID *big.Int -func (_e *EvmTxStore_Expecter) FindTxesPendingCallback(ctx interface{}, blockNum interface{}, chainID interface{}) *EvmTxStore_FindTxesPendingCallback_Call { - return &EvmTxStore_FindTxesPendingCallback_Call{Call: _e.mock.On("FindTxesPendingCallback", ctx, blockNum, chainID)} +func (_e *EvmTxStore_Expecter) FindTxesPendingCallback(ctx interface{}, latest interface{}, finalized interface{}, chainID interface{}) *EvmTxStore_FindTxesPendingCallback_Call { + return &EvmTxStore_FindTxesPendingCallback_Call{Call: _e.mock.On("FindTxesPendingCallback", ctx, latest, finalized, chainID)} } -func (_c *EvmTxStore_FindTxesPendingCallback_Call) Run(run func(ctx context.Context, blockNum int64, chainID *big.Int)) *EvmTxStore_FindTxesPendingCallback_Call { +func (_c *EvmTxStore_FindTxesPendingCallback_Call) Run(run func(ctx context.Context, latest int64, finalized int64, chainID *big.Int)) *EvmTxStore_FindTxesPendingCallback_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64), args[2].(*big.Int)) + run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(*big.Int)) }) return _c } @@ -1450,7 +1451,7 @@ func (_c *EvmTxStore_FindTxesPendingCallback_Call) Return(receiptsPlus []types.R return _c } -func (_c *EvmTxStore_FindTxesPendingCallback_Call) RunAndReturn(run func(context.Context, int64, *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error)) *EvmTxStore_FindTxesPendingCallback_Call { +func (_c *EvmTxStore_FindTxesPendingCallback_Call) RunAndReturn(run func(context.Context, int64, int64, *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error)) *EvmTxStore_FindTxesPendingCallback_Call { _c.Call.Return(run) return _c } diff --git a/core/scripts/go.mod b/core/scripts/go.mod index af7ddc2f98f..5c9043111da 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -22,7 +22,7 @@ require ( github.com/prometheus/client_golang v1.20.0 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chainlink-automation v1.0.4 - github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd + github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 github.com/smartcontractkit/libocr v0.0.0-20240717100443-f6226e09bee7 github.com/spf13/cobra v1.8.1 diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 684155bba51..edb7d16b965 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1099,8 +1099,8 @@ github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8um github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb h1:bYT7j5syymCtAKv/gGG7CLp4APe/VDWkig9Ph7mc8fQ= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb/go.mod h1:WbtjuYMnDAYojL3CSWmruc1ecSBgSTggzXJ6F1U6bxw= -github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd h1:DraA9kRpkyI02OEx7QDbSq3bCYxVFZ78TdOP2rUvHts= -github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd/go.mod h1:F6WUS6N4mP5ScwpwyTyAJc9/vjR+GXbMCRUOVekQi1g= +github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d h1:GpGKzVtDWSb8cesHSvm+LQpCJpFdfVPk3us5wAY6ixI= +github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d/go.mod h1:+yz2So1Lp90C3ATfMmqMEJaIr3zfG8e4xHa/3u1kIDk= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7/go.mod h1:BMYE1vC/pGmdFSsOJdPrAA0/4gZ0Xo0SxTMdGspBtRo= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 h1:yRk4ektpx/UxwarqAfgxUXLrsYXlaNeP1NOwzHGrK2Q= diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index c9e72244cf1..784abac9516 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -453,15 +453,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) { pipelineRunner, cfg.JobPipeline(), ), - job.StandardCapabilities: standardcapabilities.NewDelegate( - globalLogger, - opts.DS, jobORM, - opts.CapabilitiesRegistry, - loopRegistrarConfig, - telemetryManager, - pipelineRunner, - opts.RelayerChainInteroperators, - gatewayConnectorWrapper), } webhookJobRunner = delegates[job.Webhook].(*webhook.Delegate).WebhookJobRunner() ) @@ -501,6 +492,20 @@ func NewApplication(opts ApplicationOpts) (Application, error) { return nil, fmt.Errorf("P2P stack required for OCR or OCR2") } + // If peer wrapper is initialized, Oracle Factory dependency will be available to standard capabilities + delegates[job.StandardCapabilities] = standardcapabilities.NewDelegate( + globalLogger, + opts.DS, jobORM, + opts.CapabilitiesRegistry, + loopRegistrarConfig, + telemetryManager, + pipelineRunner, + opts.RelayerChainInteroperators, + gatewayConnectorWrapper, + keyStore, + peerWrapper, + ) + if cfg.OCR().Enabled() { delegates[job.OffchainReporting] = ocr.NewDelegate( opts.DS, diff --git a/core/services/job/models.go b/core/services/job/models.go index f4b773a1bfb..2c225cd0f2b 100644 --- a/core/services/job/models.go +++ b/core/services/job/models.go @@ -948,12 +948,35 @@ func (w *WorkflowSpec) RawSpec(ctx context.Context) ([]byte, error) { return rs, nil } +type OracleFactoryConfig struct { + Enabled bool `toml:"enabled"` + BootstrapPeers []string `toml:"bootstrap_peers"` // e.g.,["12D3KooWEBVwbfdhKnicois7FTYVsBFGFcoMhMCKXQC57BQyZMhz@localhost:6690"] + OCRContractAddress string `toml:"ocr_contract_address"` // e.g., 0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6 + ChainID string `toml:"chain_id"` // e.g., "31337" + Network string `toml:"network"` // e.g., "evm" +} + +// Value returns this instance serialized for database storage. +func (ofc OracleFactoryConfig) Value() (driver.Value, error) { + return json.Marshal(ofc) +} + +// Scan reads the database value and returns an instance. +func (ofc *OracleFactoryConfig) Scan(value interface{}) error { + b, ok := value.([]byte) + if !ok { + return errors.Errorf("expected bytes got %T", b) + } + return json.Unmarshal(b, &ofc) +} + type StandardCapabilitiesSpec struct { - ID int32 - CreatedAt time.Time `toml:"-"` - UpdatedAt time.Time `toml:"-"` - Command string `toml:"command"` - Config string `toml:"config"` + ID int32 + CreatedAt time.Time `toml:"-"` + UpdatedAt time.Time `toml:"-"` + Command string `toml:"command" db:"command"` + Config string `toml:"config" db:"config"` + OracleFactory OracleFactoryConfig `toml:"oracle_factory" db:"oracle_factory"` } func (w *StandardCapabilitiesSpec) GetID() string { diff --git a/core/services/job/orm.go b/core/services/job/orm.go index d02e0b29200..071ca37203e 100644 --- a/core/services/job/orm.go +++ b/core/services/job/orm.go @@ -418,8 +418,8 @@ func (o *orm) CreateJob(ctx context.Context, jb *Job) error { } jb.WorkflowSpecID = &specID case StandardCapabilities: - sql := `INSERT INTO standardcapabilities_specs (command, config, created_at, updated_at) - VALUES (:command, :config, NOW(), NOW()) + sql := `INSERT INTO standardcapabilities_specs (command, config, oracle_factory, created_at, updated_at) + VALUES (:command, :config, :oracle_factory, NOW(), NOW()) RETURNING id;` specID, err := tx.prepareQuerySpecID(ctx, sql, jb.StandardCapabilitiesSpec) if err != nil { diff --git a/core/services/ocr2/plugins/generic/oraclefactory.go b/core/services/ocr2/plugins/generic/oraclefactory.go new file mode 100644 index 00000000000..7d44a239d2e --- /dev/null +++ b/core/services/ocr2/plugins/generic/oraclefactory.go @@ -0,0 +1,140 @@ +package generic + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + "github.com/prometheus/client_golang/prometheus" + ocr "github.com/smartcontractkit/libocr/offchainreporting2plus" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ocr2key" + "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" + "github.com/smartcontractkit/chainlink/v2/core/services/telemetry" + + "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" +) + +type oracleFactory struct { + database ocr3types.Database + jobID int32 + jobName string + jobORM job.ORM + kb ocr2key.KeyBundle + lggr logger.Logger + config job.OracleFactoryConfig + peerWrapper *ocrcommon.SingletonPeerWrapper + relayerSet *RelayerSet + transmitterID string +} + +type OracleFactoryParams struct { + JobID int32 + JobName string + JobORM job.ORM + KB ocr2key.KeyBundle + Logger logger.Logger + Config job.OracleFactoryConfig + PeerWrapper *ocrcommon.SingletonPeerWrapper + RelayerSet *RelayerSet + TransmitterID string +} + +func NewOracleFactory(params OracleFactoryParams) (core.OracleFactory, error) { + return &oracleFactory{ + database: OracleFactoryDB(params.JobID, params.Logger), + jobID: params.JobID, + jobName: params.JobName, + jobORM: params.JobORM, + kb: params.KB, + lggr: params.Logger, + config: params.Config, + peerWrapper: params.PeerWrapper, + relayerSet: params.RelayerSet, + transmitterID: params.TransmitterID, + }, nil +} + +func (of *oracleFactory) NewOracle(ctx context.Context, args core.OracleArgs) (core.Oracle, error) { + if !of.peerWrapper.IsStarted() { + return nil, errors.New("peer wrapper not started") + } + + relayer, err := of.relayerSet.Get(ctx, types.RelayID{Network: of.config.Network, ChainID: of.config.ChainID}) + if err != nil { + return nil, fmt.Errorf("error when getting relayer: %w", err) + } + + var relayConfig = struct { + ChainID string `json:"chainID"` + EffectiveTransmitterID string `json:"effectiveTransmitterID"` + SendingKeys []string `json:"sendingKeys"` + }{ + ChainID: of.config.ChainID, + EffectiveTransmitterID: of.transmitterID, + SendingKeys: []string{of.transmitterID}, + } + relayConfigBytes, err := json.Marshal(relayConfig) + if err != nil { + return nil, fmt.Errorf("error when marshalling relay config: %w", err) + } + + pluginProvider, err := relayer.NewPluginProvider(ctx, core.RelayArgs{ + ContractID: of.config.OCRContractAddress, + ProviderType: "plugin", + RelayConfig: relayConfigBytes, + }, core.PluginArgs{ + TransmitterID: of.transmitterID, + }) + if err != nil { + return nil, fmt.Errorf("error when getting offchain digester: %w", err) + } + + bootstrapPeers, err := ocrcommon.ParseBootstrapPeers(of.config.BootstrapPeers) + if err != nil { + return nil, fmt.Errorf("failed to parse bootstrap peers: %w", err) + } + + oracle, err := ocr.NewOracle(ocr.OCR3OracleArgs[[]byte]{ + // We are relying on the relayer plugin provider for the offchain config digester + // and the contract config tracker to save time. + ContractConfigTracker: pluginProvider.ContractConfigTracker(), + OffchainConfigDigester: pluginProvider.OffchainConfigDigester(), + LocalConfig: args.LocalConfig, + ContractTransmitter: NewContractTransmitter(of.transmitterID, args.ContractTransmitter), + ReportingPluginFactory: args.ReportingPluginFactoryService, + BinaryNetworkEndpointFactory: of.peerWrapper.Peer2, + V2Bootstrappers: bootstrapPeers, + Database: of.database, + Logger: ocrcommon.NewOCRWrapper(of.lggr, true, func(ctx context.Context, msg string) { + logger.Sugared(of.lggr).ErrorIf(of.jobORM.RecordError(ctx, of.jobID, msg), "unable to record error") + }), + MonitoringEndpoint: &telemetry.NoopAgent{}, + OffchainKeyring: of.kb, + OnchainKeyring: ocrcommon.NewOCR3OnchainKeyringAdapter(of.kb), + MetricsRegisterer: prometheus.WrapRegistererWith(map[string]string{"job_name": of.jobName}, prometheus.DefaultRegisterer), + }) + + if err != nil { + return nil, fmt.Errorf("%w: failed to create new OCR oracle", err) + } + + return &adaptedOracle{oracle: oracle}, nil +} + +type adaptedOracle struct { + oracle ocr.Oracle +} + +func (a *adaptedOracle) Start(ctx context.Context) error { + return a.oracle.Start() +} + +func (a *adaptedOracle) Close(ctx context.Context) error { + return a.oracle.Close() +} diff --git a/core/services/ocr2/plugins/generic/oraclefactorydb.go b/core/services/ocr2/plugins/generic/oraclefactorydb.go new file mode 100644 index 00000000000..5db6f8a2ffe --- /dev/null +++ b/core/services/ocr2/plugins/generic/oraclefactorydb.go @@ -0,0 +1,135 @@ +package generic + +import ( + "context" + "encoding/json" + "fmt" + "time" + + ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +type oracleFactoryDb struct { + // The ID is used for logging and error messages + // A single standard capabilities spec can instantiate multiple oracles + // TODO: NewOracle should take a unique identifier for the oracle + specID int32 + lggr logger.SugaredLogger + config *ocrtypes.ContractConfig + states map[ocrtypes.ConfigDigest]*ocrtypes.PersistentState + pendingTransmissions map[ocrtypes.ReportTimestamp]ocrtypes.PendingTransmission + protocolStates map[ocrtypes.ConfigDigest]map[string][]byte +} + +var ( + _ ocrtypes.Database = &oracleFactoryDb{} +) + +// NewDB returns a new DB scoped to this instanceID +func OracleFactoryDB(specID int32, lggr logger.Logger) *oracleFactoryDb { + return &oracleFactoryDb{ + specID: specID, + lggr: logger.Sugared(lggr.Named("OracleFactoryMemoryDb")), + states: make(map[ocrtypes.ConfigDigest]*ocrtypes.PersistentState), + pendingTransmissions: make(map[ocrtypes.ReportTimestamp]ocrtypes.PendingTransmission), + protocolStates: make(map[ocrtypes.ConfigDigest]map[string][]byte), + } +} + +func (ofdb *oracleFactoryDb) ReadState(ctx context.Context, cd ocrtypes.ConfigDigest) (ps *ocrtypes.PersistentState, err error) { + ps, ok := ofdb.states[cd] + if !ok { + return nil, fmt.Errorf("state not found for standard capabilities spec ID %d, config digest %s", ofdb.specID, cd) + } + + return ps, nil +} + +func (ofdb *oracleFactoryDb) WriteState(ctx context.Context, cd ocrtypes.ConfigDigest, state ocrtypes.PersistentState) error { + ofdb.states[cd] = &state + return nil +} + +func (ofdb *oracleFactoryDb) ReadConfig(ctx context.Context) (c *ocrtypes.ContractConfig, err error) { + if ofdb.config == nil { + // Returning nil, nil because this is a cache miss + return nil, nil + } + return ofdb.config, nil +} + +func (ofdb *oracleFactoryDb) WriteConfig(ctx context.Context, c ocrtypes.ContractConfig) error { + ofdb.config = &c + + cBytes, err := json.Marshal(c) + if err != nil { + return fmt.Errorf("MemoryDB: WriteConfig failed to marshal config: %w", err) + } + + ofdb.lggr.Debugw("MemoryDB: WriteConfig", "ocrtypes.ContractConfig", string(cBytes)) + + return nil +} + +func (ofdb *oracleFactoryDb) StorePendingTransmission(ctx context.Context, t ocrtypes.ReportTimestamp, tx ocrtypes.PendingTransmission) error { + ofdb.pendingTransmissions[t] = tx + return nil +} + +func (ofdb *oracleFactoryDb) PendingTransmissionsWithConfigDigest(ctx context.Context, cd ocrtypes.ConfigDigest) (map[ocrtypes.ReportTimestamp]ocrtypes.PendingTransmission, error) { + m := make(map[ocrtypes.ReportTimestamp]ocrtypes.PendingTransmission) + for k, v := range ofdb.pendingTransmissions { + if k.ConfigDigest == cd { + m[k] = v + } + } + + return m, nil +} + +func (ofdb *oracleFactoryDb) DeletePendingTransmission(ctx context.Context, t ocrtypes.ReportTimestamp) error { + delete(ofdb.pendingTransmissions, t) + return nil +} + +func (ofdb *oracleFactoryDb) DeletePendingTransmissionsOlderThan(ctx context.Context, t time.Time) error { + for k, v := range ofdb.pendingTransmissions { + if v.Time.Before(t) { + delete(ofdb.pendingTransmissions, k) + } + } + + return nil +} + +func (ofdb *oracleFactoryDb) ReadProtocolState( + ctx context.Context, + configDigest ocrtypes.ConfigDigest, + key string, +) ([]byte, error) { + value, ok := ofdb.protocolStates[configDigest][key] + if !ok { + // Previously implementation returned nil if the state is not found + return nil, nil + } + return value, nil +} + +func (ofdb *oracleFactoryDb) WriteProtocolState( + ctx context.Context, + configDigest ocrtypes.ConfigDigest, + key string, + value []byte, +) error { + if value == nil { + delete(ofdb.protocolStates[configDigest], key) + } else { + if ofdb.protocolStates[configDigest] == nil { + ofdb.protocolStates[configDigest] = make(map[string][]byte) + } + ofdb.protocolStates[configDigest][key] = value + } + return nil +} diff --git a/core/services/ocr2/plugins/generic/oraclefactorytransmitter.go b/core/services/ocr2/plugins/generic/oraclefactorytransmitter.go new file mode 100644 index 00000000000..41a2c3fa1c4 --- /dev/null +++ b/core/services/ocr2/plugins/generic/oraclefactorytransmitter.go @@ -0,0 +1,36 @@ +package generic + +import ( + "context" + + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/types" +) + +var _ ocr3types.ContractTransmitter[[]byte] = (*contractTransmitter)(nil) + +type contractTransmitter struct { + impl ocr3types.ContractTransmitter[[]byte] + transmitterID string +} + +func NewContractTransmitter(transmitterID string, impl ocr3types.ContractTransmitter[[]byte]) *contractTransmitter { + return &contractTransmitter{ + impl: impl, + transmitterID: transmitterID, + } +} + +func (ct *contractTransmitter) Transmit( + ctx context.Context, + configDigest types.ConfigDigest, + seqNr uint64, + reportWithInfo ocr3types.ReportWithInfo[[]byte], + attributedOnchainSignature []types.AttributedOnchainSignature, +) error { + return ct.impl.Transmit(ctx, configDigest, seqNr, reportWithInfo, attributedOnchainSignature) +} + +func (ct *contractTransmitter) FromAccount() (types.Account, error) { + return types.Account(ct.transmitterID), nil +} diff --git a/core/services/pipeline/task.eth_tx.go b/core/services/pipeline/task.eth_tx.go index 506a2518f76..3c340261966 100644 --- a/core/services/pipeline/task.eth_tx.go +++ b/core/services/pipeline/task.eth_tx.go @@ -64,11 +64,11 @@ func (t *ETHTxTask) getEvmChainID() string { return t.EVMChainID } -func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo) { +func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inputs []Result) (Result, RunInfo) { var chainID StringParam err := errors.Wrap(ResolveParam(&chainID, From(VarExpr(t.getEvmChainID(), vars), NonemptyString(t.getEvmChainID()), "")), "evmChainID") if err != nil { - return Result{Error: err}, runInfo + return Result{Error: err}, RunInfo{} } chain, err := t.legacyChains.Get(string(chainID)) @@ -81,7 +81,7 @@ func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inpu txManager := chain.TxManager() _, err = CheckInputs(inputs, -1, -1, 0) if err != nil { - return Result{Error: errors.Wrap(err, "task inputs")}, runInfo + return Result{Error: errors.Wrap(err, "task inputs")}, RunInfo{} } maximumGasLimit := SelectGasLimit(cfg.GasEstimator(), t.jobType, t.specGasLimit) @@ -107,25 +107,20 @@ func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inpu errors.Wrap(ResolveParam(&failOnRevert, From(NonemptyString(t.FailOnRevert), false)), "failOnRevert"), ) if err != nil { - return Result{Error: err}, runInfo - } - var minOutgoingConfirmations uint64 - if min, isSet := maybeMinConfirmations.Uint64(); isSet { - minOutgoingConfirmations = min - } else { - minOutgoingConfirmations = uint64(cfg.FinalityDepth()) + return Result{Error: err}, RunInfo{} } + minOutgoingConfirmations, isMinConfirmationSet := maybeMinConfirmations.Uint64() txMeta, err := decodeMeta(txMetaMap) if err != nil { - return Result{Error: err}, runInfo + return Result{Error: err}, RunInfo{} } txMeta.FailOnRevert = null.BoolFrom(bool(failOnRevert)) setJobIDOnMeta(lggr, vars, txMeta) transmitChecker, err := decodeTransmitChecker(transmitCheckerMap) if err != nil { - return Result{Error: err}, runInfo + return Result{Error: err}, RunInfo{} } fromAddr, err := t.keyStore.GetRoundRobinAddress(ctx, chain.ID(), fromAddrs...) @@ -159,8 +154,11 @@ func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inpu SignalCallback: true, } - if minOutgoingConfirmations > 0 { - // Store the task run ID, so we can resume the pipeline when tx is confirmed + if !isMinConfirmationSet { + // Store the task run ID, so we can resume the pipeline when tx is finalized + txRequest.PipelineTaskRunID = &t.uuid + } else if minOutgoingConfirmations > 0 { + // Store the task run ID, so we can resume the pipeline after minOutgoingConfirmations txRequest.PipelineTaskRunID = &t.uuid txRequest.MinConfirmations = clnull.Uint32From(uint32(minOutgoingConfirmations)) } @@ -170,11 +168,11 @@ func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inpu return Result{Error: errors.Wrapf(ErrTaskRunFailed, "while creating transaction: %v", err)}, retryableRunInfo() } - if minOutgoingConfirmations > 0 { - return Result{}, pendingRunInfo() + if txRequest.PipelineTaskRunID != nil { + return Result{}, RunInfo{IsPending: true} } - return Result{Value: nil}, runInfo + return Result{}, RunInfo{} } func decodeMeta(metaMap MapParam) (*txmgr.TxMeta, error) { diff --git a/core/services/standardcapabilities/delegate.go b/core/services/standardcapabilities/delegate.go index c22134bb48b..7c370d4f8de 100644 --- a/core/services/standardcapabilities/delegate.go +++ b/core/services/standardcapabilities/delegate.go @@ -18,7 +18,12 @@ import ( webapitarget "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi/target" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/chaintype" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ocr2key" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/generic" + "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/services/telemetry" "github.com/smartcontractkit/chainlink/v2/plugins" @@ -39,6 +44,8 @@ type Delegate struct { pipelineRunner pipeline.Runner relayers RelayGetter gatewayConnectorWrapper *gatewayconnector.ServiceWrapper + ks keystore.Master + peerWrapper *ocrcommon.SingletonPeerWrapper isNewlyCreatedJob bool } @@ -49,11 +56,33 @@ const ( commandOverrideForCustomComputeAction = "__builtin_custom-compute-action" ) -func NewDelegate(logger logger.Logger, ds sqlutil.DataSource, jobORM job.ORM, registry core.CapabilitiesRegistry, - cfg plugins.RegistrarConfig, monitoringEndpointGen telemetry.MonitoringEndpointGenerator, pipelineRunner pipeline.Runner, - relayers RelayGetter, gatewayConnectorWrapper *gatewayconnector.ServiceWrapper) *Delegate { - return &Delegate{logger: logger, ds: ds, jobORM: jobORM, registry: registry, cfg: cfg, monitoringEndpointGen: monitoringEndpointGen, pipelineRunner: pipelineRunner, - relayers: relayers, isNewlyCreatedJob: false, gatewayConnectorWrapper: gatewayConnectorWrapper} +func NewDelegate( + logger logger.Logger, + ds sqlutil.DataSource, + jobORM job.ORM, + registry core.CapabilitiesRegistry, + cfg plugins.RegistrarConfig, + monitoringEndpointGen telemetry.MonitoringEndpointGenerator, + pipelineRunner pipeline.Runner, + relayers RelayGetter, + gatewayConnectorWrapper *gatewayconnector.ServiceWrapper, + ks keystore.Master, + peerWrapper *ocrcommon.SingletonPeerWrapper, +) *Delegate { + return &Delegate{ + logger: logger, + ds: ds, + jobORM: jobORM, + registry: registry, + cfg: cfg, + monitoringEndpointGen: monitoringEndpointGen, + pipelineRunner: pipelineRunner, + relayers: relayers, + isNewlyCreatedJob: false, + gatewayConnectorWrapper: gatewayConnectorWrapper, + ks: ks, + peerWrapper: peerWrapper, + } } func (d *Delegate) JobType() job.Type { @@ -78,6 +107,63 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser return nil, fmt.Errorf("failed to create relayer set: %w", err) } + ocrKeyBundles, err := d.ks.OCR2().GetAll() + if err != nil { + return nil, err + } + + if len(ocrKeyBundles) > 1 { + return nil, fmt.Errorf("expected exactly one OCR key bundle, but found: %d", len(ocrKeyBundles)) + } + + var ocrKeyBundle ocr2key.KeyBundle + if len(ocrKeyBundles) == 0 { + ocrKeyBundle, err = d.ks.OCR2().Create(ctx, chaintype.EVM) + if err != nil { + return nil, errors.Wrap(err, "failed to create OCR key bundle") + } + } else { + ocrKeyBundle = ocrKeyBundles[0] + } + + ethKeyBundles, err := d.ks.Eth().GetAll(ctx) + if err != nil { + return nil, err + } + if len(ethKeyBundles) > 1 { + return nil, fmt.Errorf("expected exactly one ETH key bundle, but found: %d", len(ethKeyBundles)) + } + + var ethKeyBundle ethkey.KeyV2 + if len(ethKeyBundles) == 0 { + ethKeyBundle, err = d.ks.Eth().Create(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to create ETH key bundle") + } + } else { + ethKeyBundle = ethKeyBundles[0] + } + + log.Debug("oracleFactoryConfig: ", spec.StandardCapabilitiesSpec.OracleFactory) + + if spec.StandardCapabilitiesSpec.OracleFactory.Enabled && d.peerWrapper == nil { + return nil, errors.New("P2P stack required for Oracle Factory") + } + + oracleFactory, err := generic.NewOracleFactory(generic.OracleFactoryParams{ + Logger: log, + JobORM: d.jobORM, + JobID: spec.ID, + JobName: spec.Name.ValueOrZero(), + KB: ocrKeyBundle, + Config: spec.StandardCapabilitiesSpec.OracleFactory, + PeerWrapper: d.peerWrapper, + RelayerSet: relayerSet, + TransmitterID: ethKeyBundle.Address.String(), + }) + if err != nil { + return nil, fmt.Errorf("failed to create oracle factory: %w", err) + } // NOTE: special cases for built-in capabilities (to be moved into LOOPPs in the future) if spec.StandardCapabilitiesSpec.Command == commandOverrideForWebAPITrigger { if d.gatewayConnectorWrapper == nil { @@ -122,7 +208,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser } standardCapability := newStandardCapabilities(log, spec.StandardCapabilitiesSpec, d.cfg, telemetryService, kvStore, d.registry, errorLog, - pr, relayerSet) + pr, relayerSet, oracleFactory) return []job.ServiceCtx{standardCapability}, nil } @@ -161,6 +247,22 @@ func ValidatedStandardCapabilitiesSpec(tomlString string) (job.Job, error) { return jb, errors.Errorf("standard capabilities command must be set") } + // Skip validation if Oracle Factory is not enabled + if !jb.StandardCapabilitiesSpec.OracleFactory.Enabled { + return jb, nil + } + + // If Oracle Factory is enabled, it must have at least one bootstrap peer + if len(jb.StandardCapabilitiesSpec.OracleFactory.BootstrapPeers) == 0 { + return jb, errors.New("no bootstrap peers found") + } + + // Validate bootstrap peers + _, err = ocrcommon.ParseBootstrapPeers(jb.StandardCapabilitiesSpec.OracleFactory.BootstrapPeers) + if err != nil { + return jb, errors.Wrap(err, "failed to parse bootstrap peers") + } + return jb, nil } diff --git a/core/services/standardcapabilities/delegate_test.go b/core/services/standardcapabilities/delegate_test.go new file mode 100644 index 00000000000..27b6734f911 --- /dev/null +++ b/core/services/standardcapabilities/delegate_test.go @@ -0,0 +1,121 @@ +package standardcapabilities_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/standardcapabilities" +) + +func Test_ValidatedStandardCapabilitiesSpec(t *testing.T) { + type testCase struct { + name string + tomlString string + expectedError string + expectedSpec *job.StandardCapabilitiesSpec + } + + testCases := []testCase{ + { + name: "invalid TOML string", + tomlString: `[[]`, + expectedError: "toml error on load standard capabilities", + }, + { + name: "incorrect job type", + tomlString: ` + type="nonstandardcapabilities" + `, + expectedError: "standard capabilities unsupported job type", + }, + { + name: "command unset", + tomlString: ` + type="standardcapabilities" + `, + expectedError: "standard capabilities command must be set", + }, + { + name: "invalid oracle config: malformed peer", + tomlString: ` + type="standardcapabilities" + command="path/to/binary" + + [oracle_factory] + enabled=true + bootstrap_peers = [ + "invalid_p2p_id@invalid_ip:1111" + ] + `, + expectedError: "failed to parse bootstrap peers", + }, + { + name: "invalid oracle config: missing bootstrap peers", + tomlString: ` + type="standardcapabilities" + command="path/to/binary" + + [oracle_factory] + enabled=true + `, + expectedError: "no bootstrap peers found", + }, + { + name: "valid spec", + tomlString: ` + type="standardcapabilities" + command="path/to/binary" + `, + }, + { + name: "valid spec with oracle config", + tomlString: ` + type="standardcapabilities" + command="path/to/binary" + + [capabilities] + target = "enabled" + + [oracle_factory] + enabled=true + bootstrap_peers = [ + "12D3KooWEBVwbfdhKnicois7FTYVsBFGFcoMhMCKXQC57BQyZMhz@localhost:6690" + ] + network="evm" + chain_id="31337" + ocr_contract_address="0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6" + `, + expectedSpec: &job.StandardCapabilitiesSpec{ + Command: "path/to/binary", + OracleFactory: job.OracleFactoryConfig{ + Enabled: true, + BootstrapPeers: []string{ + "12D3KooWEBVwbfdhKnicois7FTYVsBFGFcoMhMCKXQC57BQyZMhz@localhost:6690", + }, + OCRContractAddress: "0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6", + ChainID: "31337", + Network: "evm", + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + jobSpec, err := standardcapabilities.ValidatedStandardCapabilitiesSpec(tc.tomlString) + + if tc.expectedError != "" { + assert.ErrorContains(t, err, tc.expectedError) + } else { + require.NoError(t, err) + } + + if tc.expectedSpec != nil { + assert.EqualValues(t, tc.expectedSpec, jobSpec.StandardCapabilitiesSpec) + } + }) + } +} diff --git a/core/services/standardcapabilities/standard_capabilities.go b/core/services/standardcapabilities/standard_capabilities.go index a8d007d5df8..fe3dad7bb2f 100644 --- a/core/services/standardcapabilities/standard_capabilities.go +++ b/core/services/standardcapabilities/standard_capabilities.go @@ -23,18 +23,23 @@ type standardCapabilities struct { errorLog core.ErrorLog pipelineRunner core.PipelineRunnerService relayerSet core.RelayerSet + oracleFactory core.OracleFactory capabilitiesLoop *loop.StandardCapabilitiesService } -func newStandardCapabilities(log logger.Logger, spec *job.StandardCapabilitiesSpec, +func newStandardCapabilities( + log logger.Logger, + spec *job.StandardCapabilitiesSpec, pluginRegistrar plugins.RegistrarConfig, telemetryService core.TelemetryService, store core.KeyValueStore, CapabilitiesRegistry core.CapabilitiesRegistry, errorLog core.ErrorLog, pipelineRunner core.PipelineRunnerService, - relayerSet core.RelayerSet) *standardCapabilities { + relayerSet core.RelayerSet, + oracleFactory core.OracleFactory, +) *standardCapabilities { return &standardCapabilities{ log: log, spec: spec, @@ -45,6 +50,7 @@ func newStandardCapabilities(log logger.Logger, spec *job.StandardCapabilitiesSp errorLog: errorLog, pipelineRunner: pipelineRunner, relayerSet: relayerSet, + oracleFactory: oracleFactory, } } @@ -73,7 +79,7 @@ func (s *standardCapabilities) Start(ctx context.Context) error { } if err = s.capabilitiesLoop.Service.Initialise(ctx, s.spec.Config, s.telemetryService, s.store, s.CapabilitiesRegistry, s.errorLog, - s.pipelineRunner, s.relayerSet); err != nil { + s.pipelineRunner, s.relayerSet, s.oracleFactory); err != nil { return fmt.Errorf("error initialising standard capabilities service: %v", err) } diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index ed584ba5aec..680c29371a0 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -11,9 +11,8 @@ import ( "github.com/jonboulle/clockwork" - "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk" - "github.com/smartcontractkit/chainlink-common/pkg/workflows/exec" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/services" diff --git a/core/store/migrate/migrations/0255_standard_capabilities_extension.sql b/core/store/migrate/migrations/0255_standard_capabilities_extension.sql new file mode 100644 index 00000000000..d81b4864eb4 --- /dev/null +++ b/core/store/migrate/migrations/0255_standard_capabilities_extension.sql @@ -0,0 +1,9 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE standardcapabilities_specs +ADD COLUMN oracle_factory JSONB; +-- +goose StatementEnd +-- +goose Down +-- +goose StatementBegin +ALTER TABLE standardcapabilities_specs DROP COLUMN oracle_factory; +-- +goose StatementEnd diff --git a/go.mod b/go.mod index 93696685c60..a2c9b1b211f 100644 --- a/go.mod +++ b/go.mod @@ -76,7 +76,7 @@ require ( github.com/smartcontractkit/chain-selectors v1.0.23 github.com/smartcontractkit/chainlink-automation v1.0.4 github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb - github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd + github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f diff --git a/go.sum b/go.sum index 2e4a7775a60..8fad9ec97ad 100644 --- a/go.sum +++ b/go.sum @@ -1060,8 +1060,8 @@ github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8um github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb h1:bYT7j5syymCtAKv/gGG7CLp4APe/VDWkig9Ph7mc8fQ= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb/go.mod h1:WbtjuYMnDAYojL3CSWmruc1ecSBgSTggzXJ6F1U6bxw= -github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd h1:DraA9kRpkyI02OEx7QDbSq3bCYxVFZ78TdOP2rUvHts= -github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd/go.mod h1:F6WUS6N4mP5ScwpwyTyAJc9/vjR+GXbMCRUOVekQi1g= +github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d h1:GpGKzVtDWSb8cesHSvm+LQpCJpFdfVPk3us5wAY6ixI= +github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d/go.mod h1:+yz2So1Lp90C3ATfMmqMEJaIr3zfG8e4xHa/3u1kIDk= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7/go.mod h1:BMYE1vC/pGmdFSsOJdPrAA0/4gZ0Xo0SxTMdGspBtRo= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 h1:yRk4ektpx/UxwarqAfgxUXLrsYXlaNeP1NOwzHGrK2Q= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index d3ad17f908c..34cf2df9cbb 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -40,7 +40,7 @@ require ( github.com/smartcontractkit/chain-selectors v1.0.23 github.com/smartcontractkit/chainlink-automation v1.0.4 github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb - github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd + github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d github.com/smartcontractkit/chainlink-testing-framework/havoc v1.50.0 github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.9 github.com/smartcontractkit/chainlink-testing-framework/lib/grafana v1.50.0 diff --git a/integration-tests/go.sum b/integration-tests/go.sum index 90c7f08b0f9..bd416855c59 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1439,8 +1439,8 @@ github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8um github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb h1:bYT7j5syymCtAKv/gGG7CLp4APe/VDWkig9Ph7mc8fQ= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb/go.mod h1:WbtjuYMnDAYojL3CSWmruc1ecSBgSTggzXJ6F1U6bxw= -github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd h1:DraA9kRpkyI02OEx7QDbSq3bCYxVFZ78TdOP2rUvHts= -github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd/go.mod h1:F6WUS6N4mP5ScwpwyTyAJc9/vjR+GXbMCRUOVekQi1g= +github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d h1:GpGKzVtDWSb8cesHSvm+LQpCJpFdfVPk3us5wAY6ixI= +github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d/go.mod h1:+yz2So1Lp90C3ATfMmqMEJaIr3zfG8e4xHa/3u1kIDk= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7/go.mod h1:BMYE1vC/pGmdFSsOJdPrAA0/4gZ0Xo0SxTMdGspBtRo= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 h1:yRk4ektpx/UxwarqAfgxUXLrsYXlaNeP1NOwzHGrK2Q= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index 8b3fed5432b..85b57698a3e 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -15,7 +15,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/rs/zerolog v1.33.0 github.com/slack-go/slack v0.12.2 - github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd + github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.9 github.com/smartcontractkit/chainlink-testing-framework/seth v1.50.1 github.com/smartcontractkit/chainlink-testing-framework/wasp v1.50.0 diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index 8bdc23dc35f..b921352eeb5 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1415,8 +1415,8 @@ github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8um github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb h1:bYT7j5syymCtAKv/gGG7CLp4APe/VDWkig9Ph7mc8fQ= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb/go.mod h1:WbtjuYMnDAYojL3CSWmruc1ecSBgSTggzXJ6F1U6bxw= -github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd h1:DraA9kRpkyI02OEx7QDbSq3bCYxVFZ78TdOP2rUvHts= -github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd/go.mod h1:F6WUS6N4mP5ScwpwyTyAJc9/vjR+GXbMCRUOVekQi1g= +github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d h1:GpGKzVtDWSb8cesHSvm+LQpCJpFdfVPk3us5wAY6ixI= +github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d/go.mod h1:+yz2So1Lp90C3ATfMmqMEJaIr3zfG8e4xHa/3u1kIDk= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7/go.mod h1:BMYE1vC/pGmdFSsOJdPrAA0/4gZ0Xo0SxTMdGspBtRo= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 h1:yRk4ektpx/UxwarqAfgxUXLrsYXlaNeP1NOwzHGrK2Q= diff --git a/plugins/cmd/capabilities/log-event-trigger/main.go b/plugins/cmd/capabilities/log-event-trigger/main.go index 7cf66f9c847..d01485a743f 100644 --- a/plugins/cmd/capabilities/log-event-trigger/main.go +++ b/plugins/cmd/capabilities/log-event-trigger/main.go @@ -92,6 +92,7 @@ func (cs *LogEventTriggerGRPCService) Initialise( errorLog core.ErrorLog, pipelineRunner core.PipelineRunnerService, relayerSet core.RelayerSet, + oracleFactory core.OracleFactory, ) error { cs.s.Logger.Debugf("Initialising %s", serviceName)