Skip to content

Commit

Permalink
Upgrade to RxPHP 2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
davidwdan committed Jan 5, 2017
1 parent 052faff commit 5625ee9
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 46 deletions.
7 changes: 4 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@
}
},
"require": {
"voryx/event-loop": "^0.2.0",
"php": "^7.0",
"voryx/event-loop": "^2.0.1",
"react/stream": "^0.4.3",
"reactivex/rxphp": "^1.0.0",
"rx/operator-extras": "^0.1.0"
"reactivex/rxphp": "2.x-dev",
"rx/operator-extras": "^2.0.0"
}
}
19 changes: 7 additions & 12 deletions src/FromFileObservable.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

namespace Rx\React;

use React\EventLoop\LoopInterface;
use React\Stream\Stream;
use Rx\Disposable\CallbackDisposable;
use Rx\Extra\Operator\CutOperator;
use Rx\DisposableInterface;
use Rx\Operator\CutOperator;
use Rx\Observable;
use Rx\ObserverInterface;
use Rx\SchedulerInterface;


class FromFileObservable extends Observable
{
Expand All @@ -18,28 +18,23 @@ class FromFileObservable extends Observable
/** @var string */
private $mode;

/** @var LoopInterface */
private $loop;

public function __construct($fileName, $mode = "r", LoopInterface $loop = null)
public function __construct($fileName, $mode = 'r')
{
$this->fileName = $fileName;
$this->mode = $mode;
$this->loop = $loop ?: \EventLoop\getLoop();
}

/**
* @param ObserverInterface $observer
* @param SchedulerInterface|null $scheduler
* @return \Rx\Disposable\CompositeDisposable|\Rx\DisposableInterface
*/
public function subscribe(ObserverInterface $observer, SchedulerInterface $scheduler = null)
public function _subscribe(ObserverInterface $observer): DisposableInterface
{

try {
$stream = new StreamSubject(fopen($this->fileName, $this->mode), $this->loop);
$stream = new StreamSubject(fopen($this->fileName, $this->mode));

return $stream->subscribe($observer, $scheduler);
return $stream->subscribe($observer);

} catch (\Exception $e) {
$observer->onError($e);
Expand Down
12 changes: 6 additions & 6 deletions src/StreamSubject.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

namespace Rx\React;

use React\EventLoop\LoopInterface;
use React\Stream\Stream;
use Rx\Disposable\BinaryDisposable;
use Rx\Disposable\CallbackDisposable;
use Rx\DisposableInterface;
use Rx\ObserverInterface;
use Rx\Subject\Subject;
use WyriHaximus\React\AsyncInteropLoop\AsyncInteropLoop;

class StreamSubject extends Subject
{
Expand All @@ -19,12 +20,11 @@ class StreamSubject extends Subject
* StreamSubject constructor.
*
* @param resource $resource
* @param LoopInterface|null $loop
*/
public function __construct($resource, LoopInterface $loop = null)
public function __construct($resource)
{

$loop = $loop ?: \EventLoop\getLoop();
$loop = new AsyncInteropLoop();

$this->stream = new Stream($resource, $loop);

Expand All @@ -49,7 +49,7 @@ public function onCompleted()
parent::onCompleted();
}

public function subscribe(ObserverInterface $observer, $scheduler = null)
public function _subscribe(ObserverInterface $observer): DisposableInterface
{

$this->stream->on('data', function ($data) use ($observer) {
Expand All @@ -65,7 +65,7 @@ public function subscribe(ObserverInterface $observer, $scheduler = null)
$observer->onCompleted();
});

$disposable = parent::subscribe($observer, $scheduler);
$disposable = parent::_subscribe($observer);

return new BinaryDisposable($disposable, new CallbackDisposable(function () use ($observer) {
$this->removeObserver($observer);
Expand Down
12 changes: 2 additions & 10 deletions src/ToFileObserver.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,16 @@

namespace Rx\React;

use React\EventLoop\LoopInterface;
use React\Stream\Stream;
use Rx\Observable;

class ToFileObserver extends StreamSubject
{

/**
* ToFileObserver constructor.
*
* @param string $fileName
* @param LoopInterface|null $loop
*/
public function __construct($fileName, LoopInterface $loop = null)
public function __construct($fileName)
{

$loop = $loop ?: \EventLoop\getLoop();

parent::__construct(fopen($fileName, 'w'), $loop);
parent::__construct(fopen($fileName, 'wb'));
}
}
31 changes: 16 additions & 15 deletions tests/Functional/Observable/FromFileObservableTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Rx\Observable;
use Rx\Observer\CallbackObserver;
use Rx\React\FromFileObservable;
use WyriHaximus\React\AsyncInteropLoop\AsyncInteropLoop;

class FromFileObservableTest extends \PHPUnit_Framework_TestCase
{
Expand All @@ -16,13 +17,13 @@ class FromFileObservableTest extends \PHPUnit_Framework_TestCase
public function fromFile_basic()
{
/** @var LoopInterface $loop */
$loop = \EventLoop\getLoop();
$loop = new AsyncInteropLoop();
$source = new FromFileObservable(__DIR__ . '/../test.txt');
$result = false;
$complete = false;
$error = false;

$source->subscribe(new CallbackObserver(
$source->subscribe(
function ($value) use (&$result) {
$result = $value;
},
Expand All @@ -32,12 +33,12 @@ function ($e) use (&$error) {
function () use (&$complete) {
$complete = true;
}
));
);


$loop->tick();

$this->assertEquals("1 2 3 4 5", $result);
$this->assertEquals('1 2 3 4 5', $result);
$this->assertTrue($complete);
$this->assertFalse($error);

Expand All @@ -49,13 +50,13 @@ function () use (&$complete) {
public function fromFile_missing_file()
{
/** @var LoopInterface $loop */
$loop = \EventLoop\getLoop();
$loop = new AsyncInteropLoop();
$source = new FromFileObservable(__DIR__ . '/../nofile.txt');
$result = false;
$complete = false;
$error = false;

$source->subscribe(new CallbackObserver(
$source->subscribe(
function ($value) use (&$result) {
$result = $value;
},
Expand All @@ -65,7 +66,7 @@ function ($e) use (&$error) {
function () use (&$complete) {
$complete = true;
}
));
);


$loop->tick();
Expand All @@ -83,18 +84,18 @@ public function fromFile_exceed_buffer()
{
//Create a 10k temp file
$temp = tmpfile();
fwrite($temp, str_repeat("1", 10000));
fwrite($temp, str_repeat('1', 150000));
$meta_data = stream_get_meta_data($temp);
$filename = $meta_data["uri"];
$filename = $meta_data['uri'];

/** @var LoopInterface $loop */
$loop = \EventLoop\getLoop();
$loop = new AsyncInteropLoop();
$source = new FromFileObservable($filename);
$result = false;
$complete = false;
$error = false;

$source->subscribe(new CallbackObserver(
$source->subscribe(
function ($value) use (&$result) {
$result = $value;
},
Expand All @@ -104,26 +105,26 @@ function ($e) use (&$error) {
function () use (&$complete) {
$complete = true;
}
));
);


$loop->tick();

$this->assertEquals("4096", strlen($result));
$this->assertEquals("65536", strlen($result));
$this->assertFalse($complete);
$this->assertFalse($error);


$loop->tick();

$this->assertEquals("4096", strlen($result));
$this->assertEquals("65536", strlen($result));
$this->assertFalse($complete);
$this->assertFalse($error);


$loop->tick();

$this->assertEquals("1808", strlen($result));
$this->assertEquals("18928", strlen($result));
$this->assertTrue($complete);
$this->assertFalse($error);
}
Expand Down

0 comments on commit 5625ee9

Please sign in to comment.