diff --git a/scheduler/elastic.go b/scheduler/elastic.go index 1dd6d82..1859b76 100644 --- a/scheduler/elastic.go +++ b/scheduler/elastic.go @@ -27,10 +27,8 @@ func (e *elasticScheduler) Close() (err error) { return } -func (e *elasticScheduler) Do(job Task) { - if err := e.pool.Submit(job); err != nil { - panic(err) - } +func (e *elasticScheduler) Do(job Task) error { + return e.pool.Submit(job) } func (e *elasticScheduler) Worker() Worker { diff --git a/scheduler/elastic_test.go b/scheduler/elastic_test.go index 383d769..f4c3bf4 100644 --- a/scheduler/elastic_test.go +++ b/scheduler/elastic_test.go @@ -22,22 +22,18 @@ func TestNewElastic(t *testing.T) { const size = 100 sc := scheduler.NewElastic(size) - assert.NotPanics(t, func() { - wg := sync.WaitGroup{} - wg.Add(size * 2) - for i := 0; i < size*2; i++ { - sc.Worker().Do(func() { - wg.Done() - }) - } - wg.Wait() - }) - assert.NoError(t, sc.Close()) + wg := sync.WaitGroup{} + wg.Add(size * 2) + for i := 0; i < size*2; i++ { + assert.NoError(t, sc.Worker().Do(func() { + wg.Done() + }), "should not return error") + } + wg.Wait() - assert.Panics(t, func() { - sc.Worker().Do(func() { - // noop - }) - }, "should panic after closing scheduler") + assert.NoError(t, sc.Close()) + assert.Error(t, sc.Worker().Do(func() { + // noop + }), "should return error after closing scheduler") } diff --git a/scheduler/immediate.go b/scheduler/immediate.go index 1b2872d..30f67bf 100644 --- a/scheduler/immediate.go +++ b/scheduler/immediate.go @@ -19,8 +19,9 @@ func (i immediateScheduler) Close() error { return nil } -func (i immediateScheduler) Do(job Task) { +func (i immediateScheduler) Do(job Task) error { job() + return nil } func (i immediateScheduler) Worker() Worker { diff --git a/scheduler/parallel.go b/scheduler/parallel.go index 66c0b7a..c6bf0d3 100644 --- a/scheduler/parallel.go +++ b/scheduler/parallel.go @@ -19,8 +19,9 @@ func (p parallelScheduler) Close() error { return nil } -func (p parallelScheduler) Do(j Task) { +func (p parallelScheduler) Do(j Task) error { go j() + return nil } func (p parallelScheduler) Worker() Worker { diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 59e61cf..17714da 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -1,6 +1,8 @@ package scheduler -import "io" +import ( + "io" +) // Task defines task function. type Task func() @@ -8,7 +10,7 @@ type Task func() // Worker is used to execute a task. type Worker interface { // Do executes a task. - Do(Task) + Do(Task) error } // Scheduler schedule tasks. diff --git a/scheduler/single.go b/scheduler/single.go index 43d4007..0b24563 100644 --- a/scheduler/single.go +++ b/scheduler/single.go @@ -1,13 +1,15 @@ package scheduler import ( + "errors" "sync" - "sync/atomic" ) // DefaultSingleCap is default task queue size. const DefaultSingleCap = 1000 +var errSchedulerClosed = errors.New("scheduler has been closed") + const _singleName = "single" var ( @@ -17,7 +19,7 @@ var ( type singleScheduler struct { jobs chan Task - started int64 + started sync.Once } func (p *singleScheduler) Name() string { @@ -35,14 +37,20 @@ func (p *singleScheduler) start() { } } -func (p *singleScheduler) Do(j Task) { +func (p *singleScheduler) Do(j Task) (err error) { + defer func() { + if e := recover(); e != nil { + err = errSchedulerClosed + } + }() p.jobs <- j + return } func (p *singleScheduler) Worker() Worker { - if atomic.AddInt64(&p.started, 1) == 1 { + p.started.Do(func() { go p.start() - } + }) return p } diff --git a/scheduler/single_test.go b/scheduler/single_test.go index aa1ed7f..678a031 100644 --- a/scheduler/single_test.go +++ b/scheduler/single_test.go @@ -15,21 +15,18 @@ func TestSingle(t *testing.T) { single := scheduler.Single() fmt.Println(single.Name()) - assert.NotPanics(t, func() { - wg := sync.WaitGroup{} - wg.Add(totals) - for range [totals]struct{}{} { - single.Worker().Do(func() { - wg.Done() - }) - } - wg.Wait() - }) - assert.NoError(t, single.Close(), "close should not return error") + wg := sync.WaitGroup{} + wg.Add(totals) + for range [totals]struct{}{} { + assert.NoError(t, single.Worker().Do(func() { + wg.Done() + }), "should not return error") + } + wg.Wait() - assert.Panics(t, func() { - single.Worker().Do(func() { - // noop - }) - }, "should panic after closing scheduler") + assert.NoError(t, single.Close(), "close should not return error") + noop := func() { + // noop + } + assert.Error(t, single.Worker().Do(noop), "should return error after closing scheduler") }