From a5c7faa707c745ef2d61ac6d4c4edb5a893d78dd Mon Sep 17 00:00:00 2001 From: b97tsk Date: Mon, 11 Dec 2023 08:00:00 +0800 Subject: [PATCH] update(*): minor changes --- audit.go | 5 +++-- concat.go | 18 ++++++++++++------ debounce.go | 5 +++-- delay.go | 5 +++-- exhaust.go | 5 +++-- sample.go | 5 +++-- skipuntil.go | 16 ++++------------ switch.go | 5 +++-- takeuntil.go | 16 ++++------------ throttle.go | 5 +++-- 10 files changed, 41 insertions(+), 44 deletions(-) diff --git a/audit.go b/audit.go index 10ccf652..1e66e668 100644 --- a/audit.go +++ b/audit.go @@ -68,6 +68,7 @@ func (obs auditObservable[T, U]) Subscribe(ctx context.Context, sink Observer[T] startWorker := func(v T) { worker, cancelWorker := context.WithCancel(source) + x.Context.Store(worker) x.Worker.Add(1) @@ -122,12 +123,12 @@ func (obs auditObservable[T, U]) Subscribe(ctx context.Context, sink Observer[T] } case KindError: - ctx := x.Context.Swap(source) + ctx := x.Context.Swap(sentinel) cancelSource() x.Worker.Wait() - if x.Context.Swap(sentinel) != sentinel && ctx != sentinel { + if ctx != sentinel { sink(n) } diff --git a/concat.go b/concat.go index a0e7ab76..ea06a013 100644 --- a/concat.go +++ b/concat.go @@ -129,7 +129,11 @@ func (obs concatMapObservable[T, R]) Subscribe(ctx context.Context, sink Observe x.Queue.Unlock() worker, cancelWorker := context.WithCancel(source) - x.Context.Store(worker) + + if !x.Context.CompareAndSwap(source, worker) { // This fails if x.Context was swapped to sentinel. + cancelWorker() + return + } x.Worker.Add(1) @@ -161,9 +165,9 @@ func (obs concatMapObservable[T, R]) Subscribe(ctx context.Context, sink Observe } case KindComplete: - if x.Queue.Len() == 0 { - swapped := x.Context.CompareAndSwap(worker, source) + swapped := x.Context.CompareAndSwap(worker, source) + if x.Queue.Len() == 0 { x.Queue.Unlock() if swapped && x.Complete.Load() && x.Context.CompareAndSwap(source, sentinel) { @@ -173,7 +177,9 @@ func (obs concatMapObservable[T, R]) Subscribe(ctx context.Context, sink Observe break } - startWorker() + if swapped { + startWorker() + } } cancelWorker() @@ -200,12 +206,12 @@ func (obs concatMapObservable[T, R]) Subscribe(ctx context.Context, sink Observe x.Queue.Unlock() case KindError: - ctx := x.Context.Swap(source) + ctx := x.Context.Swap(sentinel) cancelSource() x.Worker.Wait() - if x.Context.Swap(sentinel) != sentinel && ctx != sentinel { + if ctx != sentinel { sink.Error(n.Error) } diff --git a/debounce.go b/debounce.go index c388771b..4fe6afc4 100644 --- a/debounce.go +++ b/debounce.go @@ -66,6 +66,7 @@ func (obs debounceObservable[T, U]) Subscribe(ctx context.Context, sink Observer startWorker := func(v T) { worker, cancelWorker := context.WithCancel(source) + x.Context.Store(worker) x.Worker.Add(1) @@ -127,12 +128,12 @@ func (obs debounceObservable[T, U]) Subscribe(ctx context.Context, sink Observer startWorker(n.Value) case KindError, KindComplete: - ctx := x.Context.Swap(source) + ctx := x.Context.Swap(sentinel) cancelSource() x.Worker.Wait() - if x.Context.Swap(sentinel) != sentinel && ctx != sentinel { + if ctx != sentinel { if n.Kind == KindComplete && x.Latest.HasValue.Load() { sink.Next(x.Latest.Value) } diff --git a/delay.go b/delay.go index 259e19e1..05dea0e4 100644 --- a/delay.go +++ b/delay.go @@ -47,6 +47,7 @@ func (obs delayObservable[T]) Subscribe(ctx context.Context, sink Observer[T]) { startWorker = func(timeout time.Duration) { worker, cancelWorker := context.WithCancel(source) + x.Context.Store(worker) x.Worker.Add(1) @@ -141,12 +142,12 @@ func (obs delayObservable[T]) Subscribe(ctx context.Context, sink Observer[T]) { } case KindError: - ctx := x.Context.Swap(source) + ctx := x.Context.Swap(sentinel) cancelSource() x.Worker.Wait() - if x.Context.Swap(sentinel) != sentinel && ctx != sentinel { + if ctx != sentinel { sink(n) } diff --git a/exhaust.go b/exhaust.go index f9a0c54a..254b0fbc 100644 --- a/exhaust.go +++ b/exhaust.go @@ -61,6 +61,7 @@ func (obs exhaustMapObservable[T, R]) Subscribe(ctx context.Context, sink Observ startWorker := func(v T) { worker, cancelWorker := context.WithCancel(source) + x.Context.Store(worker) x.Worker.Add(1) @@ -96,12 +97,12 @@ func (obs exhaustMapObservable[T, R]) Subscribe(ctx context.Context, sink Observ } case KindError: - ctx := x.Context.Swap(source) + ctx := x.Context.Swap(sentinel) cancelSource() x.Worker.Wait() - if x.Context.Swap(sentinel) != sentinel && ctx != sentinel { + if ctx != sentinel { sink.Error(n.Error) } diff --git a/sample.go b/sample.go index a2846f4e..e7e7e1bc 100644 --- a/sample.go +++ b/sample.go @@ -47,6 +47,7 @@ func (obs sampleObservable[T, U]) Subscribe(ctx context.Context, sink Observer[T { worker, cancelWorker := context.WithCancel(source) + x.Context.Store(worker) x.Worker.Add(1) @@ -79,12 +80,12 @@ func (obs sampleObservable[T, U]) Subscribe(ctx context.Context, sink Observer[T } finish := func(n Notification[T]) { - ctx := x.Context.Swap(source) + ctx := x.Context.Swap(sentinel) cancelSource() x.Worker.Wait() - if x.Context.Swap(sentinel) != sentinel && ctx != sentinel { + if ctx != sentinel { sink(n) } } diff --git a/skipuntil.go b/skipuntil.go index 7cf12f54..171fb433 100644 --- a/skipuntil.go +++ b/skipuntil.go @@ -2,7 +2,6 @@ package rx import ( "context" - "sync" "sync/atomic" ) @@ -28,16 +27,12 @@ func (obs skipUntilObservable[T, U]) Subscribe(ctx context.Context, sink Observe var x struct { Context atomic.Value - Worker struct { - sync.WaitGroup - } } { worker, cancelWorker := context.WithCancel(source) - x.Context.Store(worker) - x.Worker.Add(1) + x.Context.Store(worker) var noop bool @@ -52,7 +47,7 @@ func (obs skipUntilObservable[T, U]) Subscribe(ctx context.Context, sink Observe switch n.Kind { case KindNext: - x.Context.Store(source) + x.Context.CompareAndSwap(worker, source) case KindError: if x.Context.CompareAndSwap(worker, sentinel) { @@ -62,18 +57,15 @@ func (obs skipUntilObservable[T, U]) Subscribe(ctx context.Context, sink Observe case KindComplete: break } - - x.Worker.Done() }) } finish := func(n Notification[T]) { - ctx := x.Context.Swap(source) + ctx := x.Context.Swap(sentinel) cancelSource() - x.Worker.Wait() - if x.Context.Swap(sentinel) != sentinel && ctx != sentinel { + if ctx != sentinel { sink(n) } } diff --git a/switch.go b/switch.go index d4ebd069..674ccebe 100644 --- a/switch.go +++ b/switch.go @@ -61,6 +61,7 @@ func (obs switchMapObservable[T, R]) Subscribe(ctx context.Context, sink Observe startWorker := func(v T) { worker, cancelWorker := context.WithCancel(source) + x.Context.Store(worker) x.Worker.Add(1) @@ -106,12 +107,12 @@ func (obs switchMapObservable[T, R]) Subscribe(ctx context.Context, sink Observe startWorker(n.Value) case KindError: - ctx := x.Context.Swap(source) + ctx := x.Context.Swap(sentinel) cancelSource() x.Worker.Wait() - if x.Context.Swap(sentinel) != sentinel && ctx != sentinel { + if ctx != sentinel { sink.Error(n.Error) } diff --git a/takeuntil.go b/takeuntil.go index a03d425d..7fb0c46a 100644 --- a/takeuntil.go +++ b/takeuntil.go @@ -32,16 +32,12 @@ func (obs takeUntilObservable[T, U]) Subscribe(ctx context.Context, sink Observe sync.Mutex sync.WaitGroup } - Worker struct { - sync.WaitGroup - } } { worker, cancelWorker := context.WithCancel(source) - x.Context.Store(worker) - x.Worker.Add(1) + x.Context.Store(worker) var noop bool @@ -57,9 +53,8 @@ func (obs takeUntilObservable[T, U]) Subscribe(ctx context.Context, sink Observe switch n.Kind { case KindNext, KindError: if x.Context.CompareAndSwap(worker, sentinel) { - x.Worker.Done() - cancelSource() + x.Source.Lock() x.Source.Wait() x.Source.Unlock() @@ -74,8 +69,6 @@ func (obs takeUntilObservable[T, U]) Subscribe(ctx context.Context, sink Observe return } } - - x.Worker.Done() }) } @@ -84,12 +77,11 @@ func (obs takeUntilObservable[T, U]) Subscribe(ctx context.Context, sink Observe x.Source.Unlock() finish := func(n Notification[T]) { - ctx := x.Context.Swap(source) + ctx := x.Context.Swap(sentinel) cancelSource() - x.Worker.Wait() - if x.Context.Swap(sentinel) != sentinel && ctx != sentinel { + if ctx != sentinel { sink(n) } diff --git a/throttle.go b/throttle.go index 03fd3af9..db5fecf7 100644 --- a/throttle.go +++ b/throttle.go @@ -104,6 +104,7 @@ func (obs throttleObservable[T, U]) Subscribe(ctx context.Context, sink Observer doThrottle = func(v T) { worker, cancelWorker := context.WithCancel(source) + x.Context.Store(worker) x.Worker.Add(1) @@ -170,12 +171,12 @@ func (obs throttleObservable[T, U]) Subscribe(ctx context.Context, sink Observer } case KindError: - ctx := x.Context.Swap(source) + ctx := x.Context.Swap(sentinel) cancelSource() x.Worker.Wait() - if x.Context.Swap(sentinel) != sentinel && ctx != sentinel { + if ctx != sentinel { sink(n) }