Skip to content

Commit

Permalink
remove AsOperator methods
Browse files Browse the repository at this point in the history
  • Loading branch information
b97tsk committed Sep 4, 2023
1 parent 6f50185 commit 3ca3d80
Show file tree
Hide file tree
Showing 12 changed files with 32 additions and 62 deletions.
5 changes: 0 additions & 5 deletions buffercount.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,6 @@ func (op BufferCountOperator[T]) Apply(source Observable[T]) Observable[[]T] {
return bufferCountObservable[T]{source, op.opts}.Subscribe
}

// AsOperator converts op to an Operator.
//
// Once type inference has improved in Go, this method will be removed.
func (op BufferCountOperator[T]) AsOperator() Operator[T, []T] { return op }

type bufferCountObservable[T any] struct {
Source Observable[T]
bufferCountConfig
Expand Down
12 changes: 6 additions & 6 deletions buffercount_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,42 +13,42 @@ func TestBufferCount(t *testing.T) {
NewTestSuite[string](t).Case(
rx.Pipe2(
rx.Just("A", "B", "C", "D", "E"),
rx.BufferCount[string](2).AsOperator(),
rx.BufferCount[string](2),
ToString[[]string](),
),
"[A B]", "[C D]", "[E]", ErrComplete,
).Case(
rx.Pipe2(
rx.Just("A", "B", "C", "D", "E"),
rx.BufferCount[string](3).AsOperator(),
rx.BufferCount[string](3),
ToString[[]string](),
),
"[A B C]", "[D E]", ErrComplete,
).Case(
rx.Pipe2(
rx.Just("A", "B", "C", "D", "E"),
rx.BufferCount[string](3).WithStartBufferEvery(1).AsOperator(),
rx.BufferCount[string](3).WithStartBufferEvery(1),
ToString[[]string](),
),
"[A B C]", "[B C D]", "[C D E]", "[D E]", "[E]", ErrComplete,
).Case(
rx.Pipe2(
rx.Just("A", "B", "C", "D", "E"),
rx.BufferCount[string](3).WithStartBufferEvery(2).AsOperator(),
rx.BufferCount[string](3).WithStartBufferEvery(2),
ToString[[]string](),
),
"[A B C]", "[C D E]", "[E]", ErrComplete,
).Case(
rx.Pipe2(
rx.Just("A", "B", "C", "D", "E"),
rx.BufferCount[string](3).WithStartBufferEvery(4).AsOperator(),
rx.BufferCount[string](3).WithStartBufferEvery(4),
ToString[[]string](),
),
"[A B C]", "[E]", ErrComplete,
).Case(
rx.Pipe2(
rx.Throw[string](ErrTest),
rx.BufferCount[string](2).AsOperator(),
rx.BufferCount[string](2),
ToString[[]string](),
),
ErrTest,
Expand Down
5 changes: 0 additions & 5 deletions connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ func (op ConnectOperator[T, R]) Apply(source Observable[T]) Observable[R] {
return connectObservable[T, R]{source, op.opts}.Subscribe
}

// AsOperator converts op to an Operator.
//
// Once type inference has improved in Go, this method will be removed.
func (op ConnectOperator[T, R]) AsOperator() Operator[T, R] { return op }

type connectObservable[T, R any] struct {
Source Observable[T]
connectConfig[T, R]
Expand Down
2 changes: 1 addition & 1 deletion connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestConnect(t *testing.T) {
ToString[[]int](),
)
},
).WithConnector(rx.Multicast[int]).AsOperator(),
).WithConnector(rx.Multicast[int]),
),
"[0 5 12 21]", ErrComplete,
)
Expand Down
5 changes: 0 additions & 5 deletions merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,6 @@ func (op MergeMapOperator[T, R]) Apply(source Observable[T]) Observable[R] {
return mergeMapObservable[T, R]{source, op.opts}.Subscribe
}

// AsOperator converts op to an Operator.
//
// Once type inference has improved in Go, this method will be removed.
func (op MergeMapOperator[T, R]) AsOperator() Operator[T, R] { return op }

