Skip to content

Commit

Permalink
feat: add ants scheduler (#37)
Browse files Browse the repository at this point in the history
Co-authored-by: caiweiwei.cww <caiweiwei.cww@alibaba-inc.com>
  • Loading branch information
jjeffcaii and caiweiwei.cww authored Mar 14, 2022
1 parent 3addfdc commit b37aa4c
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 13 deletions.
7 changes: 0 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# reactor-go 🚀🚀🚀


![GitHub Workflow Status](https://github.com/jjeffcaii/reactor-go/workflows/Go/badge.svg)
[![codecov](https://codecov.io/gh/jjeffcaii/reactor-go/branch/master/graph/badge.svg)](https://codecov.io/gh/jjeffcaii/reactor-go)
[![GoDoc](https://godoc.org/github.com/jjeffcaii/reactor-go?status.svg)](https://godoc.org/github.com/jjeffcaii/reactor-go)
Expand All @@ -9,8 +8,6 @@
[![GitHub Release](https://img.shields.io/github/release-pre/jjeffcaii/reactor-go.svg)](https://github.com/jjeffcaii/reactor-go/releases)

> A golang implementation for [reactive-streams](https://www.reactive-streams.org/).
<br>🚧🚧🚧 ***IT IS UNDER ACTIVE DEVELOPMENT!!!***
<br>⚠️⚠️⚠️ ***DO NOT USE IN ANY PRODUCTION ENVIRONMENT!!!***
## Install

Expand All @@ -20,10 +17,6 @@ go get -u github.com/jjeffcaii/reactor-go

## Example

> NOTICE:
<br> We can only use `func(interface{})interface{}` for most operations because Golang has not Generics. 😭
<br> If you have any better idea, please let me know. 😀
### Mono
```go
package mono_test
Expand Down
4 changes: 3 additions & 1 deletion mono/schedule_on.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/internal"
"github.com/jjeffcaii/reactor-go/scheduler"
)

Expand All @@ -16,7 +17,8 @@ func (m monoScheduleOn) SubscribeWith(ctx context.Context, s reactor.Subscriber)
if err := m.sc.Worker().Do(func() {
m.source.SubscribeWith(ctx, s)
}); err != nil {
panic(err)
s.OnSubscribe(ctx, internal.EmptySubscription)
s.OnError(err)
}
}

Expand Down
9 changes: 6 additions & 3 deletions scheduler/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@ func (e *elasticScheduler) Worker() Worker {
// NewElastic creates a new elastic scheduler.
func NewElastic(size int) Scheduler {
pool, _ := ants.NewPool(size)
return &elasticScheduler{
pool: pool,
}
return NewAnts(pool)
}

// NewAnts creates a new scheduler over ants pool.
func NewAnts(pool *ants.Pool) Scheduler {
return &elasticScheduler{pool: pool}
}

// Elastic is a dynamic alloc scheduler.
Expand Down
4 changes: 2 additions & 2 deletions scheduler/elastic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"testing"
"time"

"github.com/jjeffcaii/reactor-go/scheduler"
"github.com/stretchr/testify/assert"

"github.com/jjeffcaii/reactor-go/scheduler"
)

func TestElastic(t *testing.T) {
Expand Down Expand Up @@ -54,5 +55,4 @@ func TestElasticBounded(t *testing.T) {
}
wg.Wait()
})

}

0 comments on commit b37aa4c

Please sign in to comment.