Skip to content

Commit

Permalink
make committer detect if it is stuck and solve it
Browse files Browse the repository at this point in the history
  • Loading branch information
iuwqyir committed Oct 25, 2024
1 parent 31df1e0 commit 6238fe2
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 20 deletions.
23 changes: 23 additions & 0 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
}
if blocksData == nil || len(*blocksData) == 0 {
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
c.handleMissingStagingData(blocksToCommit)
return nil, nil
}

Expand Down Expand Up @@ -189,3 +190,25 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
poller.Poll(missingBlockNumbers)
return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String())
}

func (c *Committer) handleMissingStagingData(blocksToCommit []*big.Int) {
// Checks if there are any blocks in staging after the current range end
lastStagedBlockNumber, err := c.storage.StagingStorage.GetLastStagedBlockNumber(c.rpc.GetChainID(), blocksToCommit[len(blocksToCommit)-1], big.NewInt(0))
if err != nil {
log.Error().Err(err).Msg("Error checking staged data for missing range")
return
}
if lastStagedBlockNumber == nil || lastStagedBlockNumber.Sign() <= 0 {
log.Debug().Msgf("Committer is caught up with staging. No need to poll for missing blocks.")
return
}
log.Debug().Msgf("Detected missing blocks in staging data starting from %s.", blocksToCommit[0].String())

poller := NewBoundlessPoller(c.rpc, c.storage)
blocksToPoll := blocksToCommit
if len(blocksToCommit) > int(poller.blocksPerPoll) {
blocksToPoll = blocksToCommit[:int(poller.blocksPerPoll)]
}
poller.Poll(blocksToPoll)
log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String())
}
107 changes: 88 additions & 19 deletions internal/orchestrator/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ func TestGetBlockNumbersToCommit(t *testing.T) {
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
assert.Equal(t, big.NewInt(101), blockNumbers[0])
assert.Equal(t, big.NewInt(100+int64(committer.blocksPerCommit)), blockNumbers[len(blockNumbers)-1])

mockRPC.AssertExpectations(t)
mockMainStorage.AssertExpectations(t)
}

func TestGetSequentialBlockDataToCommit(t *testing.T) {
Expand Down Expand Up @@ -87,10 +84,6 @@ func TestGetSequentialBlockDataToCommit(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Equal(t, 3, len(*result))

mockRPC.AssertExpectations(t)
mockMainStorage.AssertExpectations(t)
mockStagingStorage.AssertExpectations(t)
}

func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) {
Expand Down Expand Up @@ -130,10 +123,6 @@ func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) {
assert.Equal(t, big.NewInt(101), (*result)[0].Block.Number)
assert.Equal(t, big.NewInt(102), (*result)[1].Block.Number)
assert.Equal(t, big.NewInt(103), (*result)[2].Block.Number)

mockRPC.AssertExpectations(t)
mockMainStorage.AssertExpectations(t)
mockStagingStorage.AssertExpectations(t)
}

