diff --git a/examples/cancelAwait.php b/examples/cancelAwait.php new file mode 100644 index 0000000..633082a --- /dev/null +++ b/examples/cancelAwait.php @@ -0,0 +1,16 @@ +subscribe(new CallbackObserver( + $disposable = $observable->subscribe( function ($value) use (&$results, $loop) { $results[] = $value; @@ -30,18 +31,26 @@ function ($e) use (&$completed) { }, function () use (&$completed) { $completed = true; - } - - )); + }); while (!$completed) { $loop->run(); foreach ($results as $result) { - yield $result; + $cancel = yield $result; + + if ($cancel === AWAIT_CANCEL) { + $disposable->dispose(); + return; + } } $results = []; } } + +function cancelAwait(\Generator $generator) +{ + $generator->send(AWAIT_CANCEL); +} diff --git a/tests/FunctionAwaitTest.php b/tests/FunctionAwaitTest.php index e7ea178..ea91c19 100644 --- a/tests/FunctionAwaitTest.php +++ b/tests/FunctionAwaitTest.php @@ -2,6 +2,7 @@ namespace Rx\React\Tests; +use function Rx\cancelAwait; use Rx\Functional\FunctionalTestCase; use Rx\Observable; @@ -68,4 +69,28 @@ public function await_default_timeout() } + /** + * @test + */ + public function await_cancel() + { + $array = [1, 2, 3]; + + $observable = Observable::fromArray($array); + + $generator = \Rx\await($observable); + + $result = []; + foreach ($generator as $item) { + if ($item === 2) { + cancelAwait($generator); + } + + $result[] = $item; + } + + $this->assertEquals([1, 2], $result); + $this->assertEquals(count($result), 2); + + } }