Skip to content

Commit

Permalink
update(*): rename variables and fix data race
Browse files Browse the repository at this point in the history
  • Loading branch information
b97tsk committed Oct 6, 2023
1 parent 7ef9474 commit 014b264
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 106 deletions.
17 changes: 6 additions & 11 deletions audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ type auditObservable[T, U any] struct {
}

func (obs auditObservable[T, U]) Subscribe(ctx context.Context, sink Observer[T]) {
source, cancel := context.WithCancel(ctx)
source, cancelSource := context.WithCancel(ctx)

sink = sink.OnLastNotification(cancel)
sink = sink.OnLastNotification(cancelSource)

var x struct {
Context atomic.Value
Expand All @@ -61,18 +61,15 @@ func (obs auditObservable[T, U]) Subscribe(ctx context.Context, sink Observer[T]
}
Worker struct {
sync.WaitGroup
Cancel context.CancelFunc
}
}

x.Context.Store(source)

startWorker := func(v T) {
worker, cancel := context.WithCancel(source)
worker, cancelWorker := context.WithCancel(source)
x.Context.Store(worker)

x.Worker.Cancel = cancel

x.Worker.Add(1)

var noop bool
Expand All @@ -84,7 +81,7 @@ func (obs auditObservable[T, U]) Subscribe(ctx context.Context, sink Observer[T]

noop = true

cancel()
cancelWorker()

switch {
case n.HasValue:
Expand Down Expand Up @@ -127,10 +124,8 @@ func (obs auditObservable[T, U]) Subscribe(ctx context.Context, sink Observer[T]
case n.HasError:
ctx := x.Context.Swap(source)

if x.Worker.Cancel != nil {
x.Worker.Cancel()
x.Worker.Wait()
}
cancelSource()
x.Worker.Wait()

if x.Context.Swap(sentinel) != sentinel && ctx != sentinel {
sink(n)
Expand Down
17 changes: 6 additions & 11 deletions concat.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ type concatMapObservable[T, R any] struct {
}

func (obs concatMapObservable[T, R]) Subscribe(ctx context.Context, sink Observer[R]) {
source, cancel := context.WithCancel(ctx)
source, cancelSource := context.WithCancel(ctx)

sink = sink.OnLastNotification(cancel)
sink = sink.OnLastNotification(cancelSource)

var x struct {
Context atomic.Value
Expand All @@ -116,7 +116,6 @@ func (obs concatMapObservable[T, R]) Subscribe(ctx context.Context, sink Observe
}
Worker struct {
sync.WaitGroup
Cancel context.CancelFunc
}
}

Expand All @@ -129,11 +128,9 @@ func (obs concatMapObservable[T, R]) Subscribe(ctx context.Context, sink Observe

x.Queue.Unlock()

worker, cancel := context.WithCancel(source)
worker, cancelWorker := context.WithCancel(source)
x.Context.Store(worker)

x.Worker.Cancel = cancel

x.Worker.Add(1)

obs.Project(v).Subscribe(worker, func(n Notification[R]) {
Expand Down Expand Up @@ -181,7 +178,7 @@ func (obs concatMapObservable[T, R]) Subscribe(ctx context.Context, sink Observe
startWorker()
}

cancel()
cancelWorker()
x.Worker.Done()
})
})
Expand All @@ -206,10 +203,8 @@ func (obs concatMapObservable[T, R]) Subscribe(ctx context.Context, sink Observe
case n.HasError:
ctx := x.Context.Swap(source)

if x.Worker.Cancel != nil {
x.Worker.Cancel()
x.Worker.Wait()
}
cancelSource()
x.Worker.Wait()

if x.Context.Swap(sentinel) != sentinel && ctx != sentinel {
sink.Error(n.Error)
Expand Down
18 changes: 8 additions & 10 deletions debounce.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ type debounceObservable[T, U any] struct {
}

func (obs debounceObservable[T, U]) Subscribe(ctx context.Context, sink Observer[T]) {
source, cancel := context.WithCancel(ctx)
source, cancelSource := context.WithCancel(ctx)

sink = sink.OnLastNotification(cancel)
sink = sink.OnLastNotification(cancelSource)

var x struct {
Context atomic.Value
Expand All @@ -65,13 +65,13 @@ func (obs debounceObservable[T, U]) Subscribe(ctx context.Context, sink Observer
x.Context.Store(source)

startWorker := func(v T) {
worker, cancel := context.WithCancel(source)
worker, cancelWorker := context.WithCancel(source)
x.Context.Store(worker)

x.Worker.Cancel = cancel

x.Worker.Add(1)

x.Worker.Cancel = cancelWorker

var noop bool

obs.DurationSelector(v).Subscribe(worker, func(n Notification[U]) {
Expand All @@ -81,7 +81,7 @@ func (obs debounceObservable[T, U]) Subscribe(ctx context.Context, sink Observer

noop = true

cancel()
cancelWorker()

switch {
case n.HasValue:
Expand Down Expand Up @@ -129,10 +129,8 @@ func (obs debounceObservable[T, U]) Subscribe(ctx context.Context, sink Observer
default:
ctx := x.Context.Swap(source)

if x.Worker.Cancel != nil {
x.Worker.Cancel()
x.Worker.Wait()
}
cancelSource()
x.Worker.Wait()

if x.Context.Swap(sentinel) != sentinel && ctx != sentinel {
if !n.HasError && x.Latest.HasValue.Load() {
Expand Down
17 changes: 6 additions & 11 deletions delay.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ type delayObservable[T any] struct {
}

func (obs delayObservable[T]) Subscribe(ctx context.Context, sink Observer[T]) {
source, cancel := context.WithCancel(ctx)
source, cancelSource := context.WithCancel(ctx)

sink = sink.OnLastNotification(cancel)
sink = sink.OnLastNotification(cancelSource)

var x struct {
Context atomic.Value
Expand All @@ -38,7 +38,6 @@ func (obs delayObservable[T]) Subscribe(ctx context.Context, sink Observer[T]) {
}
Worker struct {
sync.WaitGroup
Cancel context.CancelFunc
}
}

Expand All @@ -47,11 +46,9 @@ func (obs delayObservable[T]) Subscribe(ctx context.Context, sink Observer[T]) {
var startWorker func(time.Duration)

startWorker = func(timeout time.Duration) {
worker, cancel := context.WithCancel(source)
worker, cancelWorker := context.WithCancel(source)
x.Context.Store(worker)

x.Worker.Cancel = cancel

x.Worker.Add(1)

Timer(timeout).Subscribe(worker, func(n Notification[time.Time]) {
Expand Down Expand Up @@ -122,7 +119,7 @@ func (obs delayObservable[T]) Subscribe(ctx context.Context, sink Observer[T]) {
break
}

cancel()
cancelWorker()
x.Worker.Done()
})
}
Expand All @@ -146,10 +143,8 @@ func (obs delayObservable[T]) Subscribe(ctx context.Context, sink Observer[T]) {
case n.HasError:
ctx := x.Context.Swap(source)

if x.Worker.Cancel != nil {
x.Worker.Cancel()
x.Worker.Wait()
}
cancelSource()
x.Worker.Wait()

if x.Context.Swap(sentinel) != sentinel && ctx != sentinel {
sink(n)
Expand Down
17 changes: 6 additions & 11 deletions exhaust.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,24 @@ type exhaustMapObservable[T, R any] struct {
}

func (obs exhaustMapObservable[T, R]) Subscribe(ctx context.Context, sink Observer[R]) {
source, cancel := context.WithCancel(ctx)
source, cancelSource := context.WithCancel(ctx)

sink = sink.OnLastNotification(cancel)
sink = sink.OnLastNotification(cancelSource)

var x struct {
Context atomic.Value
Complete atomic.Bool
Worker struct {
sync.WaitGroup
Cancel context.CancelFunc
}
}

x.Context.Store(source)

startWorker := func(v T) {
worker, cancel := context.WithCancel(source)
worker, cancelWorker := context.WithCancel(source)
x.Context.Store(worker)

x.Worker.Cancel = cancel

x.Worker.Add(1)

obs.Project(v).Subscribe(worker, func(n Notification[R]) {
Expand All @@ -85,7 +82,7 @@ func (obs exhaustMapObservable[T, R]) Subscribe(ctx context.Context, sink Observ
}
}

cancel()
cancelWorker()
x.Worker.Done()
})
}
Expand All @@ -100,10 +97,8 @@ func (obs exhaustMapObservable[T, R]) Subscribe(ctx context.Context, sink Observ
case n.HasError:
ctx := x.Context.Swap(source)

if x.Worker.Cancel != nil {
x.Worker.Cancel()
x.Worker.Wait()
}
cancelSource()
x.Worker.Wait()

if x.Context.Swap(sentinel) != sentinel && ctx != sentinel {
sink.Error(n.Error)
Expand Down
13 changes: 5 additions & 8 deletions sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ type sampleObservable[T, U any] struct {
}

func (obs sampleObservable[T, U]) Subscribe(ctx context.Context, sink Observer[T]) {
source, cancel := context.WithCancel(ctx)
source, cancelSource := context.WithCancel(ctx)

sink = sink.OnLastNotification(cancel)
sink = sink.OnLastNotification(cancelSource)

var x struct {
Context atomic.Value
Expand All @@ -42,16 +42,13 @@ func (obs sampleObservable[T, U]) Subscribe(ctx context.Context, sink Observer[T
}
Worker struct {
sync.WaitGroup
Cancel context.CancelFunc
}
}

{
worker, cancel := context.WithCancel(source)
worker, cancelWorker := context.WithCancel(source)
x.Context.Store(worker)

x.Worker.Cancel = cancel

x.Worker.Add(1)

obs.Notifier.Subscribe(worker, func(n Notification[U]) {
Expand All @@ -76,15 +73,15 @@ func (obs sampleObservable[T, U]) Subscribe(ctx context.Context, sink Observer[T
break
}

cancel()
cancelWorker()
x.Worker.Done()
})
}

finish := func(n Notification[T]) {
ctx := x.Context.Swap(source)

x.Worker.Cancel()
cancelSource()
x.Worker.Wait()

if x.Context.Swap(sentinel) != sentinel && ctx != sentinel {
Expand Down
13 changes: 5 additions & 8 deletions skipuntil.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,21 @@ type skipUntilObservable[T, U any] struct {
}

func (obs skipUntilObservable[T, U]) Subscribe(ctx context.Context, sink Observer[T]) {
source, cancel := context.WithCancel(ctx)
source, cancelSource := context.WithCancel(ctx)

sink = sink.OnLastNotification(cancel)
sink = sink.OnLastNotification(cancelSource)

var x struct {
Context atomic.Value
Worker struct {
sync.WaitGroup
Cancel context.CancelFunc
}
}

{
worker, cancel := context.WithCancel(source)
worker, cancelWorker := context.WithCancel(source)
x.Context.Store(worker)

x.Worker.Cancel = cancel

x.Worker.Add(1)

var noop bool
Expand All @@ -51,7 +48,7 @@ func (obs skipUntilObservable[T, U]) Subscribe(ctx context.Context, sink Observe

noop = true

cancel()
cancelWorker()

switch {
case n.HasValue:
Expand All @@ -73,7 +70,7 @@ func (obs skipUntilObservable[T, U]) Subscribe(ctx context.Context, sink Observe
finish := func(n Notification[T]) {
ctx := x.Context.Swap(source)

x.Worker.Cancel()
cancelSource()
x.Worker.Wait()

if x.Context.Swap(sentinel) != sentinel && ctx != sentinel {
Expand Down
Loading

0 comments on commit 014b264

Please sign in to comment.