Skip to content

Commit

Permalink
Updated to React/Stream ^0.7.0
Browse files Browse the repository at this point in the history
From and To file now use Readable and Writable resources
  • Loading branch information
davidwdan committed Dec 19, 2017
1 parent d8d2489 commit 6853604
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 38 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"require": {
"php": "^7.0",
"voryx/event-loop": "^2.0",
"react/stream": "^0.4.3",
"react/stream": "^0.7.0",
"reactivex/rxphp": "^2.0",
"rx/operator-extras": "^2.0"
},
Expand Down
42 changes: 30 additions & 12 deletions src/FromFileObservable.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,64 @@
namespace Rx\React;

use React\EventLoop\LoopInterface;
use React\Stream\Stream;
use React\Stream\ReadableResourceStream;
use Rx\Disposable\CallbackDisposable;
use Rx\Disposable\EmptyDisposable;
use Rx\DisposableInterface;
use Rx\Operator\CutOperator;
use Rx\Observable;
use Rx\ObserverInterface;

class FromFileObservable extends Observable
{

private $fileName;
private $mode;

private $loop;

public function __construct(string $fileName, string $mode = 'r', LoopInterface $loop = null)
public function __construct(string $fileName, LoopInterface $loop = null)
{
$this->fileName = $fileName;
$this->mode = $mode;
$this->loop = $loop ?: \EventLoop\getLoop();
}

public function _subscribe(ObserverInterface $observer): DisposableInterface
{
try {
$stream = new StreamSubject(@fopen($this->fileName, $this->mode), $this->loop);
$stream = new ReadableResourceStream(@fopen($this->fileName, 'rb'), $this->loop);

return $stream->subscribe($observer);
$stream->on('data', function ($data) use ($observer) {
$observer->onNext($data);
});

} catch (\Exception $e) {
$observer->onError($e);
$stream->on('error', function (\Throwable $e) use ($observer) {
$observer->onError($e);
});

return new CallbackDisposable(function () use (&$stream) {
if ($stream instanceof Stream) {
$stream->close();
}
$stream->on('close', function () use ($observer) {
$observer->onCompleted();
});

$stream->on('end', function () use ($observer) {
$observer->onCompleted();
});

return new CallbackDisposable(function () use ($stream) {
$stream->close();
});

} catch (\Throwable $e) {
$observer->onError($e);
return new EmptyDisposable();
}
}

/**
* Cuts the stream based upon a delimiter.
*
* @param string $lineEnd
*
* @return \Rx\Observable
*/
public function cut(string $lineEnd = PHP_EOL): Observable
{
Expand Down
6 changes: 3 additions & 3 deletions src/StreamSubject.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace Rx\React;

use React\EventLoop\LoopInterface;
use React\Stream\Stream;
use React\Stream\DuplexResourceStream;
use Rx\Disposable\BinaryDisposable;
use Rx\Disposable\CallbackDisposable;
use Rx\DisposableInterface;
Expand All @@ -23,7 +23,7 @@ public function __construct($resource, LoopInterface $loop = null)
{
$loop = $loop ?: \EventLoop\getLoop();

$this->stream = new Stream($resource, $loop);
$this->stream = new DuplexResourceStream($resource, $loop);
}

public function onNext($data)
Expand Down Expand Up @@ -73,7 +73,7 @@ public function dispose()
}
}

public function getStream(): Stream
public function getStream(): DuplexResourceStream
{
return $this->stream;
}
Expand Down
28 changes: 25 additions & 3 deletions src/ToFileObserver.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,32 @@

namespace Rx\React;

class ToFileObserver extends StreamSubject
use React\EventLoop\LoopInterface;
use React\Stream\WritableResourceStream;
use Rx\ObserverInterface;

class ToFileObserver implements ObserverInterface
{
public function __construct(string $fileName)
private $stream;

public function __construct(string $fileName, LoopInterface $loop = null)
{
$loop = $loop ?: \EventLoop\getLoop();
$this->stream = new WritableResourceStream(@fopen($fileName, 'wb'), $loop);
}

public function onCompleted()
{
$this->stream->end();
}

public function onError(\Throwable $error)
{
$this->stream->close();
}

public function onNext($value)
{
parent::__construct(@fopen($fileName, 'wb'));
$this->stream->write($value);
}
}
22 changes: 3 additions & 19 deletions tests/Functional/Observable/FromFileObservableTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ function () use (&$complete) {
}
);


$loop->tick();
$loop->run();

$this->assertEquals('1 2 3 4 5', $result);
$this->assertTrue($complete);
Expand Down Expand Up @@ -65,7 +64,6 @@ function () use (&$complete) {
}
);


$loop->tick();

$this->assertFalse($result);
Expand All @@ -79,6 +77,7 @@ function () use (&$complete) {
*/
public function fromFile_exceed_buffer()
{

//Create a 10k temp file
$temp = tmpfile();
fwrite($temp, str_repeat('1', 150000));
Expand All @@ -104,22 +103,7 @@ function () use (&$complete) {
}
);


$loop->tick();

$this->assertEquals("65536", strlen($result));
$this->assertFalse($complete);
$this->assertFalse($error);


$loop->tick();

$this->assertEquals("65536", strlen($result));
$this->assertFalse($complete);
$this->assertFalse($error);


$loop->tick();
$loop->run();

$this->assertEquals("18928", strlen($result));
$this->assertTrue($complete);
Expand Down

0 comments on commit 6853604

Please sign in to comment.