diff --git a/.mockery.yaml b/.mockery.yaml index 347d69c58..8b13bd649 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -41,3 +41,9 @@ packages: github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller: interfaces: RPCClient: + ORM: + config: + inpackage: True + dir: "pkg/solana/logpoller" + filename: mock_orm.go + mockname: mockORM diff --git a/pkg/solana/logpoller/filters.go b/pkg/solana/logpoller/filters.go new file mode 100644 index 000000000..4a1496371 --- /dev/null +++ b/pkg/solana/logpoller/filters.go @@ -0,0 +1,316 @@ +package logpoller + +import ( + "context" + "errors" + "fmt" + "iter" + "maps" + "sync" + "sync/atomic" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +type filters struct { + orm ORM + lggr logger.SugaredLogger + + filtersByID map[int64]*Filter + filtersByName map[string]int64 + filtersByAddress map[PublicKey]map[EventSignature]map[int64]struct{} + filtersToBackfill map[int64]struct{} + filtersToDelete map[int64]Filter + filtersMutex sync.RWMutex + loadedFilters atomic.Bool +} + +func newFilters(lggr logger.SugaredLogger, orm ORM) *filters { + return &filters{ + orm: orm, + lggr: lggr, + } +} + +// PruneFilters - prunes all filters marked to be deleted from the database and all corresponding logs. 
+func (fl *filters) PruneFilters(ctx context.Context) error { + err := fl.LoadFilters(ctx) + if err != nil { + return fmt.Errorf("failed to load filters: %w", err) + } + + fl.filtersMutex.Lock() + filtersToDelete := fl.filtersToDelete + fl.filtersToDelete = make(map[int64]Filter) + fl.filtersMutex.Unlock() + + if len(filtersToDelete) == 0 { + return nil + } + + err = fl.orm.DeleteFilters(ctx, filtersToDelete) + if err != nil { + fl.filtersMutex.Lock() + defer fl.filtersMutex.Unlock() + maps.Copy(fl.filtersToDelete, filtersToDelete) + return fmt.Errorf("failed to delete filters: %w", err) + } + + return nil +} + +// RegisterFilter persists provided filter and ensures that any log emitted by a contract with filter.Address +// that matches filter.EventSig signature will be captured starting from filter.StartingBlock. +// The filter may be unregistered later by filter.Name. +// In case of Filter.Name collision (within the chain scope) returns ErrFilterNameConflict if +// one of the fields defining resulting logs (Address, EventSig, EventIDL, SubkeyPaths) does not match original filter. +// Otherwise, updates remaining fields and schedules backfill. +// Warnings/debug information is keyed by filter name. 
+func (fl *filters) RegisterFilter(ctx context.Context, filter Filter) error { + if len(filter.Name) == 0 { + return errors.New("name is required") + } + + err := fl.LoadFilters(ctx) + if err != nil { + return fmt.Errorf("failed to load filters: %w", err) + } + + fl.filtersMutex.Lock() + defer fl.filtersMutex.Unlock() + + filter.IsBackfilled = false + if existingFilterID, ok := fl.filtersByName[filter.Name]; ok { + existingFilter := fl.filtersByID[existingFilterID] + if !existingFilter.MatchSameLogs(filter) { + return ErrFilterNameConflict + } + if existingFilter.IsBackfilled { + // if existing filter was already backfilled, but starting block was higher we need to backfill filter again + filter.IsBackfilled = existingFilter.StartingBlock <= filter.StartingBlock + } + + fl.removeFilterFromIndexes(*existingFilter) + } + + filterID, err := fl.orm.InsertFilter(ctx, filter) + if err != nil { + return fmt.Errorf("failed to insert filter: %w", err) + } + + filter.ID = filterID + + fl.filtersByName[filter.Name] = filter.ID + fl.filtersByID[filter.ID] = &filter + filtersForAddress, ok := fl.filtersByAddress[filter.Address] + if !ok { + filtersForAddress = make(map[EventSignature]map[int64]struct{}) + fl.filtersByAddress[filter.Address] = filtersForAddress + } + + filtersForEventSig, ok := filtersForAddress[filter.EventSig] + if !ok { + filtersForEventSig = make(map[int64]struct{}) + filtersForAddress[filter.EventSig] = filtersForEventSig + } + + filtersForEventSig[filter.ID] = struct{}{} + if !filter.IsBackfilled { + fl.filtersToBackfill[filter.ID] = struct{}{} + } + return nil +} + +// UnregisterFilter will mark the filter with the given name for pruning and async prune all corresponding logs. +// If the name does not exist, it will log an error but not return an error. +// Warnings/debug information is keyed by filter name. 
+func (fl *filters) UnregisterFilter(ctx context.Context, name string) error { + err := fl.LoadFilters(ctx) + if err != nil { + return fmt.Errorf("failed to load filters: %w", err) + } + + fl.filtersMutex.Lock() + defer fl.filtersMutex.Unlock() + + filterID, ok := fl.filtersByName[name] + if !ok { + fl.lggr.Warnw("Filter not found in filtersByName", "name", name) + return nil + } + + filter, ok := fl.filtersByID[filterID] + if !ok { + fl.lggr.Errorw("Filter not found in filtersByID", "id", filterID, "name", name) + return nil + } + + if err := fl.orm.MarkFilterDeleted(ctx, filter.ID); err != nil { + return fmt.Errorf("failed to mark filter deleted: %w", err) + } + + fl.removeFilterFromIndexes(*filter) + + fl.filtersToDelete[filter.ID] = *filter + return nil +} + +func (fl *filters) removeFilterFromIndexes(filter Filter) { + delete(fl.filtersByName, filter.Name) + delete(fl.filtersToBackfill, filter.ID) + delete(fl.filtersByID, filter.ID) + + filtersForAddress, ok := fl.filtersByAddress[filter.Address] + if !ok { + fl.lggr.Warnw("Filter not found in filtersByAddress", "name", filter.Name, "address", filter.Address) + return + } + + filtersForEventSig, ok := filtersForAddress[filter.EventSig] + if !ok { + fl.lggr.Warnw("Filter not found in filtersByEventSig", "name", filter.Name, "address", filter.Address) + return + } + + delete(filtersForEventSig, filter.ID) + if len(filtersForEventSig) == 0 { + delete(filtersForAddress, filter.EventSig) + } + + if len(filtersForAddress) == 0 { + delete(fl.filtersByAddress, filter.Address) + } +} + +// MatchingFilters - returns iterator to go through all matching filters. +// Requires LoadFilters to be called at least once. 
+func (fl *filters) MatchingFilters(addr PublicKey, eventSignature EventSignature) iter.Seq[Filter] { + if !fl.loadedFilters.Load() { + fl.lggr.Critical("Invariant violation: expected filters to be loaded before call to MatchingFilters") + return nil + } + return func(yield func(Filter) bool) { + fl.filtersMutex.RLock() + defer fl.filtersMutex.RUnlock() + filters, ok := fl.filtersByAddress[addr] + if !ok { + return + } + + for filterID := range filters[eventSignature] { + filter, ok := fl.filtersByID[filterID] + if !ok { + fl.lggr.Errorw("expected filter to exist in filtersByID", "filterID", filterID) + continue + } + if !yield(*filter) { + return + } + } + } +} + +// GetFiltersToBackfill - returns copy of backfill queue +// Requires LoadFilters to be called at least once. +func (fl *filters) GetFiltersToBackfill() []Filter { + if !fl.loadedFilters.Load() { + fl.lggr.Critical("Invariant violation: expected filters to be loaded before call to MatchingFilters") + return nil + } + fl.filtersMutex.Lock() + defer fl.filtersMutex.Unlock() + result := make([]Filter, 0, len(fl.filtersToBackfill)) + for filterID := range fl.filtersToBackfill { + filter, ok := fl.filtersByID[filterID] + if !ok { + fl.lggr.Errorw("expected filter to exist in filtersByID", "filterID", filterID) + continue + } + result = append(result, *filter) + } + + return result +} + +func (fl *filters) MarkFilterBackfilled(ctx context.Context, filterID int64) error { + fl.filtersMutex.Lock() + defer fl.filtersMutex.Unlock() + filter, ok := fl.filtersByID[filterID] + if !ok { + return fmt.Errorf("filter %d not found", filterID) + } + err := fl.orm.MarkFilterBackfilled(ctx, filterID) + if err != nil { + return fmt.Errorf("failed to mark filter backfilled: %w", err) + } + + filter.IsBackfilled = true + delete(fl.filtersToBackfill, filter.ID) + return nil +} + +// LoadFilters - loads filters from database. Can be called multiple times without side effects. 
+func (fl *filters) LoadFilters(ctx context.Context) error { + if fl.loadedFilters.Load() { + return nil + } + + fl.lggr.Debugw("Loading filters from db") + fl.filtersMutex.Lock() + defer fl.filtersMutex.Unlock() + // reset filters' indexes to ensure we do not have partial data from the previous run + fl.filtersByID = make(map[int64]*Filter) + fl.filtersByName = make(map[string]int64) + fl.filtersByAddress = make(map[PublicKey]map[EventSignature]map[int64]struct{}) + fl.filtersToBackfill = make(map[int64]struct{}) + fl.filtersToDelete = make(map[int64]Filter) + + filters, err := fl.orm.SelectFilters(ctx) + if err != nil { + return fmt.Errorf("failed to select filters from db: %w", err) + } + + for i := range filters { + filter := filters[i] + if filter.IsDeleted { + fl.filtersToDelete[filter.ID] = filter + continue + } + + fl.filtersByID[filter.ID] = &filter + + if _, ok := fl.filtersByName[filter.Name]; ok { + errMsg := fmt.Sprintf("invariant violation while loading from db: expected filters to have unique name: %s ", filter.Name) + fl.lggr.Critical(errMsg) + return errors.New(errMsg) + } + + fl.filtersByName[filter.Name] = filter.ID + filtersForAddress, ok := fl.filtersByAddress[filter.Address] + if !ok { + filtersForAddress = make(map[EventSignature]map[int64]struct{}) + fl.filtersByAddress[filter.Address] = filtersForAddress + } + + filtersForEventSig, ok := filtersForAddress[filter.EventSig] + if !ok { + filtersForEventSig = make(map[int64]struct{}) + filtersForAddress[filter.EventSig] = filtersForEventSig + } + + if _, ok := filtersForEventSig[filter.ID]; ok { + errMsg := fmt.Sprintf("invariant violation while loading from db: expected filters to have unique ID: %d ", filter.ID) + fl.lggr.Critical(errMsg) + return errors.New(errMsg) + } + + filtersForEventSig[filter.ID] = struct{}{} + if !filter.IsBackfilled { + fl.filtersToBackfill[filter.ID] = struct{}{} + } + } + + fl.loadedFilters.Store(true) + + return nil +} diff --git 
a/pkg/solana/logpoller/filters_test.go b/pkg/solana/logpoller/filters_test.go new file mode 100644 index 000000000..9f8058703 --- /dev/null +++ b/pkg/solana/logpoller/filters_test.go @@ -0,0 +1,365 @@ +package logpoller + +import ( + "errors" + "fmt" + "slices" + "testing" + + "github.com/gagliardetto/solana-go" + "github.com/google/uuid" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" +) + +func TestFilters_LoadFilters(t *testing.T) { + orm := newMockORM(t) + fs := newFilters(logger.Sugared(logger.Test(t)), orm) + ctx := tests.Context(t) + orm.On("SelectFilters", mock.Anything).Return(nil, errors.New("db failed")).Once() + deleted := Filter{ + ID: 3, + Name: "Deleted", + IsDeleted: true, + } + happyPath := Filter{ + ID: 1, + Name: "Happy path", + } + happyPath2 := Filter{ + ID: 2, + Name: "Happy path 2", + } + orm.On("SelectFilters", mock.Anything).Return([]Filter{ + deleted, + happyPath, + happyPath2, + }, nil).Once() + + err := fs.LoadFilters(ctx) + require.EqualError(t, err, "failed to select filters from db: db failed") + err = fs.LoadFilters(ctx) + require.NoError(t, err) + // only one filter to delete + require.Len(t, fs.filtersToDelete, 1) + require.Equal(t, deleted, fs.filtersToDelete[deleted.ID]) + // both happy path are indexed + require.Len(t, fs.filtersByAddress, 1) + require.Len(t, fs.filtersByAddress[happyPath.Address], 1) + require.Len(t, fs.filtersByAddress[happyPath.Address][happyPath.EventSig], 2) + require.Contains(t, fs.filtersByAddress[happyPath.Address][happyPath.EventSig], happyPath.ID) + require.Equal(t, happyPath, *fs.filtersByID[happyPath.ID]) + require.Contains(t, fs.filtersByAddress[happyPath.Address][happyPath.EventSig], happyPath2.ID) + require.Equal(t, happyPath2, *fs.filtersByID[happyPath2.ID]) + require.Len(t, fs.filtersByName, 2) + require.Equal(t, 
fs.filtersByName[happyPath.Name], happyPath.ID) + require.Equal(t, fs.filtersByName[happyPath2.Name], happyPath2.ID) + // any call following successful should be noop + err = fs.LoadFilters(ctx) + require.NoError(t, err) +} + +func TestFilters_RegisterFilter(t *testing.T) { + lggr := logger.Sugared(logger.Test(t)) + t.Run("Returns an error if name is empty", func(t *testing.T) { + orm := newMockORM(t) + fs := newFilters(lggr, orm) + err := fs.RegisterFilter(tests.Context(t), Filter{}) + require.EqualError(t, err, "name is required") + }) + t.Run("Returns an error if fails to load filters from db", func(t *testing.T) { + orm := newMockORM(t) + fs := newFilters(lggr, orm) + orm.On("SelectFilters", mock.Anything).Return(nil, errors.New("db failed")).Once() + err := fs.RegisterFilter(tests.Context(t), Filter{Name: "Filter"}) + require.EqualError(t, err, "failed to load filters: failed to select filters from db: db failed") + }) + t.Run("Returns an error if trying to update primary fields", func(t *testing.T) { + testCases := []struct { + Name string + ModifyField func(*Filter) + }{ + { + Name: "Address", + ModifyField: func(f *Filter) { + privateKey, err := solana.NewRandomPrivateKey() + require.NoError(t, err) + f.Address = PublicKey(privateKey.PublicKey()) + }, + }, + { + Name: "EventSig", + ModifyField: func(f *Filter) { + f.EventSig = EventSignature{3, 2, 1} + }, + }, + { + Name: "EventIDL", + ModifyField: func(f *Filter) { + f.EventIDL = uuid.NewString() + }, + }, + { + Name: "SubkeyPaths", + ModifyField: func(f *Filter) { + f.SubkeyPaths = [][]string{{uuid.NewString()}} + }, + }, + } + for _, tc := range testCases { + t.Run(fmt.Sprintf("Updating %s", tc.Name), func(t *testing.T) { + orm := newMockORM(t) + fs := newFilters(lggr, orm) + const filterName = "Filter" + dbFilter := Filter{Name: filterName} + orm.On("SelectFilters", mock.Anything).Return([]Filter{dbFilter}, nil).Once() + newFilter := dbFilter + tc.ModifyField(&newFilter) + err := 
fs.RegisterFilter(tests.Context(t), newFilter) + require.EqualError(t, err, ErrFilterNameConflict.Error()) + }) + } + }) + t.Run("Happy path", func(t *testing.T) { + orm := newMockORM(t) + fs := newFilters(lggr, orm) + const filterName = "Filter" + orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() + orm.On("InsertFilter", mock.Anything, mock.Anything).Return(int64(0), errors.New("failed to insert")).Once() + filter := Filter{Name: filterName} + err := fs.RegisterFilter(tests.Context(t), filter) + require.Error(t, err) + // can readd after db issue is resolved + orm.On("InsertFilter", mock.Anything, mock.Anything).Return(int64(1), nil).Once() + err = fs.RegisterFilter(tests.Context(t), filter) + require.NoError(t, err) + // can update non-primary fields + filter.EventName = uuid.NewString() + filter.StartingBlock++ + filter.Retention++ + filter.MaxLogsKept++ + orm.On("InsertFilter", mock.Anything, mock.Anything).Return(int64(1), nil).Once() + err = fs.RegisterFilter(tests.Context(t), filter) + require.NoError(t, err) + storedFilters := slices.Collect(fs.MatchingFilters(filter.Address, filter.EventSig)) + require.Len(t, storedFilters, 1) + filter.ID = 1 + require.Equal(t, filter, storedFilters[0]) + }) + t.Run("Can reregister after unregister", func(t *testing.T) { + orm := newMockORM(t) + fs := newFilters(lggr, orm) + const filterName = "Filter" + orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() + const filterID = int64(10) + orm.On("InsertFilter", mock.Anything, mock.Anything).Return(filterID, nil).Once() + err := fs.RegisterFilter(tests.Context(t), Filter{Name: filterName}) + require.NoError(t, err) + orm.On("MarkFilterDeleted", mock.Anything, filterID).Return(nil).Once() + err = fs.UnregisterFilter(tests.Context(t), filterName) + require.NoError(t, err) + orm.On("InsertFilter", mock.Anything, mock.Anything).Return(filterID+1, nil).Once() + err = fs.RegisterFilter(tests.Context(t), Filter{Name: filterName}) + require.NoError(t, err) + 
require.Len(t, fs.filtersToDelete, 1) + require.Equal(t, Filter{Name: filterName, ID: filterID}, fs.filtersToDelete[filterID]) + require.Len(t, fs.filtersToBackfill, 1) + require.Contains(t, fs.filtersToBackfill, filterID+1) + }) +} + +func TestFilters_UnregisterFilter(t *testing.T) { + lggr := logger.Sugared(logger.Test(t)) + t.Run("Returns an error if fails to load filters from db", func(t *testing.T) { + orm := newMockORM(t) + fs := newFilters(lggr, orm) + orm.On("SelectFilters", mock.Anything).Return(nil, errors.New("db failed")).Once() + err := fs.UnregisterFilter(tests.Context(t), "Filter") + require.EqualError(t, err, "failed to load filters: failed to select filters from db: db failed") + }) + t.Run("Noop if filter is not present", func(t *testing.T) { + orm := newMockORM(t) + fs := newFilters(lggr, orm) + const filterName = "Filter" + orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() + err := fs.UnregisterFilter(tests.Context(t), filterName) + require.NoError(t, err) + }) + t.Run("Returns error if fails to mark filter as deleted", func(t *testing.T) { + orm := newMockORM(t) + fs := newFilters(lggr, orm) + const filterName = "Filter" + const id int64 = 10 + orm.On("SelectFilters", mock.Anything).Return([]Filter{{ID: id, Name: filterName}}, nil).Once() + orm.On("MarkFilterDeleted", mock.Anything, id).Return(errors.New("db query failed")).Once() + err := fs.UnregisterFilter(tests.Context(t), filterName) + require.EqualError(t, err, "failed to mark filter deleted: db query failed") + }) + t.Run("Happy path", func(t *testing.T) { + orm := newMockORM(t) + fs := newFilters(lggr, orm) + const filterName = "Filter" + const id int64 = 10 + orm.On("SelectFilters", mock.Anything).Return([]Filter{{ID: id, Name: filterName}}, nil).Once() + orm.On("MarkFilterDeleted", mock.Anything, id).Return(nil).Once() + err := fs.UnregisterFilter(tests.Context(t), filterName) + require.NoError(t, err) + require.Len(t, fs.filtersToDelete, 1) + require.Len(t, 
fs.filtersToBackfill, 0) + require.Len(t, fs.filtersByName, 0) + require.Len(t, fs.filtersByAddress, 0) + }) +} + +func TestFilters_PruneFilters(t *testing.T) { + lggr := logger.Sugared(logger.Test(t)) + t.Run("Happy path", func(t *testing.T) { + orm := newMockORM(t) + fs := newFilters(lggr, orm) + toDelete := Filter{ + ID: 1, + Name: "To delete", + IsDeleted: true, + } + orm.On("SelectFilters", mock.Anything).Return([]Filter{ + toDelete, + { + ID: 2, + Name: "To keep", + }, + }, nil).Once() + orm.On("DeleteFilters", mock.Anything, map[int64]Filter{toDelete.ID: toDelete}).Return(nil).Once() + err := fs.PruneFilters(tests.Context(t)) + require.NoError(t, err) + require.Len(t, fs.filtersToDelete, 0) + }) + t.Run("If DB removal fails will add filters back into removal slice ", func(t *testing.T) { + orm := newMockORM(t) + fs := newFilters(lggr, orm) + toDelete := Filter{ + ID: 1, + Name: "To delete", + IsDeleted: true, + } + orm.On("SelectFilters", mock.Anything).Return([]Filter{ + toDelete, + { + ID: 2, + Name: "To keep", + }, + }, nil).Once() + newToDelete := Filter{ + ID: 3, + Name: "To delete 2", + } + orm.On("DeleteFilters", mock.Anything, map[int64]Filter{toDelete.ID: toDelete}).Return(errors.New("db failed")).Run(func(_ mock.Arguments) { + orm.On("MarkFilterDeleted", mock.Anything, newToDelete.ID).Return(nil).Once() + orm.On("InsertFilter", mock.Anything, mock.Anything).Return(newToDelete.ID, nil).Once() + require.NoError(t, fs.RegisterFilter(tests.Context(t), newToDelete)) + require.NoError(t, fs.UnregisterFilter(tests.Context(t), newToDelete.Name)) + }).Once() + err := fs.PruneFilters(tests.Context(t)) + require.EqualError(t, err, "failed to delete filters: db failed") + require.Equal(t, fs.filtersToDelete, map[int64]Filter{newToDelete.ID: newToDelete, toDelete.ID: toDelete}) + }) +} + +func TestFilters_MatchingFilters(t *testing.T) { + orm := newMockORM(t) + lggr := logger.Sugared(logger.Test(t)) + expectedFilter1 := Filter{ + ID: 1, + Name: 
"expectedFilter1", + Address: newRandomPublicKey(t), + EventSig: newRandomEventSignature(t), + } + expectedFilter2 := Filter{ + ID: 2, + Name: "expectedFilter2", + Address: expectedFilter1.Address, + EventSig: expectedFilter1.EventSig, + } + sameAddress := Filter{ + ID: 3, + Name: "sameAddressWrongEventSig", + Address: expectedFilter1.Address, + EventSig: newRandomEventSignature(t), + } + + sameEventSig := Filter{ + ID: 4, + Name: "wrongAddressSameEventSig", + Address: newRandomPublicKey(t), + EventSig: expectedFilter1.EventSig, + } + orm.On("SelectFilters", mock.Anything).Return([]Filter{expectedFilter1, expectedFilter2, sameAddress, sameEventSig}, nil).Once() + filters := newFilters(lggr, orm) + err := filters.LoadFilters(tests.Context(t)) + require.NoError(t, err) + matchingFilters := slices.Collect(filters.MatchingFilters(expectedFilter1.Address, expectedFilter1.EventSig)) + require.Len(t, matchingFilters, 2) + require.Contains(t, matchingFilters, expectedFilter1) + require.Contains(t, matchingFilters, expectedFilter2) + // if at least one key does not match - returns empty iterator + require.Empty(t, slices.Collect(filters.MatchingFilters(newRandomPublicKey(t), expectedFilter1.EventSig))) + require.Empty(t, slices.Collect(filters.MatchingFilters(expectedFilter1.Address, newRandomEventSignature(t)))) + require.Empty(t, slices.Collect(filters.MatchingFilters(newRandomPublicKey(t), newRandomEventSignature(t)))) +} + +func TestFilters_GetFiltersToBackfill(t *testing.T) { + orm := newMockORM(t) + lggr := logger.Sugared(logger.Test(t)) + backfilledFilter := Filter{ + ID: 1, + Name: "backfilled", + StartingBlock: 100, + IsBackfilled: true, + } + notBackfilled := Filter{ + ID: 2, + StartingBlock: 101, + Name: "notBackfilled", + } + orm.EXPECT().SelectFilters(mock.Anything).Return([]Filter{backfilledFilter, notBackfilled}, nil).Once() + filters := newFilters(lggr, orm) + err := filters.LoadFilters(tests.Context(t)) + require.NoError(t, err) + // filters that were not 
backfilled are properly identified on load + ensureInQueue := func(expectedFilters ...Filter) { + filtersToBackfill := filters.GetFiltersToBackfill() + require.Len(t, filtersToBackfill, len(expectedFilters)) + for _, expectedFilter := range expectedFilters { + require.Contains(t, filtersToBackfill, expectedFilter) + } + } + ensureInQueue(notBackfilled) + // filter remains in queue if failed to mark as backfilled + orm.EXPECT().MarkFilterBackfilled(mock.Anything, notBackfilled.ID).Return(errors.New("db call failed")).Once() + err = filters.MarkFilterBackfilled(tests.Context(t), notBackfilled.ID) + require.Error(t, err) + ensureInQueue(notBackfilled) + // filter is removed from queue, if marked as backfilled + orm.EXPECT().MarkFilterBackfilled(mock.Anything, notBackfilled.ID).Return(nil).Once() + err = filters.MarkFilterBackfilled(tests.Context(t), notBackfilled.ID) + require.NoError(t, err) + require.Empty(t, filters.GetFiltersToBackfill()) + // re adding identical filter won't trigger backfill + orm.EXPECT().InsertFilter(mock.Anything, mock.Anything).Return(backfilledFilter.ID, nil).Once() + require.NoError(t, filters.RegisterFilter(tests.Context(t), backfilledFilter)) + orm.EXPECT().InsertFilter(mock.Anything, mock.Anything).Return(notBackfilled.ID, nil).Once() + require.NoError(t, filters.RegisterFilter(tests.Context(t), notBackfilled)) + require.Empty(t, filters.GetFiltersToBackfill()) + // older StartingBlock trigger backfill + notBackfilled.StartingBlock = notBackfilled.StartingBlock - 1 + orm.EXPECT().InsertFilter(mock.Anything, mock.Anything).Return(notBackfilled.ID, nil).Once() + require.NoError(t, filters.RegisterFilter(tests.Context(t), notBackfilled)) + ensureInQueue(notBackfilled) + // new filter is always added to the queue + newFilter := Filter{Name: "new filter", ID: 3} + orm.EXPECT().InsertFilter(mock.Anything, newFilter).Return(newFilter.ID, nil).Once() + require.NoError(t, filters.RegisterFilter(tests.Context(t), newFilter)) + 
ensureInQueue(notBackfilled, newFilter) +} diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go new file mode 100644 index 000000000..4c386693e --- /dev/null +++ b/pkg/solana/logpoller/log_poller.go @@ -0,0 +1,145 @@ +package logpoller + +import ( + "context" + "errors" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" +) + +var ( + ErrFilterNameConflict = errors.New("filter with such name already exists") +) + +type ORM interface { + InsertFilter(ctx context.Context, filter Filter) (id int64, err error) + SelectFilters(ctx context.Context) ([]Filter, error) + DeleteFilters(ctx context.Context, filters map[int64]Filter) error + MarkFilterDeleted(ctx context.Context, id int64) (err error) + MarkFilterBackfilled(ctx context.Context, id int64) (err error) +} + +type LogPoller struct { + services.Service + eng *services.Engine + + lggr logger.SugaredLogger + orm ORM + + filters *filters +} + +func New(lggr logger.SugaredLogger, orm ORM) *LogPoller { + lggr = logger.Sugared(logger.Named(lggr, "LogPoller")) + lp := &LogPoller{ + orm: orm, + lggr: lggr, + filters: newFilters(lggr, orm), + } + + lp.Service, lp.eng = services.Config{ + Name: "LogPollerService", + Start: lp.start, + }.NewServiceEngine(lggr) + lp.lggr = lp.eng.SugaredLogger + return lp +} + +func (lp *LogPoller) start(context.Context) error { + lp.eng.Go(lp.run) + lp.eng.Go(lp.backgroundWorkerRun) + return nil +} + +// RegisterFilter - refer to filters.RegisterFilter for details. 
+func (lp *LogPoller) RegisterFilter(ctx context.Context, filter Filter) error { + ctx, cancel := lp.eng.Ctx(ctx) + defer cancel() + return lp.filters.RegisterFilter(ctx, filter) +} + +// UnregisterFilter refer to filters.UnregisterFilter for details +func (lp *LogPoller) UnregisterFilter(ctx context.Context, name string) error { + ctx, cancel := lp.eng.Ctx(ctx) + defer cancel() + return lp.filters.UnregisterFilter(ctx, name) +} + +func (lp *LogPoller) loadFilters(ctx context.Context) error { + retryTicker := services.TickerConfig{Initial: 0, JitterPct: services.DefaultJitter}.NewTicker(time.Second) + defer retryTicker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-retryTicker.C: + } + err := lp.filters.LoadFilters(ctx) + if err != nil { + lp.lggr.Errorw("Failed loading filters in init logpoller loop, retrying later", "err", err) + continue + } + + return nil + } +} + +func (lp *LogPoller) run(ctx context.Context) { + err := lp.loadFilters(ctx) + if err != nil { + lp.lggr.Warnw("Failed loading filters", "err", err) + return + } + + var blocks chan struct { + BlockNumber int64 + Logs any // to be defined + } + + for { + select { + case <-ctx.Done(): + return + case block := <-blocks: + filtersToBackfill := lp.filters.GetFiltersToBackfill() + + // TODO: NONEVM-916 parse, filters and persist logs + // NOTE: removal of filters occurs in the separate goroutine, so there is a chance that upon insert + // of log corresponding filter won't be present in the db. 
Ensure to refilter and retry on insert error + for i := range filtersToBackfill { + filter := filtersToBackfill[i] + lp.eng.Go(func(ctx context.Context) { + lp.startFilterBackfill(ctx, filter, block.BlockNumber) + }) + } + } + } +} + +func (lp *LogPoller) backgroundWorkerRun(ctx context.Context) { + pruneFilters := services.NewTicker(time.Minute) + defer pruneFilters.Stop() + for { + select { + case <-ctx.Done(): + return + case <-pruneFilters.C: + err := lp.filters.PruneFilters(ctx) + if err != nil { + lp.lggr.Errorw("Failed to prune filters", "err", err) + } + } + } +} + +func (lp *LogPoller) startFilterBackfill(ctx context.Context, filter Filter, toBlock int64) { + // TODO: NONEVM-916 start backfill + lp.lggr.Debugw("Starting filter backfill", "filter", filter) + err := lp.filters.MarkFilterBackfilled(ctx, filter.ID) + if err != nil { + lp.lggr.Errorw("Failed to mark filter backfill", "filter", filter, "err", err) + } +} diff --git a/pkg/solana/logpoller/mock_orm.go b/pkg/solana/logpoller/mock_orm.go new file mode 100644 index 000000000..0595ea718 --- /dev/null +++ b/pkg/solana/logpoller/mock_orm.go @@ -0,0 +1,292 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. 
+ +package logpoller + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// mockORM is an autogenerated mock type for the ORM type +type mockORM struct { + mock.Mock +} + +type mockORM_Expecter struct { + mock *mock.Mock +} + +func (_m *mockORM) EXPECT() *mockORM_Expecter { + return &mockORM_Expecter{mock: &_m.Mock} +} + +// DeleteFilters provides a mock function with given fields: ctx, filters +func (_m *mockORM) DeleteFilters(ctx context.Context, filters map[int64]Filter) error { + ret := _m.Called(ctx, filters) + + if len(ret) == 0 { + panic("no return value specified for DeleteFilters") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, map[int64]Filter) error); ok { + r0 = rf(ctx, filters) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// mockORM_DeleteFilters_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteFilters' +type mockORM_DeleteFilters_Call struct { + *mock.Call +} + +// DeleteFilters is a helper method to define mock.On call +// - ctx context.Context +// - filters map[int64]Filter +func (_e *mockORM_Expecter) DeleteFilters(ctx interface{}, filters interface{}) *mockORM_DeleteFilters_Call { + return &mockORM_DeleteFilters_Call{Call: _e.mock.On("DeleteFilters", ctx, filters)} +} + +func (_c *mockORM_DeleteFilters_Call) Run(run func(ctx context.Context, filters map[int64]Filter)) *mockORM_DeleteFilters_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(map[int64]Filter)) + }) + return _c +} + +func (_c *mockORM_DeleteFilters_Call) Return(_a0 error) *mockORM_DeleteFilters_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *mockORM_DeleteFilters_Call) RunAndReturn(run func(context.Context, map[int64]Filter) error) *mockORM_DeleteFilters_Call { + _c.Call.Return(run) + return _c +} + +// InsertFilter provides a mock function with given fields: ctx, filter +func (_m *mockORM) InsertFilter(ctx context.Context, 
filter Filter) (int64, error) { + ret := _m.Called(ctx, filter) + + if len(ret) == 0 { + panic("no return value specified for InsertFilter") + } + + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, Filter) (int64, error)); ok { + return rf(ctx, filter) + } + if rf, ok := ret.Get(0).(func(context.Context, Filter) int64); ok { + r0 = rf(ctx, filter) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(context.Context, Filter) error); ok { + r1 = rf(ctx, filter) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// mockORM_InsertFilter_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InsertFilter' +type mockORM_InsertFilter_Call struct { + *mock.Call +} + +// InsertFilter is a helper method to define mock.On call +// - ctx context.Context +// - filter Filter +func (_e *mockORM_Expecter) InsertFilter(ctx interface{}, filter interface{}) *mockORM_InsertFilter_Call { + return &mockORM_InsertFilter_Call{Call: _e.mock.On("InsertFilter", ctx, filter)} +} + +func (_c *mockORM_InsertFilter_Call) Run(run func(ctx context.Context, filter Filter)) *mockORM_InsertFilter_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(Filter)) + }) + return _c +} + +func (_c *mockORM_InsertFilter_Call) Return(id int64, err error) *mockORM_InsertFilter_Call { + _c.Call.Return(id, err) + return _c +} + +func (_c *mockORM_InsertFilter_Call) RunAndReturn(run func(context.Context, Filter) (int64, error)) *mockORM_InsertFilter_Call { + _c.Call.Return(run) + return _c +} + +// MarkFilterBackfilled provides a mock function with given fields: ctx, id +func (_m *mockORM) MarkFilterBackfilled(ctx context.Context, id int64) error { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for MarkFilterBackfilled") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok { + r0 = rf(ctx, id) + } else { + r0 = 
ret.Error(0) + } + + return r0 +} + +// mockORM_MarkFilterBackfilled_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MarkFilterBackfilled' +type mockORM_MarkFilterBackfilled_Call struct { + *mock.Call +} + +// MarkFilterBackfilled is a helper method to define mock.On call +// - ctx context.Context +// - id int64 +func (_e *mockORM_Expecter) MarkFilterBackfilled(ctx interface{}, id interface{}) *mockORM_MarkFilterBackfilled_Call { + return &mockORM_MarkFilterBackfilled_Call{Call: _e.mock.On("MarkFilterBackfilled", ctx, id)} +} + +func (_c *mockORM_MarkFilterBackfilled_Call) Run(run func(ctx context.Context, id int64)) *mockORM_MarkFilterBackfilled_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *mockORM_MarkFilterBackfilled_Call) Return(err error) *mockORM_MarkFilterBackfilled_Call { + _c.Call.Return(err) + return _c +} + +func (_c *mockORM_MarkFilterBackfilled_Call) RunAndReturn(run func(context.Context, int64) error) *mockORM_MarkFilterBackfilled_Call { + _c.Call.Return(run) + return _c +} + +// MarkFilterDeleted provides a mock function with given fields: ctx, id +func (_m *mockORM) MarkFilterDeleted(ctx context.Context, id int64) error { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for MarkFilterDeleted") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// mockORM_MarkFilterDeleted_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MarkFilterDeleted' +type mockORM_MarkFilterDeleted_Call struct { + *mock.Call +} + +// MarkFilterDeleted is a helper method to define mock.On call +// - ctx context.Context +// - id int64 +func (_e *mockORM_Expecter) MarkFilterDeleted(ctx interface{}, id interface{}) *mockORM_MarkFilterDeleted_Call { + return 
&mockORM_MarkFilterDeleted_Call{Call: _e.mock.On("MarkFilterDeleted", ctx, id)} +} + +func (_c *mockORM_MarkFilterDeleted_Call) Run(run func(ctx context.Context, id int64)) *mockORM_MarkFilterDeleted_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *mockORM_MarkFilterDeleted_Call) Return(err error) *mockORM_MarkFilterDeleted_Call { + _c.Call.Return(err) + return _c +} + +func (_c *mockORM_MarkFilterDeleted_Call) RunAndReturn(run func(context.Context, int64) error) *mockORM_MarkFilterDeleted_Call { + _c.Call.Return(run) + return _c +} + +// SelectFilters provides a mock function with given fields: ctx +func (_m *mockORM) SelectFilters(ctx context.Context) ([]Filter, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for SelectFilters") + } + + var r0 []Filter + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) ([]Filter, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) []Filter); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]Filter) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// mockORM_SelectFilters_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SelectFilters' +type mockORM_SelectFilters_Call struct { + *mock.Call +} + +// SelectFilters is a helper method to define mock.On call +// - ctx context.Context +func (_e *mockORM_Expecter) SelectFilters(ctx interface{}) *mockORM_SelectFilters_Call { + return &mockORM_SelectFilters_Call{Call: _e.mock.On("SelectFilters", ctx)} +} + +func (_c *mockORM_SelectFilters_Call) Run(run func(ctx context.Context)) *mockORM_SelectFilters_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *mockORM_SelectFilters_Call) Return(_a0 
[]Filter, _a1 error) *mockORM_SelectFilters_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *mockORM_SelectFilters_Call) RunAndReturn(run func(context.Context) ([]Filter, error)) *mockORM_SelectFilters_Call { + _c.Call.Return(run) + return _c +} + +// newMockORM creates a new instance of mockORM. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func newMockORM(t interface { + mock.TestingT + Cleanup(func()) +}) *mockORM { + mock := &mockORM{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/solana/logpoller/models.go b/pkg/solana/logpoller/models.go index 7afb0a50e..0be5b4874 100644 --- a/pkg/solana/logpoller/models.go +++ b/pkg/solana/logpoller/models.go @@ -7,16 +7,22 @@ import ( ) type Filter struct { - ID int64 + ID int64 // only for internal usage. Values set externally are ignored. Name string Address PublicKey EventName string - EventSig []byte + EventSig EventSignature StartingBlock int64 EventIDL string SubkeyPaths SubkeyPaths Retention time.Duration MaxLogsKept int64 + IsDeleted bool // only for internal usage. Values set externally are ignored. + IsBackfilled bool // only for internal usage. Values set externally are ignored. 
+} + +func (f Filter) MatchSameLogs(other Filter) bool { + return f.Address == other.Address && f.EventSig == other.EventSig && f.EventIDL == other.EventIDL && f.SubkeyPaths.Equal(other.SubkeyPaths) } type Log struct { @@ -28,7 +34,7 @@ type Log struct { BlockNumber int64 BlockTimestamp time.Time Address PublicKey - EventSig []byte + EventSig EventSignature SubkeyValues pq.ByteaArray TxHash Signature Data []byte diff --git a/pkg/solana/logpoller/orm.go b/pkg/solana/logpoller/orm.go index 12035438f..4ca6fee5b 100644 --- a/pkg/solana/logpoller/orm.go +++ b/pkg/solana/logpoller/orm.go @@ -9,6 +9,8 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" ) +var _ ORM = (*DSORM)(nil) + type DSORM struct { chainID string ds sqlutil.DataSource @@ -47,6 +49,7 @@ func (o *DSORM) InsertFilter(ctx context.Context, filter Filter) (id int64, err withStartingBlock(filter.StartingBlock). withEventIDL(filter.EventIDL). withSubkeyPaths(filter.SubkeyPaths). + withIsBackfilled(filter.IsBackfilled). 
toArgs() if err != nil { return 0, err @@ -56,8 +59,14 @@ func (o *DSORM) InsertFilter(ctx context.Context, filter Filter) (id int64, err // https://github.com/jmoiron/sqlx/issues/91, https://github.com/jmoiron/sqlx/issues/428 query := ` INSERT INTO solana.log_poller_filters - (chain_id, name, address, event_name, event_sig, starting_block, event_idl, subkey_paths, retention, max_logs_kept) - VALUES (:chain_id, :name, :address, :event_name, :event_sig, :starting_block, :event_idl, :subkey_paths, :retention, :max_logs_kept) + (chain_id, name, address, event_name, event_sig, starting_block, event_idl, subkey_paths, retention, max_logs_kept, is_backfilled) + VALUES (:chain_id, :name, :address, :event_name, :event_sig, :starting_block, :event_idl, :subkey_paths, :retention, :max_logs_kept, :is_backfilled) + ON CONFLICT (chain_id, name) WHERE NOT is_deleted DO UPDATE SET + event_name = EXCLUDED.event_name, + starting_block = EXCLUDED.starting_block, + retention = EXCLUDED.retention, + max_logs_kept = EXCLUDED.max_logs_kept, + is_backfilled = EXCLUDED.is_backfilled RETURNING id;` query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -72,13 +81,48 @@ func (o *DSORM) InsertFilter(ctx context.Context, filter Filter) (id int64, err // GetFilterByID returns filter by ID func (o *DSORM) GetFilterByID(ctx context.Context, id int64) (Filter, error) { - query := `SELECT id, name, address, event_name, event_sig, starting_block, event_idl, subkey_paths, retention, max_logs_kept - FROM solana.log_poller_filters WHERE id = $1` + query := filtersQuery("WHERE id = $1") var result Filter err := o.ds.GetContext(ctx, &result, query, id) return result, err } +func (o *DSORM) MarkFilterDeleted(ctx context.Context, id int64) (err error) { + query := `UPDATE solana.log_poller_filters SET is_deleted = true WHERE id = $1` + _, err = o.ds.ExecContext(ctx, query, id) + return err +} + +func (o *DSORM) MarkFilterBackfilled(ctx context.Context, id int64) (err error) { + query := `UPDATE 
solana.log_poller_filters SET is_backfilled = true WHERE id = $1` + _, err = o.ds.ExecContext(ctx, query, id) + return err +} + +func (o *DSORM) DeleteFilter(ctx context.Context, id int64) (err error) { + query := `DELETE FROM solana.log_poller_filters WHERE id = $1` + _, err = o.ds.ExecContext(ctx, query, id) + return err +} + +func (o *DSORM) DeleteFilters(ctx context.Context, filters map[int64]Filter) error { + for _, filter := range filters { + err := o.DeleteFilter(ctx, filter.ID) + if err != nil { + return fmt.Errorf("error deleting filter %s (%d): %w", filter.Name, filter.ID, err) + } + } + + return nil +} + +func (o *DSORM) SelectFilters(ctx context.Context) ([]Filter, error) { + query := filtersQuery("WHERE chain_id = $1") + var filters []Filter + err := o.ds.SelectContext(ctx, &filters, query, o.chainID) + return filters, err +} + // InsertLogs is idempotent to support replays. func (o *DSORM) InsertLogs(ctx context.Context, logs []Log) error { if err := o.validateLogs(logs); err != nil { @@ -127,7 +171,7 @@ func (o *DSORM) validateLogs(logs []Log) error { } // SelectLogs finds the logs in a given block range. -func (o *DSORM) SelectLogs(ctx context.Context, start, end int64, address PublicKey, eventSig []byte) ([]Log, error) { +func (o *DSORM) SelectLogs(ctx context.Context, start, end int64, address PublicKey, eventSig EventSignature) ([]Log, error) { args, err := newQueryArgsForEvent(o.chainID, address, eventSig). withStartBlock(start). withEndBlock(end). 
diff --git a/pkg/solana/logpoller/orm_test.go b/pkg/solana/logpoller/orm_test.go index 626f17bdd..53512d696 100644 --- a/pkg/solana/logpoller/orm_test.go +++ b/pkg/solana/logpoller/orm_test.go @@ -8,10 +8,10 @@ import ( "github.com/gagliardetto/solana-go" "github.com/google/uuid" - _ "github.com/jackc/pgx/v4/stdlib" - "github.com/lib/pq" "github.com/stretchr/testify/require" + _ "github.com/jackc/pgx/v4/stdlib" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" @@ -21,9 +21,6 @@ import ( func TestLogPollerFilters(t *testing.T) { lggr := logger.Test(t) - chainID := uuid.NewString() - dbx := pg.NewTestDB(t, pg.TestURL(t)) - orm := NewORM(chainID, dbx, lggr) privateKey, err := solana.NewRandomPrivateKey() require.NoError(t, err) @@ -34,7 +31,7 @@ func TestLogPollerFilters(t *testing.T) { Name: "happy path", Address: PublicKey(pubKey), EventName: "event", - EventSig: []byte{1, 2, 3}, + EventSig: EventSignature{1, 2, 3}, StartingBlock: 1, EventIDL: "{}", SubkeyPaths: SubkeyPaths([][]string{{"a", "b"}, {"c"}}), @@ -45,7 +42,7 @@ func TestLogPollerFilters(t *testing.T) { Name: "empty sub key paths", Address: PublicKey(pubKey), EventName: "event", - EventSig: []byte{1, 2, 3}, + EventSig: EventSignature{1, 2, 3}, StartingBlock: 1, EventIDL: "{}", SubkeyPaths: SubkeyPaths([][]string{}), @@ -56,7 +53,7 @@ func TestLogPollerFilters(t *testing.T) { Name: "nil sub key paths", Address: PublicKey(pubKey), EventName: "event", - EventSig: []byte{1, 2, 3}, + EventSig: EventSignature{1, 2, 3}, StartingBlock: 1, EventIDL: "{}", SubkeyPaths: nil, @@ -68,6 +65,9 @@ func TestLogPollerFilters(t *testing.T) { for _, filter := range filters { t.Run("Read/write filter: "+filter.Name, func(t *testing.T) { ctx := tests.Context(t) + chainID := uuid.NewString() + dbx := pg.NewTestDB(t, pg.TestURL(t)) + orm := NewORM(chainID, dbx, lggr) id, err := 
orm.InsertFilter(ctx, filter) require.NoError(t, err) filter.ID = id @@ -77,32 +77,109 @@ func TestLogPollerFilters(t *testing.T) { }) } }) - t.Run("Returns and error if name is not unique", func(t *testing.T) { + t.Run("Updates non primary fields if name and chainID is not unique", func(t *testing.T) { + chainID := uuid.NewString() + dbx := pg.NewTestDB(t, pg.TestURL(t)) + orm := NewORM(chainID, dbx, lggr) filter := newRandomFilter(t) ctx := tests.Context(t) - _, err = orm.InsertFilter(ctx, filter) + id, err := orm.InsertFilter(ctx, filter) require.NoError(t, err) - filter.EventSig = []byte(uuid.NewString()) - _, err = orm.InsertFilter(ctx, filter) - require.EqualError(t, err, `ERROR: duplicate key value violates unique constraint "solana_log_poller_filter_name" (SQLSTATE 23505)`) + filter.EventName = uuid.NewString() + filter.StartingBlock++ + filter.Retention++ + filter.MaxLogsKept++ + id2, err := orm.InsertFilter(ctx, filter) + require.NoError(t, err) + require.Equal(t, id, id2) + dbFilter, err := orm.GetFilterByID(ctx, id) + require.NoError(t, err) + filter.ID = id + require.Equal(t, filter, dbFilter) }) -} + t.Run("Allows reuse name of a filter marked as deleted", func(t *testing.T) { + chainID := uuid.NewString() + dbx := pg.NewTestDB(t, pg.TestURL(t)) + orm := NewORM(chainID, dbx, lggr) + filter := newRandomFilter(t) + ctx := tests.Context(t) + filterID, err := orm.InsertFilter(ctx, filter) + require.NoError(t, err) + // mark deleted + err = orm.MarkFilterDeleted(ctx, filterID) + require.NoError(t, err) + // ensure marked as deleted + dbFilter, err := orm.GetFilterByID(ctx, filterID) + require.NoError(t, err) + require.True(t, dbFilter.IsDeleted, "expected to be deleted") + newFilterID, err := orm.InsertFilter(ctx, filter) + require.NoError(t, err) + require.NotEqual(t, newFilterID, filterID, "expected db to generate new filter as we can not be sure that new one matches the same logs") + }) + t.Run("Allows reuse name for a filter with different chainID", 
func(t *testing.T) { + dbx := pg.NewTestDB(t, pg.TestURL(t)) + orm1 := NewORM(uuid.NewString(), dbx, lggr) + orm2 := NewORM(uuid.NewString(), dbx, lggr) + filter := newRandomFilter(t) + ctx := tests.Context(t) + filterID1, err := orm1.InsertFilter(ctx, filter) + require.NoError(t, err) + filterID2, err := orm2.InsertFilter(ctx, filter) + require.NoError(t, err) + require.NotEqual(t, filterID1, filterID2) + }) + t.Run("Deletes log on parent filter deletion", func(t *testing.T) { + dbx := pg.NewTestDB(t, pg.TestURL(t)) + chainID := uuid.NewString() + orm := NewORM(chainID, dbx, lggr) + filter := newRandomFilter(t) + ctx := tests.Context(t) + filterID, err := orm.InsertFilter(ctx, filter) + require.NoError(t, err) + log := newRandomLog(t, filterID, chainID) + err = orm.InsertLogs(ctx, []Log{log}) + require.NoError(t, err) + logs, err := orm.SelectLogs(ctx, 0, log.BlockNumber, log.Address, log.EventSig) + require.NoError(t, err) + require.Len(t, logs, 1) + err = orm.MarkFilterDeleted(ctx, filterID) + require.NoError(t, err) + // logs are expected to be present in db even if filter was marked as deleted + logs, err = orm.SelectLogs(ctx, 0, log.BlockNumber, log.Address, log.EventSig) + require.NoError(t, err) + require.Len(t, logs, 1) + err = orm.DeleteFilter(ctx, filterID) + require.NoError(t, err) + logs, err = orm.SelectLogs(ctx, 0, log.BlockNumber, log.Address, log.EventSig) + require.NoError(t, err) + require.Len(t, logs, 0) + }) + t.Run("MarkFilterBackfilled updates corresponding field", func(t *testing.T) { + dbx := pg.NewTestDB(t, pg.TestURL(t)) + chainID := uuid.NewString() + orm := NewORM(chainID, dbx, lggr) -func newRandomFilter(t *testing.T) Filter { - privateKey, err := solana.NewRandomPrivateKey() - require.NoError(t, err) - pubKey := privateKey.PublicKey() - return Filter{ - Name: uuid.NewString(), - Address: PublicKey(pubKey), - EventName: "event", - EventSig: []byte{1, 2, 3}, - StartingBlock: 1, - EventIDL: "{}", - SubkeyPaths: [][]string{{"a", "b"}, {"c"}}, - 
Retention: 1000, - MaxLogsKept: 3, - } + filter := newRandomFilter(t) + ctx := tests.Context(t) + filter.IsBackfilled = true + filterID, err := orm.InsertFilter(ctx, filter) + require.NoError(t, err) + ensureIsBackfilled := func(expectedIsBackfilled bool) { + filter, err = orm.GetFilterByID(ctx, filterID) + require.NoError(t, err) + require.Equal(t, expectedIsBackfilled, filter.IsBackfilled) + } + ensureIsBackfilled(true) + // insert overrides + filter.IsBackfilled = false + _, err = orm.InsertFilter(ctx, filter) + require.NoError(t, err) + ensureIsBackfilled(false) + // mark changes value to true + err = orm.MarkFilterBackfilled(ctx, filterID) + require.NoError(t, err) + ensureIsBackfilled(true) + }) } func TestLogPollerLogs(t *testing.T) { @@ -111,28 +188,46 @@ func TestLogPollerLogs(t *testing.T) { dbx := pg.NewTestDB(t, pg.TestURL(t)) orm := NewORM(chainID, dbx, lggr) - privateKey, err := solana.NewRandomPrivateKey() - require.NoError(t, err) - pubKey := privateKey.PublicKey() - ctx := tests.Context(t) // create filter as it's required for a log - filterID, err := orm.InsertFilter(ctx, Filter{ - Name: "awesome filter", - Address: PublicKey(pubKey), + filterID, err := orm.InsertFilter(ctx, newRandomFilter(t)) + require.NoError(t, err) + log := newRandomLog(t, filterID, chainID) + err = orm.InsertLogs(ctx, []Log{log}) + require.NoError(t, err) + // insert of the same Log should not produce two instances + err = orm.InsertLogs(ctx, []Log{log}) + require.NoError(t, err) + dbLogs, err := orm.SelectLogs(ctx, 0, 100, log.Address, log.EventSig) + require.NoError(t, err) + require.Len(t, dbLogs, 1) + log.ID = dbLogs[0].ID + log.CreatedAt = dbLogs[0].CreatedAt + require.Equal(t, log, dbLogs[0]) +} + +func newRandomFilter(t *testing.T) Filter { + return Filter{ + Name: uuid.NewString(), + Address: newRandomPublicKey(t), EventName: "event", - EventSig: []byte{1, 2, 3}, + EventSig: newRandomEventSignature(t), StartingBlock: 1, EventIDL: "{}", SubkeyPaths: [][]string{{"a", 
"b"}, {"c"}}, Retention: 1000, MaxLogsKept: 3, - }) + } +} + +func newRandomLog(t *testing.T, filterID int64, chainID string) Log { + privateKey, err := solana.NewRandomPrivateKey() require.NoError(t, err) + pubKey := privateKey.PublicKey() data := []byte("solana is fun") signature, err := privateKey.Sign(data) require.NoError(t, err) - log := Log{ + return Log{ FilterID: filterID, ChainID: chainID, LogIndex: 1, @@ -140,20 +235,9 @@ func TestLogPollerLogs(t *testing.T) { BlockNumber: 10, BlockTimestamp: time.Unix(1731590113, 0), Address: PublicKey(pubKey), - EventSig: []byte{3, 2, 1}, - SubkeyValues: pq.ByteaArray([][]byte{{3, 2, 1}, {1}, {1, 2}, pubKey.Bytes()}), + EventSig: EventSignature{3, 2, 1}, + SubkeyValues: [][]byte{{3, 2, 1}, {1}, {1, 2}, pubKey.Bytes()}, TxHash: Signature(signature), Data: data, } - err = orm.InsertLogs(ctx, []Log{log}) - require.NoError(t, err) - // insert of the same Log should not produce two instances - err = orm.InsertLogs(ctx, []Log{log}) - require.NoError(t, err) - dbLogs, err := orm.SelectLogs(ctx, 0, 100, log.Address, log.EventSig) - require.NoError(t, err) - require.Len(t, dbLogs, 1) - log.ID = dbLogs[0].ID - log.CreatedAt = dbLogs[0].CreatedAt - require.Equal(t, log, dbLogs[0]) } diff --git a/pkg/solana/logpoller/parser.go b/pkg/solana/logpoller/parser.go index a0dd44b52..be97a4f48 100644 --- a/pkg/solana/logpoller/parser.go +++ b/pkg/solana/logpoller/parser.go @@ -1,5 +1,6 @@ package logpoller var ( - logsFields = [...]string{"id", "filter_id", "chain_id", "log_index", "block_hash", "block_number", "block_timestamp", "address", "event_sig", "subkey_values", "tx_hash", "data", "created_at", "expires_at", "sequence_num"} + logsFields = [...]string{"id", "filter_id", "chain_id", "log_index", "block_hash", "block_number", "block_timestamp", "address", "event_sig", "subkey_values", "tx_hash", "data", "created_at", "expires_at", "sequence_num"} + filterFields = [...]string{"id", "name", "address", "event_name", "event_sig", 
"starting_block", "event_idl", "subkey_paths", "retention", "max_logs_kept", "is_deleted", "is_backfilled"} ) diff --git a/pkg/solana/logpoller/query.go b/pkg/solana/logpoller/query.go index 36e5fb159..65a792449 100644 --- a/pkg/solana/logpoller/query.go +++ b/pkg/solana/logpoller/query.go @@ -70,7 +70,7 @@ func (q *queryArgs) withEventName(eventName string) *queryArgs { } // withEventSig sets the EventSig field in queryArgs. -func (q *queryArgs) withEventSig(eventSig []byte) *queryArgs { +func (q *queryArgs) withEventSig(eventSig EventSignature) *queryArgs { return q.withField("event_sig", eventSig) } @@ -99,7 +99,7 @@ func (q *queryArgs) withMaxLogsKept(maxLogsKept int64) *queryArgs { return q.withField("max_logs_kept", maxLogsKept) } -func newQueryArgsForEvent(chainID string, address PublicKey, eventSig []byte) *queryArgs { +func newQueryArgsForEvent(chainID string, address PublicKey, eventSig EventSignature) *queryArgs { return newQueryArgs(chainID). withAddress(address). withEventSig(eventSig) @@ -113,10 +113,19 @@ func (q *queryArgs) withEndBlock(endBlock int64) *queryArgs { return q.withField("end_block", endBlock) } +// withIsBackfilled sets the isBackfilled field in queryArgs. +func (q *queryArgs) withIsBackfilled(isBackfilled bool) *queryArgs { + return q.withField("is_backfilled", isBackfilled) +} + func logsQuery(clause string) string { return fmt.Sprintf(`SELECT %s FROM solana.logs %s`, strings.Join(logsFields[:], ", "), clause) } +func filtersQuery(clause string) string { + return fmt.Sprintf(`SELECT %s FROM solana.log_poller_filters %s`, strings.Join(filterFields[:], ", "), clause) +} + func (q *queryArgs) toArgs() (map[string]any, error) { if len(q.err) > 0 { return nil, errors.Join(q.err...) 
diff --git a/pkg/solana/logpoller/types.go b/pkg/solana/logpoller/types.go index 0581a7630..143c28898 100644 --- a/pkg/solana/logpoller/types.go +++ b/pkg/solana/logpoller/types.go @@ -4,6 +4,7 @@ import ( "database/sql/driver" "encoding/json" "fmt" + "slices" "github.com/gagliardetto/solana-go" ) @@ -96,3 +97,21 @@ func (p *SubkeyPaths) Scan(src interface{}) error { return nil } + +func (p SubkeyPaths) Equal(o SubkeyPaths) bool { + return slices.EqualFunc(p, o, slices.Equal) +} + +const EventSignatureLength = 8 + +type EventSignature [EventSignatureLength]byte + +// Scan implements Scanner for database/sql. +func (s *EventSignature) Scan(src interface{}) error { + return scanFixedLengthArray("EventSignature", EventSignatureLength, src, s[:]) +} + +// Value implements valuer for database/sql. +func (s EventSignature) Value() (driver.Value, error) { + return s[:], nil +} diff --git a/pkg/solana/logpoller/types_test.go b/pkg/solana/logpoller/types_test.go new file mode 100644 index 000000000..263c22bab --- /dev/null +++ b/pkg/solana/logpoller/types_test.go @@ -0,0 +1,20 @@ +package logpoller + +import ( + "testing" + + "github.com/gagliardetto/solana-go" + "github.com/stretchr/testify/require" +) + +func newRandomPublicKey(t *testing.T) PublicKey { + privateKey, err := solana.NewRandomPrivateKey() + require.NoError(t, err) + pubKey := privateKey.PublicKey() + return PublicKey(pubKey) +} + +func newRandomEventSignature(t *testing.T) EventSignature { + pubKey := newRandomPublicKey(t) + return EventSignature(pubKey[:8]) +}