Skip to content

Commit

Permalink
add oneshot mono api. (#11)
Browse files Browse the repository at this point in the history
* add oneshot mono api.
  • Loading branch information
jjeffcaii authored Oct 9, 2020
1 parent fde7184 commit 0c66f30
Show file tree
Hide file tree
Showing 9 changed files with 392 additions and 80 deletions.
48 changes: 48 additions & 0 deletions mono/mono_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package mono_test

import (
"context"
"testing"

"github.com/jjeffcaii/reactor-go/mono"
)

func BenchmarkJust(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
mono.Just(1).Subscribe(context.Background())
}
})
}

func BenchmarkCreate(b *testing.B) {
gen := func(i context.Context, sink mono.Sink) {
sink.Success(1)
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
mono.Create(gen).Subscribe(context.Background())
}
})
}

func BenchmarkJustOneshot(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
mono.JustOneshot(1).Subscribe(context.Background())
}
})
}

func BenchmarkCreateOneshot(b *testing.B) {
gen := func(i context.Context, sink mono.Sink) {
sink.Success(1)
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
mono.CreateOneshot(gen).Subscribe(context.Background())
}
})
}
8 changes: 4 additions & 4 deletions mono/mono_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
)

type monoError struct {
e error
inner error
}

func (p monoError) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
func (e monoError) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
s.OnSubscribe(ctx, internal.EmptySubscription)
s.OnError(p.e)
s.OnError(e.inner)
}

func newMonoError(e error) monoError {
return monoError{e: e}
return monoError{inner: e}
}
168 changes: 125 additions & 43 deletions mono/mono_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,49 +290,6 @@ func testContextDone(m mono.Mono, t *testing.T) {
assert.Error(t, err, "should catch error")
}

func BenchmarkNative(b *testing.B) {
var sum int64
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var v Any = int64(1)
atomic.AddInt64(&sum, v.(int64))
}
})
}

func BenchmarkJust(b *testing.B) {
var sum int64
m := mono.Just(int64(1))
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
s := reactor.NewSubscriber(reactor.OnNext(func(v Any) error {
atomic.AddInt64(&sum, v.(int64))
return nil
}))
for pb.Next() {
m.SubscribeWith(context.Background(), s)
}
})
}

func BenchmarkCreate(b *testing.B) {
var sum int64
m := mono.Create(func(i context.Context, sink mono.Sink) {
sink.Success(int64(1))
})
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
s := reactor.NewSubscriber(reactor.OnNext(func(v Any) error {
atomic.AddInt64(&sum, v.(int64))
return nil
}))
for pb.Next() {
m.SubscribeWith(context.Background(), s)
}
})
}