func TestCommit(t *testing.T) {
Expand All @@ -157,9 +146,6 @@ func TestCommit(t *testing.T) {
err := committer.commit(&blockData)

assert.NoError(t, err)

mockMainStorage.AssertExpectations(t)
mockStagingStorage.AssertExpectations(t)
}

func TestHandleGap(t *testing.T) {
Expand Down Expand Up @@ -206,7 +192,6 @@ func TestStartCommitter(t *testing.T) {
}

committer := NewCommitter(mockRPC, mockStorage)
committer.storage = mockStorage
committer.triggerIntervalMs = 100 // Set a short interval for testing

chainID := big.NewInt(1)
Expand All @@ -226,9 +211,93 @@ func TestStartCommitter(t *testing.T) {

// Wait for a short time to allow the committer to run
time.Sleep(200 * time.Millisecond)
}

func TestHandleMissingStagingData(t *testing.T) {
defer func() { config.Cfg = config.Config{} }()
config.Cfg.Committer.BlocksPerCommit = 5

mockRPC := mocks.NewMockIRPCClient(t)
mockMainStorage := mocks.NewMockIMainStorage(t)
mockStagingStorage := mocks.NewMockIStagingStorage(t)

mockStorage := storage.IStorage{
MainStorage: mockMainStorage,
StagingStorage: mockStagingStorage,
}

committer := NewCommitter(mockRPC, mockStorage)

chainID := big.NewInt(1)
mockRPC.EXPECT().GetChainID().Return(chainID)
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
Blocks: 100,
})
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)}).Return([]rpc.GetFullBlockResult{
{BlockNumber: big.NewInt(0), Data: common.BlockData{Block: common.Block{Number: big.NewInt(0)}}},
{BlockNumber: big.NewInt(1), Data: common.BlockData{Block: common.Block{Number: big.NewInt(1)}}},
{BlockNumber: big.NewInt(2), Data: common.BlockData{Block: common.Block{Number: big.NewInt(2)}}},
{BlockNumber: big.NewInt(3), Data: common.BlockData{Block: common.Block{Number: big.NewInt(3)}}},
{BlockNumber: big.NewInt(4), Data: common.BlockData{Block: common.Block{Number: big.NewInt(4)}}},
})
mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil)

mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
expectedEndBlock := big.NewInt(4)
mockStagingStorage.EXPECT().GetLastStagedBlockNumber(chainID, expectedEndBlock, big.NewInt(0)).Return(big.NewInt(20), nil)

blockData := []common.BlockData{}
mockStagingStorage.EXPECT().GetStagingData(storage.QueryFilter{
ChainId: chainID,
BlockNumbers: []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)},
}).Return(&blockData, nil)

result, err := committer.getSequentialBlockDataToCommit()

// Assert that the expected methods were called
mockRPC.AssertExpectations(t)
mockMainStorage.AssertExpectations(t)
mockStagingStorage.AssertExpectations(t)
assert.NoError(t, err)
assert.Nil(t, result)
}

func TestHandleMissingStagingDataIsPolledWithCorrectBatchSize(t *testing.T) {
defer func() { config.Cfg = config.Config{} }()
config.Cfg.Committer.BlocksPerCommit = 5
config.Cfg.Poller.BlocksPerPoll = 3

mockRPC := mocks.NewMockIRPCClient(t)
mockMainStorage := mocks.NewMockIMainStorage(t)
mockStagingStorage := mocks.NewMockIStagingStorage(t)

mockStorage := storage.IStorage{
MainStorage: mockMainStorage,
StagingStorage: mockStagingStorage,
}

committer := NewCommitter(mockRPC, mockStorage)

chainID := big.NewInt(1)
mockRPC.EXPECT().GetChainID().Return(chainID)
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
Blocks: 3,
})
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2)}).Return([]rpc.GetFullBlockResult{
{BlockNumber: big.NewInt(0), Data: common.BlockData{Block: common.Block{Number: big.NewInt(0)}}},
{BlockNumber: big.NewInt(1), Data: common.BlockData{Block: common.Block{Number: big.NewInt(1)}}},
{BlockNumber: big.NewInt(2), Data: common.BlockData{Block: common.Block{Number: big.NewInt(2)}}},
})
mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil)

mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
expectedEndBlock := big.NewInt(4)
mockStagingStorage.EXPECT().GetLastStagedBlockNumber(chainID, expectedEndBlock, big.NewInt(0)).Return(big.NewInt(20), nil)

blockData := []common.BlockData{}
mockStagingStorage.EXPECT().GetStagingData(storage.QueryFilter{
ChainId: chainID,
BlockNumbers: []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)},
}).Return(&blockData, nil)

result, err := committer.getSequentialBlockDataToCommit()

assert.NoError(t, err)
assert.Nil(t, result)
}
1 change: 0 additions & 1 deletion internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,6 @@ func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumbe
}
return nil, err
}
zLog.Debug().Msgf("Max block number in main storage is: %s", maxBlockNumber.String())
return maxBlockNumber, nil
}

Expand Down

0 comments on commit 6238fe2

Please sign in to comment.