Skip to content

Commit

Permalink
Streaming readers: Bytes, Filesystem, Zip (#121)
Browse files Browse the repository at this point in the history
Adds streaming readers for:

* Byte sources – both local and remote (with `seek()` support via range
requests)
* Zip – rewrites the original one to pull data from `WP_Byte_Source` as
needed.
* Filesystems – local FS, ZIP FS, a generic interface to add support for
git and others

This work is a pre-requisite for
#121 and will likely
evolve.

 ## Testing instructions

None.

We'll need good test coverage for all these primitives, but the work
is still highly exploratory and meant to inform a larger refactoring
of the step handlers.
  • Loading branch information
adamziel authored Dec 17, 2024
1 parent 53efc0e commit b1362cb
Show file tree
Hide file tree
Showing 16 changed files with 1,700 additions and 553 deletions.
1 change: 0 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
},
"files": [
"src/WordPress/Blueprints/functions.php",
"src/WordPress/Zip/functions.php",
"src/WordPress/Streams/stream_str_replace.php"
]
},
Expand Down
3 changes: 1 addition & 2 deletions src/WordPress/Blueprints/Runner/Step/UnzipStepRunner.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use WordPress\Blueprints\Model\DataClass\UnzipStep;
use WordPress\Blueprints\Progress\Tracker;
use function WordPress\Zip\zip_extract_to;

class UnzipStepRunner extends BaseStepRunner {

Expand All @@ -22,6 +21,6 @@ public function run(
$progress_tracker->set( 10, 'Unzipping...' );

$resolved_to_path = $this->getRuntime()->resolvePath( $input->extractToPath );
zip_extract_to( $this->getResource( $input->zipFile ), $resolved_to_path );
throw new \Exception("Not implemented at the moment. Needs to be updated to use the new ZipStreamReader API.");
}
}
26 changes: 26 additions & 0 deletions src/WordPress/ByteReader/WP_Byte_Reader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

namespace WordPress\ByteReader;

/**
 * Experimental interface for streaming, seekable byte readers.
 *
 * A reader exposes its data one chunk at a time: call next_bytes() to
 * advance, then get_bytes() to retrieve the chunk that became available.
 */
abstract class WP_Byte_Reader {
	/**
	 * Total length of the underlying byte source.
	 *
	 * No native return type is declared; implementations decide how to
	 * signal an unknown length.
	 *
	 * @return mixed
	 */
	abstract public function length();

	/**
	 * @return int The reader's current offset in the byte source.
	 */
	abstract public function tell(): int;

	/**
	 * Moves the reader to the given offset.
	 *
	 * @param int $offset Offset to move to.
	 * @return bool Whether the seek succeeded.
	 */
	abstract public function seek( int $offset ): bool;

	/**
	 * @return bool Whether the reader has no more bytes to offer.
	 */
	abstract public function is_finished(): bool;

	/**
	 * Advances to the next chunk of bytes.
	 *
	 * @return bool True when a new chunk is available via get_bytes().
	 */
	abstract public function next_bytes(): bool;

	/**
	 * @return string|null The chunk made available by the last next_bytes() call.
	 */
	abstract public function get_bytes(): ?string;

	/**
	 * @return string|null The last error message, or null when there was none.
	 */
	abstract public function get_last_error(): ?string;

	/**
	 * Drains the reader and returns everything it produced as one string.
	 *
	 * The native `string` return type was removed on purpose: this method
	 * reports failures by returning false, which a `string` return type
	 * would either coerce to an empty string or reject with a TypeError.
	 *
	 * @return string|false All the remaining bytes, or false when the reader
	 *                      reported an error.
	 */
	public function read_all() {
		$buffer = '';
		while ( $this->next_bytes() ) {
			$buffer .= $this->get_bytes();
		}
		if ( $this->get_last_error() ) {
			return false;
		}
		return $buffer;
	}
}
109 changes: 109 additions & 0 deletions src/WordPress/ByteReader/WP_File_Reader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
<?php

namespace WordPress\ByteReader;

/**
 * Streams a local file in fixed-size chunks and supports seeking.
 *
 * The file is opened lazily on the first next_bytes() call and stays open
 * until close() is called, so the reader can seek() after reaching the end.
 */
