Skip to content

Commit

Permalink
Cut fixes and additional tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mbonneau committed Feb 29, 2016
1 parent e5e0a6a commit 80ccbc6
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 17 deletions.
33 changes: 23 additions & 10 deletions src/Operator/CutOperator.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,37 @@ public function __construct($delimiter = PHP_EOL)
*/
public function __invoke(ObservableInterface $observable, ObserverInterface $observer, SchedulerInterface $scheduler = null)
{
$buffer = '';
$buffer = null;
$items = [];
$disposable = new CompositeDisposable();
$recursing = false;
$completed = false;

$onNext = function ($x) use (&$buffer, $observer, $scheduler, &$items, $disposable) {
$onCompleted = function () use (&$buffer, $observer, $scheduler, &$recursing) {
if ($recursing) {
return;
}
if ($buffer !== null) {
$observer->onNext($buffer);
}
$observer->onCompleted();
};

$onNext = function ($x) use (&$buffer, $observer, $scheduler, &$items, $disposable, &$recursing, &$completed, $onCompleted) {
if ($buffer === null) {
$buffer = '';
}
$buffer .= $x;
$items = array_merge($items, explode($this->delimiter, $buffer));
$buffer = array_pop($items);

$action = function ($reschedule) use (&$observer, &$items, &$buffer) {
$action = function ($reschedule) use (&$observer, &$items, &$buffer, &$recursing, &$completed, $onCompleted) {

if (count($items) === 0) {
$recursing = false;
if ($completed) {
$onCompleted();
}
return;
}

Expand All @@ -56,18 +75,12 @@ public function __invoke(ObservableInterface $observable, ObserverInterface $obs

};

$recursing = true;
$schedulerDisposable = $scheduler->scheduleRecursive($action);

$disposable->add($schedulerDisposable);
};

$onCompleted = function () use (&$buffer, $observer) {
if (!empty($buffer)) {
$observer->onNext($buffer);
}
$observer->onCompleted();
};

$callbackObserver = new CallbackObserver($onNext, [$observer, "onError"], $onCompleted);
$sourceDisposable = $observable->subscribe($callbackObserver);

Expand Down
91 changes: 84 additions & 7 deletions tests/Functional/Operator/CutTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use Rx\Observable;
use Rx\Extra\Operator\CutOperator;


class CutTest extends FunctionalTestCase
{

Expand Down Expand Up @@ -94,6 +93,62 @@ function cut_comma_delimiter()
], $results->getMessages());
}

/**
* @test
*/
function cut_comma_delimiter_empty_last()
{
$xs = $this->createHotObservable([
onNext(150, 1),
onNext(201, "1,2,3,4,5,6,"),
onCompleted(230)
]);

$results = $this->scheduler->startWithCreate(function () use ($xs) {
return $xs->lift(function () {
return new CutOperator(',');
});
});
$this->assertMessages([
onNext(202, "1"),
onNext(203, "2"),
onNext(204, "3"),
onNext(205, "4"),
onNext(206, "5"),
onNext(207, "6"),
onNext(230, ""),
onCompleted(230)
], $results->getMessages());
}

/**
* @test
*/
function cut_comma_delimiter_empty_first()
{
$xs = $this->createHotObservable([
onNext(150, 1),
onNext(201, ",2,3,4,5,6,7"),
onCompleted(230)
]);

$results = $this->scheduler->startWithCreate(function () use ($xs) {
return $xs->lift(function () {
return new CutOperator(',');
});
});
$this->assertMessages([
onNext(202, ""),
onNext(203, "2"),
onNext(204, "3"),
onNext(205, "4"),
onNext(206, "5"),
onNext(207, "6"),
onNext(230, "7"),
onCompleted(230)
], $results->getMessages());
}

/**
* @test
*/
Expand Down Expand Up @@ -128,8 +183,6 @@ function cut_comma_delimiter_skip_time()
*/
function cut_comma_delimiter_buffer_all()
{
$this->markTestSkipped("Not sure how this should work yet");

$xs = $this->createHotObservable([
onNext(150, 1),
onNext(201, "1,"),
Expand All @@ -147,7 +200,7 @@ function cut_comma_delimiter_buffer_all()
$this->assertMessages([
onNext(202, "1"),
onNext(203, "2"),
onNext(204, "3"),
onNext(203, "3"),
onNext(230, "4"),
onCompleted(230)
], $results->getMessages());
Expand All @@ -170,6 +223,7 @@ function cut_empty_string()
});
});
$this->assertMessages([
onNext(230, ""),
onCompleted(230)
], $results->getMessages());
}
Expand All @@ -179,9 +233,6 @@ function cut_empty_string()
*/
function cut_just_delimiter()
{

$this->markTestSkipped("Not sure how this should work yet");

$xs = $this->createHotObservable([
onNext(150, 1),
onNext(201, PHP_EOL),
Expand All @@ -200,6 +251,32 @@ function cut_just_delimiter()
], $results->getMessages());
}

/**
* @test
*/
function cut_split_delimiter()
{
$xs = $this->createHotObservable([
onNext(150, 1),
onNext(201, "1-"),
onNext(202, "-2-"),
onNext(203, "-"),
onCompleted(230)
]);

$results = $this->scheduler->startWithCreate(function () use ($xs) {
return $xs->lift(function () {
return new CutOperator("--");
});
});
$this->assertMessages([
onNext(202, "1"),
onNext(203, "2"),
onNext(230, ""),
onCompleted(230)
], $results->getMessages());
}

/**
* @test
*/
Expand Down

0 comments on commit 80ccbc6

Please sign in to comment.