diff --git a/README.md b/README.md
index 92d1cc0..dd1bc69 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,5 @@
# reactor-go 🚀🚀🚀
-
![GitHub Workflow Status](https://github.com/jjeffcaii/reactor-go/workflows/Go/badge.svg)
[![codecov](https://codecov.io/gh/jjeffcaii/reactor-go/branch/master/graph/badge.svg)](https://codecov.io/gh/jjeffcaii/reactor-go)
[![GoDoc](https://godoc.org/github.com/jjeffcaii/reactor-go?status.svg)](https://godoc.org/github.com/jjeffcaii/reactor-go)
@@ -9,8 +8,6 @@
[![GitHub Release](https://img.shields.io/github/release-pre/jjeffcaii/reactor-go.svg)](https://github.com/jjeffcaii/reactor-go/releases)
> A golang implementation for [reactive-streams](https://www.reactive-streams.org/).
-
🚧🚧🚧 ***IT IS UNDER ACTIVE DEVELOPMENT!!!***
-
⚠️⚠️⚠️ ***DO NOT USE IN ANY PRODUCTION ENVIRONMENT!!!***
## Install
@@ -20,10 +17,6 @@ go get -u github.com/jjeffcaii/reactor-go
## Example
-> NOTICE:
-
We can only use `func(interface{})interface{}` for most operations because Golang has not Generics. 😭
-
If you have any better idea, please let me know. 😀
-
### Mono
```go
package mono_test
diff --git a/mono/schedule_on.go b/mono/schedule_on.go
index fd34fa9..839b0ff 100644
--- a/mono/schedule_on.go
+++ b/mono/schedule_on.go
@@ -4,6 +4,7 @@ import (
"context"
"github.com/jjeffcaii/reactor-go"
+ "github.com/jjeffcaii/reactor-go/internal"
"github.com/jjeffcaii/reactor-go/scheduler"
)
@@ -16,7 +17,8 @@ func (m monoScheduleOn) SubscribeWith(ctx context.Context, s reactor.Subscriber)
if err := m.sc.Worker().Do(func() {
m.source.SubscribeWith(ctx, s)
}); err != nil {
- panic(err)
+ s.OnSubscribe(ctx, internal.EmptySubscription)
+ s.OnError(err)
}
}
diff --git a/scheduler/elastic.go b/scheduler/elastic.go
index 77a52be..412f6fe 100644
--- a/scheduler/elastic.go
+++ b/scheduler/elastic.go
@@ -41,9 +41,12 @@ func (e *elasticScheduler) Worker() Worker {
// NewElastic creates a new elastic scheduler.
func NewElastic(size int) Scheduler {
pool, _ := ants.NewPool(size)
- return &elasticScheduler{
- pool: pool,
- }
+ return NewAnts(pool)
+}
+
+// NewAnts creates a new scheduler over ants pool.
+func NewAnts(pool *ants.Pool) Scheduler {
+ return &elasticScheduler{pool: pool}
}
// Elastic is a dynamic alloc scheduler.
diff --git a/scheduler/elastic_test.go b/scheduler/elastic_test.go
index 28205e1..2b2a21c 100644
--- a/scheduler/elastic_test.go
+++ b/scheduler/elastic_test.go
@@ -6,8 +6,9 @@ import (
"testing"
"time"
- "github.com/jjeffcaii/reactor-go/scheduler"
"github.com/stretchr/testify/assert"
+
+ "github.com/jjeffcaii/reactor-go/scheduler"
)
func TestElastic(t *testing.T) {
@@ -54,5 +55,4 @@ func TestElasticBounded(t *testing.T) {
}
wg.Wait()
})
-
}