Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add operator state cache to IndexedChainState #983

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions api/clients/retrieval_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func makeTestComponents() (encoding.Prover, encoding.Verifier, error) {
}

var (
indexedChainState core.IndexedChainState
chainState core.ChainState
indexedChainState *coremock.ChainDataMock
chainState *coremock.ChainDataMock
indexer *indexermock.MockIndexer
operatorState *core.OperatorState
nodeClient *clientsmock.MockNodeClient
Expand Down Expand Up @@ -89,6 +89,7 @@ func setup(t *testing.T) {
if err != nil {
t.Fatalf("failed to create new mocked indexed chain data: %s", err)
}
indexedChainState.On("GetOperatorState", mock.Anything, mock.Anything, mock.Anything).Return(chainState.Operators, nil)

nodeClient = clientsmock.NewNodeClient()
coordinator = &core.StdAssignmentCoordinator{}
Expand All @@ -100,12 +101,7 @@ func setup(t *testing.T) {
indexer = &indexermock.MockIndexer{}
indexer.On("Index").Return(nil).Once()

ics, err := coreindexer.NewIndexedChainState(chainState, indexer)
if err != nil {
panic("failed to create a new indexed chain state")
}

retrievalClient, err = clients.NewRetrievalClient(logger, ics, coordinator, nodeClient, v, 2)
retrievalClient, err = clients.NewRetrievalClient(logger, indexedChainState, coordinator, nodeClient, v, 2)
if err != nil {
panic("failed to create a new retrieval client")
}
Expand All @@ -132,6 +128,7 @@ func setup(t *testing.T) {
},
Data: codec.ConvertByPaddingEmptyByte(gettysburgAddressBytes),
}

operatorState, err = indexedChainState.GetOperatorState(context.Background(), (0), []core.QuorumID{quorumID})
if err != nil {
t.Fatalf("failed to get operator state: %s", err)
Expand Down
39 changes: 37 additions & 2 deletions core/indexer/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,43 @@ package indexer

import (
"context"
"encoding/binary"
"errors"

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/indexer"
lru "github.com/hashicorp/golang-lru/v2"
)

type IndexedChainState struct {
core.ChainState

Indexer indexer.Indexer
Indexer indexer.Indexer
operatorStateCache *lru.Cache[string, *core.IndexedOperatorState]
}

var _ core.IndexedChainState = (*IndexedChainState)(nil)

func NewIndexedChainState(
chainState core.ChainState,
indexer indexer.Indexer,
cacheSize int,
) (*IndexedChainState, error) {
var operatorStateCache *lru.Cache[string, *core.IndexedOperatorState]
var err error

if cacheSize > 0 {
operatorStateCache, err = lru.New[string, *core.IndexedOperatorState](cacheSize)
if err != nil {
return nil, err
}
}

return &IndexedChainState{
ChainState: chainState,
Indexer: indexer,

operatorStateCache: operatorStateCache,
}, nil
}

Expand All @@ -32,6 +47,13 @@ func (ics *IndexedChainState) Start(ctx context.Context) error {
}

func (ics *IndexedChainState) GetIndexedOperatorState(ctx context.Context, blockNumber uint, quorums []core.QuorumID) (*core.IndexedOperatorState, error) {
// Check if the indexed operator state has been cached
cacheKey := computeCacheKey(blockNumber, quorums)
if ics.operatorStateCache != nil {
if val, ok := ics.operatorStateCache.Get(cacheKey); ok {
return val, nil
}
}

pubkeys, sockets, err := ics.getObjects(blockNumber)
if err != nil {
Expand Down Expand Up @@ -73,11 +95,14 @@ func (ics *IndexedChainState) GetIndexedOperatorState(ctx context.Context, block
AggKeys: aggKeys,
}

if ics.operatorStateCache != nil {
ics.operatorStateCache.Add(cacheKey, state)
}

return state, nil
}

func (ics *IndexedChainState) GetIndexedOperators(ctx context.Context, blockNumber uint) (map[core.OperatorID]*core.IndexedOperatorInfo, error) {

pubkeys, sockets, err := ics.getObjects(blockNumber)
if err != nil {
return nil, err
Expand Down Expand Up @@ -138,3 +163,13 @@ func (ics *IndexedChainState) getObjects(blockNumber uint) (*OperatorPubKeys, Op
return pubkeys, sockets, nil

}

// Computes a cache key for the operator state cache. The cache key is a
// combination of the block number and the quorum IDs. Note: the order of the
// quorum IDs matters.
func computeCacheKey(blockNumber uint, quorumIDs []uint8) string {
bytes := make([]byte, 8+len(quorumIDs))
binary.LittleEndian.PutUint64(bytes, uint64(blockNumber))
copy(bytes[8:], quorumIDs)
return string(bytes)
}
80 changes: 80 additions & 0 deletions core/indexer/state_mock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package indexer_test

import (
"context"
"math/big"
"testing"

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/indexer"
coremock "github.com/Layr-Labs/eigenda/core/mock"
indexermock "github.com/Layr-Labs/eigenda/indexer/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

type testComponents struct {
ChainState *coremock.ChainDataMock
Indexer *indexermock.MockIndexer
IndexedChainState *indexer.IndexedChainState
}

func TestIndexedOperatorStateCache(t *testing.T) {
c := createTestComponents(t)
pubKeys := &indexer.OperatorPubKeys{}
c.Indexer.On("GetObject", mock.Anything, 0).Return(pubKeys, nil)
sockets := indexer.OperatorSockets{
core.OperatorID{0, 1}: "socket1",
}
c.Indexer.On("GetObject", mock.Anything, 1).Return(sockets, nil)

operatorState := &core.OperatorState{
Operators: map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo{
0: {
core.OperatorID{0}: {
Stake: big.NewInt(100),
Index: 0,
},
},
},
}
c.ChainState.On("GetOperatorState", mock.Anything, mock.Anything, mock.Anything).Return(operatorState, nil)

ctx := context.Background()
// Get the operator state for block 100 and quorum 0
_, err := c.IndexedChainState.GetIndexedOperatorState(ctx, uint(100), []core.QuorumID{0})
assert.NoError(t, err)
c.ChainState.AssertNumberOfCalls(t, "GetOperatorState", 1)

// Get the operator state for block 100 and quorum 0 again
_, err = c.IndexedChainState.GetIndexedOperatorState(ctx, uint(100), []core.QuorumID{0})
assert.NoError(t, err)
c.ChainState.AssertNumberOfCalls(t, "GetOperatorState", 1)

// Get the operator state for block 100 and quorum 1
_, err = c.IndexedChainState.GetIndexedOperatorState(ctx, uint(100), []core.QuorumID{1})
assert.NoError(t, err)
c.ChainState.AssertNumberOfCalls(t, "GetOperatorState", 2)

// Get the operator state for block 101 and quorum 0 & 1
_, err = c.IndexedChainState.GetIndexedOperatorState(ctx, uint(101), []core.QuorumID{0, 1})
assert.NoError(t, err)
c.ChainState.AssertNumberOfCalls(t, "GetOperatorState", 3)

// Get the operator state for block 101 and quorum 0 & 1 again
_, err = c.IndexedChainState.GetIndexedOperatorState(ctx, uint(101), []core.QuorumID{0, 1})
assert.NoError(t, err)
c.ChainState.AssertNumberOfCalls(t, "GetOperatorState", 3)
}

func createTestComponents(t *testing.T) *testComponents {
chainState := &coremock.ChainDataMock{}
idx := &indexermock.MockIndexer{}
ics, err := indexer.NewIndexedChainState(chainState, idx, 1)
assert.NoError(t, err)
return &testComponents{
ChainState: chainState,
Indexer: idx,
IndexedChainState: ics,
}
}
2 changes: 1 addition & 1 deletion core/indexer/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func mustMakeChainState(env *deploy.Config, store indexer.HeaderStore, logger lo
)
Expect(err).ToNot(HaveOccurred())

chainState, err := indexedstate.NewIndexedChainState(cs, indexer)
chainState, err := indexedstate.NewIndexedChainState(cs, indexer, 0)
if err != nil {
panic(err)
}
Expand Down
1 change: 1 addition & 0 deletions core/mock/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ func (d *ChainDataMock) GetTotalOperatorStateWithQuorums(ctx context.Context, bl
}

func (d *ChainDataMock) GetOperatorState(ctx context.Context, blockNumber uint, quorums []core.QuorumID) (*core.OperatorState, error) {
_ = d.Called(ctx, blockNumber, quorums)
state := d.GetTotalOperatorStateWithQuorums(ctx, blockNumber, quorums)

return state.OperatorState, nil
Expand Down
8 changes: 5 additions & 3 deletions core/test/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/mock"
coremock "github.com/Layr-Labs/eigenda/core/mock"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/encoding/kzg"
"github.com/Layr-Labs/eigenda/encoding/kzg/prover"
Expand All @@ -19,6 +19,7 @@ import (
"github.com/gammazero/workerpool"
"github.com/hashicorp/go-multierror"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

var (
Expand Down Expand Up @@ -88,14 +89,15 @@ func makeTestBlob(t *testing.T, length int, securityParams []*core.SecurityParam

// prepareBatch takes in multiple blob, encodes them, generates the associated assignments, and the batch header.
// These are the products that a disperser will need in order to disperse data to the DA nodes.
func prepareBatch(t *testing.T, operatorCount uint, blobs []core.Blob, bn uint) ([]core.EncodedBlob, core.BatchHeader, *mock.ChainDataMock) {
func prepareBatch(t *testing.T, operatorCount uint, blobs []core.Blob, bn uint) ([]core.EncodedBlob, core.BatchHeader, *coremock.ChainDataMock) {

cst, err := mock.MakeChainDataMock(map[uint8]int{
cst, err := coremock.MakeChainDataMock(map[uint8]int{
0: int(operatorCount),
1: int(operatorCount),
2: int(operatorCount),
})
assert.NoError(t, err)
cst.On("GetOperatorState", mock.Anything, mock.Anything, mock.Anything).Return(cst.Operators, nil)

batchHeader := core.BatchHeader{
ReferenceBlockNumber: bn,
Expand Down
27 changes: 18 additions & 9 deletions core/thegraph/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ import (
)

const (
EndpointFlagName = "thegraph.endpoint"
BackoffFlagName = "thegraph.backoff"
MaxRetriesFlagName = "thegraph.max_retries"
EndpointFlagName = "thegraph.endpoint"
BackoffFlagName = "thegraph.backoff"
MaxRetriesFlagName = "thegraph.max_retries"
OperatorStateCacheSize = "thegraph.operator_state_cache_size"
)

type Config struct {
Endpoint string // The Graph endpoint
PullInterval time.Duration // The interval to pull data from The Graph
MaxRetries int // The maximum number of retries to pull data from The Graph
Endpoint string // The Graph endpoint
PullInterval time.Duration // The interval to pull data from The Graph
MaxRetries int // The maximum number of retries to pull data from The Graph
OperatorStateCacheSize int // The size of the cache
}

func CLIFlags(envPrefix string) []cli.Flag {
Expand All @@ -39,15 +41,22 @@ func CLIFlags(envPrefix string) []cli.Flag {
Value: 5,
EnvVar: common.PrefixEnvVar(envPrefix, "GRAPH_MAX_RETRIES"),
},
cli.IntFlag{
Name: OperatorStateCacheSize,
Usage: "The size of the operator state cache in elements (0 to disable)",
Value: 0,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should default be 0 or 32?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the estimated size bytes for 32 entries and how many instances of this struct is expected to be created? If it's small enough it should be fine to enable by default

Copy link
Contributor Author

@dmanc dmanc Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depends on the amount of operators but let's assume 200. The order of magnitude is ~1-5mb.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, it seems this cache is not really useful:

Where does the cache help?

EnvVar: common.PrefixEnvVar(envPrefix, "GRAPH_OPERATOR_STATE_CACHE_SIZE"),
},
}
}

func ReadCLIConfig(ctx *cli.Context) Config {

return Config{
Endpoint: ctx.String(EndpointFlagName),
PullInterval: ctx.Duration(BackoffFlagName),
MaxRetries: ctx.Int(MaxRetriesFlagName),
Endpoint: ctx.String(EndpointFlagName),
PullInterval: ctx.Duration(BackoffFlagName),
MaxRetries: ctx.Int(MaxRetriesFlagName),
OperatorStateCacheSize: ctx.Int(OperatorStateCacheSize),
}

}
Loading
Loading