Skip to content

Commit

Permalink
Asynchronous HTTP Client Library (#72)
Browse files Browse the repository at this point in the history
An asynchronous HTTP client library designed for WordPress. Main features:

 ### Streaming support

 Enqueuing a request returns a PHP resource that can be read by PHP functions like `fopen()`
 and `stream_get_contents()`

 ```php
 $client = new AsyncHttpClient();
 $fp = $client->enqueue(
      new Request( "https://downloads.wordpress.org/plugin/gutenberg.17.7.0.zip" ),
 );
 // Read some data
 $first_4_kilobytes = fread($fp, 4096);
 // We've only waited for the first four kilobytes. The download
 // is still in progress at this point, and yet we're free to do
 // other work.
 ```

 ### Delayed execution and concurrent downloads

 The actual socket are not open until the first time the stream is read from:

 ```php
 $client = new AsyncHttpClient();
 // Enqueuing the requests does not start the data transmission yet.
 $batch = $client->enqueue( [
     new Request( "https://downloads.wordpress.org/plugin/gutenberg.17.7.0.zip" ),
     new Request( "https://downloads.wordpress.org/theme/pendant.zip" ),
 ] );
 // Even though stream_get_contents() will return just the response body for
 // one request, it also opens the network sockets and starts streaming
 // both enqueued requests. The response data for $batch[1] is buffered.
 $gutenberg_zip = stream_get_contents( $batch[0] )

 // At least a chunk of the pendant.zip have already been downloaded, let's
 // wait for the rest of the data:
 $pendant_zip = stream_get_contents( $batch[1] )
 ```

 ### Concurrency limits

 The `AsyncHttpClient` will only keep up to `$concurrency` connections open. When one of the
 requests finishes, it will automatically start the next one.

 For example:
 ```php
 $client = new AsyncHttpClient();
 // Process at most 10 concurrent request at a time.
 $client->set_concurrency_limit( 10 );
 ```

 ### Progress monitoring

 A developer-provided callback (`AsyncHttpClient->set_progress_callback()`) receives progress
 information about every HTTP request.

 ```php
 $client = new AsyncHttpClient();
 $client->set_progress_callback( function ( Request $request, $downloaded, $total ) {
      // $total is computed based on the Content-Length response header and
      // null if it's missing.
      echo "$request->url – Downloaded: $downloaded / $total\n";
 } );
 ```

 ### HTTPS support

 TLS connections work out of the box.

 ### Non-blocking sockets

 The act of opening each socket connection is non-blocking and happens nearly
 instantly. The streams themselves are also set to non-blocking mode via `stream_set_blocking($fp, 0);`

 ### Asynchronous downloads

Start downloading now, do other work in your code, only block once you need the data.

 ### PHP 7.0 support and no dependencies

 `AsyncHttpClient` works on any WordPress installation with vanilla PHP only.
 It does not require any PHP extensions, CURL, or any external PHP libraries.

 ### Supports custom request headers and body

 ## Implementation details

* **Non-blocking stream opening:**
    * `streams_http_open_nonblocking` utilizes `stream_http_open_nonblocking` to open streams for the provided URLs.
    * `stream_http_open_nonblocking` first validates the URL scheme (only http and https are supported).
    * It then creates a stream context with a `tcp://` wrapper to open the connection because the `https://` and `ssl://` wrappers would block until the SSL handshake is complete.
    * After opening the connection using `stream_socket_client`, it switches the stream to non-blocking mode using `stream_set_blocking`.
* **Asynchronous HTTP request sending:**
    * `streams_http_requests_send` iterates over the provided streams and enables encryption (crypto) on each one using `stream_socket_enable_crypto`.
    * It then uses `stream_select` to wait for the streams to become writable and sends the HTTP request headers using `fwrite`.
* **Reading the response:**
    * `streams_http_response_await_bytes` utilizes `stream_select` to wait for a specified number of bytes to become available on any of the streams.
    * `streams_http_response_await_headers` retrieves the full HTTP response headers iteratively. It reads bytes from the streams until the end-of-headers marker (`\r\n\r\n`). The rest of the response stream, which is the response body, is available for the consumer code to read.
    * Reading from each async stream triggers `stream_select` to buffer any data available on other concurrent connections. This is implemented via a `stream_read` method of a custom stream wrapper.
* **Progress monitoring:**
    * `stream_monitor_progress` taps into the `stream_read` operation using a stream wrapper and reports the number of read bytes to a callback function.

  ## Related issues

* #71
  • Loading branch information
adamziel committed Mar 18, 2024
1 parent 62d460c commit a9f10c1
Show file tree
Hide file tree
Showing 28 changed files with 1,256 additions and 409 deletions.
7 changes: 3 additions & 4 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
"symfony/event-dispatcher": "*",
"symfony/filesystem": "*",
"symfony/process": "*",
"symfony/http-client": "*",
"symfony/http-kernel": "*",
"pimple/pimple": "*",
"psr/simple-cache": "*",
"opis/json-schema": "*",
Expand All @@ -14,7 +12,7 @@
},
"require-dev": {
"phpunit/phpunit": "*",
"squizlabs/php_codesniffer":"*",
"squizlabs/php_codesniffer": "*",
"nette/php-generator": "*",
"jane-php/json-schema": "*",
"bamarni/composer-bin-plugin": "*",
Expand Down Expand Up @@ -43,7 +41,8 @@
"files": [
"src/WordPress/Blueprints/functions.php",
"src/WordPress/Zip/functions.php",
"src/WordPress/Streams/stream_str_replace.php"
"src/WordPress/Streams/stream_str_replace.php",
"src/WordPress/AsyncHttp/async_http_streams.php"
]
},
"autoload-dev": {
Expand Down
56 changes: 56 additions & 0 deletions http_api.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?php

use WordPress\AsyncHttp\Client;
use WordPress\AsyncHttp\Request;

require __DIR__ . '/vendor/autoload.php';

$client = new Client();
$client->set_progress_callback( function ( Request $request, $downloaded, $total ) {
echo "$request->url – Downloaded: $downloaded / $total\n";
} );
$streams1 = $client->enqueue( [
new Request( "https://downloads.wordpress.org/plugin/gutenberg.17.7.0.zip" ),
new Request( "https://downloads.wordpress.org/theme/pendant.zip" ),
] );
// Enqueuing another request here is instant and won't start the download yet.
$streams2 = $client->enqueue( [
new Request( "https://downloads.wordpress.org/plugin/hello-dolly.1.7.3.zip" ),
] );

// Stream a single file, while streaming all the files
file_put_contents( 'output-round1-0.zip', stream_get_contents( $streams1[0] ) );
file_put_contents( 'output-round1-1.zip', stream_get_contents( $streams1[1] ) );

// Initiate more HTTPS requests
$streams3 = $client->enqueue( [
new Request( "https://downloads.wordpress.org/plugin/akismet.4.1.12.zip" ),
new Request( "https://downloads.wordpress.org/plugin/hello-dolly.1.7.3.zip" ),
new Request( "https://downloads.wordpress.org/plugin/hello-dolly.1.7.3.zip" ),
] );

// Download the rest of the files. Foreach() seems like downloading things
// sequentially, but we're actually streaming all the files in parallel.
$streams = array_merge( $streams2, $streams3 );
foreach ( $streams as $k => $stream ) {
file_put_contents( 'output-round2-' . $k . '.zip', stream_get_contents( $stream ) );
}

echo "Done! :)";

// ----------------------------
//
// Previous explorations:

// Non-blocking parallel processing – the fastest method.
//while ( $results = sockets_http_response_await_bytes( $streams, 8096 ) ) {
// foreach ( $results as $k => $chunk ) {
// file_put_contents( 'output' . $k . '.zip', $chunk, FILE_APPEND );
// }
//}

// Blocking sequential processing – the slowest method.
//foreach ( $streams as $k => $stream ) {
// stream_set_blocking( $stream, 1 );
// file_put_contents( 'output' . $k . '.zip', stream_get_contents( $stream ) );
//}
289 changes: 289 additions & 0 deletions src/WordPress/AsyncHttp/Client.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
<?php

namespace WordPress\AsyncHttp;

use Exception;
use WordPress\Util\Map;
use function WordPress\Streams\stream_monitor_progress;
use function WordPress\Streams\streams_http_response_await_bytes;
use function WordPress\Streams\streams_send_http_requests;

/**
* An asynchronous HTTP client library designed for WordPress. Main features:
*
* **Streaming support**
* Enqueuing a request returns a PHP resource that can be read by PHP functions like `fopen()`
* and `stream_get_contents()`
*
* ```php
* $client = new AsyncHttpClient();
* $fp = $client->enqueue(
* new Request( "https://downloads.wordpress.org/plugin/gutenberg.17.7.0.zip" ),
* );
* // Read some data
* $first_4_kilobytes = fread($fp, 4096);
* // We've only waited for the first four kilobytes. The download
* // is still in progress at this point, and yet we're free to do
* // other work.
* ```
*
* **Delayed execution and concurrent downloads**
* The actual socket are not open until the first time the stream is read from:
*
* ```php
* $client = new AsyncHttpClient();
* // Enqueuing the requests does not start the data transmission yet.
* $batch = $client->enqueue( [
* new Request( "https://downloads.wordpress.org/plugin/gutenberg.17.7.0.zip" ),
* new Request( "https://downloads.wordpress.org/theme/pendant.zip" ),
* ] );
* // Even though stream_get_contents() will return just the response body for
* // one request, it also opens the network sockets and starts streaming
* // both enqueued requests. The response data for $batch[1] is buffered.
* $gutenberg_zip = stream_get_contents( $batch[0] )
*
* // At least a chunk of the pendant.zip have already been downloaded, let's
* // wait for the rest of the data:
* $pendant_zip = stream_get_contents( $batch[1] )
* ```
*
* **Concurrency limits**
* The `AsyncHttpClient` will only keep up to `$concurrency` connections open. When one of the
* requests finishes, it will automatically start the next one.
*
* For example:
* ```php
* $client = new AsyncHttpClient();
* // Process at most 10 concurrent request at a time.
* $client->set_concurrency_limit( 10 );
* ```
*
* **Progress monitoring**
* A developer-provided callback (`AsyncHttpClient->set_progress_callback()`) receives progress
* information about every HTTP request.
*
* ```php
* $client = new AsyncHttpClient();
* $client->set_progress_callback( function ( Request $request, $downloaded, $total ) {
* // $total is computed based on the Content-Length response header and
* // null if it's missing.
* echo "$request->url – Downloaded: $downloaded / $total\n";
* } );
* ```
*
* **HTTPS support**
* TLS connections work out of the box.
*
* **Non-blocking sockets**
* The act of opening each socket connection is non-blocking and happens nearly
* instantly. The streams themselves are also set to non-blocking mode via `stream_set_blocking($fp, 0);`
*
* **Asynchronous downloads**
* Start downloading now, do other work in your code, only block once you need the data.
*
* **PHP 7.0 support and no dependencies**
* `AsyncHttpClient` works on any WordPress installation with vanilla PHP only.
* It does not require any PHP extensions, CURL, or any external PHP libraries.
*
* **Supports custom request headers and body**
*/
class Client {
protected $concurrency = 10;
protected Map $requests;
protected $onProgress;
protected $queue_needs_processing = false;

public function __construct() {
$this->requests = new Map();
$this->onProgress = function () {
};
}

/**
* Sets the limit of concurrent connections this client will open.
*
* @param int $concurrency
*/
public function set_concurrency_limit( $concurrency ) {
$this->concurrency = $concurrency;
}

/**
* Sets the callback called when response bytes are received on any of the enqueued
* requests.
*
* @param callable $onProgress A function of three arguments:
* Request $request, int $downloaded, int $total.
*/
public function set_progress_callback( $onProgress ) {
$this->onProgress = $onProgress;
}

/**
* Enqueues one or multiple HTTP requests for asynchronous processing.
* It does not open the network sockets, only adds the Request objects to
* an internal queue. Network transmission is delayed until one of the returned
* streams is read from.
*
* @param Request|Request[] $requests The HTTP request(s) to enqueue. Can be a single request or an array of requests.
*
* @return resource|array The enqueued streams.
*/
public function enqueue( $requests ) {
if ( ! is_array( $requests ) ) {
return $this->enqueue_request( $requests );
}

$enqueued_streams = array();
foreach ( $requests as $request ) {
$enqueued_streams[] = $this->enqueue_request( $request );
}

return $enqueued_streams;
}

/**
* Returns the response stream associated with the given Request object.
* Enqueues the Request if it hasn't been enqueued yet.
*
* @param Request $request
*
* @return resource
*/
public function get_stream( Request $request ) {
if ( ! isset( $this->requests[ $request ] ) ) {
$this->enqueue_request( $request );
}

if ( $this->queue_needs_processing ) {
$this->process_queue();
}

return $this->requests[ $request ]->stream;
}

protected function enqueue_request( Request $request ) {
$stream = StreamWrapper::create_resource(
new StreamData( $request, $this )
);
$this->requests[ $request ] = new RequestInfo( $stream );
$this->queue_needs_processing = true;

return $stream;
}

/**
* Starts n enqueued request up to the $concurrency_limit.
*/
public function process_queue() {
$this->queue_needs_processing = false;

$active_requests = count( $this->get_streamed_requests() );
$backfill = $this->concurrency - $active_requests;
if ( $backfill <= 0 ) {
return;
}

$enqueued = array_slice( $this->get_enqueued_request(), 0, $backfill );
list( $streams, $response_headers ) = streams_send_http_requests( $enqueued );

foreach ( $streams as $k => $stream ) {
$request = $enqueued[ $k ];
$total = $response_headers[ $k ]['headers']['content-length'];
$this->requests[ $request ]->state = RequestInfo::STATE_STREAMING;
$this->requests[ $request ]->stream = stream_monitor_progress(
$stream,
function ( $downloaded ) use ( $request, $total ) {
$onProgress = $this->onProgress;
$onProgress( $request, $downloaded, $total );
}
);
}
}

protected function get_enqueued_request() {
$enqueued_requests = [];
foreach ( $this->requests as $request => $info ) {
if ( $info->state === RequestInfo::STATE_ENQUEUED ) {
$enqueued_requests[] = $request;
}
}

return $enqueued_requests;
}

protected function get_streamed_requests() {
$active_requests = [];
foreach ( $this->requests as $request => $info ) {
if ( $info->state !== RequestInfo::STATE_ENQUEUED ) {
$active_requests[] = $request;
}
}

return $active_requests;
}

/**
* Reads up to $length bytes from the stream while polling all the active streams.
*
* @param Request $request
* @param $length
*
* @return false|string
* @throws Exception
*/
public function read_bytes( Request $request, $length ) {
if ( ! isset( $this->requests[ $request ] ) ) {
return false;
}

if ( $this->queue_needs_processing ) {
$this->process_queue();
}

$request_info = $this->requests[ $request ];
$stream = $request_info->stream;

$active_requests = $this->get_streamed_requests();
$active_streams = array_map( function ( $request ) {
return $this->requests[ $request ]->stream;
}, $active_requests );

if ( ! count( $active_streams ) ) {
return false;
}

while ( true ) {
if ( ! $request_info->isFinished() && feof( $stream ) ) {
$request_info->state = RequestInfo::STATE_FINISHED;
fclose( $stream );
$this->queue_needs_processing = true;
}

if ( strlen( $request_info->buffer ) >= $length ) {
$buffered = substr( $request_info->buffer, 0, $length );
$request_info->buffer = substr( $request_info->buffer, $length );

return $buffered;
} elseif ( $request_info->isFinished() ) {
unset( $this->requests[ $request ] );

return $request_info->buffer;
}

$active_streams = array_filter( $active_streams, function ( $stream ) {
return ! feof( $stream );
} );
if ( ! count( $active_streams ) ) {
continue;
}
$bytes = streams_http_response_await_bytes(
$active_streams,
$length - strlen( $request_info->buffer )
);
foreach ( $bytes as $k => $chunk ) {
$this->requests[ $active_requests[ $k ] ]->buffer .= $chunk;
}
}
}
}
16 changes: 16 additions & 0 deletions src/WordPress/AsyncHttp/Request.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

namespace WordPress\AsyncHttp;

class Request {

public string $url;

/**
* @param string $url
*/
public function __construct( string $url ) {
$this->url = $url;
}

}
Loading

0 comments on commit a9f10c1

Please sign in to comment.