Skip to content

Commit

Permalink
Merge pull request #2 from ndthuan/enable-logic-overrides
Browse files Browse the repository at this point in the history
Introduce callbacks for subscriber
  • Loading branch information
ndthuan authored Nov 23, 2018
2 parents 5d10927 + fe70116 commit abc3f58
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 48 deletions.
1 change: 0 additions & 1 deletion phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
bootstrap="tests/bootstrap.php"
cacheTokens="false"
colors="true"
stopOnError="true"
>
<testsuites>
<testsuite name="unit">
Expand Down
Empty file modified run_tests.sh
100644 → 100755
Empty file.
4 changes: 1 addition & 3 deletions src/Queue/Connector.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,12 @@ public function __construct(SqsClient $sqsClient, string $queueUrl, array $defau
$this->sqsClient = $sqsClient;
$this->queueUrl = $queueUrl;

$defaultReceiveMessageOptions = $defaultReceiveMessageOptions ?? [
$this->defaultReceiveMessageOptions = $defaultReceiveMessageOptions ?? [
'AttributeNames' => ['SentTimestamp'],
'MaxNumberOfMessages' => 1,
'MessageAttributeNames' => ['All'],
'WaitTimeSeconds' => 0,
];

$this->defaultReceiveMessageOptions = $defaultReceiveMessageOptions;
}

/**
Expand Down
53 changes: 16 additions & 37 deletions src/Subscribing/AbstractSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,55 +6,46 @@
use Ndthuan\AwsSqsWrapper\Queue\Connector;
use Ndthuan\AwsSqsWrapper\Queue\ReceivedMessage;
use Ndthuan\AwsSqsWrapper\Queue\ResultMetadata;
use Ndthuan\AwsSqsWrapper\Subscribing\Callbacks\SubscriberCallbacksInterface;
use Ndthuan\AwsSqsWrapper\Subscribing\Exception\FatalException;
use Ndthuan\AwsSqsWrapper\Subscribing\Exception\LogicException;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Throwable;

/**
* Class AbstractSubscriber
*/
abstract class AbstractSubscriber implements SubscriberInterface, MessageProcessorInterface, LoggerAwareInterface
abstract class AbstractSubscriber implements SubscriberInterface, MessageProcessorInterface
{
/**
* @var Connector
*/
private $queueConnector;

/**
* @var array
* @var SubscriberCallbacksInterface
*/
private $receiveMessageOptions;
private $callbacks;

/**
* @var LoggerInterface
* @var array
*/
private $logger;
private $receiveMessageOptions;

/**
* AbstractSubscriber constructor.
*
* @param Connector $queueConnector
* @param array $receiveMessageOptions
* @param Connector $queueConnector
* @param SubscriberCallbacksInterface $callbacks
* @param array $receiveMessageOptions
*/
public function __construct(
Connector $queueConnector,
SubscriberCallbacksInterface $callbacks,
array $receiveMessageOptions = []
) {
$this->queueConnector = $queueConnector;
$this->callbacks = $callbacks;
$this->receiveMessageOptions = $receiveMessageOptions;

$this->setLogger(new NullLogger());
}

/**
* @param LoggerInterface $logger
*/
public function setLogger(LoggerInterface $logger)
{
$this->logger = $logger;
}

/**
Expand All @@ -65,35 +56,23 @@ public function pullAndProcessMessages()
$receiveResult = $this->queueConnector->receiveMessage($this->receiveMessageOptions);

foreach ($receiveResult->getMessages() as $message) {
$this->logger->info('Received SQS message', ['message' => $message]);
$this->callbacks->onMessageReceived($message);

try {
$this->processMessage($message, $receiveResult->getMetadata());

$this->queueConnector->deleteMessage($message->getReceiptHandle());

$this->logger->info('Successfully processed SQS message', [
'message' => $message,
]);
$this->callbacks->onMessageProcessed($message);
} catch (LogicException $exception) {
$this->queueConnector->deleteMessage($message->getReceiptHandle());

$this->logger->info('Deleted SQS message due to logical exception', [
'message' => $message,
'exception' => $exception,
]);
$this->callbacks->onLogicException($message, $exception);
} catch (FatalException $exception) {
$this->logger->critical('Stopped SQS processing due to fatal exception', [
'message' => $message,
'exception' => $exception,
]);
$this->callbacks->onFatalException($message, $exception);

throw $exception;
} catch (Throwable $exception) {
$this->logger->error('Uncaught exception when processing SQS message', [
'message' => $message,
'exception' => $exception,
]);
$this->callbacks->onUncaughtException($message, $exception);
}
}
}
Expand Down
84 changes: 84 additions & 0 deletions src/Subscribing/Callbacks/LoggingCallbacks.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
<?php
declare(strict_types=1);

namespace Ndthuan\AwsSqsWrapper\Subscribing\Callbacks;

use Ndthuan\AwsSqsWrapper\Queue\ReceivedMessage;
use Ndthuan\AwsSqsWrapper\Subscribing\Exception\FatalException;
use Ndthuan\AwsSqsWrapper\Subscribing\Exception\LogicException;
use Psr\Log\LoggerInterface;
use Throwable;

/**
* Class LoggingCallbacks
*/
class LoggingCallbacks implements SubscriberCallbacksInterface
{
/**
* @var LoggerInterface
*/
private $logger;

/**
* LoggingCallbacks constructor.
* @param LoggerInterface $logger
*/
public function __construct(LoggerInterface $logger)
{
$this->logger = $logger;
}

/**
* @param ReceivedMessage $message
*/
public function onMessageReceived(ReceivedMessage $message)
{
$this->logger->info('Received SQS message', ['messageId' => $message->getId()]);
}

/**
* @param ReceivedMessage $message
*/
public function onMessageProcessed(ReceivedMessage $message)
{
$this->logger->info('Successfully processed SQS message', [
'messageId' => $message->getId(),
]);
}

/**
* @param ReceivedMessage $message
* @param LogicException $exception
*/
public function onLogicException(ReceivedMessage $message, LogicException $exception)
{
$this->logger->info('Deleted SQS message due to logical exception', [
'messageId' => $message->getId(),
'exception' => $exception,
]);
}

/**
* @param ReceivedMessage $message
* @param FatalException $exception
*/
public function onFatalException(ReceivedMessage $message, FatalException $exception)
{
$this->logger->critical('Stopped SQS processing due to fatal exception', [
'messageId' => $message->getId(),
'exception' => $exception,
]);
}

/**
* @param ReceivedMessage $message
* @param Throwable $exception
*/
public function onUncaughtException(ReceivedMessage $message, Throwable $exception)
{
$this->logger->error('Uncaught exception when processing SQS message', [
'messageId' => $message->getId(),
'exception' => $exception,
]);
}
}
55 changes: 55 additions & 0 deletions src/Subscribing/Callbacks/NullCallbacks.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<?php
declare(strict_types=1);

namespace Ndthuan\AwsSqsWrapper\Subscribing\Callbacks;

use Ndthuan\AwsSqsWrapper\Queue\ReceivedMessage;
use Ndthuan\AwsSqsWrapper\Subscribing\Exception\FatalException;
use Ndthuan\AwsSqsWrapper\Subscribing\Exception\LogicException;
use Throwable;

/**
* Class NullCallbacks
*
* @codeCoverageIgnore
*/
class NullCallbacks implements SubscriberCallbacksInterface
{
/**
* @param ReceivedMessage $message
*/
public function onMessageReceived(ReceivedMessage $message)
{
}

/**
* @param ReceivedMessage $message
*/
public function onMessageProcessed(ReceivedMessage $message)
{
}

/**
* @param ReceivedMessage $message
* @param LogicException $exception
*/
public function onLogicException(ReceivedMessage $message, LogicException $exception)
{
}

/**
* @param ReceivedMessage $message
* @param FatalException $exception
*/
public function onFatalException(ReceivedMessage $message, FatalException $exception)
{
}

/**
* @param ReceivedMessage $message
* @param Throwable $exception
*/
public function onUncaughtException(ReceivedMessage $message, Throwable $exception)
{
}
}
45 changes: 45 additions & 0 deletions src/Subscribing/Callbacks/SubscriberCallbacksInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php
declare(strict_types=1);

namespace Ndthuan\AwsSqsWrapper\Subscribing\Callbacks;

use Ndthuan\AwsSqsWrapper\Queue\ReceivedMessage;
use Ndthuan\AwsSqsWrapper\Subscribing\Exception\FatalException;
use Ndthuan\AwsSqsWrapper\Subscribing\Exception\LogicException;
use Throwable;

/**
* Interface SubscriberCallbacksInterface
*
* Subscriber callback methods.
*/
interface SubscriberCallbacksInterface
{
/**
* @param ReceivedMessage $message
*/
public function onMessageReceived(ReceivedMessage $message);

/**
* @param ReceivedMessage $message
*/
public function onMessageProcessed(ReceivedMessage $message);

/**
* @param ReceivedMessage $message
* @param LogicException $exception
*/
public function onLogicException(ReceivedMessage $message, LogicException $exception);

/**
* @param ReceivedMessage $message
* @param FatalException $exception
*/
public function onFatalException(ReceivedMessage $message, FatalException $exception);

/**
* @param ReceivedMessage $message
* @param Throwable $exception
*/
public function onUncaughtException(ReceivedMessage $message, Throwable $exception);
}
11 changes: 7 additions & 4 deletions src/Subscribing/DelegatorSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Ndthuan\AwsSqsWrapper\Queue\Connector;
use Ndthuan\AwsSqsWrapper\Queue\ReceivedMessage;
use Ndthuan\AwsSqsWrapper\Queue\ResultMetadata;
use Ndthuan\AwsSqsWrapper\Subscribing\Callbacks\SubscriberCallbacksInterface;
use Ndthuan\AwsSqsWrapper\Subscribing\Exception\ValidationFailureException;

/**
Expand All @@ -23,16 +24,18 @@ class DelegatorSubscriber extends AbstractSubscriber
/**
* DelegatorSubscriber constructor.
*
* @param MessageProcessorInterface $messageProcessor
* @param Connector $queueConnector
* @param array $receiveMessageOptions
* @param MessageProcessorInterface $messageProcessor
* @param Connector $queueConnector
* @param SubscriberCallbacksInterface $callbacks
* @param array $receiveMessageOptions
*/
public function __construct(
MessageProcessorInterface $messageProcessor,
Connector $queueConnector,
SubscriberCallbacksInterface $callbacks,
array $receiveMessageOptions = []
) {
parent::__construct($queueConnector, $receiveMessageOptions);
parent::__construct($queueConnector, $callbacks, $receiveMessageOptions);

$this->messageProcessor = $messageProcessor;
}
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/Subscribing/DelegatorSubscriberTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Ndthuan\AwsSqsWrapper\Queue\ReceivedMessage;
use Ndthuan\AwsSqsWrapper\Queue\ReceiveMessageResult;
use Ndthuan\AwsSqsWrapper\Queue\ResultMetadata;
use Ndthuan\AwsSqsWrapper\Subscribing\Callbacks\LoggingCallbacks;
use Ndthuan\AwsSqsWrapper\Subscribing\DelegatorSubscriber;
use Ndthuan\AwsSqsWrapper\Subscribing\Exception\FatalException;
use Ndthuan\AwsSqsWrapper\Subscribing\Exception\LogicException;
Expand Down Expand Up @@ -83,11 +84,10 @@ protected function setUp()
$this->subscriberUnderTest = new DelegatorSubscriber(
$this->messageProcessorMock,
$this->connectorMock,
new LoggingCallbacks($logger),
$this->receiveMessageOptions
);

$this->subscriberUnderTest->setLogger($logger);

$this->fakeReceiveResultMetadata = ResultMetadata::fromArray([]);

$this->fakeReceivedMessage = $this->createSampleReceivedMessage();
Expand Down Expand Up @@ -197,7 +197,7 @@ public function testProcessMessageOnUncaughtExceptionMessageShouldNotBeDeletedAn
));
}

private function createSampleReceivedMessage()
private function createSampleReceivedMessage(): ReceivedMessage
{
return new ReceivedMessage(
'sample-uuid',
Expand Down

0 comments on commit abc3f58

Please sign in to comment.