diff --git a/internal/ethereum/event_stream.go b/internal/ethereum/event_stream.go index 1c237c6..86eaa9b 100644 --- a/internal/ethereum/event_stream.go +++ b/internal/ethereum/event_stream.go @@ -296,6 +296,7 @@ func (es *eventStream) leadGroupSteadyState() bool { lastUpdate := -1 failCount := 0 filterRPC := "" + filterResetRequired := false for { if es.c.doFailureDelay(es.ctx, failCount) { log.L(es.ctx).Debugf("Stream loop exiting") @@ -303,7 +304,7 @@ func (es *eventStream) leadGroupSteadyState() bool { } // Build the aggregated listener list if it has changed - listenerChanged := es.buildReuseLeadGroupListener(&lastUpdate, &ag) + listenerChanged := es.buildReuseLeadGroupListener(&lastUpdate, &ag) || filterResetRequired // No need to poll for events, if we don't have any listeners if len(ag.signatureSet) > 0 { @@ -321,6 +322,7 @@ func (es *eventStream) leadGroupSteadyState() bool { if filter != "" { es.uninstallFilter(&filter) } + filterResetRequired = false filterRPC = "eth_getFilterLogs" // first JSON/RPC after getting a new filter ID // Determine the earliest block we need to poll from fromBlock := int64(-1) @@ -372,6 +374,8 @@ func (es *eventStream) leadGroupSteadyState() bool { events, enrichErr := es.filterEnrichSort(es.ctx, ag, ethLogs) if enrichErr != nil { log.L(es.ctx).Errorf("Failed to enrich events: %s", enrichErr) + // We have to reset our filter, as otherwise we'll skip past these events. + filterResetRequired = true failCount++ continue }