Skip to content

Commit

Permalink
Merge pull request #58 from hyperledger/enrich-reset
Browse files Browse the repository at this point in the history
Reset filter after enrichment failure
  • Loading branch information
peterbroadhurst authored Mar 3, 2023
2 parents a9db899 + 5fa9e6c commit b1cd38d
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion internal/ethereum/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,14 +296,15 @@ func (es *eventStream) leadGroupSteadyState() bool {
lastUpdate := -1
failCount := 0
filterRPC := ""
filterResetRequired := false
for {
if es.c.doFailureDelay(es.ctx, failCount) {
log.L(es.ctx).Debugf("Stream loop exiting")
return true
}

// Build the aggregated listener list if it has changed
listenerChanged := es.buildReuseLeadGroupListener(&lastUpdate, &ag)
listenerChanged := es.buildReuseLeadGroupListener(&lastUpdate, &ag) || filterResetRequired

// No need to poll for events, if we don't have any listeners
if len(ag.signatureSet) > 0 {
Expand All @@ -321,6 +322,7 @@ func (es *eventStream) leadGroupSteadyState() bool {
if filter != "" {
es.uninstallFilter(&filter)
}
filterResetRequired = false
filterRPC = "eth_getFilterLogs" // first JSON/RPC after getting a new filter ID
// Determine the earliest block we need to poll from
fromBlock := int64(-1)
Expand Down Expand Up @@ -372,6 +374,8 @@ func (es *eventStream) leadGroupSteadyState() bool {
events, enrichErr := es.filterEnrichSort(es.ctx, ag, ethLogs)
if enrichErr != nil {
log.L(es.ctx).Errorf("Failed to enrich events: %s", enrichErr)
// We have to reset our filter, as otherwise we'll skip past these events.
filterResetRequired = true
failCount++
continue
}
Expand Down

0 comments on commit b1cd38d

Please sign in to comment.