Skip to content

Commit

Permalink
Improve/context (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
jjeffcaii authored Sep 21, 2020
1 parent 8f1019f commit 197ee21
Show file tree
Hide file tree
Showing 44 changed files with 228 additions and 168 deletions.
2 changes: 1 addition & 1 deletion flux/flux_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (fc fluxCreate) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
default:
sink = newBufferedSink(s, _buffSize)
}
s.OnSubscribe(sink)
s.OnSubscribe(ctx, sink)
fc.source(ctx, sink)
}

Expand Down
4 changes: 2 additions & 2 deletions flux/flux_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ type fluxError struct {
}

func (p fluxError) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
actual := internal.NewCoreSubscriber(ctx, s)
actual.OnSubscribe(internal.EmptySubscription)
actual := internal.NewCoreSubscriber(s)
actual.OnSubscribe(ctx, internal.EmptySubscription)
actual.OnError(p.e)
}

Expand Down
2 changes: 1 addition & 1 deletion flux/flux_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type fluxInterval struct {

func (p *fluxInterval) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
su := newIntervalSubscription(s, p.period)
s.OnSubscribe(su)
s.OnSubscribe(ctx, su)
err := p.sc.Worker().Do(func() {
su.run(ctx)
})
Expand Down
4 changes: 1 addition & 3 deletions flux/flux_interval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"github.com/stretchr/testify/assert"
)



func TestInterval(t *testing.T) {
done := make(chan struct{})
var amount int32
Expand All @@ -25,7 +23,7 @@ func TestInterval(t *testing.T) {
atomic.AddInt32(&amount, 1)
return nil
}),
reactor.OnSubscribe(func(su reactor.Subscription) {
reactor.OnSubscribe(func(ctx context.Context, su reactor.Subscription) {
su.Request(3)
}),
)
Expand Down
2 changes: 1 addition & 1 deletion flux/flux_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type fluxRange struct {
func (r fluxRange) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
raw := internal.ExtractRawSubscriber(s)
su := newRangeSubscription(raw, r.begin, r.end)
internal.NewCoreSubscriber(ctx, raw).OnSubscribe(su)
internal.NewCoreSubscriber(raw).OnSubscribe(ctx, su)
}

type rangeSubscription struct {
Expand Down
2 changes: 1 addition & 1 deletion flux/flux_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (p *fluxSlice) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
return
}
subscription := newSliceSubscription(s, p.slice)
s.OnSubscribe(subscription)
s.OnSubscribe(ctx, subscription)
}

