Skip to content

Commit

Permalink
CCIP-4163 Out of order execution (#15489)
Browse files Browse the repository at this point in the history
* Out of order execution

* Linter fixes

* Linter fixes

* Post review fixes

* Minor fixes

* Minor fixes

* Don't enforce all messages to be within a single commit

* Post review fixes

* Post review fixes

* Post review fixes

* Post review fixes
  • Loading branch information
mateusz-sekara authored Dec 4, 2024
1 parent 29e5b3b commit c36f30d
Show file tree
Hide file tree
Showing 11 changed files with 468 additions and 78 deletions.
13 changes: 13 additions & 0 deletions .github/e2e-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,19 @@ runner-test-matrix:
E2E_TEST_SELECTED_NETWORK: SIMULATED_1,SIMULATED_2
E2E_JD_VERSION: 0.6.0

- id: smoke/ccip/ccip_ooo_execution_test.go:*
path: integration-tests/smoke/ccip/ccip_ooo_execution_test.go
test_env_type: docker
runs_on: ubuntu-latest
triggers:
- PR E2E Core Tests
- Nightly E2E Tests
test_cmd: cd integration-tests/ && go test smoke/ccip/ccip_ooo_execution_test.go -timeout 16m -test.parallel=1 -count=1 -json
pyroscope_env: ci-smoke-ccipv1_6-evm-simulated
test_env_vars:
E2E_TEST_SELECTED_NETWORK: SIMULATED_1,SIMULATED_2
E2E_JD_VERSION: 0.6.0

- id: smoke/ccip/ccip_usdc_test.go:*
path: integration-tests/smoke/ccip/ccip_usdc_test.go
test_env_type: docker
Expand Down
2 changes: 1 addition & 1 deletion deployment/ccip/changeset/cs_add_chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func TestAddChainInbound(t *testing.T) {
commonutils.JustError(ConfirmCommitWithExpectedSeqNumRange(t, e.Env.Chains[initialDeploy[0]], e.Env.Chains[newChain], state.Chains[newChain].OffRamp, &startBlock, cciptypes.SeqNumRange{
cciptypes.SeqNum(1),
cciptypes.SeqNum(msgSentEvent.SequenceNumber),
})))
}, true)))
require.NoError(t,
commonutils.JustError(
ConfirmExecWithSeqNrs(
Expand Down
65 changes: 60 additions & 5 deletions deployment/ccip/changeset/test_assertions.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,9 @@ func ConfirmCommitForAllWithExpectedSeqNums(
ccipocr3.SeqNumRange{
ccipocr3.SeqNum(expectedSeqNum),
ccipocr3.SeqNum(expectedSeqNum),
}))
},
true,
))
})
}
}
Expand All @@ -224,6 +226,39 @@ func ConfirmCommitForAllWithExpectedSeqNums(
)
}

type commitReportTracker struct {
seenMessages map[uint64]map[uint64]bool
}

func newCommitReportTracker(sourceChainSelector uint64, seqNrs ccipocr3.SeqNumRange) commitReportTracker {
seenMessages := make(map[uint64]map[uint64]bool)
seenMessages[sourceChainSelector] = make(map[uint64]bool)

for i := seqNrs.Start(); i <= seqNrs.End(); i++ {
seenMessages[sourceChainSelector][uint64(i)] = false
}
return commitReportTracker{seenMessages: seenMessages}
}

func (c *commitReportTracker) visitCommitReport(sourceChainSelector uint64, minSeqNr uint64, maxSeqNr uint64) {
if _, ok := c.seenMessages[sourceChainSelector]; !ok {
return
}

for i := minSeqNr; i <= maxSeqNr; i++ {
c.seenMessages[sourceChainSelector][i] = true
}
}

func (c *commitReportTracker) allCommited(sourceChainSelector uint64) bool {
for _, v := range c.seenMessages[sourceChainSelector] {
if !v {
return false
}
}
return true
}

