diff --git a/api/clients/retrieval_client_test.go b/api/clients/retrieval_client_test.go index 0c61531b42..5aae20baea 100644 --- a/api/clients/retrieval_client_test.go +++ b/api/clients/retrieval_client_test.go @@ -52,8 +52,8 @@ func makeTestComponents() (encoding.Prover, encoding.Verifier, error) { } var ( - indexedChainState core.IndexedChainState - chainState core.ChainState + indexedChainState *coremock.ChainDataMock + chainState *coremock.ChainDataMock indexer *indexermock.MockIndexer operatorState *core.OperatorState nodeClient *clientsmock.MockNodeClient @@ -89,6 +89,7 @@ func setup(t *testing.T) { if err != nil { t.Fatalf("failed to create new mocked indexed chain data: %s", err) } + indexedChainState.On("GetOperatorState", mock.Anything, mock.Anything, mock.Anything).Return(chainState.Operators, nil) nodeClient = clientsmock.NewNodeClient() coordinator = &core.StdAssignmentCoordinator{} @@ -100,12 +101,7 @@ func setup(t *testing.T) { indexer = &indexermock.MockIndexer{} indexer.On("Index").Return(nil).Once() - ics, err := coreindexer.NewIndexedChainState(chainState, indexer) - if err != nil { - panic("failed to create a new indexed chain state") - } - - retrievalClient, err = clients.NewRetrievalClient(logger, ics, coordinator, nodeClient, v, 2) + retrievalClient, err = clients.NewRetrievalClient(logger, indexedChainState, coordinator, nodeClient, v, 2) if err != nil { panic("failed to create a new retrieval client") } @@ -132,6 +128,7 @@ func setup(t *testing.T) { }, Data: codec.ConvertByPaddingEmptyByte(gettysburgAddressBytes), } + operatorState, err = indexedChainState.GetOperatorState(context.Background(), (0), []core.QuorumID{quorumID}) if err != nil { t.Fatalf("failed to get operator state: %s", err) diff --git a/core/indexer/state.go b/core/indexer/state.go index 0570e09815..ebde2efed5 100644 --- a/core/indexer/state.go +++ b/core/indexer/state.go @@ -2,16 +2,19 @@ package indexer import ( "context" + "encoding/binary" "errors" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/indexer" + lru "github.com/hashicorp/golang-lru/v2" ) type IndexedChainState struct { core.ChainState - Indexer indexer.Indexer + Indexer indexer.Indexer + operatorStateCache *lru.Cache[string, *core.IndexedOperatorState] } var _ core.IndexedChainState = (*IndexedChainState)(nil) @@ -19,11 +22,23 @@ var _ core.IndexedChainState = (*IndexedChainState)(nil) func NewIndexedChainState( chainState core.ChainState, indexer indexer.Indexer, + cacheSize int, ) (*IndexedChainState, error) { + var operatorStateCache *lru.Cache[string, *core.IndexedOperatorState] + var err error + + if cacheSize > 0 { + operatorStateCache, err = lru.New[string, *core.IndexedOperatorState](cacheSize) + if err != nil { + return nil, err + } + } return &IndexedChainState{ ChainState: chainState, Indexer: indexer, + + operatorStateCache: operatorStateCache, }, nil } @@ -32,6 +47,13 @@ func (ics *IndexedChainState) Start(ctx context.Context) error { } func (ics *IndexedChainState) GetIndexedOperatorState(ctx context.Context, blockNumber uint, quorums []core.QuorumID) (*core.IndexedOperatorState, error) { + // Check if the indexed operator state has been cached + cacheKey := computeCacheKey(blockNumber, quorums) + if ics.operatorStateCache != nil { + if val, ok := ics.operatorStateCache.Get(cacheKey); ok { + return val, nil + } + } pubkeys, sockets, err := ics.getObjects(blockNumber) if err != nil { @@ -73,11 +95,14 @@ func (ics *IndexedChainState) GetIndexedOperatorState(ctx context.Context, block AggKeys: aggKeys, } + if ics.operatorStateCache != nil { + ics.operatorStateCache.Add(cacheKey, state) + } + return state, nil } func (ics *IndexedChainState) GetIndexedOperators(ctx context.Context, blockNumber uint) (map[core.OperatorID]*core.IndexedOperatorInfo, error) { - pubkeys, sockets, err := ics.getObjects(blockNumber) if err != nil { return nil, err @@ -138,3 +163,13 @@ func (ics *IndexedChainState) getObjects(blockNumber uint) (*OperatorPubKeys, Op return pubkeys, sockets, nil } + +// Computes a cache key for the operator state cache. The cache key is a +// combination of the block number and the quorum IDs. Note: the order of the +// quorum IDs matters. +func computeCacheKey(blockNumber uint, quorumIDs []uint8) string { + bytes := make([]byte, 8+len(quorumIDs)) + binary.LittleEndian.PutUint64(bytes, uint64(blockNumber)) + copy(bytes[8:], quorumIDs) + return string(bytes) +} diff --git a/core/indexer/state_mock_test.go b/core/indexer/state_mock_test.go new file mode 100644 index 0000000000..10bb927feb --- /dev/null +++ b/core/indexer/state_mock_test.go @@ -0,0 +1,80 @@ +package indexer_test + +import ( + "context" + "math/big" + "testing" + + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/core/indexer" + coremock "github.com/Layr-Labs/eigenda/core/mock" + indexermock "github.com/Layr-Labs/eigenda/indexer/mock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +type testComponents struct { + ChainState *coremock.ChainDataMock + Indexer *indexermock.MockIndexer + IndexedChainState *indexer.IndexedChainState +} + +func TestIndexedOperatorStateCache(t *testing.T) { + c := createTestComponents(t) + pubKeys := &indexer.OperatorPubKeys{} + c.Indexer.On("GetObject", mock.Anything, 0).Return(pubKeys, nil) + sockets := indexer.OperatorSockets{ + core.OperatorID{0, 1}: "socket1", + } + c.Indexer.On("GetObject", mock.Anything, 1).Return(sockets, nil) + + operatorState := &core.OperatorState{ + Operators: map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo{ + 0: { + core.OperatorID{0}: { + Stake: big.NewInt(100), + Index: 0, + }, + }, + }, + } + c.ChainState.On("GetOperatorState", mock.Anything, mock.Anything, mock.Anything).Return(operatorState, nil) + + ctx := context.Background() + // Get the operator state for block 100 and quorum 0 + _, err := c.IndexedChainState.GetIndexedOperatorState(ctx, uint(100), []core.QuorumID{0}) + assert.NoError(t, err) + c.ChainState.AssertNumberOfCalls(t, "GetOperatorState", 1) + + // Get the operator state for block 100 and quorum 0 again + _, err = c.IndexedChainState.GetIndexedOperatorState(ctx, uint(100), []core.QuorumID{0}) + assert.NoError(t, err) + c.ChainState.AssertNumberOfCalls(t, "GetOperatorState", 1) + + // Get the operator state for block 100 and quorum 1 + _, err = c.IndexedChainState.GetIndexedOperatorState(ctx, uint(100), []core.QuorumID{1}) + assert.NoError(t, err) + c.ChainState.AssertNumberOfCalls(t, "GetOperatorState", 2) + + // Get the operator state for block 101 and quorum 0 & 1 + _, err = c.IndexedChainState.GetIndexedOperatorState(ctx, uint(101), []core.QuorumID{0, 1}) + assert.NoError(t, err) + c.ChainState.AssertNumberOfCalls(t, "GetOperatorState", 3) + + // Get the operator state for block 101 and quorum 0 & 1 again + _, err = c.IndexedChainState.GetIndexedOperatorState(ctx, uint(101), []core.QuorumID{0, 1}) + assert.NoError(t, err) + c.ChainState.AssertNumberOfCalls(t, "GetOperatorState", 3) +} + +func createTestComponents(t *testing.T) *testComponents { + chainState := &coremock.ChainDataMock{} + idx := &indexermock.MockIndexer{} + ics, err := indexer.NewIndexedChainState(chainState, idx, 1) + assert.NoError(t, err) + return &testComponents{ + ChainState: chainState, + Indexer: idx, + IndexedChainState: ics, + } +} diff --git a/core/indexer/state_test.go b/core/indexer/state_test.go index 96e463638a..6802545465 100644 --- a/core/indexer/state_test.go +++ b/core/indexer/state_test.go @@ -137,7 +137,7 @@ func mustMakeChainState(env *deploy.Config, store indexer.HeaderStore, logger lo ) Expect(err).ToNot(HaveOccurred()) - chainState, err := indexedstate.NewIndexedChainState(cs, indexer) + chainState, err := indexedstate.NewIndexedChainState(cs, indexer, 0) if err != nil { panic(err) } diff --git a/core/mock/state.go b/core/mock/state.go index 2c2934357c..5d89b67b2a 100644 --- a/core/mock/state.go +++ b/core/mock/state.go @@ -225,6 +225,7 @@ func (d *ChainDataMock) GetTotalOperatorStateWithQuorums(ctx context.Context, bl } func (d *ChainDataMock) GetOperatorState(ctx context.Context, blockNumber uint, quorums []core.QuorumID) (*core.OperatorState, error) { + _ = d.Called(ctx, blockNumber, quorums) state := d.GetTotalOperatorStateWithQuorums(ctx, blockNumber, quorums) return state.OperatorState, nil diff --git a/core/test/core_test.go b/core/test/core_test.go index a47e7e44a2..088a0ef7b9 100644 --- a/core/test/core_test.go +++ b/core/test/core_test.go @@ -10,7 +10,7 @@ import ( "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/core" - "github.com/Layr-Labs/eigenda/core/mock" + coremock "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigenda/encoding/kzg" "github.com/Layr-Labs/eigenda/encoding/kzg/prover" @@ -19,6 +19,7 @@ import ( "github.com/gammazero/workerpool" "github.com/hashicorp/go-multierror" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) var ( @@ -88,14 +89,15 @@ func makeTestBlob(t *testing.T, length int, securityParams []*core.SecurityParam // prepareBatch takes in multiple blob, encodes them, generates the associated assignments, and the batch header. // These are the products that a disperser will need in order to disperse data to the DA nodes. -func prepareBatch(t *testing.T, operatorCount uint, blobs []core.Blob, bn uint) ([]core.EncodedBlob, core.BatchHeader, *mock.ChainDataMock) { +func prepareBatch(t *testing.T, operatorCount uint, blobs []core.Blob, bn uint) ([]core.EncodedBlob, core.BatchHeader, *coremock.ChainDataMock) { - cst, err := mock.MakeChainDataMock(map[uint8]int{ + cst, err := coremock.MakeChainDataMock(map[uint8]int{ 0: int(operatorCount), 1: int(operatorCount), 2: int(operatorCount), }) assert.NoError(t, err) + cst.On("GetOperatorState", mock.Anything, mock.Anything, mock.Anything).Return(cst.Operators, nil) batchHeader := core.BatchHeader{ ReferenceBlockNumber: bn, diff --git a/core/thegraph/config.go b/core/thegraph/config.go index 9e1861906f..bc38ceb63a 100644 --- a/core/thegraph/config.go +++ b/core/thegraph/config.go @@ -8,15 +8,17 @@ import ( ) const ( - EndpointFlagName = "thegraph.endpoint" - BackoffFlagName = "thegraph.backoff" - MaxRetriesFlagName = "thegraph.max_retries" + EndpointFlagName = "thegraph.endpoint" + BackoffFlagName = "thegraph.backoff" + MaxRetriesFlagName = "thegraph.max_retries" + OperatorStateCacheSize = "thegraph.operator_state_cache_size" ) type Config struct { - Endpoint string // The Graph endpoint - PullInterval time.Duration // The interval to pull data from The Graph - MaxRetries int // The maximum number of retries to pull data from The Graph + Endpoint string // The Graph endpoint + PullInterval time.Duration // The interval to pull data from The Graph + MaxRetries int // The maximum number of retries to pull data from The Graph + OperatorStateCacheSize int // The size of the cache } func CLIFlags(envPrefix string) []cli.Flag { @@ -39,15 +41,22 @@ func CLIFlags(envPrefix string) []cli.Flag { Value: 5, EnvVar: common.PrefixEnvVar(envPrefix, "GRAPH_MAX_RETRIES"), }, + cli.IntFlag{ + Name: OperatorStateCacheSize, + Usage: "The size of the operator state cache in elements (0 to disable)", + Value: 0, + EnvVar: common.PrefixEnvVar(envPrefix, "GRAPH_OPERATOR_STATE_CACHE_SIZE"), + }, } } func ReadCLIConfig(ctx *cli.Context) Config { return Config{ - Endpoint: ctx.String(EndpointFlagName), - PullInterval: ctx.Duration(BackoffFlagName), - MaxRetries: ctx.Int(MaxRetriesFlagName), + Endpoint: ctx.String(EndpointFlagName), + PullInterval: ctx.Duration(BackoffFlagName), + MaxRetries: ctx.Int(MaxRetriesFlagName), + OperatorStateCacheSize: ctx.Int(OperatorStateCacheSize), } } diff --git a/core/thegraph/state.go b/core/thegraph/state.go index b0f99fe547..db84cca514 100644 --- a/core/thegraph/state.go +++ b/core/thegraph/state.go @@ -2,6 +2,7 @@ package thegraph import ( "context" + "encoding/binary" "errors" "fmt" "math" @@ -10,6 +11,7 @@ import ( "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/consensys/gnark-crypto/ecc/bn254" + lru "github.com/hashicorp/golang-lru/v2" "github.com/shurcooL/graphql" ) @@ -79,29 +81,41 @@ type ( core.ChainState querier GraphQLQuerier - logger logging.Logger + logger logging.Logger + operatorStateCache *lru.Cache[string, *core.IndexedOperatorState] } ) var _ IndexedChainState = (*indexedChainState)(nil) -func MakeIndexedChainState(config Config, cs core.ChainState, logger logging.Logger) *indexedChainState { - +func MakeIndexedChainState(config Config, cs core.ChainState, logger logging.Logger) (*indexedChainState, error) { logger.Info("Using graph node") querier := graphql.NewClient(config.Endpoint, nil) // RetryQuerier is a wrapper around the GraphQLQuerier that retries queries on failure retryQuerier := NewRetryQuerier(querier, config.PullInterval, config.MaxRetries) - return NewIndexedChainState(cs, retryQuerier, logger) + return NewIndexedChainState(cs, retryQuerier, logger, config.OperatorStateCacheSize) } -func NewIndexedChainState(cs core.ChainState, querier GraphQLQuerier, logger logging.Logger) *indexedChainState { +func NewIndexedChainState(cs core.ChainState, querier GraphQLQuerier, logger logging.Logger, cacheSize int) (*indexedChainState, error) { + var operatorStateCache *lru.Cache[string, *core.IndexedOperatorState] + var err error + + if cacheSize > 0 { + operatorStateCache, err = lru.New[string, *core.IndexedOperatorState](cacheSize) + if err != nil { + return nil, err + } + } + return &indexedChainState{ ChainState: cs, - querier: querier, - logger: logger.With("component", "IndexedChainState"), - } + + querier: querier, + logger: logger.With("component", "IndexedChainState"), + operatorStateCache: operatorStateCache, + }, nil } func (ics *indexedChainState) Start(ctx context.Context) error { @@ -124,6 +138,14 @@ func (ics *indexedChainState) Start(ctx context.Context) error { } func (ics *indexedChainState) GetIndexedOperatorState(ctx context.Context, blockNumber uint, quorums []core.QuorumID) (*core.IndexedOperatorState, error) { + // Check if the indexed operator state has been cached + cacheKey := computeCacheKey(blockNumber, quorums) + if ics.operatorStateCache != nil { + if val, ok := ics.operatorStateCache.Get(cacheKey); ok { + return val, nil + } + } + operatorState, err := ics.ChainState.GetOperatorState(ctx, blockNumber, quorums) if err != nil { return nil, err @@ -173,6 +195,11 @@ func (ics *indexedChainState) GetIndexedOperatorState(ctx context.Context, block IndexedOperators: indexedOperators, AggKeys: aggKeys, } + + if ics.operatorStateCache != nil { + ics.operatorStateCache.Add(cacheKey, state) + } + return state, nil } @@ -365,3 +392,13 @@ func convertIndexedOperatorInfoGqlToIndexedOperatorInfo(operator *IndexedOperato Socket: string(operator.SocketUpdates[0].Socket), }, nil } + +// Computes a cache key for the operator state cache. The cache key is a +// combination of the block number and the quorum IDs. Note: the order of the +// quorum IDs matters. +func computeCacheKey(blockNumber uint, quorumIDs []uint8) string { + bytes := make([]byte, 8+len(quorumIDs)) + binary.LittleEndian.PutUint64(bytes, uint64(blockNumber)) + copy(bytes[8:], quorumIDs) + return string(bytes) +} diff --git a/core/thegraph/state_integration_test.go b/core/thegraph/state_integration_test.go index 8260db5094..d7af771bc5 100644 --- a/core/thegraph/state_integration_test.go +++ b/core/thegraph/state_integration_test.go @@ -86,7 +86,8 @@ func TestIndexerIntegration(t *testing.T) { tx, err := eth.NewWriter(logger, client, testConfig.EigenDA.OperatorStateRetreiver, testConfig.EigenDA.ServiceManager) assert.NoError(t, err) - cs := thegraph.NewIndexedChainState(eth.NewChainState(tx, client), graphql.NewClient(graphUrl, nil), logger) + cs, err := thegraph.NewIndexedChainState(eth.NewChainState(tx, client), graphql.NewClient(graphUrl, nil), logger, 0) + assert.NoError(t, err) time.Sleep(5 * time.Second) err = cs.Start(context.Background()) diff --git a/core/thegraph/state_test.go b/core/thegraph/state_test.go index f9fcf4f0af..e74b407b83 100644 --- a/core/thegraph/state_test.go +++ b/core/thegraph/state_test.go @@ -5,12 +5,13 @@ import ( "testing" "github.com/Layr-Labs/eigenda/core" - "github.com/Layr-Labs/eigenda/core/mock" + coremock "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/core/thegraph" "github.com/Layr-Labs/eigensdk-go/logging" ethcomm "github.com/ethereum/go-ethereum/common" "github.com/shurcooL/graphql" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) var ( @@ -28,12 +29,14 @@ func (m mockGraphQLQuerier) Query(ctx context.Context, q any, variables map[stri func TestIndexedChainState_GetIndexedOperatorState(t *testing.T) { logger := logging.NewNoopLogger() - chainState, _ := mock.MakeChainDataMock(map[uint8]int{ + chainState, _ := coremock.MakeChainDataMock(map[uint8]int{ 0: 1, 1: 1, 2: 1, }) + chainState.On("GetCurrentBlockNumber").Return(uint(1), nil) + chainState.On("GetOperatorState", mock.Anything, mock.Anything, mock.Anything).Return(chainState.Operators, nil) state, err := chainState.GetOperatorState(context.Background(), 1, quorums) assert.NoError(t, err) @@ -80,7 +83,9 @@ func TestIndexedChainState_GetIndexedOperatorState(t *testing.T) { } } - cs := thegraph.NewIndexedChainState(chainState, querier, logger) + cs, err := thegraph.NewIndexedChainState(chainState, querier, logger, 0) + assert.NoError(t, err) + err = cs.Start(context.Background()) assert.NoError(t, err) @@ -95,12 +100,13 @@ func TestIndexedChainState_GetIndexedOperatorState(t *testing.T) { func TestIndexedChainState_GetIndexedOperatorStateMissingOperator(t *testing.T) { logger := logging.NewNoopLogger() - chainState, _ := mock.MakeChainDataMock(map[uint8]int{ + chainState, _ := coremock.MakeChainDataMock(map[uint8]int{ 0: 2, 1: 2, 2: 2, }) chainState.On("GetCurrentBlockNumber").Return(uint(1), nil) + chainState.On("GetOperatorState", mock.Anything, mock.Anything, mock.Anything).Return(chainState.Operators, nil) state, err := chainState.GetOperatorState(context.Background(), 1, quorums) assert.NoError(t, err) @@ -148,7 +154,9 @@ func TestIndexedChainState_GetIndexedOperatorStateMissingOperator(t *testing.T) } } - cs := thegraph.NewIndexedChainState(chainState, querier, logger) + cs, err := thegraph.NewIndexedChainState(chainState, querier, logger, 0) + assert.NoError(t, err) + err = cs.Start(context.Background()) assert.NoError(t, err) @@ -162,12 +170,13 @@ func TestIndexedChainState_GetIndexedOperatorStateMissingOperator(t *testing.T) func TestIndexedChainState_GetIndexedOperatorStateExtraOperator(t *testing.T) { logger := logging.NewNoopLogger() - chainState, _ := mock.MakeChainDataMock(map[uint8]int{ + chainState, _ := coremock.MakeChainDataMock(map[uint8]int{ 0: 1, 1: 1, 2: 1, }) chainState.On("GetCurrentBlockNumber").Return(uint(1), nil) + chainState.On("GetOperatorState", mock.Anything, mock.Anything, mock.Anything).Return(chainState.Operators, nil) state, err := chainState.GetOperatorState(context.Background(), 1, quorums) assert.NoError(t, err) @@ -229,7 +238,9 @@ func TestIndexedChainState_GetIndexedOperatorStateExtraOperator(t *testing.T) { } } - cs := thegraph.NewIndexedChainState(chainState, querier, logger) + cs, err := thegraph.NewIndexedChainState(chainState, querier, logger, 0) + assert.NoError(t, err) + err = cs.Start(context.Background()) assert.NoError(t, err) @@ -245,12 +256,13 @@ func TestIndexedChainState_GetIndexedOperatorStateExtraOperator(t *testing.T) { func TestIndexedChainState_GetIndexedOperatorInfoByOperatorId(t *testing.T) { logger := logging.NewNoopLogger() - chainState, _ := mock.MakeChainDataMock(map[uint8]int{ + chainState, _ := coremock.MakeChainDataMock(map[uint8]int{ 0: 1, 1: 1, 2: 1, }) chainState.On("GetCurrentBlockNumber").Return(uint(1), nil) + chainState.On("GetOperatorState", mock.Anything, mock.Anything, mock.Anything).Return(chainState.Operators, nil) state, err := chainState.GetOperatorState(context.Background(), 1, quorums) assert.NoError(t, err) @@ -283,7 +295,9 @@ func TestIndexedChainState_GetIndexedOperatorInfoByOperatorId(t *testing.T) { } } - cs := thegraph.NewIndexedChainState(chainState, querier, logger) + cs, err := thegraph.NewIndexedChainState(chainState, querier, logger, 0) + assert.NoError(t, err) + err = cs.Start(context.Background()) assert.NoError(t, err) @@ -296,3 +310,98 @@ func TestIndexedChainState_GetIndexedOperatorInfoByOperatorId(t *testing.T) { assert.Equal(t, "3336192159512049190945679273141887248666932624338963482128432381981287252980", info.PubkeyG1.X.String()) assert.Equal(t, "15195175002875833468883745675063986308012687914999552116603423331534089122704", info.PubkeyG1.Y.String()) } + +func TestIndexedOperatorStateCache_CacheEviction(t *testing.T) { + logger := logging.NewNoopLogger() + chainState, _ := coremock.MakeChainDataMock(map[uint8]int{ + 0: 1, + }) + + chainState.On("GetCurrentBlockNumber").Return(uint(1), nil) + chainState.On("GetOperatorState", mock.Anything, mock.Anything, mock.Anything).Return(chainState.Operators, nil) + + state, err := chainState.GetOperatorState(context.Background(), 1, quorums) + assert.NoError(t, err) + id := "" + for key := range state.Operators[0] { + id = key.Hex() + } + + // Set up the mock querier to return consistent data + operatorsQueryCalled := false + querier := &mockGraphQLQuerier{} + querier.QueryFn = func(ctx context.Context, q any, variables map[string]any) error { + switch res := q.(type) { + case *thegraph.QueryQuorumAPKGql: + pubKey := thegraph.AggregatePubkeyKeyGql{ + Apk_X: "3829803941453902453085939595934570464887466392754984985219704448765546217155", + Apk_Y: "7864472681234874546092094912246874347602747071877011905183009416740980374479", + } + res.QuorumAPK = append(res.QuorumAPK, pubKey) + return nil + case *thegraph.QueryOperatorsGql: + if operatorsQueryCalled { + operatorsQueryCalled = false + return nil + } + res.Operators = []thegraph.IndexedOperatorInfoGql{ + { + Id: graphql.String(id), + PubkeyG1_X: "3336192159512049190945679273141887248666932624338963482128432381981287252980", + PubkeyG1_Y: "15195175002875833468883745675063986308012687914999552116603423331534089122704", + PubkeyG2_X: []graphql.String{ + "21597023645215426396093421944506635812143308313031252511177204078669540440732", + "11405255666568400552575831267661419473985517916677491029848981743882451844775", + }, + PubkeyG2_Y: []graphql.String{ + "9416989242565286095121881312760798075882411191579108217086927390793923664442", + "13612061731370453436662267863740141021994163834412349567410746669651828926551", + }, + SocketUpdates: []thegraph.SocketUpdates{{Socket: "localhost:32006;32007"}}, + }, + } + operatorsQueryCalled = true + return nil + default: + return nil + } + } + + // Create IndexedChainState with cache size 2 + cs, err := thegraph.NewIndexedChainState(chainState, querier, logger, 2) + assert.NoError(t, err) + + err = cs.Start(context.Background()) + assert.NoError(t, err) + + ctx := context.Background() + quorums := []core.QuorumID{0} + + // First call - should fetch the state + _, err = cs.GetIndexedOperatorState(ctx, 1, quorums) + assert.NoError(t, err) + + // Second call with different block - should fetch + _, err = cs.GetIndexedOperatorState(ctx, 2, quorums) + assert.NoError(t, err) + + // Third call with new block - should evict first entry + _, err = cs.GetIndexedOperatorState(ctx, 3, quorums) + assert.NoError(t, err) + + // Call again with block 1 - should fetch state again since it was evicted and evict block 2 + _, err = cs.GetIndexedOperatorState(ctx, 1, quorums) + assert.NoError(t, err) + + // Call with block 3 - should hit cache + _, err = cs.GetIndexedOperatorState(ctx, 3, quorums) + assert.NoError(t, err) + + // Call with block 1 - should hit cache + _, err = cs.GetIndexedOperatorState(ctx, 1, quorums) + assert.NoError(t, err) + + // Verify number of calls + // 4 calls to GetOperatorState + 1 from the initial call to GetOperatorState + chainState.AssertNumberOfCalls(t, "GetOperatorState", 5) +} diff --git a/core/v2/core_test.go b/core/v2/core_test.go index e4ecd520b3..9e98c5cd90 100644 --- a/core/v2/core_test.go +++ b/core/v2/core_test.go @@ -10,7 +10,7 @@ import ( "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/core" - "github.com/Layr-Labs/eigenda/core/mock" + coremock "github.com/Layr-Labs/eigenda/core/mock" corev2 "github.com/Layr-Labs/eigenda/core/v2" v2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/encoding" @@ -23,10 +23,11 @@ import ( "github.com/gammazero/workerpool" "github.com/hashicorp/go-multierror" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) var ( - dat *mock.ChainDataMock + dat *coremock.ChainDataMock agg core.SignatureAggregator p encoding.Prover @@ -46,7 +47,7 @@ var ( func TestMain(m *testing.M) { var err error - dat, err = mock.MakeChainDataMock(map[uint8]int{ + dat, err = coremock.MakeChainDataMock(map[uint8]int{ 0: 6, 1: 3, }) @@ -54,7 +55,7 @@ func TestMain(m *testing.M) { panic(err) } logger := logging.NewNoopLogger() - reader := &mock.MockWriter{} + reader := &coremock.MockWriter{} reader.On("OperatorIDToAddress").Return(gethcommon.Address{}, nil) agg, err = core.NewStdSignatureAggregator(logger, reader) if err != nil { @@ -132,12 +133,13 @@ func prepareBlobs( referenceBlockNumber uint64, ) (map[core.OperatorID][]*corev2.BlobShard, core.IndexedChainState) { - cst, err := mock.MakeChainDataMock(map[uint8]int{ + cst, err := coremock.MakeChainDataMock(map[uint8]int{ 0: int(operatorCount), 1: int(operatorCount), 2: int(operatorCount), }) assert.NoError(t, err) + cst.On("GetOperatorState", mock.Anything, mock.Anything, mock.Anything).Return(cst.Operators, nil) blobsMap := make([]map[core.QuorumID]map[core.OperatorID][]*encoding.Frame, 0, len(certs)) diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index eda194f5ef..918a017369 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -2,7 +2,6 @@ package batcher import ( "context" - "encoding/binary" "errors" "fmt" "strings" @@ -14,15 +13,12 @@ import ( "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigensdk-go/logging" - lru "github.com/hashicorp/golang-lru/v2" "github.com/wealdtech/go-merkletree/v2" grpc_metadata "google.golang.org/grpc/metadata" ) const encodingInterval = 2 * time.Second -const operatorStateCacheSize = 32 - var errNoEncodedResults = errors.New("no encoded results") type EncodedSizeNotifier struct { @@ -81,8 +77,6 @@ type EncodingStreamer struct { // Used to keep track of the last evaluated key for fetching metadatas exclusiveStartKey *disperser.BlobStoreExclusiveStartKey - - operatorStateCache *lru.Cache[string, *core.IndexedOperatorState] } type batch struct { @@ -116,10 +110,7 @@ func NewEncodingStreamer( if config.EncodingQueueLimit <= 0 { return nil, errors.New("EncodingQueueLimit should be greater than 0") } - operatorStateCache, err := lru.New[string, *core.IndexedOperatorState](operatorStateCacheSize) - if err != nil { - return nil, err - } + return &EncodingStreamer{ StreamerConfig: config, EncodedBlobstore: newEncodedBlobStore(logger), @@ -135,7 +126,6 @@ func NewEncodingStreamer( batcherMetrics: batcherMetrics, logger: logger.With("component", "EncodingStreamer"), exclusiveStartKey: nil, - operatorStateCache: operatorStateCache, }, nil } @@ -680,16 +670,12 @@ func (e *EncodingStreamer) getOperatorState(ctx context.Context, metadatas []*di i++ } - cacheKey := computeCacheKey(blockNumber, quorumIds) - if val, ok := e.operatorStateCache.Get(cacheKey); ok { - return val, nil - } // GetIndexedOperatorState should return state for valid quorums only state, err := e.chainState.GetIndexedOperatorState(ctx, blockNumber, quorumIds) if err != nil { return nil, fmt.Errorf("error getting operator state at block number %d: %w", blockNumber, err) } - e.operatorStateCache.Add(cacheKey, state) + return state, nil } @@ -715,10 +701,3 @@ func (e *EncodingStreamer) validateMetadataQuorums(metadatas []*disperser.BlobMe } return validMetadata } - -func computeCacheKey(blockNumber uint, quorumIDs []uint8) string { - bytes := make([]byte, 8+len(quorumIDs)) - binary.LittleEndian.PutUint64(bytes, uint64(blockNumber)) - copy(bytes[8:], quorumIDs) - return string(bytes) -} diff --git a/disperser/cmd/batcher/main.go b/disperser/cmd/batcher/main.go index 86e7478891..134e1d9b5c 100644 --- a/disperser/cmd/batcher/main.go +++ b/disperser/cmd/batcher/main.go @@ -202,7 +202,10 @@ func RunBatcher(ctx *cli.Context) error { logger.Info("Using graph node") logger.Info("Connecting to subgraph", "url", config.ChainStateConfig.Endpoint) - ics = thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger) + ics, err = thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger) + if err != nil { + return err + } } else { logger.Info("Using built-in indexer") @@ -216,7 +219,7 @@ func RunBatcher(ctx *cli.Context) error { if err != nil { return err } - ics, err = coreindexer.NewIndexedChainState(cs, indexer) + ics, err = coreindexer.NewIndexedChainState(cs, indexer, 0) if err != nil { return err } diff --git a/disperser/cmd/controller/main.go b/disperser/cmd/controller/main.go index 2725c3fd45..79ddcbb4ca 100644 --- a/disperser/cmd/controller/main.go +++ b/disperser/cmd/controller/main.go @@ -3,9 +3,6 @@ package main import ( "context" "fmt" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/collectors" - "github.com/prometheus/client_golang/prometheus/promhttp" "log" "net/http" "os" @@ -25,6 +22,9 @@ import ( gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rpc" "github.com/gammazero/workerpool" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/urfave/cli" ) @@ -126,7 +126,10 @@ func RunController(ctx *cli.Context) error { logger.Info("Using graph node") logger.Info("Connecting to subgraph", "url", config.ChainStateConfig.Endpoint) - ics = thegraph.MakeIndexedChainState(config.ChainStateConfig, chainState, logger) + ics, err = thegraph.MakeIndexedChainState(config.ChainStateConfig, chainState, logger) + if err != nil { + return err + } } else { logger.Info("Using built-in indexer") rpcClient, err := rpc.Dial(config.EthClientConfig.RPCURLs[0]) @@ -143,7 +146,7 @@ func RunController(ctx *cli.Context) error { if err != nil { return err } - ics, err = indexer.NewIndexedChainState(chainState, idx) + ics, err = indexer.NewIndexedChainState(chainState, idx, 0) if err != nil { return err } diff --git a/disperser/cmd/dataapi/main.go b/disperser/cmd/dataapi/main.go index 7053639e0c..b4925414ed 100644 --- a/disperser/cmd/dataapi/main.go +++ b/disperser/cmd/dataapi/main.go @@ -90,14 +90,18 @@ func RunDataApi(ctx *cli.Context) error { return err } + chainState := coreeth.NewChainState(tx, client) + indexedChainState, err := thegraph.MakeIndexedChainState(config.ChainStateConfig, chainState, logger) + if err != nil { + return err + } + var ( promClient = dataapi.NewPrometheusClient(promApi, config.PrometheusConfig.Cluster) blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, 0) sharedStorage = blobstore.NewSharedStorage(config.BlobstoreConfig.BucketName, s3Client, blobMetadataStore, logger) subgraphApi = subgraph.NewApi(config.SubgraphApiBatchMetadataAddr, config.SubgraphApiOperatorStateAddr) subgraphClient = dataapi.NewSubgraphClient(subgraphApi, logger) - chainState = coreeth.NewChainState(tx, client) - indexedChainState = thegraph.MakeIndexedChainState(config.ChainStateConfig, chainState, logger) metrics = dataapi.NewMetrics(blobMetadataStore, config.MetricsConfig.HTTPPort, logger) server = dataapi.NewServer( dataapi.Config{ diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go index 05c23efc3f..a00f5780c1 100644 --- a/disperser/dataapi/server_test.go +++ b/disperser/dataapi/server_test.go @@ -369,6 +369,7 @@ func TestFetchMetricsHandler(t *testing.T) { mockTx.On("GetQuorumCount").Return(uint8(2), nil) mockSubgraphApi.On("QueryBatches").Return(subgraphBatches, nil) mockPrometheusApi.On("QueryRange").Return(matrix, nil, nil).Once() + mockChainState.On("GetOperatorState", mock.Anything, mock.Anything, mock.Anything).Return(mockChainState.Operators) r.GET("/v1/metrics", testDataApiServer.FetchMetricsHandler) diff --git a/disperser/encoder/server_test.go b/disperser/encoder/server_test.go index b8c83e0503..8f9b60e971 100644 --- a/disperser/encoder/server_test.go +++ b/disperser/encoder/server_test.go @@ -81,6 +81,8 @@ func getTestData() (core.Blob, encoding.EncodingParams) { 1: 10, 2: 10, }) + indexedChainState.On("GetOperatorState", mock.Anything, mock.Anything, mock.Anything).Return(indexedChainState.Operators, nil) + operatorState, err := indexedChainState.GetOperatorState(context.Background(), uint(0), []core.QuorumID{quorumID}) if err != nil { log.Fatalf("failed to get operator state: %s", err) diff --git a/inabox/deploy/config_types.go b/inabox/deploy/config_types.go index 875a70b803..a9b8b898ae 100644 --- a/inabox/deploy/config_types.go +++ b/inabox/deploy/config_types.go @@ -179,7 +179,6 @@ func (c Config) IsEigenDADeployed() bool { } func NewTestConfig(testName, rootPath string) (testEnv *Config) { - rootPath, err := filepath.Abs(rootPath) if err != nil { log.Panicf("Error %s:", err.Error()) diff --git a/inabox/tests/integration_suite_test.go b/inabox/tests/integration_suite_test.go index 3a4e53bbbb..003c8def24 100644 --- a/inabox/tests/integration_suite_test.go +++ b/inabox/tests/integration_suite_test.go @@ -191,7 +191,7 @@ func setupRetrievalClient(testConfig *deploy.Config) error { return err } - ics, err := coreindexer.NewIndexedChainState(cs, indexer) + ics, err := coreindexer.NewIndexedChainState(cs, indexer, 0) if err != nil { return err } diff --git a/node/node_test.go b/node/node_test.go index 8000b2e2d4..fc2cce05c1 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -76,6 +76,7 @@ func newComponents(t *testing.T) *components { 1: 4, 2: 4, }) + chainState.On("GetOperatorState", mock.Anything, mock.Anything, mock.Anything).Return(chainState.Operators) store, err := node.NewLevelDBStore(dbPath, logger, nil, 1e9, 1e9) if err != nil { diff --git a/operators/churner/cmd/main.go b/operators/churner/cmd/main.go index a9ecd76e87..1325c119c1 100644 --- a/operators/churner/cmd/main.go +++ b/operators/churner/cmd/main.go @@ -84,7 +84,10 @@ func run(ctx *cli.Context) error { logger.Info("Using graph node") logger.Info("Connecting to subgraph", "url", config.ChainStateConfig.Endpoint) - indexer := thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger) + indexer, err := thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger) + if err != nil { + log.Fatalln("could not create indexer", err) + } metrics := churner.NewMetrics(config.MetricsConfig.HTTPPort, logger) diff --git a/relay/cmd/main.go b/relay/cmd/main.go index bffdd9fa9e..5efab16147 100644 --- a/relay/cmd/main.go +++ b/relay/cmd/main.go @@ -80,7 +80,10 @@ func RunRelay(ctx *cli.Context) error { } cs := coreeth.NewChainState(tx, client) - ics := thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger) + ics, err := thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger) + if err != nil { + return fmt.Errorf("failed to create indexed chain state: %w", err) + } server, err := relay.NewServer( context.Background(), diff --git a/retriever/cmd/main.go b/retriever/cmd/main.go index 8ea7b0429a..2983d3c766 100644 --- a/retriever/cmd/main.go +++ b/retriever/cmd/main.go @@ -110,7 +110,10 @@ func RetrieverMain(ctx *cli.Context) error { logger.Info("Using graph node") logger.Info("Connecting to subgraph", "url", config.ChainStateConfig.Endpoint) - ics = thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger) + ics, err = thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger) + if err != nil { + return err + } } else { logger.Info("Using built-in indexer") @@ -124,7 +127,7 @@ func RetrieverMain(ctx *cli.Context) error { if err != nil { return err } - ics, err = coreindexer.NewIndexedChainState(cs, indexer) + ics, err = coreindexer.NewIndexedChainState(cs, indexer, 0) if err != nil { return err } diff --git a/test/integration_test.go b/test/integration_test.go index 5016f3598f..38c4a0df6f 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -470,8 +470,8 @@ func TestDispersalAndRetrieval(t *testing.T) { 2: numOperators, }) assert.NoError(t, err) - cst.On("GetCurrentBlockNumber").Return(uint(10), nil) + cst.On("GetOperatorState", mock.Anything, mock.Anything, mock.Anything).Return(cst.Operators, nil) logger := logging.NewNoopLogger() assert.NoError(t, err) diff --git a/tools/semverscan/cmd/main.go b/tools/semverscan/cmd/main.go index aeb2ba4575..0eb4a44452 100644 --- a/tools/semverscan/cmd/main.go +++ b/tools/semverscan/cmd/main.go @@ -62,7 +62,10 @@ func RunScan(ctx *cli.Context) error { chainState := eth.NewChainState(tx, gethClient) logger.Info("Connecting to subgraph", "url", config.ChainStateConfig.Endpoint) - ics := thegraph.MakeIndexedChainState(config.ChainStateConfig, chainState, logger) + ics, err := thegraph.MakeIndexedChainState(config.ChainStateConfig, chainState, logger) + if err != nil { + return fmt.Errorf("failed to create indexed chain state - %s", err) + } currentBlock, err := ics.GetCurrentBlockNumber() if err != nil { diff --git a/tools/traffic/generator_v2.go b/tools/traffic/generator_v2.go index beec5e393b..8123aeaff8 100644 --- a/tools/traffic/generator_v2.go +++ b/tools/traffic/generator_v2.go @@ -173,7 +173,10 @@ func buildRetriever(config *config.Config) (clients.RetrievalClient, retrivereth cs := eth.NewChainState(tx, gethClient) - chainState := thegraph.MakeIndexedChainState(*config.TheGraphConfig, cs, logger) + chainState, err := thegraph.MakeIndexedChainState(*config.TheGraphConfig, cs, logger) + if err != nil { + panic(fmt.Sprintf("Unable to instantiate chainState: %s", err)) + } var assignmentCoordinator core.AssignmentCoordinator = &core.StdAssignmentCoordinator{}