diff --git a/config/module.config.php b/config/module.config.php index af71307..c40f610 100644 --- a/config/module.config.php +++ b/config/module.config.php @@ -3,9 +3,9 @@ return array( 'service_manager' => array( 'factories' => array( - 'SlmQueue\Job\JobPluginManager' => 'SlmQueue\Factory\JobPluginManagerFactory', - 'SlmQueue\Options\WorkerOptions' => 'SlmQueue\Factory\WorkerOptionsFactory', - 'SlmQueue\Queue\QueuePluginManager' => 'SlmQueue\Factory\QueuePluginManagerFactory' + 'SlmQueue\Job\JobPluginManager' => 'SlmQueue\Factory\JobPluginManagerFactory', + 'SlmQueue\Listener\StrategyPluginManager' => 'SlmQueue\Factory\StrategyPluginManagerFactory', + 'SlmQueue\Queue\QueuePluginManager' => 'SlmQueue\Factory\QueuePluginManagerFactory' ), ), @@ -17,11 +17,20 @@ 'slm_queue' => array( /** - * Worker options + * Worker Strategies */ - 'worker' => array( - 'max_runs' => 100000, - 'max_memory' => 100 * 1024 * 1024 + 'worker_strategies' => array( + 'default' => array( // per worker + 'SlmQueue\Strategy\AttachQueueListenersStrategy', // attaches strategies per queue + 'SlmQueue\Strategy\MaxRunsStrategy' => array('max_runs' => 100000), + 'SlmQueue\Strategy\MaxMemoryStrategy' => array('max_memory' => 100 * 1024 * 1024), + 'SlmQueue\Strategy\InterruptStrategy', + ), + 'queues' => array( // per queue + 'default' => array( + 'SlmQueue\Strategy\ProcessQueueStrategy', + ) + ), ), /** @@ -38,5 +47,22 @@ * Queue manager configuration */ 'queue_manager' => array(), + + /** + * Strategy manager configuration + */ + 'strategy_manager' => array( + 'invokables' => array( + 'SlmQueue\Strategy\ProcessQueueStrategy' => 'SlmQueue\Strategy\ProcessQueueStrategy', + 'SlmQueue\Strategy\InterruptStrategy' => 'SlmQueue\Strategy\InterruptStrategy', + 'SlmQueue\Strategy\MaxRunsStrategy' => 'SlmQueue\Strategy\MaxRunsStrategy', + 'SlmQueue\Strategy\MaxMemoryStrategy' => 'SlmQueue\Strategy\MaxMemoryStrategy', + 'SlmQueue\Strategy\FileWatchStrategy' => 'SlmQueue\Strategy\FileWatchStrategy', + ), + 'factories' => array( + 'SlmQueue\Strategy\AttachQueueListenersStrategy' => 'SlmQueue\Strategy\Factory\AttachQueueListenersStrategyFactory', + 'SlmQueue\Strategy\LogJobStrategy' => 'SlmQueue\Strategy\Factory\LogJobStrategyFactory', + ) + ), ) ); diff --git a/config/slm_queue.global.php.dist b/config/slm_queue.global.php.dist index b678403..7a31cc2 100644 --- a/config/slm_queue.global.php.dist +++ b/config/slm_queue.global.php.dist @@ -8,27 +8,63 @@ return array( 'slm_queue' => array( /** - * Parameters for the worker. It defines some criterias that can be reached before the - * worker stops to process any other jobs + * Allow to configure a specific queue. + * + * Available options depends on the queue factory */ - 'worker' => array( - /** - * Specify how many jobs can be processed by a worker until it stops (default to 100 000) - */ - // 'max_runs' => 100000, + 'queues' => array(), - /** - * Specifiy the max memory (in bytes) that can be used by the worker before it stops (default to 100 MB) - */ - // 'max_memory' => 100 * 1024 * 1024 - ), + /** + * This block is use to register and configure strategies to the worker event manager. The default key holds any + * configuration for all instanciated workers. The ones configured within the 'queues' keys are specific to + * specific queues only. + * + * Note that module.config.php defines a few defaults and that configuration where the value is not an array + * will be ignored (thus allows you to disable preconfigured strategies). + * + * 'worker_strategies' => array( + * 'default' => array( // per worker + * // Would disable the pre configured max memory strategy + * 'SlmQueue\Strategy\MaxMemoryStrategy' => null + * // Reconfigure the pre configured max memory strategy to use 250Mb max + * 'SlmQueue\Strategy\MaxMemoryStrategy' => array('max_memory' => 250 * 1024 * 1024) + * ), + * ), + * + * As queue processing is handled by strategies it is important that for each queue a ProcessQueueStrategy + * (a strategy that listens to WorkerEvent::EVENT_PROCESS) is registered. By default SlmQueue does handles that + * for the queue called 'default'. + * + * 'worker_strategies' => array( + * 'queues' => array( + * 'my-queue' => array( + * 'SlmQueue\Strategy\ProcessQueueStrategy', + * ) + * ), + * ), + */ + 'worker_strategies' => array( + 'default' => array( // per worker + ), + 'queues' => array( // per queue + 'default' => array( + ), + ), + ), - /** - * Allow to configure a specific queue. - * - * Available options depends on the queue factory - */ - 'queues' => array(), + /** + * Allow to configure the plugin manager that manages strategies. This works like any other + * PluginManager in Zend Framework 2. + * + * Add you own or override existing factories + * + * 'strategy_manager' => array( + * 'factories' => array( + * 'SlmQueue\Strategy\LogJobStrategy' => 'MyVeryOwn\LogJobStrategyFactory', + * ) + * ), + */ + 'strategy_manager' => array(), /** * Allow to configure dependencies for jobs that are pulled from any queue. This works like any other diff --git a/docs/5.Workers.md b/docs/5.Workers.md index bf13b41..a8ce3d2 100644 --- a/docs/5.Workers.md +++ b/docs/5.Workers.md @@ -13,30 +13,15 @@ for the different queue implementations. Worker stop conditions ---------------------- -The worker will be a long running call, due to the `while(true){ /*...*/ }` loop it contains inside the `processQueue()` -method. However, there are reasons to cancel the loop and stop the worker to continue. PHP is not the best language for -creating infinite running scripts. +The worker will be a long running call, due to the `while(...){ /*...*/ }` loop it +contains inside the `processQueue()` method. However, there are reasons to cancel the loop and stop the worker to +continue. PHP is not the best language for creating infinite running scripts. -It is wise to abort the script frequently, for example after x number of cycles in the `while` loop. A worker is -initiated with a `WorkerOptions` class, where it is possible to set a maximum number of runs. In this same options class, -a maximum memory level can be set. If you have accidentally a memory leak, this option can make sure the script aborts -eventually after reaching this level. +It is wise to abort the script frequently, for example after x number of cycles in the `while` loop. -Both options can be set at the `worker` key in the SlmQueue configuration: - -```php -'slm_queue' => array( - 'worker' => array( - 'max_runs' => 100000, // 10,000 runs - 'max_memory' => 20 * 1024 * 1024, // 20 MB - ) -), -``` - -Secondly, it is possible to catch a stop condition under Linux-like systems (as well as OS X). If a worker is started -from the command line interface (CLI), it is possible to send a SIGTERM or SiGINT call to the worker. SlmQueue is smart -enough not to quit the script diretly, but let the job finish its work first and then break out of the loop. For Windows -users this option is not available. +Various build-in strategies (['see 6.Events'](6.Events.md)) are used to decide if the worker should exit. These +strategies are aggregate listeners that hook into events the worker dispatches. Any listener may call a method +on the passed event `WorkerEvent::exitWorkerLoop()`. Which the worker will check the event the next iteration of the loop. Command line utility -------------------- diff --git a/docs/6.Events.md b/docs/6.Events.md index 170e5df..610f8a9 100644 --- a/docs/6.Events.md +++ b/docs/6.Events.md @@ -1,19 +1,227 @@ Documentation - Events -==================== +====================== -SlmQueue triggers events at selected moments in the worker code flow. This enables users of SlmQueue to create hooks at -several points: +As of version 0.4.0 the worker has been rewritten to a flexible event driven approach. The processing logic is now a +very minimalistic method. In pseudocode it looks like this; - * `WorkerEvent::EVENT_PROCESS_QUEUE_PRE`: just before queue processing starts - * `WorkerEvent::EVENT_PROCESS_QUEUE_POST`: just after processing ends - * `WorkerEvent::EVENT_PROCESS_JOB_PRE`: just before a job will be processed - * `WorkerEvent::EVENT_PROCESS_JOB_POST`: just after a job has been processed +``` +processQueue + trigger event 'bootstrap' + + while event says continue processing + trigger event 'process' + + trigger event 'finish' + + trigger event 'state' + +``` + +Worker Strategies +----------------- + +To get some useful results it is required to register so called 'worker strategies' to the worker. SlmQueue makes this +trivial via configuration. + +Worker strategies are aggregate listeners which are created via a plugin manager. + +At least one worker strategy listening to the bootstrap event must be registered to the worker. The Worker Factory will +throw an exception if its not. SlmQueue attaches the provided `AttachQueueListenersStrategy` to do just that. + +It is worth noting that events will be dispatched from the worker (obviously) but can also be dispatch from within +worker strategies. + +The plugin manager ensures they extend `SlmQueue\Listener\Strategy\AbstractStrategy` and each worker strategy therefore +gains the following capabilities; + +### Accept options + +Configuration options are passed by the plugin manager to the constructor of an worker strategy. Setter methods will be +called for each option. If a setter does not exist an exception will be thrown. + +``` +'SlmQueue\Strategy\MaxRunStrategy' => array('max_runs' => 10); + +``` +Such a config will result in an MaxRunStrategy instance of which the setMaxRuns method is called with '10'. + +*The optional 'priority' option is used when the aggregates listeners are are registered with event manager and is +thereafter removed from the passed options. This means a Worker Strategy cannot have this option.* + +### Request to stop processing the queue + +Worker strategies may inform the worker to stop processing the queue. Or more concrete; invalidate the condition of +the while loop. + +``` +public function onSomeListener(WorkerEvent $event) +{ + $event->exitWorkerLoop(); + ... +} +``` + +### Do something before or after the processing of a queue + +While processing a queue it might be required to execute some setup- or teardown logic. A worker strategy may listen to +the `bootstrap` and/or `finish` event to do just this. + +``` +/** + * @param EventManagerInterface $events + */ +public function attach(EventManagerInterface $events) +{ + $this->listeners[] = $events->attach( + WorkerEvent::EVENT_BOOTSTRAP, + array($this, 'onBootstrap') + ); + $this->listeners[] = $events->attach( + WorkerEvent::EVENT_FINISH, + array($this, 'onFinish') + ); +} + +/** + * @param WorkerEvent $e + */ +public function onBootstrap(WorkerEvent $e) +{ + // setup code +} + +/** + * @param WorkerEvent $e + */ +public function onFinish(WorkerEvent $e) +{ + // teardown code +} +``` -Any listener waiting for above events will be passed a `WorkerEvent` class which contains a reference to the queue. The -`EVENT_PROCESS_JOB_PRE` and `EVENT_PROCESS_JOB_POST` events also have a reference to the job class. +### Do something before or after the processing of a job + +For some types of jobs it might be required to do something before or after the execution of an individual job. + +This can be done by listening to the `process` event at different priorities. + +``` +/** + * @param EventManagerInterface $events + */ +public function attach(EventManagerInterface $events) +{ + $this->listeners[] = $events->attach( + WorkerEvent::EVENT_PROCESS, + array($this, 'onPreProcess'), + 100 + ); + $this->listeners[] = $events->attach( + WorkerEvent::EVENT_PROCESS, + array($this, 'onPostProcess'), + -100 + ); +} + +/** + * @param WorkerEvent $e + */ +public function onPreProcess(WorkerEvent $e) +{ + // pre job execution code +} + +/** + * @param WorkerEvent $e + */ +public function onPostProcess(WorkerEvent $e) +{ + // post job execution code +} +``` + +### Report on 'the thing' a strategy is tasked with. + +A worker strategy may report a state. A state is simply a string which will be returned when +the `WorkerEvent::EVENT_PROCESS_STATE` is received. + +From the MaxRunStrategy; + +``` +public function onSomeListener(WorkerEvent $event) +{ + $this->runCount++; + + if ($this->maxRuns && $this->runCount >= $this->maxRuns) { + $event->exitWorkerLoop(); + + $this->state = sprintf('maximum of %s jobs processed', $this->runCount); + } else { + $this->state = sprintf('%s jobs processed', $this->runCount); + } +} +``` + +### Dispatch WorkerEvents + +A worker strategy may ask the worker to dispatch events. + +From the ProcessQueueStrategy + +``` +public function onJobPop(WorkerEvent $e) +{ + $queue = $e->getQueue(); + $options = $e->getOptions(); + $job = $queue->pop($options); + + // The queue may return null, for instance if a timeout was set + if (!$job instanceof JobInterface) { + /** @var AbstractWorker $worker */ + $worker = $e->getTarget(); + + $worker->getEventManager()->trigger(WorkerEvent::EVENT_PROCESS_IDLE, $e); + + // make sure the event doesn't propagate + $e->stopPropagation(); + + return; + } + + $e->setJob($job); +} +``` + +Configuration +------------- + + +Services +-------- + +Worker strategies are regular ZF2 services that are instanciated via a plugin manager. If a worker strategy has +dependancies on other services it should be created it via service factory. + +**The plugin manager is configured to *not* share services.** + +WorkerEvent +----------- + +Events the worker *and* worker strategies may dispatch; + + * `WorkerEvent::EVENT_BOOTSTRAP` just before loop is entered + * `WorkerEvent::EVENT_FINISH` just after the loop has exited + * `WorkerEvent::EVENT_PROCESS` fetch and process job(s) + * `WorkerEvent::EVENT_PROCESS_IDLE` when the queue is empty + * `WorkerEvent::EVENT_PROCESS_STATE` collects 'states' from strategies. + + + +Any listener waiting for above events will be passed a `WorkerEvent` class which contains a reference to the queue. +The `EVENT_PROCESS` event also has a reference to the job instance. ```php -$em->attach(WorkerEvent::EVENT_PROCESS_JOB_PRE, function(WorkerEvent $e) { +$em->attach(WorkerEvent::EVENT_PROCESS, function(WorkerEvent $e) { $queue = $e->getQueue(); $job = $e->getJob(); }); @@ -40,12 +248,135 @@ $em->attach(WorkerEvent::EVENT_PROCESS_JOB_POST, function(WorkerEvent $e) use ($ }); ``` +Provided Worker Strategies +-------------------------- + +#### AttachQueueListenersStrategy + +The purpose of this strategy is to register additional strategies that are specific to the queue that is being +processed. + +After registering any additional worker strategies it will unregister itself as a listener. Finally it halts the event +propagation and re-triggers the `bootstrap` event. + +A new cycle of bootstraping will occure but now with additional queue specific strategies. + +listens to: + +- `bootstrap` event at priority PHP_MAX_INT + +triggers: + +- `bootstrap` + +throws: + +- RunTimeException if the `process` event isn't listened to by any registered strategy. + +This strategy is enabled by default for all queue's. + +#### FileWatchStrategy + +This strategy is able to 'watch' files by creating a hash of their contents. If it detects a change it will request to +stop processing the queue. This is useful if you have something like [supervisor](7.WorkerManagement.md) automaticly +restarting the worker process. + +The strategy builds a list of files it needs to watch via a preg_match on the filenames within the application. + +listens to: + +- `idle` event at priority 1 +- `process` event at priority -1000 +- `state` event at priority 1 + +options: + +- pattern defaults to '/^\.\/(config|module).*\.(php|phtml)$/' + +This strategy is not enabled by default. It can be slow and is recommended for development only. In production you may +watch a single file. + +#### InterruptStrategy + +The InterruptStrategy is able to catch a stop condition under Linux-like systems (as well as OS X). If a worker is +started from the command line interface (CLI), it is possible to send a SIGTERM or SiGINT call to the worker. SlmQueue +is smart enough not to quit the script directly, but let the job finish its work first and then break out of the loop. +On Windows systems this strategy does nothing. + +listens to: + +- `idle` event at priority 1 +- `process` event at priority -1000 +- `state` event at priority 1 + +This strategy is enabled by default for all queue's. + +#### LogJobStrategy + +Simple outputs to the console a when it begin executing a job and when it is done. + +listens to: + + - `process` event at priority 1000 + - `process` event at priority -1000 + +#### MaxMemoryStrategy + +The MaxMemoryStrategy will measure the amount of memory allocated to PHP after each processed job. It will request to +exit when a threshold is exceeded. + +Note that an individual job may exceed this threshold during it's live time. But if you have a memory leak this strategy +can make sure the script aborts eventually. + +listens to: + +- `idle` event at priority 1 +- `process` event at priority -1000 +- `state` event at priority 1 + +options: + +- max_memory defaults to 100\*1024\*1024 + +This strategy is enabled by default for all queue's. + +#### MaxRunsStrategy + +The MaxRunStrategy will request to exit after a set number of jobs have been processed. + +listens to: + +- `idle` event at priority 1 +- `process` event at priority -1000 +- `state` event at priority 1 + +options: + +- max_runs defaults to 100000 + +This strategy is enabled by default for all queue's. + +#### ProcessQueueStrategy + +Responsible for quering the queue for jobs and executing them. + +listens to: + +- `process` event at priority 2 +- `process` event at priority 1 + +triggers: + +- `idle` if the queue returns null (it might be empty or timed out) + + Using the shared event manager ------------------------------ Instead of direct access to the worker's event manager, the shared manager is available to register events too: ```php +getApplication()->getEventManager(); $sharedEm = $em->getSharedManager(); - $sharedEm->attach('SlmQueue\Worker\WorkerInterface', WorkerEvent::EVENT_PROCESS_QUEUE_PRE, function(){ - // log the start of the worker processing - }); + $sharedEm->attach('SlmQueue\Worker\WorkerInterface', WorkerEvent::EVENT_PROCESS, function() { + // some thing just before a job starts. + }, -1000); } } ``` -Using an aggregate listener ---------------------------- +An example +---------- -If it is required to listen at multiple events, an aggregate listener is a powerful tool to hook into multiple events of -the same event manager. A good example is i18n: a job is given a locale if the job performs localized actions. This -locale is set to the translator just before processing starts. The original locale is reverted when the job has finished -processing. +A good example is i18n: a job is given a locale if the job performs localized actions. This locale is set to the +translator just before processing starts. The original locale is reverted when the job has finished processing. In this case, all jobs which require a locale set are implementing a `LocaleAwareInterface`: ```php +listeners[] = $events->attach(WorkerEvent::EVENT_PROCESS_JOB_PRE, array($this, 'onPreJobProcessing')); - $this->listeners[] = $events->attach(WorkerEvent::EVENT_PROCESS_JOB_POST, array($this, 'onPostJobProcessing')); + $this->listeners[] = $events->attach(WorkerEvent::EVENT_PROCESS, array($this, 'onPreJobProc'), 1000); + $this->listeners[] = $events->attach(WorkerEvent::EVENT_PROCESS, array($this, 'onPostJobProc), -1000); } public function onPreJobProcessing(WorkerEvent $e) @@ -158,28 +489,78 @@ class JobTranslatorListener extends AbstractListenerAggregate } ``` -The last step is to register the aggregate listener to the event manager of the worker object: +Since this worker strategy has a dependency that needs to be injected we should create a factory for it. ```php -public function onBootstrap(MvcEvent $e) -{ - $sm = $e->getApplication()->getServiceManager(); +get('MvcTranslator'); +use MyModule\Strategy\JobTranslatorStrategy; +use Zend\ServiceManager\FactoryInterface; +use Zend\ServiceManager\ServiceLocatorInterface; - /** @var $worker \SlmQueueDoctrine\Worker\DoctrineWorker */ - $worker = $sm->get('SlmQueueDoctrine\Worker\DoctrineWorker'); +class JobTranslatorStrategyFactory implements FactoryInterface +{ + /** + * Create service + * + * @param ServiceLocatorInterface $serviceLocator + * @return JobTranslatorStrategy + */ + public function createService(ServiceLocatorInterface $serviceLocator) + { + $sm = $serviceLocator->getServiceLocator(); + + /** @var $sm \Zend\Mvc\I18n\Translator */ + $translator = $sm->get('MvcTranslator'); + + $strategy = new JobTranslatorStrategy($translator); - $listener = new JobTranslatorListener($translator); - $worker->getEventManager()->attachAggregate($listener); + return $strategy; + } } ``` +Finally add two configuration settings; + + 1. Register the factory to the plugin manager to the Strategy Manager. + 2. Add the strategy by name to the worker strategies. Note we can do this for all queue's or for specific ones. + +```php + array( + /** + * Worker Strategies + */ + 'worker_strategies' => array( + 'default' => array( // per worker + // add it here to enable the + ), + 'queues' => array( // per queue + 'my-queue' => array( + 'MyModule\Strategy\JobTranslatorStrategy', + ) + ), + ), + + /** + * Strategy manager + */ + 'strategy_manager' => array( + 'factories' => array( + 'MyModule\Strategy\JobTranslatorStrategy' => 'MyModule\Strategy\Factory\JobTranslatorStrategyFactory', + ) + ), + ) +); + +``` + Navigation ---------- -Previous page: [Workers](5.Internals.md) +Previous page: [Workers](5.Worker.md) Next page: [Worker Management](7.WorkerManagement.md) 1. [Introduction](1.Introduction.md) diff --git a/src/SlmQueue/Controller/AbstractWorkerController.php b/src/SlmQueue/Controller/AbstractWorkerController.php index f3a12b4..ec6f613 100644 --- a/src/SlmQueue/Controller/AbstractWorkerController.php +++ b/src/SlmQueue/Controller/AbstractWorkerController.php @@ -46,7 +46,7 @@ public function processAction() $queue = $this->queuePluginManager->get($name); try { - $result = $this->worker->processQueue($queue, $options); + $messages = $this->worker->processQueue($queue, $options); } catch (ExceptionInterface $e) { throw new WorkerProcessException( 'Caught exception while processing queue', @@ -55,10 +55,14 @@ public function processAction() ); } + $messages = implode("\n", array_map(function ($m) { + return sprintf(' - %s', $m); + }, $messages)); + return sprintf( - "Finished worker for queue '%s' with %s jobs\n", + "Finished worker for queue '%s':\n%s\n", $name, - $result + $messages ); } } diff --git a/src/SlmQueue/Exception/BadMethodCallException.php b/src/SlmQueue/Exception/BadMethodCallException.php new file mode 100644 index 0000000..08aa9ef --- /dev/null +++ b/src/SlmQueue/Exception/BadMethodCallException.php @@ -0,0 +1,7 @@ +get('Config'); + $config = $config['slm_queue']['strategy_manager']; + + $listenerPluginManager = new StrategyPluginManager(new Config($config)); + $listenerPluginManager->setServiceLocator($serviceLocator); + + return $listenerPluginManager; + } +} diff --git a/src/SlmQueue/Factory/WorkerFactory.php b/src/SlmQueue/Factory/WorkerFactory.php index b3499d4..63c81ac 100644 --- a/src/SlmQueue/Factory/WorkerFactory.php +++ b/src/SlmQueue/Factory/WorkerFactory.php @@ -1,6 +1,11 @@ get('SlmQueue\Options\WorkerOptions'); + $config = $serviceLocator->get('Config'); + $strategies = $config['slm_queue']['worker_strategies']['default']; - return new $requestedName($workerOptions); + $eventManager = $serviceLocator->get('EventManager'); + $listenerPluginManager = $serviceLocator->get('SlmQueue\Listener\StrategyPluginManager'); + $this->attachWorkerListeners($eventManager, $listenerPluginManager, $strategies); + + /** @var WorkerInterface $worker */ + $worker = new $requestedName($eventManager); + return $worker; + } + + /** + * @param EventManagerInterface $eventManager + * @param StrategyPluginManager $listenerPluginManager + * @param array $strategyConfig + * @throws RunTimeException + */ + protected function attachWorkerListeners( + EventManagerInterface $eventManager, + StrategyPluginManager $listenerPluginManager, + array $strategyConfig = array() + ) { + foreach ($strategyConfig as $strategy => $options) { + if (is_numeric($strategy) && is_string($options)) { // no options given, name stored as value + $strategy = $options; + $options = array(); + } + + if (!is_string($strategy) || !is_array($options)) { + continue; + } + + $priority = null; + if (isset($options['priority'])) { + $priority = $options['priority']; + unset($options['priority']); + } + + $listener = $listenerPluginManager->get($strategy, $options); + + if (!is_null($priority)) { + $eventManager->attachAggregate($listener, $priority); + } else { + $eventManager->attachAggregate($listener); + } + } + + if (!in_array(WorkerEvent::EVENT_BOOTSTRAP, $eventManager->getEvents())) { + throw new RunTimeException(sprintf( + "No worker strategy has been registered to respond to the '%s' event.", + WorkerEvent::EVENT_BOOTSTRAP + )); + } } } diff --git a/src/SlmQueue/Factory/WorkerOptionsFactory.php b/src/SlmQueue/Factory/WorkerOptionsFactory.php deleted file mode 100644 index af97f97..0000000 --- a/src/SlmQueue/Factory/WorkerOptionsFactory.php +++ /dev/null @@ -1,19 +0,0 @@ -get('Config'); - return new WorkerOptions($config['slm_queue']['worker']); - } -} diff --git a/src/SlmQueue/Options/WorkerOptions.php b/src/SlmQueue/Options/WorkerOptions.php deleted file mode 100644 index 1936e13..0000000 --- a/src/SlmQueue/Options/WorkerOptions.php +++ /dev/null @@ -1,63 +0,0 @@ -maxRuns = (int) $maxRuns; - } - - /** - * Get how many jobs can be processed before the worker stops - * - * @return int - */ - public function getMaxRuns() - { - return $this->maxRuns; - } - - /** - * Set the max memory the worker can use (in bytes) - * - * @param int $maxMemory - * @return void - */ - public function setMaxMemory($maxMemory) - { - $this->maxMemory = (int) $maxMemory; - } - - /** - * Get the max memory the worker can use (in bytes) - * - * @return int - */ - public function getMaxMemory() - { - return $this->maxMemory; - } -} diff --git a/src/SlmQueue/Strategy/AbstractStrategy.php b/src/SlmQueue/Strategy/AbstractStrategy.php new file mode 100644 index 0000000..bb75337 --- /dev/null +++ b/src/SlmQueue/Strategy/AbstractStrategy.php @@ -0,0 +1,58 @@ +setOptions($options); + } + } + + /** + * Set options from array + */ + public function setOptions(array $options) + { + foreach ($options as $key => $value) { + $setter = 'set' . str_replace(' ', '', ucwords(str_replace('_', ' ', $key))); + if (!method_exists($this, $setter)) { + throw new Exception\BadMethodCallException( + 'The option "' . $key . '" does not ' + . 'have a matching ' . $setter . ' setter method ' + . 'which must be defined' + ); + } + $this->{$setter}($value); + } + } + + /** + * Event listener which returns the state of the queue + * + * @param WorkerEvent $event + * @return bool|string + */ + public function onReportQueueState(WorkerEvent $event) + { + return is_string($this->state) ? $this->state : false; + } +} diff --git a/src/SlmQueue/Strategy/AttachQueueListenersStrategy.php b/src/SlmQueue/Strategy/AttachQueueListenersStrategy.php new file mode 100644 index 0000000..48b3e54 --- /dev/null +++ b/src/SlmQueue/Strategy/AttachQueueListenersStrategy.php @@ -0,0 +1,94 @@ +pluginManager = $pluginManager; + $this->strategyConfig = $strategyConfig; + } + + /** + * {@inheritDoc} + */ + public function attach(EventManagerInterface $events) + { + $this->listeners[] = $events->attach( + WorkerEvent::EVENT_BOOTSTRAP, + array($this, 'attachQueueListeners'), + PHP_INT_MAX + ); + } + + /** + * @param WorkerEvent $e + * @throws \SlmQueue\Exception\RunTimeException + */ + public function attachQueueListeners(WorkerEvent $e) + { + /** @var AbstractWorker $worker */ + $worker = $e->getTarget(); + $name = $e->getQueue()->getName(); + $eventManager = $worker->getEventManager(); + + $eventManager->detachAggregate($this); + + if (!isset($this->strategyConfig[$name])) { + return; + } + + $strategies = $this->strategyConfig[$name]; + + foreach ($strategies as $strategy => $options) { + if (is_numeric($strategy) && is_string($options)) { // no options given, name stored as value + $strategy = $options; + $options = array(); + } + + if (!is_string($strategy) || !is_array($options)) { + continue; + } + + $priority = null; + if (isset($options['priority'])) { + $priority = $options['priority']; + unset($options['priority']); + } + + $listener = $this->pluginManager->get($strategy, $options); + + if (!is_null($priority)) { + $eventManager->attachAggregate($listener, $priority); + } else { + $eventManager->attachAggregate($listener); + } + } + + if (!in_array(WorkerEvent::EVENT_PROCESS, $eventManager->getEvents())) { + throw new RunTimeException(sprintf( + "No worker strategy has been registered to respond to the '%s' event.", + WorkerEvent::EVENT_PROCESS + )); + } + + $e->stopPropagation(); + $eventManager->trigger(WorkerEvent::EVENT_BOOTSTRAP, $e); + } +} diff --git a/src/SlmQueue/Strategy/Factory/AttachQueueListenersStrategyFactory.php b/src/SlmQueue/Strategy/Factory/AttachQueueListenersStrategyFactory.php new file mode 100644 index 0000000..1dbf56b --- /dev/null +++ b/src/SlmQueue/Strategy/Factory/AttachQueueListenersStrategyFactory.php @@ -0,0 +1,29 @@ +getServiceLocator(); + $pluginManager = $sm->get('SlmQueue\Listener\StrategyPluginManager'); + $config = $sm->get('Config'); + $strategyConfig = $config['slm_queue']['worker_strategies']['queues']; + + return new AttachQueueListenersStrategy($pluginManager, $strategyConfig); + } +} diff --git a/src/SlmQueue/Strategy/Factory/LogJobStrategyFactory.php b/src/SlmQueue/Strategy/Factory/LogJobStrategyFactory.php new file mode 100644 index 0000000..9754db7 --- /dev/null +++ b/src/SlmQueue/Strategy/Factory/LogJobStrategyFactory.php @@ -0,0 +1,31 @@ +options = $options; + } + /** + * Create service + * + * @param ServiceLocatorInterface $serviceLocator + * @return LogJobStrategy + */ + public function createService(ServiceLocatorInterface $serviceLocator) + { + $strategy = new LogJobStrategy($serviceLocator->getServiceLocator()->get('Console'), $this->options); + + return $strategy; + } +} diff --git a/src/SlmQueue/Strategy/FileWatchStrategy.php b/src/SlmQueue/Strategy/FileWatchStrategy.php new file mode 100644 index 0000000..941d18a --- /dev/null +++ b/src/SlmQueue/Strategy/FileWatchStrategy.php @@ -0,0 +1,107 @@ +pattern = $pattern; + + $this->files = array(); + } + + /** + * @return string + */ + public function getPattern() + { + return $this->pattern; + } + + /** + * Files being watched + * + * @return array|null + */ + public function getFiles() + { + return $this->files; + } + + /** + * {@inheritDoc} + */ + public function attach(EventManagerInterface $events, $priority = 1) + { + $this->listeners[] = $events->attach( + WorkerEvent::EVENT_PROCESS_IDLE, + array($this, 'onStopConditionCheck'), + $priority + ); + $this->listeners[] = $events->attach( + WorkerEvent::EVENT_PROCESS, + array($this, 'onStopConditionCheck'), + -1000 + ); + $this->listeners[] = $events->attach( + WorkerEvent::EVENT_PROCESS_STATE, + array($this, 'onReportQueueState'), + $priority + ); + } + + public function onStopConditionCheck(WorkerEvent $event) + { + if (!count($this->files)) { + $this->constructFileList(); + + $this->state = sprintf("watching %s files for modifications", count($this->files)); + } + + foreach ($this->files as $checksum => $file) { + if (!file_exists($file) || !is_readable($file) || (string) $checksum !== hash_file('crc32', $file)) { + $event->exitWorkerLoop(); + + $this->state = sprintf("file modification detected for '%s'", $file); + } + } + } + + protected function constructFileList() + { + $iterator = new \RecursiveDirectoryIterator('.', \RecursiveDirectoryIterator::FOLLOW_SYMLINKS); + $files = new \RecursiveIteratorIterator($iterator); + + /** @var $file \SplFileInfo */ + foreach ($files as $file) { + if ($file->isDir()) { + continue; + } + + if (!preg_match($this->pattern, $file)) { + continue; + } + + $this->files[hash_file('crc32', $file)] = (string) $file; + } + } +} diff --git a/src/SlmQueue/Strategy/InterruptStrategy.php b/src/SlmQueue/Strategy/InterruptStrategy.php new file mode 100644 index 0000000..324784f --- /dev/null +++ b/src/SlmQueue/Strategy/InterruptStrategy.php @@ -0,0 +1,80 @@ +listeners[] = $events->attach( + WorkerEvent::EVENT_PROCESS_IDLE, + array($this, 'onStopConditionCheck'), + $priority + ); + $this->listeners[] = $events->attach( + WorkerEvent::EVENT_PROCESS, + array($this, 'onStopConditionCheck'), + -1000 + ); + $this->listeners[] = $events->attach( + WorkerEvent::EVENT_PROCESS_STATE, + array($this, 'onReportQueueState'), + $priority + ); + } + + /** + * Checks for the stop condition of this strategy + * + * @param WorkerEvent $event + * @return string + */ + public function onStopConditionCheck(WorkerEvent $event) + { + if ($this->interrupted) { + $event->exitWorkerLoop(); + + $this->state = sprintf("interrupt by an external signal on '%s'", $event->getName()); + } + } + + /** + * Handle the signal + * + * @param int $signo + */ + public function onPCNTLSignal($signo) + { + switch($signo) { + case SIGTERM: + case SIGINT: + $this->interrupted = true; + break; + } + } +} diff --git a/src/SlmQueue/Strategy/LogJobStrategy.php b/src/SlmQueue/Strategy/LogJobStrategy.php new file mode 100644 index 0000000..eb18622 --- /dev/null +++ b/src/SlmQueue/Strategy/LogJobStrategy.php @@ -0,0 +1,65 @@ +console = $console; + + parent::__construct($options); + } + + /** + * {@inheritDoc} + */ + public function attach(EventManagerInterface $events, $priority = 1) + { + $this->listeners[] = $events->attach( + WorkerEvent::EVENT_PROCESS, + array($this, 'onLogJobProcessStart'), + 10000 + ); + $this->listeners[] = $events->attach( + WorkerEvent::EVENT_PROCESS, + array($this, 'onLogJobProcessDone'), + -1000 + ); + } + + /** + * @param WorkerEvent $e + */ + public function onLogJobProcessStart(WorkerEvent $e) + { + $job = $e->getJob(); + $name = $job->getMetadata('name'); + if (null === $name) { + $name = get_class($job); + } + + $this->console->write(sprintf('Processing job %s...', $name)); + } + + /** + * @param WorkerEvent $e + */ + public function onLogJobProcessDone(WorkerEvent $e) + { + $this->console->writeLine('Done!'); + } +} diff --git a/src/SlmQueue/Strategy/MaxMemoryStrategy.php b/src/SlmQueue/Strategy/MaxMemoryStrategy.php new file mode 100644 index 0000000..af81b54 --- /dev/null +++ b/src/SlmQueue/Strategy/MaxMemoryStrategy.php @@ -0,0 +1,77 @@ +maxMemory = (int) $maxMemory; + } + + /** + * @return int + */ + public function getMaxMemory() + { + return $this->maxMemory; + } + + /** + * {@inheritDoc} + */ + public function attach(EventManagerInterface $events, $priority = 1) + { + $this->listeners[] = $events->attach( + WorkerEvent::EVENT_PROCESS_IDLE, + array($this, 'onStopConditionCheck'), + $priority + ); + $this->listeners[] = $events->attach( + WorkerEvent::EVENT_PROCESS, + array($this, 'onStopConditionCheck'), + -1000 + ); + $this->listeners[] = $events->attach( + WorkerEvent::EVENT_PROCESS_STATE, + array($this, 'onReportQueueState'), + $priority + ); + } + + public function onStopConditionCheck(WorkerEvent $event) + { + if ($this->maxMemory && memory_get_usage() > $this->maxMemory) { + $event->exitWorkerLoop(); + + $this->state = sprintf( + "memory threshold of %s exceeded (usage: %s)", + $this->humanFormat($this->maxMemory), + $this->humanFormat(memory_get_usage()) + ); + } else { + $this->state = sprintf('%s memory usage', $this->humanFormat(memory_get_usage())); + } + } + + /** + * @param $bytes bytes to be formatted + * @return string human readable + */ + private function humanFormat($bytes) + { + $units = array('b','kB','MB','GB','TB','PB'); + return @round($bytes/pow(1024, ($i=floor(log($bytes, 1024)))), 2) . $units[$i]; + } +} diff --git a/src/SlmQueue/Strategy/MaxRunsStrategy.php b/src/SlmQueue/Strategy/MaxRunsStrategy.php new file mode 100644 index 0000000..7db72f0 --- /dev/null +++ b/src/SlmQueue/Strategy/MaxRunsStrategy.php @@ -0,0 +1,70 @@ +maxRuns = $maxRuns; + } + + /** + * @return int + */ + public function getMaxRuns() + { + return $this->maxRuns; + } + + /** + * {@inheritDoc} + */ + public function attach(EventManagerInterface $events, $priority = 1) + { + $this->listeners[] = $events->attach( + WorkerEvent::EVENT_PROCESS, + array($this, 'onStopConditionCheck'), + -1000 + ); + $this->listeners[] = $events->attach( + WorkerEvent::EVENT_PROCESS_STATE, + array($this, 'onReportQueueState'), + $priority + ); + } + + public function onStopConditionCheck(WorkerEvent $event) + { + $this->runCount++; + + if ($this->maxRuns && $this->runCount >= $this->maxRuns) { + $event->exitWorkerLoop(); + + $this->state = sprintf('maximum of %s jobs processed', $this->runCount); + } else { + $this->state = sprintf('%s jobs processed', $this->runCount); + } + } +} diff --git a/src/SlmQueue/Strategy/ProcessQueueStrategy.php b/src/SlmQueue/Strategy/ProcessQueueStrategy.php new file mode 100644 index 0000000..84a7b86 --- /dev/null +++ b/src/SlmQueue/Strategy/ProcessQueueStrategy.php @@ -0,0 +1,61 @@ +listeners[] = $events->attach( + WorkerEvent::EVENT_PROCESS, + array($this, 'onJobPop'), + $priority + 1 + ); + $this->listeners[] = $events->attach( + WorkerEvent::EVENT_PROCESS, + array($this, 'onJobProcess'), + $priority + ); + } + + public function onJobPop(WorkerEvent $e) + { + $queue = $e->getQueue(); + $options = $e->getOptions(); + $job = $queue->pop($options); + + // The queue may return null, for instance if a timeout was set + if (!$job instanceof JobInterface) { + /** @var AbstractWorker $worker */ + $worker = $e->getTarget(); + + $worker->getEventManager()->trigger(WorkerEvent::EVENT_PROCESS_IDLE, $e); + + // make sure the event doesn't propagate or it will still process + $e->stopPropagation(); + + return; + } + + $e->setJob($job); + } + + public function onJobProcess(WorkerEvent $e) + { + $job = $e->getJob(); + $queue = $e->getQueue(); + /** @var AbstractWorker $worker */ + $worker = $e->getTarget(); + + $result = $worker->processJob($job, $queue); + $e->setResult($result); + } +} diff --git a/src/SlmQueue/Strategy/StrategyPluginManager.php b/src/SlmQueue/Strategy/StrategyPluginManager.php new file mode 100644 index 0000000..34acf6d --- /dev/null +++ b/src/SlmQueue/Strategy/StrategyPluginManager.php @@ -0,0 +1,34 @@ +options = $options; + $eventManager->setIdentifiers(array( + __CLASS__, + get_called_class(), + 'SlmQueue\Worker\WorkerInterface' + )); - // Listen to the signals SIGTERM and SIGINT so that the worker can be killed properly. Note that - // because pcntl_signal may not be available on Windows, we needed to check for the existence of the function - if (function_exists('pcntl_signal')) { - declare(ticks = 1); - pcntl_signal(SIGTERM, array($this, 'handleSignal')); - pcntl_signal(SIGINT, array($this, 'handleSignal')); - } + $this->eventManager = $eventManager; } /** @@ -53,119 +32,28 @@ public function __construct(WorkerOptions $options) */ public function processQueue(QueueInterface $queue, array $options = array()) { - $eventManager = $this->getEventManager(); - $count = 0; + $eventManager = $this->eventManager; + $workerEvent = new WorkerEvent($this, $queue); - $workerEvent = new WorkerEvent($queue); - $eventManager->trigger(WorkerEvent::EVENT_PROCESS_QUEUE_PRE, $workerEvent); + $workerEvent->setOptions($options); - while (true) { - // Check for external stop condition - if ($this->isStopped()) { - break; - } + $eventManager->trigger(WorkerEvent::EVENT_BOOTSTRAP, $workerEvent); - $job = $queue->pop($options); - - // The queue may return null, for instance if a timeout was set - if (!$job instanceof JobInterface) { - // Check for internal stop condition - if ($this->isMaxMemoryExceeded()) { - break; - } - continue; - } - - $workerEvent->setJob($job); - $workerEvent->setResult(WorkerEvent::JOB_STATUS_UNKNOWN); - - $eventManager->trigger(WorkerEvent::EVENT_PROCESS_JOB_PRE, $workerEvent); - - $result = $this->processJob($job, $queue); - $count++; - - $workerEvent->setResult($result); - $eventManager->trigger(WorkerEvent::EVENT_PROCESS_JOB_POST, $workerEvent); - - // Check for internal stop condition - if ($this->isMaxRunsReached($count) || $this->isMaxMemoryExceeded()) { - break; - } + while (!$workerEvent->shouldExitWorkerLoop()) { + $eventManager->trigger(WorkerEvent::EVENT_PROCESS, $workerEvent); } - $eventManager->trigger(WorkerEvent::EVENT_PROCESS_QUEUE_POST, $workerEvent); + $eventManager->trigger(WorkerEvent::EVENT_FINISH, $workerEvent); - return $count; - } + $queueState = $eventManager->trigger(WorkerEvent::EVENT_PROCESS_STATE, $workerEvent); - /** - * {@inheritDoc} - */ - public function setEventManager(EventManagerInterface $eventManager) - { - $eventManager->setIdentifiers(array( - get_called_class(), - 'SlmQueue\Worker\WorkerInterface' - )); + $queueState = array_filter(ArrayUtils::iteratorToArray($queueState)); - $this->eventManager = $eventManager; + return $queueState; } - /** - * {@inheritDoc} - */ public function getEventManager() { - if (null === $this->eventManager) { - $this->setEventManager(new EventManager()); - } - return $this->eventManager; } - - /** - * Check if the script has been stopped from a signal - * - * @return bool - */ - public function isStopped() - { - return $this->stopped; - } - - /** - * Did worker exceed the threshold for memory usage? - * - * @return bool - */ - public function isMaxMemoryExceeded() - { - return memory_get_usage() > $this->options->getMaxMemory(); - } - - /** - * Is the worker about to exceed the threshold for the number of jobs allowed to run? - * - * @param $count current count of executed jobs - * @return bool - */ - public function isMaxRunsReached($count) - { - return $count >= $this->options->getMaxRuns(); - } - - /** - * Handle the signal - * - * @param int $signo - */ - public function handleSignal($signo) - { - switch($signo) { - case SIGTERM: - case SIGINT: - $this->stopped = true; - break; - } - } } diff --git a/src/SlmQueue/Worker/WorkerEvent.php b/src/SlmQueue/Worker/WorkerEvent.php index 70a1198..b02912a 100644 --- a/src/SlmQueue/Worker/WorkerEvent.php +++ b/src/SlmQueue/Worker/WorkerEvent.php @@ -12,12 +12,13 @@ class WorkerEvent extends Event { /** - * Various events you can listen to + * Various events you can subscribe to */ - const EVENT_PROCESS_QUEUE_PRE = 'processQueue.pre'; - const EVENT_PROCESS_QUEUE_POST = 'processQueue.post'; - const EVENT_PROCESS_JOB_PRE = 'processJob.pre'; - const EVENT_PROCESS_JOB_POST = 'processJob.post'; + const EVENT_BOOTSTRAP = 'bootstrap'; + const EVENT_FINISH = 'finish'; + const EVENT_PROCESS_IDLE = 'idle'; + const EVENT_PROCESS_STATE = 'state'; + const EVENT_PROCESS = 'process'; /** * Status for unstarted jobs @@ -28,12 +29,12 @@ class WorkerEvent extends Event * Status for successfully finished job */ const JOB_STATUS_SUCCESS = 1; - + /** * Status for job that has failed and cannot be processed again */ const JOB_STATUS_FAILURE = 2; - + /** * Status for job that has failed but can be processed again */ @@ -55,11 +56,25 @@ class WorkerEvent extends Event */ protected $result; + /** + * Flag indicating we want to exit on the next available occasion + * @var bool + */ + protected $exitWorker = false; + + /** + * Array of options + * @var array + */ + protected $options = array(); + /** * @param QueueInterface $queue */ - public function __construct(QueueInterface $queue) + public function __construct(WorkerInterface $target, QueueInterface $queue) { + $this->setTarget($target); + $this->queue = $queue; } @@ -70,6 +85,7 @@ public function __construct(QueueInterface $queue) public function setJob(JobInterface $job) { $this->job = $job; + $this->setResult(self::JOB_STATUS_UNKNOWN); } /** @@ -103,4 +119,36 @@ public function getResult() { return $this->result; } + + /** + * @param boolean $exitWorker + */ + public function exitWorkerLoop() + { + $this->exitWorker = true; + } + + /** + * @return boolean + */ + public function shouldExitWorkerLoop() + { + return $this->exitWorker; + } + + /** + * @param array $options + */ + public function setOptions(array $options) + { + $this->options = $options; + } + + /** + * @return array + */ + public function getOptions() + { + return $this->options; + } } diff --git a/src/SlmQueue/Worker/WorkerInterface.php b/src/SlmQueue/Worker/WorkerInterface.php index 81f0542..d14fc5b 100644 --- a/src/SlmQueue/Worker/WorkerInterface.php +++ b/src/SlmQueue/Worker/WorkerInterface.php @@ -17,7 +17,7 @@ interface WorkerInterface * * @param QueueInterface $queue * @param array $options - * @return int How many jobs were processed + * @return array description of exit states from strategies that report it */ public function processQueue(QueueInterface $queue, array $options = array()); diff --git a/tests/SlmQueueTest/Asset/SimpleJob.php b/tests/SlmQueueTest/Asset/SimpleJob.php index d9e1216..5e3ce3f 100644 --- a/tests/SlmQueueTest/Asset/SimpleJob.php +++ b/tests/SlmQueueTest/Asset/SimpleJob.php @@ -13,5 +13,7 @@ public function execute() { // Just set some stupid metadata $this->setMetadata('foo', 'bar'); + + return 'result'; } } diff --git a/tests/SlmQueueTest/Asset/SimpleQueue.php b/tests/SlmQueueTest/Asset/SimpleQueue.php index 6a63ff8..f9fa57d 100644 --- a/tests/SlmQueueTest/Asset/SimpleQueue.php +++ b/tests/SlmQueueTest/Asset/SimpleQueue.php @@ -27,6 +27,10 @@ public function push(JobInterface $job, array $options = array()) public function pop(array $options = array()) { $payload = array_pop($this->jobs); + if (!$payload) { + return; + } + return $this->unserializeJob($payload); } diff --git a/tests/SlmQueueTest/Asset/SimpleWorker.php b/tests/SlmQueueTest/Asset/SimpleWorker.php index e6a1828..55cfc98 100644 --- a/tests/SlmQueueTest/Asset/SimpleWorker.php +++ b/tests/SlmQueueTest/Asset/SimpleWorker.php @@ -5,9 +5,19 @@ use SlmQueue\Job\JobInterface; use SlmQueue\Queue\QueueInterface; use SlmQueue\Worker\AbstractWorker; +use Zend\EventManager\EventManager; +use Zend\EventManager\EventManagerInterface; class SimpleWorker extends AbstractWorker { + public function __construct(EventManagerInterface $eventManager = null) + { + if (null === $eventManager) { + $eventManager = new EventManager; + } + parent::__construct($eventManager); + } + public function processJob(JobInterface $job, QueueInterface $queue) { return $job->execute(); diff --git a/tests/SlmQueueTest/Controller/AbstractControllerTest.php b/tests/SlmQueueTest/Controller/AbstractControllerTest.php index e3c3d21..a93a827 100644 --- a/tests/SlmQueueTest/Controller/AbstractControllerTest.php +++ b/tests/SlmQueueTest/Controller/AbstractControllerTest.php @@ -3,8 +3,10 @@ namespace SlmQueueTest\Controller; use PHPUnit_Framework_TestCase as TestCase; -use SlmQueue\Options\WorkerOptions; use SlmQueue\Queue\QueuePluginManager; +use SlmQueue\Strategy\MaxRunsStrategy; +use SlmQueue\Strategy\ProcessQueueStrategy; +use SlmQueue\Worker\WorkerEvent; use SlmQueueTest\Asset\FailingJob; use SlmQueueTest\Asset\SimpleController; use SlmQueueTest\Asset\SimpleJob; @@ -25,9 +27,13 @@ class AbstractControllerTest extends TestCase */ protected $controller; + public function setUp() { - $worker = new SimpleWorker(new WorkerOptions()); + $worker = new SimpleWorker(); + + $worker->getEventManager()->attachAggregate(new ProcessQueueStrategy()); + $worker->getEventManager()->attachAggregate(new MaxRunsStrategy(array('max_runs' => 1))); $config = new Config(array( 'factories' => array( 'knownQueue' => 'SlmQueueTest\Asset\SimpleQueueFactory' @@ -56,7 +62,9 @@ public function testSimpleJob() $routeMatch = new RouteMatch(array('queue' => 'knownQueue')); $this->controller->getEvent()->setRouteMatch($routeMatch); - $this->assertContains("Finished worker for queue 'knownQueue' with 1 jobs", $this->controller->processAction()); + $result = $this->controller->processAction(); + $this->assertContains("Finished worker for queue 'knownQueue'", $result); + $this->assertContains("maximum of 1 jobs processed", $result); } public function testFailingJobThrowException() diff --git a/tests/SlmQueueTest/Options/WorkerOptionsTest.php b/tests/SlmQueueTest/Options/WorkerOptionsTest.php deleted file mode 100644 index c7ff0da..0000000 --- a/tests/SlmQueueTest/Options/WorkerOptionsTest.php +++ /dev/null @@ -1,40 +0,0 @@ -serviceManager = ServiceManagerFactory::getServiceManager(); - } - - public function testCanRetrieveWorkerOptionsWithServiceManager() - { - $workerOptions = $this->serviceManager->get('SlmQueue\Options\WorkerOptions'); - $this->assertInstanceOf('SlmQueue\Options\WorkerOptions', $workerOptions); - } - - public function testGettersAndSetters() - { - $workerOptions = new WorkerOptions(array( - 'max_runs' => 10, - 'max_memory' => 1000 - )); - - $this->assertInstanceOf('SlmQueue\Options\WorkerOptions', $workerOptions); - $this->assertEquals(10, $workerOptions->getMaxRuns()); - $this->assertEquals(1000, $workerOptions->getMaxMemory()); - } -} diff --git a/tests/SlmQueueTest/Queue/QueueAwareTraitTest.php b/tests/SlmQueueTest/Queue/QueueAwareTraitTest.php index 59b93c1..a5be486 100644 --- a/tests/SlmQueueTest/Queue/QueueAwareTraitTest.php +++ b/tests/SlmQueueTest/Queue/QueueAwareTraitTest.php @@ -26,11 +26,13 @@ public function setUp() $this->job = new QueueAwareTraitJob(); } - public function testDefaultGetter() { + public function testDefaultGetter() + { $this->assertNull($this->job->getQueue()); } - public function testSetter() { + public function testSetter() + { $queue = new SimpleQueue('name', new JobPluginManager()); $this->job->setQueue($queue); diff --git a/tests/SlmQueueTest/Queue/QueueTest.php b/tests/SlmQueueTest/Queue/QueueTest.php index bd33274..f067971 100644 --- a/tests/SlmQueueTest/Queue/QueueTest.php +++ b/tests/SlmQueueTest/Queue/QueueTest.php @@ -1,6 +1,6 @@ getMock('SlmQueue\Job\JobPluginManager'); + $queue = $this->getMock( + 'SlmQueue\Queue\AbstractQueue', + array(), + array('queueName', $jobPluginManager) + ); + $strategyPluginManager = $this->getMock('SlmQueue\Strategy\StrategyPluginManager'); + $eventManager = $this->getMock('Zend\EventManager\EventManager'); + $worker = $this->getMock('SlmQueue\Worker\AbstractWorker', array(), array($eventManager)); + $strategyMock = $this->getMock('SlmQueue\Strategy\AbstractStrategy'); + + $queue->expects($this->any())->method('getName')->will($this->returnValue('queueName')); + $worker->expects($this->any())->method('getEventManager')->will($this->returnValue($eventManager)); + $strategyPluginManager->expects($this->any())->method('get')->will($this->returnValue($strategyMock)); + + $event = new WorkerEvent($worker, $queue); + $job = new SimpleJob(); + + $event->setJob($job); + + $this->listener = new AttachQueueListenersStrategy($strategyPluginManager, array('queueName' => array( + 'SlmQueue\Strategy\SomeStrategy', + ))); + + $this->event = $event; + } + + public function testListenerInstanceOfAbstractStrategy() + { + $this->assertInstanceOf('SlmQueue\Strategy\AbstractStrategy', $this->listener); + } + + public function testListensToCorrectEvents() + { + $evm = $this->getMock('Zend\EventManager\EventManagerInterface'); + + $evm->expects($this->at(0))->method('attach') + ->with(WorkerEvent::EVENT_BOOTSTRAP, array($this->listener, 'attachQueueListeners')); + + $this->listener->attach($evm); + } + + public function testAttachQueueListenersDetachedSelfFromEventManager() { + + $workerMock = $this->event->getTarget(); + $eventManagerMock = $workerMock->getEventManager(); + $eventManagerMock->expects($this->once())->method('detachAggregate')->with($this->listener); + $eventManagerMock->expects($this->any())->method('getEvents')->will($this->returnValue(array(WorkerEvent::EVENT_PROCESS))); + + $this->listener->attachQueueListeners($this->event); + } + + public function testAttachQueueListenersHaltsQueueNameIsNotInStrategyConfig() { + $class = new \ReflectionClass('SlmQueue\Strategy\AttachQueueListenersStrategy'); + $property = $class->getProperty('strategyConfig'); + $property->setAccessible(true); + + $property->setValue($this->listener, array('unknownQueueName' => array( + 'SlmQueue\Strategy\SomeStrategy', + ))); + + $this->isNull($this->listener->attachQueueListeners($this->event)); + } + + public function testAttachQueueListenersStrategyConfig() { + $workerMock = $this->event->getTarget(); + $eventManagerMock = $workerMock->getEventManager(); + $eventManagerMock->expects($this->any())->method('getEvents')->will($this->returnValue(array(WorkerEvent::EVENT_PROCESS))); + + $class = new \ReflectionClass('SlmQueue\Strategy\AttachQueueListenersStrategy'); + $property = $class->getProperty('strategyConfig'); + $property->setAccessible(true); + + $property->setValue($this->listener, array('queueName' => array( + 'SlmQueue\Strategy\SomeStrategy', + 'SlmQueue\Strategy\OtherStrategy' => array('priority' => 3), + 'SlmQueue\Strategy\FinalStrategy' => array('foo' => 'bar'), + 'SlmQueue\Strategy\SomeStrategy' => 'not_an_array', + ))); + + $property = $class->getProperty('pluginManager'); + $property->setAccessible(true); + + $pluginManagerMock = $property->getValue($this->listener); + + $pluginManagerMock->expects($this->at(0))->method('get')->with('SlmQueue\Strategy\SomeStrategy', array()); + $pluginManagerMock->expects($this->at(1))->method('get')->with('SlmQueue\Strategy\OtherStrategy', array()); // priority is removed + $pluginManagerMock->expects($this->at(2))->method('get')->with('SlmQueue\Strategy\FinalStrategy', array('foo' => 'bar')); + + $strategyMock = $this->getMock('SlmQueue\Strategy\AbstractStrategy'); + + $eventManagerMock->expects($this->at(1))->method('attachAggregate')->with($strategyMock); + $eventManagerMock->expects($this->at(2))->method('attachAggregate')->with($strategyMock, 3); + $eventManagerMock->expects($this->at(3))->method('attachAggregate')->with($strategyMock); + + $this->listener->attachQueueListeners($this->event); + } + + public function testAttachQueueListenersThrowsExceptionWhenNoListenersHaveBeenAttachedListeningToWorkerEventProcess() + { + $workerMock = $this->event->getTarget(); + $eventManagerMock = $workerMock->getEventManager(); + $eventManagerMock->expects($this->any())->method('getEvents')->will($this->returnValue(array(WorkerEvent::EVENT_PROCESS_IDLE))); + + $this->setExpectedException('SlmQueue\Exception\RunTimeException'); + $this->listener->attachQueueListeners($this->event); + } +} diff --git a/tests/SlmQueueTest/Strategy/Factory/LogJobFactoryTest.php b/tests/SlmQueueTest/Strategy/Factory/LogJobFactoryTest.php new file mode 100644 index 0000000..1dc94df --- /dev/null +++ b/tests/SlmQueueTest/Strategy/Factory/LogJobFactoryTest.php @@ -0,0 +1,26 @@ +setServiceLocator($sm); + + $factory = new LogJobStrategyFactory(); + $strategy = $factory->createService($plugin); + + $this->assertInstanceOf('SlmQueue\Strategy\LogJobStrategy', $strategy); + } + +} diff --git a/tests/SlmQueueTest/Strategy/FileWatchStrategyTest.php b/tests/SlmQueueTest/Strategy/FileWatchStrategyTest.php new file mode 100644 index 0000000..e1b719f --- /dev/null +++ b/tests/SlmQueueTest/Strategy/FileWatchStrategyTest.php @@ -0,0 +1,139 @@ +getMockBuilder('SlmQueue\Queue\AbstractQueue') + ->disableOriginalConstructor() + ->getMock(); + $worker = $this->getMock('SlmQueue\Worker\WorkerInterface'); + + $ev = new WorkerEvent($worker, $queue); + $job = new SimpleJob(); + + $ev->setJob($job); + + $this->listener = new FileWatchStrategy(); + $this->event = $ev; + } + + public function testListenerInstanceOfAbstractStrategy() + { + $this->assertInstanceOf('SlmQueue\Strategy\AbstractStrategy', $this->listener); + } + + public function testListensToCorrectEvents() + { + $evm = $this->getMock('Zend\EventManager\EventManagerInterface'); + + $evm->expects($this->at(0))->method('attach') + ->with(WorkerEvent::EVENT_PROCESS_IDLE, array($this->listener, 'onStopConditionCheck')); + $evm->expects($this->at(1))->method('attach') + ->with(WorkerEvent::EVENT_PROCESS, array($this->listener, 'onStopConditionCheck')); + $evm->expects($this->at(2))->method('attach') + ->with(WorkerEvent::EVENT_PROCESS_STATE, array($this->listener, 'onReportQueueState')); + + $this->listener->attach($evm); + } + + public function testPatternDefault() + { + // standard zf2 application php and phtml files + $this->assertTrue($this->listener->getPattern() == '/^\.\/(config|module).*\.(php|phtml)$/'); + } + + public function testFilesGetterReturnEmptyArrayByDefault() + { + // standard zf2 application php and phtml files + $this->assertEmpty($this->listener->getFiles()); + } + + public function testSettingAPatternWillResetFilesToEmpty() + { + $this->listener->setPattern('/^anything$/'); + $this->assertEmpty($this->listener->getFiles()); + } + + public function testSettingPatternNullifiesCurrentListOfFilesToWatch() + { + // builds a file list + $this->listener->onStopConditionCheck($this->event); + $this->assertNotEmpty($this->listener->getFiles()); + + $this->listener->setPattern('/^$/'); + + $this->assertTrue($this->listener->getPattern() == '/^$/'); + $this->assertCount(0, $this->listener->getFiles()); + } + + public function testCanFileFilesByPattern() + { + // builds a file list + if (!is_dir('tests/build')) { + mkdir('tests/build', 0755, true); + } + file_put_contents('tests/build/filewatch.txt', 'hi'); + + $this->listener->setPattern('/^\.\/(tests\/build).*\.(txt)$/'); + $this->listener->onStopConditionCheck($this->event); + + $this->assertCount(1, $this->listener->getFiles()); + } + + public function testWatchedFileChangeStopsPropagation() + { + // builds a file list + if (!is_dir('tests/build')) { + mkdir('tests/build', 0755, true); + } + file_put_contents('tests/build/filewatch.txt', 'hi'); + + $this->listener->setPattern('/^\.\/(tests\/build).*\.(txt)$/'); + $this->listener->onStopConditionCheck($this->event); + + $this->assertCount(1, $this->listener->getFiles()); + + file_put_contents('tests/build/filewatch.txt', 'hello'); + + $this->listener->onStopConditionCheck($this->event); + $this->assertContains('file modification detected for', $this->listener->onReportQueueState($this->event)); + $this->assertTrue($this->event->shouldExitWorkerLoop()); + } + + public function testWatchedFileRemovedStopsPropagation() + { + // builds a file list + if (!is_dir('tests/build')) { + mkdir('tests/build', 0755, true); + } + file_put_contents('tests/build/filewatch.txt', 'hi'); + + $this->listener->setPattern('/^\.\/(tests\/build).*\.(txt)$/'); + $this->listener->onStopConditionCheck($this->event); + + unlink('tests/build/filewatch.txt'); + + $this->listener->onStopConditionCheck($this->event); + + $this->assertContains('file modification detected for', $this->listener->onReportQueueState($this->event)); + $this->assertTrue($this->event->shouldExitWorkerLoop()); + } +} diff --git a/tests/SlmQueueTest/Strategy/InterruptStrategyTest.php b/tests/SlmQueueTest/Strategy/InterruptStrategyTest.php new file mode 100644 index 0000000..83a3fe2 --- /dev/null +++ b/tests/SlmQueueTest/Strategy/InterruptStrategyTest.php @@ -0,0 +1,80 @@ +getMockBuilder('SlmQueue\Queue\AbstractQueue') + ->disableOriginalConstructor() + ->getMock(); + $worker = $this->getMock('SlmQueue\Worker\WorkerInterface'); + + $ev = new WorkerEvent($worker, $queue); + $job = new SimpleJob(); + + $ev->setJob($job); + + $this->listener = new InterruptStrategy(); + $this->event = $ev; + } + + public function testListenerInstanceOfAbstractStrategy() + { + $this->assertInstanceOf('SlmQueue\Strategy\AbstractStrategy', $this->listener); + } + + public function testListensToCorrectEvents() + { + $evm = $this->getMock('Zend\EventManager\EventManagerInterface'); + + $evm->expects($this->at(0))->method('attach') + ->with(WorkerEvent::EVENT_PROCESS_IDLE, array($this->listener, 'onStopConditionCheck')); + $evm->expects($this->at(1))->method('attach') + ->with(WorkerEvent::EVENT_PROCESS, array($this->listener, 'onStopConditionCheck')); + $evm->expects($this->at(2))->method('attach') + ->with(WorkerEvent::EVENT_PROCESS_STATE, array($this->listener, 'onReportQueueState')); + + $this->listener->attach($evm); + } + + public function testOnStopConditionCheckHandler_NoSignal() + { + $this->listener->onStopConditionCheck($this->event); + $this->assertFalse($this->listener->onReportQueueState($this->event)); + $this->assertFalse($this->event->shouldExitWorkerLoop()); + + } + + public function testOnStopConditionCheckHandler_SIGTERM() + { + $this->listener->onPCNTLSignal(SIGTERM); + $this->listener->onStopConditionCheck($this->event); + $this->assertContains('interrupt by an external signal', $this->listener->onReportQueueState($this->event)); + $this->assertTrue($this->event->shouldExitWorkerLoop()); + } + + public function testOnStopConditionCheckHandler_SIGINT() + { + $this->listener->onPCNTLSignal(SIGTERM); + $this->listener->onStopConditionCheck($this->event); + $this->assertContains('interrupt by an external signal', $this->listener->onReportQueueState($this->event)); + $this->assertTrue($this->event->shouldExitWorkerLoop()); + } +} diff --git a/tests/SlmQueueTest/Strategy/LogJobTest.php b/tests/SlmQueueTest/Strategy/LogJobTest.php new file mode 100644 index 0000000..89a8574 --- /dev/null +++ b/tests/SlmQueueTest/Strategy/LogJobTest.php @@ -0,0 +1,105 @@ +getMockBuilder('SlmQueue\Queue\AbstractQueue') + ->disableOriginalConstructor() + ->getMock(); + $worker = $this->getMock('SlmQueue\Worker\WorkerInterface'); + + $ev = new WorkerEvent($worker, $queue); + $job = new SimpleJob(); + + $ev->setJob($job); + + $this->console = $this->getMock('Zend\Console\Adapter\AdapterInterface'); + $this->listener = new LogJobStrategy($this->console); + $this->event = $ev; + } + + public function tearDown() + { + + } + + public function testListenerInstanceOfAbstractStrategy() + { + $this->assertInstanceOf('SlmQueue\Strategy\AbstractStrategy', $this->listener); + } + + public function testListensToCorrectEvents() + { + $evm = $this->getMock('Zend\EventManager\EventManagerInterface'); + + $evm->expects($this->at(0))->method('attach') + ->with(WorkerEvent::EVENT_PROCESS, array($this->listener, 'onLogJobProcessStart')); + $evm->expects($this->at(1))->method('attach') + ->with(WorkerEvent::EVENT_PROCESS, array($this->listener, 'onLogJobProcessDone')); + + $this->listener->attach($evm); + } + + public function testOnLogJobProcessStart_SendsOutputToConsole() + { + $this->console->expects($this->once())->method('write') + ->with('Processing job SlmQueueTest\Asset\SimpleJob...'); + + $this->listener->onLogJobProcessStart($this->event); + } + public function testOnLogJobProcessStart_DoesNotGenerateState() + { + $this->listener->onLogJobProcessStart($this->event); + + $this->assertFalse($this->listener->onReportQueueState($this->event)); + } + public function testOnLogJobProcessStart_DoesNotHaltPropagation() + { + $this->listener->onLogJobProcessStart($this->event); + + $this->assertFalse($this->event->shouldExitWorkerLoop()); + } + + public function testOnLogJobProcessDone_SendsOutputToConsole() + { + $this->console->expects($this->once())->method('writeLine') + ->with('Done!'); + + $this->listener->onLogJobProcessDone($this->event); + } + public function testOnLogJobProcessDone_DoesNotGenerateState() + { + $this->listener->onLogJobProcessDone($this->event); + + $this->assertFalse($this->listener->onReportQueueState($this->event)); + } + public function testOnLogJobProcessDone_DoesNotHaltPropagation() + { + $this->listener->onLogJobProcessDone($this->event); + + $this->assertFalse($this->event->shouldExitWorkerLoop()); + } +} diff --git a/tests/SlmQueueTest/Strategy/MaxMemoryStrategyTest.php b/tests/SlmQueueTest/Strategy/MaxMemoryStrategyTest.php new file mode 100644 index 0000000..1abdb56 --- /dev/null +++ b/tests/SlmQueueTest/Strategy/MaxMemoryStrategyTest.php @@ -0,0 +1,89 @@ +getMockBuilder('SlmQueue\Queue\AbstractQueue') + ->disableOriginalConstructor() + ->getMock(); + $worker = $this->getMock('SlmQueue\Worker\WorkerInterface'); + + $ev = new WorkerEvent($worker, $queue); + $job = new SimpleJob(); + + $ev->setJob($job); + + $this->listener = new MaxMemoryStrategy(); + $this->event = $ev; + } + + public function testListenerInstanceOfAbstractStrategy() + { + $this->assertInstanceOf('SlmQueue\Strategy\AbstractStrategy', $this->listener); + } + + public function testMaxMemoryDefault() + { + $this->assertTrue($this->listener->getMaxMemory() == 0); + } + + public function testMaxMemorySetter() + { + $this->listener->setMaxMemory(1024*25); + + $this->assertTrue($this->listener->getMaxMemory() == 1024*25); + } + + public function testListensToCorrectEvents() + { + $evm = $this->getMock('Zend\EventManager\EventManagerInterface'); + + $evm->expects($this->at(0))->method('attach') + ->with(WorkerEvent::EVENT_PROCESS_IDLE, array($this->listener, 'onStopConditionCheck')); + $evm->expects($this->at(1))->method('attach') + ->with(WorkerEvent::EVENT_PROCESS, array($this->listener, 'onStopConditionCheck')); + $evm->expects($this->at(2))->method('attach') + ->with(WorkerEvent::EVENT_PROCESS_STATE, array($this->listener, 'onReportQueueState')); + + $this->listener->attach($evm); + } + + public function testContinueWhileThresholdNotExceeded() + { + $this->listener->setMaxMemory(1024*1024*1000); + + $this->listener->onStopConditionCheck($this->event); + $this->assertContains('memory usage', $this->listener->onReportQueueState($this->event)); + $this->assertFalse($this->event->shouldExitWorkerLoop()); + } + + public function testRequestStopWhileThresholdExceeded() + { + $this->listener->setMaxMemory(1024); + + $this->listener->onStopConditionCheck($this->event); + $this->assertContains( + 'memory threshold of 1kB exceeded (usage: ', + $this->listener->onReportQueueState($this->event) + ); + $this->assertTrue($this->event->shouldExitWorkerLoop()); + } +} diff --git a/tests/SlmQueueTest/Strategy/MaxRunsStrategyTest.php b/tests/SlmQueueTest/Strategy/MaxRunsStrategyTest.php new file mode 100644 index 0000000..0fb6500 --- /dev/null +++ b/tests/SlmQueueTest/Strategy/MaxRunsStrategyTest.php @@ -0,0 +1,83 @@ +getMockBuilder('SlmQueue\Queue\AbstractQueue') + ->disableOriginalConstructor() + ->getMock(); + $worker = $this->getMock('SlmQueue\Worker\WorkerInterface'); + + $ev = new WorkerEvent($worker, $queue); + $job = new SimpleJob(); + + $ev->setJob($job); + + $this->listener = new MaxRunsStrategy(); + $this->event = $ev; + } + + public function testListenerInstanceOfAbstractStrategy() + { + $this->assertInstanceOf('SlmQueue\Strategy\AbstractStrategy', $this->listener); + } + + public function testMaxRunsDefault() + { + $this->assertTrue($this->listener->getMaxRuns() == 0); + } + + public function testMaxRunsSetter() + { + $this->listener->setMaxRuns(2); + + $this->assertTrue($this->listener->getMaxRuns() == 2); + } + + public function testListensToCorrectEvents() + { + $evm = $this->getMock('Zend\EventManager\EventManagerInterface'); + + $evm->expects($this->at(0))->method('attach') + ->with(WorkerEvent::EVENT_PROCESS, array($this->listener, 'onStopConditionCheck')); + $evm->expects($this->at(1))->method('attach') + ->with(WorkerEvent::EVENT_PROCESS_STATE, array($this->listener, 'onReportQueueState')); + + $this->listener->attach($evm); + } + + public function testOnStopConditionCheckHandler() + { + $this->listener->setMaxRuns(3); + + $this->listener->onStopConditionCheck($this->event); + $this->assertContains('1 jobs processed', $this->listener->onReportQueueState($this->event)); + $this->assertFalse($this->event->shouldExitWorkerLoop()); + + $this->listener->onStopConditionCheck($this->event); + $this->assertContains('2 jobs processed', $this->listener->onReportQueueState($this->event)); + $this->assertFalse($this->event->shouldExitWorkerLoop()); + + $this->listener->onStopConditionCheck($this->event); + $this->assertContains('maximum of 3 jobs processed', $this->listener->onReportQueueState($this->event)); + $this->assertTrue($this->event->shouldExitWorkerLoop()); + } +} diff --git a/tests/SlmQueueTest/Strategy/ProcessQueueStrategyTest.php b/tests/SlmQueueTest/Strategy/ProcessQueueStrategyTest.php new file mode 100644 index 0000000..c24ba82 --- /dev/null +++ b/tests/SlmQueueTest/Strategy/ProcessQueueStrategyTest.php @@ -0,0 +1,105 @@ +getMockBuilder('SlmQueue\Queue\AbstractQueue') + ->disableOriginalConstructor() + ->getMock(); + + $worker = new SimpleWorker(); + + $event = new WorkerEvent($worker, $queue); + $this->job = new SimpleJob(); + $event->setOptions(array('foo' => 'bar')); + $event->setJob($this->job); + + $this->listener = new ProcessQueueStrategy(); + $this->event = $event; + } + + public function testListenerInstanceOfAbstractStrategy() + { + $this->assertInstanceOf('SlmQueue\Strategy\AbstractStrategy', $this->listener); + } + + public function testListensToCorrectEvents() + { + $evm = $this->getMock('Zend\EventManager\EventManagerInterface'); + + $priority = 1; + + $evm->expects($this->at(0)) + ->method('attach') + ->with(WorkerEvent::EVENT_PROCESS, array($this->listener, 'onJobPop'), $priority + 1); + $evm->expects($this->at(1)) + ->method('attach') + ->with(WorkerEvent::EVENT_PROCESS, array($this->listener, 'onJobProcess'), $priority); + + $this->listener->attach($evm, $priority); + } + + public function testOnJobPopHandler() + { + $this->listener->onJobPop($this->event); + $this->assertFalse($this->event->shouldExitWorkerLoop()); + } + + public function testOnJobPopPopsFromQueueWithOptions() + { + $this->event->getQueue() + ->expects($this->once()) + ->method('pop') + ->with(array('foo' => 'bar')) + ->will($this->returnValue($this->job)); + + $this->listener->onJobPop($this->event); + } + + public function testOnJobPopPopsTriggersIdleAndStopPropagation() + { + $this->event->getQueue() + ->expects($this->once()) + ->method('pop') + ->will($this->returnValue(null)); + + $called = false; + $this->event->getTarget()->getEventManager()->attach( + WorkerEvent::EVENT_PROCESS_IDLE, + function(WorkerEvent $e) use (&$called) { + $called = true; + } + ); + + $this->listener->onJobPop($this->event); + + $this->assertTrue($called); + $this->assertTrue($this->event->propagationIsStopped()); + } + + public function testOnJobProcessHandlerEventGetsJobResult() + { + $this->listener->onJobProcess($this->event); + $this->assertTrue($this->event->getResult() == 'result'); + } + +} \ No newline at end of file diff --git a/tests/SlmQueueTest/Worker/AbstractWorkerTest.php b/tests/SlmQueueTest/Worker/AbstractWorkerTest.php index 640ab16..26c008d 100644 --- a/tests/SlmQueueTest/Worker/AbstractWorkerTest.php +++ b/tests/SlmQueueTest/Worker/AbstractWorkerTest.php @@ -3,8 +3,10 @@ namespace SlmQueueTest\Worker; use PHPUnit_Framework_TestCase as TestCase; -use SlmQueue\Options\WorkerOptions; +use SlmQueue\Strategy\InterruptStrategy; +use SlmQueue\Strategy\ProcessQueueStrategy; use SlmQueue\Worker\WorkerEvent; +use SlmQueue\Strategy\MaxRunsStrategy; use SlmQueueTest\Asset\SimpleWorker; use Zend\EventManager\EventManager; @@ -14,78 +16,14 @@ class AbstractWorkerTest extends TestCase public function setUp() { - $options = new WorkerOptions; - $options->setMaxRuns(1); - $options->setMaxMemory(1024*1024*1024); - - $this->options = $options; - $this->worker = new SimpleWorker($options); + $this->worker = new SimpleWorker; $this->queue = $this->getMock('SlmQueue\Queue\QueueInterface'); $this->job = $this->getMock('SlmQueue\Job\JobInterface'); - } - public function testWorkerPopsFromQueue() - { - $this->queue->expects($this->once()) - ->method('pop') - ->will($this->returnValue($this->job)); - - $this->worker->processQueue($this->queue); - } - - public function testWorkerExecutesJob() - { - $this->queue->expects($this->once()) - ->method('pop') - ->will($this->returnValue($this->job)); - - $this->job->expects($this->once()) - ->method('execute'); - - $this->worker->processQueue($this->queue); - } - - public function testWorkerCountsRuns() - { - $this->options->setMaxRuns(2); - - $this->queue->expects($this->exactly(2)) - ->method('pop') - ->will($this->returnValue($this->job)); - $this->worker->processQueue($this->queue); - } - - public function testWorkerSkipsVoidValuesFromQueue() - { - $i = 0; - $job = $this->job; - $callback = function () use (&$i, $job) { - // We return the job on the 4th call - if ($i === 3) { - return $job; - } - - $i++; - return null; - }; - - $this->options->setMaxRuns(1); - $this->queue->expects($this->exactly(4)) - ->method('pop') - ->will($this->returnCallback($callback)); - - $count = $this->worker->processQueue($this->queue); - $this->assertEquals(1, $count); - } - - public function testWorkerMaxMemory() - { - $this->options->setMaxMemory(1); - - $this->queue->expects($this->exactly(1)) - ->method('pop'); - - $this->assertTrue($this->worker->processQueue($this->queue) === 0); + // set max runs so our tests won't run forever + $this->maxRuns = new MaxRunsStrategy; + $this->maxRuns->setMaxRuns(1); + $this->worker->getEventManager()->attach($this->maxRuns); } public function testCorrectIdentifiersAreSetToEventManager() @@ -96,111 +34,89 @@ public function testCorrectIdentifiersAreSetToEventManager() $this->assertContains('SlmQueueTest\Asset\SimpleWorker', $eventManager->getIdentifiers()); } - public function testEventManagerTriggersEvents() + /** + * @dataProvider providerWorkerLoopEvents + */ + public function testWorkerLoopEvents($exitedBy, $exitAfter, $expectedCalledEvents) { - $eventManager = $this->getMock('Zend\EventManager\EventManagerInterface'); - $this->worker->setEventManager($eventManager); - - $this->queue->expects($this->once()) - ->method('pop') - ->will($this->returnValue($this->job)); - - // Trigger will be called 4: one for process queue pre, post, and process job pre, post + $this->worker = new SimpleWorker(); - $eventManager->expects($this->exactly(4)) - ->method('trigger'); - - $eventManager->expects($this->at(0)) - ->method('trigger') - ->with($this->equalTo(WorkerEvent::EVENT_PROCESS_QUEUE_PRE)); - - $eventManager->expects($this->at(1)) - ->method('trigger') - ->with($this->equalTo(WorkerEvent::EVENT_PROCESS_JOB_PRE)); + /** @var EventManager $eventManager */ + $eventManager = $this->worker->getEventManager(); - $eventManager->expects($this->at(2)) - ->method('trigger') - ->with($this->equalTo(WorkerEvent::EVENT_PROCESS_JOB_POST)); + $this->exitedBy = $exitedBy; + $this->exitAfter = $exitAfter; + $this->actualCalled = array(); - $eventManager->expects($this->at(3)) - ->method('trigger') - ->with($this->equalTo(WorkerEvent::EVENT_PROCESS_QUEUE_POST)); + $eventManager->attach(WorkerEvent::EVENT_BOOTSTRAP, array($this, 'callbackWorkerLoopEvents')); + $eventManager->attach(WorkerEvent::EVENT_FINISH, array($this, 'callbackWorkerLoopEvents')); + $eventManager->attach(WorkerEvent::EVENT_PROCESS_IDLE, array($this, 'callbackWorkerLoopEvents')); + $eventManager->attach(WorkerEvent::EVENT_PROCESS, array($this, 'callbackWorkerLoopEvents')); + $eventManager->attach(WorkerEvent::EVENT_PROCESS_STATE, array($this, 'callbackWorkerLoopEvents')); $this->worker->processQueue($this->queue); - } - - public function testWorkerSetsJobStatusInEventClass() - { - $eventManager = new EventManager; - $this->worker->setEventManager($eventManager); - - $this->job->expects($this->once()) - ->method('execute') - ->will($this->returnValue(WorkerEvent::JOB_STATUS_SUCCESS)); - - $this->queue->expects($this->once()) - ->method('pop') - ->will($this->returnValue($this->job)); - - $self = $this; - $eventManager->attach(WorkerEvent::EVENT_PROCESS_JOB_POST, function ($e) use ($self) { - $self->assertEquals(WorkerEvent::JOB_STATUS_SUCCESS, $e->getResult()); - }); - $this->worker->processQueue($this->queue); + $this->assertEquals($expectedCalledEvents, $this->actualCalled); } - public function testMethod_hasMemoryExceeded() - { - $this->options->setMaxMemory(10000000000); - $this->assertFalse($this->worker->isMaxMemoryExceeded()); - - $this->options->setMaxMemory(1); - $this->assertTrue($this->worker->isMaxMemoryExceeded()); + public function providerWorkerLoopEvents() { + return array( + array(WorkerEvent::EVENT_BOOTSTRAP, 1, array('bootstrap' => 1, 'finish' => 1, 'state' => 1)), + array(WorkerEvent::EVENT_PROCESS, 10, array('bootstrap' => 1, 'process' => 10, 'idle' => 5, 'finish' => 1, 'state' => 1)) + ); } - public function testMethod_willExceedMaxRuns() - { - $this->options->setMaxRuns(10); - $this->assertFalse($this->worker->isMaxRunsReached(9)); - $this->assertTrue($this->worker->isMaxRunsReached(10)); - $this->assertTrue($this->worker->isMaxRunsReached(11)); + /** + * Callback facilitating the worker loop + * + * It simulates a process queue strategy. And triggers an idle event on every uneven invokation of the PROCESS event + * + * @param WorkerEvent $e + */ + public function callbackWorkerLoopEvents(WorkerEvent $e) { + if (!isset($this->actualCalled[$e->getName()])) { + $this->actualCalled[$e->getName()] = 1; + } else { + $this->actualCalled[$e->getName()]++; + } + + // mark for exit when event is due + if ($e->getName() == $this->exitedBy && $this->actualCalled[$e->getName()] >= $this->exitAfter) { + $e->exitWorkerLoop(); + } + + // simulate process queue strategy, trigger idle event on every uneven call + if ($e->getName() == WorkerEvent::EVENT_PROCESS) { + if (!($this->actualCalled[WorkerEvent::EVENT_PROCESS] % 2)) { + $e->getTarget()->getEventManager()->trigger(WorkerEvent::EVENT_PROCESS_IDLE, $e); + $e->stopPropagation(); + + return; + } + } } - public function testSignalStopsWorkerForSigterm() - { - $worker = $this->worker; - $this->queue->expects($this->never()) - ->method('pop'); - - $worker->handleSignal(SIGTERM); - $count = $worker->processQueue($this->queue); + public function testProcessQueueSetOptionsOnWorkerEvent() { + /** @var EventManager $eventManager */ + $eventManager = $this->worker->getEventManager(); - $this->assertEquals(0, $count); - } + $eventManager->attach(WorkerEvent::EVENT_PROCESS, array($this, 'callbackProcessQueueSetOptionsOnWorkerEvent')); - public function testSignalStopsWorkerForSigint() - { - $worker = $this->worker; - $this->queue->expects($this->never()) - ->method('pop'); + $options = array('foo' => 'bar'); - $worker->handleSignal(SIGINT); - $count = $worker->processQueue($this->queue); + $this->worker->processQueue($this->queue, $options); - $this->assertEquals(0, $count); + $this->assertEquals($this->eventOptions, $options); } - public function testNonStoppingSignalDoesNotStopWorker() - { - $this->options->setMaxRuns(1); - $this->queue->expects($this->once()) - ->method('pop') - ->will($this->returnValue($this->job)); - - $this->worker->handleSignal(SIGPOLL); - $count = $this->worker->processQueue($this->queue); + /** + * Callback facilitating the worker loop + * + * @param WorkerEvent $e + */ + public function callbackProcessQueueSetOptionsOnWorkerEvent(WorkerEvent $e) { + $e->exitWorkerLoop(); - $this->assertEquals(1, $count); + $this->eventOptions = $e->getOptions(); } } diff --git a/tests/SlmQueueTest/Worker/WorkerEventTest.php b/tests/SlmQueueTest/Worker/WorkerEventTest.php index f0fffab..12af8e2 100644 --- a/tests/SlmQueueTest/Worker/WorkerEventTest.php +++ b/tests/SlmQueueTest/Worker/WorkerEventTest.php @@ -1,7 +1,7 @@ queue = $this->getMock('SlmQueue\Queue\QueueInterface'); + $this->queue = $this->getMock('SlmQueue\Queue\QueueInterface'); + $this->worker = $this->getMock('SlmQueue\Worker\WorkerInterface'); } + + public function testWorkerEventSetsWorkerAsTarget() + { + $event = new WorkerEvent($this->worker, $this->queue); + + $this->assertEquals($this->worker, $event->getTarget()); + } + public function testWorkerEventHoldsStateForQueue() { - $event = new WorkerEvent($this->queue); + $event = new WorkerEvent($this->worker, $this->queue); $this->assertEquals($this->queue, $event->getQueue()); } public function getWorkerEventHoldsStateForJob() { - $event = new WorkerEvent($this->queue); + $event = new WorkerEvent($this->worker, $this->queue); $job = new SimpleJob; $event->setJob($job); diff --git a/tests/testing.config.php b/tests/testing.config.php index 9b325be..d26d9ee 100644 --- a/tests/testing.config.php +++ b/tests/testing.config.php @@ -28,7 +28,7 @@ */ 'queue_manager' => array( 'factories' => array( - 'basic-queue' => function($locator) { + 'basic-queue' => function ($locator) { $parentLocator = $locator->getServiceLocator(); $jobPluginManager = $parentLocator->get('SlmQueue\Job\JobPluginManager');