// ConfirmCommitWithExpectedSeqNumRange waits for a commit report on the destination chain with the expected sequence number range.
// startBlock is the block number to start watching from.
// If startBlock is nil, it will start watching from the latest block.
Expand All @@ -234,6 +269,7 @@ func ConfirmCommitWithExpectedSeqNumRange(
offRamp *offramp.OffRamp,
startBlock *uint64,
expectedSeqNumRange ccipocr3.SeqNumRange,
enforceSingleCommit bool,
) (*offramp.OffRampCommitReportAccepted, error) {
sink := make(chan *offramp.OffRampCommitReportAccepted)
subscription, err := offRamp.WatchCommitReportAccepted(&bind.WatchOpts{
Expand All @@ -244,6 +280,8 @@ func ConfirmCommitWithExpectedSeqNumRange(
return nil, fmt.Errorf("error to subscribe CommitReportAccepted : %w", err)
}

seenMessages := newCommitReportTracker(src.Selector, expectedSeqNumRange)

defer subscription.Unsubscribe()
var duration time.Duration
deadline, ok := t.Deadline()
Expand Down Expand Up @@ -279,11 +317,19 @@ func ConfirmCommitWithExpectedSeqNumRange(
event := iter.Event
if len(event.MerkleRoots) > 0 {
for _, mr := range event.MerkleRoots {
t.Logf("Received commit report for [%d, %d] on selector %d from source selector %d expected seq nr range %s, token prices: %v, tx hash: %s",
mr.MinSeqNr, mr.MaxSeqNr, dest.Selector, src.Selector, expectedSeqNumRange.String(), event.PriceUpdates.TokenPriceUpdates, event.Raw.TxHash.String())
seenMessages.visitCommitReport(src.Selector, mr.MinSeqNr, mr.MaxSeqNr)

if mr.SourceChainSelector == src.Selector &&
uint64(expectedSeqNumRange.Start()) >= mr.MinSeqNr &&
uint64(expectedSeqNumRange.End()) <= mr.MaxSeqNr {
t.Logf("Received commit report for [%d, %d] on selector %d from source selector %d expected seq nr range %s, token prices: %v, tx hash: %s",
mr.MinSeqNr, mr.MaxSeqNr, dest.Selector, src.Selector, expectedSeqNumRange.String(), event.PriceUpdates.TokenPriceUpdates, event.Raw.TxHash.String())
t.Logf("All sequence numbers commited in a single report [%d, %d]", expectedSeqNumRange.Start(), expectedSeqNumRange.End())
return event, nil
}

if !enforceSingleCommit && seenMessages.allCommited(src.Selector) {
t.Logf("All sequence numbers already commited from range [%d, %d]", expectedSeqNumRange.Start(), expectedSeqNumRange.End())
return event, nil
}
}
Expand All @@ -299,11 +345,20 @@ func ConfirmCommitWithExpectedSeqNumRange(
// Check the interval of sequence numbers and make sure it matches
// the expected range.
for _, mr := range report.MerkleRoots {
t.Logf("Received commit report for [%d, %d] on selector %d from source selector %d expected seq nr range %s, token prices: %v",
mr.MinSeqNr, mr.MaxSeqNr, dest.Selector, src.Selector, expectedSeqNumRange.String(), report.PriceUpdates.TokenPriceUpdates)

seenMessages.visitCommitReport(src.Selector, mr.MinSeqNr, mr.MaxSeqNr)

if mr.SourceChainSelector == src.Selector &&
uint64(expectedSeqNumRange.Start()) >= mr.MinSeqNr &&
uint64(expectedSeqNumRange.End()) <= mr.MaxSeqNr {
t.Logf("Received commit report for [%d, %d] on selector %d from source selector %d expected seq nr range %s, token prices: %v",
mr.MinSeqNr, mr.MaxSeqNr, dest.Selector, src.Selector, expectedSeqNumRange.String(), report.PriceUpdates.TokenPriceUpdates)
t.Logf("All sequence numbers commited in a single report [%d, %d]", expectedSeqNumRange.Start(), expectedSeqNumRange.End())
return report, nil
}

if !enforceSingleCommit && seenMessages.allCommited(src.Selector) {
t.Logf("All sequence numbers already commited from range [%d, %d]", expectedSeqNumRange.Start(), expectedSeqNumRange.End())
return report, nil
}
}
Expand Down
119 changes: 79 additions & 40 deletions deployment/ccip/changeset/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,18 @@ func NewMemoryEnvironmentWithJobs(t *testing.T, lggr logger.Logger, config memor
// We don't need to return exactly the same attestation, because our Mocked USDC contract doesn't rely on any specific
// value, but instead of that it just checks if the attestation is present. Therefore, it makes the test a bit simpler
// and doesn't require very detailed mocks. Please see tests in chainlink-ccip for detailed tests using real attestations
func mockAttestationResponse() *httptest.Server {
func mockAttestationResponse(isFaulty bool) *httptest.Server {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
response := `{
"status": "complete",
"attestation": "0x9049623e91719ef2aa63c55f357be2529b0e7122ae552c18aff8db58b4633c4d3920ff03d3a6d1ddf11f06bf64d7fd60d45447ac81f527ba628877dc5ca759651b08ffae25a6d3b1411749765244f0a1c131cbfe04430d687a2e12fd9d2e6dc08e118ad95d94ad832332cf3c4f7a4f3da0baa803b7be024b02db81951c0f0714de1b"
}`

if isFaulty {
response = `{
"status": "pending",
"error": "internal error"
}`
}
_, err := w.Write([]byte(response))
if err != nil {
panic(err)
Expand All @@ -261,9 +266,10 @@ func mockAttestationResponse() *httptest.Server {
}

type TestConfigs struct {
IsUSDC bool
IsMultiCall3 bool
OCRConfigOverride func(CCIPOCRParams) CCIPOCRParams
IsUSDC bool
IsUSDCAttestationMissing bool
IsMultiCall3 bool
OCRConfigOverride func(CCIPOCRParams) CCIPOCRParams
}

func NewMemoryEnvironmentWithJobsAndContracts(t *testing.T, lggr logger.Logger, config memory.MemoryEnvironmentConfig, tCfg *TestConfigs) DeployedEnv {
Expand Down Expand Up @@ -325,7 +331,7 @@ func NewMemoryEnvironmentWithJobsAndContracts(t *testing.T, lggr logger.Logger,
tokenConfig := NewTestTokenConfig(state.Chains[e.FeedChainSel].USDFeeds)
var tokenDataProviders []pluginconfig.TokenDataObserverConfig
if len(usdcChains) > 0 {
server := mockAttestationResponse()
server := mockAttestationResponse(tCfg.IsUSDCAttestationMissing)
endpoint := server.URL
t.Cleanup(func() {
server.Close()
Expand Down Expand Up @@ -769,7 +775,7 @@ func ConfirmRequestOnSourceAndDest(t *testing.T, env deployment.Environment, sta
commonutils.JustError(ConfirmCommitWithExpectedSeqNumRange(t, env.Chains[sourceCS], env.Chains[destCS], state.Chains[destCS].OffRamp, &startBlock, cciptypes.SeqNumRange{
cciptypes.SeqNum(msgSentEvent.SequenceNumber),
cciptypes.SeqNum(msgSentEvent.SequenceNumber),
})))
}, true)))

fmt.Printf("Commit confirmed for seqnr %d", msgSentEvent.SequenceNumber)
require.NoError(
Expand Down Expand Up @@ -1131,37 +1137,55 @@ func deployTransferTokenOneEnd(
return tokenContract.Contract, tokenPool.Contract, nil
}

type MintTokenInfo struct {
auth *bind.TransactOpts
sender *bind.TransactOpts
tokens []*burn_mint_erc677.BurnMintERC677
}

func NewMintTokenInfo(auth *bind.TransactOpts, tokens ...*burn_mint_erc677.BurnMintERC677) MintTokenInfo {
return MintTokenInfo{auth: auth, tokens: tokens}
}

func NewMintTokenWithCustomSender(auth *bind.TransactOpts, sender *bind.TransactOpts, tokens ...*burn_mint_erc677.BurnMintERC677) MintTokenInfo {
return MintTokenInfo{auth: auth, sender: sender, tokens: tokens}
}

// MintAndAllow mints tokens for deployers and allow router to spend them
func MintAndAllow(
t *testing.T,
e deployment.Environment,
state CCIPOnChainState,
owners map[uint64]*bind.TransactOpts,
tkMap map[uint64][]*burn_mint_erc677.BurnMintERC677,
tokenMap map[uint64][]MintTokenInfo,
) {
configurePoolGrp := errgroup.Group{}
tenCoins := new(big.Int).Mul(big.NewInt(1e18), big.NewInt(10))

for chain, tokens := range tkMap {
owner, ok := owners[chain]
require.True(t, ok)
for chain, mintTokenInfos := range tokenMap {
mintTokenInfos := mintTokenInfos

tokens := tokens
configurePoolGrp.Go(func() error {
for _, token := range tokens {
tx, err := token.Mint(
owner,
e.Chains[chain].DeployerKey.From,
new(big.Int).Mul(tenCoins, big.NewInt(10)),
)
require.NoError(t, err)
_, err = e.Chains[chain].Confirm(tx)
require.NoError(t, err)

tx, err = token.Approve(e.Chains[chain].DeployerKey, state.Chains[chain].Router.Address(), tenCoins)
require.NoError(t, err)
_, err = e.Chains[chain].Confirm(tx)
require.NoError(t, err)
for _, mintTokenInfo := range mintTokenInfos {
sender := mintTokenInfo.sender
if sender == nil {
sender = e.Chains[chain].DeployerKey
}

for _, token := range mintTokenInfo.tokens {
tx, err := token.Mint(
mintTokenInfo.auth,
sender.From,
new(big.Int).Mul(tenCoins, big.NewInt(10)),
)
require.NoError(t, err)
_, err = e.Chains[chain].Confirm(tx)
require.NoError(t, err)

tx, err = token.Approve(sender, state.Chains[chain].Router.Address(), tenCoins)
require.NoError(t, err)
_, err = e.Chains[chain].Confirm(tx)
require.NoError(t, err)
}
}
return nil
})
Expand All @@ -1170,27 +1194,17 @@ func MintAndAllow(
require.NoError(t, configurePoolGrp.Wait())
}

// TransferAndWaitForSuccess sends a message from sourceChain to destChain and waits for it to be executed
func TransferAndWaitForSuccess(
func Transfer(
ctx context.Context,
t *testing.T,
env deployment.Environment,
state CCIPOnChainState,
sourceChain, destChain uint64,
tokens []router.ClientEVMTokenAmount,
receiver common.Address,
data []byte,
expectedStatus int,
extraArgs []byte,
) {
identifier := SourceDestPair{
SourceChainSelector: sourceChain,
DestChainSelector: destChain,
}

data, extraArgs []byte,
) (*onramp.OnRampCCIPMessageSent, map[uint64]*uint64) {
startBlocks := make(map[uint64]*uint64)
expectedSeqNum := make(map[SourceDestPair]uint64)
expectedSeqNumExec := make(map[SourceDestPair][]uint64)

latesthdr, err := env.Chains[destChain].Client.HeaderByNumber(ctx, nil)
require.NoError(t, err)
Expand All @@ -1204,6 +1218,31 @@ func TransferAndWaitForSuccess(
FeeToken: common.HexToAddress("0x0"),
ExtraArgs: extraArgs,
})
return msgSentEvent, startBlocks
}

// TransferAndWaitForSuccess sends a message from sourceChain to destChain and waits for it to be executed
func TransferAndWaitForSuccess(
ctx context.Context,
t *testing.T,
env deployment.Environment,
state CCIPOnChainState,
sourceChain, destChain uint64,
tokens []router.ClientEVMTokenAmount,
receiver common.Address,
data []byte,
expectedStatus int,
extraArgs []byte,
) {
identifier := SourceDestPair{
SourceChainSelector: sourceChain,
DestChainSelector: destChain,
}

expectedSeqNum := make(map[SourceDestPair]uint64)
expectedSeqNumExec := make(map[SourceDestPair][]uint64)

msgSentEvent, startBlocks := Transfer(ctx, t, env, state, sourceChain, destChain, tokens, receiver, data, extraArgs)
expectedSeqNum[identifier] = msgSentEvent.SequenceNumber
expectedSeqNumExec[identifier] = []uint64{msgSentEvent.SequenceNumber}

Expand Down
19 changes: 14 additions & 5 deletions integration-tests/ccip-tests/actions/ccip_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4379,21 +4379,30 @@ func NewBalanceSheet() *BalanceSheet {
}
}

type attestationStatusResponse struct {
Status string `json:"status"`
Attestation string `json:"attestation"`
Error string `json:"error"`
}

// SetMockServerWithUSDCAttestation responds with a mock attestation for any msgHash
// The path is set with regex to match any path that starts with /v1/attestations
func SetMockServerWithUSDCAttestation(
killGrave *ctftestenv.Killgrave,
mockserver *ctfClient.MockserverClient,
isFaulty bool,
) error {
path := "/v1/attestations"
response := struct {
Status string `json:"status"`
Attestation string `json:"attestation"`
Error string `json:"error"`
}{
response := attestationStatusResponse{
Status: "complete",
Attestation: "0x9049623e91719ef2aa63c55f357be2529b0e7122ae552c18aff8db58b4633c4d3920ff03d3a6d1ddf11f06bf64d7fd60d45447ac81f527ba628877dc5ca759651b08ffae25a6d3b1411749765244f0a1c131cbfe04430d687a2e12fd9d2e6dc08e118ad95d94ad832332cf3c4f7a4f3da0baa803b7be024b02db81951c0f0714de1b",
}
if isFaulty {
response = attestationStatusResponse{
Status: "pending",
Error: "internal error",
}
}
if killGrave == nil && mockserver == nil {
return fmt.Errorf("both killgrave and mockserver are nil")
}
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/ccip-tests/testsetups/ccip.go
Original file line number Diff line number Diff line change
Expand Up @@ -1153,7 +1153,7 @@ func CCIPDefaultTestSetUp(
// if it's a new USDC deployment, set up mock server for attestation,
// we need to set it only once for all the lanes as the attestation path uses regex to match the path for
// all messages across all lanes
err = actions.SetMockServerWithUSDCAttestation(killgrave, setUpArgs.Env.MockServer)
err = actions.SetMockServerWithUSDCAttestation(killgrave, setUpArgs.Env.MockServer, false)
require.NoError(t, err, "failed to set up mock server for attestation")
}
}
Expand Down
Loading

0 comments on commit c36f30d

Please sign in to comment.