class WP_File_Reader extends WP_Byte_Reader {

	const STATE_STREAMING = '#streaming';
	const STATE_FINISHED  = '#finished';

	protected $file_path;
	protected $chunk_size;
	protected $file_pointer;
	protected $offset_in_file  = 0;
	protected $output_bytes    = '';
	protected $last_chunk_size = 0;
	protected $last_error;
	protected $state = self::STATE_STREAMING;

	/**
	 * Creates a reader for an existing regular file.
	 *
	 * Uses late static binding so that calling create() on a subclass
	 * returns an instance of that subclass.
	 *
	 * @param string $file_path  Path of the file to read.
	 * @param int    $chunk_size Maximum number of bytes per chunk.
	 * @return static|false The reader, or false when the path does not exist
	 *                      or is not a file.
	 */
	public static function create( $file_path, $chunk_size = 8096 ) {
		if ( ! file_exists( $file_path ) ) {
			_doing_it_wrong( __METHOD__, sprintf( 'File %s does not exist', $file_path ), '1.0.0' );
			return false;
		}
		if ( ! is_file( $file_path ) ) {
			_doing_it_wrong( __METHOD__, sprintf( '%s is not a file', $file_path ), '1.0.0' );
			return false;
		}
		return new static( $file_path, $chunk_size );
	}

	private function __construct( $file_path, $chunk_size ) {
		$this->file_path  = $file_path;
		$this->chunk_size = $chunk_size;
	}

	/**
	 * @return int|null Size of the file in bytes, or null when it cannot be determined.
	 */
	public function length(): ?int {
		$size = filesize( $this->file_path );
		// filesize() returns false on failure; `?int` would silently turn that into 0.
		return false === $size ? null : $size;
	}

	public function tell(): int {
		// Report the previous offset, not the current one.
		// This way, after resuming, the next read will yield the same $output_bytes
		// as we have now.
		return $this->offset_in_file - $this->last_chunk_size;
	}

	/**
	 * Moves the read cursor and discards the current chunk.
	 *
	 * @param int $offset_in_file Byte offset to continue reading from.
	 * @return bool False when the offset is not an integer or the seek failed.
	 */
	public function seek( $offset_in_file ): bool {
		if ( ! is_int( $offset_in_file ) ) {
			_doing_it_wrong( __METHOD__, 'Cannot set a file reader cursor to a non-integer offset.', '1.0.0' );
			return false;
		}
		$this->offset_in_file  = $offset_in_file;
		$this->last_chunk_size = 0;
		$this->output_bytes    = '';
		// fseek() returns 0 on success and -1 on failure – never false.
		if ( $this->file_pointer && 0 !== fseek( $this->file_pointer, $this->offset_in_file ) ) {
			return false;
		}
		return true;
	}

	/**
	 * Closes the underlying file handle and marks the reader as finished.
	 *
	 * @return bool False when no handle is open or closing it failed.
	 */
	public function close() {
		if ( ! $this->file_pointer ) {
			return false;
		}
		if ( ! fclose( $this->file_pointer ) ) {
			$this->last_error = 'Failed to close file pointer';
			return false;
		}
		$this->file_pointer = null;
		$this->state        = static::STATE_FINISHED;
		return true;
	}

	public function is_finished(): bool {
		// Compare against '' explicitly: a chunk equal to "0" is falsy in PHP
		// but is still unconsumed data.
		return '' === $this->output_bytes && static::STATE_FINISHED === $this->state;
	}

	public function get_bytes(): string {
		return $this->output_bytes;
	}

	public function get_last_error(): ?string {
		return $this->last_error;
	}