func TestError(t *testing.T) {
mockErr := errors.New("this is a mock error")
var sig reactor.SignalType
Expand Down Expand Up @@ -438,3 +395,128 @@ func TestBlock(t *testing.T) {
assert.NoError(t, err)
assert.Nil(t, v)
}

func TestOneshot(t *testing.T) {
for _, m := range []mono.Mono{
mono.JustOneshot(1),
mono.CreateOneshot(func(ctx context.Context, s mono.Sink) {
s.Success(1)
}),
} {
result, err := m.
Map(func(any reactor.Any) (reactor.Any, error) {
return any.(int) * 2, nil
}).
DoOnNext(func(v reactor.Any) error {
assert.Equal(t, 2, v.(int))
return nil
}).
DoOnError(func(e error) {
assert.FailNow(t, "unreachable")
}).
Block(context.Background())
assert.NoError(t, err)
assert.Equal(t, 2, result)
}

n, err := mono.JustOneshot(1).
FlatMap(func(any reactor.Any) mono.Mono {
return mono.Just(any.(int) * 3)
}).
Block(context.Background())
assert.NoError(t, err)
assert.Equal(t, 3, n)

discardCalls := new(int32)

mono.
CreateOneshot(func(ctx context.Context, s mono.Sink) {
s.Success(111)
}).
Filter(func(any reactor.Any) bool {
return any.(int) > 222
}).
DoOnDiscard(func(v reactor.Any) {
assert.Equal(t, 111, v)
atomic.AddInt32(discardCalls, 1)
}).
SwitchIfEmpty(mono.Just(333)).
DoOnNext(func(v reactor.Any) error {
assert.Equal(t, 333, v)
return nil
}).
Subscribe(context.Background())

assert.Equal(t, int32(1), atomic.LoadInt32(discardCalls))

_, err = mono.
CreateOneshot(func(ctx context.Context, s mono.Sink) {
time.Sleep(100 * time.Millisecond)
s.Success(1)
}).
Timeout(50 * time.Millisecond).
Block(context.Background())
assert.Error(t, err)
assert.True(t, reactor.IsCancelledError(err))

done := make(chan struct{})
now := time.Now()
mono.Just(1).
DoFinally(func(s reactor.SignalType) {
close(done)
}).
DelayElement(100*time.Millisecond).
Subscribe(context.Background(), reactor.OnNext(func(v reactor.Any) error {
assert.True(t, time.Since(now)/1e6 >= 100)
return nil
}))
<-done
}

func TestErrorOneshot(t *testing.T) {
fakeErr := errors.New("fake error")
finallyCalls := new(int32)
subscribeCalls := new(int32)
_, err := mono.
ErrorOneshot(fakeErr).
DoFinally(func(s reactor.SignalType) {
atomic.AddInt32(finallyCalls, 1)
}).
DoOnSubscribe(func(ctx context.Context, su reactor.Subscription) {
atomic.AddInt32(subscribeCalls, 1)
}).
DoOnCancel(func() {
assert.FailNow(t, "unreachable")
}).
DoOnNext(func(v reactor.Any) error {
assert.FailNow(t, "unreachable")
return nil
}).
DoOnError(func(e error) {
assert.Equal(t, fakeErr, e)
}).
SubscribeOn(scheduler.Parallel()).
Block(context.Background())
assert.Error(t, err, "should return error")
assert.Equal(t, fakeErr, err)
assert.Equal(t, int32(1), atomic.LoadInt32(finallyCalls))
assert.Equal(t, int32(1), atomic.LoadInt32(subscribeCalls))
}

func TestIsSubscribeOnParallel(t *testing.T) {
assert.False(t, mono.IsSubscribeAsync(mono.Just(1)))
assert.True(t, mono.IsSubscribeAsync(mono.Just(1).SubscribeOn(scheduler.Parallel())))
assert.True(t, mono.IsSubscribeAsync(mono.Just(1).SubscribeOn(scheduler.Single())))
assert.True(t, mono.IsSubscribeAsync(mono.JustOneshot(1).SubscribeOn(scheduler.Elastic())))
}

func TestJust(t *testing.T) {
assert.Panics(t, func() {
mono.Just(nil)
})
assert.Panics(t, func() {
mono.JustOneshot(nil)
})
assert.NotNil(t, mono.Just(1))
assert.NotNil(t, mono.JustOneshot(1))
}
17 changes: 17 additions & 0 deletions mono/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mono_test

import (
"context"
"errors"
"testing"
"time"

Expand Down Expand Up @@ -51,3 +52,19 @@ func TestProcessor_Context(t *testing.T) {
Subscribe(ctx)
<-done
}

func TestProcessor_Error(t *testing.T) {
fakeErr := errors.New("fake error")
p := mono.CreateProcessor()
done := make(chan error, 1)
p.
DoOnError(func(e error) {
done <- e
}).
Subscribe(context.Background())

time.Sleep(100 * time.Millisecond)
p.Error(fakeErr)
e := <-done
assert.Equal(t, fakeErr, e)
}
20 changes: 17 additions & 3 deletions mono/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ package mono

import (
"context"
"errors"
"time"
)

var empty = wrap(newMonoJust(nil))
var errJustNilValue = errors.New("require non nil value")
var _errJustNilValue = "require non nil value"

func Error(e error) Mono {
return wrap(newMonoError(e))
}

func ErrorOneshot(e error) Mono {
return borrowOneshotWrapper(newMonoError(e))
}

func Empty() Mono {
return empty
}
Expand All @@ -26,15 +29,26 @@ func JustOrEmpty(v Any) Mono {

func Just(v Any) Mono {
if v == nil {
panic(errJustNilValue)
panic(_errJustNilValue)
}
return wrap(newMonoJust(v))
}

func JustOneshot(v Any) Mono {
if v == nil {
panic(_errJustNilValue)
}
return borrowOneshotWrapper(newMonoJust(v))
}

func Create(gen func(ctx context.Context, s Sink)) Mono {
return wrap(newMonoCreate(gen))
}

func CreateOneshot(gen func(ctx context.Context, s Sink)) Mono {
return borrowOneshotWrapper(newMonoCreate(gen))
}

func Delay(delay time.Duration) Mono {
return wrap(newMonoDelay(delay))
}
Expand Down
34 changes: 4 additions & 30 deletions mono/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/internal/subscribers"
"github.com/jjeffcaii/reactor-go/scheduler"
)

Expand Down Expand Up @@ -69,7 +68,7 @@ func (p wrapper) DoOnSubscribe(fn reactor.FnOnSubscribe) Mono {
}

func (p wrapper) DelayElement(delay time.Duration) Mono {
return wrap(newMonoDelayElement(p.RawPublisher, delay, scheduler.Elastic()))
return wrap(newMonoDelayElement(p.RawPublisher, delay, scheduler.Parallel()))
}

func (p wrapper) Timeout(timeout time.Duration) Mono {
Expand All @@ -80,40 +79,15 @@ func (p wrapper) Timeout(timeout time.Duration) Mono {
}

func (p wrapper) Block(ctx context.Context) (Any, error) {
done := make(chan struct{})
vchan := make(chan reactor.Any, 1)
echan := make(chan error, 1)
b := subscribers.NewBlockSubscriber(done, vchan, echan)
p.SubscribeWith(ctx, b)
<-done

defer close(vchan)
defer close(echan)

select {
case value := <-vchan:
return value, nil
case err := <-echan:
return nil, err
default:
return nil, nil
}
return block(ctx, p.RawPublisher)
}

func (p wrapper) Success(v Any) {
p.mustProcessor().Success(v)
mustProcessor(p.RawPublisher).Success(v)
}

func (p wrapper) Error(e error) {
p.mustProcessor().Error(e)
}

func (p wrapper) mustProcessor() *processor {
pp, ok := p.RawPublisher.(*processor)
if !ok {
panic(errNotProcessor)
}
return pp
mustProcessor(p.RawPublisher).Error(e)
}

func wrap(r reactor.RawPublisher) wrapper {
Expand Down
Loading

0 comments on commit 0c66f30

Please sign in to comment.