From a9f10c13d27d92f9f343031c81486d7970f115b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Zieli=C5=84ski?= Date: Mon, 18 Mar 2024 18:36:49 +0100 Subject: [PATCH] Asynchronous HTTP Client Library (#72) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit An asynchronous HTTP client library designed for WordPress. Main features: ### Streaming support Enqueuing a request returns a PHP resource that can be read by PHP functions like `fopen()` and `stream_get_contents()` ```php $client = new AsyncHttpClient(); $fp = $client->enqueue( new Request( "https://downloads.wordpress.org/plugin/gutenberg.17.7.0.zip" ), ); // Read some data $first_4_kilobytes = fread($fp, 4096); // We've only waited for the first four kilobytes. The download // is still in progress at this point, and yet we're free to do // other work. ``` ### Delayed execution and concurrent downloads The actual socket are not open until the first time the stream is read from: ```php $client = new AsyncHttpClient(); // Enqueuing the requests does not start the data transmission yet. $batch = $client->enqueue( [ new Request( "https://downloads.wordpress.org/plugin/gutenberg.17.7.0.zip" ), new Request( "https://downloads.wordpress.org/theme/pendant.zip" ), ] ); // Even though stream_get_contents() will return just the response body for // one request, it also opens the network sockets and starts streaming // both enqueued requests. The response data for $batch[1] is buffered. $gutenberg_zip = stream_get_contents( $batch[0] ) // At least a chunk of the pendant.zip have already been downloaded, let's // wait for the rest of the data: $pendant_zip = stream_get_contents( $batch[1] ) ``` ### Concurrency limits The `AsyncHttpClient` will only keep up to `$concurrency` connections open. When one of the requests finishes, it will automatically start the next one. For example: ```php $client = new AsyncHttpClient(); // Process at most 10 concurrent request at a time. $client->set_concurrency_limit( 10 ); ``` ### Progress monitoring A developer-provided callback (`AsyncHttpClient->set_progress_callback()`) receives progress information about every HTTP request. ```php $client = new AsyncHttpClient(); $client->set_progress_callback( function ( Request $request, $downloaded, $total ) { // $total is computed based on the Content-Length response header and // null if it's missing. echo "$request->url – Downloaded: $downloaded / $total\n"; } ); ``` ### HTTPS support TLS connections work out of the box. ### Non-blocking sockets The act of opening each socket connection is non-blocking and happens nearly instantly. The streams themselves are also set to non-blocking mode via `stream_set_blocking($fp, 0);` ### Asynchronous downloads Start downloading now, do other work in your code, only block once you need the data. ### PHP 7.0 support and no dependencies `AsyncHttpClient` works on any WordPress installation with vanilla PHP only. It does not require any PHP extensions, CURL, or any external PHP libraries. ### Supports custom request headers and body ## Implementation details * **Non-blocking stream opening:** * `streams_http_open_nonblocking` utilizes `stream_http_open_nonblocking` to open streams for the provided URLs. * `stream_http_open_nonblocking` first validates the URL scheme (only http and https are supported). * It then creates a stream context with a `tcp://` wrapper to open the connection because the `https://` and `ssl://` wrappers would block until the SSL handshake is complete. * After opening the connection using `stream_socket_client`, it switches the stream to non-blocking mode using `stream_set_blocking`. * **Asynchronous HTTP request sending:** * `streams_http_requests_send` iterates over the provided streams and enables encryption (crypto) on each one using `stream_socket_enable_crypto`. * It then uses `stream_select` to wait for the streams to become writable and sends the HTTP request headers using `fwrite`. * **Reading the response:** * `streams_http_response_await_bytes` utilizes `stream_select` to wait for a specified number of bytes to become available on any of the streams. * `streams_http_response_await_headers` retrieves the full HTTP response headers iteratively. It reads bytes from the streams until the end-of-headers marker (`\r\n\r\n`). The rest of the response stream, which is the response body, is available for the consumer code to read. * Reading from each async stream triggers `stream_select` to buffer any data available on other concurrent connections. This is implemented via a `stream_read` method of a custom stream wrapper. * **Progress monitoring:** * `stream_monitor_progress` taps into the `stream_read` operation using a stream wrapper and reports the number of read bytes to a callback function. ## Related issues * https://github.com/WordPress/blueprints-library/issues/71 --- composer.json | 7 +- http_api.php | 56 ++++ src/WordPress/AsyncHttp/Client.php | 289 ++++++++++++++++ src/WordPress/AsyncHttp/Request.php | 16 + src/WordPress/AsyncHttp/RequestInfo.php | 24 ++ src/WordPress/AsyncHttp/StreamData.php | 18 + src/WordPress/AsyncHttp/StreamWrapper.php | 106 ++++++ .../AsyncHttp/async_http_streams.php | 308 ++++++++++++++++++ .../Blueprints/Compile/BlueprintCompiler.php | 6 +- src/WordPress/Blueprints/ContainerBuilder.php | 85 +++-- .../Blueprints/Model/BlueprintBuilder.php | 18 +- .../Model/DataClass/InstallPluginStep.php | 23 +- .../Blueprints/Resources/ResourceManager.php | 5 +- .../Blueprints/Resources/ResourceMap.php | 60 ---- .../DataSource/PlaygroundFetchSource.php | 5 - src/WordPress/DataSource/UrlSource.php | 136 ++------ src/WordPress/Streams/StreamPeeker.php | 132 -------- ...PeekerContext.php => StreamPeekerData.php} | 3 +- src/WordPress/Streams/StreamPeekerWrapper.php | 74 +++++ .../Streams/StreamWrapperInterface.php | 59 ++++ .../Streams/VanillaStreamWrapper.php | 98 ++++++ .../Streams/VanillaStreamWrapperData.php | 11 + src/WordPress/Util/ArrayPairIterator.php | 37 +++ src/WordPress/Util/Map.php | 58 ++++ src/WordPress/Zip/ZipStreamReader.php | 19 +- src/WordPress/Zip/functions.php | 2 +- .../Runner/Step/UnzipStepRunnerTest.php | 2 +- tests/Unit/StreamPeeker.php | 8 +- 28 files changed, 1256 insertions(+), 409 deletions(-) create mode 100644 http_api.php create mode 100644 src/WordPress/AsyncHttp/Client.php create mode 100644 src/WordPress/AsyncHttp/Request.php create mode 100644 src/WordPress/AsyncHttp/RequestInfo.php create mode 100644 src/WordPress/AsyncHttp/StreamData.php create mode 100644 src/WordPress/AsyncHttp/StreamWrapper.php create mode 100644 src/WordPress/AsyncHttp/async_http_streams.php delete mode 100644 src/WordPress/Blueprints/Resources/ResourceMap.php delete mode 100644 src/WordPress/Streams/StreamPeeker.php rename src/WordPress/Streams/{StreamPeekerContext.php => StreamPeekerData.php} (60%) create mode 100644 src/WordPress/Streams/StreamPeekerWrapper.php create mode 100644 src/WordPress/Streams/StreamWrapperInterface.php create mode 100644 src/WordPress/Streams/VanillaStreamWrapper.php create mode 100644 src/WordPress/Streams/VanillaStreamWrapperData.php create mode 100644 src/WordPress/Util/ArrayPairIterator.php create mode 100644 src/WordPress/Util/Map.php diff --git a/composer.json b/composer.json index 0f5d244b..2421ffe6 100644 --- a/composer.json +++ b/composer.json @@ -4,8 +4,6 @@ "symfony/event-dispatcher": "*", "symfony/filesystem": "*", "symfony/process": "*", - "symfony/http-client": "*", - "symfony/http-kernel": "*", "pimple/pimple": "*", "psr/simple-cache": "*", "opis/json-schema": "*", @@ -14,7 +12,7 @@ }, "require-dev": { "phpunit/phpunit": "*", - "squizlabs/php_codesniffer":"*", + "squizlabs/php_codesniffer": "*", "nette/php-generator": "*", "jane-php/json-schema": "*", "bamarni/composer-bin-plugin": "*", @@ -43,7 +41,8 @@ "files": [ "src/WordPress/Blueprints/functions.php", "src/WordPress/Zip/functions.php", - "src/WordPress/Streams/stream_str_replace.php" + "src/WordPress/Streams/stream_str_replace.php", + "src/WordPress/AsyncHttp/async_http_streams.php" ] }, "autoload-dev": { diff --git a/http_api.php b/http_api.php new file mode 100644 index 00000000..12a4be00 --- /dev/null +++ b/http_api.php @@ -0,0 +1,56 @@ +set_progress_callback( function ( Request $request, $downloaded, $total ) { + echo "$request->url – Downloaded: $downloaded / $total\n"; +} ); +$streams1 = $client->enqueue( [ + new Request( "https://downloads.wordpress.org/plugin/gutenberg.17.7.0.zip" ), + new Request( "https://downloads.wordpress.org/theme/pendant.zip" ), +] ); +// Enqueuing another request here is instant and won't start the download yet. +$streams2 = $client->enqueue( [ + new Request( "https://downloads.wordpress.org/plugin/hello-dolly.1.7.3.zip" ), +] ); + +// Stream a single file, while streaming all the files +file_put_contents( 'output-round1-0.zip', stream_get_contents( $streams1[0] ) ); +file_put_contents( 'output-round1-1.zip', stream_get_contents( $streams1[1] ) ); + +// Initiate more HTTPS requests +$streams3 = $client->enqueue( [ + new Request( "https://downloads.wordpress.org/plugin/akismet.4.1.12.zip" ), + new Request( "https://downloads.wordpress.org/plugin/hello-dolly.1.7.3.zip" ), + new Request( "https://downloads.wordpress.org/plugin/hello-dolly.1.7.3.zip" ), +] ); + +// Download the rest of the files. Foreach() seems like downloading things +// sequentially, but we're actually streaming all the files in parallel. +$streams = array_merge( $streams2, $streams3 ); +foreach ( $streams as $k => $stream ) { + file_put_contents( 'output-round2-' . $k . '.zip', stream_get_contents( $stream ) ); +} + +echo "Done! :)"; + +// ---------------------------- +// +// Previous explorations: + +// Non-blocking parallel processing – the fastest method. +//while ( $results = sockets_http_response_await_bytes( $streams, 8096 ) ) { +// foreach ( $results as $k => $chunk ) { +// file_put_contents( 'output' . $k . '.zip', $chunk, FILE_APPEND ); +// } +//} + +// Blocking sequential processing – the slowest method. +//foreach ( $streams as $k => $stream ) { +// stream_set_blocking( $stream, 1 ); +// file_put_contents( 'output' . $k . '.zip', stream_get_contents( $stream ) ); +//} diff --git a/src/WordPress/AsyncHttp/Client.php b/src/WordPress/AsyncHttp/Client.php new file mode 100644 index 00000000..fb53d58a --- /dev/null +++ b/src/WordPress/AsyncHttp/Client.php @@ -0,0 +1,289 @@ +enqueue( + * new Request( "https://downloads.wordpress.org/plugin/gutenberg.17.7.0.zip" ), + * ); + * // Read some data + * $first_4_kilobytes = fread($fp, 4096); + * // We've only waited for the first four kilobytes. The download + * // is still in progress at this point, and yet we're free to do + * // other work. + * ``` + * + * **Delayed execution and concurrent downloads** + * The actual socket are not open until the first time the stream is read from: + * + * ```php + * $client = new AsyncHttpClient(); + * // Enqueuing the requests does not start the data transmission yet. + * $batch = $client->enqueue( [ + * new Request( "https://downloads.wordpress.org/plugin/gutenberg.17.7.0.zip" ), + * new Request( "https://downloads.wordpress.org/theme/pendant.zip" ), + * ] ); + * // Even though stream_get_contents() will return just the response body for + * // one request, it also opens the network sockets and starts streaming + * // both enqueued requests. The response data for $batch[1] is buffered. + * $gutenberg_zip = stream_get_contents( $batch[0] ) + * + * // At least a chunk of the pendant.zip have already been downloaded, let's + * // wait for the rest of the data: + * $pendant_zip = stream_get_contents( $batch[1] ) + * ``` + * + * **Concurrency limits** + * The `AsyncHttpClient` will only keep up to `$concurrency` connections open. When one of the + * requests finishes, it will automatically start the next one. + * + * For example: + * ```php + * $client = new AsyncHttpClient(); + * // Process at most 10 concurrent request at a time. + * $client->set_concurrency_limit( 10 ); + * ``` + * + * **Progress monitoring** + * A developer-provided callback (`AsyncHttpClient->set_progress_callback()`) receives progress + * information about every HTTP request. + * + * ```php + * $client = new AsyncHttpClient(); + * $client->set_progress_callback( function ( Request $request, $downloaded, $total ) { + * // $total is computed based on the Content-Length response header and + * // null if it's missing. + * echo "$request->url – Downloaded: $downloaded / $total\n"; + * } ); + * ``` + * + * **HTTPS support** + * TLS connections work out of the box. + * + * **Non-blocking sockets** + * The act of opening each socket connection is non-blocking and happens nearly + * instantly. The streams themselves are also set to non-blocking mode via `stream_set_blocking($fp, 0);` + * + * **Asynchronous downloads** + * Start downloading now, do other work in your code, only block once you need the data. + * + * **PHP 7.0 support and no dependencies** + * `AsyncHttpClient` works on any WordPress installation with vanilla PHP only. + * It does not require any PHP extensions, CURL, or any external PHP libraries. + * + * **Supports custom request headers and body** + */ +class Client { + protected $concurrency = 10; + protected Map $requests; + protected $onProgress; + protected $queue_needs_processing = false; + + public function __construct() { + $this->requests = new Map(); + $this->onProgress = function () { + }; + } + + /** + * Sets the limit of concurrent connections this client will open. + * + * @param int $concurrency + */ + public function set_concurrency_limit( $concurrency ) { + $this->concurrency = $concurrency; + } + + /** + * Sets the callback called when response bytes are received on any of the enqueued + * requests. + * + * @param callable $onProgress A function of three arguments: + * Request $request, int $downloaded, int $total. + */ + public function set_progress_callback( $onProgress ) { + $this->onProgress = $onProgress; + } + + /** + * Enqueues one or multiple HTTP requests for asynchronous processing. + * It does not open the network sockets, only adds the Request objects to + * an internal queue. Network transmission is delayed until one of the returned + * streams is read from. + * + * @param Request|Request[] $requests The HTTP request(s) to enqueue. Can be a single request or an array of requests. + * + * @return resource|array The enqueued streams. + */ + public function enqueue( $requests ) { + if ( ! is_array( $requests ) ) { + return $this->enqueue_request( $requests ); + } + + $enqueued_streams = array(); + foreach ( $requests as $request ) { + $enqueued_streams[] = $this->enqueue_request( $request ); + } + + return $enqueued_streams; + } + + /** + * Returns the response stream associated with the given Request object. + * Enqueues the Request if it hasn't been enqueued yet. + * + * @param Request $request + * + * @return resource + */ + public function get_stream( Request $request ) { + if ( ! isset( $this->requests[ $request ] ) ) { + $this->enqueue_request( $request ); + } + + if ( $this->queue_needs_processing ) { + $this->process_queue(); + } + + return $this->requests[ $request ]->stream; + } + + protected function enqueue_request( Request $request ) { + $stream = StreamWrapper::create_resource( + new StreamData( $request, $this ) + ); + $this->requests[ $request ] = new RequestInfo( $stream ); + $this->queue_needs_processing = true; + + return $stream; + } + + /** + * Starts n enqueued request up to the $concurrency_limit. + */ + public function process_queue() { + $this->queue_needs_processing = false; + + $active_requests = count( $this->get_streamed_requests() ); + $backfill = $this->concurrency - $active_requests; + if ( $backfill <= 0 ) { + return; + } + + $enqueued = array_slice( $this->get_enqueued_request(), 0, $backfill ); + list( $streams, $response_headers ) = streams_send_http_requests( $enqueued ); + + foreach ( $streams as $k => $stream ) { + $request = $enqueued[ $k ]; + $total = $response_headers[ $k ]['headers']['content-length']; + $this->requests[ $request ]->state = RequestInfo::STATE_STREAMING; + $this->requests[ $request ]->stream = stream_monitor_progress( + $stream, + function ( $downloaded ) use ( $request, $total ) { + $onProgress = $this->onProgress; + $onProgress( $request, $downloaded, $total ); + } + ); + } + } + + protected function get_enqueued_request() { + $enqueued_requests = []; + foreach ( $this->requests as $request => $info ) { + if ( $info->state === RequestInfo::STATE_ENQUEUED ) { + $enqueued_requests[] = $request; + } + } + + return $enqueued_requests; + } + + protected function get_streamed_requests() { + $active_requests = []; + foreach ( $this->requests as $request => $info ) { + if ( $info->state !== RequestInfo::STATE_ENQUEUED ) { + $active_requests[] = $request; + } + } + + return $active_requests; + } + + /** + * Reads up to $length bytes from the stream while polling all the active streams. + * + * @param Request $request + * @param $length + * + * @return false|string + * @throws Exception + */ + public function read_bytes( Request $request, $length ) { + if ( ! isset( $this->requests[ $request ] ) ) { + return false; + } + + if ( $this->queue_needs_processing ) { + $this->process_queue(); + } + + $request_info = $this->requests[ $request ]; + $stream = $request_info->stream; + + $active_requests = $this->get_streamed_requests(); + $active_streams = array_map( function ( $request ) { + return $this->requests[ $request ]->stream; + }, $active_requests ); + + if ( ! count( $active_streams ) ) { + return false; + } + + while ( true ) { + if ( ! $request_info->isFinished() && feof( $stream ) ) { + $request_info->state = RequestInfo::STATE_FINISHED; + fclose( $stream ); + $this->queue_needs_processing = true; + } + + if ( strlen( $request_info->buffer ) >= $length ) { + $buffered = substr( $request_info->buffer, 0, $length ); + $request_info->buffer = substr( $request_info->buffer, $length ); + + return $buffered; + } elseif ( $request_info->isFinished() ) { + unset( $this->requests[ $request ] ); + + return $request_info->buffer; + } + + $active_streams = array_filter( $active_streams, function ( $stream ) { + return ! feof( $stream ); + } ); + if ( ! count( $active_streams ) ) { + continue; + } + $bytes = streams_http_response_await_bytes( + $active_streams, + $length - strlen( $request_info->buffer ) + ); + foreach ( $bytes as $k => $chunk ) { + $this->requests[ $active_requests[ $k ] ]->buffer .= $chunk; + } + } + } +} diff --git a/src/WordPress/AsyncHttp/Request.php b/src/WordPress/AsyncHttp/Request.php new file mode 100644 index 00000000..c083f356 --- /dev/null +++ b/src/WordPress/AsyncHttp/Request.php @@ -0,0 +1,16 @@ +url = $url; + } + +} diff --git a/src/WordPress/AsyncHttp/RequestInfo.php b/src/WordPress/AsyncHttp/RequestInfo.php new file mode 100644 index 00000000..b2ddc71c --- /dev/null +++ b/src/WordPress/AsyncHttp/RequestInfo.php @@ -0,0 +1,24 @@ +stream = $stream; + } + + public function isFinished() { + return $this->state === self::STATE_FINISHED; + } + +} diff --git a/src/WordPress/AsyncHttp/StreamData.php b/src/WordPress/AsyncHttp/StreamData.php new file mode 100644 index 00000000..267cd33d --- /dev/null +++ b/src/WordPress/AsyncHttp/StreamData.php @@ -0,0 +1,18 @@ +request = $request; + $this->client = $group; + } + +} diff --git a/src/WordPress/AsyncHttp/StreamWrapper.php b/src/WordPress/AsyncHttp/StreamWrapper.php new file mode 100644 index 00000000..18492a9e --- /dev/null +++ b/src/WordPress/AsyncHttp/StreamWrapper.php @@ -0,0 +1,106 @@ +stream ) { + $this->stream = $this->client->get_stream( $this->wrapper_data->request ); + } + } + + public function stream_open( $path, $mode, $options, &$opened_path ) { + if ( ! parent::stream_open( $path, $mode, $options, $opened_path ) ) { + return false; + } + + if ( ! $this->wrapper_data->client ) { + return false; + } + $this->client = $this->wrapper_data->client; + + return true; + } + + public function stream_cast( int $cast_as ) { + $this->initialize(); + + return parent::stream_cast( $cast_as ); + } + + public function stream_read( $count ) { + $this->initialize(); + + return $this->client->read_bytes( $this->wrapper_data->request, $count ); + } + + public function stream_write( $data ) { + $this->initialize(); + + return parent::stream_write( $data ); + } + + public function stream_tell() { + if ( ! $this->stream ) { + return false; + } + + return parent::stream_tell(); + } + + public function stream_close() { + if ( ! $this->stream ) { + return false; + } + + if ( ! $this->has_valid_stream() ) { + return false; + } + + return parent::stream_close(); + } + + public function stream_eof() { + if ( ! $this->stream ) { + return false; + } + + if ( ! $this->has_valid_stream() ) { + return true; + } + + return parent::stream_eof(); + } + + public function stream_seek( $offset, $whence ) { + if ( ! $this->stream ) { + return false; + } + + return parent::stream_seek( $offset, $whence ); + } + + /* + * This stream_close call could be initiated not by the developer, + * but by the PHP internal request shutdown handler (written in C). + * + * The underlying resource ($this->stream) may have already been closed + * and freed independently from the resource represented by $this stream + * wrapper. In this case, the type of $this->stream will be "Unknown", + * and the fclose() call will trigger a fatal error. + * + * Let's refuse to call fclose() in that scenario. + */ + protected function has_valid_stream() { + return get_resource_type( $this->stream ) !== "Unknown"; + } + +} diff --git a/src/WordPress/AsyncHttp/async_http_streams.php b/src/WordPress/AsyncHttp/async_http_streams.php new file mode 100644 index 00000000..ac4fdc2b --- /dev/null +++ b/src/WordPress/AsyncHttp/async_http_streams.php @@ -0,0 +1,308 @@ + $url ) { + $streams[ $k ] = stream_http_open_nonblocking( $url ); + } + + return $streams; +} + +/** + * Opens a HTTP or HTTPS stream using stream_socket_client() without blocking, + * and returns nearly immediately. + * + * The act of opening a stream is non-blocking itself. This function uses + * a tcp:// stream wrapper, because both https:// and ssl:// wrappers would block + * until the SSL handshake is complete. + * The actual socket it then switched to non-blocking mode using stream_set_blocking(). + * + * @param string $url The URL to open the stream for. + * + * @return resource|false The opened stream resource or false on failure. + * @throws InvalidArgumentException If the URL scheme is invalid. + * @throws Exception If unable to open the stream. + */ +function stream_http_open_nonblocking( $url ) { + $parts = parse_url( $url ); + $scheme = $parts['scheme']; + if ( ! in_array( $scheme, [ 'http', 'https' ] ) ) { + throw new InvalidArgumentException( 'Invalid scheme – only http:// and https:// URLs are supported' ); + } + + $port = $parts['port'] ?? ( $scheme === 'https' ? 443 : 80 ); + $host = $parts['host']; + + // Create stream context + $context = stream_context_create( + [ + 'socket' => [ + 'isSsl' => $scheme === 'https', + 'originalUrl' => $url, + 'socketUrl' => 'tcp://' . $host . ':' . $port, + ], + ] + ); + + $stream = stream_socket_client( + 'tcp://' . $host . ':' . $port, + $errno, + $errstr, + 30, + STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT, + $context + ); + if ( $stream === false ) { + throw new Exception( 'stream_socket_client() was unable to open a stream to ' . $url ); + } + + stream_set_blocking( $stream, 0 ); + + return $stream; +} + +/** + * Sends HTTP requests using streams. + * + * Takes an array of asynchronous streams open using stream_http_open_nonblocking(), + * enables crypto on the streams, and sends the request headers asynchronously. + * + * @param array $streams An array of streams to send the requests. + * + * @throws Exception If there is an error enabling crypto or if stream_select times out. + */ +function streams_http_requests_send( $streams ) { + $read = $except = null; + $remaining_streams = $streams; + while ( count( $remaining_streams ) ) { + $write = $remaining_streams; + $ready = @stream_select( $read, $write, $except, 0, 5000000 ); + if ( $ready === false ) { + throw new Exception( "Error: " . error_get_last()['message'] ); + } elseif ( $ready <= 0 ) { + throw new Exception( "stream_select timed out" ); + } + + foreach ( $write as $k => $stream ) { + $enabled_crypto = stream_socket_enable_crypto( $stream, true, STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT ); + if ( false === $enabled_crypto ) { + throw new Exception( "Failed to enable crypto: " . error_get_last()['message'] ); + } elseif ( 0 === $enabled_crypto ) { + // Wait for the handshake to complete + } else { + // SSL handshake complete, send the request headers + $context = stream_context_get_options( $stream ); + $request = stream_http_prepare_request_bytes( $context['socket']['originalUrl'] ); + fwrite( $stream, $request ); + unset( $remaining_streams[ $k ] ); + } + } + } +} + + +/** + * Waits for response bytes to be available in the given streams. + * + * @param array $streams The array of streams to wait for. + * @param int $length The number of bytes to read from each stream. + * @param int $timeout_microseconds The timeout in microseconds for the stream_select function. + * + * @return array|false An array of chunks read from the streams, or false if no streams are available. + * @throws Exception If an error occurs during the stream_select operation or if the operation times out. + */ +function streams_http_response_await_bytes( $streams, $length, $timeout_microseconds = 5000000 ) { + $read = $streams; + if ( count( $read ) === 0 ) { + return false; + } + $write = []; + $except = null; + $ready = @stream_select( $read, $write, $except, 0, $timeout_microseconds ); + if ( $ready === false ) { + throw new Exception( "Could not retrieve response bytes: " . error_get_last()['message'] ); + } elseif ( $ready <= 0 ) { + throw new Exception( "stream_select timed out" ); + } + + $chunks = []; + foreach ( $read as $k => $stream ) { + $chunks[ $k ] = fread( $stream, $length ); + } + + return $chunks; +} + +/** + * Parses an HTTP headers string into an array containing the status and headers. + * + * @param string $headers The HTTP headers to parse. + * + * @return array An array containing the parsed status and headers. + */ + +function parse_http_headers( string $headers ) { + $lines = explode( "\r\n", $headers ); + $status = array_shift( $lines ); + $status = explode( ' ', $status ); + $status = [ + 'protocol' => $status[0], + 'code' => $status[1], + 'message' => $status[2], + ]; + $headers = []; + foreach ( $lines as $line ) { + if ( ! str_contains( $line, ': ' ) ) { + continue; + } + $line = explode( ': ', $line ); + $headers[ strtolower( $line[0] ) ] = $line[1]; + } + + return [ + 'status' => $status, + 'headers' => $headers, + ]; +} + +/** + * Prepares an HTTP request string for a given URL. + * + * @param string $url The URL to prepare the request for. + * + * @return string The prepared HTTP request string. + */ + +function stream_http_prepare_request_bytes( $url ) { + $parts = parse_url( $url ); + $host = $parts['host']; + $path = $parts['path'] . ( isset( $parts['query'] ) ? '?' . $parts['query'] : '' ); + $request = << $stream ) { + $headers[ $k ] = ''; + } + $remaining_streams = $streams; + while ( true ) { + $bytes = streams_http_response_await_bytes( $remaining_streams, 1 ); + if ( false === $bytes ) { + break; + } + foreach ( $bytes as $k => $byte ) { + $headers[ $k ] .= $byte; + if ( str_ends_with( $headers[ $k ], "\r\n\r\n" ) ) { + unset( $remaining_streams[ $k ] ); + } + } + } + + foreach ( $headers as $k => $header ) { + $headers[ $k ] = parse_http_headers( $header ); + } + + return $headers; +} + +/** + * Monitors the progress of a stream while reading its content. + * + * @param resource $stream The stream to monitor. + * @param callable $onProgress The callback function to be called on each progress update. + * It should accept a single parameters: the number of bytes streamed so far. + * + * @return resource The wrapped stream resource. + */ +function stream_monitor_progress( $stream, $onProgress ) { + return StreamPeekerWrapper::create_resource( + new StreamPeekerData( + $stream, + function ( $data ) use ( $onProgress ) { + static $streamedBytes = 0; + $streamedBytes += strlen( $data ); + $onProgress( $streamedBytes ); + } + ) + ); +} + +/** + * Sends multiple HTTP requests asynchronously and returns the response streams. + * + * @param array $requests An array of HTTP requests. + * + * @return array An array containing the final streams and response headers. + * @throws Exception If any of the requests fail with a non-successful HTTP code. + */ +function streams_send_http_requests( array $requests ) { + $urls = []; + foreach ( $requests as $k => $request ) { + $urls[ $k ] = $request->url; + } + $redirects = $urls; + $final_streams = []; + $response_headers = []; + do { + $streams = streams_http_open_nonblocking( $redirects ); + streams_http_requests_send( $streams ); + + $redirects = []; + $headers = streams_http_response_await_headers( $streams ); + foreach ( array_keys( $headers ) as $k ) { + $code = $headers[ $k ]['status']['code']; + if ( $code > 399 || $code < 200 ) { + throw new Exception( "Failed to download file " . $requests[ $k ]->url . ": Server responded with HTTP code " . $code ); + } + if ( isset( $headers[ $k ]['headers']['location'] ) ) { + $redirects[ $k ] = $headers[ $k ]['headers']['location']; + fclose( $streams[ $k ] ); + continue; + } + + $final_streams[ $k ] = $streams[ $k ]; + $response_headers[ $k ] = $headers[ $k ]; + } + } while ( count( $redirects ) ); + + return [ $final_streams, $response_headers ]; +} diff --git a/src/WordPress/Blueprints/Compile/BlueprintCompiler.php b/src/WordPress/Blueprints/Compile/BlueprintCompiler.php index 4e4a6df0..632268e5 100644 --- a/src/WordPress/Blueprints/Compile/BlueprintCompiler.php +++ b/src/WordPress/Blueprints/Compile/BlueprintCompiler.php @@ -5,7 +5,6 @@ use WordPress\Blueprints\Model\DataClass\Blueprint; use WordPress\Blueprints\Model\DataClass\DefineWpConfigConstsStep; use WordPress\Blueprints\Model\DataClass\DownloadWordPressStep; -use WordPress\Blueprints\Model\DataClass\InstallPluginStep; use WordPress\Blueprints\Model\DataClass\InstallSqliteIntegrationStep; use WordPress\Blueprints\Model\DataClass\ResourceDefinitionInterface; use WordPress\Blueprints\Model\DataClass\RunWordPressInstallerStep; @@ -17,8 +16,8 @@ use WordPress\Blueprints\Progress\Tracker; class BlueprintCompiler { - protected $stepRunnerFactory; - protected $resourceResolver; + protected $stepRunnerFactory; + protected ResourceResolverInterface $resourceResolver; public function __construct( $stepRunnerFactory, @@ -60,6 +59,7 @@ protected function expandShorthandsIntoSteps( Blueprint $blueprint ) { ( new UrlResource() ) ->setUrl( 'https://downloads.wordpress.org/plugin/sqlite-database-integration.zip' ) ); +// @TODO: stream_select times out here: $additional_steps[] = ( new WriteFileStep() ) ->setPath( 'wp-cli.phar' ) ->setData( ( new UrlResource() )->setUrl( 'https://playground.wordpress.net/wp-cli.phar' ) ); diff --git a/src/WordPress/Blueprints/ContainerBuilder.php b/src/WordPress/Blueprints/ContainerBuilder.php index 5fe51199..09212605 100644 --- a/src/WordPress/Blueprints/ContainerBuilder.php +++ b/src/WordPress/Blueprints/ContainerBuilder.php @@ -4,7 +4,7 @@ use InvalidArgumentException; use Pimple\Container; -use Symfony\Component\HttpClient\HttpClient; +use WordPress\AsyncHttp\Client; use WordPress\Blueprints\Cache\FileCache; use WordPress\Blueprints\Compile\BlueprintCompiler; use WordPress\Blueprints\Model\DataClass\ActivatePluginStep; @@ -59,17 +59,15 @@ use WordPress\Blueprints\Runner\Step\WPCLIStepRunner; use WordPress\Blueprints\Runner\Step\WriteFileStepRunner; use WordPress\Blueprints\Runtime\RuntimeInterface; -use WordPress\DataSource\FileSource; use WordPress\DataSource\PlaygroundFetchSource; -use WordPress\DataSource\DataSourceProgressEvent; use WordPress\DataSource\UrlSource; class ContainerBuilder { - const ENVIRONMENT_NATIVE = 'native'; + const ENVIRONMENT_NATIVE = 'native'; const ENVIRONMENT_PLAYGROUND = 'playground'; - const ENVIRONMENT_WP_NOW = 'wp-now'; - const ENVIRONMENTS = array( + const ENVIRONMENT_WP_NOW = 'wp-now'; + const ENVIRONMENTS = array( self::ENVIRONMENT_NATIVE, self::ENVIRONMENT_PLAYGROUND, self::ENVIRONMENT_WP_NOW, @@ -83,22 +81,17 @@ public function __construct() { public function build( string $environment, RuntimeInterface $runtime ) { - $container = $this->container; + $container = $this->container; $container['runtime'] = function () use ( $runtime ) { return $runtime; }; if ( $environment === static::ENVIRONMENT_NATIVE ) { - $container['downloads_cache'] = function ( $c ) { + $container['downloads_cache'] = function ( $c ) { return new FileCache(); }; - $container['http_client'] = function ( $c ) { - return HttpClient::create(); - }; - $container['progress_reporter'] = function ( $c ) { - return function ( DataSourceProgressEvent $event ) { - echo $event->url . ' ' . $event->downloadedBytes . '/' . $event->totalBytes . " \r"; - }; + $container['http_client'] = function ( $c ) { + return new Client(); }; $container[ 'resource.resolver.' . UrlResource::DISCRIMINATOR ] = function ( $c ) { return new UrlResourceResolver( $c['data_source.url'] ); @@ -107,11 +100,6 @@ public function build( string $environment, RuntimeInterface $runtime ) { $container[ 'resource.resolver.' . UrlResource::DISCRIMINATOR ] = function ( $c ) { return new UrlResourceResolver( $c['data_source.playground_fetch'] ); }; - $container['progress_reporter'] = function ( $c ) { - return function ( DataSourceProgressEvent $event ) { - echo $event->url . ' ' . $event->downloadedBytes . '/' . $event->totalBytes . " \r"; - }; - }; } else { throw new InvalidArgumentException( 'Not implemented yet' ); } @@ -149,7 +137,7 @@ function () use ( $c ) { $container['blueprint.json_schema_path'] = function () { return __DIR__ . '/schema.json'; }; - $container['blueprint.json_schema'] = function ( $c ) { + $container['blueprint.json_schema'] = function ( $c ) { return json_decode( file_get_contents( $c['blueprint.json_schema_path'] ) ); }; @@ -158,10 +146,10 @@ function () use ( $c ) { $c['blueprint.json_schema_path'] ); }; - $container['blueprint.mapper'] = function ( $c ) { + $container['blueprint.mapper'] = function ( $c ) { return new BlueprintMapper(); }; - $container['blueprint.parser'] = function ( $c ) { + $container['blueprint.parser'] = function ( $c ) { return new BlueprintParser( $c['blueprint.validator'], $c['blueprint.mapper'] @@ -171,71 +159,71 @@ function () use ( $c ) { $container[ 'step.runner.' . InstallSqliteIntegrationStep::DISCRIMINATOR ] = function () { return new InstallSqliteIntegrationStepRunner(); }; - $container[ 'step.runner.' . DownloadWordPressStep::DISCRIMINATOR ] = function () { + $container[ 'step.runner.' . DownloadWordPressStep::DISCRIMINATOR ] = function () { return new DownloadWordPressStepRunner(); }; - $container[ 'step.runner.' . UnzipStep::DISCRIMINATOR ] = function () { + $container[ 'step.runner.' . UnzipStep::DISCRIMINATOR ] = function () { return new UnzipStepRunner(); }; - $container[ 'step.runner.' . WriteFileStep::DISCRIMINATOR ] = function () { + $container[ 'step.runner.' . WriteFileStep::DISCRIMINATOR ] = function () { return new WriteFileStepRunner(); }; - $container[ 'step.runner.' . RunPHPStep::DISCRIMINATOR ] = function () { + $container[ 'step.runner.' . RunPHPStep::DISCRIMINATOR ] = function () { return new RunPHPStepRunner(); }; - $container[ 'step.runner.' . DefineWpConfigConstsStep::DISCRIMINATOR ] = function () { + $container[ 'step.runner.' . DefineWpConfigConstsStep::DISCRIMINATOR ] = function () { return new DefineWpConfigConstsStepRunner(); }; - $container[ 'step.runner.' . EnableMultisiteStep::DISCRIMINATOR ] = function () { + $container[ 'step.runner.' . EnableMultisiteStep::DISCRIMINATOR ] = function () { return new EnableMultisiteStepRunner(); }; - $container[ 'step.runner.' . DefineSiteUrlStep::DISCRIMINATOR ] = function () { + $container[ 'step.runner.' . DefineSiteUrlStep::DISCRIMINATOR ] = function () { return new DefineSiteUrlStepRunner(); }; - $container[ 'step.runner.' . MkdirStep::DISCRIMINATOR ] = function () { + $container[ 'step.runner.' . MkdirStep::DISCRIMINATOR ] = function () { return new MkdirStepRunner(); }; - $container[ 'step.runner.' . RmStep::DISCRIMINATOR ] = function () { + $container[ 'step.runner.' . RmStep::DISCRIMINATOR ] = function () { return new RmStepRunner(); }; - $container[ 'step.runner.' . MvStep::DISCRIMINATOR ] = function () { + $container[ 'step.runner.' . MvStep::DISCRIMINATOR ] = function () { return new MvStepRunner(); }; - $container[ 'step.runner.' . CpStep::DISCRIMINATOR ] = function () { + $container[ 'step.runner.' . CpStep::DISCRIMINATOR ] = function () { return new CpStepRunner(); }; - $container[ 'step.runner.' . WPCLIStep::DISCRIMINATOR ] = function () { + $container[ 'step.runner.' . WPCLIStep::DISCRIMINATOR ] = function () { return new WPCLIStepRunner(); }; - $container[ 'step.runner.' . SetSiteOptionsStep::DISCRIMINATOR ] = function () { + $container[ 'step.runner.' . SetSiteOptionsStep::DISCRIMINATOR ] = function () { return new SetSiteOptionsStepRunner(); }; - $container[ 'step.runner.' . ActivatePluginStep::DISCRIMINATOR ] = function () { + $container[ 'step.runner.' . ActivatePluginStep::DISCRIMINATOR ] = function () { return new ActivatePluginStepRunner(); }; - $container[ 'step.runner.' . ActivateThemeStep::DISCRIMINATOR ] = function () { + $container[ 'step.runner.' . ActivateThemeStep::DISCRIMINATOR ] = function () { return new ActivateThemeStepRunner(); }; - $container[ 'step.runner.' . InstallPluginStep::DISCRIMINATOR ] = function () { + $container[ 'step.runner.' . InstallPluginStep::DISCRIMINATOR ] = function () { return new InstallPluginStepRunner(); }; - $container[ 'step.runner.' . InstallThemeStep::DISCRIMINATOR ] = function () { + $container[ 'step.runner.' . InstallThemeStep::DISCRIMINATOR ] = function () { return new InstallThemeStepRunner(); }; - $container[ 'step.runner.' . ImportFileStep::DISCRIMINATOR ] = function () { + $container[ 'step.runner.' . ImportFileStep::DISCRIMINATOR ] = function () { return new ImportFileStepRunner(); }; - $container[ 'step.runner.' . RunWordPressInstallerStep::DISCRIMINATOR ] = function () { + $container[ 'step.runner.' . RunWordPressInstallerStep::DISCRIMINATOR ] = function () { return new RunWordPressInstallerStepRunner(); }; - $container[ 'step.runner.' . RunSQLStep::DISCRIMINATOR ] = function () { + $container[ 'step.runner.' . RunSQLStep::DISCRIMINATOR ] = function () { return new RunSQLStepRunner(); }; $container[ 'resource.resolver.' . FilesystemResource::DISCRIMINATOR ] = function () { return new FilesystemResourceResolver(); }; - $container[ 'resource.resolver.' . InlineResource::DISCRIMINATOR ] = function () { + $container[ 'resource.resolver.' . InlineResource::DISCRIMINATOR ] = function () { return new InlineResourceResolver(); }; @@ -264,18 +252,19 @@ function ( $c ) { $container['step.runner_factory'] = function ( $c ) { return function ( $slug ) use ( $c ) { - if ( ! isset( $c[ "step.runner.$slug" ] ) ) { + if ( ! isset( $c["step.runner.$slug"] ) ) { throw new InvalidArgumentException( "No runner registered for step {$slug}" ); } - return $c[ "step.runner.$slug" ]; + return $c["step.runner.$slug"]; }; }; - $container['data_source.url'] = function ( $c ) { + $container['data_source.url'] = function ( $c ) { return new UrlSource( $c['http_client'], $c['downloads_cache'] ); }; - $container['data_source.playground_fetch'] = function ( $c ) { + + $container['data_source.playground_fetch'] = function () { return new PlaygroundFetchSource(); }; diff --git a/src/WordPress/Blueprints/Model/BlueprintBuilder.php b/src/WordPress/Blueprints/Model/BlueprintBuilder.php index f6e8b92a..a9d09436 100644 --- a/src/WordPress/Blueprints/Model/BlueprintBuilder.php +++ b/src/WordPress/Blueprints/Model/BlueprintBuilder.php @@ -128,7 +128,7 @@ public function withFiles( $files ) { public function withFile( $path, $data ) { return $this->addStep( ( new WriteFileStep() ) - ->setPath( 'WordPress.txt' ) + ->setPath( $path ) ->setData( $data ) ); } @@ -136,9 +136,9 @@ public function withFile( $path, $data ) { public function downloadWordPress( $wpZip = null ) { $this->prependStep( ( new DownloadWordPressStep() ) - ->setWordPressZip( - $wpZip ?? 'https://wordpress.org/latest.zip' - ) + ->setWordPressZip( + $wpZip ?? 'https://wordpress.org/latest.zip' + ) ); return $this; @@ -155,9 +155,9 @@ public function useSqlite( ) { $this->addStep( ( new InstallSqliteIntegrationStep() ) - ->setSqlitePluginZip( - $sqlitePluginSource - ) + ->setSqlitePluginZip( + $sqlitePluginSource + ) ); return $this; @@ -166,8 +166,8 @@ public function useSqlite( public function downloadWpCli() { return $this->addStep( ( new WriteFileStep() ) - ->setPath( 'wp-cli.phar' ) - ->setData( ( new UrlResource() )->setUrl( 'https://playground.wordpress.net/wp-cli.phar' ) ) + ->setPath( 'wp-cli.phar' ) + ->setData( ( new UrlResource() )->setUrl( 'https://playground.wordpress.net/wp-cli.phar' ) ) ); } diff --git a/src/WordPress/Blueprints/Model/DataClass/InstallPluginStep.php b/src/WordPress/Blueprints/Model/DataClass/InstallPluginStep.php index d4ea14ca..7c9a1ecc 100644 --- a/src/WordPress/Blueprints/Model/DataClass/InstallPluginStep.php +++ b/src/WordPress/Blueprints/Model/DataClass/InstallPluginStep.php @@ -2,8 +2,7 @@ namespace WordPress\Blueprints\Model\DataClass; -class InstallPluginStep implements StepDefinitionInterface -{ +class InstallPluginStep implements StepDefinitionInterface { const DISCRIMINATOR = 'installPlugin'; /** @var Progress */ @@ -28,37 +27,37 @@ class InstallPluginStep implements StepDefinitionInterface public $activate = true; - public function setProgress(Progress $progress) - { + public function setProgress( Progress $progress ) { $this->progress = $progress; + return $this; } - public function setContinueOnError(bool $continueOnError) - { + public function setContinueOnError( bool $continueOnError ) { $this->continueOnError = $continueOnError; + return $this; } - public function setStep(string $step) - { + public function setStep( string $step ) { $this->step = $step; + return $this; } - public function setPluginZipFile($pluginZipFile) - { + public function setPluginZipFile( $pluginZipFile ) { $this->pluginZipFile = $pluginZipFile; + return $this; } - public function setActivate(bool $activate) - { + public function setActivate( bool $activate ) { $this->activate = $activate; + return $this; } } diff --git a/src/WordPress/Blueprints/Resources/ResourceManager.php b/src/WordPress/Blueprints/Resources/ResourceManager.php index 4bfffd87..75df95af 100644 --- a/src/WordPress/Blueprints/Resources/ResourceManager.php +++ b/src/WordPress/Blueprints/Resources/ResourceManager.php @@ -5,11 +5,12 @@ use Symfony\Component\Filesystem\Filesystem; use WordPress\Blueprints\Compile\CompiledResource; use WordPress\Blueprints\Resources\Resolver\ResourceResolverCollection; +use WordPress\Util\Map; class ResourceManager { protected Filesystem $fs; - protected ResourceMap $map; + protected Map $map; protected ResourceResolverCollection $resource_resolvers; public function __construct( @@ -17,7 +18,7 @@ public function __construct( ) { $this->resource_resolvers = $resource_resolvers; $this->fs = new Filesystem(); - $this->map = new ResourceMap(); + $this->map = new Map(); } public function enqueue( array $compiledResources ) { diff --git a/src/WordPress/Blueprints/Resources/ResourceMap.php b/src/WordPress/Blueprints/Resources/ResourceMap.php deleted file mode 100644 index 4a1482c2..00000000 --- a/src/WordPress/Blueprints/Resources/ResourceMap.php +++ /dev/null @@ -1,60 +0,0 @@ -fs = new Filesystem(); - } - - public function offsetExists($offset): bool - { - foreach ($this->pairs as $pair) { - if ($pair[0] === $offset) { - return true; - } - } - return false; - } - - public function offsetGet($offset) - { - foreach ($this->pairs as $pair) { - if ($pair[0] === $offset) { - return $pair[1]; - } - } - - // TODO Evaluate waring: 'ext-json' is missing in composer.json - throw new \Exception("Stream for resource " . json_encode($offset) . " not found"); - } - - public function offsetSet($offset, $value): void - { - foreach ($this->pairs as $k => $pair) { - if ($pair[0] === $offset) { - $this->pairs[ $k ] = [ $offset, $value ]; - - return; - } - } - $this->pairs[] = [ $offset, $value ]; - } - - public function offsetUnset($offset): void - { - foreach ($this->pairs as $i => $pair) { - if ($pair[0] === $offset) { - unset($this->pairs[ $i ]); - } - } - } -} diff --git a/src/WordPress/DataSource/PlaygroundFetchSource.php b/src/WordPress/DataSource/PlaygroundFetchSource.php index 569cf5d9..a67eb032 100644 --- a/src/WordPress/DataSource/PlaygroundFetchSource.php +++ b/src/WordPress/DataSource/PlaygroundFetchSource.php @@ -2,13 +2,8 @@ namespace WordPress\DataSource; -use Psr\SimpleCache\CacheInterface; -use Symfony\Component\EventDispatcher\EventDispatcher; use Symfony\Component\HttpClient\Response\StreamWrapper; -use Symfony\Contracts\EventDispatcher\Event; use Symfony\Contracts\HttpClient\HttpClientInterface; -use WordPress\Streams\StreamPeeker; -use WordPress\Streams\StreamPeekerContext; class PlaygroundFetchSource extends BaseDataSource { diff --git a/src/WordPress/DataSource/UrlSource.php b/src/WordPress/DataSource/UrlSource.php index c12e2808..c397e008 100644 --- a/src/WordPress/DataSource/UrlSource.php +++ b/src/WordPress/DataSource/UrlSource.php @@ -3,38 +3,31 @@ namespace WordPress\DataSource; use Psr\SimpleCache\CacheInterface; -use Symfony\Component\EventDispatcher\EventDispatcher; -use Symfony\Component\HttpClient\Response\StreamWrapper; -use Symfony\Contracts\EventDispatcher\Event; -use Symfony\Contracts\HttpClient\HttpClientInterface; -use WordPress\Streams\StreamPeeker; -use WordPress\Streams\StreamPeekerContext; - -class ResponseStreamPeekerContext extends StreamPeekerContext { - - public function __construct( $fp, $onChunk, $onClose, protected $response ) { - parent::__construct( $fp, $onChunk, $onClose ); - } - - public function getResponse() { - return $this->response; - } - -} - +use WordPress\AsyncHttp\Client; +use WordPress\AsyncHttp\Request; +use WordPress\Streams\StreamPeekerData; +use WordPress\Streams\StreamPeekerWrapper; class UrlSource extends BaseDataSource { public function __construct( - protected HttpClientInterface $client, + protected Client $client, protected CacheInterface $cache ) { parent::__construct(); + $client->set_progress_callback( function ( Request $request, $downloaded, $total ) { + $this->events->dispatch( new DataSourceProgressEvent( + $request->url, + $downloaded, + $total + ) ); + } ); } public function stream( $resourceIdentifier ) { $url = $resourceIdentifier; - if ( $this->cache->has( $url ) ) { + // @TODO: Enable cache + if ( false && $this->cache->has( $url ) ) { // Return a stream resource. // @TODO: Stream directly from the cache $cached = $this->cache->get( $url ); @@ -51,20 +44,12 @@ public function stream( $resourceIdentifier ) { return $stream; } - $response = $this->client->request( 'GET', $url, [ - 'on_progress' => function ( int $dlNow, int $dlSize, array $info ) use ( $url ): void { - $this->events->dispatch( new DataSourceProgressEvent( - $url, - $dlNow, - $dlSize - ) ); - }, - ] ); - $stream = StreamWrapper::createResource( $response, $this->client ); - if ( ! $stream ) { - throw new \Exception( 'Failed to download file' ); - } - $onChunk = function ( $chunk ) use ( $url, $response, $stream ) { + $stream = $this->client->enqueue( new Request( $url ) ); + + return $stream; + + // Cache + $onChunk = function ( $chunk ) use ( $url, $stream ) { // Handle response caching static $bufferedChunks = []; $bufferedChunks[] = $chunk; @@ -73,90 +58,13 @@ public function stream( $resourceIdentifier ) { $bufferedChunks = []; } }; - $onClose = function () use ( $response ) { - $response->cancel(); - }; - return StreamPeeker::wrap( - new ResponseStreamPeekerContext( + return StreamPeekerWrapper::create_resource( + new StreamPeekerData( $stream, $onChunk, - $onClose, - $response ) ); } - - /** - * This much simpler version throws a "Idle timeout reached" exception - * when reading from the stream :( - * - * @param string $url - * - * @return mixed - */ - public function fetchBugWithIdleTimeout( string $url ) { - $passthru = function ( ChunkInterface $chunk, AsyncContext $context ) use ( $url ): \Generator { - static $bufferedChunks = []; - $bufferedChunks[] = $chunk->getContent(); - if ( $chunk->isLast() ) { - $this->cache->set( $url, implode( '', $bufferedChunks ) ); - } - // do what you want with chunks, e.g. split them - // in smaller chunks, group them, skip some, etc. - yield $chunk; - }; - - $response = new AsyncResponse( $this->client, 'GET', $url, [ - 'timeout' => 60000, - 'on_progress' => function ( int $dlNow, int $dlSize, array $info ) use ( $url ): void { - $this->events->dispatch( new DataSourceProgressEvent( - $url, - $dlNow, - $dlSize - ) ); - }, - ], $passthru ); - - return $response->toStream(); - } -} - -function get_header( array $headers, string $header ) { - foreach ( $headers as $line ) { - $parts = explode( ': ', $line ); - if ( count( $parts ) === 2 && strtolower( $parts[0] ) === strtolower( $header ) ) { - return $parts[1]; - } - } -} -/* -class HttpDownloader { - - public EventDispatcher $events; - - public function __construct( protected HttpClientInterface $client ) { - $this->events = new EventDispatcher(); - } - - public function fetch( string $url ) { - $response = $this->client->request( 'GET', $url, [ - 'on_progress' => function ( int $dlNow, int $dlSize, array $info ) use ( $url ): void { - $this->events->dispatch( new ProgressEvent( - $url, - $dlNow, - $dlSize - ) ); - }, - ] ); - $stream = StreamWrapper::createResource( $response, $this->client ); - if ( ! $stream ) { - throw new \Exception( 'Failed to download file' ); - } - - return $stream; - } - } - */ diff --git a/src/WordPress/Streams/StreamPeeker.php b/src/WordPress/Streams/StreamPeeker.php deleted file mode 100644 index e3efacaa..00000000 --- a/src/WordPress/Streams/StreamPeeker.php +++ /dev/null @@ -1,132 +0,0 @@ - [ - 'wrapper_data' => $data, - ], - ] ); - - return fopen( 'peek://', 'r', false, $context ); - } - - public function stream_set_option( int $option, int $arg1, ?int $arg2 ): bool { - if ( \STREAM_OPTION_BLOCKING === $option ) { - return stream_set_blocking( $this->stream, (bool) $arg1 ); - } elseif ( \STREAM_OPTION_READ_TIMEOUT === $option ) { - return stream_set_timeout( $this->stream, $arg1, $arg2 ); - } - - return false; - } - - // Opens the stream - public function stream_open( $path, $mode, $options, &$opened_path ) { - $contextOptions = stream_context_get_options( $this->context ); - - if ( isset( $contextOptions['peek']['wrapper_data']->fp ) ) { - $this->stream = $contextOptions['peek']['wrapper_data']->fp; - } else { - return false; - } - - if ( isset( $contextOptions['peek']['wrapper_data']->onChunk ) && is_callable( $contextOptions['peek']['wrapper_data']->onChunk ) ) { - $this->onChunk = $contextOptions['peek']['wrapper_data']->onChunk; - } else { - // Default onChunk function if none provided - $this->onChunk = function ( $data ) { - }; - } - - if ( isset( $contextOptions['peek']['wrapper_data']->onClose ) && is_callable( $contextOptions['peek']['wrapper_data']->onClose ) ) { - $this->onClose = $contextOptions['peek']['wrapper_data']->onClose; - } else { - // Default onClose function if none provided - $this->onClose = function () { - }; - } - - $this->position = 0; - - return true; - } - - public function stream_cast( int $cast_as ) { - return $this->stream; - } - - // Reads from the stream - public function stream_read( $count ) { - $ret = fread( $this->stream, $count ); - $this->position += strlen( $ret ); - - $onChunk = $this->onChunk; - $onChunk( $ret ); - - return $ret; - } - - // Writes to the stream - public function stream_write( $data ) { - $written = fwrite( $this->stream, $data ); - $this->position += $written; - - return $written; - } - - // Closes the stream - public function stream_close() { - fclose( $this->stream ); - $onClose = $this->onClose; - $onClose(); - } - - // Returns the current position of the stream - public function stream_tell() { - return $this->position; - } - - // Checks if the end of the stream has been reached - public function stream_eof() { - return feof( $this->stream ); - } - - // Seeks to a specific position in the stream - public function stream_seek( $offset, $whence ) { - $result = fseek( $this->stream, $offset, $whence ); - if ( $result == 0 ) { - $this->position = ftell( $this->stream ); - - return true; - } else { - return false; - } - } - - // Stat information about the stream; providing dummy data - public function stream_stat() { - return []; - } -} - -StreamPeeker::register(); diff --git a/src/WordPress/Streams/StreamPeekerContext.php b/src/WordPress/Streams/StreamPeekerData.php similarity index 60% rename from src/WordPress/Streams/StreamPeekerContext.php rename to src/WordPress/Streams/StreamPeekerData.php index d3c50ea6..a8e46234 100644 --- a/src/WordPress/Streams/StreamPeekerContext.php +++ b/src/WordPress/Streams/StreamPeekerData.php @@ -2,8 +2,9 @@ namespace WordPress\Streams; -class StreamPeekerContext { +class StreamPeekerData extends VanillaStreamWrapperData { public function __construct( public $fp, public $onChunk = null, public $onClose = null ) { + parent::__construct( $fp ); } } diff --git a/src/WordPress/Streams/StreamPeekerWrapper.php b/src/WordPress/Streams/StreamPeekerWrapper.php new file mode 100644 index 00000000..6bfb6f4b --- /dev/null +++ b/src/WordPress/Streams/StreamPeekerWrapper.php @@ -0,0 +1,74 @@ +wrapper_data->fp ) ) { + $this->stream = $this->wrapper_data->fp; + } else { + return false; + } + + if ( isset( $this->wrapper_data->onChunk ) && is_callable( $this->wrapper_data->onChunk ) ) { + $this->onChunk = $this->wrapper_data->onChunk; + } else { + // Default onChunk function if none provided + $this->onChunk = function ( $data ) { + }; + } + + if ( isset( $this->wrapper_data->onClose ) && is_callable( $this->wrapper_data->onClose ) ) { + $this->onClose = $this->wrapper_data->onClose; + } else { + // Default onClose function if none provided + $this->onClose = function () { + }; + } + + $this->position = 0; + + return true; + } + + // Reads from the stream + public function stream_read( $count ) { + $ret = fread( $this->stream, $count ); + $this->position += strlen( $ret ); + + $onChunk = $this->onChunk; + $onChunk( $ret ); + + return $ret; + } + + // Writes to the stream + public function stream_write( $data ) { + $written = fwrite( $this->stream, $data ); + $this->position += $written; + + return $written; + } + + // Closes the stream + public function stream_close() { + fclose( $this->stream ); + $onClose = $this->onClose; + $onClose(); + } + + // Returns the current position of the stream + public function stream_tell() { + return $this->position; + } + +} diff --git a/src/WordPress/Streams/StreamWrapperInterface.php b/src/WordPress/Streams/StreamWrapperInterface.php new file mode 100644 index 00000000..322a3a4b --- /dev/null +++ b/src/WordPress/Streams/StreamWrapperInterface.php @@ -0,0 +1,59 @@ + [ + 'wrapper_data' => $data, + ], + ] ); + + return fopen( static::SCHEME . '://', 'r', false, $context ); + } + + static public function register() { + if ( in_array( static::SCHEME, stream_get_wrappers() ) ) { + return; + } + + if ( ! stream_wrapper_register( static::SCHEME, static::class ) ) { + throw new \Exception( 'Failed to register protocol' ); + } + } + + static public function unregister() { + stream_wrapper_unregister( 'async' ); + } + + + public function stream_set_option( int $option, int $arg1, ?int $arg2 ): bool { + if ( \STREAM_OPTION_BLOCKING === $option ) { + return stream_set_blocking( $this->stream, (bool) $arg1 ); + } elseif ( \STREAM_OPTION_READ_TIMEOUT === $option ) { + return stream_set_timeout( $this->stream, $arg1, $arg2 ); + } + + return false; + } + + public function stream_open( $path, $mode, $options, &$opened_path ) { + $contextOptions = stream_context_get_options( $this->context ); + + if ( ! isset( $contextOptions[ static::SCHEME ]['wrapper_data'] ) || ! is_object( $contextOptions[ static::SCHEME ]['wrapper_data'] ) ) { + return false; + } + + $this->wrapper_data = $contextOptions[ static::SCHEME ]['wrapper_data']; + + if ( $this->wrapper_data->fp ) { + $this->stream = $this->wrapper_data->fp; + } + + return true; + } + + public function stream_cast( int $cast_as ) { + return $this->stream; + } + + public function stream_read( $count ) { + return fread( $this->stream, $count ); + } + + public function stream_write( $data ) { + return fwrite( $this->stream, $data ); + } + + public function stream_close() { + return fclose( $this->stream ); + } + + public function stream_tell() { + return ftell( $this->stream ); + } + + public function stream_eof() { + return feof( $this->stream ); + } + + public function stream_seek( $offset, $whence ) { + return fseek( $this->stream, $offset, $whence ); + } + + public function stream_stat() { + return []; + } +} diff --git a/src/WordPress/Streams/VanillaStreamWrapperData.php b/src/WordPress/Streams/VanillaStreamWrapperData.php new file mode 100644 index 00000000..31cdf459 --- /dev/null +++ b/src/WordPress/Streams/VanillaStreamWrapperData.php @@ -0,0 +1,11 @@ +fp = $fp; + } +} diff --git a/src/WordPress/Util/ArrayPairIterator.php b/src/WordPress/Util/ArrayPairIterator.php new file mode 100644 index 00000000..acef8157 --- /dev/null +++ b/src/WordPress/Util/ArrayPairIterator.php @@ -0,0 +1,37 @@ +array = $array; + } + + #[\ReturnTypeWillChange] + public function current() { + return $this->array[ $this->position ][1]; + } + + #[\ReturnTypeWillChange] + public function key() { + return $this->array[ $this->position ][0]; + } + + #[\ReturnTypeWillChange] + public function next() { + ++ $this->position; + } + + #[\ReturnTypeWillChange] + public function rewind() { + $this->position = 0; + } + + #[\ReturnTypeWillChange] + public function valid() { + return isset( $this->array[ $this->position ] ); + } +} diff --git a/src/WordPress/Util/Map.php b/src/WordPress/Util/Map.php new file mode 100644 index 00000000..69c35216 --- /dev/null +++ b/src/WordPress/Util/Map.php @@ -0,0 +1,58 @@ +pairs as $pair ) { + if ( $pair[0] === $offset ) { + return true; + } + } + + return false; + } + + public function offsetGet( $offset ): mixed { + foreach ( $this->pairs as $pair ) { + if ( $pair[0] === $offset ) { + return $pair[1]; + } + } + + // TODO Evaluate waring: 'ext-json' is missing in composer.json + throw new \Exception( "Stream for resource " . json_encode( $offset ) . " not found" ); + } + + public function offsetSet( $offset, $value ): void { + foreach ( $this->pairs as $k => $pair ) { + if ( $pair[0] === $offset ) { + $this->pairs[ $k ] = [ $offset, $value ]; + + return; + } + } + $this->pairs[] = [ $offset, $value ]; + } + + public function offsetUnset( $offset ): void { + foreach ( $this->pairs as $i => $pair ) { + if ( $pair[0] === $offset ) { + unset( $this->pairs[ $i ] ); + } + } + } + + public function getIterator(): Traversable { + return new ArrayPairIterator( array_values( $this->pairs ) ); + } +} diff --git a/src/WordPress/Zip/ZipStreamReader.php b/src/WordPress/Zip/ZipStreamReader.php index 6d32671c..053fde5f 100644 --- a/src/WordPress/Zip/ZipStreamReader.php +++ b/src/WordPress/Zip/ZipStreamReader.php @@ -198,21 +198,14 @@ static protected function read_bytes( $stream, $length ) { } $data = ''; - while ( true ) { - $chunk = fread( $stream, $length ); - if ( false === $chunk ) { - return false; + $remaining_length = $length; + while ( $remaining_length > 0 ) { + $chunk = fread( $stream, $remaining_length ); + if ( false === $chunk || ( '' === $chunk && feof( $stream ) ) ) { + return strlen( $data ) ? $data : false; } - $length -= strlen( $chunk ); + $remaining_length -= strlen( $chunk ); $data .= $chunk; - - if ( $length === 0 ) { - break; - } - - if ( feof( $stream ) ) { - return false; - } } return $data; diff --git a/src/WordPress/Zip/functions.php b/src/WordPress/Zip/functions.php index 4f02a0f2..eaa84fcc 100644 --- a/src/WordPress/Zip/functions.php +++ b/src/WordPress/Zip/functions.php @@ -14,7 +14,7 @@ function zip_extract_to( $fp, $to_path ) { continue; } - $path = Path::canonicalize( $to_path . '/' . $entry->path ); + $path = Path::canonicalize( $to_path . '/' . $entry->path ); $parent = Path::getDirectory( $path ); if ( ! is_dir( $parent ) ) { mkdir( $parent, 0777, true ); diff --git a/tests/Blueprints/Runner/Step/UnzipStepRunnerTest.php b/tests/Blueprints/Runner/Step/UnzipStepRunnerTest.php index 835717c0..581b2b69 100644 --- a/tests/Blueprints/Runner/Step/UnzipStepRunnerTest.php +++ b/tests/Blueprints/Runner/Step/UnzipStepRunnerTest.php @@ -44,7 +44,7 @@ class UnzipStepRunnerTest extends TestCase { */ public function before() { $this->document_root = Path::makeAbsolute( 'test', sys_get_temp_dir() ); - $this->runtime = new Runtime( $this->document_root ); + $this->runtime = new Runtime( $this->document_root ); $this->resource_manager = $this->createStub( ResourceManager::class ); diff --git a/tests/Unit/StreamPeeker.php b/tests/Unit/StreamPeeker.php index 3fd8bdc8..f014757a 100644 --- a/tests/Unit/StreamPeeker.php +++ b/tests/Unit/StreamPeeker.php @@ -1,7 +1,7 @@