Skip to content

Commit

Permalink
fix: deadlock when using switch_if_error (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
jjeffcaii authored Jul 19, 2022
1 parent a7bd3e9 commit ac1270f
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 49 deletions.
5 changes: 3 additions & 2 deletions hc/hc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"net/http"
"testing"

"github.com/stretchr/testify/assert"

"github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/hc"
"github.com/jjeffcaii/reactor-go/mono"
"github.com/jjeffcaii/reactor-go/scheduler"
"github.com/jjeffcaii/reactor-go/tuple"
"github.com/stretchr/testify/assert"
)

var httpBinUrl = "https://httpbin.org/anything"
Expand Down Expand Up @@ -89,7 +90,7 @@ func TestClient_Do(t *testing.T) {
}

func TestDo_Failed(t *testing.T) {
req, _ := http.NewRequest(http.MethodGet, "http://127.0.0.1/not-exists-path", nil)
req, _ := http.NewRequest(http.MethodGet, "http://127.0.0.1:8080/not-exists-path", nil)
_, err := hc.Do(req).Block(context.Background())
assert.Error(t, err)
}
39 changes: 6 additions & 33 deletions mono/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,14 @@ package mono

import (
"context"
"math"
"sync"
"sync/atomic"

"github.com/pkg/errors"

"github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/hooks"
"github.com/pkg/errors"
)

var globalSinkPool sinkPool

type sinkPool struct {
inner sync.Pool
}

func (p *sinkPool) get() *sink {
if exist, _ := p.inner.Get().(*sink); exist != nil {
atomic.StoreInt32(&exist.stat, 0)
return exist
}
return &sink{}
}

func (p *sinkPool) put(s *sink) {
if s == nil {
return
}
atomic.StoreInt32(&s.stat, math.MinInt32)
s.actual = nil
p.inner.Put(s)
}

type Sink interface {
Success(Any)
Error(error)
Expand Down Expand Up @@ -84,23 +60,21 @@ func (s *sink) Success(v Any) {
s.Complete()
}

func (s *sink) Request(n int) {
func (s *sink) Request(_ int) {
// ignore
}

func (s *sink) Cancel() {
if !atomic.CompareAndSwapInt32(&s.stat, 0, statCancel) {
return
}
defer globalSinkPool.put(s)
s.actual.OnError(reactor.ErrSubscribeCancelled)
}

func (s *sink) Complete() {
if !atomic.CompareAndSwapInt32(&s.stat, 0, statComplete) {
return
}
defer globalSinkPool.put(s)
s.actual.OnComplete()
}

Expand All @@ -109,7 +83,6 @@ func (s *sink) Error(err error) {
hooks.Global().OnErrorDrop(err)
return
}
defer globalSinkPool.put(s)
s.actual.OnError(err)
}

Expand All @@ -129,8 +102,8 @@ func (s *sink) Next(v Any) {
}

func (m monoCreate) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
sk := globalSinkPool.get()
var sk sink
sk.actual = s
s.OnSubscribe(ctx, sk)
m.sinker(ctx, sk)
s.OnSubscribe(ctx, &sk)
m.sinker(ctx, &sk)
}
8 changes: 1 addition & 7 deletions mono/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"testing"

"github.com/golang/mock/gomock"

"github.com/jjeffcaii/reactor-go"
"github.com/stretchr/testify/assert"
)

func TestMonoCreate_SubscribeWith(t *testing.T) {
Expand Down Expand Up @@ -97,9 +97,3 @@ func TestMonoCreate_Cancel(t *testing.T) {
s.Success(1)
}).SubscribeWith(context.Background(), s)
}

func TestSinkPool_PutWithNilValue(t *testing.T) {
assert.NotPanics(t, func() {
globalSinkPool.put(nil)
})
}
18 changes: 11 additions & 7 deletions mono/peek.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"context"
"sync/atomic"

"github.com/pkg/errors"

"github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/internal"
"github.com/pkg/errors"
)

type monoPeek struct {
Expand All @@ -20,10 +21,11 @@ type monoPeek struct {
}

type peekSubscriber struct {
actual reactor.Subscriber
parent *monoPeek
s reactor.Subscription
stat int32
actual reactor.Subscriber
parent *monoPeek
s reactor.Subscription
stat int32
cancelled int32
}

func newMonoPeek(source reactor.RawPublisher, first monoPeekOption, others ...monoPeekOption) *monoPeek {
Expand Down Expand Up @@ -52,7 +54,7 @@ func (p *peekSubscriber) Request(n int) {
}

func (p *peekSubscriber) Cancel() {
if !atomic.CompareAndSwapInt32(&p.stat, 0, statCancel) {
if !atomic.CompareAndSwapInt32(&p.cancelled, 0, 1) {
return
}
if call := p.parent.onCancelCall; call != nil {
Expand All @@ -73,7 +75,9 @@ func (p *peekSubscriber) OnComplete() {

func (p *peekSubscriber) OnError(err error) {
if !atomic.CompareAndSwapInt32(&p.stat, 0, statError) {
return
if isCancelled := atomic.LoadInt32(&p.stat) == statCancel && err == reactor.ErrSubscribeCancelled; !isCancelled {
return
}
}

defer func() {
Expand Down

0 comments on commit ac1270f

Please sign in to comment.