Skip to content

Commit

Permalink
Merge pull request #61 from acelaya-forks/feature/track-request-id
Browse files Browse the repository at this point in the history
Allow RoadRunner jobs to optionally forward a requestId as part of the task payload
  • Loading branch information
acelaya authored Apr 7, 2024
2 parents 28f1296 + c276f37 commit bfdce26
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 29 deletions.
10 changes: 7 additions & 3 deletions functions/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@
use Psr\Container\ContainerInterface;
use Shlinkio\Shlink\EventDispatcher\Listener\DummyEnabledListenerChecker;
use Shlinkio\Shlink\EventDispatcher\Listener\EnabledListenerCheckerInterface;
use Shlinkio\Shlink\EventDispatcher\Util\RequestIdProviderInterface;
use Spiral\RoadRunner\Jobs\JobsInterface;

function lazyListener(ContainerInterface $container, string $listenerServiceName): callable
{
return new Listener\LazyEventListener($container, $listenerServiceName);
}

function roadRunnerTaskListener(JobsInterface $jobs, string $listenerServiceName): callable
{
return new RoadRunner\RoadRunnerTaskListener($jobs, $listenerServiceName);
function roadRunnerTaskListener(
JobsInterface $jobs,
string $listenerServiceName,
RequestIdProviderInterface $requestIdProvider,
): callable {
return new RoadRunner\RoadRunnerTaskListener($jobs, $listenerServiceName, $requestIdProvider);
}

function resolveEnabledListenerChecker(ContainerInterface $container): EnabledListenerCheckerInterface
Expand Down
8 changes: 3 additions & 5 deletions src/Listener/LazyEventListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@

use Psr\Container\ContainerInterface;

class LazyEventListener
readonly class LazyEventListener
{
public function __construct(
private readonly ContainerInterface $container,
private readonly string $listenerServiceName,
) {
public function __construct(private ContainerInterface $container, private string $listenerServiceName)
{
}

public function __invoke(object $event): void
Expand Down
14 changes: 12 additions & 2 deletions src/RoadRunner/RoadRunnerEventDispatcherFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use League\Event\EventDispatcher;
use League\Event\PrioritizedListenerRegistry;
use Psr\Container\ContainerInterface;
use Shlinkio\Shlink\EventDispatcher\Util\RequestIdProviderInterface;
use Spiral\RoadRunner\Jobs\Jobs;

use function Shlinkio\Shlink\Config\env;
Expand All @@ -21,15 +22,24 @@ public function __invoke(ContainerInterface $container): EventDispatcher
{
$provider = new PrioritizedListenerRegistry();
$eventsConfig = $container->get('config')['events'] ?? [];
$requestIdProvider = $container->has(RequestIdProviderInterface::class)
? $container->get(RequestIdProviderInterface::class)
: new class implements RequestIdProviderInterface {
public function currentRequestId(): string
{
return '-';
}
};

$this->registerEvents($provider, $container, $eventsConfig['async'] ?? []);
$this->registerEvents($provider, $container, $requestIdProvider, $eventsConfig['async'] ?? []);

return new EventDispatcher($provider);
}

private function registerEvents(
PrioritizedListenerRegistry $provider,
ContainerInterface $container,
RequestIdProviderInterface $requestIdProvider,
array $events,
): void {
if (env('RR_MODE') === null) {
Expand All @@ -45,7 +55,7 @@ private function registerEvents(
continue;
}

$provider->subscribeTo($eventName, roadRunnerTaskListener($jobs, $listener));
$provider->subscribeTo($eventName, roadRunnerTaskListener($jobs, $listener, $requestIdProvider));
}
}
}
Expand Down
25 changes: 18 additions & 7 deletions src/RoadRunner/RoadRunnerTaskConsumerToListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@
use function is_subclass_of;
use function Shlinkio\Shlink\Json\json_decode;

class RoadRunnerTaskConsumerToListener
readonly class RoadRunnerTaskConsumerToListener
{
public function __construct(
private readonly ConsumerInterface $consumer,
private readonly ContainerInterface $container,
private readonly LoggerInterface $logger,
private ConsumerInterface $consumer,
private ContainerInterface $container,
private LoggerInterface $logger,
) {
}

public function listenForTasks(): void
/**
* @param (callable(string): void)|null $setCurrentRequestId
*/
public function listenForTasks(?callable $setCurrentRequestId = null): void
{
while ($task = $this->consumer->waitTask()) {
try {
Expand All @@ -37,8 +40,16 @@ public function listenForTasks(): void
continue;
}

['listenerServiceName' => $listener, 'eventPayload' => $payload] = json_decode($task->getPayload());
$this->container->get($listener)($event::fromPayload($payload));
[
'listenerServiceName' => $listenerService,
'eventPayload' => $payload,
'requestId' => $requestId,
] = json_decode($task->getPayload());
if ($setCurrentRequestId !== null) {
$setCurrentRequestId($requestId);
}

$this->container->get($listenerService)($event::fromPayload($payload));
$task->complete();
} catch (Throwable $e) {
$task->fail($e);
Expand Down
11 changes: 8 additions & 3 deletions src/RoadRunner/RoadRunnerTaskListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@
namespace Shlinkio\Shlink\EventDispatcher\RoadRunner;

use JsonSerializable;
use Shlinkio\Shlink\EventDispatcher\Util\RequestIdProviderInterface;
use Spiral\RoadRunner\Jobs\JobsInterface;

use function Shlinkio\Shlink\Json\json_encode;

class RoadRunnerTaskListener
readonly class RoadRunnerTaskListener
{
private const SHLINK_QUEUE = 'shlink';

public function __construct(private readonly JobsInterface $jobs, private readonly string $listenerServiceName)
{
public function __construct(
private JobsInterface $jobs,
private string $listenerServiceName,
private RequestIdProviderInterface $requestIdProvider,
) {
}

public function __invoke(object $event): void
Expand All @@ -23,6 +27,7 @@ public function __invoke(object $event): void
$task = $queue->create($event::class, json_encode([
'listenerServiceName' => $this->listenerServiceName,
'eventPayload' => $event instanceof JsonSerializable ? $event : [],
'requestId' => $this->requestIdProvider->currentRequestId(),
]));
$queue->dispatch($task);
}
Expand Down
10 changes: 10 additions & 0 deletions src/Util/RequestIdProviderInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Shlinkio\Shlink\EventDispatcher\Util;

interface RequestIdProviderInterface
{
public function currentRequestId(): string;
}
29 changes: 23 additions & 6 deletions test/RoadRunner/RoadRunnerEventDispatcherFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
use PHPUnit\Framework\Assert;
use PHPUnit\Framework\Attributes\DataProvider;
use PHPUnit\Framework\Attributes\Test;
use PHPUnit\Framework\Attributes\TestWith;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use ReflectionObject;
use Shlinkio\Shlink\EventDispatcher\Listener\EnabledListenerCheckerInterface;
use Shlinkio\Shlink\EventDispatcher\RoadRunner\RoadRunnerEventDispatcherFactory;
use Shlinkio\Shlink\EventDispatcher\Util\RequestIdProviderInterface;
use Spiral\RoadRunner\Jobs\Jobs;
use Spiral\RoadRunner\Jobs\JobsInterface;
use stdClass;
Expand Down Expand Up @@ -53,7 +55,9 @@ public static function provideEnv(): iterable
}

#[Test]
public function skipsListenersWhenEnabledListenerCheckerIsRegistered(): void
#[TestWith([true])]
#[TestWith([false])]
public function skipsListenersWhenEnabledListenerCheckerIsRegistered(bool $hasRequestIdProvider): void
{
putenv('RR_MODE=http');

Expand All @@ -62,7 +66,7 @@ public function shouldRegisterListener(string $event, string $listener, bool $is
{
return $isAsync && $listener === 'foo';
}
});
}, hasRequestIdProvider: $hasRequestIdProvider);

$dispatcher = ($this->factory)($container);
$listenerProvider = $this->getPrivateProp($dispatcher, 'listenerProvider');
Expand All @@ -80,10 +84,11 @@ private function getPrivateProp(object $object, string $propName): mixed
return $prop->getValue($object);
}

private function container(?EnabledListenerCheckerInterface $listenerChecker = null): ContainerInterface
{
private function container(
?EnabledListenerCheckerInterface $listenerChecker = null,
bool $hasRequestIdProvider = false,
): ContainerInterface {
$container = $this->createMock(ContainerInterface::class);

$getServiceReturnMap = [
['config', [
'events' => [
Expand All @@ -95,12 +100,24 @@ private function container(?EnabledListenerCheckerInterface $listenerChecker = n
]],
[Jobs::class, $this->createMock(JobsInterface::class)],
];
$hasServiceReturnMap = [
[RequestIdProviderInterface::class, $hasRequestIdProvider],
];

if ($listenerChecker !== null) {
$container->method('has')->with(EnabledListenerCheckerInterface::class)->willReturn(true);
$hasServiceReturnMap[] = [EnabledListenerCheckerInterface::class, true];
$getServiceReturnMap[] = [EnabledListenerCheckerInterface::class, $listenerChecker];
}

if ($hasRequestIdProvider) {
$getServiceReturnMap[] = [
RequestIdProviderInterface::class,
$this->createMock(RequestIdProviderInterface::class),
];
}

$container->method('get')->willReturnMap($getServiceReturnMap);
$container->method('has')->willReturnMap($hasServiceReturnMap);

return $container;
}
Expand Down
15 changes: 13 additions & 2 deletions test/RoadRunner/RoadRunnerTaskConsumerToListenerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace ShlinkioTest\Shlink\EventDispatcher\RoadRunner;

use PHPUnit\Framework\Attributes\Test;
use PHPUnit\Framework\Attributes\TestWith;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
Expand Down Expand Up @@ -58,14 +59,18 @@ function () use (&$callCount, $task) {
}

#[Test]
public function listenerIsLoadedAndInvoked(): void
#[TestWith(['123'])]
#[TestWith(['456'])]
#[TestWith(['abc'])]
public function listenerIsLoadedAndInvoked(string $requestId): void
{
$callCount = 0;
$task = $this->createMock(ReceivedTaskInterface::class);
$task->method('getName')->willReturn(DummyJsonDeserializable::class);
$task->method('getPayload')->willReturn(json_encode([
'listenerServiceName' => 'my_listener',
'eventPayload' => [],
'requestId' => $requestId,
]));
$task->expects($this->once())->method('complete');
$task->expects($this->never())->method('fail');
Expand All @@ -79,7 +84,12 @@ function () use (&$callCount, $task) {
});
$this->logger->expects($this->never())->method('warning');

$this->taskConsumer->listenForTasks();
$providedRequestId = null;
$this->taskConsumer->listenForTasks(function (string $id) use (&$providedRequestId): void {
$providedRequestId = $id;
});

self::assertEquals($requestId, $providedRequestId);
}

#[Test]
Expand All @@ -91,6 +101,7 @@ public function taskIsFailedInCaseOfError(): void
$task->method('getPayload')->willReturn(json_encode([
'listenerServiceName' => 'my_listener',
'eventPayload' => [],
'requestId' => '123',
]));
$task->expects($this->never())->method('complete');
$task->expects($this->once())->method('fail')->with($this->isInstanceOf(RuntimeException::class));
Expand Down
13 changes: 12 additions & 1 deletion test/RoadRunner/RoadRunnerTaskListenerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Shlinkio\Shlink\EventDispatcher\RoadRunner\RoadRunnerTaskListener;
use Shlinkio\Shlink\EventDispatcher\Util\RequestIdProviderInterface;
use Spiral\RoadRunner\Jobs\JobsInterface;
use Spiral\RoadRunner\Jobs\QueueInterface;
use Spiral\RoadRunner\Jobs\Task\PreparedTaskInterface;
Expand All @@ -27,7 +28,16 @@ class RoadRunnerTaskListenerTest extends TestCase
public function setUp(): void
{
$this->jobs = $this->createMock(JobsInterface::class);
$this->listener = new RoadRunnerTaskListener($this->jobs, $this->listenerServiceName);
$this->listener = new RoadRunnerTaskListener(
$this->jobs,
$this->listenerServiceName,
new class implements RequestIdProviderInterface {
public function currentRequestId(): string
{
return '-';
}
},
);
}

#[Test, DataProvider('provideEvents')]
Expand All @@ -40,6 +50,7 @@ public function expectedTaskIsDispatchedBasedOnProvidedEvent(object $event, arra
$queue->expects($this->once())->method('create')->with($event::class, json_encode([
'listenerServiceName' => $this->listenerServiceName,
'eventPayload' => $expectedPayload,
'requestId' => '-',
]))->willReturn($task);
$queue->expects($this->once())->method('dispatch')->with($task)->willReturn(
$this->createMock(QueuedTaskInterface::class),
Expand Down

0 comments on commit bfdce26

Please sign in to comment.