Skip to content

Commit

Permalink
Func Do in Scheduler will return error.
Browse files Browse the repository at this point in the history
  • Loading branch information
jjeffcaii committed Aug 11, 2020
1 parent 40ed4e3 commit cd0e6cb
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 45 deletions.
6 changes: 2 additions & 4 deletions scheduler/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ func (e *elasticScheduler) Close() (err error) {
return
}

func (e *elasticScheduler) Do(job Task) {
if err := e.pool.Submit(job); err != nil {
panic(err)
}
func (e *elasticScheduler) Do(job Task) error {
return e.pool.Submit(job)
}

func (e *elasticScheduler) Worker() Worker {
Expand Down
28 changes: 12 additions & 16 deletions scheduler/elastic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,18 @@ func TestNewElastic(t *testing.T) {
const size = 100
sc := scheduler.NewElastic(size)

assert.NotPanics(t, func() {
wg := sync.WaitGroup{}
wg.Add(size * 2)
for i := 0; i < size*2; i++ {
sc.Worker().Do(func() {
wg.Done()
})
}
wg.Wait()
})
assert.NoError(t, sc.Close())
wg := sync.WaitGroup{}
wg.Add(size * 2)
for i := 0; i < size*2; i++ {
assert.NoError(t, sc.Worker().Do(func() {
wg.Done()
}), "should not return error")
}
wg.Wait()

assert.Panics(t, func() {
sc.Worker().Do(func() {
// noop
})
}, "should panic after closing scheduler")
assert.NoError(t, sc.Close())

assert.Error(t, sc.Worker().Do(func() {
// noop
}), "should return error after closing scheduler")
}
3 changes: 2 additions & 1 deletion scheduler/immediate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ func (i immediateScheduler) Close() error {
return nil
}

func (i immediateScheduler) Do(job Task) {
func (i immediateScheduler) Do(job Task) error {
job()
return nil
}

func (i immediateScheduler) Worker() Worker {
Expand Down
3 changes: 2 additions & 1 deletion scheduler/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ func (p parallelScheduler) Close() error {
return nil
}

func (p parallelScheduler) Do(j Task) {
func (p parallelScheduler) Do(j Task) error {
go j()
return nil
}

func (p parallelScheduler) Worker() Worker {
Expand Down
6 changes: 4 additions & 2 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package scheduler

import "io"
import (
"io"
)

// Task defines task function.
type Task func()

// Worker is used to execute a task.
type Worker interface {
// Do executes a task.
Do(Task)
Do(Task) error
}

// Scheduler schedule tasks.
Expand Down
18 changes: 13 additions & 5 deletions scheduler/single.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package scheduler

import (
"errors"
"sync"
"sync/atomic"
)

// DefaultSingleCap is default task queue size.
const DefaultSingleCap = 1000

var errSchedulerClosed = errors.New("scheduler has been closed")

const _singleName = "single"

var (
Expand All @@ -17,7 +19,7 @@ var (

type singleScheduler struct {
jobs chan Task
started int64
started sync.Once
}

func (p *singleScheduler) Name() string {
Expand All @@ -35,14 +37,20 @@ func (p *singleScheduler) start() {
}
}

func (p *singleScheduler) Do(j Task) {
func (p *singleScheduler) Do(j Task) (err error) {
defer func() {
if e := recover(); e != nil {
err = errSchedulerClosed
}
}()
p.jobs <- j
return
}

func (p *singleScheduler) Worker() Worker {
if atomic.AddInt64(&p.started, 1) == 1 {
p.started.Do(func() {
go p.start()
}
})
return p
}

Expand Down
29 changes: 13 additions & 16 deletions scheduler/single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,18 @@ func TestSingle(t *testing.T) {
single := scheduler.Single()
fmt.Println(single.Name())

assert.NotPanics(t, func() {
wg := sync.WaitGroup{}
wg.Add(totals)
for range [totals]struct{}{} {
single.Worker().Do(func() {
wg.Done()
})
}
wg.Wait()
})
assert.NoError(t, single.Close(), "close should not return error")
wg := sync.WaitGroup{}
wg.Add(totals)
for range [totals]struct{}{} {
assert.NoError(t, single.Worker().Do(func() {
wg.Done()
}), "should not return error")
}
wg.Wait()

assert.Panics(t, func() {
single.Worker().Do(func() {
// noop
})
}, "should panic after closing scheduler")
assert.NoError(t, single.Close(), "close should not return error")
noop := func() {
// noop
}
assert.Error(t, single.Worker().Do(noop), "should return error after closing scheduler")
}

0 comments on commit cd0e6cb

Please sign in to comment.