Skip to content

Commit

Permalink
chore: cleanup error handling in compparallel
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorropo committed Jul 19, 2023
1 parent 7a4be42 commit 88c9d0d
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 34 deletions.
72 changes: 40 additions & 32 deletions compparallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/Jorropo/jsync"
"github.com/hashicorp/go-multierror"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -258,55 +257,64 @@ func executeParallel(
routers []*ParallelRouter,
f func(context.Context, routing.Routing,
) error) error {
errCh := make(chan error)
var ready sync.RWMutex
ready.Lock()
var errorsLk sync.Mutex
var errors []error
fwg := jsync.NewFWaitGroup(func() {
close(errCh)
ready.Unlock()
log.Debug("executeParallel: finished executing all routers ", len(routers))
}, uint64(len(routers)))
for _, r := range routers {
go func(r *ParallelRouter) {
defer fwg.Done()
log.Debug("executeParallel: starting execution for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
)
tim := time.NewTimer(r.ExecuteAfter)
defer tim.Stop()
select {
case <-ctx.Done():
if !r.IgnoreError {
log.Debug("executeParallel: stopping execution on router on context done for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
)
errCh <- ctx.Err()
}
case <-tim.C:
ctx, cancel := withCancelAndOptionalTimeout(ctx, r.Timeout)
defer cancel()

log.Debug("executeParallel: calling router function for router ", r.Router,
if err := func() error {
log.Debug("executeParallel: starting execution for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
)
err := f(ctx, r.Router)
if err != nil &&
!r.IgnoreError {
log.Debug("executeParallel: error calling router function for router ", r.Router,
tim := time.NewTimer(r.ExecuteAfter)
defer tim.Stop()
select {
case <-ctx.Done():
if !r.IgnoreError {
log.Debug("executeParallel: stopping execution on router on context done for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
)
return ctx.Err()
}
case <-tim.C:
ctx, cancel := withCancelAndOptionalTimeout(ctx, r.Timeout)
defer cancel()

log.Debug("executeParallel: calling router function for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
" with error ", err,
)
errCh <- err
if err := f(ctx, r.Router); err != nil &&
!r.IgnoreError {
log.Debug("executeParallel: error calling router function for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
" with error ", err,
)
return err
}
}

return nil
}(); err != nil {
errorsLk.Lock()
defer errorsLk.Unlock()
errors = append(errors, err)
}
}(r)
}

var errOut error
for err := range errCh {
errOut = multierror.Append(errOut, err)
}
ready.Lock()
errOut := multierr.Combine(errors...)

if errOut != nil {
log.Debug("executeParallel: finished executing all routers with error: ", errOut)
Expand Down
4 changes: 2 additions & 2 deletions compparallel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ func TestComposableParallelFixtures(t *testing.T) {
{key: "a", value: "av", searchValCount: 2},
},
PutValue: []putValueFixture{
{err: errors.New("2 errors occurred:\n\t* a\n\t* a\n\n"), key: "/error/a", value: "a"},
{err: errors.New("a; a"), key: "/error/a", value: "a"},
{key: "a", value: "a"},
},
Provide: []provideFixture{{
err: errors.New("2 errors occurred:\n\t* routing: operation or key not supported\n\t* routing: operation or key not supported\n\n"),
err: errors.New("routing: operation or key not supported; routing: operation or key not supported"),
}},
FindPeer: []findPeerFixture{{peerID: "pid1"}, {peerID: "pid3"}},
SearchValue: []searchValueFixture{{key: "a", vals: []string{"a", "a"}}},
Expand Down

0 comments on commit 88c9d0d

Please sign in to comment.