Skip to content

Commit

Permalink
Merge pull request #16 from graze/feature/purge-queue
Browse files Browse the repository at this point in the history
Add purge to ProducerInterface
  • Loading branch information
sjparkinson committed Aug 28, 2015
2 parents 150f49d + 0da424b commit 4226d07
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 6 deletions.
3 changes: 3 additions & 0 deletions src/Adapter/AdapterInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use Graze\Queue\Adapter\Exception\FailedAcknowledgementException;
use Graze\Queue\Adapter\Exception\FailedEnqueueException;
use Graze\Queue\Adapter\Exception\UnsupportedOperationException;
use Graze\Queue\Message\MessageFactoryInterface;
use Graze\Queue\Message\MessageInterface;
use Iterator;
Expand Down Expand Up @@ -43,4 +44,6 @@ public function dequeue(MessageFactoryInterface $factory, $limit);
* @throws FailedEnqueueException
*/
public function enqueue(array $messages);

public function purge();
}
10 changes: 9 additions & 1 deletion src/Adapter/ArrayAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
final class ArrayAdapter implements AdapterInterface
{
/**
* @param MessageInterface[]
* @param array
*/
protected $queue = [];

Expand Down Expand Up @@ -70,6 +70,14 @@ public function enqueue(array $messages)
}
}

/**
* {@inheritdoc}
*/
public function purge()
{
$this->queue = [];
}

/**
* @param MessageInterface $message
*/
Expand Down
10 changes: 9 additions & 1 deletion src/Adapter/SqsAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,14 @@ public function enqueue(array $messages)
}
}

/**
* {@inheritdoc}
*/
public function purge()
{
$this->client->purgeQueue(['QueueUrl' => $this->getQueueUrl()]);
}

/**
* @param MessageInterface[] $messages
* @return array
Expand Down Expand Up @@ -254,7 +262,7 @@ protected function getOption($name, $default = null)
*/
protected function getQueueUrl()
{
if (!$this->url) {
if (! $this->url) {
$result = $this->client->createQueue([
'QueueName' => $this->name,
'Attributes' => $this->options,
Expand Down
8 changes: 8 additions & 0 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ public function send(array $messages)
return $this->adapter->enqueue($messages);
}

/**
* {@inheritdoc}
*/
public function purge()
{
return $this->adapter->purge();
}

/**
* @return callable
*/
Expand Down
4 changes: 2 additions & 2 deletions src/ConsumerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
interface ConsumerInterface
{
/**
* @param callable $worker
* @param integer|null $limit Integer limit or Null no limit
* @param callable $worker
* @param integer $limit
*/
public function receive(callable $worker, $limit = 1);
}
8 changes: 6 additions & 2 deletions src/ProducerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,21 @@

namespace Graze\Queue;

use Graze\Queue\Adapter\Exception\UnsupportedOperationException;
use Graze\Queue\Message\MessageInterface;

interface ProducerInterface
{
/**
* @return MessageInterface
* @param string $body
* @param array $options
*/
public function create($body, array $options = []);

/**
* @param MessageInterface[] $message
* @param array $message
*/
public function send(array $messages);

public function purge();
}
12 changes: 12 additions & 0 deletions tests/integration/ArrayIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,16 @@ public function testSend()
{
$this->client->send([$this->client->create('foo')]);
}

public function testPurge()
{
$this->client->purge();

$msgs = [];
$this->client->receive(function ($msg) use (&$msgs) {
$msgs[] = $msg;
}, null);

assertThat($msgs, is(emptyArray()));
}
}
29 changes: 29 additions & 0 deletions tests/integration/SqsIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,33 @@ public function testSend()

$this->client->send([$this->client->create('foo')]);
}

public function testPurge()
{
$url = $this->stubCreateQueue();
$timeout = $this->stubQueueVisibilityTimeout($url);

$receiveModel = m::mock('Aws\ResultInterface');
$receiveModel->shouldReceive('get')->once()->with('Messages')->andReturn([]);
$this->sqsClient->shouldReceive('receiveMessage')->once()->with([
'QueueUrl' => $url,
'AttributeNames' => ['All'],
'MaxNumberOfMessages' => 1,
'VisibilityTimeout' => $timeout
])->andReturn($receiveModel);

$purgeModel = m::mock('Aws\ResultInterface');
$this->sqsClient->shouldReceive('purgeQueue')->once()->with([
'QueueUrl' => $url,
])->andReturn($purgeModel);

$this->client->purge();

$msgs = [];
$this->client->receive(function ($msg) use (&$msgs) {
$msgs[] = $msg;
});

assertThat($msgs, is(emptyArray()));
}
}
13 changes: 13 additions & 0 deletions tests/unit/Adapter/ArrayAdapterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,17 @@ public function testEnqueue()

assertThat(iterator_to_array($iterator), is(identicalTo($merged)));
}

public function testPurge()
{
$iterator = $this->adapter->dequeue($this->factory, 10);

assertThat(iterator_to_array($iterator), is(nonEmptyArray()));

$this->adapter->purge();

$iterator = $this->adapter->dequeue($this->factory, 10);

assertThat(iterator_to_array($iterator), is(emptyArray()));
}
}
12 changes: 12 additions & 0 deletions tests/unit/Adapter/SqsAdapterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -220,4 +220,16 @@ public function testReceiveMessageWaitTimeSecondsOption()
assertThat($iterator, is(anInstanceOf('Generator')));
assertThat(iterator_to_array($iterator), is(equalTo($this->messages)));
}

public function testPurge()
{
$adapter = new SqsAdapter($this->client, 'foo');
$url = $this->stubCreateQueue('foo');

$this->client->shouldReceive('purgeQueue')->once()->with([
'QueueUrl' => $url,
])->andReturn($this->model);

assertThat($adapter->purge(), is(nullValue()));
}
}

0 comments on commit 4226d07

Please sign in to comment.