Skip to content

Commit

Permalink
fix: race problem in timeout mono (#42)
Browse files Browse the repository at this point in the history
* fix: race problem in timeout mono

* add ut

* fix ut

* set codecov

(cherry picked from commit a7bd3e9)
  • Loading branch information
jjeffcaii committed Jun 28, 2022
1 parent fe24342 commit a746d47
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 26 deletions.
4 changes: 4 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
coverage:
range: 70..100
round: down
precision: 2
58 changes: 32 additions & 26 deletions mono/mono_timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package mono

import (
"context"
"math"
"sync/atomic"
"time"

"github.com/jjeffcaii/reactor-go"
Expand All @@ -13,6 +15,21 @@ type monoTimeout struct {
timeout time.Duration
}

func newMonoTimeout(source reactor.RawPublisher, timeout time.Duration) *monoTimeout {
return &monoTimeout{
source: source,
timeout: timeout,
}
}

func (m *monoTimeout) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
m.source.SubscribeWith(ctx, &timeoutSubscriber{
actual: s,
timeout: m.timeout,
done: make(chan struct{}),
})
}

func (m *monoTimeout) Parent() reactor.RawPublisher {
return m.source
}
Expand All @@ -21,33 +38,37 @@ type timeoutSubscriber struct {
actual reactor.Subscriber
timeout time.Duration
done chan struct{}
closed int32
}

func (t *timeoutSubscriber) OnComplete() {
select {
case <-t.done:
default:
if atomic.CompareAndSwapInt32(&t.closed, 0, math.MaxInt32) || atomic.CompareAndSwapInt32(&t.closed, 1, math.MaxInt32) {
close(t.done)
t.actual.OnComplete()
}
}

func (t *timeoutSubscriber) OnError(err error) {
select {
case <-t.done:
hooks.Global().OnErrorDrop(err)
default:
if atomic.CompareAndSwapInt32(&t.closed, 0, -1) {
close(t.done)
t.actual.OnError(err)
return
}

// item is emitted before error reach, should be processed as completed.
if atomic.CompareAndSwapInt32(&t.closed, 1, -1) {
close(t.done)
t.actual.OnComplete()
}

hooks.Global().OnErrorDrop(err)
}

func (t *timeoutSubscriber) OnNext(any reactor.Any) {
select {
case <-t.done:
hooks.Global().OnNextDrop(any)
default:
if atomic.CompareAndSwapInt32(&t.closed, 0, 1) {
t.actual.OnNext(any)
} else {
hooks.Global().OnNextDrop(any)
}
}

Expand All @@ -63,18 +84,3 @@ func (t *timeoutSubscriber) OnSubscribe(ctx context.Context, subscription reacto
}()
t.actual.OnSubscribe(ctx, subscription)
}

func (m *monoTimeout) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
m.source.SubscribeWith(ctx, &timeoutSubscriber{
actual: s,
timeout: m.timeout,
done: make(chan struct{}),
})
}

func newMonoTimeout(source reactor.RawPublisher, timeout time.Duration) *monoTimeout {
return &monoTimeout{
source: source,
timeout: timeout,
}
}

0 comments on commit a746d47

Please sign in to comment.