diff --git a/combinelatest2.go b/combinelatest2.go index 6fb647fc..81961a19 100644 --- a/combinelatest2.go +++ b/combinelatest2.go @@ -1,5 +1,7 @@ package rx +import "sync" + // CombineLatest2 combines multiple Observables to create an Observable // that emits mappings of the latest values emitted by each of its input // Observables. @@ -9,38 +11,19 @@ func CombineLatest2[T1, T2, R any]( mapping func(v1 T1, v2 T2) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - - c.Go(func() { - var s combineLatestState2[T1, T2] - - cont := true + c, o = Serialize(c, o) - for cont { - select { - case n := <-chan1: - cont = combineLatestEmit2(o, n, mapping, &s, &s.V1, 1) - case n := <-chan2: - cont = combineLatestEmit2(o, n, mapping, &s, &s.V2, 2) - } - } - }) + var s combineLatestState2[T1, T2] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) + ob1.satcc(c, func(n Notification[T1]) { combineLatestEmit2(o, n, mapping, &s, &s.V1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { combineLatestEmit2(o, n, mapping, &s, &s.V2, 2) }) } } type combineLatestState2[T1, T2 any] struct { + sync.Mutex + NBits, CBits uint8 V1 T1 @@ -54,29 +37,38 @@ func combineLatestEmit2[T1, T2, R, X any]( s *combineLatestState2[T1, T2], v *X, bit uint8, -) bool { +) { const FullBits = 3 switch n.Kind { case KindNext: + s.Lock() *v = n.Value + nbits := s.NBits + nbits |= bit + s.NBits = nbits - if s.NBits |= bit; s.NBits == FullBits { - oops := func() { o.Error(ErrOops) } - v := Try21(mapping, s.V1, s.V2, oops) - Try1(o, Next(v), oops) + if nbits == FullBits { + v := Try21(mapping, s.V1, s.V2, s.Unlock) + s.Unlock() + o.Next(v) + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: - if s.CBits |= bit; s.CBits == FullBits { + s.Lock() + cbits := s.CBits + cbits |= bit + s.CBits = cbits + s.Unlock() + + if cbits == FullBits { o.Complete() - return false } } - - return true } diff --git a/combinelatest3.go b/combinelatest3.go index e67b6bf6..2149aea9 100644 --- a/combinelatest3.go +++ b/combinelatest3.go @@ -1,5 +1,7 @@ package rx +import "sync" + // CombineLatest3 combines multiple Observables to create an Observable // that emits mappings of the latest values emitted by each of its input // Observables. @@ -10,42 +12,20 @@ func CombineLatest3[T1, T2, T3, R any]( mapping func(v1 T1, v2 T2, v3 T3) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - chan3 := make(chan Notification[T3]) - - c.Go(func() { - var s combineLatestState3[T1, T2, T3] - - cont := true + c, o = Serialize(c, o) - for cont { - select { - case n := <-chan1: - cont = combineLatestEmit3(o, n, mapping, &s, &s.V1, 1) - case n := <-chan2: - cont = combineLatestEmit3(o, n, mapping, &s, &s.V2, 2) - case n := <-chan3: - cont = combineLatestEmit3(o, n, mapping, &s, &s.V3, 4) - } - } - }) + var s combineLatestState3[T1, T2, T3] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) && - subscribeChannel(c, ob3, chan3, noop) + ob1.satcc(c, func(n Notification[T1]) { combineLatestEmit3(o, n, mapping, &s, &s.V1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { combineLatestEmit3(o, n, mapping, &s, &s.V2, 2) }) && + ob3.satcc(c, func(n Notification[T3]) { combineLatestEmit3(o, n, mapping, &s, &s.V3, 4) }) } } type combineLatestState3[T1, T2, T3 any] struct { + sync.Mutex + NBits, CBits uint8 V1 T1 @@ -60,29 +40,38 @@ func combineLatestEmit3[T1, T2, T3, R, X any]( s *combineLatestState3[T1, T2, T3], v *X, bit uint8, -) bool { +) { const FullBits = 7 switch n.Kind { case KindNext: + s.Lock() *v = n.Value + nbits := s.NBits + nbits |= bit + s.NBits = nbits - if s.NBits |= bit; s.NBits == FullBits { - oops := func() { o.Error(ErrOops) } - v := Try31(mapping, s.V1, s.V2, s.V3, oops) - Try1(o, Next(v), oops) + if nbits == FullBits { + v := Try31(mapping, s.V1, s.V2, s.V3, s.Unlock) + s.Unlock() + o.Next(v) + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: - if s.CBits |= bit; s.CBits == FullBits { + s.Lock() + cbits := s.CBits + cbits |= bit + s.CBits = cbits + s.Unlock() + + if cbits == FullBits { o.Complete() - return false } } - - return true } diff --git a/combinelatest4.go b/combinelatest4.go index 718d74de..e897d57e 100644 --- a/combinelatest4.go +++ b/combinelatest4.go @@ -1,5 +1,7 @@ package rx +import "sync" + // CombineLatest4 combines multiple Observables to create an Observable // that emits mappings of the latest values emitted by each of its input // Observables. @@ -11,46 +13,21 @@ func CombineLatest4[T1, T2, T3, T4, R any]( mapping func(v1 T1, v2 T2, v3 T3, v4 T4) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - chan3 := make(chan Notification[T3]) - chan4 := make(chan Notification[T4]) - - c.Go(func() { - var s combineLatestState4[T1, T2, T3, T4] - - cont := true + c, o = Serialize(c, o) - for cont { - select { - case n := <-chan1: - cont = combineLatestEmit4(o, n, mapping, &s, &s.V1, 1) - case n := <-chan2: - cont = combineLatestEmit4(o, n, mapping, &s, &s.V2, 2) - case n := <-chan3: - cont = combineLatestEmit4(o, n, mapping, &s, &s.V3, 4) - case n := <-chan4: - cont = combineLatestEmit4(o, n, mapping, &s, &s.V4, 8) - } - } - }) + var s combineLatestState4[T1, T2, T3, T4] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) && - subscribeChannel(c, ob3, chan3, noop) && - subscribeChannel(c, ob4, chan4, noop) + ob1.satcc(c, func(n Notification[T1]) { combineLatestEmit4(o, n, mapping, &s, &s.V1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { combineLatestEmit4(o, n, mapping, &s, &s.V2, 2) }) && + ob3.satcc(c, func(n Notification[T3]) { combineLatestEmit4(o, n, mapping, &s, &s.V3, 4) }) && + ob4.satcc(c, func(n Notification[T4]) { combineLatestEmit4(o, n, mapping, &s, &s.V4, 8) }) } } type combineLatestState4[T1, T2, T3, T4 any] struct { + sync.Mutex + NBits, CBits uint8 V1 T1 @@ -66,29 +43,38 @@ func combineLatestEmit4[T1, T2, T3, T4, R, X any]( s *combineLatestState4[T1, T2, T3, T4], v *X, bit uint8, -) bool { +) { const FullBits = 15 switch n.Kind { case KindNext: + s.Lock() *v = n.Value + nbits := s.NBits + nbits |= bit + s.NBits = nbits - if s.NBits |= bit; s.NBits == FullBits { - oops := func() { o.Error(ErrOops) } - v := Try41(mapping, s.V1, s.V2, s.V3, s.V4, oops) - Try1(o, Next(v), oops) + if nbits == FullBits { + v := Try41(mapping, s.V1, s.V2, s.V3, s.V4, s.Unlock) + s.Unlock() + o.Next(v) + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: - if s.CBits |= bit; s.CBits == FullBits { + s.Lock() + cbits := s.CBits + cbits |= bit + s.CBits = cbits + s.Unlock() + + if cbits == FullBits { o.Complete() - return false } } - - return true } diff --git a/combinelatest5.go b/combinelatest5.go index 8fbf0810..51946094 100644 --- a/combinelatest5.go +++ b/combinelatest5.go @@ -1,5 +1,7 @@ package rx +import "sync" + // CombineLatest5 combines multiple Observables to create an Observable // that emits mappings of the latest values emitted by each of its input // Observables. @@ -12,50 +14,22 @@ func CombineLatest5[T1, T2, T3, T4, T5, R any]( mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - chan3 := make(chan Notification[T3]) - chan4 := make(chan Notification[T4]) - chan5 := make(chan Notification[T5]) - - c.Go(func() { - var s combineLatestState5[T1, T2, T3, T4, T5] - - cont := true + c, o = Serialize(c, o) - for cont { - select { - case n := <-chan1: - cont = combineLatestEmit5(o, n, mapping, &s, &s.V1, 1) - case n := <-chan2: - cont = combineLatestEmit5(o, n, mapping, &s, &s.V2, 2) - case n := <-chan3: - cont = combineLatestEmit5(o, n, mapping, &s, &s.V3, 4) - case n := <-chan4: - cont = combineLatestEmit5(o, n, mapping, &s, &s.V4, 8) - case n := <-chan5: - cont = combineLatestEmit5(o, n, mapping, &s, &s.V5, 16) - } - } - }) + var s combineLatestState5[T1, T2, T3, T4, T5] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) && - subscribeChannel(c, ob3, chan3, noop) && - subscribeChannel(c, ob4, chan4, noop) && - subscribeChannel(c, ob5, chan5, noop) + ob1.satcc(c, func(n Notification[T1]) { combineLatestEmit5(o, n, mapping, &s, &s.V1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { combineLatestEmit5(o, n, mapping, &s, &s.V2, 2) }) && + ob3.satcc(c, func(n Notification[T3]) { combineLatestEmit5(o, n, mapping, &s, &s.V3, 4) }) && + ob4.satcc(c, func(n Notification[T4]) { combineLatestEmit5(o, n, mapping, &s, &s.V4, 8) }) && + ob5.satcc(c, func(n Notification[T5]) { combineLatestEmit5(o, n, mapping, &s, &s.V5, 16) }) } } type combineLatestState5[T1, T2, T3, T4, T5 any] struct { + sync.Mutex + NBits, CBits uint8 V1 T1 @@ -72,29 +46,38 @@ func combineLatestEmit5[T1, T2, T3, T4, T5, R, X any]( s *combineLatestState5[T1, T2, T3, T4, T5], v *X, bit uint8, -) bool { +) { const FullBits = 31 switch n.Kind { case KindNext: + s.Lock() *v = n.Value + nbits := s.NBits + nbits |= bit + s.NBits = nbits - if s.NBits |= bit; s.NBits == FullBits { - oops := func() { o.Error(ErrOops) } - v := Try51(mapping, s.V1, s.V2, s.V3, s.V4, s.V5, oops) - Try1(o, Next(v), oops) + if nbits == FullBits { + v := Try51(mapping, s.V1, s.V2, s.V3, s.V4, s.V5, s.Unlock) + s.Unlock() + o.Next(v) + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: - if s.CBits |= bit; s.CBits == FullBits { + s.Lock() + cbits := s.CBits + cbits |= bit + s.CBits = cbits + s.Unlock() + + if cbits == FullBits { o.Complete() - return false } } - - return true } diff --git a/combinelatest6.go b/combinelatest6.go index 21d3d3e1..066fafd0 100644 --- a/combinelatest6.go +++ b/combinelatest6.go @@ -1,5 +1,7 @@ package rx +import "sync" + // CombineLatest6 combines multiple Observables to create an Observable // that emits mappings of the latest values emitted by each of its input // Observables. @@ -13,54 +15,23 @@ func CombineLatest6[T1, T2, T3, T4, T5, T6, R any]( mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - chan3 := make(chan Notification[T3]) - chan4 := make(chan Notification[T4]) - chan5 := make(chan Notification[T5]) - chan6 := make(chan Notification[T6]) - - c.Go(func() { - var s combineLatestState6[T1, T2, T3, T4, T5, T6] - - cont := true + c, o = Serialize(c, o) - for cont { - select { - case n := <-chan1: - cont = combineLatestEmit6(o, n, mapping, &s, &s.V1, 1) - case n := <-chan2: - cont = combineLatestEmit6(o, n, mapping, &s, &s.V2, 2) - case n := <-chan3: - cont = combineLatestEmit6(o, n, mapping, &s, &s.V3, 4) - case n := <-chan4: - cont = combineLatestEmit6(o, n, mapping, &s, &s.V4, 8) - case n := <-chan5: - cont = combineLatestEmit6(o, n, mapping, &s, &s.V5, 16) - case n := <-chan6: - cont = combineLatestEmit6(o, n, mapping, &s, &s.V6, 32) - } - } - }) + var s combineLatestState6[T1, T2, T3, T4, T5, T6] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) && - subscribeChannel(c, ob3, chan3, noop) && - subscribeChannel(c, ob4, chan4, noop) && - subscribeChannel(c, ob5, chan5, noop) && - subscribeChannel(c, ob6, chan6, noop) + ob1.satcc(c, func(n Notification[T1]) { combineLatestEmit6(o, n, mapping, &s, &s.V1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { combineLatestEmit6(o, n, mapping, &s, &s.V2, 2) }) && + ob3.satcc(c, func(n Notification[T3]) { combineLatestEmit6(o, n, mapping, &s, &s.V3, 4) }) && + ob4.satcc(c, func(n Notification[T4]) { combineLatestEmit6(o, n, mapping, &s, &s.V4, 8) }) && + ob5.satcc(c, func(n Notification[T5]) { combineLatestEmit6(o, n, mapping, &s, &s.V5, 16) }) && + ob6.satcc(c, func(n Notification[T6]) { combineLatestEmit6(o, n, mapping, &s, &s.V6, 32) }) } } type combineLatestState6[T1, T2, T3, T4, T5, T6 any] struct { + sync.Mutex + NBits, CBits uint8 V1 T1 @@ -78,29 +49,38 @@ func combineLatestEmit6[T1, T2, T3, T4, T5, T6, R, X any]( s *combineLatestState6[T1, T2, T3, T4, T5, T6], v *X, bit uint8, -) bool { +) { const FullBits = 63 switch n.Kind { case KindNext: + s.Lock() *v = n.Value + nbits := s.NBits + nbits |= bit + s.NBits = nbits - if s.NBits |= bit; s.NBits == FullBits { - oops := func() { o.Error(ErrOops) } - v := Try61(mapping, s.V1, s.V2, s.V3, s.V4, s.V5, s.V6, oops) - Try1(o, Next(v), oops) + if nbits == FullBits { + v := Try61(mapping, s.V1, s.V2, s.V3, s.V4, s.V5, s.V6, s.Unlock) + s.Unlock() + o.Next(v) + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: - if s.CBits |= bit; s.CBits == FullBits { + s.Lock() + cbits := s.CBits + cbits |= bit + s.CBits = cbits + s.Unlock() + + if cbits == FullBits { o.Complete() - return false } } - - return true } diff --git a/combinelatest7.go b/combinelatest7.go index 5d0d458d..329e03a9 100644 --- a/combinelatest7.go +++ b/combinelatest7.go @@ -1,5 +1,7 @@ package rx +import "sync" + // CombineLatest7 combines multiple Observables to create an Observable // that emits mappings of the latest values emitted by each of its input // Observables. @@ -14,58 +16,24 @@ func CombineLatest7[T1, T2, T3, T4, T5, T6, T7, R any]( mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - chan3 := make(chan Notification[T3]) - chan4 := make(chan Notification[T4]) - chan5 := make(chan Notification[T5]) - chan6 := make(chan Notification[T6]) - chan7 := make(chan Notification[T7]) - - c.Go(func() { - var s combineLatestState7[T1, T2, T3, T4, T5, T6, T7] - - cont := true + c, o = Serialize(c, o) - for cont { - select { - case n := <-chan1: - cont = combineLatestEmit7(o, n, mapping, &s, &s.V1, 1) - case n := <-chan2: - cont = combineLatestEmit7(o, n, mapping, &s, &s.V2, 2) - case n := <-chan3: - cont = combineLatestEmit7(o, n, mapping, &s, &s.V3, 4) - case n := <-chan4: - cont = combineLatestEmit7(o, n, mapping, &s, &s.V4, 8) - case n := <-chan5: - cont = combineLatestEmit7(o, n, mapping, &s, &s.V5, 16) - case n := <-chan6: - cont = combineLatestEmit7(o, n, mapping, &s, &s.V6, 32) - case n := <-chan7: - cont = combineLatestEmit7(o, n, mapping, &s, &s.V7, 64) - } - } - }) + var s combineLatestState7[T1, T2, T3, T4, T5, T6, T7] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) && - subscribeChannel(c, ob3, chan3, noop) && - subscribeChannel(c, ob4, chan4, noop) && - subscribeChannel(c, ob5, chan5, noop) && - subscribeChannel(c, ob6, chan6, noop) && - subscribeChannel(c, ob7, chan7, noop) + ob1.satcc(c, func(n Notification[T1]) { combineLatestEmit7(o, n, mapping, &s, &s.V1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { combineLatestEmit7(o, n, mapping, &s, &s.V2, 2) }) && + ob3.satcc(c, func(n Notification[T3]) { combineLatestEmit7(o, n, mapping, &s, &s.V3, 4) }) && + ob4.satcc(c, func(n Notification[T4]) { combineLatestEmit7(o, n, mapping, &s, &s.V4, 8) }) && + ob5.satcc(c, func(n Notification[T5]) { combineLatestEmit7(o, n, mapping, &s, &s.V5, 16) }) && + ob6.satcc(c, func(n Notification[T6]) { combineLatestEmit7(o, n, mapping, &s, &s.V6, 32) }) && + ob7.satcc(c, func(n Notification[T7]) { combineLatestEmit7(o, n, mapping, &s, &s.V7, 64) }) } } type combineLatestState7[T1, T2, T3, T4, T5, T6, T7 any] struct { + sync.Mutex + NBits, CBits uint8 V1 T1 @@ -84,29 +52,38 @@ func combineLatestEmit7[T1, T2, T3, T4, T5, T6, T7, R, X any]( s *combineLatestState7[T1, T2, T3, T4, T5, T6, T7], v *X, bit uint8, -) bool { +) { const FullBits = 127 switch n.Kind { case KindNext: + s.Lock() *v = n.Value + nbits := s.NBits + nbits |= bit + s.NBits = nbits - if s.NBits |= bit; s.NBits == FullBits { - oops := func() { o.Error(ErrOops) } - v := Try71(mapping, s.V1, s.V2, s.V3, s.V4, s.V5, s.V6, s.V7, oops) - Try1(o, Next(v), oops) + if nbits == FullBits { + v := Try71(mapping, s.V1, s.V2, s.V3, s.V4, s.V5, s.V6, s.V7, s.Unlock) + s.Unlock() + o.Next(v) + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: - if s.CBits |= bit; s.CBits == FullBits { + s.Lock() + cbits := s.CBits + cbits |= bit + s.CBits = cbits + s.Unlock() + + if cbits == FullBits { o.Complete() - return false } } - - return true } diff --git a/combinelatest8.go b/combinelatest8.go index e8573bb4..42e6c02d 100644 --- a/combinelatest8.go +++ b/combinelatest8.go @@ -1,5 +1,7 @@ package rx +import "sync" + // CombineLatest8 combines multiple Observables to create an Observable // that emits mappings of the latest values emitted by each of its input // Observables. @@ -15,62 +17,25 @@ func CombineLatest8[T1, T2, T3, T4, T5, T6, T7, T8, R any]( mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - chan3 := make(chan Notification[T3]) - chan4 := make(chan Notification[T4]) - chan5 := make(chan Notification[T5]) - chan6 := make(chan Notification[T6]) - chan7 := make(chan Notification[T7]) - chan8 := make(chan Notification[T8]) - - c.Go(func() { - var s combineLatestState8[T1, T2, T3, T4, T5, T6, T7, T8] - - cont := true + c, o = Serialize(c, o) - for cont { - select { - case n := <-chan1: - cont = combineLatestEmit8(o, n, mapping, &s, &s.V1, 1) - case n := <-chan2: - cont = combineLatestEmit8(o, n, mapping, &s, &s.V2, 2) - case n := <-chan3: - cont = combineLatestEmit8(o, n, mapping, &s, &s.V3, 4) - case n := <-chan4: - cont = combineLatestEmit8(o, n, mapping, &s, &s.V4, 8) - case n := <-chan5: - cont = combineLatestEmit8(o, n, mapping, &s, &s.V5, 16) - case n := <-chan6: - cont = combineLatestEmit8(o, n, mapping, &s, &s.V6, 32) - case n := <-chan7: - cont = combineLatestEmit8(o, n, mapping, &s, &s.V7, 64) - case n := <-chan8: - cont = combineLatestEmit8(o, n, mapping, &s, &s.V8, 128) - } - } - }) + var s combineLatestState8[T1, T2, T3, T4, T5, T6, T7, T8] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) && - subscribeChannel(c, ob3, chan3, noop) && - subscribeChannel(c, ob4, chan4, noop) && - subscribeChannel(c, ob5, chan5, noop) && - subscribeChannel(c, ob6, chan6, noop) && - subscribeChannel(c, ob7, chan7, noop) && - subscribeChannel(c, ob8, chan8, noop) + ob1.satcc(c, func(n Notification[T1]) { combineLatestEmit8(o, n, mapping, &s, &s.V1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { combineLatestEmit8(o, n, mapping, &s, &s.V2, 2) }) && + ob3.satcc(c, func(n Notification[T3]) { combineLatestEmit8(o, n, mapping, &s, &s.V3, 4) }) && + ob4.satcc(c, func(n Notification[T4]) { combineLatestEmit8(o, n, mapping, &s, &s.V4, 8) }) && + ob5.satcc(c, func(n Notification[T5]) { combineLatestEmit8(o, n, mapping, &s, &s.V5, 16) }) && + ob6.satcc(c, func(n Notification[T6]) { combineLatestEmit8(o, n, mapping, &s, &s.V6, 32) }) && + ob7.satcc(c, func(n Notification[T7]) { combineLatestEmit8(o, n, mapping, &s, &s.V7, 64) }) && + ob8.satcc(c, func(n Notification[T8]) { combineLatestEmit8(o, n, mapping, &s, &s.V8, 128) }) } } type combineLatestState8[T1, T2, T3, T4, T5, T6, T7, T8 any] struct { + sync.Mutex + NBits, CBits uint8 V1 T1 @@ -90,29 +55,38 @@ func combineLatestEmit8[T1, T2, T3, T4, T5, T6, T7, T8, R, X any]( s *combineLatestState8[T1, T2, T3, T4, T5, T6, T7, T8], v *X, bit uint8, -) bool { +) { const FullBits = 255 switch n.Kind { case KindNext: + s.Lock() *v = n.Value + nbits := s.NBits + nbits |= bit + s.NBits = nbits - if s.NBits |= bit; s.NBits == FullBits { - oops := func() { o.Error(ErrOops) } - v := Try81(mapping, s.V1, s.V2, s.V3, s.V4, s.V5, s.V6, s.V7, s.V8, oops) - Try1(o, Next(v), oops) + if nbits == FullBits { + v := Try81(mapping, s.V1, s.V2, s.V3, s.V4, s.V5, s.V6, s.V7, s.V8, s.Unlock) + s.Unlock() + o.Next(v) + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: - if s.CBits |= bit; s.CBits == FullBits { + s.Lock() + cbits := s.CBits + cbits |= bit + s.CBits = cbits + s.Unlock() + + if cbits == FullBits { o.Complete() - return false } } - - return true } diff --git a/combinelatest9.go b/combinelatest9.go index 818fa2a7..3fc38b96 100644 --- a/combinelatest9.go +++ b/combinelatest9.go @@ -1,5 +1,7 @@ package rx +import "sync" + // CombineLatest9 combines multiple Observables to create an Observable // that emits mappings of the latest values emitted by each of its input // Observables. @@ -16,66 +18,26 @@ func CombineLatest9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any]( mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8, v9 T9) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - chan3 := make(chan Notification[T3]) - chan4 := make(chan Notification[T4]) - chan5 := make(chan Notification[T5]) - chan6 := make(chan Notification[T6]) - chan7 := make(chan Notification[T7]) - chan8 := make(chan Notification[T8]) - chan9 := make(chan Notification[T9]) - - c.Go(func() { - var s combineLatestState9[T1, T2, T3, T4, T5, T6, T7, T8, T9] - - cont := true + c, o = Serialize(c, o) - for cont { - select { - case n := <-chan1: - cont = combineLatestEmit9(o, n, mapping, &s, &s.V1, 1) - case n := <-chan2: - cont = combineLatestEmit9(o, n, mapping, &s, &s.V2, 2) - case n := <-chan3: - cont = combineLatestEmit9(o, n, mapping, &s, &s.V3, 4) - case n := <-chan4: - cont = combineLatestEmit9(o, n, mapping, &s, &s.V4, 8) - case n := <-chan5: - cont = combineLatestEmit9(o, n, mapping, &s, &s.V5, 16) - case n := <-chan6: - cont = combineLatestEmit9(o, n, mapping, &s, &s.V6, 32) - case n := <-chan7: - cont = combineLatestEmit9(o, n, mapping, &s, &s.V7, 64) - case n := <-chan8: - cont = combineLatestEmit9(o, n, mapping, &s, &s.V8, 128) - case n := <-chan9: - cont = combineLatestEmit9(o, n, mapping, &s, &s.V9, 256) - } - } - }) + var s combineLatestState9[T1, T2, T3, T4, T5, T6, T7, T8, T9] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) && - subscribeChannel(c, ob3, chan3, noop) && - subscribeChannel(c, ob4, chan4, noop) && - subscribeChannel(c, ob5, chan5, noop) && - subscribeChannel(c, ob6, chan6, noop) && - subscribeChannel(c, ob7, chan7, noop) && - subscribeChannel(c, ob8, chan8, noop) && - subscribeChannel(c, ob9, chan9, noop) + ob1.satcc(c, func(n Notification[T1]) { combineLatestEmit9(o, n, mapping, &s, &s.V1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { combineLatestEmit9(o, n, mapping, &s, &s.V2, 2) }) && + ob3.satcc(c, func(n Notification[T3]) { combineLatestEmit9(o, n, mapping, &s, &s.V3, 4) }) && + ob4.satcc(c, func(n Notification[T4]) { combineLatestEmit9(o, n, mapping, &s, &s.V4, 8) }) && + ob5.satcc(c, func(n Notification[T5]) { combineLatestEmit9(o, n, mapping, &s, &s.V5, 16) }) && + ob6.satcc(c, func(n Notification[T6]) { combineLatestEmit9(o, n, mapping, &s, &s.V6, 32) }) && + ob7.satcc(c, func(n Notification[T7]) { combineLatestEmit9(o, n, mapping, &s, &s.V7, 64) }) && + ob8.satcc(c, func(n Notification[T8]) { combineLatestEmit9(o, n, mapping, &s, &s.V8, 128) }) && + ob9.satcc(c, func(n Notification[T9]) { combineLatestEmit9(o, n, mapping, &s, &s.V9, 256) }) } } type combineLatestState9[T1, T2, T3, T4, T5, T6, T7, T8, T9 any] struct { + sync.Mutex + NBits, CBits uint16 V1 T1 @@ -96,29 +58,38 @@ func combineLatestEmit9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R, X any]( s *combineLatestState9[T1, T2, T3, T4, T5, T6, T7, T8, T9], v *X, bit uint16, -) bool { +) { const FullBits = 511 switch n.Kind { case KindNext: + s.Lock() *v = n.Value + nbits := s.NBits + nbits |= bit + s.NBits = nbits - if s.NBits |= bit; s.NBits == FullBits { - oops := func() { o.Error(ErrOops) } - v := Try91(mapping, s.V1, s.V2, s.V3, s.V4, s.V5, s.V6, s.V7, s.V8, s.V9, oops) - Try1(o, Next(v), oops) + if nbits == FullBits { + v := Try91(mapping, s.V1, s.V2, s.V3, s.V4, s.V5, s.V6, s.V7, s.V8, s.V9, s.Unlock) + s.Unlock() + o.Next(v) + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: - if s.CBits |= bit; s.CBits == FullBits { + s.Lock() + cbits := s.CBits + cbits |= bit + s.CBits = cbits + s.Unlock() + + if cbits == FullBits { o.Complete() - return false } } - - return true } diff --git a/observable.go b/observable.go index 9a476d30..c1e37e30 100644 --- a/observable.go +++ b/observable.go @@ -31,6 +31,9 @@ package rx // multiple Observables, most of them can do things concurrently. type Observable[T any] func(c Context, o Observer[T]) +// NewObservable creates an Observable from f. +func NewObservable[T any](f func(c Context, o Observer[T])) Observable[T] { return f } + // Subscribe invokes an execution of an Observable. // // If ob panics and c.PanicHandler is not nil, Subscribe calls c.PanicHandler @@ -53,7 +56,15 @@ func (ob Observable[T]) Subscribe(c Context, o Observer[T]) { ob(c, o) } -// NewObservable creates an Observable from f. -func NewObservable[T any](f func(c Context, o Observer[T])) Observable[T] { - return f +// satcc is short for Subscribe and Test Context Cancellation. +func (ob Observable[T]) satcc(c Context, o Observer[T]) bool { + ob.Subscribe(c, o) + + select { + default: + case <-c.Done(): + return false + } + + return true } diff --git a/util.go b/util.go index 026638a1..ef56428f 100644 --- a/util.go +++ b/util.go @@ -43,14 +43,3 @@ func channelObserver[T any](ch chan<- Notification[T], noop <-chan struct{}) Obs } } } - -func subscribeChannel[T any](c Context, ob Observable[T], ch chan<- Notification[T], noop <-chan struct{}) bool { - ob.Subscribe(c, channelObserver(ch, noop)) - - select { - case <-noop: - return false - default: - return true - } -} diff --git a/withlatestfrom1.go b/withlatestfrom1.go index db9e8bd8..d6f5a59e 100644 --- a/withlatestfrom1.go +++ b/withlatestfrom1.go @@ -1,5 +1,7 @@ package rx +import "sync" + // WithLatestFrom1 combines the source with another Observable to create // an Observable that emits mappings of the latest values emitted by each // Observable, only when the source emits. @@ -20,38 +22,19 @@ func withLatestFrom2[T1, T2, R any]( mapping func(v1 T1, v2 T2) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - - c.Go(func() { - var s withLatestFromState2[T1, T2] - - cont := true + c, o = Serialize(c, o) - for cont { - select { - case n := <-chan1: - cont = withLatestFromEmit2(o, n, mapping, &s, &s.V1, 1) - case n := <-chan2: - cont = withLatestFromEmit2(o, n, mapping, &s, &s.V2, 2) - } - } - }) + var s withLatestFromState2[T1, T2] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) + ob1.satcc(c, func(n Notification[T1]) { withLatestFromEmit2(o, n, mapping, &s, &s.V1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { withLatestFromEmit2(o, n, mapping, &s, &s.V2, 2) }) } } type withLatestFromState2[T1, T2 any] struct { + sync.Mutex + NBits uint8 V1 T1 @@ -65,29 +48,32 @@ func withLatestFromEmit2[T1, T2, R, X any]( s *withLatestFromState2[T1, T2], v *X, bit uint8, -) bool { +) { const FullBits = 3 switch n.Kind { case KindNext: + s.Lock() *v = n.Value + nbits := s.NBits + nbits |= bit + s.NBits = nbits - if s.NBits |= bit; s.NBits == FullBits && bit == 1 { - oops := func() { o.Error(ErrOops) } - v := Try21(mapping, s.V1, s.V2, oops) - Try1(o, Next(v), oops) + if nbits == FullBits && bit == 1 { + v := Try21(mapping, s.V1, s.V2, s.Unlock) + s.Unlock() + o.Next(v) + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: if bit == 1 { o.Complete() - return false } } - - return true } diff --git a/withlatestfrom2.go b/withlatestfrom2.go index c1068432..c7444438 100644 --- a/withlatestfrom2.go +++ b/withlatestfrom2.go @@ -1,5 +1,7 @@ package rx +import "sync" + // WithLatestFrom2 combines the source with 2 other Observables to create // an Observable that emits mappings of the latest values emitted by each // Observable, only when the source emits. @@ -22,42 +24,20 @@ func withLatestFrom3[T1, T2, T3, R any]( mapping func(v1 T1, v2 T2, v3 T3) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - chan3 := make(chan Notification[T3]) - - c.Go(func() { - var s withLatestFromState3[T1, T2, T3] - - cont := true + c, o = Serialize(c, o) - for cont { - select { - case n := <-chan1: - cont = withLatestFromEmit3(o, n, mapping, &s, &s.V1, 1) - case n := <-chan2: - cont = withLatestFromEmit3(o, n, mapping, &s, &s.V2, 2) - case n := <-chan3: - cont = withLatestFromEmit3(o, n, mapping, &s, &s.V3, 4) - } - } - }) + var s withLatestFromState3[T1, T2, T3] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) && - subscribeChannel(c, ob3, chan3, noop) + ob1.satcc(c, func(n Notification[T1]) { withLatestFromEmit3(o, n, mapping, &s, &s.V1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { withLatestFromEmit3(o, n, mapping, &s, &s.V2, 2) }) && + ob3.satcc(c, func(n Notification[T3]) { withLatestFromEmit3(o, n, mapping, &s, &s.V3, 4) }) } } type withLatestFromState3[T1, T2, T3 any] struct { + sync.Mutex + NBits uint8 V1 T1 @@ -72,29 +52,32 @@ func withLatestFromEmit3[T1, T2, T3, R, X any]( s *withLatestFromState3[T1, T2, T3], v *X, bit uint8, -) bool { +) { const FullBits = 7 switch n.Kind { case KindNext: + s.Lock() *v = n.Value + nbits := s.NBits + nbits |= bit + s.NBits = nbits - if s.NBits |= bit; s.NBits == FullBits && bit == 1 { - oops := func() { o.Error(ErrOops) } - v := Try31(mapping, s.V1, s.V2, s.V3, oops) - Try1(o, Next(v), oops) + if nbits == FullBits && bit == 1 { + v := Try31(mapping, s.V1, s.V2, s.V3, s.Unlock) + s.Unlock() + o.Next(v) + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: if bit == 1 { o.Complete() - return false } } - - return true } diff --git a/withlatestfrom3.go b/withlatestfrom3.go index 31c899e8..63af0ab9 100644 --- a/withlatestfrom3.go +++ b/withlatestfrom3.go @@ -1,5 +1,7 @@ package rx +import "sync" + // WithLatestFrom3 combines the source with 3 other Observables to create // an Observable that emits mappings of the latest values emitted by each // Observable, only when the source emits. @@ -24,46 +26,21 @@ func withLatestFrom4[T1, T2, T3, T4, R any]( mapping func(v1 T1, v2 T2, v3 T3, v4 T4) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - chan3 := make(chan Notification[T3]) - chan4 := make(chan Notification[T4]) - - c.Go(func() { - var s withLatestFromState4[T1, T2, T3, T4] - - cont := true + c, o = Serialize(c, o) - for cont { - select { - case n := <-chan1: - cont = withLatestFromEmit4(o, n, mapping, &s, &s.V1, 1) - case n := <-chan2: - cont = withLatestFromEmit4(o, n, mapping, &s, &s.V2, 2) - case n := <-chan3: - cont = withLatestFromEmit4(o, n, mapping, &s, &s.V3, 4) - case n := <-chan4: - cont = withLatestFromEmit4(o, n, mapping, &s, &s.V4, 8) - } - } - }) + var s withLatestFromState4[T1, T2, T3, T4] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) && - subscribeChannel(c, ob3, chan3, noop) && - subscribeChannel(c, ob4, chan4, noop) + ob1.satcc(c, func(n Notification[T1]) { withLatestFromEmit4(o, n, mapping, &s, &s.V1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { withLatestFromEmit4(o, n, mapping, &s, &s.V2, 2) }) && + ob3.satcc(c, func(n Notification[T3]) { withLatestFromEmit4(o, n, mapping, &s, &s.V3, 4) }) && + ob4.satcc(c, func(n Notification[T4]) { withLatestFromEmit4(o, n, mapping, &s, &s.V4, 8) }) } } type withLatestFromState4[T1, T2, T3, T4 any] struct { + sync.Mutex + NBits uint8 V1 T1 @@ -79,29 +56,32 @@ func withLatestFromEmit4[T1, T2, T3, T4, R, X any]( s *withLatestFromState4[T1, T2, T3, T4], v *X, bit uint8, -) bool { +) { const FullBits = 15 switch n.Kind { case KindNext: + s.Lock() *v = n.Value + nbits := s.NBits + nbits |= bit + s.NBits = nbits - if s.NBits |= bit; s.NBits == FullBits && bit == 1 { - oops := func() { o.Error(ErrOops) } - v := Try41(mapping, s.V1, s.V2, s.V3, s.V4, oops) - Try1(o, Next(v), oops) + if nbits == FullBits && bit == 1 { + v := Try41(mapping, s.V1, s.V2, s.V3, s.V4, s.Unlock) + s.Unlock() + o.Next(v) + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: if bit == 1 { o.Complete() - return false } } - - return true } diff --git a/withlatestfrom4.go b/withlatestfrom4.go index f55862a6..a1d36f05 100644 --- a/withlatestfrom4.go +++ b/withlatestfrom4.go @@ -1,5 +1,7 @@ package rx +import "sync" + // WithLatestFrom4 combines the source with 4 other Observables to create // an Observable that emits mappings of the latest values emitted by each // Observable, only when the source emits. @@ -26,50 +28,22 @@ func withLatestFrom5[T1, T2, T3, T4, T5, R any]( mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - chan3 := make(chan Notification[T3]) - chan4 := make(chan Notification[T4]) - chan5 := make(chan Notification[T5]) - - c.Go(func() { - var s withLatestFromState5[T1, T2, T3, T4, T5] - - cont := true + c, o = Serialize(c, o) - for cont { - select { - case n := <-chan1: - cont = withLatestFromEmit5(o, n, mapping, &s, &s.V1, 1) - case n := <-chan2: - cont = withLatestFromEmit5(o, n, mapping, &s, &s.V2, 2) - case n := <-chan3: - cont = withLatestFromEmit5(o, n, mapping, &s, &s.V3, 4) - case n := <-chan4: - cont = withLatestFromEmit5(o, n, mapping, &s, &s.V4, 8) - case n := <-chan5: - cont = withLatestFromEmit5(o, n, mapping, &s, &s.V5, 16) - } - } - }) + var s withLatestFromState5[T1, T2, T3, T4, T5] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) && - subscribeChannel(c, ob3, chan3, noop) && - subscribeChannel(c, ob4, chan4, noop) && - subscribeChannel(c, ob5, chan5, noop) + ob1.satcc(c, func(n Notification[T1]) { withLatestFromEmit5(o, n, mapping, &s, &s.V1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { withLatestFromEmit5(o, n, mapping, &s, &s.V2, 2) }) && + ob3.satcc(c, func(n Notification[T3]) { withLatestFromEmit5(o, n, mapping, &s, &s.V3, 4) }) && + ob4.satcc(c, func(n Notification[T4]) { withLatestFromEmit5(o, n, mapping, &s, &s.V4, 8) }) && + ob5.satcc(c, func(n Notification[T5]) { withLatestFromEmit5(o, n, mapping, &s, &s.V5, 16) }) } } type withLatestFromState5[T1, T2, T3, T4, T5 any] struct { + sync.Mutex + NBits uint8 V1 T1 @@ -86,29 +60,32 @@ func withLatestFromEmit5[T1, T2, T3, T4, T5, R, X any]( s *withLatestFromState5[T1, T2, T3, T4, T5], v *X, bit uint8, -) bool { +) { const FullBits = 31 switch n.Kind { case KindNext: + s.Lock() *v = n.Value + nbits := s.NBits + nbits |= bit + s.NBits = nbits - if s.NBits |= bit; s.NBits == FullBits && bit == 1 { - oops := func() { o.Error(ErrOops) } - v := Try51(mapping, s.V1, s.V2, s.V3, s.V4, s.V5, oops) - Try1(o, Next(v), oops) + if nbits == FullBits && bit == 1 { + v := Try51(mapping, s.V1, s.V2, s.V3, s.V4, s.V5, s.Unlock) + s.Unlock() + o.Next(v) + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: if bit == 1 { o.Complete() - return false } } - - return true } diff --git a/withlatestfrom5.go b/withlatestfrom5.go index a45ca8de..0bd5ab64 100644 --- a/withlatestfrom5.go +++ b/withlatestfrom5.go @@ -1,5 +1,7 @@ package rx +import "sync" + // WithLatestFrom5 combines the source with 5 other Observables to create // an Observable that emits mappings of the latest values emitted by each // Observable, only when the source emits. @@ -28,54 +30,23 @@ func withLatestFrom6[T1, T2, T3, T4, T5, T6, R any]( mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - chan3 := make(chan Notification[T3]) - chan4 := make(chan Notification[T4]) - chan5 := make(chan Notification[T5]) - chan6 := make(chan Notification[T6]) - - c.Go(func() { - var s withLatestFromState6[T1, T2, T3, T4, T5, T6] - - cont := true + c, o = Serialize(c, o) - for cont { - select { - case n := <-chan1: - cont = withLatestFromEmit6(o, n, mapping, &s, &s.V1, 1) - case n := <-chan2: - cont = withLatestFromEmit6(o, n, mapping, &s, &s.V2, 2) - case n := <-chan3: - cont = withLatestFromEmit6(o, n, mapping, &s, &s.V3, 4) - case n := <-chan4: - cont = withLatestFromEmit6(o, n, mapping, &s, &s.V4, 8) - case n := <-chan5: - cont = withLatestFromEmit6(o, n, mapping, &s, &s.V5, 16) - case n := <-chan6: - cont = withLatestFromEmit6(o, n, mapping, &s, &s.V6, 32) - } - } - }) + var s withLatestFromState6[T1, T2, T3, T4, T5, T6] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) && - subscribeChannel(c, ob3, chan3, noop) && - subscribeChannel(c, ob4, chan4, noop) && - subscribeChannel(c, ob5, chan5, noop) && - subscribeChannel(c, ob6, chan6, noop) + ob1.satcc(c, func(n Notification[T1]) { withLatestFromEmit6(o, n, mapping, &s, &s.V1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { withLatestFromEmit6(o, n, mapping, &s, &s.V2, 2) }) && + ob3.satcc(c, func(n Notification[T3]) { withLatestFromEmit6(o, n, mapping, &s, &s.V3, 4) }) && + ob4.satcc(c, func(n Notification[T4]) { withLatestFromEmit6(o, n, mapping, &s, &s.V4, 8) }) && + ob5.satcc(c, func(n Notification[T5]) { withLatestFromEmit6(o, n, mapping, &s, &s.V5, 16) }) && + ob6.satcc(c, func(n Notification[T6]) { withLatestFromEmit6(o, n, mapping, &s, &s.V6, 32) }) } } type withLatestFromState6[T1, T2, T3, T4, T5, T6 any] struct { + sync.Mutex + NBits uint8 V1 T1 @@ -93,29 +64,32 @@ func withLatestFromEmit6[T1, T2, T3, T4, T5, T6, R, X any]( s *withLatestFromState6[T1, T2, T3, T4, T5, T6], v *X, bit uint8, -) bool { +) { const FullBits = 63 switch n.Kind { case KindNext: + s.Lock() *v = n.Value + nbits := s.NBits + nbits |= bit + s.NBits = nbits - if s.NBits |= bit; s.NBits == FullBits && bit == 1 { - oops := func() { o.Error(ErrOops) } - v := Try61(mapping, s.V1, s.V2, s.V3, s.V4, s.V5, s.V6, oops) - Try1(o, Next(v), oops) + if nbits == FullBits && bit == 1 { + v := Try61(mapping, s.V1, s.V2, s.V3, s.V4, s.V5, s.V6, s.Unlock) + s.Unlock() + o.Next(v) + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: if bit == 1 { o.Complete() - return false } } - - return true } diff --git a/withlatestfrom6.go b/withlatestfrom6.go index e79e7d01..f49184a4 100644 --- a/withlatestfrom6.go +++ b/withlatestfrom6.go @@ -1,5 +1,7 @@ package rx +import "sync" + // WithLatestFrom6 combines the source with 6 other Observables to create // an Observable that emits mappings of the latest values emitted by each // Observable, only when the source emits. @@ -30,58 +32,24 @@ func withLatestFrom7[T1, T2, T3, T4, T5, T6, T7, R any]( mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - chan3 := make(chan Notification[T3]) - chan4 := make(chan Notification[T4]) - chan5 := make(chan Notification[T5]) - chan6 := make(chan Notification[T6]) - chan7 := make(chan Notification[T7]) - - c.Go(func() { - var s withLatestFromState7[T1, T2, T3, T4, T5, T6, T7] - - cont := true + c, o = Serialize(c, o) - for cont { - select { - case n := <-chan1: - cont = withLatestFromEmit7(o, n, mapping, &s, &s.V1, 1) - case n := <-chan2: - cont = withLatestFromEmit7(o, n, mapping, &s, &s.V2, 2) - case n := <-chan3: - cont = withLatestFromEmit7(o, n, mapping, &s, &s.V3, 4) - case n := <-chan4: - cont = withLatestFromEmit7(o, n, mapping, &s, &s.V4, 8) - case n := <-chan5: - cont = withLatestFromEmit7(o, n, mapping, &s, &s.V5, 16) - case n := <-chan6: - cont = withLatestFromEmit7(o, n, mapping, &s, &s.V6, 32) - case n := <-chan7: - cont = withLatestFromEmit7(o, n, mapping, &s, &s.V7, 64) - } - } - }) + var s withLatestFromState7[T1, T2, T3, T4, T5, T6, T7] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) && - subscribeChannel(c, ob3, chan3, noop) && - subscribeChannel(c, ob4, chan4, noop) && - subscribeChannel(c, ob5, chan5, noop) && - subscribeChannel(c, ob6, chan6, noop) && - subscribeChannel(c, ob7, chan7, noop) + ob1.satcc(c, func(n Notification[T1]) { withLatestFromEmit7(o, n, mapping, &s, &s.V1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { withLatestFromEmit7(o, n, mapping, &s, &s.V2, 2) }) && + ob3.satcc(c, func(n Notification[T3]) { withLatestFromEmit7(o, n, mapping, &s, &s.V3, 4) }) && + ob4.satcc(c, func(n Notification[T4]) { withLatestFromEmit7(o, n, mapping, &s, &s.V4, 8) }) && + ob5.satcc(c, func(n Notification[T5]) { withLatestFromEmit7(o, n, mapping, &s, &s.V5, 16) }) && + ob6.satcc(c, func(n Notification[T6]) { withLatestFromEmit7(o, n, mapping, &s, &s.V6, 32) }) && + ob7.satcc(c, func(n Notification[T7]) { withLatestFromEmit7(o, n, mapping, &s, &s.V7, 64) }) } } type withLatestFromState7[T1, T2, T3, T4, T5, T6, T7 any] struct { + sync.Mutex + NBits uint8 V1 T1 @@ -100,29 +68,32 @@ func withLatestFromEmit7[T1, T2, T3, T4, T5, T6, T7, R, X any]( s *withLatestFromState7[T1, T2, T3, T4, T5, T6, T7], v *X, bit uint8, -) bool { +) { const FullBits = 127 switch n.Kind { case KindNext: + s.Lock() *v = n.Value + nbits := s.NBits + nbits |= bit + s.NBits = nbits - if s.NBits |= bit; s.NBits == FullBits && bit == 1 { - oops := func() { o.Error(ErrOops) } - v := Try71(mapping, s.V1, s.V2, s.V3, s.V4, s.V5, s.V6, s.V7, oops) - Try1(o, Next(v), oops) + if nbits == FullBits && bit == 1 { + v := Try71(mapping, s.V1, s.V2, s.V3, s.V4, s.V5, s.V6, s.V7, s.Unlock) + s.Unlock() + o.Next(v) + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: if bit == 1 { o.Complete() - return false } } - - return true } diff --git a/withlatestfrom7.go b/withlatestfrom7.go index 1420f2a2..e9ac127a 100644 --- a/withlatestfrom7.go +++ b/withlatestfrom7.go @@ -1,5 +1,7 @@ package rx +import "sync" + // WithLatestFrom7 combines the source with 7 other Observables to create // an Observable that emits mappings of the latest values emitted by each // Observable, only when the source emits. @@ -32,62 +34,25 @@ func withLatestFrom8[T1, T2, T3, T4, T5, T6, T7, T8, R any]( mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - chan3 := make(chan Notification[T3]) - chan4 := make(chan Notification[T4]) - chan5 := make(chan Notification[T5]) - chan6 := make(chan Notification[T6]) - chan7 := make(chan Notification[T7]) - chan8 := make(chan Notification[T8]) - - c.Go(func() { - var s withLatestFromState8[T1, T2, T3, T4, T5, T6, T7, T8] - - cont := true + c, o = Serialize(c, o) - for cont { - select { - case n := <-chan1: - cont = withLatestFromEmit8(o, n, mapping, &s, &s.V1, 1) - case n := <-chan2: - cont = withLatestFromEmit8(o, n, mapping, &s, &s.V2, 2) - case n := <-chan3: - cont = withLatestFromEmit8(o, n, mapping, &s, &s.V3, 4) - case n := <-chan4: - cont = withLatestFromEmit8(o, n, mapping, &s, &s.V4, 8) - case n := <-chan5: - cont = withLatestFromEmit8(o, n, mapping, &s, &s.V5, 16) - case n := <-chan6: - cont = withLatestFromEmit8(o, n, mapping, &s, &s.V6, 32) - case n := <-chan7: - cont = withLatestFromEmit8(o, n, mapping, &s, &s.V7, 64) - case n := <-chan8: - cont = withLatestFromEmit8(o, n, mapping, &s, &s.V8, 128) - } - } - }) + var s withLatestFromState8[T1, T2, T3, T4, T5, T6, T7, T8] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) && - subscribeChannel(c, ob3, chan3, noop) && - subscribeChannel(c, ob4, chan4, noop) && - subscribeChannel(c, ob5, chan5, noop) && - subscribeChannel(c, ob6, chan6, noop) && - subscribeChannel(c, ob7, chan7, noop) && - subscribeChannel(c, ob8, chan8, noop) + ob1.satcc(c, func(n Notification[T1]) { withLatestFromEmit8(o, n, mapping, &s, &s.V1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { withLatestFromEmit8(o, n, mapping, &s, &s.V2, 2) }) && + ob3.satcc(c, func(n Notification[T3]) { withLatestFromEmit8(o, n, mapping, &s, &s.V3, 4) }) && + ob4.satcc(c, func(n Notification[T4]) { withLatestFromEmit8(o, n, mapping, &s, &s.V4, 8) }) && + ob5.satcc(c, func(n Notification[T5]) { withLatestFromEmit8(o, n, mapping, &s, &s.V5, 16) }) && + ob6.satcc(c, func(n Notification[T6]) { withLatestFromEmit8(o, n, mapping, &s, &s.V6, 32) }) && + ob7.satcc(c, func(n Notification[T7]) { withLatestFromEmit8(o, n, mapping, &s, &s.V7, 64) }) && + ob8.satcc(c, func(n Notification[T8]) { withLatestFromEmit8(o, n, mapping, &s, &s.V8, 128) }) } } type withLatestFromState8[T1, T2, T3, T4, T5, T6, T7, T8 any] struct { + sync.Mutex + NBits uint8 V1 T1 @@ -107,29 +72,32 @@ func withLatestFromEmit8[T1, T2, T3, T4, T5, T6, T7, T8, R, X any]( s *withLatestFromState8[T1, T2, T3, T4, T5, T6, T7, T8], v *X, bit uint8, -) bool { +) { const FullBits = 255 switch n.Kind { case KindNext: + s.Lock() *v = n.Value + nbits := s.NBits + nbits |= bit + s.NBits = nbits - if s.NBits |= bit; s.NBits == FullBits && bit == 1 { - oops := func() { o.Error(ErrOops) } - v := Try81(mapping, s.V1, s.V2, s.V3, s.V4, s.V5, s.V6, s.V7, s.V8, oops) - Try1(o, Next(v), oops) + if nbits == FullBits && bit == 1 { + v := Try81(mapping, s.V1, s.V2, s.V3, s.V4, s.V5, s.V6, s.V7, s.V8, s.Unlock) + s.Unlock() + o.Next(v) + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: if bit == 1 { o.Complete() - return false } } - - return true } diff --git a/withlatestfrom8.go b/withlatestfrom8.go index 3aba36a3..684e0dd8 100644 --- a/withlatestfrom8.go +++ b/withlatestfrom8.go @@ -1,5 +1,7 @@ package rx +import "sync" + // WithLatestFrom8 combines the source with 8 other Observables to create // an Observable that emits mappings of the latest values emitted by each // Observable, only when the source emits. @@ -34,66 +36,26 @@ func withLatestFrom9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any]( mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8, v9 T9) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - chan3 := make(chan Notification[T3]) - chan4 := make(chan Notification[T4]) - chan5 := make(chan Notification[T5]) - chan6 := make(chan Notification[T6]) - chan7 := make(chan Notification[T7]) - chan8 := make(chan Notification[T8]) - chan9 := make(chan Notification[T9]) - - c.Go(func() { - var s withLatestFromState9[T1, T2, T3, T4, T5, T6, T7, T8, T9] - - cont := true + c, o = Serialize(c, o) - for cont { - select { - case n := <-chan1: - cont = withLatestFromEmit9(o, n, mapping, &s, &s.V1, 1) - case n := <-chan2: - cont = withLatestFromEmit9(o, n, mapping, &s, &s.V2, 2) - case n := <-chan3: - cont = withLatestFromEmit9(o, n, mapping, &s, &s.V3, 4) - case n := <-chan4: - cont = withLatestFromEmit9(o, n, mapping, &s, &s.V4, 8) - case n := <-chan5: - cont = withLatestFromEmit9(o, n, mapping, &s, &s.V5, 16) - case n := <-chan6: - cont = withLatestFromEmit9(o, n, mapping, &s, &s.V6, 32) - case n := <-chan7: - cont = withLatestFromEmit9(o, n, mapping, &s, &s.V7, 64) - case n := <-chan8: - cont = withLatestFromEmit9(o, n, mapping, &s, &s.V8, 128) - case n := <-chan9: - cont = withLatestFromEmit9(o, n, mapping, &s, &s.V9, 256) - } - } - }) + var s withLatestFromState9[T1, T2, T3, T4, T5, T6, T7, T8, T9] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) && - subscribeChannel(c, ob3, chan3, noop) && - subscribeChannel(c, ob4, chan4, noop) && - subscribeChannel(c, ob5, chan5, noop) && - subscribeChannel(c, ob6, chan6, noop) && - subscribeChannel(c, ob7, chan7, noop) && - subscribeChannel(c, ob8, chan8, noop) && - subscribeChannel(c, ob9, chan9, noop) + ob1.satcc(c, func(n Notification[T1]) { withLatestFromEmit9(o, n, mapping, &s, &s.V1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { withLatestFromEmit9(o, n, mapping, &s, &s.V2, 2) }) && + ob3.satcc(c, func(n Notification[T3]) { withLatestFromEmit9(o, n, mapping, &s, &s.V3, 4) }) && + ob4.satcc(c, func(n Notification[T4]) { withLatestFromEmit9(o, n, mapping, &s, &s.V4, 8) }) && + ob5.satcc(c, func(n Notification[T5]) { withLatestFromEmit9(o, n, mapping, &s, &s.V5, 16) }) && + ob6.satcc(c, func(n Notification[T6]) { withLatestFromEmit9(o, n, mapping, &s, &s.V6, 32) }) && + ob7.satcc(c, func(n Notification[T7]) { withLatestFromEmit9(o, n, mapping, &s, &s.V7, 64) }) && + ob8.satcc(c, func(n Notification[T8]) { withLatestFromEmit9(o, n, mapping, &s, &s.V8, 128) }) && + ob9.satcc(c, func(n Notification[T9]) { withLatestFromEmit9(o, n, mapping, &s, &s.V9, 256) }) } } type withLatestFromState9[T1, T2, T3, T4, T5, T6, T7, T8, T9 any] struct { + sync.Mutex + NBits uint16 V1 T1 @@ -114,29 +76,32 @@ func withLatestFromEmit9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R, X any]( s *withLatestFromState9[T1, T2, T3, T4, T5, T6, T7, T8, T9], v *X, bit uint16, -) bool { +) { const FullBits = 511 switch n.Kind { case KindNext: + s.Lock() *v = n.Value + nbits := s.NBits + nbits |= bit + s.NBits = nbits - if s.NBits |= bit; s.NBits == FullBits && bit == 1 { - oops := func() { o.Error(ErrOops) } - v := Try91(mapping, s.V1, s.V2, s.V3, s.V4, s.V5, s.V6, s.V7, s.V8, s.V9, oops) - Try1(o, Next(v), oops) + if nbits == FullBits && bit == 1 { + v := Try91(mapping, s.V1, s.V2, s.V3, s.V4, s.V5, s.V6, s.V7, s.V8, s.V9, s.Unlock) + s.Unlock() + o.Next(v) + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: if bit == 1 { o.Complete() - return false } } - - return true } diff --git a/zipwithbuffering2.go b/zipwithbuffering2.go index 58937218..e3a7d4cd 100644 --- a/zipwithbuffering2.go +++ b/zipwithbuffering2.go @@ -1,6 +1,10 @@ package rx -import "github.com/b97tsk/rx/internal/queue" +import ( + "sync" + + "github.com/b97tsk/rx/internal/queue" +) // ZipWithBuffering2 combines multiple Observables to create an Observable that // emits mappings of the values emitted by each of its input Observables. @@ -14,38 +18,19 @@ func ZipWithBuffering2[T1, T2, R any]( mapping func(v1 T1, v2 T2) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - - c.Go(func() { - var s zipState2[T1, T2] - - cont := true - - for cont { - select { - case n := <-chan1: - cont = zipEmit2(o, n, mapping, &s, &s.Q1, 1) - case n := <-chan2: - cont = zipEmit2(o, n, mapping, &s, &s.Q2, 2) - } - } - }) + c, o = Serialize(c, o) + + var s zipState2[T1, T2] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) + ob1.satcc(c, func(n Notification[T1]) { zipEmit2(o, n, mapping, &s, &s.Q1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { zipEmit2(o, n, mapping, &s, &s.Q2, 2) }) } } type zipState2[T1, T2 any] struct { + sync.Mutex + NBits, CBits uint8 Q1 queue.Queue[T1] @@ -59,45 +44,52 @@ func zipEmit2[T1, T2, R, X any]( s *zipState2[T1, T2], q *queue.Queue[X], bit uint8, -) bool { +) { const FullBits = 3 switch n.Kind { case KindNext: + s.Lock() q.Push(n.Value) - if s.NBits |= bit; s.NBits == FullBits { + nbits := s.NBits + nbits |= bit + s.NBits = nbits + + if nbits == FullBits { var complete bool - oops := func() { o.Error(ErrOops) } v := Try21( mapping, zipPop2(s, &s.Q1, 1, &complete), zipPop2(s, &s.Q2, 2, &complete), - oops, + s.Unlock, ) - Try1(o, Next(v), oops) + s.Unlock() + o.Next(v) if complete { o.Complete() - return false } + + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: + s.Lock() s.CBits |= bit + complete := q.Len() == 0 + s.Unlock() - if q.Len() == 0 { + if complete { o.Complete() - return false } } - - return true } func zipPop2[T1, T2, X any]( diff --git a/zipwithbuffering3.go b/zipwithbuffering3.go index d3979d94..24cc42b1 100644 --- a/zipwithbuffering3.go +++ b/zipwithbuffering3.go @@ -1,6 +1,10 @@ package rx -import "github.com/b97tsk/rx/internal/queue" +import ( + "sync" + + "github.com/b97tsk/rx/internal/queue" +) // ZipWithBuffering3 combines multiple Observables to create an Observable that // emits mappings of the values emitted by each of its input Observables. @@ -15,42 +19,20 @@ func ZipWithBuffering3[T1, T2, T3, R any]( mapping func(v1 T1, v2 T2, v3 T3) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - chan3 := make(chan Notification[T3]) - - c.Go(func() { - var s zipState3[T1, T2, T3] - - cont := true - - for cont { - select { - case n := <-chan1: - cont = zipEmit3(o, n, mapping, &s, &s.Q1, 1) - case n := <-chan2: - cont = zipEmit3(o, n, mapping, &s, &s.Q2, 2) - case n := <-chan3: - cont = zipEmit3(o, n, mapping, &s, &s.Q3, 4) - } - } - }) + c, o = Serialize(c, o) + + var s zipState3[T1, T2, T3] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) && - subscribeChannel(c, ob3, chan3, noop) + ob1.satcc(c, func(n Notification[T1]) { zipEmit3(o, n, mapping, &s, &s.Q1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { zipEmit3(o, n, mapping, &s, &s.Q2, 2) }) && + ob3.satcc(c, func(n Notification[T3]) { zipEmit3(o, n, mapping, &s, &s.Q3, 4) }) } } type zipState3[T1, T2, T3 any] struct { + sync.Mutex + NBits, CBits uint8 Q1 queue.Queue[T1] @@ -65,46 +47,53 @@ func zipEmit3[T1, T2, T3, R, X any]( s *zipState3[T1, T2, T3], q *queue.Queue[X], bit uint8, -) bool { +) { const FullBits = 7 switch n.Kind { case KindNext: + s.Lock() q.Push(n.Value) - if s.NBits |= bit; s.NBits == FullBits { + nbits := s.NBits + nbits |= bit + s.NBits = nbits + + if nbits == FullBits { var complete bool - oops := func() { o.Error(ErrOops) } v := Try31( mapping, zipPop3(s, &s.Q1, 1, &complete), zipPop3(s, &s.Q2, 2, &complete), zipPop3(s, &s.Q3, 4, &complete), - oops, + s.Unlock, ) - Try1(o, Next(v), oops) + s.Unlock() + o.Next(v) if complete { o.Complete() - return false } + + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: + s.Lock() s.CBits |= bit + complete := q.Len() == 0 + s.Unlock() - if q.Len() == 0 { + if complete { o.Complete() - return false } } - - return true } func zipPop3[T1, T2, T3, X any]( diff --git a/zipwithbuffering4.go b/zipwithbuffering4.go index ae4aed33..ea930827 100644 --- a/zipwithbuffering4.go +++ b/zipwithbuffering4.go @@ -1,6 +1,10 @@ package rx -import "github.com/b97tsk/rx/internal/queue" +import ( + "sync" + + "github.com/b97tsk/rx/internal/queue" +) // ZipWithBuffering4 combines multiple Observables to create an Observable that // emits mappings of the values emitted by each of its input Observables. @@ -16,46 +20,21 @@ func ZipWithBuffering4[T1, T2, T3, T4, R any]( mapping func(v1 T1, v2 T2, v3 T3, v4 T4) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - chan3 := make(chan Notification[T3]) - chan4 := make(chan Notification[T4]) - - c.Go(func() { - var s zipState4[T1, T2, T3, T4] - - cont := true - - for cont { - select { - case n := <-chan1: - cont = zipEmit4(o, n, mapping, &s, &s.Q1, 1) - case n := <-chan2: - cont = zipEmit4(o, n, mapping, &s, &s.Q2, 2) - case n := <-chan3: - cont = zipEmit4(o, n, mapping, &s, &s.Q3, 4) - case n := <-chan4: - cont = zipEmit4(o, n, mapping, &s, &s.Q4, 8) - } - } - }) + c, o = Serialize(c, o) + + var s zipState4[T1, T2, T3, T4] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) && - subscribeChannel(c, ob3, chan3, noop) && - subscribeChannel(c, ob4, chan4, noop) + ob1.satcc(c, func(n Notification[T1]) { zipEmit4(o, n, mapping, &s, &s.Q1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { zipEmit4(o, n, mapping, &s, &s.Q2, 2) }) && + ob3.satcc(c, func(n Notification[T3]) { zipEmit4(o, n, mapping, &s, &s.Q3, 4) }) && + ob4.satcc(c, func(n Notification[T4]) { zipEmit4(o, n, mapping, &s, &s.Q4, 8) }) } } type zipState4[T1, T2, T3, T4 any] struct { + sync.Mutex + NBits, CBits uint8 Q1 queue.Queue[T1] @@ -71,47 +50,54 @@ func zipEmit4[T1, T2, T3, T4, R, X any]( s *zipState4[T1, T2, T3, T4], q *queue.Queue[X], bit uint8, -) bool { +) { const FullBits = 15 switch n.Kind { case KindNext: + s.Lock() q.Push(n.Value) - if s.NBits |= bit; s.NBits == FullBits { + nbits := s.NBits + nbits |= bit + s.NBits = nbits + + if nbits == FullBits { var complete bool - oops := func() { o.Error(ErrOops) } v := Try41( mapping, zipPop4(s, &s.Q1, 1, &complete), zipPop4(s, &s.Q2, 2, &complete), zipPop4(s, &s.Q3, 4, &complete), zipPop4(s, &s.Q4, 8, &complete), - oops, + s.Unlock, ) - Try1(o, Next(v), oops) + s.Unlock() + o.Next(v) if complete { o.Complete() - return false } + + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: + s.Lock() s.CBits |= bit + complete := q.Len() == 0 + s.Unlock() - if q.Len() == 0 { + if complete { o.Complete() - return false } } - - return true } func zipPop4[T1, T2, T3, T4, X any]( diff --git a/zipwithbuffering5.go b/zipwithbuffering5.go index 3af241fe..cf6a9496 100644 --- a/zipwithbuffering5.go +++ b/zipwithbuffering5.go @@ -1,6 +1,10 @@ package rx -import "github.com/b97tsk/rx/internal/queue" +import ( + "sync" + + "github.com/b97tsk/rx/internal/queue" +) // ZipWithBuffering5 combines multiple Observables to create an Observable that // emits mappings of the values emitted by each of its input Observables. @@ -17,50 +21,22 @@ func ZipWithBuffering5[T1, T2, T3, T4, T5, R any]( mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - chan3 := make(chan Notification[T3]) - chan4 := make(chan Notification[T4]) - chan5 := make(chan Notification[T5]) - - c.Go(func() { - var s zipState5[T1, T2, T3, T4, T5] - - cont := true - - for cont { - select { - case n := <-chan1: - cont = zipEmit5(o, n, mapping, &s, &s.Q1, 1) - case n := <-chan2: - cont = zipEmit5(o, n, mapping, &s, &s.Q2, 2) - case n := <-chan3: - cont = zipEmit5(o, n, mapping, &s, &s.Q3, 4) - case n := <-chan4: - cont = zipEmit5(o, n, mapping, &s, &s.Q4, 8) - case n := <-chan5: - cont = zipEmit5(o, n, mapping, &s, &s.Q5, 16) - } - } - }) + c, o = Serialize(c, o) + + var s zipState5[T1, T2, T3, T4, T5] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) && - subscribeChannel(c, ob3, chan3, noop) && - subscribeChannel(c, ob4, chan4, noop) && - subscribeChannel(c, ob5, chan5, noop) + ob1.satcc(c, func(n Notification[T1]) { zipEmit5(o, n, mapping, &s, &s.Q1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { zipEmit5(o, n, mapping, &s, &s.Q2, 2) }) && + ob3.satcc(c, func(n Notification[T3]) { zipEmit5(o, n, mapping, &s, &s.Q3, 4) }) && + ob4.satcc(c, func(n Notification[T4]) { zipEmit5(o, n, mapping, &s, &s.Q4, 8) }) && + ob5.satcc(c, func(n Notification[T5]) { zipEmit5(o, n, mapping, &s, &s.Q5, 16) }) } } type zipState5[T1, T2, T3, T4, T5 any] struct { + sync.Mutex + NBits, CBits uint8 Q1 queue.Queue[T1] @@ -77,17 +53,21 @@ func zipEmit5[T1, T2, T3, T4, T5, R, X any]( s *zipState5[T1, T2, T3, T4, T5], q *queue.Queue[X], bit uint8, -) bool { +) { const FullBits = 31 switch n.Kind { case KindNext: + s.Lock() q.Push(n.Value) - if s.NBits |= bit; s.NBits == FullBits { + nbits := s.NBits + nbits |= bit + s.NBits = nbits + + if nbits == FullBits { var complete bool - oops := func() { o.Error(ErrOops) } v := Try51( mapping, zipPop5(s, &s.Q1, 1, &complete), @@ -95,30 +75,33 @@ func zipEmit5[T1, T2, T3, T4, T5, R, X any]( zipPop5(s, &s.Q3, 4, &complete), zipPop5(s, &s.Q4, 8, &complete), zipPop5(s, &s.Q5, 16, &complete), - oops, + s.Unlock, ) - Try1(o, Next(v), oops) + s.Unlock() + o.Next(v) if complete { o.Complete() - return false } + + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: + s.Lock() s.CBits |= bit + complete := q.Len() == 0 + s.Unlock() - if q.Len() == 0 { + if complete { o.Complete() - return false } } - - return true } func zipPop5[T1, T2, T3, T4, T5, X any]( diff --git a/zipwithbuffering6.go b/zipwithbuffering6.go index c4eeca19..f05b1e9c 100644 --- a/zipwithbuffering6.go +++ b/zipwithbuffering6.go @@ -1,6 +1,10 @@ package rx -import "github.com/b97tsk/rx/internal/queue" +import ( + "sync" + + "github.com/b97tsk/rx/internal/queue" +) // ZipWithBuffering6 combines multiple Observables to create an Observable that // emits mappings of the values emitted by each of its input Observables. @@ -18,54 +22,23 @@ func ZipWithBuffering6[T1, T2, T3, T4, T5, T6, R any]( mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - chan3 := make(chan Notification[T3]) - chan4 := make(chan Notification[T4]) - chan5 := make(chan Notification[T5]) - chan6 := make(chan Notification[T6]) - - c.Go(func() { - var s zipState6[T1, T2, T3, T4, T5, T6] - - cont := true - - for cont { - select { - case n := <-chan1: - cont = zipEmit6(o, n, mapping, &s, &s.Q1, 1) - case n := <-chan2: - cont = zipEmit6(o, n, mapping, &s, &s.Q2, 2) - case n := <-chan3: - cont = zipEmit6(o, n, mapping, &s, &s.Q3, 4) - case n := <-chan4: - cont = zipEmit6(o, n, mapping, &s, &s.Q4, 8) - case n := <-chan5: - cont = zipEmit6(o, n, mapping, &s, &s.Q5, 16) - case n := <-chan6: - cont = zipEmit6(o, n, mapping, &s, &s.Q6, 32) - } - } - }) + c, o = Serialize(c, o) + + var s zipState6[T1, T2, T3, T4, T5, T6] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) && - subscribeChannel(c, ob3, chan3, noop) && - subscribeChannel(c, ob4, chan4, noop) && - subscribeChannel(c, ob5, chan5, noop) && - subscribeChannel(c, ob6, chan6, noop) + ob1.satcc(c, func(n Notification[T1]) { zipEmit6(o, n, mapping, &s, &s.Q1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { zipEmit6(o, n, mapping, &s, &s.Q2, 2) }) && + ob3.satcc(c, func(n Notification[T3]) { zipEmit6(o, n, mapping, &s, &s.Q3, 4) }) && + ob4.satcc(c, func(n Notification[T4]) { zipEmit6(o, n, mapping, &s, &s.Q4, 8) }) && + ob5.satcc(c, func(n Notification[T5]) { zipEmit6(o, n, mapping, &s, &s.Q5, 16) }) && + ob6.satcc(c, func(n Notification[T6]) { zipEmit6(o, n, mapping, &s, &s.Q6, 32) }) } } type zipState6[T1, T2, T3, T4, T5, T6 any] struct { + sync.Mutex + NBits, CBits uint8 Q1 queue.Queue[T1] @@ -83,17 +56,21 @@ func zipEmit6[T1, T2, T3, T4, T5, T6, R, X any]( s *zipState6[T1, T2, T3, T4, T5, T6], q *queue.Queue[X], bit uint8, -) bool { +) { const FullBits = 63 switch n.Kind { case KindNext: + s.Lock() q.Push(n.Value) - if s.NBits |= bit; s.NBits == FullBits { + nbits := s.NBits + nbits |= bit + s.NBits = nbits + + if nbits == FullBits { var complete bool - oops := func() { o.Error(ErrOops) } v := Try61( mapping, zipPop6(s, &s.Q1, 1, &complete), @@ -102,30 +79,33 @@ func zipEmit6[T1, T2, T3, T4, T5, T6, R, X any]( zipPop6(s, &s.Q4, 8, &complete), zipPop6(s, &s.Q5, 16, &complete), zipPop6(s, &s.Q6, 32, &complete), - oops, + s.Unlock, ) - Try1(o, Next(v), oops) + s.Unlock() + o.Next(v) if complete { o.Complete() - return false } + + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: + s.Lock() s.CBits |= bit + complete := q.Len() == 0 + s.Unlock() - if q.Len() == 0 { + if complete { o.Complete() - return false } } - - return true } func zipPop6[T1, T2, T3, T4, T5, T6, X any]( diff --git a/zipwithbuffering7.go b/zipwithbuffering7.go index d7839903..0a31e4d9 100644 --- a/zipwithbuffering7.go +++ b/zipwithbuffering7.go @@ -1,6 +1,10 @@ package rx -import "github.com/b97tsk/rx/internal/queue" +import ( + "sync" + + "github.com/b97tsk/rx/internal/queue" +) // ZipWithBuffering7 combines multiple Observables to create an Observable that // emits mappings of the values emitted by each of its input Observables. @@ -19,58 +23,24 @@ func ZipWithBuffering7[T1, T2, T3, T4, T5, T6, T7, R any]( mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - chan3 := make(chan Notification[T3]) - chan4 := make(chan Notification[T4]) - chan5 := make(chan Notification[T5]) - chan6 := make(chan Notification[T6]) - chan7 := make(chan Notification[T7]) - - c.Go(func() { - var s zipState7[T1, T2, T3, T4, T5, T6, T7] - - cont := true - - for cont { - select { - case n := <-chan1: - cont = zipEmit7(o, n, mapping, &s, &s.Q1, 1) - case n := <-chan2: - cont = zipEmit7(o, n, mapping, &s, &s.Q2, 2) - case n := <-chan3: - cont = zipEmit7(o, n, mapping, &s, &s.Q3, 4) - case n := <-chan4: - cont = zipEmit7(o, n, mapping, &s, &s.Q4, 8) - case n := <-chan5: - cont = zipEmit7(o, n, mapping, &s, &s.Q5, 16) - case n := <-chan6: - cont = zipEmit7(o, n, mapping, &s, &s.Q6, 32) - case n := <-chan7: - cont = zipEmit7(o, n, mapping, &s, &s.Q7, 64) - } - } - }) + c, o = Serialize(c, o) + + var s zipState7[T1, T2, T3, T4, T5, T6, T7] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) && - subscribeChannel(c, ob3, chan3, noop) && - subscribeChannel(c, ob4, chan4, noop) && - subscribeChannel(c, ob5, chan5, noop) && - subscribeChannel(c, ob6, chan6, noop) && - subscribeChannel(c, ob7, chan7, noop) + ob1.satcc(c, func(n Notification[T1]) { zipEmit7(o, n, mapping, &s, &s.Q1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { zipEmit7(o, n, mapping, &s, &s.Q2, 2) }) && + ob3.satcc(c, func(n Notification[T3]) { zipEmit7(o, n, mapping, &s, &s.Q3, 4) }) && + ob4.satcc(c, func(n Notification[T4]) { zipEmit7(o, n, mapping, &s, &s.Q4, 8) }) && + ob5.satcc(c, func(n Notification[T5]) { zipEmit7(o, n, mapping, &s, &s.Q5, 16) }) && + ob6.satcc(c, func(n Notification[T6]) { zipEmit7(o, n, mapping, &s, &s.Q6, 32) }) && + ob7.satcc(c, func(n Notification[T7]) { zipEmit7(o, n, mapping, &s, &s.Q7, 64) }) } } type zipState7[T1, T2, T3, T4, T5, T6, T7 any] struct { + sync.Mutex + NBits, CBits uint8 Q1 queue.Queue[T1] @@ -89,17 +59,21 @@ func zipEmit7[T1, T2, T3, T4, T5, T6, T7, R, X any]( s *zipState7[T1, T2, T3, T4, T5, T6, T7], q *queue.Queue[X], bit uint8, -) bool { +) { const FullBits = 127 switch n.Kind { case KindNext: + s.Lock() q.Push(n.Value) - if s.NBits |= bit; s.NBits == FullBits { + nbits := s.NBits + nbits |= bit + s.NBits = nbits + + if nbits == FullBits { var complete bool - oops := func() { o.Error(ErrOops) } v := Try71( mapping, zipPop7(s, &s.Q1, 1, &complete), @@ -109,30 +83,33 @@ func zipEmit7[T1, T2, T3, T4, T5, T6, T7, R, X any]( zipPop7(s, &s.Q5, 16, &complete), zipPop7(s, &s.Q6, 32, &complete), zipPop7(s, &s.Q7, 64, &complete), - oops, + s.Unlock, ) - Try1(o, Next(v), oops) + s.Unlock() + o.Next(v) if complete { o.Complete() - return false } + + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: + s.Lock() s.CBits |= bit + complete := q.Len() == 0 + s.Unlock() - if q.Len() == 0 { + if complete { o.Complete() - return false } } - - return true } func zipPop7[T1, T2, T3, T4, T5, T6, T7, X any]( diff --git a/zipwithbuffering8.go b/zipwithbuffering8.go index ed7e0992..11c5d363 100644 --- a/zipwithbuffering8.go +++ b/zipwithbuffering8.go @@ -1,6 +1,10 @@ package rx -import "github.com/b97tsk/rx/internal/queue" +import ( + "sync" + + "github.com/b97tsk/rx/internal/queue" +) // ZipWithBuffering8 combines multiple Observables to create an Observable that // emits mappings of the values emitted by each of its input Observables. @@ -20,62 +24,25 @@ func ZipWithBuffering8[T1, T2, T3, T4, T5, T6, T7, T8, R any]( mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - chan3 := make(chan Notification[T3]) - chan4 := make(chan Notification[T4]) - chan5 := make(chan Notification[T5]) - chan6 := make(chan Notification[T6]) - chan7 := make(chan Notification[T7]) - chan8 := make(chan Notification[T8]) - - c.Go(func() { - var s zipState8[T1, T2, T3, T4, T5, T6, T7, T8] - - cont := true - - for cont { - select { - case n := <-chan1: - cont = zipEmit8(o, n, mapping, &s, &s.Q1, 1) - case n := <-chan2: - cont = zipEmit8(o, n, mapping, &s, &s.Q2, 2) - case n := <-chan3: - cont = zipEmit8(o, n, mapping, &s, &s.Q3, 4) - case n := <-chan4: - cont = zipEmit8(o, n, mapping, &s, &s.Q4, 8) - case n := <-chan5: - cont = zipEmit8(o, n, mapping, &s, &s.Q5, 16) - case n := <-chan6: - cont = zipEmit8(o, n, mapping, &s, &s.Q6, 32) - case n := <-chan7: - cont = zipEmit8(o, n, mapping, &s, &s.Q7, 64) - case n := <-chan8: - cont = zipEmit8(o, n, mapping, &s, &s.Q8, 128) - } - } - }) + c, o = Serialize(c, o) + + var s zipState8[T1, T2, T3, T4, T5, T6, T7, T8] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) && - subscribeChannel(c, ob3, chan3, noop) && - subscribeChannel(c, ob4, chan4, noop) && - subscribeChannel(c, ob5, chan5, noop) && - subscribeChannel(c, ob6, chan6, noop) && - subscribeChannel(c, ob7, chan7, noop) && - subscribeChannel(c, ob8, chan8, noop) + ob1.satcc(c, func(n Notification[T1]) { zipEmit8(o, n, mapping, &s, &s.Q1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { zipEmit8(o, n, mapping, &s, &s.Q2, 2) }) && + ob3.satcc(c, func(n Notification[T3]) { zipEmit8(o, n, mapping, &s, &s.Q3, 4) }) && + ob4.satcc(c, func(n Notification[T4]) { zipEmit8(o, n, mapping, &s, &s.Q4, 8) }) && + ob5.satcc(c, func(n Notification[T5]) { zipEmit8(o, n, mapping, &s, &s.Q5, 16) }) && + ob6.satcc(c, func(n Notification[T6]) { zipEmit8(o, n, mapping, &s, &s.Q6, 32) }) && + ob7.satcc(c, func(n Notification[T7]) { zipEmit8(o, n, mapping, &s, &s.Q7, 64) }) && + ob8.satcc(c, func(n Notification[T8]) { zipEmit8(o, n, mapping, &s, &s.Q8, 128) }) } } type zipState8[T1, T2, T3, T4, T5, T6, T7, T8 any] struct { + sync.Mutex + NBits, CBits uint8 Q1 queue.Queue[T1] @@ -95,17 +62,21 @@ func zipEmit8[T1, T2, T3, T4, T5, T6, T7, T8, R, X any]( s *zipState8[T1, T2, T3, T4, T5, T6, T7, T8], q *queue.Queue[X], bit uint8, -) bool { +) { const FullBits = 255 switch n.Kind { case KindNext: + s.Lock() q.Push(n.Value) - if s.NBits |= bit; s.NBits == FullBits { + nbits := s.NBits + nbits |= bit + s.NBits = nbits + + if nbits == FullBits { var complete bool - oops := func() { o.Error(ErrOops) } v := Try81( mapping, zipPop8(s, &s.Q1, 1, &complete), @@ -116,30 +87,33 @@ func zipEmit8[T1, T2, T3, T4, T5, T6, T7, T8, R, X any]( zipPop8(s, &s.Q6, 32, &complete), zipPop8(s, &s.Q7, 64, &complete), zipPop8(s, &s.Q8, 128, &complete), - oops, + s.Unlock, ) - Try1(o, Next(v), oops) + s.Unlock() + o.Next(v) if complete { o.Complete() - return false } + + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: + s.Lock() s.CBits |= bit + complete := q.Len() == 0 + s.Unlock() - if q.Len() == 0 { + if complete { o.Complete() - return false } } - - return true } func zipPop8[T1, T2, T3, T4, T5, T6, T7, T8, X any]( diff --git a/zipwithbuffering9.go b/zipwithbuffering9.go index af95e2ad..acfd560e 100644 --- a/zipwithbuffering9.go +++ b/zipwithbuffering9.go @@ -1,6 +1,10 @@ package rx -import "github.com/b97tsk/rx/internal/queue" +import ( + "sync" + + "github.com/b97tsk/rx/internal/queue" +) // ZipWithBuffering9 combines multiple Observables to create an Observable that // emits mappings of the values emitted by each of its input Observables. @@ -21,66 +25,26 @@ func ZipWithBuffering9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any]( mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8, v9 T9) R, ) Observable[R] { return func(c Context, o Observer[R]) { - c, cancel := c.WithCancel() - noop := make(chan struct{}) - o = o.DoOnTermination(func() { - cancel() - close(noop) - }) - - chan1 := make(chan Notification[T1]) - chan2 := make(chan Notification[T2]) - chan3 := make(chan Notification[T3]) - chan4 := make(chan Notification[T4]) - chan5 := make(chan Notification[T5]) - chan6 := make(chan Notification[T6]) - chan7 := make(chan Notification[T7]) - chan8 := make(chan Notification[T8]) - chan9 := make(chan Notification[T9]) - - c.Go(func() { - var s zipState9[T1, T2, T3, T4, T5, T6, T7, T8, T9] - - cont := true - - for cont { - select { - case n := <-chan1: - cont = zipEmit9(o, n, mapping, &s, &s.Q1, 1) - case n := <-chan2: - cont = zipEmit9(o, n, mapping, &s, &s.Q2, 2) - case n := <-chan3: - cont = zipEmit9(o, n, mapping, &s, &s.Q3, 4) - case n := <-chan4: - cont = zipEmit9(o, n, mapping, &s, &s.Q4, 8) - case n := <-chan5: - cont = zipEmit9(o, n, mapping, &s, &s.Q5, 16) - case n := <-chan6: - cont = zipEmit9(o, n, mapping, &s, &s.Q6, 32) - case n := <-chan7: - cont = zipEmit9(o, n, mapping, &s, &s.Q7, 64) - case n := <-chan8: - cont = zipEmit9(o, n, mapping, &s, &s.Q8, 128) - case n := <-chan9: - cont = zipEmit9(o, n, mapping, &s, &s.Q9, 256) - } - } - }) + c, o = Serialize(c, o) + + var s zipState9[T1, T2, T3, T4, T5, T6, T7, T8, T9] _ = true && - subscribeChannel(c, ob1, chan1, noop) && - subscribeChannel(c, ob2, chan2, noop) && - subscribeChannel(c, ob3, chan3, noop) && - subscribeChannel(c, ob4, chan4, noop) && - subscribeChannel(c, ob5, chan5, noop) && - subscribeChannel(c, ob6, chan6, noop) && - subscribeChannel(c, ob7, chan7, noop) && - subscribeChannel(c, ob8, chan8, noop) && - subscribeChannel(c, ob9, chan9, noop) + ob1.satcc(c, func(n Notification[T1]) { zipEmit9(o, n, mapping, &s, &s.Q1, 1) }) && + ob2.satcc(c, func(n Notification[T2]) { zipEmit9(o, n, mapping, &s, &s.Q2, 2) }) && + ob3.satcc(c, func(n Notification[T3]) { zipEmit9(o, n, mapping, &s, &s.Q3, 4) }) && + ob4.satcc(c, func(n Notification[T4]) { zipEmit9(o, n, mapping, &s, &s.Q4, 8) }) && + ob5.satcc(c, func(n Notification[T5]) { zipEmit9(o, n, mapping, &s, &s.Q5, 16) }) && + ob6.satcc(c, func(n Notification[T6]) { zipEmit9(o, n, mapping, &s, &s.Q6, 32) }) && + ob7.satcc(c, func(n Notification[T7]) { zipEmit9(o, n, mapping, &s, &s.Q7, 64) }) && + ob8.satcc(c, func(n Notification[T8]) { zipEmit9(o, n, mapping, &s, &s.Q8, 128) }) && + ob9.satcc(c, func(n Notification[T9]) { zipEmit9(o, n, mapping, &s, &s.Q9, 256) }) } } type zipState9[T1, T2, T3, T4, T5, T6, T7, T8, T9 any] struct { + sync.Mutex + NBits, CBits uint16 Q1 queue.Queue[T1] @@ -101,17 +65,21 @@ func zipEmit9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R, X any]( s *zipState9[T1, T2, T3, T4, T5, T6, T7, T8, T9], q *queue.Queue[X], bit uint16, -) bool { +) { const FullBits = 511 switch n.Kind { case KindNext: + s.Lock() q.Push(n.Value) - if s.NBits |= bit; s.NBits == FullBits { + nbits := s.NBits + nbits |= bit + s.NBits = nbits + + if nbits == FullBits { var complete bool - oops := func() { o.Error(ErrOops) } v := Try91( mapping, zipPop9(s, &s.Q1, 1, &complete), @@ -123,30 +91,33 @@ func zipEmit9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R, X any]( zipPop9(s, &s.Q7, 64, &complete), zipPop9(s, &s.Q8, 128, &complete), zipPop9(s, &s.Q9, 256, &complete), - oops, + s.Unlock, ) - Try1(o, Next(v), oops) + s.Unlock() + o.Next(v) if complete { o.Complete() - return false } + + return } + s.Unlock() + case KindError: o.Error(n.Error) - return false case KindComplete: + s.Lock() s.CBits |= bit + complete := q.Len() == 0 + s.Unlock() - if q.Len() == 0 { + if complete { o.Complete() - return false } } - - return true } func zipPop9[T1, T2, T3, T4, T5, T6, T7, T8, T9, X any](