	/**
	 * Reads the next chunk of up to $chunk_size bytes into the output buffer.
	 *
	 * @return bool True when a chunk was read, false at the end of the file,
	 *              after an error, or once the reader is finished.
	 */
	public function next_bytes(): bool {
		$this->output_bytes    = '';
		$this->last_chunk_size = 0;
		if ( $this->last_error || $this->is_finished() ) {
			return false;
		}
		if ( ! $this->file_pointer ) {
			$handle = fopen( $this->file_path, 'r' );
			if ( false === $handle ) {
				$this->last_error = sprintf( 'Failed to open %s for reading', $this->file_path );
				return false;
			}
			$this->file_pointer = $handle;
			if ( $this->offset_in_file && 0 !== fseek( $this->file_pointer, $this->offset_in_file ) ) {
				$this->last_error = sprintf( 'Failed to seek to offset %d in %s', $this->offset_in_file, $this->file_path );
				return false;
			}
		}
		$bytes = fread( $this->file_pointer, $this->chunk_size );
		if ( false === $bytes ) {
			$this->last_error = sprintf( 'Failed to read from %s', $this->file_path );
			return false;
		}
		// Only an empty read means "no data"; a chunk of "0" must still be returned.
		if ( '' === $bytes && feof( $this->file_pointer ) ) {
			return false;
		}
		$this->last_chunk_size = strlen( $bytes );
		$this->offset_in_file += $this->last_chunk_size;
		$this->output_bytes    = $bytes;
		return true;
	}
}
28 changes: 28 additions & 0 deletions src/WordPress/ByteReader/WP_GZ_File_Reader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

namespace WordPress\ByteReader;

/**
 * Streams the decompressed contents of a gzip file in chunks.
 *
 * Offsets are passed to gzseek() and advanced by the length of the
 * decompressed chunks returned by gzread().
 */
class WP_GZ_File_Reader extends WP_File_Reader {

	/**
	 * Reads the next chunk of up to $chunk_size decompressed bytes.
	 *
	 * At the end of the stream the handle is closed and the reader is
	 * marked as finished.
	 *
	 * @return bool True when a chunk was read, false at the end of the
	 *              stream, after an error, or once the reader is finished.
	 */
	public function next_bytes(): bool {
		$this->output_bytes    = '';
		$this->last_chunk_size = 0;
		if ( $this->last_error || $this->is_finished() ) {
			return false;
		}
		if ( ! $this->file_pointer ) {
			$handle = gzopen( $this->file_path, 'r' );
			if ( false === $handle ) {
				$this->last_error = sprintf( 'Failed to open %s for reading', $this->file_path );
				return false;
			}
			$this->file_pointer = $handle;
			// gzseek() returns 0 on success and -1 on failure.
			if ( $this->offset_in_file && -1 === gzseek( $this->file_pointer, $this->offset_in_file ) ) {
				$this->last_error = sprintf( 'Failed to seek to offset %d in %s', $this->offset_in_file, $this->file_path );
				return false;
			}
		}
		$bytes = gzread( $this->file_pointer, $this->chunk_size );
		if ( false === $bytes ) {
			$this->last_error = sprintf( 'Failed to read from %s', $this->file_path );
			return false;
		}
		// Only an empty read means "no data"; a chunk of "0" must still be returned.
		if ( '' === $bytes && gzeof( $this->file_pointer ) ) {
			gzclose( $this->file_pointer );
			// Drop the closed handle so close() does not try to close it again.
			$this->file_pointer = null;
			// $state holds one of the STATE_* string constants, not an object.
			$this->state = static::STATE_FINISHED;
			return false;
		}
		// Track the chunk size so the inherited tell() reports the offset of
		// the current chunk, as it does for WP_File_Reader.
		$this->last_chunk_size = strlen( $bytes );
		$this->offset_in_file += $this->last_chunk_size;
		$this->output_bytes    = $bytes;
		return true;
	}
}
205 changes: 205 additions & 0 deletions src/WordPress/ByteReader/WP_Remote_File_Ranged_Reader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
<?php

namespace WordPress\ByteReader;

/**
 * Streams bytes from a remote file. Supports seeking to a specific offset and
 * requesting sub-ranges of the file.
 *
 * Usage:
 *
 *     $file = new WP_Remote_File_Ranged_Reader( 'https://example.com/file.txt' );
 *     $file->seek_to_chunk( 0, 100 );
 *     while ( $file->next_bytes() ) {
 *         var_dump( $file->get_bytes() );
 *     }
 *     $file->seek_to_chunk( 600, 40 );
 *     while ( $file->next_bytes() ) {
 *         var_dump( $file->get_bytes() );
 *     }
 *
 * @TODO: Abort in-progress requests when seeking to a new offset.
 */