func newSliceFlux(values []Any) *fluxSlice {
Expand Down
12 changes: 6 additions & 6 deletions flux/flux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func Example() {
}).
SubscribeOn(scheduler.Elastic()).
Subscribe(context.Background(),
reactor.OnSubscribe(func(s reactor.Subscription) {
reactor.OnSubscribe(func(ctx context.Context, s reactor.Subscription) {
su = s
s.Request(1)
}),
Expand Down Expand Up @@ -121,7 +121,7 @@ func testDoOnSubscribe(f flux.Flux, t *testing.T) {
su.Request(1)
return nil
}
onSubscribe := func(s reactor.Subscription) {
onSubscribe := func(ctx context.Context, s reactor.Subscription) {
su = s
su.Request(1)
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func testFilterRequest(f flux.Flux, t *testing.T) {
DoOnRequest(func(n int) {
requests++
}).
Subscribe(context.Background(), reactor.OnSubscribe(func(su reactor.Subscription) {
Subscribe(context.Background(), reactor.OnSubscribe(func(ctx context.Context, su reactor.Subscription) {
s = su
s.Request(1)
}))
Expand Down Expand Up @@ -235,7 +235,7 @@ func testPeek(f flux.Flux, t *testing.T) {
DoFinally(func(s reactor.SignalType) {
close(done)
}).
Subscribe(context.Background(), reactor.OnSubscribe(func(su reactor.Subscription) {
Subscribe(context.Background(), reactor.OnSubscribe(func(ctx context.Context, su reactor.Subscription) {
ss = su
ss.Request(1)
}), reactor.OnNext(func(v interface{}) error {
Expand All @@ -258,7 +258,7 @@ func testRequest(f flux.Flux, t *testing.T) {
close(done)
}).
SubscribeOn(scheduler.Elastic()).
Subscribe(context.Background(), reactor.OnSubscribe(func(s reactor.Subscription) {
Subscribe(context.Background(), reactor.OnSubscribe(func(ctx context.Context, s reactor.Subscription) {
su = s
su.Request(1)
}), reactor.OnNext(func(v Any) error {
Expand Down Expand Up @@ -307,7 +307,7 @@ func TestCreateWithRequest(t *testing.T) {
fmt.Println("next:", v)
processed++
return nil
}), reactor.OnSubscribe(func(s reactor.Subscription) {
}), reactor.OnSubscribe(func(ctx context.Context, s reactor.Subscription) {
su <- s
s.Request(1)
}), reactor.OnComplete(func() {
Expand Down
2 changes: 1 addition & 1 deletion flux/op_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (p *fluxContext) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
for i := 0; i < len(p.kv); i += 2 {
ctx = context.WithValue(ctx, p.kv[i], p.kv[i+1])
}
p.source.SubscribeWith(ctx, internal.NewCoreSubscriber(ctx, s))
p.source.SubscribeWith(ctx, internal.NewCoreSubscriber(s))
}

type fluxContextOption func(*fluxContext)
Expand Down
6 changes: 3 additions & 3 deletions flux/op_delay_element.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ func (d *delayElementSubscriber) OnNext(v Any) {
d.actual.OnNext(v)
}

func (d *delayElementSubscriber) OnSubscribe(s reactor.Subscription) {
func (d *delayElementSubscriber) OnSubscribe(ctx context.Context, s reactor.Subscription) {
d.s = s
d.actual.OnSubscribe(d)
d.actual.OnSubscribe(ctx, d)
s.Request(reactor.RequestInfinite)
}

Expand All @@ -73,7 +73,7 @@ type fluxDelayElement struct {

func (f *fluxDelayElement) SubscribeWith(ctx context.Context, actual reactor.Subscriber) {
actual = internal.ExtractRawSubscriber(actual)
actual = internal.NewCoreSubscriber(ctx, newDelayElementSubscriber(actual, f.delay, f.sc))
actual = internal.NewCoreSubscriber(newDelayElementSubscriber(actual, f.delay, f.sc))
f.source.SubscribeWith(ctx, actual)
}

Expand Down
12 changes: 6 additions & 6 deletions flux/op_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ func (p *filterSubscriber) OnNext(v Any) {
internal.TryDiscard(p.ctx, v)
}

func (p *filterSubscriber) OnSubscribe(su reactor.Subscription) {
func (p *filterSubscriber) OnSubscribe(ctx context.Context, su reactor.Subscription) {
p.ctx = ctx
p.su = su
p.actual.OnSubscribe(su)
p.actual.OnSubscribe(ctx, su)
}

func newFilterSubscriber(ctx context.Context, s reactor.Subscriber, p reactor.Predicate) *filterSubscriber {
func newFilterSubscriber(s reactor.Subscriber, p reactor.Predicate) *filterSubscriber {
return &filterSubscriber{
ctx: ctx,
actual: s,
f: p,
}
Expand All @@ -57,10 +57,10 @@ type fluxFilter struct {
func (f *fluxFilter) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
var actual reactor.Subscriber
if cs, ok := s.(*internal.CoreSubscriber); ok {
cs.Subscriber = newFilterSubscriber(ctx, cs.Subscriber, f.predicate)
cs.Subscriber = newFilterSubscriber(cs.Subscriber, f.predicate)
actual = cs
} else {
actual = internal.NewCoreSubscriber(ctx, newFilterSubscriber(ctx, s, f.predicate))
actual = internal.NewCoreSubscriber(newFilterSubscriber(s, f.predicate))
}
f.source.SubscribeWith(ctx, actual)
}
Expand Down
2 changes: 1 addition & 1 deletion flux/op_finally.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type fluxFinally struct {

func (p *fluxFinally) SubscribeWith(ctx context.Context, actual reactor.Subscriber) {
actual = internal.ExtractRawSubscriber(actual)
actual = internal.NewCoreSubscriber(ctx, subscribers.NewDoFinallySubscriber(actual, p.onFinally))
actual = internal.NewCoreSubscriber(subscribers.NewDoFinallySubscriber(actual, p.onFinally))
p.source.SubscribeWith(ctx, actual)
}

Expand Down
4 changes: 2 additions & 2 deletions flux/op_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func (s mapSubscriber) OnNext(i Any) {
}
}

func (s mapSubscriber) OnSubscribe(subscription reactor.Subscription) {
s.source.OnSubscribe(subscription)
func (s mapSubscriber) OnSubscribe(ctx context.Context, subscription reactor.Subscription) {
s.source.OnSubscribe(ctx, subscription)
}

func newMapSubscriber(s reactor.Subscriber, mapper reactor.Transformer) reactor.Subscriber {
Expand Down
8 changes: 4 additions & 4 deletions flux/op_peek.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ func (p *peekSubscriber) OnNext(v Any) {
p.actual.OnNext(v)
}

func (p *peekSubscriber) OnSubscribe(s reactor.Subscription) {
func (p *peekSubscriber) OnSubscribe(ctx context.Context, s reactor.Subscription) {
if p.s != nil {
panic(internal.ErrCallOnSubscribeDuplicated)
}
p.s = s
if call := p.parent.onSubscribeCall; call != nil {
call(p)
call(ctx, p)
}
p.actual.OnSubscribe(p)
p.actual.OnSubscribe(ctx, p)
}

func newPeekSubscriber(peek *fluxPeek, actual reactor.Subscriber) *peekSubscriber {
Expand All @@ -91,7 +91,7 @@ type fluxPeek struct {

func (fp *fluxPeek) SubscribeWith(ctx context.Context, actual reactor.Subscriber) {
actual = internal.ExtractRawSubscriber(actual)
actual = internal.NewCoreSubscriber(ctx, newPeekSubscriber(fp, actual))
actual = internal.NewCoreSubscriber(newPeekSubscriber(fp, actual))
fp.source.SubscribeWith(ctx, actual)
}

Expand Down
8 changes: 4 additions & 4 deletions flux/op_switch_on_first.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (p *switchOnFirstInner) SubscribeWith(ctx context.Context, actual reactor.S
panic(errSubscribeOnce)
}
p.inner = actual
actual.OnSubscribe(p)
actual.OnSubscribe(ctx, p)
}

func (p *switchOnFirstInner) Request(n int) {
Expand Down Expand Up @@ -120,7 +120,7 @@ func (p *switchOnFirstInner) OnNext(v Any) {
i.OnNext(v)
}

func (p *switchOnFirstInner) OnSubscribe(s reactor.Subscription) {
func (p *switchOnFirstInner) OnSubscribe(ctx context.Context, s reactor.Subscription) {
p.s = s
s.Request(1)
}
Expand Down Expand Up @@ -148,8 +148,8 @@ func (p *switchOnFirstInnerSubscriber) OnNext(t Any) {
p.inner.OnNext(t)
}

func (p *switchOnFirstInnerSubscriber) OnSubscribe(s reactor.Subscription) {
p.inner.OnSubscribe(s)
func (p *switchOnFirstInnerSubscriber) OnSubscribe(ctx context.Context, s reactor.Subscription) {
p.inner.OnSubscribe(ctx, s)
}

func newFluxSwitchOnFirst(source reactor.RawPublisher, transformer FnSwitchOnFirst) *fluxSwitchOnFirst {
Expand Down
6 changes: 3 additions & 3 deletions flux/op_take.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type fluxTake struct {
func (ft *fluxTake) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
actual := internal.ExtractRawSubscriber(s)
take := newTakeSubscriber(actual, int64(ft.n))
ft.source.SubscribeWith(ctx, internal.NewCoreSubscriber(ctx, take))
ft.source.SubscribeWith(ctx, internal.NewCoreSubscriber(take))
}

type takeSubscriber struct {
Expand Down Expand Up @@ -50,13 +50,13 @@ func (ts *takeSubscriber) OnNext(v Any) {
ts.OnComplete()
}

func (ts *takeSubscriber) OnSubscribe(su reactor.Subscription) {
func (ts *takeSubscriber) OnSubscribe(ctx context.Context, su reactor.Subscription) {
if atomic.LoadInt64(&ts.remaining) < 1 {
su.Cancel()
return
}
ts.su = su
ts.actual.OnSubscribe(su)
ts.actual.OnSubscribe(ctx, su)
}

func (ts *takeSubscriber) OnComplete() {
Expand Down
6 changes: 3 additions & 3 deletions flux/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func (up *unicastProcessor) OnNext(v Any) {
up.drain()
}

func (up *unicastProcessor) OnSubscribe(su reactor.Subscription) {
up.actual.OnSubscribe(su)
func (up *unicastProcessor) OnSubscribe(ctx context.Context, su reactor.Subscription) {
up.actual.OnSubscribe(ctx, su)
}

func (up *unicastProcessor) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
Expand All @@ -102,7 +102,7 @@ func (up *unicastProcessor) SubscribeWith(ctx context.Context, s reactor.Subscri
up.cond.L.Unlock()

defer close(up.subscribed)
raw.OnSubscribe(up)
raw.OnSubscribe(ctx, up)
}

func (up *unicastProcessor) Complete() {
Expand Down
2 changes: 1 addition & 1 deletion flux/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestUnicastProcessor(t *testing.T) {
fmt.Println("complete")
close(results)
}).
Subscribe(context.Background(), reactor.OnSubscribe(func(s reactor.Subscription) {
Subscribe(context.Background(), reactor.OnSubscribe(func(ctx context.Context, s reactor.Subscription) {
su = s
su.Request(1)
}))
Expand Down
2 changes: 1 addition & 1 deletion flux/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (w wrapper) BlockFirst(ctx context.Context) (first Any, err error) {
first = v
su.Cancel()
return nil
}), reactor.OnSubscribe(func(s reactor.Subscription) {
}), reactor.OnSubscribe(func(ctx context.Context, s reactor.Subscription) {
su = s
su.Request(1)
}), reactor.OnError(func(e error) {
Expand Down
42 changes: 35 additions & 7 deletions internal/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,63 @@ import (

type CoreSubscriber struct {
reactor.Subscriber
ctx context.Context
done chan struct{}
}

func (p *CoreSubscriber) OnSubscribe(su reactor.Subscription) {
func (p *CoreSubscriber) OnSubscribe(ctx context.Context, su reactor.Subscription) {
if ctx != context.Background() && ctx != context.TODO() {
go func() {
select {
case <-p.done:
case <-ctx.Done():
p.OnError(reactor.ErrSubscribeCancelled)
}
}()
}

select {
case <-p.ctx.Done():
case <-ctx.Done():
p.Subscriber.OnError(reactor.ErrSubscribeCancelled)
default:
p.Subscriber.OnSubscribe(su)
p.Subscriber.OnSubscribe(ctx, su)
}
}

func (p *CoreSubscriber) OnComplete() {
select {
case <-p.done:
default:
close(p.done)
p.Subscriber.OnComplete()
}
}

func (p *CoreSubscriber) OnError(err error) {
select {
case <-p.done:
default:
close(p.done)
p.Subscriber.OnError(err)
}
}

func (p *CoreSubscriber) OnNext(v reactor.Any) {
select {
case <-p.ctx.Done():
case <-p.done:
hooks.Global().OnNextDrop(v)
p.Subscriber.OnError(reactor.ErrSubscribeCancelled)
default:
p.Subscriber.OnNext(v)
}
}

func NewCoreSubscriber(ctx context.Context, actual reactor.Subscriber) *CoreSubscriber {
func NewCoreSubscriber(actual reactor.Subscriber) *CoreSubscriber {
if cs, ok := actual.(*CoreSubscriber); ok {
return cs
}
return &CoreSubscriber{
Subscriber: actual,
ctx: ctx,
done: make(chan struct{}),
}
}

Expand Down
Loading

0 comments on commit 197ee21

Please sign in to comment.