Skip to content

Commit

Permalink
Added ability to cancel await
Browse files Browse the repository at this point in the history
  • Loading branch information
davidwdan committed Apr 18, 2018
1 parent a620636 commit f88edfb
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 7 deletions.
16 changes: 16 additions & 0 deletions examples/cancelAwait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

require __DIR__ . '/../vendor/autoload.php';

$source = \Rx\Observable::interval(1000);

$generator = \Rx\await($source);

foreach ($generator as $item) {
if ($item === 3) {
\Rx\cancelAwait($generator);
}
echo $item, PHP_EOL;
}

echo 'DONE';
23 changes: 16 additions & 7 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
namespace Rx;

use React\EventLoop\LoopInterface;
use Rx\Observer\CallbackObserver;

const AWAIT_CANCEL = 'cancel';

/**
* Wait until observable completes.
Expand All @@ -12,13 +13,13 @@
* @param LoopInterface $loop
* @return \Generator
*/
function await(Observable $observable, LoopInterface $loop = null)
function await(Observable $observable, LoopInterface $loop = null): \Generator
{
$completed = false;
$results = [];
$loop = $loop ?: \EventLoop\getLoop();

$observable->subscribe(new CallbackObserver(
$disposable = $observable->subscribe(
function ($value) use (&$results, $loop) {
$results[] = $value;

Expand All @@ -30,18 +31,26 @@ function ($e) use (&$completed) {
},
function () use (&$completed) {
$completed = true;
}

));
});

while (!$completed) {

$loop->run();

foreach ($results as $result) {
yield $result;
$cancel = yield $result;

if ($cancel === AWAIT_CANCEL) {
$disposable->dispose();
return;
}
}

$results = [];
}
}

function cancelAwait(\Generator $generator)
{
$generator->send(AWAIT_CANCEL);
}
25 changes: 25 additions & 0 deletions tests/FunctionAwaitTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Rx\React\Tests;

use function Rx\cancelAwait;
use Rx\Functional\FunctionalTestCase;
use Rx\Observable;

Expand Down Expand Up @@ -68,4 +69,28 @@ public function await_default_timeout()

}

/**
* @test
*/
public function await_cancel()
{
$array = [1, 2, 3];

$observable = Observable::fromArray($array);

$generator = \Rx\await($observable);

$result = [];
foreach ($generator as $item) {
if ($item === 2) {
cancelAwait($generator);
}

$result[] = $item;
}

$this->assertEquals([1, 2], $result);
$this->assertEquals(count($result), 2);

}
}

0 comments on commit f88edfb

Please sign in to comment.