Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove blocking I/O from ReactPHP implementation #8550

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions frameworks/PHP/reactphp/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
vendor
composer.lock
*.dockerfile
.dockerignore
.gitignore
2 changes: 2 additions & 0 deletions frameworks/PHP/reactphp/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
composer.lock
vendor
209 changes: 111 additions & 98 deletions frameworks/PHP/reactphp/app.php
Original file line number Diff line number Diff line change
@@ -1,134 +1,147 @@
<?php

use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface as Request;
use React\EventLoop\Loop;
use React\MySQL\ConnectionInterface as DbConnection;
use React\MySQL\Factory as DbFactory;
use React\Http\Message\Response;
use React\MySQL\QueryResult;
use React\Promise\PromiseInterface;

function init()
{
global $world, $fortune, $update;
$pdo = new PDO(
'mysql:host=tfb-database;dbname=hello_world',
'benchmarkdbuser',
'benchmarkdbpass',
[
PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
PDO::ATTR_EMULATE_PREPARES => false
]
);
$world = $pdo->prepare('SELECT id,randomNumber FROM World WHERE id=?');
$update = $pdo->prepare('UPDATE World SET randomNumber=? WHERE id=?');
$fortune = $pdo->prepare('SELECT id,message FROM Fortune');
$fortune->setFetchMode(PDO::FETCH_KEY_PAIR);
}
use function React\Promise\all;

