diff --git a/empty.go b/empty.go index fa766817..66302ba8 100644 --- a/empty.go +++ b/empty.go @@ -3,7 +3,6 @@ package rx import ( "context" - "github.com/b97tsk/rx/internal/ctxwatch" "github.com/b97tsk/rx/internal/waitgroup" ) @@ -30,7 +29,7 @@ func never[T any](ctx context.Context, sink Observer[T]) { wg.Add(1) } - ctxwatch.Add(ctx, func() { + context.AfterFunc(ctx, func() { if wg != nil { defer wg.Done() } diff --git a/go.mod b/go.mod index dd158340..f2a1c2d4 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/b97tsk/rx -go 1.19 +go 1.21.0 require golang.org/x/exp v0.0.0-20220428152302-39d4317da171 diff --git a/internal/ctxwatch/ctxwatch.go b/internal/ctxwatch/ctxwatch.go deleted file mode 100644 index 5f585c9b..00000000 --- a/internal/ctxwatch/ctxwatch.go +++ /dev/null @@ -1,198 +0,0 @@ -package ctxwatch - -import ( - "context" - "reflect" - "sync" - "sync/atomic" - - "github.com/b97tsk/rx/internal/queue" -) - -type watchItem struct { - Context context.Context - Callback func() -} - -type watchService chan<- watchItem - -func startService() watchService { - cin, cout := make(chan watchItem), make(chan watchItem) - - go func() { - var q queue.Queue[watchItem] - - for { - var ( - in <-chan watchItem = cin - out chan<- watchItem - outv watchItem - ) - - if q.Len() > 0 { - out, outv = cout, q.Front() - } - - select { - case item := <-in: - q.Push(item) - case out <- outv: - q.Pop() - } - } - }() - - go func() { - cases := []reflect.SelectCase{{}} - - var itemCounter atomic.Int32 - - var ( - workloadDoneChans sync.Map - workloadPerWorker atomic.Int32 - ) - - oldWorkloadPerWorker := 3 - - workloadPerWorker.Store(5) - - for item := range cout { - itemValue := reflect.ValueOf(item) - - for i, j := 1, len(cases); i < j; i++ { - cases[i].Send = itemValue - } - - dir0 := reflect.SelectSend - if n := len(cases); int(itemCounter.Load()) >= (n-1)*oldWorkloadPerWorker { - dir0 = reflect.SelectDefault - } - - cases[0].Dir = dir0 - - itemCounter.Add(1) - - for { - if i, _, _ := reflect.Select(cases); i > 0 { - break - } - - cases[0].Dir = reflect.SelectSend - - worker := make(chan watchItem) - workerValue := reflect.ValueOf(worker) - - cases = append(cases, reflect.SelectCase{ - Dir: reflect.SelectSend, - Chan: workerValue, - Send: itemValue, - }) - - w := workloadPerWorker.Load() - w2 := w + int32(oldWorkloadPerWorker) - oldWorkloadPerWorker = int(w) - - workloadDoneChans.Store(w2, make(chan struct{})) - workloadPerWorker.Store(w2) - - if done, loaded := workloadDoneChans.LoadAndDelete(w); loaded { - close(done.(chan struct{})) - } - - go startWorker(workerValue, &itemCounter, &workloadDoneChans, &workloadPerWorker) - } - - for i, j := 1, len(cases); i < j; i++ { - cases[i].Send = reflect.Value{} - } - } - }() - - return cin -} - -func startWorker( - workerChan reflect.Value, - itemCounter *atomic.Int32, - workloadDoneChans *sync.Map, - workloadPerWorker *atomic.Int32, -) { - cases := []reflect.SelectCase{{Dir: reflect.SelectRecv}} - items := []watchItem{{}} - - for { - chan0 := workerChan - - if w := workloadPerWorker.Load(); int(w) < len(items) { - done, ok := workloadDoneChans.Load(w) - if !ok { // workloadPerWorker has just changed. - continue - } - - chan0 = reflect.ValueOf(done) - } - - cases[0].Chan = chan0 - - switch i, v, _ := reflect.Select(cases); i { - case 0: - item, ok := v.Interface().(watchItem) - if !ok { - break - } - - cases = append(cases, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(item.Context.Done()), - }) - - items = append(items, item) - default: - j := len(cases) - 1 - item := items[i] - - cases[i].Chan = cases[j].Chan - cases[j].Chan = reflect.Value{} - cases = cases[:j] - - items[i] = items[j] - items[j] = watchItem{} - items = items[:j] - - go item.Callback() - - itemCounter.Add(-1) - } - } -} - -var shared struct { - sync.Once - service watchService -} - -func sharedInit() { - shared.service = startService() -} - -func getService() watchService { - shared.Do(sharedInit) - return shared.service -} - -// Add starts a background service that it waits until ctx has been cancelled, -// then it calls f in a goroutine. Subsequent calls use the same service to -// deal with arbitrary number of context.Contexts. -// -// Add works just like: -// -// go func() { -// <-ctx.Done() -// f() -// }() -// -// except it need not spawn new goroutines for each ctx to wait at the time -// when Add is called. -func Add(ctx context.Context, f func()) { - getService() <- watchItem{ctx, f} -} diff --git a/internal/ctxwatch/ctxwatch_test.go b/internal/ctxwatch/ctxwatch_test.go deleted file mode 100644 index 90a3e230..00000000 --- a/internal/ctxwatch/ctxwatch_test.go +++ /dev/null @@ -1,50 +0,0 @@ -package ctxwatch_test - -import ( - "context" - "sync/atomic" - "testing" - "time" - - "github.com/b97tsk/rx/internal/ctxwatch" -) - -func TestAdd(t *testing.T) { - t.Parallel() - - var v atomic.Int32 - - cancels := []context.CancelFunc(nil) - done := func() { v.Add(-1) } - add := func(n int) { - v.Add(int32(n)) - - for i := 0; i < n; i++ { - ctx, cancel := context.WithCancel(context.Background()) - cancels = append(cancels, cancel) - - ctxwatch.Add(ctx, done) - - time.Sleep(30 * time.Millisecond) - } - } - - remove := func(n int) { - for _, cancel := range cancels[:n] { - cancel() - } - - cancels = cancels[n:] - - time.Sleep(30 * time.Millisecond * time.Duration(n)) - } - - for n, m := 10, 50; n < m; n += 10 { - add(m - n) - remove(n) - } - - if v.Load() != 0 { - t.Fail() - } -} diff --git a/multicast.go b/multicast.go index 45732876..b424a7af 100644 --- a/multicast.go +++ b/multicast.go @@ -5,7 +5,6 @@ import ( "runtime" "sync" - "github.com/b97tsk/rx/internal/ctxwatch" "github.com/b97tsk/rx/internal/waitgroup" ) @@ -102,7 +101,7 @@ func (m *multicast[T]) subscribe(ctx context.Context, sink Observer[T]) { wg.Add(1) } - ctxwatch.Add(ctx, func() { + context.AfterFunc(ctx, func() { if wg != nil { defer wg.Done() } diff --git a/multicastreplay.go b/multicastreplay.go index 570ef15c..957e7cfc 100644 --- a/multicastreplay.go +++ b/multicastreplay.go @@ -6,7 +6,6 @@ import ( "sync/atomic" "time" - "github.com/b97tsk/rx/internal/ctxwatch" "github.com/b97tsk/rx/internal/queue" "github.com/b97tsk/rx/internal/waitgroup" ) @@ -180,7 +179,7 @@ func (m *multicastReplay[T]) subscribe(ctx context.Context, sink Observer[T]) { wg.Add(1) } - ctxwatch.Add(ctx, func() { + context.AfterFunc(ctx, func() { if wg != nil { defer wg.Done() } diff --git a/share.go b/share.go index b9d20dc2..6cddf3b8 100644 --- a/share.go +++ b/share.go @@ -4,7 +4,6 @@ import ( "context" "sync" - "github.com/b97tsk/rx/internal/ctxwatch" "github.com/b97tsk/rx/internal/waitgroup" ) @@ -137,7 +136,7 @@ func (obs *shareObservable[T]) Subscribe(ctx context.Context, sink Observer[T]) wg.Add(1) } - ctxwatch.Add(ctx, func() { + context.AfterFunc(ctx, func() { if wg != nil { defer wg.Done() }