From 5625ee93dd822e503c4bd6b7be4b0328a6be87c4 Mon Sep 17 00:00:00 2001 From: David Dan Date: Thu, 5 Jan 2017 14:59:27 -0500 Subject: [PATCH] Upgrade to RxPHP 2.0 --- composer.json | 7 +++-- src/FromFileObservable.php | 19 +++++------- src/StreamSubject.php | 12 +++---- src/ToFileObserver.php | 12 ++----- .../Observable/FromFileObservableTest.php | 31 ++++++++++--------- 5 files changed, 35 insertions(+), 46 deletions(-) diff --git a/composer.json b/composer.json index 6a7996e..94d06a3 100644 --- a/composer.json +++ b/composer.json @@ -29,9 +29,10 @@ } }, "require": { - "voryx/event-loop": "^0.2.0", + "php": "^7.0", + "voryx/event-loop": "^2.0.1", "react/stream": "^0.4.3", - "reactivex/rxphp": "^1.0.0", - "rx/operator-extras": "^0.1.0" + "reactivex/rxphp": "2.x-dev", + "rx/operator-extras": "^2.0.0" } } diff --git a/src/FromFileObservable.php b/src/FromFileObservable.php index af970e2..d3f6f8e 100644 --- a/src/FromFileObservable.php +++ b/src/FromFileObservable.php @@ -2,13 +2,13 @@ namespace Rx\React; -use React\EventLoop\LoopInterface; use React\Stream\Stream; use Rx\Disposable\CallbackDisposable; -use Rx\Extra\Operator\CutOperator; +use Rx\DisposableInterface; +use Rx\Operator\CutOperator; use Rx\Observable; use Rx\ObserverInterface; -use Rx\SchedulerInterface; + class FromFileObservable extends Observable { @@ -18,28 +18,23 @@ class FromFileObservable extends Observable /** @var string */ private $mode; - /** @var LoopInterface */ - private $loop; - - public function __construct($fileName, $mode = "r", LoopInterface $loop = null) + public function __construct($fileName, $mode = 'r') { $this->fileName = $fileName; $this->mode = $mode; - $this->loop = $loop ?: \EventLoop\getLoop(); } /** * @param ObserverInterface $observer - * @param SchedulerInterface|null $scheduler * @return \Rx\Disposable\CompositeDisposable|\Rx\DisposableInterface */ - public function subscribe(ObserverInterface $observer, SchedulerInterface $scheduler = null) + public function _subscribe(ObserverInterface $observer): DisposableInterface { try { - $stream = new StreamSubject(fopen($this->fileName, $this->mode), $this->loop); + $stream = new StreamSubject(fopen($this->fileName, $this->mode)); - return $stream->subscribe($observer, $scheduler); + return $stream->subscribe($observer); } catch (\Exception $e) { $observer->onError($e); diff --git a/src/StreamSubject.php b/src/StreamSubject.php index 996da4b..5ee9fdb 100644 --- a/src/StreamSubject.php +++ b/src/StreamSubject.php @@ -2,12 +2,13 @@ namespace Rx\React; -use React\EventLoop\LoopInterface; use React\Stream\Stream; use Rx\Disposable\BinaryDisposable; use Rx\Disposable\CallbackDisposable; +use Rx\DisposableInterface; use Rx\ObserverInterface; use Rx\Subject\Subject; +use WyriHaximus\React\AsyncInteropLoop\AsyncInteropLoop; class StreamSubject extends Subject { @@ -19,12 +20,11 @@ class StreamSubject extends Subject * StreamSubject constructor. * * @param resource $resource - * @param LoopInterface|null $loop */ - public function __construct($resource, LoopInterface $loop = null) + public function __construct($resource) { - $loop = $loop ?: \EventLoop\getLoop(); + $loop = new AsyncInteropLoop(); $this->stream = new Stream($resource, $loop); @@ -49,7 +49,7 @@ public function onCompleted() parent::onCompleted(); } - public function subscribe(ObserverInterface $observer, $scheduler = null) + public function _subscribe(ObserverInterface $observer): DisposableInterface { $this->stream->on('data', function ($data) use ($observer) { @@ -65,7 +65,7 @@ public function subscribe(ObserverInterface $observer, $scheduler = null) $observer->onCompleted(); }); - $disposable = parent::subscribe($observer, $scheduler); + $disposable = parent::_subscribe($observer); return new BinaryDisposable($disposable, new CallbackDisposable(function () use ($observer) { $this->removeObserver($observer); diff --git a/src/ToFileObserver.php b/src/ToFileObserver.php index e0312ae..8589ec3 100644 --- a/src/ToFileObserver.php +++ b/src/ToFileObserver.php @@ -2,10 +2,6 @@ namespace Rx\React; -use React\EventLoop\LoopInterface; -use React\Stream\Stream; -use Rx\Observable; - class ToFileObserver extends StreamSubject { @@ -13,13 +9,9 @@ class ToFileObserver extends StreamSubject * ToFileObserver constructor. * * @param string $fileName - * @param LoopInterface|null $loop */ - public function __construct($fileName, LoopInterface $loop = null) + public function __construct($fileName) { - - $loop = $loop ?: \EventLoop\getLoop(); - - parent::__construct(fopen($fileName, 'w'), $loop); + parent::__construct(fopen($fileName, 'wb')); } } diff --git a/tests/Functional/Observable/FromFileObservableTest.php b/tests/Functional/Observable/FromFileObservableTest.php index 51b25ef..5181577 100644 --- a/tests/Functional/Observable/FromFileObservableTest.php +++ b/tests/Functional/Observable/FromFileObservableTest.php @@ -6,6 +6,7 @@ use Rx\Observable; use Rx\Observer\CallbackObserver; use Rx\React\FromFileObservable; +use WyriHaximus\React\AsyncInteropLoop\AsyncInteropLoop; class FromFileObservableTest extends \PHPUnit_Framework_TestCase { @@ -16,13 +17,13 @@ class FromFileObservableTest extends \PHPUnit_Framework_TestCase public function fromFile_basic() { /** @var LoopInterface $loop */ - $loop = \EventLoop\getLoop(); + $loop = new AsyncInteropLoop(); $source = new FromFileObservable(__DIR__ . '/../test.txt'); $result = false; $complete = false; $error = false; - $source->subscribe(new CallbackObserver( + $source->subscribe( function ($value) use (&$result) { $result = $value; }, @@ -32,12 +33,12 @@ function ($e) use (&$error) { function () use (&$complete) { $complete = true; } - )); + ); $loop->tick(); - $this->assertEquals("1 2 3 4 5", $result); + $this->assertEquals('1 2 3 4 5', $result); $this->assertTrue($complete); $this->assertFalse($error); @@ -49,13 +50,13 @@ function () use (&$complete) { public function fromFile_missing_file() { /** @var LoopInterface $loop */ - $loop = \EventLoop\getLoop(); + $loop = new AsyncInteropLoop(); $source = new FromFileObservable(__DIR__ . '/../nofile.txt'); $result = false; $complete = false; $error = false; - $source->subscribe(new CallbackObserver( + $source->subscribe( function ($value) use (&$result) { $result = $value; }, @@ -65,7 +66,7 @@ function ($e) use (&$error) { function () use (&$complete) { $complete = true; } - )); + ); $loop->tick(); @@ -83,18 +84,18 @@ public function fromFile_exceed_buffer() { //Create a 10k temp file $temp = tmpfile(); - fwrite($temp, str_repeat("1", 10000)); + fwrite($temp, str_repeat('1', 150000)); $meta_data = stream_get_meta_data($temp); - $filename = $meta_data["uri"]; + $filename = $meta_data['uri']; /** @var LoopInterface $loop */ - $loop = \EventLoop\getLoop(); + $loop = new AsyncInteropLoop(); $source = new FromFileObservable($filename); $result = false; $complete = false; $error = false; - $source->subscribe(new CallbackObserver( + $source->subscribe( function ($value) use (&$result) { $result = $value; }, @@ -104,26 +105,26 @@ function ($e) use (&$error) { function () use (&$complete) { $complete = true; } - )); + ); $loop->tick(); - $this->assertEquals("4096", strlen($result)); + $this->assertEquals("65536", strlen($result)); $this->assertFalse($complete); $this->assertFalse($error); $loop->tick(); - $this->assertEquals("4096", strlen($result)); + $this->assertEquals("65536", strlen($result)); $this->assertFalse($complete); $this->assertFalse($error); $loop->tick(); - $this->assertEquals("1808", strlen($result)); + $this->assertEquals("18928", strlen($result)); $this->assertTrue($complete); $this->assertFalse($error); }