Skip to content
This repository has been archived by the owner on Jul 3, 2024. It is now read-only.

Commit

Permalink
Merge pull request #110 from juriansluiman/feature/refactor-queue-con…
Browse files Browse the repository at this point in the history
…troller-worker

Do not use queue name in Worker::processQueue($queue, $options);
  • Loading branch information
bakura10 committed Aug 6, 2014
2 parents b6a94ae + ea066f3 commit 0cb63ce
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 59 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# 0.4.0

- Refactoring of dependency injection to use queue instead of queue name in worker
- Add job status codes so listeners can act on the result of a job's outcome
- Add controller plugin to ease push of jobs into queues
- BC: job's jsonSerialize() removed in favour of the queue's serializeJob() method
Expand Down
19 changes: 14 additions & 5 deletions src/SlmQueue/Controller/AbstractWorkerController.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use SlmQueue\Controller\Exception\WorkerProcessException;
use SlmQueue\Exception\ExceptionInterface;
use SlmQueue\Worker\WorkerInterface;
use SlmQueue\Queue\QueuePluginManager;
use Zend\Mvc\Controller\AbstractActionController;

/**
Expand All @@ -18,11 +19,18 @@ abstract class AbstractWorkerController extends AbstractActionController
protected $worker;

/**
* @param WorkerInterface $worker
* @var QueuePluginManager
*/
public function __construct(WorkerInterface $worker)
protected $queuePluginManager;

/**
* @param WorkerInterface $worker
* @param QueuePluginManager $queuePluginManager
*/
public function __construct(WorkerInterface $worker, QueuePluginManager $queuePluginManager)
{
$this->worker = $worker;
$this->worker = $worker;
$this->queuePluginManager = $queuePluginManager;
}

