Skip to content

Commit

Permalink
feat: add DoNotWaitForStreamingResponses to ParallelRouter
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorropo committed Jul 19, 2023
1 parent 88c9d0d commit 4c7bd7c
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 70 deletions.
9 changes: 5 additions & 4 deletions compconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (
)

type ParallelRouter struct {
Timeout time.Duration
IgnoreError bool
Router routing.Routing
ExecuteAfter time.Duration
Timeout time.Duration
Router routing.Routing
ExecuteAfter time.Duration
DoNotWaitForStreamingResponses bool
IgnoreError bool
}

type SequentialRouter struct {
Expand Down
122 changes: 61 additions & 61 deletions compparallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,19 +255,16 @@ func getValueOrErrorParallel[T any](
func executeParallel(
ctx context.Context,
routers []*ParallelRouter,
f func(context.Context, routing.Routing,
) error) error {
var ready sync.RWMutex
ready.Lock()
var errorsLk sync.Mutex
var errors []error
fwg := jsync.NewFWaitGroup(func() {
ready.Unlock()
log.Debug("executeParallel: finished executing all routers ", len(routers))
}, uint64(len(routers)))
f func(context.Context, routing.Routing) error,
) error {
var errsLk sync.Mutex
var errs []error
var wg sync.WaitGroup
wg.Add(len(routers))

for _, r := range routers {
go func(r *ParallelRouter) {
defer fwg.Done()
go func(r *ParallelRouter, ctx context.Context) {
defer wg.Done()

if err := func() error {
log.Debug("executeParallel: starting execution for router ", r.Router,
Expand All @@ -293,8 +290,7 @@ func executeParallel(
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
)
if err := f(ctx, r.Router); err != nil &&
!r.IgnoreError {
if err := f(ctx, r.Router); err != nil && !r.IgnoreError {
log.Debug("executeParallel: error calling router function for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
Expand All @@ -306,15 +302,15 @@ func executeParallel(

return nil
}(); err != nil {
errorsLk.Lock()
defer errorsLk.Unlock()
errors = append(errors, err)
errsLk.Lock()
defer errsLk.Unlock()
errs = append(errs, err)
}
}(r)
}(r, ctx)
}

ready.Lock()
errOut := multierr.Combine(errors...)
wg.Wait()
errOut := multierr.Combine(errs...)

if errOut != nil {
log.Debug("executeParallel: finished executing all routers with error: ", errOut)
Expand All @@ -329,7 +325,6 @@ func getChannelOrErrorParallel[T any](
f func(context.Context, routing.Routing) (<-chan T, error),
shouldStop func() bool,
) (chan T, error) {

var ready sync.Mutex
ready.Lock()
var outCh chan T
Expand All @@ -346,27 +341,39 @@ func getChannelOrErrorParallel[T any](
}

cancelAll()
log.Debug("getChannelOrErrorParallel: finished executing all routers ", len(routers))
}, uint64(len(routers)))

var blocking atomic.Uint64
blocking.Add(1) // start at one so we don't cancel while dispatching
var sent atomic.Bool

for _, r := range routers {
if !r.DoNotWaitForStreamingResponses {
blocking.Add(1)
}

go func(r *ParallelRouter) {
defer fwg.Done()
defer func() {
var remainingBlockers uint64
if r.DoNotWaitForStreamingResponses {
remainingBlockers = blocking.Load()
} else {
var minusOne uint64
minusOne--
remainingBlockers = blocking.Add(minusOne)
}

log.Debug("getChannelOrErrorParallel: starting execution for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
)
if remainingBlockers == 0 && sent.Load() {
cancelAll()
}
}()

if r.ExecuteAfter != 0 {
tim := time.NewTimer(r.ExecuteAfter)
defer tim.Stop()
select {
case <-ctx.Done():
log.Debug("getChannelOrErrorParallel: stopping execution on router on context done for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
)
return
case <-tim.C:
// ready
Expand All @@ -376,10 +383,6 @@ func getChannelOrErrorParallel[T any](
ctx, cancel := withCancelAndOptionalTimeout(ctx, r.Timeout)
defer cancel()

log.Debug("getChannelOrErrorParallel: calling router function for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
)
valueChan, err := f(ctx, r.Router)
if err != nil {
if r.IgnoreError {
Expand All @@ -396,48 +399,33 @@ func getChannelOrErrorParallel[T any](
return
}

errorsLk.Lock()
if outCh == nil {
outCh = make(chan T)
errors = nil
ready.Unlock()
}
errorsLk.Unlock()

for {
for first := true; true; first = false {
select {
case <-ctx.Done():
log.Debug("getChannelOrErrorParallel: stopping execution on router on context done inside loop for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
)
return
case val, ok := <-valueChan:
log.Debug("getChannelOrErrorParallel: getting channel value for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
" closed channel: ", ok,
)
if !ok {
return
}

if first {
errorsLk.Lock()
if outCh == nil {
outCh = make(chan T)
errors = nil
ready.Unlock()
}
errorsLk.Unlock()
}

select {
case <-ctx.Done():
log.Debug("getChannelOrErrorParallel: stopping execution on router on context done inside select for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
)
return
case outCh <- val:
sent.Store(true)
}

if shouldStop() {
log.Debug("getChannelOrErrorParallel: stopping channel iteration for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
" closed channel: ", ok,
)
cancelAll()
return
}
Expand All @@ -446,9 +434,21 @@ func getChannelOrErrorParallel[T any](
}(r)
}

// remove the dispatch count and check if we should cancel
var minusOne uint64
minusOne--
if blocking.Add(minusOne) == 0 && sent.Load() {
cancelAll()
}

ready.Lock()
if outCh != nil {
return outCh, nil
} else if len(errors) == 0 {
// found nothing
ch := make(chan T)
close(ch)
return ch, nil
} else {
return nil, multierr.Combine(errors...)
}
Expand Down
42 changes: 42 additions & 0 deletions compparallel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,48 @@ func TestComposableParallelFixtures(t *testing.T) {
FindPeer: []findPeerFixture{{peerID: "pid1"}, {peerID: "pid3"}},
SearchValue: []searchValueFixture{{key: "a", vals: []string{"a", "a"}}},
},
{
Name: "simple two routers, one with delay, plus a third nothing DoNotWaitForStreamingResponses router",
routers: []*ParallelRouter{
{
Timeout: time.Second,
IgnoreError: false,
Router: &Compose{
ValueStore: newDummyValueStore(t, []string{"a", "b", "c"}, []string{"av", "bv", "cv"}),
PeerRouting: newDummyPeerRouting(t, []peer.ID{"pid1", "pid2"}),
ContentRouting: Null{},
},
},
{
Timeout: time.Minute,
IgnoreError: false,
ExecuteAfter: time.Second,
Router: &Compose{
ValueStore: newDummyValueStore(t, []string{"a", "d"}, []string{"av2", "dv"}),
PeerRouting: newDummyPeerRouting(t, []peer.ID{"pid1", "pid3"}),
ContentRouting: Null{},
},
},
{
DoNotWaitForStreamingResponses: true,
IgnoreError: true,
Router: nothing{},
},
},
GetValue: []getValueFixture{
{key: "d", value: "dv", searchValCount: 1},
{key: "a", value: "av", searchValCount: 2},
},
PutValue: []putValueFixture{
{err: errors.New("a; a"), key: "/error/a", value: "a"},
{key: "a", value: "a"},
},
Provide: []provideFixture{{
err: errors.New("routing: operation or key not supported; routing: operation or key not supported"),
}},
FindPeer: []findPeerFixture{{peerID: "pid1"}, {peerID: "pid3"}},
SearchValue: []searchValueFixture{{key: "a", vals: []string{"a", "a"}}},
},
{
Name: "two routers with ignore errors",
routers: []*ParallelRouter{
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/libp2p/go-libp2p-routing-helpers
go 1.19

require (
github.com/Jorropo/jsync v1.0.1
github.com/hashicorp/errwrap v1.1.0
github.com/hashicorp/go-multierror v1.1.1
github.com/ipfs/go-cid v0.3.0
Expand All @@ -11,10 +12,10 @@ require (
github.com/libp2p/go-libp2p-record v0.2.0
github.com/multiformats/go-multihash v0.2.1
github.com/stretchr/testify v1.8.0
go.uber.org/multierr v1.8.0
)

require (
github.com/Jorropo/jsync v1.0.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
Expand All @@ -37,7 +38,6 @@ require (
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.22.0 // indirect
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab // indirect
Expand Down
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Jorropo/jsync v1.0.0 h1:wW7QnToqINqxboAkihn9RsH1KKr6bxEbG1DUgO3Z8Bg=
github.com/Jorropo/jsync v1.0.0/go.mod h1:jCOZj3vrBCri3bSU3ErUYvevKlnbssrXeCivybS5ABQ=
github.com/Jorropo/jsync v1.0.1 h1:6HgRolFZnsdfzRUj+ImB9og1JYOxQoReSywkHOGSaUU=
github.com/Jorropo/jsync v1.0.1/go.mod h1:jCOZj3vrBCri3bSU3ErUYvevKlnbssrXeCivybS5ABQ=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -49,7 +49,6 @@ github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peK
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0=
github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g=
github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM=
github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
Expand Down
35 changes: 35 additions & 0 deletions nothing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package routinghelpers

import (
"context"

"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
)

// nothing is like [Null] but it never reach quorum for streaming responses.
type nothing struct {
Null
}

// SearchValue always returns ErrNotFound
func (nr nothing) SearchValue(ctx context.Context, _ string, _ ...routing.Option) (<-chan []byte, error) {
return makeChannelThatDoNothingAndIsClosedOnCancel[[]byte](ctx), nil
}

// FindProvidersAsync always returns a closed channel
func (nr nothing) FindProvidersAsync(ctx context.Context, _ cid.Cid, _ int) <-chan peer.AddrInfo {
return makeChannelThatDoNothingAndIsClosedOnCancel[peer.AddrInfo](ctx)
}

func makeChannelThatDoNothingAndIsClosedOnCancel[T any](ctx context.Context) <-chan T {
ch := make(chan T)
go func() {
<-ctx.Done()
close(ch)
}()
return ch
}

var _ routing.Routing = nothing{}

0 comments on commit 4c7bd7c

Please sign in to comment.