From 6a47fdc412208a84a9afc4ec166d57ec3a183787 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Fri, 3 Mar 2023 09:54:33 -0500 Subject: [PATCH 1/3] Reset filter after enrichment failure Signed-off-by: Peter Broadhurst --- internal/ethereum/event_stream.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/ethereum/event_stream.go b/internal/ethereum/event_stream.go index 1c237c6..02a7ad3 100644 --- a/internal/ethereum/event_stream.go +++ b/internal/ethereum/event_stream.go @@ -296,6 +296,7 @@ func (es *eventStream) leadGroupSteadyState() bool { lastUpdate := -1 failCount := 0 filterRPC := "" + listenerResetRequired := false for { if es.c.doFailureDelay(es.ctx, failCount) { log.L(es.ctx).Debugf("Stream loop exiting") @@ -303,7 +304,8 @@ func (es *eventStream) leadGroupSteadyState() bool { } // Build the aggregated listener list if it has changed - listenerChanged := es.buildReuseLeadGroupListener(&lastUpdate, &ag) + listenerChanged := es.buildReuseLeadGroupListener(&lastUpdate, &ag) || listenerResetRequired + listenerResetRequired = false // No need to poll for events, if we don't have any listeners if len(ag.signatureSet) > 0 { @@ -372,6 +374,8 @@ func (es *eventStream) leadGroupSteadyState() bool { events, enrichErr := es.filterEnrichSort(es.ctx, ag, ethLogs) if enrichErr != nil { log.L(es.ctx).Errorf("Failed to enrich events: %s", enrichErr) + // We have to reset our filter, as otherwise we'll skip past these events. + listenerResetRequired = true failCount++ continue } From 09175120ab1261fa7d140d9d9f311b85f21a015e Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Fri, 3 Mar 2023 09:57:35 -0500 Subject: [PATCH 2/3] Clear reset only when we uninstall Signed-off-by: Peter Broadhurst --- internal/ethereum/event_stream.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/ethereum/event_stream.go b/internal/ethereum/event_stream.go index 02a7ad3..3aa344e 100644 --- a/internal/ethereum/event_stream.go +++ b/internal/ethereum/event_stream.go @@ -296,7 +296,7 @@ func (es *eventStream) leadGroupSteadyState() bool { lastUpdate := -1 failCount := 0 filterRPC := "" - listenerResetRequired := false + filterResetRequired := false for { if es.c.doFailureDelay(es.ctx, failCount) { log.L(es.ctx).Debugf("Stream loop exiting") @@ -304,8 +304,7 @@ func (es *eventStream) leadGroupSteadyState() bool { } // Build the aggregated listener list if it has changed - listenerChanged := es.buildReuseLeadGroupListener(&lastUpdate, &ag) || listenerResetRequired - listenerResetRequired = false + listenerChanged := es.buildReuseLeadGroupListener(&lastUpdate, &ag) || filterResetRequired // No need to poll for events, if we don't have any listeners if len(ag.signatureSet) > 0 { @@ -322,6 +321,7 @@ func (es *eventStream) leadGroupSteadyState() bool { // Uninstall any existing filter if filter != "" { es.uninstallFilter(&filter) + filterResetRequired = false } filterRPC = "eth_getFilterLogs" // first JSON/RPC after getting a new filter ID // Determine the earliest block we need to poll from @@ -375,7 +375,7 @@ func (es *eventStream) leadGroupSteadyState() bool { if enrichErr != nil { log.L(es.ctx).Errorf("Failed to enrich events: %s", enrichErr) // We have to reset our filter, as otherwise we'll skip past these events. - listenerResetRequired = true + filterResetRequired = true failCount++ continue } From 5fa9e6c3c889bff1a364657bd7855e54ed96a803 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Fri, 3 Mar 2023 09:57:49 -0500 Subject: [PATCH 3/3] Clear reset only when we uninstall Signed-off-by: Peter Broadhurst --- internal/ethereum/event_stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/ethereum/event_stream.go b/internal/ethereum/event_stream.go index 3aa344e..86eaa9b 100644 --- a/internal/ethereum/event_stream.go +++ b/internal/ethereum/event_stream.go @@ -321,8 +321,8 @@ func (es *eventStream) leadGroupSteadyState() bool { // Uninstall any existing filter if filter != "" { es.uninstallFilter(&filter) - filterResetRequired = false } + filterResetRequired = false filterRPC = "eth_getFilterLogs" // first JSON/RPC after getting a new filter ID // Determine the earliest block we need to poll from fromBlock := int64(-1)