Skip to content

Commit

Permalink
fix: let consumer handle cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
2color committed Dec 16, 2024
1 parent 84bc4f7 commit 0c28c6b
Showing 1 changed file with 15 additions and 18 deletions.
33 changes: 15 additions & 18 deletions server_cached_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,15 @@ func (r cachedRouter) FindProviders(ctx context.Context, key cid.Cid, limit int)
return nil, err
}

Check warning on line 64 in server_cached_router.go

View check run for this annotation

Codecov / codecov/patch

server_cached_router.go#L63-L64

Added lines #L63 - L64 were not covered by tests

return NewCacheFallbackIter(it, r, ctx), nil
iter := NewCacheFallbackIter(it, r, ctx)

go func() {
// make sure we close the iterator when the parent context is done
<-ctx.Done()
iter.Close()
}()

return iter, nil
}

// FindPeers uses a simpler approach than FindProviders because we're dealing with a single PeerRecord, and there's
Expand Down Expand Up @@ -115,29 +123,20 @@ type cacheFallbackIter struct {
findPeersResult chan types.PeerRecord
router cachedRouter
ctx context.Context
cancel context.CancelFunc
ongoingLookups atomic.Int32
}

// NewCacheFallbackIter is a wrapper around a results iterator that will resolve peers with no addresses from cache and if no cached addresses, will look them up via FindPeers.
// It's a bit complex because it ensures we continue iterating without blocking on the FindPeers call.
func NewCacheFallbackIter(sourceIter iter.ResultIter[types.Record], router cachedRouter, ctx context.Context) *cacheFallbackIter {
ctx, cancel := context.WithCancel(ctx)
iter := &cacheFallbackIter{
sourceIter: sourceIter,
router: router,
ctx: ctx,
cancel: cancel,
findPeersResult: make(chan types.PeerRecord),
ongoingLookups: atomic.Int32{},
}

// Add a goroutine to handle context cancellation
go func() {
<-ctx.Done()
iter.Close()
}()

return iter
}

Expand Down Expand Up @@ -234,12 +233,10 @@ func (it *cacheFallbackIter) dispatchFindPeer(record types.PeerRecord) {
}

func (it *cacheFallbackIter) Close() error {
it.cancel()
go func() {
for it.ongoingLookups.Load() > 0 {
time.Sleep(time.Millisecond * 100)
}
close(it.findPeersResult)
}()
return it.sourceIter.Close()
for it.ongoingLookups.Load() > 0 {
time.Sleep(time.Millisecond * 100)
}

Check warning on line 238 in server_cached_router.go

View check run for this annotation

Codecov / codecov/patch

server_cached_router.go#L237-L238

Added lines #L237 - L238 were not covered by tests

close(it.findPeersResult)
return nil
}

0 comments on commit 0c28c6b

Please sign in to comment.