Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ensure only offsets are pulled after a filter is esstablished #159

Merged
merged 2 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions internal/ethereum/event_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ func mockStreamLoopEmpty(mRPC *rpcbackendmocks.Backend) {
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*bool) = true
}).Maybe()
Expand Down
7 changes: 5 additions & 2 deletions internal/ethereum/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ func (es *eventStream) leadGroupSteadyState() bool {
lastUpdate := -1
failCount := 0
filterResetRequired := false
filterRPCMethodToUse := ""
for {
if es.c.doFailureDelay(es.ctx, failCount) {
log.L(es.ctx).Debugf("Stream loop exiting")
Expand Down Expand Up @@ -329,6 +330,7 @@ func (es *eventStream) leadGroupSteadyState() bool {
es.uninstallFilter(&filter)
}
filterResetRequired = false
filterRPCMethodToUse = "eth_getFilterLogs" // first JSON/RPC for a new filter ID fetches all the historical logs to ensure no gaps
// Determine the earliest block we need to poll from
fromBlock := int64(-1)
for _, l := range ag.listeners {
Expand Down Expand Up @@ -362,17 +364,18 @@ func (es *eventStream) leadGroupSteadyState() bool {
}
// Get the next batch of logs
var ethLogs []*logJSONRPC
rpcErr := es.c.backend.CallRPC(es.ctx, &ethLogs, "eth_getFilterLogs", filter)
rpcErr := es.c.backend.CallRPC(es.ctx, &ethLogs, filterRPCMethodToUse, filter)
// If we fail to query we just retry - setting filter to nil if not found
if rpcErr != nil {
if mapError(filterRPCMethods, rpcErr.Error()) == ffcapi.ErrorReasonNotFound {
log.L(es.ctx).Infof("Filter '%v' reset: %s", filter, rpcErr.Message)
filter = ""
}
log.L(es.ctx).Errorf("Failed to query filter (eth_getFilterLogs): %s", rpcErr.Message)
log.L(es.ctx).Errorf("Failed to query filter (%s): %s", filterRPCMethodToUse, rpcErr.Message)
failCount++
continue
}
filterRPCMethodToUse = "eth_getFilterChanges" // subsequent JSON/RPC calls after the initial fetch, this fetches only the new logs
// Enrich the events
events, enrichErr := es.filterEnrichSort(es.ctx, ag, ethLogs)
if enrichErr != nil {
Expand Down
17 changes: 17 additions & 0 deletions internal/ethereum/event_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,11 @@ func TestLeadGroupDeliverEvents(t *testing.T) {
Run(func(args mock.Arguments) {
*args[1].(*string) = testLogsFilterID1
}).Once()

mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = []*logJSONRPC{
{
BlockNumber: ethtypes.NewHexInteger64(212122),
Expand All @@ -408,6 +412,9 @@ func TestLeadGroupDeliverEvents(t *testing.T) {
},
}
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = []*logJSONRPC{}
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c", false).Return(nil).Run(func(args mock.Arguments) {
*args[1].(**blockInfoJSONRPC) = &blockInfoJSONRPC{
Number: ethtypes.NewHexInteger64(212122),
Expand Down Expand Up @@ -464,6 +471,7 @@ func TestLeadGroupNearBlockZeroEnsureNonNegative(t *testing.T) {
}).Once().Run(func(args mock.Arguments) {
close(filtered)
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*bool) = true
}).Maybe()
Expand Down Expand Up @@ -648,6 +656,9 @@ func TestStreamLoopChangeFilter(t *testing.T) {
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*bool) = true
}).Maybe()
Expand Down Expand Up @@ -690,6 +701,9 @@ func TestStreamLoopFilterReset(t *testing.T) {
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*bool) = true
}).Maybe()
Expand Down Expand Up @@ -747,6 +761,9 @@ func TestStreamLoopEnrichFail(t *testing.T) {
close(errorReturned)
}).
Return(&rpcbackend.RPCError{Message: "pop"}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*bool) = true
}).Maybe()
Expand Down
Loading