type mergeMapObservable[T, R any] struct {
Source Observable[T]
mergeMapConfig[T, R]
Expand Down
12 changes: 6 additions & 6 deletions merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestMerge2(t *testing.T) {
rx.Pipe1(rx.Just("C", "D"), AddLatencyToValues[string](2, 4)),
rx.Pipe1(rx.Just("E", "F"), AddLatencyToValues[string](1, 3)),
),
rx.MergeAll[rx.Observable[string]]().AsOperator(),
rx.MergeAll[rx.Observable[string]](),
),
"E", "C", "A", "F", "D", "B", ErrComplete,
).Case(
Expand All @@ -54,27 +54,27 @@ func TestMerge2(t *testing.T) {
func(v int) rx.Observable[int] {
return rx.Pipe1(rx.Just(v), DelaySubscription[int](1))
},
).WithConcurrency(3).AsOperator(),
).WithConcurrency(3),
rx.Reduce(0, func(v1, v2 int) int { return v1 + v2 }),
ToString[int](),
),
"36", ErrComplete,
).Case(
rx.Pipe1(
rx.Timer(Step(1)),
rx.MergeMapTo[time.Time](rx.Just("A")).AsOperator(),
rx.MergeMapTo[time.Time](rx.Just("A")),
),
"A", ErrComplete,
).Case(
rx.Pipe1(
rx.Empty[rx.Observable[string]](),
rx.MergeAll[rx.Observable[string]]().AsOperator(),
rx.MergeAll[rx.Observable[string]](),
),
ErrComplete,
).Case(
rx.Pipe1(
rx.Throw[rx.Observable[string]](ErrTest),
rx.MergeAll[rx.Observable[string]]().AsOperator(),
rx.MergeAll[rx.Observable[string]](),
),
ErrTest,
).Case(
Expand All @@ -89,7 +89,7 @@ func TestMerge2(t *testing.T) {
sink.Next(rx.Just("E", "F"))
sink.Complete()
},
rx.MergeAll[rx.Observable[string]]().AsOperator(),
rx.MergeAll[rx.Observable[string]](),
),
"A", "B", ErrTest,
)
Expand Down
5 changes: 0 additions & 5 deletions share.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ func (op ShareOperator[T]) Apply(source Observable[T]) Observable[T] {
return obs.Subscribe
}

// AsOperator converts op to an Operator.
//
// Once type inference has improved in Go, this method will be removed.
func (op ShareOperator[T]) AsOperator() Operator[T, T] { return op }

type shareObservable[T any] struct {
mu sync.Mutex
source Observable[T]
Expand Down
10 changes: 5 additions & 5 deletions share_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestShare1(t *testing.T) {
rx.Ticker(Step(3)),
rx.Scan(-1, func(i int, _ time.Time) int { return i + 1 }),
rx.Take[int](4),
rx.Share[int]().AsOperator(),
rx.Share[int](),
)

NewTestSuite[int](t).Case(
Expand All @@ -36,7 +36,7 @@ func TestShare2(t *testing.T) {
obs := rx.Pipe3(
rx.Ticker(Step(3)),
rx.Scan(-1, func(i int, _ time.Time) int { return i + 1 }),
rx.Share[int]().AsOperator(),
rx.Share[int](),
rx.Take[int](4),
)

Expand All @@ -62,7 +62,7 @@ func TestShare3(t *testing.T) {
func() rx.Subject[int] {
return rx.MulticastReplay[int](&rx.ReplayConfig{BufferSize: 1})
},
).AsOperator(),
),
)

NewTestSuite[int](t).Case(
Expand All @@ -86,7 +86,7 @@ func TestShare4(t *testing.T) {
func() rx.Subject[int] {
return rx.MulticastReplay[int](&rx.ReplayConfig{BufferSize: 1})
},
).AsOperator(),
),
rx.Take[int](4),
)

Expand Down Expand Up @@ -116,7 +116,7 @@ func TestShare5(t *testing.T) {
Observer: rx.Noop[int],
}
},
).AsOperator(),
),
),
ErrTest,
)
Expand Down
5 changes: 0 additions & 5 deletions throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,6 @@ func (op ThrottleOperator[T, U]) Apply(source Observable[T]) Observable[T] {
return throttleObservable[T, U]{source, op.opts}.Subscribe
}

// AsOperator converts op to an Operator.
//
// Once type inference has improved in Go, this method will be removed.
func (op ThrottleOperator[T, U]) AsOperator() Operator[T, T] { return op }