function router(Request $request)
/** @return Closure(Request):ResponseInterface */
function requestHandler(): Closure
{
return match($request->getUri()->getPath()) {
'/plaintext' => text(),
'/json' => json(),
'/db' => db(),
'/fortunes' => fortune(),
'/query' => query($request),
'/update' => updateraw($request),
// '/info' => info(),
default => new Response(404, [], 'Error 404'),
$connection = establishDbConnection('benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world?idle=0.5');
lcobucci marked this conversation as resolved.
Show resolved Hide resolved

$world = static function (int $id) use ($connection): PromiseInterface {
return $connection->query('SELECT id,randomNumber FROM World WHERE id=?', [$id]);
};
}

function text()
{
return new Response(200, [
'Content-Type' => 'text/plain'
], 'Hello, World!');
}
$fortune = static function () use ($connection): PromiseInterface {
return $connection->query('SELECT id,message FROM Fortune');
};
lcobucci marked this conversation as resolved.
Show resolved Hide resolved

function json()
{
return new Response(200, [
'Content-Type' => 'application/json'
], json_encode(['message' => 'Hello, World!']));
$update = static function (int $id, int $randomNumber) use ($connection): PromiseInterface {
return $connection->query('UPDATE World SET randomNumber=? WHERE id=?', [$randomNumber, $id]);
};

return static function (Request $request) use ($world, $fortune, $update): ResponseInterface | PromiseInterface {
return match($request->getUri()->getPath()) {
'/plaintext' => Response::plaintext('Hello, World!'),
'/json' => Response::json(['message' => 'Hello, World!']),
'/db' => db($world),
'/fortunes' => fortune($fortune),
'/query' => query(queryCount($request), $world),
'/update' => updateraw(queryCount($request), $world, $update),
// '/info' => info(),
default => new Response(404, [], 'Error 404'),
};
};
}

function db()
{
global $world;
function establishDbConnection(
#[SensitiveParameter]
string $uri,
): DbConnection {
$connection = (new DbFactory())->createLazyConnection($uri);

$interrupt = $connection->quit(...);

$world->execute([mt_rand(1, 10000)]);
$connection->on('close', static function () use (&$interrupt) {
Loop::removeSignal(SIGINT, $interrupt);
Loop::removeSignal(SIGTERM, $interrupt);
});

return new Response(200, [
'Content-Type' => 'application/json'
], json_encode($world->fetch()));
Loop::addSignal(SIGINT, $interrupt);
Loop::addSignal(SIGTERM, $interrupt);

return $connection;
}

function query($request)
/** @param Closure(int):PromiseInterface $world */
function db(Closure $world): PromiseInterface
{
global $world;
$id = mt_rand(1, 10000);

$query_count = 1;
$q = (int) $request->getQueryParams()['q'];
if ($q > 1) {
$query_count = min($q, 500);
}
return $world($id)->then(
static fn (QueryResult $result): ResponseInterface => Response::json($result->resultRows[0]),
);
}

while ($query_count--) {
$world->execute([mt_rand(1, 10000)]);
$arr[] = $world->fetch();
function queryCount(Request $request): int
{
$count = (int) ($request->getQueryParams()['q'] ?? 1);

if ($count > 1) {
return min($count, 500);
}

return new Response(200, [
'Content-Type' => 'application/json'
], json_encode($arr));
return 1;
}

function updateraw($request)
/** @param Closure(int):PromiseInterface $world */
function query(int $queryCount, Closure $world): PromiseInterface
{
global $world, $update;
$processQueries = static function (int $count) use ($world): iterable {
while ($count--) {
$id = mt_rand(1, 10000);

$query_count = 1;
$q = (int) $request->getQueryParams()['q'];
if ($q > 1) {
$query_count = min($q, 500);
}
yield $world($id)->then(static fn (QueryResult $result): array => $result->resultRows[0]);
}
};

while ($query_count--) {
$id = mt_rand(1, 10000);
$world->execute([$id]);
$item = $world->fetch();
$update->execute(
[$item['randomNumber'] = mt_rand(1, 10000), $id]
);
return all($processQueries($queryCount))
->then(static fn (array $result): ResponseInterface => Response::json($result));
}

$arr[] = $item;
}
/**
* @param Closure(int):PromiseInterface $world
* @param Closure(int, int):PromiseInterface $update
*/
function updateraw(int $queryCount, Closure $world, Closure $update): PromiseInterface
{
$processQueries = static function (int $count) use ($world, $update): iterable {
while ($count--) {
$id = mt_rand(1, 10000);

yield $world($id)->then(
static function (QueryResult $result) use ($update): PromiseInterface {
$updated = $result->resultRows[0];
$updated['randomNumber'] = mt_rand(1, 10000);

return $update($updated['id'], $updated['randomNumber'])
->then(static fn (): array => $updated);
}
);
}
};

// $pdo->beginTransaction();
// foreach($arr as $world) {
// $update->execute([$world['randomNumber'], $world['id']]);
// }
// $pdo->commit();
return new Response(200, [
'Content-Type' => 'application/json'
], json_encode($arr));
return all($processQueries($queryCount))
->then(static fn (array $result): ResponseInterface => Response::json($result));
}

function fortune()
function fortune(Closure $fortune): PromiseInterface
{
global $fortune;
$formatResult = static function (array $rows): string {
$rows[] = ['id' => 0, 'message' => 'Additional fortune added at request time.'];
usort($rows, static fn (array $one, array $other) => $one['message'] <=> $other['message']);

$fortune->execute();
$html = '';

$arr = $fortune->fetchAll();
$arr[0] = 'Additional fortune added at request time.';
asort($arr);
foreach ($rows as $row) {
$message = htmlspecialchars($row['message'], ENT_QUOTES, 'UTF-8');

$html = '';
foreach ($arr as $id => $message) {
$message = htmlspecialchars($message, ENT_QUOTES, 'UTF-8');
$html .= "<tr><td>$id</td><td>$message</td></tr>";
}
$html .= "<tr><td>${row['id']}</td><td>${message}</td></tr>";
}

return "<!DOCTYPE html><html><head><title>Fortunes</title></head><body><table><tr><th>id</th><th>message</th></tr>$html</table></body></html>";
};

return new Response(200, [
'Content-Type' => 'text/html; charset=UTF-8',
], "<!DOCTYPE html><html><head><title>Fortunes</title></head><body><table><tr><th>id</th><th>message</th></tr>$html</table></body></html>"
return $fortune()->then(
static fn (QueryResult $result): ResponseInterface => Response::html($formatResult($result->resultRows)),
);
}

Expand All @@ -138,4 +151,4 @@ function fortune()
phpinfo();
return new Response(200, ['Content-Type' => 'text/plain'], ob_get_clean());
}
*/
*/
9 changes: 5 additions & 4 deletions frameworks/PHP/reactphp/composer.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
{
"require": {
"php": ">=5.3.0",
"php": ">=8.3.0",
"psr/http-message": "^1.0",
"react/event-loop": "^1.2",
"react/http": "^1.6",
"react/socket": "^1.11"
"react/event-loop": "^1.5",
"react/http": "^1.9",
"react/socket": "^1.14",
"react/mysql": "^0.6"
}
}
23 changes: 12 additions & 11 deletions frameworks/PHP/reactphp/reactphp.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,26 @@ RUN apt-get update -yqq && apt-get install -yqq software-properties-common > /de
RUN LC_ALL=C.UTF-8 add-apt-repository ppa:ondrej/php
RUN apt-get update -yqq > /dev/null && \
apt-get install -yqq git unzip wget curl build-essential \
php8.2-cli php8.2-mbstring php8.2-dev php8.2-xml php8.2-curl php8.2-mysql > /dev/null
php8.3-cli php8.3-mbstring php8.3-dev php8.3-xml > /dev/null

# An extension is required!
# We deal with concurrencies over 1k, which stream_select doesn't support.
RUN wget http://pear.php.net/go-pear.phar --quiet && php go-pear.phar
#RUN apt-get install -y libuv1-dev > /dev/null
RUN apt-get install -y libevent-dev > /dev/null
#RUN pecl install uv-0.2.4 > /dev/null && echo "extension=uv.so" > /etc/php/8.2/cli/conf.d/uv.ini
RUN pecl install event-3.0.8 > /dev/null && echo "extension=event.so" > /etc/php/8.2/cli/conf.d/event.ini
RUN apt-get install -yqq libuv1-dev > /dev/null \
&& pecl install uv-beta > /dev/null \
&& echo "extension=uv.so" > /etc/php/8.3/cli/conf.d/uv.ini

ADD ./ /reactphp
WORKDIR /reactphp
COPY --from=composer:latest /usr/bin/composer /usr/local/bin/composer

COPY deploy/conf/* /etc/php/8.2/cli/conf.d/
COPY deploy/conf/* /etc/php/8.3/cli/conf.d/

COPY --from=composer:latest /usr/bin/composer /usr/local/bin/composer
WORKDIR /reactphp

COPY composer.json .
RUN composer install --prefer-dist --optimize-autoloader --no-dev --quiet

COPY . .

EXPOSE 8080

CMD php server.php
ENTRYPOINT ["/usr/bin/php"]
CMD ["server.php"]
lcobucci marked this conversation as resolved.
Show resolved Hide resolved
27 changes: 17 additions & 10 deletions frameworks/PHP/reactphp/server.php
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
<?php

use React\EventLoop\Loop;
use React\Http\HttpServer;
use React\Socket\SocketServer;

require __DIR__ . '/vendor/autoload.php';
require_once __DIR__.'/app.php';

init();
$server = new HttpServer(requestHandler());
$socket = new SocketServer('0.0.0.0:8080');
$server->listen($socket);

$loop = React\EventLoop\Loop::get();
echo "React Server running at http://0.0.0.0:8080\n";
echo "EventLoop: ", Loop::get()::class, "\n";

$server = new React\Http\Server($loop, function (Psr\Http\Message\ServerRequestInterface $request) {
return router($request);
});
$interrupt = static function () use ($server, $socket, &$interrupt): void {
echo 'Interrupting server', PHP_EOL;

$socket = new React\Socket\Server('0.0.0.0:8080', $loop);
$server->listen($socket);
$socket->close();

echo "React Server running at http://0.0.0.0:8080\n";
echo "EventLoop: ", $loop::class, "\n";
Loop::removeSignal(SIGINT, $interrupt);
Loop::removeSignal(SIGTERM, $interrupt);
};

$loop->run();
Loop::addSignal(SIGINT, $interrupt);
Loop::addSignal(SIGTERM, $interrupt);
Loading