class WP_Remote_File_Ranged_Reader extends WP_Byte_Reader {

	private $url;
	private $remote_file_length;

	private $current_reader;
	private $offset_in_remote_file       = 0;
	private $default_expected_chunk_size = 10 * 1024; // 10 KB
	private $expected_chunk_size         = 10 * 1024; // 10 KB
	private $stop_after_chunk            = false;

	/**
	 * Creates a seekable reader for the remote file.
	 * Detects support for range requests and falls back to saving the entire
	 * file to disk when the remote server does not support range requests.
	 *
	 * @param string $url URL of the remote file.
	 * @return WP_Byte_Reader|false A ranged reader, a local file reader for the
	 *                              downloaded copy, or false on failure.
	 */
	public static function create( $url ) {
		$remote_file_reader = new WP_Remote_File_Ranged_Reader( $url );
		/**
		 * We don't **need** the content-length header to be present.
		 *
		 * However, this reader is only used to read remote ZIP files,
		 * we do need to know the length of the file to be able to read
		 * the central directory index.
		 *
		 * Let's revisit this check once we need to read other types of
		 * files.
		 */
		if ( false === $remote_file_reader->length() ) {
			return self::save_to_disk( $url );
		}

		/**
		 * Try to read the first two bytes of the file to confirm that
		 * the remote server supports range requests.
		 */
		$remote_file_reader->seek_to_chunk( 0, 2 );
		if ( false === $remote_file_reader->next_bytes() ) {
			return self::save_to_disk( $url );
		}

		$bytes = $remote_file_reader->get_bytes();
		if ( strlen( (string) $bytes ) !== 2 ) {
			// Oops! We're streaming the entire file to disk now. Let's
			// redirect the output to a local file and provide the caller
			// with a regular file reader.
			return self::redirect_output_to_disk( $remote_file_reader );
		}

		// The remote server supports range requests, good! We can use this reader.
		// Let's return to the beginning of the file before returning.
		$remote_file_reader->seek( 0 );
		return $remote_file_reader;
	}

	/**
	 * Downloads the entire remote file and returns a reader for the local copy.
	 */
	private static function save_to_disk( $url ) {
		$remote_file_reader = new WP_Remote_File_Reader( $url );
		return self::redirect_output_to_disk( $remote_file_reader );
	}

	/**
	 * Drains $reader into a temporary file and returns a file reader for it.
	 *
	 * @param WP_Byte_Reader $reader Reader to drain; it may already hold a chunk.
	 * @return WP_File_Reader|false The file reader, or false when the temporary
	 *                              file could not be prepared or $reader failed.
	 */
	private static function redirect_output_to_disk( WP_Byte_Reader $reader ) {
		$temp_path = tempnam( sys_get_temp_dir(), 'wp-remote-file-reader-' );
		if ( false === $temp_path ) {
			return false;
		}
		// tempnam() has already created $temp_path. Move it to the final name
		// instead of leaving an orphaned empty file behind.
		$file_path = $temp_path . '.epub';
		if ( ! rename( $temp_path, $file_path ) ) {
			unlink( $temp_path );
			return false;
		}
		$file = fopen( $file_path, 'w' );
		if ( false === $file ) {
			unlink( $file_path );
			return false;
		}
		// We may have a bytes chunk available at this point.
		$pending_bytes = (string) $reader->get_bytes();
		if ( '' !== $pending_bytes ) {
			fwrite( $file, $pending_bytes );
		}
		// Keep streaming the file until we're done.
		while ( $reader->next_bytes() ) {
			fwrite( $file, (string) $reader->get_bytes() );
		}
		fclose( $file );
		if ( $reader->get_last_error() ) {
			// How should we log this error?
			// The partial download is useless to the caller, remove it.
			unlink( $file_path );
			return false;
		}
		return WP_File_Reader::create( $file_path );
	}

	public function __construct( $url ) {
		$this->url = $url;
	}