type throttleObservable[T, U any] struct {
Source Observable[T]
throttleConfig[T, U]
Expand Down
20 changes: 10 additions & 10 deletions throttle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestThrottle(t *testing.T) {
func(string) rx.Observable[time.Time] {
return rx.Timer(Step(3))
},
).AsOperator(),
),
),
"A", "C", "E", ErrComplete,
).Case(
Expand All @@ -30,7 +30,7 @@ func TestThrottle(t *testing.T) {
func(string) rx.Observable[int] {
return rx.Empty[int]()
},
).AsOperator(),
),
),
"A", "B", "C", "D", "E", ErrComplete,
).Case(
Expand All @@ -44,7 +44,7 @@ func TestThrottle(t *testing.T) {
DelaySubscription[int](5),
)
},
).WithLeading(false).WithTrailing(true).AsOperator(),
).WithLeading(false).WithTrailing(true),
),
ErrComplete,
).Case(
Expand All @@ -54,7 +54,7 @@ func TestThrottle(t *testing.T) {
func(string) rx.Observable[int] {
return rx.Throw[int](ErrTest)
},
).AsOperator(),
),
),
ErrTest,
).Case(
Expand All @@ -65,7 +65,7 @@ func TestThrottle(t *testing.T) {
func(string) rx.Observable[int] {
return rx.Throw[int](ErrTest)
},
).AsOperator(),
),
),
"A", ErrTest,
).Case(
Expand All @@ -76,7 +76,7 @@ func TestThrottle(t *testing.T) {
func(string) rx.Observable[time.Time] {
return rx.Timer(Step(9))
},
).WithLeading(false).WithTrailing(true).AsOperator(),
).WithLeading(false).WithTrailing(true),
),
"C", "E", ErrComplete,
).Case(
Expand All @@ -87,28 +87,28 @@ func TestThrottle(t *testing.T) {
func(string) rx.Observable[time.Time] {
return rx.Timer(Step(9))
},
).WithLeading(true).WithTrailing(true).AsOperator(),
).WithLeading(true).WithTrailing(true),
),
"A", "C", "E", ErrComplete,
).Case(
rx.Pipe2(
rx.Just("A", "B", "C", "D", "E"),
AddLatencyToValues[string](0, 2),
rx.ThrottleTime[string](Step(3)).AsOperator(),
rx.ThrottleTime[string](Step(3)),
),
"A", "C", "E", ErrComplete,
).Case(
rx.Pipe2(
rx.Just("A", "B", "C", "D", "E"),
AddLatencyToValues[string](0, 4),
rx.ThrottleTime[string](Step(9)).WithLeading(false).WithTrailing(true).AsOperator(),
rx.ThrottleTime[string](Step(9)).WithLeading(false).WithTrailing(true),
),
"C", "E", ErrComplete,
).Case(
rx.Pipe2(
rx.Just("A", "B", "C", "D", "E"),
AddLatencyToValues[string](0, 4),
rx.ThrottleTime[string](Step(9)).WithLeading(true).WithTrailing(true).AsOperator(),
rx.ThrottleTime[string](Step(9)).WithLeading(true).WithTrailing(true),
),
"A", "C", "E", ErrComplete,
)
Expand Down
5 changes: 0 additions & 5 deletions timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ func (op TimeoutOperator[T]) Apply(source Observable[T]) Observable[T] {
return timeoutObservable[T]{source, op.opts}.Subscribe
}

// AsOperator converts op to an Operator.
//
// Once type inference has improved in Go, this method will be removed.
func (op TimeoutOperator[T]) AsOperator() Operator[T, T] { return op }

type timeoutObservable[T any] struct {
Source Observable[T]
timeoutConfig[T]
Expand Down
8 changes: 4 additions & 4 deletions timeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,28 @@ func TestTimeout(t *testing.T) {
rx.Pipe2(
rx.Just("A", "B", "C"),
AddLatencyToValues[string](1, 1),
rx.Timeout[string](Step(2)).AsOperator(),
rx.Timeout[string](Step(2)),
),
"A", "B", "C", ErrComplete,
).Case(
rx.Pipe2(
rx.Just("A", "B", "C"),
AddLatencyToValues[string](1, 3),
rx.Timeout[string](Step(2)).AsOperator(),
rx.Timeout[string](Step(2)),
),
"A", rx.ErrTimeout,
).Case(
rx.Pipe2(
rx.Just("A", "B", "C"),
AddLatencyToValues[string](2, 2),
rx.Timeout[string](Step(1)).WithFirst(Step(3)).AsOperator(),
rx.Timeout[string](Step(1)).WithFirst(Step(3)),
),
"A", rx.ErrTimeout,
).Case(
rx.Pipe2(
rx.Just("A", "B", "C"),
AddLatencyToValues[string](1, 3),
rx.Timeout[string](Step(2)).WithObservable(rx.Throw[string](ErrTest)).AsOperator(),
rx.Timeout[string](Step(2)).WithObservable(rx.Throw[string](ErrTest)),
),
"A", ErrTest,
)
Expand Down

0 comments on commit 3ca3d80

Please sign in to comment.