From ca9010bd387e14a4f63ddfdf54fe00a34d0f23e3 Mon Sep 17 00:00:00 2001 From: b97tsk Date: Wed, 8 Nov 2023 08:00:00 +0800 Subject: [PATCH] add WaitGroup type --- blocking.go | 87 ++++++++++++++++------------ combinelatest2.go | 15 ++--- combinelatest3.go | 17 ++---- combinelatest4.go | 19 +++--- combinelatest5.go | 21 +++---- combinelatest6.go | 23 +++----- combinelatest7.go | 25 ++++---- combinelatest8.go | 27 ++++----- combinelatest9.go | 29 ++++------ congest.go | 6 +- empty.go | 8 +-- example_test.go | 25 +++++++- go.go | 30 ---------- internal/waitgroup/waitgroup.go | 44 -------------- internal/waitgroup/waitgroup_test.go | 40 ------------- merge.go | 11 ++-- multicast.go | 4 +- multicastreplay.go | 3 +- observable.go | 8 ++- race.go | 6 +- share.go | 4 +- timeout.go | 2 +- timing.go | 4 +- waitgroup.go | 66 +++++++++++++++++++++ withlatestfrom1.go | 15 ++--- withlatestfrom2.go | 17 ++---- withlatestfrom3.go | 19 +++--- withlatestfrom4.go | 21 +++---- withlatestfrom5.go | 23 +++----- withlatestfrom6.go | 25 ++++---- withlatestfrom7.go | 27 ++++----- withlatestfrom8.go | 29 ++++------ zip2.go | 10 ++-- zip3.go | 12 ++-- zip4.go | 14 ++--- zip5.go | 16 +++-- zip6.go | 18 +++--- zip7.go | 20 +++---- zip8.go | 22 ++++--- zip9.go | 24 ++++---- 40 files changed, 360 insertions(+), 476 deletions(-) delete mode 100644 go.go delete mode 100644 internal/waitgroup/waitgroup.go delete mode 100644 internal/waitgroup/waitgroup_test.go create mode 100644 waitgroup.go diff --git a/blocking.go b/blocking.go index eafd174e..1a2be01f 100644 --- a/blocking.go +++ b/blocking.go @@ -1,10 +1,6 @@ package rx -import ( - "context" - - "github.com/b97tsk/rx/internal/waitgroup" -) +import "context" // BlockingFirst subscribes to the source Observable, and returns // the first value emitted by the source. @@ -17,18 +13,22 @@ import ( // // Like any other Blocking methods, this method waits for every goroutine // started during subscription to complete before returning. -// To have this works properly, one must use [Go] function rather than -// go statements to start new goroutines during subscription, especially -// when one need to subscribe to other Observables in a goroutine (otherwise, -// their program might panic randomly). +// To have this work properly, Observables must use [WaitGroupFromContext] +// to obtain a WaitGroup and use [WaitGroup.Go] rather than built-in go +// statements to start new goroutines during subscription, especially when +// they need to subscribe to other Observables in a goroutine; otherwise, +// runtime panicking might happen randomly (WaitGroup misuse). func (obs Observable[T]) BlockingFirst(ctx context.Context) (v T, err error) { - child, cancel := context.WithCancel(ctx) - child, wg := waitgroup.Install(child) + var wg WaitGroup + + child, cancel := context.WithCancel(WithWaitGroup(ctx, &wg)) res := Error[T](ErrEmpty) var noop bool + wg.Add(1) + obs.Subscribe(child, func(n Notification[T]) { if noop { return @@ -71,10 +71,11 @@ func (obs Observable[T]) BlockingFirst(ctx context.Context) (v T, err error) { // // Like any other Blocking methods, this method waits for every goroutine // started during subscription to complete before returning. -// To have this works properly, one must use [Go] function rather than -// go statements to start new goroutines during subscription, especially -// when one need to subscribe to other Observables in a goroutine (otherwise, -// their program might panic randomly). +// To have this work properly, Observables must use [WaitGroupFromContext] +// to obtain a WaitGroup and use [WaitGroup.Go] rather than built-in go +// statements to start new goroutines during subscription, especially when +// they need to subscribe to other Observables in a goroutine; otherwise, +// runtime panicking might happen randomly (WaitGroup misuse). func (obs Observable[T]) BlockingFirstOrElse(ctx context.Context, def T) T { v, err := obs.BlockingFirst(ctx) if err != nil { @@ -95,16 +96,20 @@ func (obs Observable[T]) BlockingFirstOrElse(ctx context.Context, def T) T { // // Like any other Blocking methods, this method waits for every goroutine // started during subscription to complete before returning. -// To have this works properly, one must use [Go] function rather than -// go statements to start new goroutines during subscription, especially -// when one need to subscribe to other Observables in a goroutine (otherwise, -// their program might panic randomly). +// To have this work properly, Observables must use [WaitGroupFromContext] +// to obtain a WaitGroup and use [WaitGroup.Go] rather than built-in go +// statements to start new goroutines during subscription, especially when +// they need to subscribe to other Observables in a goroutine; otherwise, +// runtime panicking might happen randomly (WaitGroup misuse). func (obs Observable[T]) BlockingLast(ctx context.Context) (v T, err error) { - child, cancel := context.WithCancel(ctx) - child, wg := waitgroup.Install(child) + var wg WaitGroup + + child, cancel := context.WithCancel(WithWaitGroup(ctx, &wg)) res := Error[T](ErrEmpty) + wg.Add(1) + obs.Subscribe(child, func(n Notification[T]) { if n.HasValue || n.HasError { res = n @@ -143,10 +148,11 @@ func (obs Observable[T]) BlockingLast(ctx context.Context) (v T, err error) { // // Like any other Blocking methods, this method waits for every goroutine // started during subscription to complete before returning. -// To have this works properly, one must use [Go] function rather than -// go statements to start new goroutines during subscription, especially -// when one need to subscribe to other Observables in a goroutine (otherwise, -// their program might panic randomly). +// To have this work properly, Observables must use [WaitGroupFromContext] +// to obtain a WaitGroup and use [WaitGroup.Go] rather than built-in go +// statements to start new goroutines during subscription, especially when +// they need to subscribe to other Observables in a goroutine; otherwise, +// runtime panicking might happen randomly (WaitGroup misuse). func (obs Observable[T]) BlockingLastOrElse(ctx context.Context, def T) T { v, err := obs.BlockingLast(ctx) if err != nil { @@ -168,18 +174,22 @@ func (obs Observable[T]) BlockingLastOrElse(ctx context.Context, def T) T { // // Like any other Blocking methods, this method waits for every goroutine // started during subscription to complete before returning. -// To have this works properly, one must use [Go] function rather than -// go statements to start new goroutines during subscription, especially -// when one need to subscribe to other Observables in a goroutine (otherwise, -// their program might panic randomly). +// To have this work properly, Observables must use [WaitGroupFromContext] +// to obtain a WaitGroup and use [WaitGroup.Go] rather than built-in go +// statements to start new goroutines during subscription, especially when +// they need to subscribe to other Observables in a goroutine; otherwise, +// runtime panicking might happen randomly (WaitGroup misuse). func (obs Observable[T]) BlockingSingle(ctx context.Context) (v T, err error) { - child, cancel := context.WithCancel(ctx) - child, wg := waitgroup.Install(child) + var wg WaitGroup + + child, cancel := context.WithCancel(WithWaitGroup(ctx, &wg)) res := Error[T](ErrEmpty) var noop bool + wg.Add(1) + obs.Subscribe(child, func(n Notification[T]) { if noop { return @@ -232,14 +242,19 @@ func (obs Observable[T]) BlockingSingle(ctx context.Context) (v T, err error) { // // Like any other Blocking methods, this method waits for every goroutine // started during subscription to complete before returning. -// To have this works properly, one must use [Go] function rather than -// go statements to start new goroutines during subscription, especially -// when one need to subscribe to other Observables in a goroutine (otherwise, -// their program might panic randomly). +// To have this work properly, Observables must use [WaitGroupFromContext] +// to obtain a WaitGroup and use [WaitGroup.Go] rather than built-in go +// statements to start new goroutines during subscription, especially when +// they need to subscribe to other Observables in a goroutine; otherwise, +// runtime panicking might happen randomly (WaitGroup misuse). func (obs Observable[T]) BlockingSubscribe(ctx context.Context, sink Observer[T]) error { + var wg WaitGroup + + child := WithWaitGroup(ctx, &wg) + var res Notification[T] - child, wg := waitgroup.Install(ctx) + wg.Add(1) obs.Subscribe(child, func(n Notification[T]) { res = n diff --git a/combinelatest2.go b/combinelatest2.go index 42b29762..07be838f 100644 --- a/combinelatest2.go +++ b/combinelatest2.go @@ -1,10 +1,6 @@ package rx -import ( - "context" - - "github.com/b97tsk/rx/internal/waitgroup" -) +import "context" // CombineLatest2 combines multiple Observables to create an Observable that // emits projection of latest values of each of its input Observables. @@ -18,6 +14,7 @@ func CombineLatest2[T1, T2, R any]( } return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -30,9 +27,7 @@ func CombineLatest2[T1, T2, R any]( chan1 := make(chan Notification[T1]) chan2 := make(chan Notification[T2]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s combineLatestState2[T1, T2] done := false @@ -47,8 +42,8 @@ func CombineLatest2[T1, T2, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) } } diff --git a/combinelatest3.go b/combinelatest3.go index 29f505f4..1b48d414 100644 --- a/combinelatest3.go +++ b/combinelatest3.go @@ -1,10 +1,6 @@ package rx -import ( - "context" - - "github.com/b97tsk/rx/internal/waitgroup" -) +import "context" // CombineLatest3 combines multiple Observables to create an Observable that // emits projection of latest values of each of its input Observables. @@ -19,6 +15,7 @@ func CombineLatest3[T1, T2, T3, R any]( } return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -32,9 +29,7 @@ func CombineLatest3[T1, T2, T3, R any]( chan2 := make(chan Notification[T2]) chan3 := make(chan Notification[T3]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s combineLatestState3[T1, T2, T3] done := false @@ -51,9 +46,9 @@ func CombineLatest3[T1, T2, T3, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) - Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) } } diff --git a/combinelatest4.go b/combinelatest4.go index b4739cf4..5c7be507 100644 --- a/combinelatest4.go +++ b/combinelatest4.go @@ -1,10 +1,6 @@ package rx -import ( - "context" - - "github.com/b97tsk/rx/internal/waitgroup" -) +import "context" // CombineLatest4 combines multiple Observables to create an Observable that // emits projection of latest values of each of its input Observables. @@ -20,6 +16,7 @@ func CombineLatest4[T1, T2, T3, T4, R any]( } return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -34,9 +31,7 @@ func CombineLatest4[T1, T2, T3, T4, R any]( chan3 := make(chan Notification[T3]) chan4 := make(chan Notification[T4]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s combineLatestState4[T1, T2, T3, T4] done := false @@ -55,10 +50,10 @@ func CombineLatest4[T1, T2, T3, T4, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) - Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) - Go(ctxHoisted, func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) + wg.Go(func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) } } diff --git a/combinelatest5.go b/combinelatest5.go index 449649d6..99a0641c 100644 --- a/combinelatest5.go +++ b/combinelatest5.go @@ -1,10 +1,6 @@ package rx -import ( - "context" - - "github.com/b97tsk/rx/internal/waitgroup" -) +import "context" // CombineLatest5 combines multiple Observables to create an Observable that // emits projection of latest values of each of its input Observables. @@ -21,6 +17,7 @@ func CombineLatest5[T1, T2, T3, T4, T5, R any]( } return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -36,9 +33,7 @@ func CombineLatest5[T1, T2, T3, T4, T5, R any]( chan4 := make(chan Notification[T4]) chan5 := make(chan Notification[T5]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s combineLatestState5[T1, T2, T3, T4, T5] done := false @@ -59,11 +54,11 @@ func CombineLatest5[T1, T2, T3, T4, T5, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) - Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) - Go(ctxHoisted, func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) - Go(ctxHoisted, func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) + wg.Go(func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) + wg.Go(func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) } } diff --git a/combinelatest6.go b/combinelatest6.go index 28348dd5..aa152ca5 100644 --- a/combinelatest6.go +++ b/combinelatest6.go @@ -1,10 +1,6 @@ package rx -import ( - "context" - - "github.com/b97tsk/rx/internal/waitgroup" -) +import "context" // CombineLatest6 combines multiple Observables to create an Observable that // emits projection of latest values of each of its input Observables. @@ -22,6 +18,7 @@ func CombineLatest6[T1, T2, T3, T4, T5, T6, R any]( } return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -38,9 +35,7 @@ func CombineLatest6[T1, T2, T3, T4, T5, T6, R any]( chan5 := make(chan Notification[T5]) chan6 := make(chan Notification[T6]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s combineLatestState6[T1, T2, T3, T4, T5, T6] done := false @@ -63,12 +58,12 @@ func CombineLatest6[T1, T2, T3, T4, T5, T6, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) - Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) - Go(ctxHoisted, func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) - Go(ctxHoisted, func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) - Go(ctxHoisted, func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) + wg.Go(func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) + wg.Go(func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) + wg.Go(func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) } } diff --git a/combinelatest7.go b/combinelatest7.go index 69141992..73196049 100644 --- a/combinelatest7.go +++ b/combinelatest7.go @@ -1,10 +1,6 @@ package rx -import ( - "context" - - "github.com/b97tsk/rx/internal/waitgroup" -) +import "context" // CombineLatest7 combines multiple Observables to create an Observable that // emits projection of latest values of each of its input Observables. @@ -23,6 +19,7 @@ func CombineLatest7[T1, T2, T3, T4, T5, T6, T7, R any]( } return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -40,9 +37,7 @@ func CombineLatest7[T1, T2, T3, T4, T5, T6, T7, R any]( chan6 := make(chan Notification[T6]) chan7 := make(chan Notification[T7]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s combineLatestState7[T1, T2, T3, T4, T5, T6, T7] done := false @@ -67,13 +62,13 @@ func CombineLatest7[T1, T2, T3, T4, T5, T6, T7, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) - Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) - Go(ctxHoisted, func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) - Go(ctxHoisted, func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) - Go(ctxHoisted, func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) - Go(ctxHoisted, func() { obs7.Subscribe(ctx, chanObserver(chan7, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) + wg.Go(func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) + wg.Go(func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) + wg.Go(func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) + wg.Go(func() { obs7.Subscribe(ctx, chanObserver(chan7, noop)) }) } } diff --git a/combinelatest8.go b/combinelatest8.go index cec369d0..27bf58e7 100644 --- a/combinelatest8.go +++ b/combinelatest8.go @@ -1,10 +1,6 @@ package rx -import ( - "context" - - "github.com/b97tsk/rx/internal/waitgroup" -) +import "context" // CombineLatest8 combines multiple Observables to create an Observable that // emits projection of latest values of each of its input Observables. @@ -24,6 +20,7 @@ func CombineLatest8[T1, T2, T3, T4, T5, T6, T7, T8, R any]( } return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -42,9 +39,7 @@ func CombineLatest8[T1, T2, T3, T4, T5, T6, T7, T8, R any]( chan7 := make(chan Notification[T7]) chan8 := make(chan Notification[T8]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s combineLatestState8[T1, T2, T3, T4, T5, T6, T7, T8] done := false @@ -71,14 +66,14 @@ func CombineLatest8[T1, T2, T3, T4, T5, T6, T7, T8, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) - Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) - Go(ctxHoisted, func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) - Go(ctxHoisted, func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) - Go(ctxHoisted, func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) - Go(ctxHoisted, func() { obs7.Subscribe(ctx, chanObserver(chan7, noop)) }) - Go(ctxHoisted, func() { obs8.Subscribe(ctx, chanObserver(chan8, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) + wg.Go(func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) + wg.Go(func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) + wg.Go(func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) + wg.Go(func() { obs7.Subscribe(ctx, chanObserver(chan7, noop)) }) + wg.Go(func() { obs8.Subscribe(ctx, chanObserver(chan8, noop)) }) } } diff --git a/combinelatest9.go b/combinelatest9.go index c84727d2..da39de87 100644 --- a/combinelatest9.go +++ b/combinelatest9.go @@ -1,10 +1,6 @@ package rx -import ( - "context" - - "github.com/b97tsk/rx/internal/waitgroup" -) +import "context" // CombineLatest9 combines multiple Observables to create an Observable that // emits projection of latest values of each of its input Observables. @@ -25,6 +21,7 @@ func CombineLatest9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any]( } return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -44,9 +41,7 @@ func CombineLatest9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any]( chan8 := make(chan Notification[T8]) chan9 := make(chan Notification[T9]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s combineLatestState9[T1, T2, T3, T4, T5, T6, T7, T8, T9] done := false @@ -75,15 +70,15 @@ func CombineLatest9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) - Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) - Go(ctxHoisted, func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) - Go(ctxHoisted, func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) - Go(ctxHoisted, func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) - Go(ctxHoisted, func() { obs7.Subscribe(ctx, chanObserver(chan7, noop)) }) - Go(ctxHoisted, func() { obs8.Subscribe(ctx, chanObserver(chan8, noop)) }) - Go(ctxHoisted, func() { obs9.Subscribe(ctx, chanObserver(chan9, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) + wg.Go(func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) + wg.Go(func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) + wg.Go(func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) + wg.Go(func() { obs7.Subscribe(ctx, chanObserver(chan7, noop)) }) + wg.Go(func() { obs8.Subscribe(ctx, chanObserver(chan8, noop)) }) + wg.Go(func() { obs9.Subscribe(ctx, chanObserver(chan9, noop)) }) } } diff --git a/congest.go b/congest.go index 6b772820..5574802c 100644 --- a/congest.go +++ b/congest.go @@ -38,7 +38,9 @@ func (obs congestObservable[T]) Subscribe(ctx context.Context, sink Observer[T]) cout := make(chan Notification[T]) - Go(ctx, func() { + wg := WaitGroupFromContext(ctx) + + wg.Go(func() { for n := range cout { sink(n) } @@ -49,7 +51,7 @@ func (obs congestObservable[T]) Subscribe(ctx context.Context, sink Observer[T]) cin := make(chan Notification[T]) noop := make(chan struct{}) - Go(ctx, func() { + wg.Go(func() { var q queue.Queue[Notification[T]] for { diff --git a/empty.go b/empty.go index 66302ba8..95e10761 100644 --- a/empty.go +++ b/empty.go @@ -1,10 +1,6 @@ package rx -import ( - "context" - - "github.com/b97tsk/rx/internal/waitgroup" -) +import "context" // Empty returns an Observable that emits no values and immediately completes. func Empty[T any]() Observable[T] { @@ -24,7 +20,7 @@ func Never[T any]() Observable[T] { func never[T any](ctx context.Context, sink Observer[T]) { if ctx.Done() != nil { - wg := waitgroup.Get(ctx) + wg := WaitGroupFromContext(ctx) if wg != nil { wg.Add(1) } diff --git a/example_test.go b/example_test.go index a2dd73c7..1ad785b9 100644 --- a/example_test.go +++ b/example_test.go @@ -56,7 +56,7 @@ func Example() { } func Example_blocking() { - ctx, cancel := context.WithTimeout(context.Background(), 700*time.Millisecond) + ctx, cancel := context.WithTimeout(context.TODO(), 700*time.Millisecond) defer cancel() obs := rx.Pipe4( @@ -81,3 +81,26 @@ func Example_blocking() { // 21 // context deadline exceeded } + +func Example_waitGroup() { + var wg rx.WaitGroup + + ctx := rx.WithWaitGroup(context.TODO(), &wg) + + wg.Go(func() { + for n := 1; n < 4; n++ { + rx.Pipe2( + rx.Timer(50*time.Millisecond*time.Duration(n)), + rx.MapTo[time.Time](n), + rx.OnNext(func(v int) { fmt.Println(v) }), + ).Subscribe(ctx, rx.Noop[int]) + } + }) + + wg.Wait() + + // Output: + // 1 + // 2 + // 3 +} diff --git a/go.go b/go.go deleted file mode 100644 index 4e93841f..00000000 --- a/go.go +++ /dev/null @@ -1,30 +0,0 @@ -package rx - -import ( - "context" - - "github.com/b97tsk/rx/internal/waitgroup" -) - -// Go calls f in a new goroutine. -// -// Internally, a WaitGroup might have been installed in ctx. -// If that is the case, Go calls Add(1) on that WaitGroup before starting -// a goroutine to call f, and calls Done() on that WaitGroup after f returns. -// -// To correctly use Go, Go must be called by an Observable while being -// subscribed (by another Observable or Go), or by another Go. -func Go(ctx context.Context, f func()) { - wg := waitgroup.Get(ctx) - if wg == nil { - go f() - return - } - - wg.Add(1) - - go func() { - defer wg.Done() - f() - }() -} diff --git a/internal/waitgroup/waitgroup.go b/internal/waitgroup/waitgroup.go deleted file mode 100644 index f5823646..00000000 --- a/internal/waitgroup/waitgroup.go +++ /dev/null @@ -1,44 +0,0 @@ -package waitgroup - -import ( - "context" - "sync" -) - -type WaitGroup = sync.WaitGroup - -var ctxKey int - -// Get returns the latest WaitGroup that was installed in ctx, or nil -// if there is none. -func Get(ctx context.Context) *WaitGroup { - wg, _ := ctx.Value(&ctxKey).(*WaitGroup) - return wg -} - -// Hoist returns a copy of ctx that pulls the latest installed WaitGroup -// up close to the surface such that Get would perform better. -func Hoist(ctx context.Context) context.Context { - return context.WithValue(ctx, &ctxKey, Get(ctx)) -} - -// Install returns a copy of ctx and a new WaitGroup that is bound to it. -// -// The WaitGroup returned has been added once, which means a Done call is -// expected somewhere after Install. -func Install(ctx context.Context) (context.Context, *WaitGroup) { - var wg WaitGroup - - wg.Add(1) - - if pwg := Get(ctx); pwg != nil { - pwg.Add(1) - - go func() { - wg.Wait() - pwg.Done() - }() - } - - return context.WithValue(ctx, &ctxKey, &wg), &wg -} diff --git a/internal/waitgroup/waitgroup_test.go b/internal/waitgroup/waitgroup_test.go deleted file mode 100644 index 107e06c3..00000000 --- a/internal/waitgroup/waitgroup_test.go +++ /dev/null @@ -1,40 +0,0 @@ -package waitgroup_test - -import ( - "context" - "testing" - "time" - - "github.com/b97tsk/rx/internal/waitgroup" -) - -func TestWaitGroup(t *testing.T) { - t.Parallel() - - ctx1, wg1 := waitgroup.Install(context.Background()) - ctx2, wg2 := waitgroup.Install(ctx1) - - done := make(chan struct{}) - - go func() { - wg2.Wait() - wg1.Wait() - close(done) - }() - - wg1.Done() - wg2.Done() - - select { - case <-done: - case <-time.After(1 * time.Second): - t.Fatal("timeout waiting for done") - } - - ctx1 = waitgroup.Hoist(ctx1) - ctx2 = waitgroup.Hoist(ctx2) - - if wg1 != waitgroup.Get(ctx1) || wg2 != waitgroup.Get(ctx2) { - t.Fail() - } -} diff --git a/merge.go b/merge.go index b6febce9..1582a89d 100644 --- a/merge.go +++ b/merge.go @@ -6,7 +6,6 @@ import ( "sync/atomic" "github.com/b97tsk/rx/internal/queue" - "github.com/b97tsk/rx/internal/waitgroup" ) // Merge creates an Observable that concurrently emits all values from every @@ -30,6 +29,7 @@ func MergeWith[T any](some ...Observable[T]) Operator[T, T] { } func (some observables[T]) Merge(ctx context.Context, sink Observer[T]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) sink = sink.OnLastNotification(cancel).Serialized() @@ -44,12 +44,10 @@ func (some observables[T]) Merge(ctx context.Context, sink Observer[T]) { } } - ctxHoisted := waitgroup.Hoist(ctx) - for _, obs := range some { obs := obs - Go(ctxHoisted, func() { obs.Subscribe(ctx, observer) }) + wg.Go(func() { obs.Subscribe(ctx, observer) }) } } @@ -126,6 +124,7 @@ type mergeMapObservable[T, R any] struct { } func (obs mergeMapObservable[T, R]) Subscribe(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) sink = sink.OnLastNotification(cancel).Serialized() @@ -142,14 +141,12 @@ func (obs mergeMapObservable[T, R]) Subscribe(ctx context.Context, sink Observer var startWorker func() - ctxHoisted := waitgroup.Hoist(ctx) - startWorker = func() { obs1 := obs.Project(x.Queue.Pop()) x.Queue.Unlock() - Go(ctxHoisted, func() { + wg.Go(func() { obs1.Subscribe(ctx, func(n Notification[R]) { if n.HasValue { sink(n) diff --git a/multicast.go b/multicast.go index b424a7af..c4393a87 100644 --- a/multicast.go +++ b/multicast.go @@ -4,8 +4,6 @@ import ( "context" "runtime" "sync" - - "github.com/b97tsk/rx/internal/waitgroup" ) // Multicast returns a Subject whose Observable part takes care of @@ -96,7 +94,7 @@ func (m *multicast[T]) subscribe(ctx context.Context, sink Observer[T]) { observer := sink m.obs.Add(&observer) - wg := waitgroup.Get(ctx) + wg := WaitGroupFromContext(ctx) if wg != nil { wg.Add(1) } diff --git a/multicastreplay.go b/multicastreplay.go index 957e7cfc..9ac7f804 100644 --- a/multicastreplay.go +++ b/multicastreplay.go @@ -7,7 +7,6 @@ import ( "time" "github.com/b97tsk/rx/internal/queue" - "github.com/b97tsk/rx/internal/waitgroup" ) // ReplayConfig carries options for MulticastReplay. @@ -174,7 +173,7 @@ func (m *multicastReplay[T]) subscribe(ctx context.Context, sink Observer[T]) { observer := sink m.obs.Add(&observer) - wg := waitgroup.Get(ctx) + wg := WaitGroupFromContext(ctx) if wg != nil { wg.Add(1) } diff --git a/observable.go b/observable.go index 2ea203f1..bfde7304 100644 --- a/observable.go +++ b/observable.go @@ -25,9 +25,11 @@ import ( // emit a notification of error (as a termination) to the given Observer // as soon as possible. // -// An Observable must use [Go] function rather than go statements to start -// new goroutines; otherwise, your program might panic randomly when using -// any of the Blocking methods. +// If an Observable need to subscribe another Observable in a goroutine +// with the given context in which a [WaitGroup] is associated, it must use +// [WaitGroupFromContext] to obtain the WaitGroup and use [WaitGroup.Go] +// rather than built-in go statements to start new goroutines; otherwise, +// runtime panicking might happen randomly (WaitGroup misuse). // // Observables are expected to be sequential. If you want to do something // parallel, you will need to divide it (as an Observable) into pieces diff --git a/race.go b/race.go index dfb5c38c..88c3641b 100644 --- a/race.go +++ b/race.go @@ -3,8 +3,6 @@ package rx import ( "context" "sync/atomic" - - "github.com/b97tsk/rx/internal/waitgroup" ) // Race creates an Observable that mirrors the first Observable to emit @@ -36,12 +34,12 @@ func (some observables[T]) Race(ctx context.Context, sink Observer[T]) { var race atomic.Uint32 - ctxHoisted := waitgroup.Hoist(ctx) + wg := WaitGroupFromContext(ctx) for index, obs := range some { index, obs := index, obs - Go(ctxHoisted, func() { + wg.Go(func() { var won, lost bool obs.Subscribe(subs[index].Left(), func(n Notification[T]) { diff --git a/share.go b/share.go index 1564652f..f6ed5e9e 100644 --- a/share.go +++ b/share.go @@ -3,8 +3,6 @@ package rx import ( "context" "sync" - - "github.com/b97tsk/rx/internal/waitgroup" ) // Share returns a new Observable that multicasts (shares) the source @@ -126,7 +124,7 @@ func (obs *shareObservable[T]) Subscribe(ctx context.Context, sink Observer[T]) }) } - wg := waitgroup.Get(ctx) + wg := WaitGroupFromContext(ctx) if wg != nil { wg.Add(1) } diff --git a/timeout.go b/timeout.go index bb7af45a..1f72e7de 100644 --- a/timeout.go +++ b/timeout.go @@ -57,7 +57,7 @@ func (obs timeoutObservable[T]) Subscribe(ctx context.Context, sink Observer[T]) c := make(chan Notification[T]) noop := make(chan struct{}) - Go(ctx, func() { + WaitGroupFromContext(ctx).Go(func() { tm := timerpool.Get(obs.First) for { diff --git a/timing.go b/timing.go index 2dff7651..eba10b73 100644 --- a/timing.go +++ b/timing.go @@ -17,7 +17,7 @@ func Ticker(d time.Duration) Observable[time.Time] { return func(ctx context.Context, sink Observer[time.Time]) { tk := time.NewTicker(d) - Go(ctx, func() { + WaitGroupFromContext(ctx).Go(func() { defer tk.Stop() done := ctx.Done() @@ -41,7 +41,7 @@ func Timer(d time.Duration) Observable[time.Time] { return func(ctx context.Context, sink Observer[time.Time]) { tm := timerpool.Get(d) - Go(ctx, func() { + WaitGroupFromContext(ctx).Go(func() { select { case <-ctx.Done(): timerpool.Put(tm) diff --git a/waitgroup.go b/waitgroup.go new file mode 100644 index 00000000..366e0a83 --- /dev/null +++ b/waitgroup.go @@ -0,0 +1,66 @@ +package rx + +import ( + "context" + "sync" +) + +var waitGroupKey int + +// A WaitGroup waits for a collection of goroutines to finish. +// The main goroutine calls Add to set the number of +// goroutines to wait for. Then each of the goroutines +// runs and calls Done when finished. At the same time, +// Wait can be used to block until all goroutines have finished. +// +// A WaitGroup must not be copied after first use. +type WaitGroup sync.WaitGroup + +// WithWaitGroup returns a copy of ctx in which wg is associated. +func WithWaitGroup(ctx context.Context, wg *WaitGroup) context.Context { + return context.WithValue(ctx, &waitGroupKey, wg) +} + +// WaitGroupFromContext returns the WaitGroup that was associated in ctx. +// +// If there is no WaitGroup associated in ctx, WaitGroupFromContext returns +// nil; though, [WaitGroup.Go] is still safe for use. +func WaitGroupFromContext(ctx context.Context) *WaitGroup { + wg, _ := ctx.Value(&waitGroupKey).(*WaitGroup) + return wg +} + +// Add adds delta, which may be negative, to the WaitGroup counter. +// If the counter becomes zero, all goroutines blocked on Wait are released. +// If the counter goes negative, Add panics. +func (wg *WaitGroup) Add(delta int) { + (*sync.WaitGroup)(wg).Add(delta) +} + +// Done decrements the WaitGroup counter by one. +func (wg *WaitGroup) Done() { + wg.Add(-1) +} + +// Wait blocks until the WaitGroup counter is zero. +func (wg *WaitGroup) Wait() { + (*sync.WaitGroup)(wg).Wait() +} + +// Go calls f in a new goroutine. +// +// If wg is not nil, Go calls wg.Add(1) before starting a goroutine +// to call f, and calls wg.Done() after f returns. +func (wg *WaitGroup) Go(f func()) { + if wg == nil { + go f() + return + } + + wg.Add(1) + + go func() { + defer wg.Done() + f() + }() +} diff --git a/withlatestfrom1.go b/withlatestfrom1.go index 08760bb2..c8420dcc 100644 --- a/withlatestfrom1.go +++ b/withlatestfrom1.go @@ -1,10 +1,6 @@ package rx -import ( - "context" - - "github.com/b97tsk/rx/internal/waitgroup" -) +import "context" // WithLatestFrom1 combines the source with another Observable to create // an Observable that emits projection of latest values of each Observable, @@ -30,6 +26,7 @@ func withLatestFrom2[T1, T2, R any]( proj func(v1 T1, v2 T2) R, ) Observable[R] { return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -42,9 +39,7 @@ func withLatestFrom2[T1, T2, R any]( chan1 := make(chan Notification[T1]) chan2 := make(chan Notification[T2]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s withLatestFromState2[T1, T2] done := false @@ -59,8 +54,8 @@ func withLatestFrom2[T1, T2, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) } } diff --git a/withlatestfrom2.go b/withlatestfrom2.go index e6a64596..c16539d2 100644 --- a/withlatestfrom2.go +++ b/withlatestfrom2.go @@ -1,10 +1,6 @@ package rx -import ( - "context" - - "github.com/b97tsk/rx/internal/waitgroup" -) +import "context" // WithLatestFrom2 combines the source with 2 other Observables to create // an Observable that emits projection of latest values of each Observable, @@ -32,6 +28,7 @@ func withLatestFrom3[T1, T2, T3, R any]( proj func(v1 T1, v2 T2, v3 T3) R, ) Observable[R] { return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -45,9 +42,7 @@ func withLatestFrom3[T1, T2, T3, R any]( chan2 := make(chan Notification[T2]) chan3 := make(chan Notification[T3]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s withLatestFromState3[T1, T2, T3] done := false @@ -64,9 +59,9 @@ func withLatestFrom3[T1, T2, T3, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) - Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) } } diff --git a/withlatestfrom3.go b/withlatestfrom3.go index 4890efb7..6ef78136 100644 --- a/withlatestfrom3.go +++ b/withlatestfrom3.go @@ -1,10 +1,6 @@ package rx -import ( - "context" - - "github.com/b97tsk/rx/internal/waitgroup" -) +import "context" // WithLatestFrom3 combines the source with 3 other Observables to create // an Observable that emits projection of latest values of each Observable, @@ -34,6 +30,7 @@ func withLatestFrom4[T1, T2, T3, T4, R any]( proj func(v1 T1, v2 T2, v3 T3, v4 T4) R, ) Observable[R] { return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -48,9 +45,7 @@ func withLatestFrom4[T1, T2, T3, T4, R any]( chan3 := make(chan Notification[T3]) chan4 := make(chan Notification[T4]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s withLatestFromState4[T1, T2, T3, T4] done := false @@ -69,10 +64,10 @@ func withLatestFrom4[T1, T2, T3, T4, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) - Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) - Go(ctxHoisted, func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) + wg.Go(func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) } } diff --git a/withlatestfrom4.go b/withlatestfrom4.go index f62739aa..01288ce8 100644 --- a/withlatestfrom4.go +++ b/withlatestfrom4.go @@ -1,10 +1,6 @@ package rx -import ( - "context" - - "github.com/b97tsk/rx/internal/waitgroup" -) +import "context" // WithLatestFrom4 combines the source with 4 other Observables to create // an Observable that emits projection of latest values of each Observable, @@ -36,6 +32,7 @@ func withLatestFrom5[T1, T2, T3, T4, T5, R any]( proj func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5) R, ) Observable[R] { return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -51,9 +48,7 @@ func withLatestFrom5[T1, T2, T3, T4, T5, R any]( chan4 := make(chan Notification[T4]) chan5 := make(chan Notification[T5]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s withLatestFromState5[T1, T2, T3, T4, T5] done := false @@ -74,11 +69,11 @@ func withLatestFrom5[T1, T2, T3, T4, T5, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) - Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) - Go(ctxHoisted, func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) - Go(ctxHoisted, func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) + wg.Go(func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) + wg.Go(func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) } } diff --git a/withlatestfrom5.go b/withlatestfrom5.go index 4b87129a..4fd42bfc 100644 --- a/withlatestfrom5.go +++ b/withlatestfrom5.go @@ -1,10 +1,6 @@ package rx -import ( - "context" - - "github.com/b97tsk/rx/internal/waitgroup" -) +import "context" // WithLatestFrom5 combines the source with 5 other Observables to create // an Observable that emits projection of latest values of each Observable, @@ -38,6 +34,7 @@ func withLatestFrom6[T1, T2, T3, T4, T5, T6, R any]( proj func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6) R, ) Observable[R] { return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -54,9 +51,7 @@ func withLatestFrom6[T1, T2, T3, T4, T5, T6, R any]( chan5 := make(chan Notification[T5]) chan6 := make(chan Notification[T6]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s withLatestFromState6[T1, T2, T3, T4, T5, T6] done := false @@ -79,12 +74,12 @@ func withLatestFrom6[T1, T2, T3, T4, T5, T6, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) - Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) - Go(ctxHoisted, func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) - Go(ctxHoisted, func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) - Go(ctxHoisted, func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) + wg.Go(func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) + wg.Go(func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) + wg.Go(func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) } } diff --git a/withlatestfrom6.go b/withlatestfrom6.go index 4551ea81..846d94db 100644 --- a/withlatestfrom6.go +++ b/withlatestfrom6.go @@ -1,10 +1,6 @@ package rx -import ( - "context" - - "github.com/b97tsk/rx/internal/waitgroup" -) +import "context" // WithLatestFrom6 combines the source with 6 other Observables to create // an Observable that emits projection of latest values of each Observable, @@ -40,6 +36,7 @@ func withLatestFrom7[T1, T2, T3, T4, T5, T6, T7, R any]( proj func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7) R, ) Observable[R] { return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -57,9 +54,7 @@ func withLatestFrom7[T1, T2, T3, T4, T5, T6, T7, R any]( chan6 := make(chan Notification[T6]) chan7 := make(chan Notification[T7]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s withLatestFromState7[T1, T2, T3, T4, T5, T6, T7] done := false @@ -84,13 +79,13 @@ func withLatestFrom7[T1, T2, T3, T4, T5, T6, T7, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) - Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) - Go(ctxHoisted, func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) - Go(ctxHoisted, func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) - Go(ctxHoisted, func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) - Go(ctxHoisted, func() { obs7.Subscribe(ctx, chanObserver(chan7, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) + wg.Go(func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) + wg.Go(func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) + wg.Go(func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) + wg.Go(func() { obs7.Subscribe(ctx, chanObserver(chan7, noop)) }) } } diff --git a/withlatestfrom7.go b/withlatestfrom7.go index 8a3179c0..b10f903a 100644 --- a/withlatestfrom7.go +++ b/withlatestfrom7.go @@ -1,10 +1,6 @@ package rx -import ( - "context" - - "github.com/b97tsk/rx/internal/waitgroup" -) +import "context" // WithLatestFrom7 combines the source with 7 other Observables to create // an Observable that emits projection of latest values of each Observable, @@ -42,6 +38,7 @@ func withLatestFrom8[T1, T2, T3, T4, T5, T6, T7, T8, R any]( proj func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8) R, ) Observable[R] { return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -60,9 +57,7 @@ func withLatestFrom8[T1, T2, T3, T4, T5, T6, T7, T8, R any]( chan7 := make(chan Notification[T7]) chan8 := make(chan Notification[T8]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s withLatestFromState8[T1, T2, T3, T4, T5, T6, T7, T8] done := false @@ -89,14 +84,14 @@ func withLatestFrom8[T1, T2, T3, T4, T5, T6, T7, T8, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) - Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) - Go(ctxHoisted, func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) - Go(ctxHoisted, func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) - Go(ctxHoisted, func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) - Go(ctxHoisted, func() { obs7.Subscribe(ctx, chanObserver(chan7, noop)) }) - Go(ctxHoisted, func() { obs8.Subscribe(ctx, chanObserver(chan8, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) + wg.Go(func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) + wg.Go(func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) + wg.Go(func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) + wg.Go(func() { obs7.Subscribe(ctx, chanObserver(chan7, noop)) }) + wg.Go(func() { obs8.Subscribe(ctx, chanObserver(chan8, noop)) }) } } diff --git a/withlatestfrom8.go b/withlatestfrom8.go index b57c71fa..72ed5799 100644 --- a/withlatestfrom8.go +++ b/withlatestfrom8.go @@ -1,10 +1,6 @@ package rx -import ( - "context" - - "github.com/b97tsk/rx/internal/waitgroup" -) +import "context" // WithLatestFrom8 combines the source with 8 other Observables to create // an Observable that emits projection of latest values of each Observable, @@ -44,6 +40,7 @@ func withLatestFrom9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any]( proj func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8, v9 T9) R, ) Observable[R] { return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -63,9 +60,7 @@ func withLatestFrom9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any]( chan8 := make(chan Notification[T8]) chan9 := make(chan Notification[T9]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s withLatestFromState9[T1, T2, T3, T4, T5, T6, T7, T8, T9] done := false @@ -94,15 +89,15 @@ func withLatestFrom9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) - Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) - Go(ctxHoisted, func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) - Go(ctxHoisted, func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) - Go(ctxHoisted, func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) - Go(ctxHoisted, func() { obs7.Subscribe(ctx, chanObserver(chan7, noop)) }) - Go(ctxHoisted, func() { obs8.Subscribe(ctx, chanObserver(chan8, noop)) }) - Go(ctxHoisted, func() { obs9.Subscribe(ctx, chanObserver(chan9, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) + wg.Go(func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) + wg.Go(func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) + wg.Go(func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) + wg.Go(func() { obs7.Subscribe(ctx, chanObserver(chan7, noop)) }) + wg.Go(func() { obs8.Subscribe(ctx, chanObserver(chan8, noop)) }) + wg.Go(func() { obs9.Subscribe(ctx, chanObserver(chan9, noop)) }) } } diff --git a/zip2.go b/zip2.go index e5d35b9f..6fb5e423 100644 --- a/zip2.go +++ b/zip2.go @@ -4,7 +4,6 @@ import ( "context" "github.com/b97tsk/rx/internal/queue" - "github.com/b97tsk/rx/internal/waitgroup" ) // Zip2 combines multiple Observables to create an Observable that emits @@ -19,6 +18,7 @@ func Zip2[T1, T2, R any]( } return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -31,9 +31,7 @@ func Zip2[T1, T2, R any]( chan1 := make(chan Notification[T1]) chan2 := make(chan Notification[T2]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s zipState2[T1, T2] done := false @@ -48,8 +46,8 @@ func Zip2[T1, T2, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) } } diff --git a/zip3.go b/zip3.go index 0d1d5883..82d60aca 100644 --- a/zip3.go +++ b/zip3.go @@ -4,7 +4,6 @@ import ( "context" "github.com/b97tsk/rx/internal/queue" - "github.com/b97tsk/rx/internal/waitgroup" ) // Zip3 combines multiple Observables to create an Observable that emits @@ -20,6 +19,7 @@ func Zip3[T1, T2, T3, R any]( } return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -33,9 +33,7 @@ func Zip3[T1, T2, T3, R any]( chan2 := make(chan Notification[T2]) chan3 := make(chan Notification[T3]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s zipState3[T1, T2, T3] done := false @@ -52,9 +50,9 @@ func Zip3[T1, T2, T3, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) - Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) } } diff --git a/zip4.go b/zip4.go index f72e0d9b..43ed7347 100644 --- a/zip4.go +++ b/zip4.go @@ -4,7 +4,6 @@ import ( "context" "github.com/b97tsk/rx/internal/queue" - "github.com/b97tsk/rx/internal/waitgroup" ) // Zip4 combines multiple Observables to create an Observable that emits @@ -21,6 +20,7 @@ func Zip4[T1, T2, T3, T4, R any]( } return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -35,9 +35,7 @@ func Zip4[T1, T2, T3, T4, R any]( chan3 := make(chan Notification[T3]) chan4 := make(chan Notification[T4]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s zipState4[T1, T2, T3, T4] done := false @@ -56,10 +54,10 @@ func Zip4[T1, T2, T3, T4, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) - Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) - Go(ctxHoisted, func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) + wg.Go(func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) } } diff --git a/zip5.go b/zip5.go index 76a283ad..44b3c5d8 100644 --- a/zip5.go +++ b/zip5.go @@ -4,7 +4,6 @@ import ( "context" "github.com/b97tsk/rx/internal/queue" - "github.com/b97tsk/rx/internal/waitgroup" ) // Zip5 combines multiple Observables to create an Observable that emits @@ -22,6 +21,7 @@ func Zip5[T1, T2, T3, T4, T5, R any]( } return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -37,9 +37,7 @@ func Zip5[T1, T2, T3, T4, T5, R any]( chan4 := make(chan Notification[T4]) chan5 := make(chan Notification[T5]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s zipState5[T1, T2, T3, T4, T5] done := false @@ -60,11 +58,11 @@ func Zip5[T1, T2, T3, T4, T5, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) - Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) - Go(ctxHoisted, func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) - Go(ctxHoisted, func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) + wg.Go(func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) + wg.Go(func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) } } diff --git a/zip6.go b/zip6.go index ff25bd21..93e14a4c 100644 --- a/zip6.go +++ b/zip6.go @@ -4,7 +4,6 @@ import ( "context" "github.com/b97tsk/rx/internal/queue" - "github.com/b97tsk/rx/internal/waitgroup" ) // Zip6 combines multiple Observables to create an Observable that emits @@ -23,6 +22,7 @@ func Zip6[T1, T2, T3, T4, T5, T6, R any]( } return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -39,9 +39,7 @@ func Zip6[T1, T2, T3, T4, T5, T6, R any]( chan5 := make(chan Notification[T5]) chan6 := make(chan Notification[T6]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s zipState6[T1, T2, T3, T4, T5, T6] done := false @@ -64,12 +62,12 @@ func Zip6[T1, T2, T3, T4, T5, T6, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) - Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) - Go(ctxHoisted, func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) - Go(ctxHoisted, func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) - Go(ctxHoisted, func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) + wg.Go(func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) + wg.Go(func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) + wg.Go(func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) } } diff --git a/zip7.go b/zip7.go index 8d5fc474..c05845ad 100644 --- a/zip7.go +++ b/zip7.go @@ -4,7 +4,6 @@ import ( "context" "github.com/b97tsk/rx/internal/queue" - "github.com/b97tsk/rx/internal/waitgroup" ) // Zip7 combines multiple Observables to create an Observable that emits @@ -24,6 +23,7 @@ func Zip7[T1, T2, T3, T4, T5, T6, T7, R any]( } return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -41,9 +41,7 @@ func Zip7[T1, T2, T3, T4, T5, T6, T7, R any]( chan6 := make(chan Notification[T6]) chan7 := make(chan Notification[T7]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s zipState7[T1, T2, T3, T4, T5, T6, T7] done := false @@ -68,13 +66,13 @@ func Zip7[T1, T2, T3, T4, T5, T6, T7, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) - Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) - Go(ctxHoisted, func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) - Go(ctxHoisted, func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) - Go(ctxHoisted, func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) - Go(ctxHoisted, func() { obs7.Subscribe(ctx, chanObserver(chan7, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) + wg.Go(func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) + wg.Go(func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) + wg.Go(func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) + wg.Go(func() { obs7.Subscribe(ctx, chanObserver(chan7, noop)) }) } } diff --git a/zip8.go b/zip8.go index 396556ef..e8ed6a63 100644 --- a/zip8.go +++ b/zip8.go @@ -4,7 +4,6 @@ import ( "context" "github.com/b97tsk/rx/internal/queue" - "github.com/b97tsk/rx/internal/waitgroup" ) // Zip8 combines multiple Observables to create an Observable that emits @@ -25,6 +24,7 @@ func Zip8[T1, T2, T3, T4, T5, T6, T7, T8, R any]( } return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -43,9 +43,7 @@ func Zip8[T1, T2, T3, T4, T5, T6, T7, T8, R any]( chan7 := make(chan Notification[T7]) chan8 := make(chan Notification[T8]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s zipState8[T1, T2, T3, T4, T5, T6, T7, T8] done := false @@ -72,14 +70,14 @@ func Zip8[T1, T2, T3, T4, T5, T6, T7, T8, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) - Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) - Go(ctxHoisted, func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) - Go(ctxHoisted, func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) - Go(ctxHoisted, func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) - Go(ctxHoisted, func() { obs7.Subscribe(ctx, chanObserver(chan7, noop)) }) - Go(ctxHoisted, func() { obs8.Subscribe(ctx, chanObserver(chan8, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) + wg.Go(func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) + wg.Go(func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) + wg.Go(func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) + wg.Go(func() { obs7.Subscribe(ctx, chanObserver(chan7, noop)) }) + wg.Go(func() { obs8.Subscribe(ctx, chanObserver(chan8, noop)) }) } } diff --git a/zip9.go b/zip9.go index 7789f944..5dcfe304 100644 --- a/zip9.go +++ b/zip9.go @@ -4,7 +4,6 @@ import ( "context" "github.com/b97tsk/rx/internal/queue" - "github.com/b97tsk/rx/internal/waitgroup" ) // Zip9 combines multiple Observables to create an Observable that emits @@ -26,6 +25,7 @@ func Zip9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any]( } return func(ctx context.Context, sink Observer[R]) { + wg := WaitGroupFromContext(ctx) ctx, cancel := context.WithCancel(ctx) noop := make(chan struct{}) @@ -45,9 +45,7 @@ func Zip9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any]( chan8 := make(chan Notification[T8]) chan9 := make(chan Notification[T9]) - ctxHoisted := waitgroup.Hoist(ctx) - - Go(ctxHoisted, func() { + wg.Go(func() { var s zipState9[T1, T2, T3, T4, T5, T6, T7, T8, T9] done := false @@ -76,15 +74,15 @@ func Zip9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any]( } }) - Go(ctxHoisted, func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) - Go(ctxHoisted, func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) - Go(ctxHoisted, func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) - Go(ctxHoisted, func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) - Go(ctxHoisted, func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) - Go(ctxHoisted, func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) - Go(ctxHoisted, func() { obs7.Subscribe(ctx, chanObserver(chan7, noop)) }) - Go(ctxHoisted, func() { obs8.Subscribe(ctx, chanObserver(chan8, noop)) }) - Go(ctxHoisted, func() { obs9.Subscribe(ctx, chanObserver(chan9, noop)) }) + wg.Go(func() { obs1.Subscribe(ctx, chanObserver(chan1, noop)) }) + wg.Go(func() { obs2.Subscribe(ctx, chanObserver(chan2, noop)) }) + wg.Go(func() { obs3.Subscribe(ctx, chanObserver(chan3, noop)) }) + wg.Go(func() { obs4.Subscribe(ctx, chanObserver(chan4, noop)) }) + wg.Go(func() { obs5.Subscribe(ctx, chanObserver(chan5, noop)) }) + wg.Go(func() { obs6.Subscribe(ctx, chanObserver(chan6, noop)) }) + wg.Go(func() { obs7.Subscribe(ctx, chanObserver(chan7, noop)) }) + wg.Go(func() { obs8.Subscribe(ctx, chanObserver(chan8, noop)) }) + wg.Go(func() { obs9.Subscribe(ctx, chanObserver(chan9, noop)) }) } }