Skip to content

Commit

Permalink
update(*): minor changes
Browse files Browse the repository at this point in the history
  • Loading branch information
b97tsk committed Dec 11, 2023
1 parent b6d5c93 commit a5c7faa
Show file tree
Hide file tree
Showing 10 changed files with 41 additions and 44 deletions.
5 changes: 3 additions & 2 deletions audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (obs auditObservable[T, U]) Subscribe(ctx context.Context, sink Observer[T]

startWorker := func(v T) {
worker, cancelWorker := context.WithCancel(source)

x.Context.Store(worker)

x.Worker.Add(1)
Expand Down Expand Up @@ -122,12 +123,12 @@ func (obs auditObservable[T, U]) Subscribe(ctx context.Context, sink Observer[T]
}

case KindError:
ctx := x.Context.Swap(source)
ctx := x.Context.Swap(sentinel)

cancelSource()
x.Worker.Wait()

if x.Context.Swap(sentinel) != sentinel && ctx != sentinel {
if ctx != sentinel {
sink(n)
}

Expand Down
18 changes: 12 additions & 6 deletions concat.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ func (obs concatMapObservable[T, R]) Subscribe(ctx context.Context, sink Observe
x.Queue.Unlock()

worker, cancelWorker := context.WithCancel(source)
x.Context.Store(worker)

if !x.Context.CompareAndSwap(source, worker) { // This fails if x.Context was swapped to sentinel.
cancelWorker()
return
}

x.Worker.Add(1)

Expand Down Expand Up @@ -161,9 +165,9 @@ func (obs concatMapObservable[T, R]) Subscribe(ctx context.Context, sink Observe
}

case KindComplete:
if x.Queue.Len() == 0 {
swapped := x.Context.CompareAndSwap(worker, source)
swapped := x.Context.CompareAndSwap(worker, source)

if x.Queue.Len() == 0 {
x.Queue.Unlock()

if swapped && x.Complete.Load() && x.Context.CompareAndSwap(source, sentinel) {
Expand All @@ -173,7 +177,9 @@ func (obs concatMapObservable[T, R]) Subscribe(ctx context.Context, sink Observe
break
}

startWorker()
if swapped {
startWorker()
}
}

cancelWorker()
Expand All @@ -200,12 +206,12 @@ func (obs concatMapObservable[T, R]) Subscribe(ctx context.Context, sink Observe
x.Queue.Unlock()

case KindError:
ctx := x.Context.Swap(source)
ctx := x.Context.Swap(sentinel)

cancelSource()
x.Worker.Wait()

if x.Context.Swap(sentinel) != sentinel && ctx != sentinel {
if ctx != sentinel {
sink.Error(n.Error)
}

Expand Down
5 changes: 3 additions & 2 deletions debounce.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (obs debounceObservable[T, U]) Subscribe(ctx context.Context, sink Observer

startWorker := func(v T) {
worker, cancelWorker := context.WithCancel(source)

x.Context.Store(worker)

x.Worker.Add(1)
Expand Down Expand Up @@ -127,12 +128,12 @@ func (obs debounceObservable[T, U]) Subscribe(ctx context.Context, sink Observer
startWorker(n.Value)

case KindError, KindComplete:
ctx := x.Context.Swap(source)
ctx := x.Context.Swap(sentinel)

cancelSource()
x.Worker.Wait()

if x.Context.Swap(sentinel) != sentinel && ctx != sentinel {
if ctx != sentinel {
if n.Kind == KindComplete && x.Latest.HasValue.Load() {
sink.Next(x.Latest.Value)
}
Expand Down
5 changes: 3 additions & 2 deletions delay.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (obs delayObservable[T]) Subscribe(ctx context.Context, sink Observer[T]) {

startWorker = func(timeout time.Duration) {
worker, cancelWorker := context.WithCancel(source)

x.Context.Store(worker)

x.Worker.Add(1)
Expand Down Expand Up @@ -141,12 +142,12 @@ func (obs delayObservable[T]) Subscribe(ctx context.Context, sink Observer[T]) {
}

case KindError:
ctx := x.Context.Swap(source)
ctx := x.Context.Swap(sentinel)

cancelSource()
x.Worker.Wait()

if x.Context.Swap(sentinel) != sentinel && ctx != sentinel {
if ctx != sentinel {
sink(n)
}

Expand Down
5 changes: 3 additions & 2 deletions exhaust.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (obs exhaustMapObservable[T, R]) Subscribe(ctx context.Context, sink Observ

startWorker := func(v T) {
worker, cancelWorker := context.WithCancel(source)

x.Context.Store(worker)

x.Worker.Add(1)
Expand Down Expand Up @@ -96,12 +97,12 @@ func (obs exhaustMapObservable[T, R]) Subscribe(ctx context.Context, sink Observ
}

case KindError:
ctx := x.Context.Swap(source)
ctx := x.Context.Swap(sentinel)

cancelSource()
x.Worker.Wait()

if x.Context.Swap(sentinel) != sentinel && ctx != sentinel {
if ctx != sentinel {
sink.Error(n.Error)
}

Expand Down
5 changes: 3 additions & 2 deletions sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (obs sampleObservable[T, U]) Subscribe(ctx context.Context, sink Observer[T

{
worker, cancelWorker := context.WithCancel(source)

x.Context.Store(worker)

x.Worker.Add(1)
Expand Down Expand Up @@ -79,12 +80,12 @@ func (obs sampleObservable[T, U]) Subscribe(ctx context.Context, sink Observer[T
}

finish := func(n Notification[T]) {
ctx := x.Context.Swap(source)
ctx := x.Context.Swap(sentinel)

cancelSource()
x.Worker.Wait()

if x.Context.Swap(sentinel) != sentinel && ctx != sentinel {
if ctx != sentinel {
sink(n)
}
}
Expand Down
16 changes: 4 additions & 12 deletions skipuntil.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package rx

import (
"context"
"sync"
"sync/atomic"
)

Expand All @@ -28,16 +27,12 @@ func (obs skipUntilObservable[T, U]) Subscribe(ctx context.Context, sink Observe

var x struct {
Context atomic.Value
Worker struct {
sync.WaitGroup
}
}

{
worker, cancelWorker := context.WithCancel(source)
x.Context.Store(worker)

x.Worker.Add(1)
x.Context.Store(worker)

var noop bool

Expand All @@ -52,7 +47,7 @@ func (obs skipUntilObservable[T, U]) Subscribe(ctx context.Context, sink Observe

switch n.Kind {
case KindNext:
x.Context.Store(source)
x.Context.CompareAndSwap(worker, source)

case KindError:
if x.Context.CompareAndSwap(worker, sentinel) {
Expand All @@ -62,18 +57,15 @@ func (obs skipUntilObservable[T, U]) Subscribe(ctx context.Context, sink Observe
case KindComplete:
break
}

x.Worker.Done()
})
}

finish := func(n Notification[T]) {
ctx := x.Context.Swap(source)
ctx := x.Context.Swap(sentinel)

cancelSource()
x.Worker.Wait()

if x.Context.Swap(sentinel) != sentinel && ctx != sentinel {
if ctx != sentinel {
sink(n)
}
}
Expand Down
5 changes: 3 additions & 2 deletions switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (obs switchMapObservable[T, R]) Subscribe(ctx context.Context, sink Observe

startWorker := func(v T) {
worker, cancelWorker := context.WithCancel(source)

x.Context.Store(worker)

x.Worker.Add(1)
Expand Down Expand Up @@ -106,12 +107,12 @@ func (obs switchMapObservable[T, R]) Subscribe(ctx context.Context, sink Observe
startWorker(n.Value)

case KindError:
ctx := x.Context.Swap(source)
ctx := x.Context.Swap(sentinel)

cancelSource()
x.Worker.Wait()

if x.Context.Swap(sentinel) != sentinel && ctx != sentinel {
if ctx != sentinel {
sink.Error(n.Error)
}

Expand Down
16 changes: 4 additions & 12 deletions takeuntil.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,12 @@ func (obs takeUntilObservable[T, U]) Subscribe(ctx context.Context, sink Observe
sync.Mutex
sync.WaitGroup
}
Worker struct {
sync.WaitGroup
}
}

{
worker, cancelWorker := context.WithCancel(source)
x.Context.Store(worker)

x.Worker.Add(1)
x.Context.Store(worker)

var noop bool

Expand All @@ -57,9 +53,8 @@ func (obs takeUntilObservable[T, U]) Subscribe(ctx context.Context, sink Observe
switch n.Kind {
case KindNext, KindError:
if x.Context.CompareAndSwap(worker, sentinel) {
x.Worker.Done()

cancelSource()

x.Source.Lock()
x.Source.Wait()
x.Source.Unlock()
Expand All @@ -74,8 +69,6 @@ func (obs takeUntilObservable[T, U]) Subscribe(ctx context.Context, sink Observe
return
}
}

x.Worker.Done()
})
}

Expand All @@ -84,12 +77,11 @@ func (obs takeUntilObservable[T, U]) Subscribe(ctx context.Context, sink Observe
x.Source.Unlock()

finish := func(n Notification[T]) {
ctx := x.Context.Swap(source)
ctx := x.Context.Swap(sentinel)

cancelSource()
x.Worker.Wait()

if x.Context.Swap(sentinel) != sentinel && ctx != sentinel {
if ctx != sentinel {
sink(n)
}

Expand Down
5 changes: 3 additions & 2 deletions throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (obs throttleObservable[T, U]) Subscribe(ctx context.Context, sink Observer

doThrottle = func(v T) {
worker, cancelWorker := context.WithCancel(source)

x.Context.Store(worker)

x.Worker.Add(1)
Expand Down Expand Up @@ -170,12 +171,12 @@ func (obs throttleObservable[T, U]) Subscribe(ctx context.Context, sink Observer
}

case KindError:
ctx := x.Context.Swap(source)
ctx := x.Context.Swap(sentinel)

cancelSource()
x.Worker.Wait()

if x.Context.Swap(sentinel) != sentinel && ctx != sentinel {
if ctx != sentinel {
sink(n)
}

Expand Down

0 comments on commit a5c7faa

Please sign in to comment.