	/**
	 * Advances to the next chunk of the remote file, issuing a new ranged
	 * request whenever the current one is exhausted.
	 *
	 * @return bool True when a chunk is available via get_bytes().
	 */
	public function next_bytes(): bool {
		while ( true ) {
			if ( null === $this->current_reader ) {
				$this->create_reader();
			}
			// Advance the offset by the length of the current chunk.
			// strlen() is used instead of a truthiness check so that a chunk
			// equal to "0" is counted as well.
			$this->offset_in_remote_file += strlen( (string) $this->current_reader->get_bytes() );

			// We've reached the end of the remote file, we're done.
			// Offsets are zero-based, so the last byte lives at length - 1 and
			// the file is only exhausted once the offset reaches the length.
			// An unknown length keeps ending the stream, as it did before.
			$length = $this->length();
			if ( false === $length || $this->offset_in_remote_file >= $length ) {
				return false;
			}

			// We've reached the end of the current chunk, request the next one.
			if ( false === $this->current_reader->next_bytes() ) {
				// Stop after a single chunk when asked to, and never retry a
				// failed request – that would loop forever at the same offset.
				if ( $this->stop_after_chunk || $this->current_reader->get_last_error() ) {
					return false;
				}
				$this->current_reader = null;
				continue;
			}

			// We've got a chunk, return it.
			return true;
		}
	}

	/**
	 * @return int|false Length of the remote file, or false when it is unknown.
	 */
	public function length() {
		$this->ensure_content_length();
		if ( null === $this->remote_file_length ) {
			return false;
		}
		return $this->remote_file_length;
	}

	/**
	 * Starts a ranged request for the next $expected_chunk_size bytes
	 * beginning at the current offset.
	 */
	private function create_reader() {
		$this->current_reader = new WP_Remote_File_Reader(
			$this->url,
			array(
				'headers' => array(
					// @TODO: Detect when the remote server doesn't support range requests,
					// do something sensible. We could either stream the entire file,
					// or give up.
					'Range' => 'bytes=' . $this->offset_in_remote_file . '-' . (
						$this->offset_in_remote_file + $this->expected_chunk_size - 1
					),
				),
			)
		);
	}

	/**
	 * Positions the reader at $offset and limits the next request to $length
	 * bytes. next_bytes() returns false once that single range is consumed.
	 *
	 * @param int $offset Offset of the first requested byte.
	 * @param int $length Number of bytes to request.
	 */
	public function seek_to_chunk( $offset, $length ) {
		$this->offset_in_remote_file = $offset;
		// Drop the current reader (it may be null, or an un-ranged request
		// started by length()) so next_bytes() issues a fresh ranged request.
		// @TODO cancel any pending requests
		$this->current_reader      = null;
		$this->expected_chunk_size = $length;
		$this->stop_after_chunk    = true;
	}

	/**
	 * Positions the reader at $offset and restores the default chunk size.
	 *
	 * @param int $offset Offset to continue reading from.
	 * @return bool Always true.
	 */
	public function seek( $offset ): bool {
		$this->offset_in_remote_file = $offset;
		// @TODO cancel any pending requests
		$this->current_reader      = null;
		$this->expected_chunk_size = $this->default_expected_chunk_size;
		$this->stop_after_chunk    = false;
		return true;
	}

	public function tell(): int {
		return $this->offset_in_remote_file;
	}

	public function is_finished(): bool {
		// NOTE(review): never reports completion; callers must rely on
		// next_bytes() returning false.
		return false;
	}

	public function get_bytes(): ?string {
		// There is no reader right after construction or a seek.
		if ( null === $this->current_reader ) {
			return null;
		}
		return $this->current_reader->get_bytes();
	}

	public function get_last_error(): ?string {
		// @TODO: Preserve the error information when the current reader
		// is reset.
		if ( null === $this->current_reader ) {
			return null;
		}
		return $this->current_reader->get_last_error();
	}

	/**
	 * Resolves and caches the remote file length.
	 *
	 * NOTE(review): when a ranged reader is already active its length() is
	 * used; confirm that it reports the full file length rather than the
	 * length of the requested range.
	 */
	private function ensure_content_length() {
		if ( null !== $this->remote_file_length ) {
			return $this->remote_file_length;
		}
		if ( null === $this->current_reader ) {
			$this->current_reader = new WP_Remote_File_Reader( $this->url );
		}
		$this->remote_file_length = $this->current_reader->length();
		return $this->remote_file_length;
	}

}
Loading

0 comments on commit b1362cb

Please sign in to comment.