diff --git a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetLoader.php b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetLoader.php
index 086903090..636168f21 100644
--- a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetLoader.php
+++ b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetLoader.php
@@ -2,7 +2,6 @@
 
 namespace Flow\ETL\Adapter\Parquet;
 
-use Flow\ETL\Exception\RuntimeException;
 use Flow\ETL\Filesystem\Path;
 use Flow\ETL\Filesystem\Stream\Mode;
 use Flow\ETL\FlowContext;
@@ -11,6 +10,7 @@
 use Flow\ETL\Row\Schema;
 use Flow\ETL\Rows;
 use Flow\Parquet\Options;
+use Flow\Parquet\ParquetFile\Compressions;
 use Flow\Parquet\Writer;
 
 /**
@@ -24,11 +24,15 @@ final class ParquetLoader implements Closure, Loader, Loader\FileLoader
 
     private ?Schema $inferredSchema = null;
 
-    private ?Writer $writer = null;
+    /**
+     * @var array<string, Writer>
+     */
+    private array $writers = [];
 
     public function __construct(
         private readonly Path $path,
         private readonly Options $options,
+        private readonly Compressions $compressions = Compressions::SNAPPY,
         private readonly ?Schema $schema = null,
     ) {
         $this->converter = new SchemaConverter();
@@ -52,9 +56,14 @@ public function __unserialize(array $data) : void
 
     public function closure(Rows $rows, FlowContext $context) : void
     {
-        $this->writer($context)->close();
+        if (\count($this->writers)) {
+            foreach ($this->writers as $writer) {
+                $writer->close();
+            }
+        }
+
         $context->streams()->close($this->path);
-        $this->writer = null;
+        $this->writers = [];
     }
 
     public function destination() : Path
@@ -64,16 +73,45 @@ public function destination() : Path
 
     public function load(Rows $rows, FlowContext $context) : void
     {
-        if (\count($context->partitionEntries())) {
-            throw new RuntimeException('Partitioning is not supported yet');
-        }
-
         if ($this->schema === null) {
             $this->inferSchema($rows);
         }
 
-        foreach ($rows as $row) {
-            $this->writer($context)->writeRow($row->toArray());
+        $streams = $context->streams();
+
+        if ($context->partitionEntries()->count()) {
+            foreach ($rows->partitionBy(...$context->partitionEntries()->all()) as $partitions) {
+
+                $stream = $streams->open($this->path, 'parquet', Mode::WRITE_BINARY, $context->threadSafe(), $partitions->partitions);
+
+                if (!\array_key_exists($stream->path()->uri(), $this->writers)) {
+                    $this->writers[$stream->path()->uri()] = new Writer(
+                        compression: $this->compressions,
+                        options: $this->options
+                    );
+
+                    $this->writers[$stream->path()->uri()]->openForStream($stream->resource(), $this->converter->toParquet($this->schema()));
+                }
+
+                foreach ($partitions->rows as $row) {
+                    $this->writers[$stream->path()->uri()]->writeRow($row->toArray());
+                }
+            }
+        } else {
+            $stream = $streams->open($this->path, 'parquet', Mode::WRITE_BINARY, $context->threadSafe());
+
+            if (!\array_key_exists($stream->path()->uri(), $this->writers)) {
+                $this->writers[$stream->path()->uri()] = new Writer(
+                    compression: $this->compressions,
+                    options: $this->options
+                );
+
+                $this->writers[$stream->path()->uri()]->openForStream($stream->resource(), $this->converter->toParquet($this->schema()));
+            }
+
+            foreach ($rows as $row) {
+                $this->writers[$stream->path()->uri()]->writeRow($row->toArray());
+            }
         }
     }
 
@@ -95,26 +133,4 @@ private function schema() : Schema
         /** @phpstan-ignore-next-line */
         return $this->schema ?? $this->inferredSchema;
     }
-
-    private function writer(FlowContext $context) : Writer
-    {
-        if ($this->writer !== null) {
-            return $this->writer;
-        }
-
-        $this->writer = new Writer(
-            options: $this->options
-        );
-        $this->writer->openForStream(
-            $context->streams()->open(
-                $this->path,
-                'parquet',
-                Mode::WRITE_BINARY,
-                $context->threadSafe()
-            )->resource(),
-            $this->converter->toParquet($this->schema())
-        );
-
-        return $this->writer;
-    }
 }
diff --git a/src/adapter/etl-adapter-parquet/src/Flow/ETL/DSL/Parquet.php b/src/adapter/etl-adapter-parquet/src/Flow/ETL/DSL/Parquet.php
index 0939fa6a4..029495020 100644
--- a/src/adapter/etl-adapter-parquet/src/Flow/ETL/DSL/Parquet.php
+++ b/src/adapter/etl-adapter-parquet/src/Flow/ETL/DSL/Parquet.php
@@ -12,6 +12,7 @@ use Flow\ETL\Row\Schema;
 use Flow\Parquet\ByteOrder;
 use Flow\Parquet\Options;
+use Flow\Parquet\ParquetFile\Compressions;
 
 /**
  * @infection-ignore-all
  */
@@ -65,6 +66,7 @@ final public static function from(
     final public static function to(
         string|Path $path,
         ?Options $options = null,
+        Compressions $compressions = Compressions::SNAPPY,
         ?Schema $schema = null,
     ) : Loader {
         if ($options === null) {
@@ -74,6 +76,7 @@ final public static function to(
         return new ParquetLoader(
             \is_string($path) ? Path::realpath($path) : $path,
             $options,
+            $compressions,
             $schema,
         );
     }
diff --git a/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/ParquetTest.php b/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/ParquetTest.php
index d3119c42c..217129158 100644
--- a/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/ParquetTest.php
+++ b/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/ParquetTest.php
@@ -1,6 +1,4 @@
-        $this->expectExceptionMessage("ParquetLoader path can't be pattern, given: /path/*/pattern.parquet");
-
-        Parquet::to(new Path('/path/*/pattern.parquet'));
-    }
+        $path = \sys_get_temp_dir() . '/file.snappy.parquet';
+        $this->removeFile($path);
 
-    public function test_writing_and_reading_only_given_fields() : void
-    {
-        $this->removeFile($path = \sys_get_temp_dir() . '/file.parquet');
-
-        (new Flow)
-            ->read(From::rows(
-                $rows = new Rows(
-                    ...\array_map(function (int $i) : Row {
-                        return Row::create(
-                            Entry::integer('integer', $i),
-                            Entry::float('float', 1.5),
-                            Entry::string('string', 'name_' . $i),
-                            Entry::boolean('boolean', true),
-                            Entry::datetime('datetime', new \DateTimeImmutable()),
-                            Entry::json_object('json_object', ['id' => 1, 'name' => 'test']),
-                            Entry::json('json', [['id' => 1, 'name' => 'test'], ['id' => 2, 'name' => 'test']]),
-                            Entry::list_of_string('list_of_strings', ['a', 'b', 'c']),
-                            Entry::list_of_datetime('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()]),
-                            Entry::structure(
-                                'address',
-                                Entry::string('street', 'street_' . $i),
-                                Entry::string('city', 'city_' . $i),
-                                Entry::string('zip', 'zip_' . $i),
-                                Entry::string('country', 'country_' . $i),
-                                Entry::structure(
-                                    'location',
-                                    Entry::float('lat', 1.5),
-                                    Entry::float('lon', 1.5)
-                                )
-                            ),
-                        );
-                    }, \range(1, 100))
-                )
-            ))
+        (new Flow())
+            ->read(From::rows($rows = $this->createRows(10)))
             ->write(Parquet::to($path))
             ->run();
 
-        $this->assertFileExists($path);
-
         $this->assertEquals(
-            new Rows(
-                ...\array_map(function (int $i) : Row {
-                    return Row::create(Entry::integer('integer', $i));
-                }, \range(1, 100))
-            ),
+            $rows,
             (new Flow())
-                ->read(Parquet::from($path, ['integer']))
+                ->read(Parquet::from($path))
                 ->fetch()
         );
 
-        $this->removeFile($path);
-    }
+        $parquetFile = (new Reader())->read($path);
+        $this->assertNotEmpty($parquetFile->metadata()->columnChunks());
 
-    public function test_writing_and_reading_parquet_with_all_supported_types() : void
-    {
-        $this->removeFile($path = __DIR__ . '/file.parquet');
-
-        (new Flow)
-            ->read(From::rows(
-                $rows = new Rows(
-                    ...\array_map(function (int $i) : Row {
-                        return Row::create(
-                            Entry::integer('integer', $i),
-                            Entry::float('float', 1.5),
-                            Entry::string('string', 'name_' . $i),
-                            Entry::boolean('boolean', true),
-                            Entry::datetime('datetime', new \DateTimeImmutable()),
-                            Entry::json_object('json_object', ['id' => 1, 'name' => 'test']),
-                            Entry::json('json', [['id' => 1, 'name' => 'test'], ['id' => 2, 'name' => 'test']]),
-                            Entry::list_of_string('list_of_strings', ['a', 'b', 'c']),
-                            Entry::list_of_datetime('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()]),
-                            Entry::structure(
-                                'address',
-                                Entry::string('street', 'street_' . $i),
-                                Entry::string('city', 'city_' . $i),
-                                Entry::string('zip', 'zip_' . $i),
-                                Entry::string('country', 'country_' . $i),
-                                Entry::structure(
-                                    'location',
-                                    Entry::float('lat', 1.5),
-                                    Entry::float('lon', 1.5)
-                                )
-                            ),
-                        );
-                    }, \range(1, 100))
-                )
-            ))
-            ->write(Parquet::to($path))
-            ->run();
+        foreach ($parquetFile->metadata()->columnChunks() as $columnChunk) {
+            $this->assertSame(Compressions::SNAPPY, $columnChunk->codec());
+        }
 
         $this->assertFileExists($path);
-
-        $this->assertEquals(
-            $rows,
-            (new Flow())
-                ->read(
-                    Parquet::from($path)
-                )
-                ->fetch()
-        );
-        $this->removeFile($path);
     }
 
-    public function test_writing_safe_and_reading_parquet_with_all_supported_types() : void
+    public function test_writing_with_partitioning() : void
     {
-        $this->cleanDirectory($path = \sys_get_temp_dir() . '/directory.parquet');
-
-        (new Flow)
-            ->read(From::rows(
-                $rows = new Rows(
-                    ...\array_map(function (int $i) : Row {
-                        return Row::create(
-                            Entry::integer('integer', $i),
-                            Entry::float('float', 1.5),
-                            Entry::string('string', 'name_' . $i),
-                            Entry::boolean('boolean', true),
-                            Entry::datetime('datetime', new \DateTimeImmutable()),
-                            Entry::json_object('json_object', ['id' => 1, 'name' => 'test']),
-                            Entry::json('json', [['id' => 1, 'name' => 'test'], ['id' => 2, 'name' => 'test']]),
-                            Entry::list_of_string('list_of_strings', ['a', 'b', 'c']),
-                            Entry::list_of_datetime('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()])
-                        );
-                    }, \range(1, 100))
-                )
-            ))
-            ->threadSafe()
-            ->write(Parquet::to($path))
-            ->run();
+        $path = \sys_get_temp_dir() . '/partitioned';
+        $this->cleanDirectory($path);
 
-        $this->assertFileExists($path);
+        $dataFrame = (new Flow())
+            ->read(From::rows($rows = new Rows(
+                $this->createRow(1, new \DateTimeImmutable('2020-01-01 00:01:00')),
+                $this->createRow(1, new \DateTimeImmutable('2020-01-01 00:02:00')),
+                $this->createRow(1, new \DateTimeImmutable('2020-01-02 00:01:00')),
+                $this->createRow(1, new \DateTimeImmutable('2020-01-02 00:02:00')),
+                $this->createRow(1, new \DateTimeImmutable('2020-01-03 00:01:00')),
+            )))
+            ->withEntry('date', ref('datetime')->toDate(\DateTimeInterface::RFC3339)->dateFormat())
+            ->partitionBy(ref('date'))
+            ->write(Parquet::to($path));
 
-        $paths = \array_map(
-            fn (string $fileName) : Path => new Path($path . '/' . $fileName),
-            \array_values(\array_diff(\scandir($path), ['..', '.']))
-        );
+        $dataFrame->run();
 
         $this->assertEquals(
             $rows,
             (new Flow())
-                ->read(Parquet::from(
-                    $paths,
-                ))
-                ->sortBy(ref('integer'))
+                ->read(Parquet::from($path))
+                ->drop('date')
+                ->sortBy(ref('datetime')->asc())
                 ->fetch()
         );
 
+        $this->assertSame(
+            ['date=2020-01-01', 'date=2020-01-02', 'date=2020-01-03'],
+            $this->listDirectoryFiles($path)
+        );
+
+        $this->assertDirectoryExists($path);
         $this->cleanDirectory($path);
     }
@@ -193,13 +99,60 @@ private function cleanDirectory(string $path) : void
         }
     }
 
+    private function createRow(int $index, ?\DateTimeImmutable $dateTime = null) : Row
+    {
+        return Row::create(
+            Entry::integer('integer', $index),
+            Entry::float('float', 1.5),
+            Entry::string('string', 'name_' . $index),
+            Entry::boolean('boolean', true),
+            Entry::datetime('datetime', $dateTime ?: new \DateTimeImmutable()),
+            Entry::json_object('json_object', ['id' => 1, 'name' => 'test']),
+            Entry::json('json', [['id' => 1, 'name' => 'test'], ['id' => 2, 'name' => 'test']]),
+            Entry::list_of_string('list_of_strings', ['a', 'b', 'c']),
+            Entry::list_of_datetime('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()]),
+            Entry::structure(
+                'address',
+                Entry::string('street', 'street_' . $index),
+                Entry::string('city', 'city_' . $index),
+                Entry::string('zip', 'zip_' . $index),
+                Entry::string('country', 'country_' . $index),
+                Entry::structure(
+                    'location',
+                    Entry::float('lat', 1.5),
+                    Entry::float('lng', 1.5)
+                )
+            ),
+        );
+    }
+
+    private function createRows(int $count) : Rows
+    {
+        $rows = [];
+
+        for ($i = 0; $i < $count; $i++) {
+            $rows[] = $this->createRow($i);
+        }
+
+        return new Rows(...$rows);
+    }
+
+    private function listDirectoryFiles(string $path) : array
+    {
+        return \array_values(\array_diff(\scandir($path), ['.', '..']));
+    }
+
     /**
      * @param string $path
      */
     private function removeFile(string $path) : void
     {
         if (\file_exists($path)) {
-            \unlink($path);
+            if (\is_dir($path)) {
+                $this->cleanDirectory($path);
+            } else {
+                \unlink($path);
+            }
         }
     }
 }
diff --git a/src/core/etl/src/Flow/ETL/Filesystem/Stream/Mode.php b/src/core/etl/src/Flow/ETL/Filesystem/Stream/Mode.php
index 5b590f5d0..055cb27a8 100644
--- a/src/core/etl/src/Flow/ETL/Filesystem/Stream/Mode.php
+++ b/src/core/etl/src/Flow/ETL/Filesystem/Stream/Mode.php
@@ -8,6 +8,8 @@ enum Mode : string
 {
     case APPEND = 'a';
 
+    case APPEND_BINARY = 'ab';
+
     case READ = 'r';
 
     case READ_BINARY = 'rb';
diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Metadata.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Metadata.php
index 5e95deb76..cdac0519b 100644
--- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Metadata.php
+++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Metadata.php
@@ -2,6 +2,7 @@
 
 namespace Flow\Parquet\ParquetFile;
 
+use Flow\Parquet\ParquetFile\RowGroup\ColumnChunk;
 use Flow\Parquet\Thrift\FileMetaData;
 
 final class Metadata
@@ -26,6 +27,22 @@ public static function fromThrift(FileMetaData $thrift) : self
         );
     }
 
+    /**
+     * @return array<ColumnChunk>
+     */
+    public function columnChunks() : array
+    {
+        $chunks = [];
+
+        foreach ($this->rowGroups->all() as $rowGroup) {
+            foreach ($rowGroup->columnChunks() as $columnChunk) {
+                $chunks[] = $columnChunk;
+            }
+        }
+
+        return $chunks;
+    }
+
     public function createdBy() : ?string
     {
         return $this->createdBy;
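
Usage sketch (illustrative, not taken verbatim from this patch): it shows how the new `compressions` argument of `Parquet::to()` combines with `partitionBy()`, mirroring `test_writing_with_partitioning` above. The DSL import paths (`Flow\ETL\DSL\From`, `Flow\ETL\DSL\Entry`, the `ref()` function) and the output path are assumptions based on common Flow ETL conventions and may differ in the actual codebase.

<?php declare(strict_types=1);

use Flow\ETL\DSL\Entry;
use Flow\ETL\DSL\From;
use Flow\ETL\DSL\Parquet;
use Flow\ETL\Flow;
use Flow\ETL\Row;
use Flow\ETL\Rows;
use Flow\Parquet\ParquetFile\Compressions;

use function Flow\ETL\DSL\ref;

// Derive a "date" entry from "datetime", partition by it, and let ParquetLoader
// keep one parquet writer per partition stream (writers are keyed by stream URI).
(new Flow())
    ->read(From::rows(new Rows(
        Row::create(Entry::integer('id', 1), Entry::datetime('datetime', new \DateTimeImmutable('2020-01-01 00:01:00'))),
        Row::create(Entry::integer('id', 2), Entry::datetime('datetime', new \DateTimeImmutable('2020-01-02 00:01:00'))),
    )))
    ->withEntry('date', ref('datetime')->toDate(\DateTimeInterface::RFC3339)->dateFormat())
    ->partitionBy(ref('date'))
    // Compressions::SNAPPY is the new default; other cases of the Compressions enum can be passed instead.
    ->write(Parquet::to(\sys_get_temp_dir() . '/partitioned', compressions: Compressions::SNAPPY))
    ->run();

With the partitioned branch of `ParquetLoader::load()`, each distinct `date` value is written into its own `date=YYYY-MM-DD` directory, which is what the `['date=2020-01-01', 'date=2020-01-02', 'date=2020-01-03']` assertion in the test verifies.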