Skip to content
This repository has been archived by the owner on Jul 3, 2024. It is now read-only.

Commit

Permalink
Merge pull request #109 from basz/feature/worker-listener-2
Browse files Browse the repository at this point in the history
adding worker aggregate listeners (2)
  • Loading branch information
bakura10 committed Oct 2, 2014
2 parents 0cb63ce + b62af7b commit 5c2452b
Show file tree
Hide file tree
Showing 43 changed files with 2,285 additions and 520 deletions.
40 changes: 33 additions & 7 deletions config/module.config.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
return array(
'service_manager' => array(
'factories' => array(
'SlmQueue\Job\JobPluginManager' => 'SlmQueue\Factory\JobPluginManagerFactory',
'SlmQueue\Options\WorkerOptions' => 'SlmQueue\Factory\WorkerOptionsFactory',
'SlmQueue\Queue\QueuePluginManager' => 'SlmQueue\Factory\QueuePluginManagerFactory'
'SlmQueue\Job\JobPluginManager' => 'SlmQueue\Factory\JobPluginManagerFactory',
'SlmQueue\Listener\StrategyPluginManager' => 'SlmQueue\Factory\StrategyPluginManagerFactory',
'SlmQueue\Queue\QueuePluginManager' => 'SlmQueue\Factory\QueuePluginManagerFactory'
),
),

Expand All @@ -17,11 +17,20 @@

'slm_queue' => array(
/**
* Worker options
* Worker Strategies
*/
'worker' => array(
'max_runs' => 100000,
'max_memory' => 100 * 1024 * 1024
'worker_strategies' => array(
'default' => array( // per worker
'SlmQueue\Strategy\AttachQueueListenersStrategy', // attaches strategies per queue
'SlmQueue\Strategy\MaxRunsStrategy' => array('max_runs' => 100000),
'SlmQueue\Strategy\MaxMemoryStrategy' => array('max_memory' => 100 * 1024 * 1024),
'SlmQueue\Strategy\InterruptStrategy',
),
'queues' => array( // per queue
'default' => array(
'SlmQueue\Strategy\ProcessQueueStrategy',
)
),
),

/**
Expand All @@ -38,5 +47,22 @@
* Queue manager configuration
*/
'queue_manager' => array(),

/**
* Strategy manager configuration
*/
'strategy_manager' => array(
'invokables' => array(
'SlmQueue\Strategy\ProcessQueueStrategy' => 'SlmQueue\Strategy\ProcessQueueStrategy',
'SlmQueue\Strategy\InterruptStrategy' => 'SlmQueue\Strategy\InterruptStrategy',
'SlmQueue\Strategy\MaxRunsStrategy' => 'SlmQueue\Strategy\MaxRunsStrategy',
'SlmQueue\Strategy\MaxMemoryStrategy' => 'SlmQueue\Strategy\MaxMemoryStrategy',
'SlmQueue\Strategy\FileWatchStrategy' => 'SlmQueue\Strategy\FileWatchStrategy',
),
'factories' => array(
'SlmQueue\Strategy\AttachQueueListenersStrategy' => 'SlmQueue\Strategy\Factory\AttachQueueListenersStrategyFactory',
'SlmQueue\Strategy\LogJobStrategy' => 'SlmQueue\Strategy\Factory\LogJobStrategyFactory',
)
),
)
);
72 changes: 54 additions & 18 deletions config/slm_queue.global.php.dist
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,63 @@
return array(
'slm_queue' => array(
/**
* Parameters for the worker. It defines some criterias that can be reached before the
* worker stops to process any other jobs
* Allow to configure a specific queue.
*
* Available options depends on the queue factory
*/
'worker' => array(
/**
* Specify how many jobs can be processed by a worker until it stops (default to 100 000)
*/
// 'max_runs' => 100000,
'queues' => array(),

/**
* Specifiy the max memory (in bytes) that can be used by the worker before it stops (default to 100 MB)
*/
// 'max_memory' => 100 * 1024 * 1024
),
/**
* This block is use to register and configure strategies to the worker event manager. The default key holds any
* configuration for all instanciated workers. The ones configured within the 'queues' keys are specific to
* specific queues only.
*
* Note that module.config.php defines a few defaults and that configuration where the value is not an array
* will be ignored (thus allows you to disable preconfigured strategies).
*
* 'worker_strategies' => array(
* 'default' => array( // per worker
* // Would disable the pre configured max memory strategy
* 'SlmQueue\Strategy\MaxMemoryStrategy' => null
* // Reconfigure the pre configured max memory strategy to use 250Mb max
* 'SlmQueue\Strategy\MaxMemoryStrategy' => array('max_memory' => 250 * 1024 * 1024)
* ),
* ),
*
* As queue processing is handled by strategies it is important that for each queue a ProcessQueueStrategy
* (a strategy that listens to WorkerEvent::EVENT_PROCESS) is registered. By default SlmQueue does handles that
* for the queue called 'default'.
*
* 'worker_strategies' => array(
* 'queues' => array(
* 'my-queue' => array(
* 'SlmQueue\Strategy\ProcessQueueStrategy',
* )
* ),
* ),
*/
'worker_strategies' => array(
'default' => array( // per worker
),
'queues' => array( // per queue
'default' => array(
),
),
),

/**
* Allow to configure a specific queue.
*
* Available options depends on the queue factory
*/
'queues' => array(),
/**
* Allow to configure the plugin manager that manages strategies. This works like any other
* PluginManager in Zend Framework 2.
*
* Add you own or override existing factories
*
* 'strategy_manager' => array(
* 'factories' => array(
* 'SlmQueue\Strategy\LogJobStrategy' => 'MyVeryOwn\LogJobStrategyFactory',
* )
* ),
*/
'strategy_manager' => array(),

/**
* Allow to configure dependencies for jobs that are pulled from any queue. This works like any other
Expand Down
29 changes: 7 additions & 22 deletions docs/5.Workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,15 @@ for the different queue implementations.
Worker stop conditions
----------------------

The worker will be a long running call, due to the `while(true){ /*...*/ }` loop it contains inside the `processQueue()`
method. However, there are reasons to cancel the loop and stop the worker to continue. PHP is not the best language for
creating infinite running scripts.
The worker will be a long running call, due to the `while(...){ /*...*/ }` loop it
contains inside the `processQueue()` method. However, there are reasons to cancel the loop and stop the worker to
continue. PHP is not the best language for creating infinite running scripts.

It is wise to abort the script frequently, for example after x number of cycles in the `while` loop. A worker is
initiated with a `WorkerOptions` class, where it is possible to set a maximum number of runs. In this same options class,
a maximum memory level can be set. If you have accidentally a memory leak, this option can make sure the script aborts
eventually after reaching this level.
It is wise to abort the script frequently, for example after x number of cycles in the `while` loop.

Both options can be set at the `worker` key in the SlmQueue configuration:

```php
'slm_queue' => array(
'worker' => array(
'max_runs' => 100000, // 10,000 runs
'max_memory' => 20 * 1024 * 1024, // 20 MB
)
),
```

Secondly, it is possible to catch a stop condition under Linux-like systems (as well as OS X). If a worker is started
from the command line interface (CLI), it is possible to send a SIGTERM or SiGINT call to the worker. SlmQueue is smart
enough not to quit the script diretly, but let the job finish its work first and then break out of the loop. For Windows
users this option is not available.
Various build-in strategies (['see 6.Events'](6.Events.md)) are used to decide if the worker should exit. These
strategies are aggregate listeners that hook into events the worker dispatches. Any listener may call a method
on the passed event `WorkerEvent::exitWorkerLoop()`. Which the worker will check the event the next iteration of the loop.

Command line utility
--------------------
Expand Down
Loading

0 comments on commit 5c2452b

Please sign in to comment.