/**
Expand All @@ -34,7 +42,8 @@ public function __construct(WorkerInterface $worker)
public function processAction()
{
$options = $this->params()->fromRoute();
$queue = $options['queue'];
$name = $options['queue'];
$queue = $this->queuePluginManager->get($name);

try {
$result = $this->worker->processQueue($queue, $options);
Expand All @@ -48,7 +57,7 @@ public function processAction()

return sprintf(
"Finished worker for queue '%s' with %s jobs\n",
$queue,
$name,
$result
);
}
Expand Down
5 changes: 2 additions & 3 deletions src/SlmQueue/Factory/WorkerFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ class WorkerFactory implements FactoryInterface
*/
public function createService(ServiceLocatorInterface $serviceLocator, $canonicalName = null, $requestedName = null)
{
$workerOptions = $serviceLocator->get('SlmQueue\Options\WorkerOptions');
$queuePluginManager = $serviceLocator->get('SlmQueue\Queue\QueuePluginManager');
$workerOptions = $serviceLocator->get('SlmQueue\Options\WorkerOptions');

return new $requestedName($queuePluginManager, $workerOptions);
return new $requestedName($workerOptions);
}
}
18 changes: 4 additions & 14 deletions src/SlmQueue/Worker/AbstractWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
use SlmQueue\Job\JobInterface;
use SlmQueue\Options\WorkerOptions;
use SlmQueue\Queue\QueueInterface;
use SlmQueue\Queue\QueuePluginManager;
use SlmQueue\Queue\QueueAwareInterface;
use Zend\EventManager\EventManager;
use Zend\EventManager\EventManagerAwareInterface;
Expand All @@ -16,11 +15,6 @@
*/
abstract class AbstractWorker implements WorkerInterface, EventManagerAwareInterface
{
/**
* @var QueuePluginManager
*/
protected $queuePluginManager;

/**
* @var EventManagerInterface
*/
Expand All @@ -39,13 +33,11 @@ abstract class AbstractWorker implements WorkerInterface, EventManagerAwareInter
/**
* Constructor
*
* @param QueuePluginManager $queuePluginManager
* @param WorkerOptions $options
* @param WorkerOptions $options
*/
public function __construct(QueuePluginManager $queuePluginManager, WorkerOptions $options)
public function __construct(WorkerOptions $options)
{
$this->queuePluginManager = $queuePluginManager;
$this->options = $options;
$this->options = $options;

// Listen to the signals SIGTERM and SIGINT so that the worker can be killed properly. Note that
// because pcntl_signal may not be available on Windows, we needed to check for the existence of the function
Expand All @@ -59,10 +51,8 @@ public function __construct(QueuePluginManager $queuePluginManager, WorkerOption
/**
* {@inheritDoc}
*/
public function processQueue($queueName, array $options = array())
public function processQueue(QueueInterface $queue, array $options = array())
{
/** @var $queue QueueInterface */
$queue = $this->queuePluginManager->get($queueName);
$eventManager = $this->getEventManager();
$count = 0;

Expand Down
8 changes: 4 additions & 4 deletions src/SlmQueue/Worker/WorkerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
interface WorkerInterface
{
/**
* Process jobs in the queue identified by its name. Some queuing systems accept various options when
* Process jobs in the given queue. Some queuing systems accept various options when
* popping jobs, so you can set the options array. Those options depends on the concrete worker
*
* @param string $queueName
* @param array $options
* @param QueueInterface $queue
* @param array $options
* @return int How many jobs were processed
*/
public function processQueue($queueName, array $options = array());
public function processQueue(QueueInterface $queue, array $options = array());

/**
* Process a job that comes from the given queue
Expand Down
11 changes: 9 additions & 2 deletions tests/SlmQueueTest/Controller/AbstractControllerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,15 @@ class AbstractControllerTest extends TestCase

public function setUp()
{
$this->queuePluginManager = new QueuePluginManager(new Config(array('factories' => array('knownQueue' => 'SlmQueueTest\Asset\SimpleQueueFactory'))));
$this->controller = new SimpleController(new SimpleWorker($this->queuePluginManager, new WorkerOptions()));
$worker = new SimpleWorker(new WorkerOptions());
$config = new Config(array(
'factories' => array(
'knownQueue' => 'SlmQueueTest\Asset\SimpleQueueFactory'
),
));

$this->queuePluginManager = new QueuePluginManager($config);
$this->controller = new SimpleController($worker, $this->queuePluginManager);
}

public function testThrowExceptionIfQueueIsUnknown()
Expand Down
51 changes: 20 additions & 31 deletions tests/SlmQueueTest/Worker/AbstractWorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,26 @@

class AbstractWorkerTest extends TestCase
{
protected $worker, $plugins, $options, $queue, $job;
protected $options, $worker, $queue, $job;

public function setUp()
{
$queueName = 'foo';
$options = new WorkerOptions;
$options->setMaxRuns(1);
$options->setMaxMemory(1024*1024*1024);

$plugins = $this->getMock('SlmQueue\Queue\QueuePluginManager', array('get'));
$worker = new SimpleWorker($plugins, $options);

$queue = $this->getMock('SlmQueue\Queue\QueueInterface');
$plugins->expects($this->any())
->method('get')
->with($queueName)
->will($this->returnValue($queue));

$job = $this->getMock('SlmQueue\Job\JobInterface');

$this->worker = $worker;
$this->plugins = $plugins;
$this->options = $options;
$this->queue = $queue;
$this->job = $job;
$this->worker = new SimpleWorker($options);
$this->queue = $this->getMock('SlmQueue\Queue\QueueInterface');
$this->job = $this->getMock('SlmQueue\Job\JobInterface');
}
public function testWorkerPopsFromQueue()
{
$this->queue->expects($this->once())
->method('pop')
->will($this->returnValue($this->job));

$this->worker->processQueue('foo');
$this->worker->processQueue($this->queue);
}

public function testWorkerExecutesJob()
Expand All @@ -54,7 +41,7 @@ public function testWorkerExecutesJob()
$this->job->expects($this->once())
->method('execute');

$this->worker->processQueue('foo');
$this->worker->processQueue($this->queue);
}

public function testWorkerCountsRuns()
Expand All @@ -65,14 +52,14 @@ public function testWorkerCountsRuns()
->method('pop')
->will($this->returnValue($this->job));

$this->worker->processQueue('foo');
$this->worker->processQueue($this->queue);
}

public function testWorkerSkipsVoidValuesFromQueue()
{
$i = 0;
$job = $this->job;
$callback = function() use (&$i, $job) {
$callback = function () use (&$i, $job) {
// We return the job on the 4th call
if ($i === 3) {
return $job;
Expand All @@ -87,7 +74,7 @@ public function testWorkerSkipsVoidValuesFromQueue()
->method('pop')
->will($this->returnCallback($callback));

$count = $this->worker->processQueue('foo');
$count = $this->worker->processQueue($this->queue);
$this->assertEquals(1, $count);
}

Expand All @@ -98,7 +85,7 @@ public function testWorkerMaxMemory()
$this->queue->expects($this->exactly(1))
->method('pop');

$this->assertTrue($this->worker->processQueue('foo') === 0);
$this->assertTrue($this->worker->processQueue($this->queue) === 0);
}

public function testCorrectIdentifiersAreSetToEventManager()
Expand Down Expand Up @@ -139,7 +126,7 @@ public function testEventManagerTriggersEvents()
->method('trigger')
->with($this->equalTo(WorkerEvent::EVENT_PROCESS_QUEUE_POST));

$this->worker->processQueue('foo');
$this->worker->processQueue($this->queue);
}

public function testWorkerSetsJobStatusInEventClass()
Expand All @@ -156,22 +143,24 @@ public function testWorkerSetsJobStatusInEventClass()
->will($this->returnValue($this->job));

$self = $this;
$eventManager->attach(WorkerEvent::EVENT_PROCESS_JOB_POST, function($e) use ($self) {
$eventManager->attach(WorkerEvent::EVENT_PROCESS_JOB_POST, function ($e) use ($self) {
$self->assertEquals(WorkerEvent::JOB_STATUS_SUCCESS, $e->getResult());
});

$this->worker->processQueue('foo');
$this->worker->processQueue($this->queue);
}

public function testMethod_hasMemoryExceeded() {
public function testMethod_hasMemoryExceeded()
{
$this->options->setMaxMemory(10000000000);
$this->assertFalse($this->worker->isMaxMemoryExceeded());

$this->options->setMaxMemory(1);
$this->assertTrue($this->worker->isMaxMemoryExceeded());
}

public function testMethod_willExceedMaxRuns() {
public function testMethod_willExceedMaxRuns()
{
$this->options->setMaxRuns(10);
$this->assertFalse($this->worker->isMaxRunsReached(9));
$this->assertTrue($this->worker->isMaxRunsReached(10));
Expand All @@ -185,7 +174,7 @@ public function testSignalStopsWorkerForSigterm()
->method('pop');

$worker->handleSignal(SIGTERM);
$count = $worker->processQueue('foo');
$count = $worker->processQueue($this->queue);

$this->assertEquals(0, $count);
}
Expand All @@ -197,7 +186,7 @@ public function testSignalStopsWorkerForSigint()
->method('pop');

$worker->handleSignal(SIGINT);
$count = $worker->processQueue('foo');
$count = $worker->processQueue($this->queue);

$this->assertEquals(0, $count);
}
Expand All @@ -210,7 +199,7 @@ public function testNonStoppingSignalDoesNotStopWorker()
->will($this->returnValue($this->job));

$this->worker->handleSignal(SIGPOLL);
$count = $this->worker->processQueue('foo');
$count = $this->worker->processQueue($this->queue);

$this->assertEquals(1, $count);
}
Expand Down

0 comments on commit 0cb63ce

Please sign in to comment.