diff --git a/src/lib/parquet/resources/python/generators/primitives.py b/src/lib/parquet/resources/python/generators/primitives.py index e92de3e87..b046a809a 100644 --- a/src/lib/parquet/resources/python/generators/primitives.py +++ b/src/lib/parquet/resources/python/generators/primitives.py @@ -26,7 +26,7 @@ class Color(Enum): json_col = pd.Series([json.dumps({'key': random.randint(1, 10)}) for _ in range(n_rows)], dtype='string') date_col = pd.Series([datetime.now().date() + timedelta(days=i) for i in range(n_rows)], dtype='object') timestamp_col = pd.Series([pd.Timestamp(datetime.now() + timedelta(seconds=i * 10)) for i in range(n_rows)], dtype='datetime64[ns]') -time_col = pd.Series([time(hour=i % 24, minute=(i * 2) % 60, second=(i * 3) % 60) for i in range(n_rows)], dtype='object') +time_col = pd.Series([time(hour=(i+1) % 24, minute=((i+1) * 2) % 60, second=((i+1) * 3) % 60) for i in range(n_rows)], dtype='object') uuid_col = pd.Series([str(uuid.uuid4()) for _ in range(n_rows)], dtype='string') enum_col = pd.Series([random.choice(list(Color)).name for _ in range(n_rows)], dtype='string') float_col = pd.Series([random.uniform(0, 100) for _ in range(n_rows)], dtype='float32') @@ -40,7 +40,7 @@ class Color(Enum): json_nullable_col = pd.Series([json.dumps({'key': random.randint(1, 10)}) if i % 2 == 0 else None for i in range(n_rows)], dtype='string') date_nullable_col = pd.Series([datetime.now().date() + timedelta(days=i) if i % 2 == 0 else None for i in range(n_rows)], dtype='object') timestamp_nullable_col = pd.Series([pd.Timestamp(datetime.now() + timedelta(seconds=i * 10)) if i % 2 == 0 else None for i in range(n_rows)], dtype='object') -time_nullable_col = pd.Series([time(hour=i % 24, minute=(i * 2) % 60, second=(i * 3) % 60) if i % 2 == 0 else None for i in range(n_rows)], dtype='object') +time_nullable_col = pd.Series([time(hour=(i+1) % 24, minute=((i+1) * 2) % 60, second=((i+1) * 3) % 60) if i % 2 == 0 else None for i in range(n_rows)], dtype='object') uuid_nullable_col = pd.Series([str(uuid.uuid4()) if i % 2 == 0 else None for i in range(n_rows)], dtype='string') enum_nullable_col = pd.Series([random.choice(list(Color)).name if i % 2 == 0 else None for i in range(n_rows)], dtype='string') float_nullable_col = pd.Series([random.uniform(0, 100) if i % 2 == 0 else None for i in range(n_rows)], dtype='float32') diff --git a/src/lib/parquet/src/Flow/Parquet/Data/Converter/TimeConverter.php b/src/lib/parquet/src/Flow/Parquet/Data/Converter/TimeConverter.php index 21a7261b6..ac7ca9cfb 100644 --- a/src/lib/parquet/src/Flow/Parquet/Data/Converter/TimeConverter.php +++ b/src/lib/parquet/src/Flow/Parquet/Data/Converter/TimeConverter.php @@ -18,7 +18,7 @@ public function fromParquetType(mixed $data) : \DateInterval public function isFor(FlatColumn $column, Options $options) : bool { - if ($column->type() === PhysicalType::INT32 && $column->logicalType()?->name() === LogicalType::TIME) { + if ($column->type() === PhysicalType::INT64 && $column->logicalType()?->name() === LogicalType::TIME) { return true; } @@ -35,8 +35,8 @@ public function toParquetType(mixed $data) : int */ private function toDateInterval(int $microseconds) : \DateInterval { - $seconds = (int) \floor($microseconds / 1000000); - $remainingMicroseconds = $microseconds % 1000000; + $seconds = (int) \floor($microseconds / 100000000); + $remainingMicroseconds = $microseconds % 100000000; $minutes = (int) \floor($seconds / 60); $remainingSeconds = $seconds % 60; @@ -64,7 +64,7 @@ private function toDateInterval(int $microseconds) : \DateInterval $interval->y = 0; $interval->m = 0; $interval->d = 0; - $interval->f = ($remainingMicroseconds / 1000000); + $interval->f = ($remainingMicroseconds / 100000000); return $interval; } @@ -81,13 +81,13 @@ private function toInt(\DateInterval $interval) : int $microseconds = 0; - $microseconds += $interval->y * 365 * 24 * 60 * 60 * 1000000; // years to microseconds - $microseconds += $interval->m * 30 * 24 * 60 * 60 * 1000000; // months to microseconds (approx) - $microseconds += $interval->d * 24 * 60 * 60 * 1000000; // days to microseconds - $microseconds += $interval->h * 60 * 60 * 1000000; // hours to microseconds - $microseconds += $interval->i * 60 * 1000000; // minutes to microseconds - $microseconds += $interval->s * 1000000; // seconds to microseconds - $microseconds += (int) (($interval->f) * 1000000); // microseconds + $microseconds += $interval->y * 365 * 24 * 60 * 60 * 100000000; // years to microseconds + $microseconds += $interval->m * 30 * 24 * 60 * 60 * 100000000; // months to microseconds (approx) + $microseconds += $interval->d * 24 * 60 * 60 * 100000000; // days to microseconds + $microseconds += $interval->h * 60 * 60 * 100000000; // hours to microseconds + $microseconds += $interval->i * 60 * 100000000; // minutes to microseconds + $microseconds += $interval->s * 100000000; // seconds to microseconds + $microseconds += (int) (($interval->f) * 100000000); // microseconds return $microseconds; } diff --git a/src/lib/parquet/src/Flow/Parquet/Data/DataConverter.php b/src/lib/parquet/src/Flow/Parquet/Data/DataConverter.php index cee45e3a1..fbbbce2fe 100644 --- a/src/lib/parquet/src/Flow/Parquet/Data/DataConverter.php +++ b/src/lib/parquet/src/Flow/Parquet/Data/DataConverter.php @@ -8,6 +8,7 @@ use Flow\Parquet\Data\Converter\Int64DateTimeConverter; use Flow\Parquet\Data\Converter\Int96DateTimeConverter; use Flow\Parquet\Data\Converter\TimeConverter; +use Flow\Parquet\Exception\DataConversionException; use Flow\Parquet\Options; use Flow\Parquet\ParquetFile\Schema\FlatColumn; @@ -56,7 +57,15 @@ public function fromParquetType(FlatColumn $column, mixed $data) : mixed if ($converter->isFor($column, $this->options)) { $this->cache[$column->flatPath()] = $converter; - return $converter->fromParquetType($data); + try { + return $converter->fromParquetType($data); + } catch (\Throwable $e) { + throw new DataConversionException( + "Failed to convert data from parquet type for column '{$column->flatPath()}'. {$e->getMessage()}", + 0, + $e + ); + } } } diff --git a/src/lib/parquet/src/Flow/Parquet/Exception/DataConversionException.php b/src/lib/parquet/src/Flow/Parquet/Exception/DataConversionException.php new file mode 100644 index 000000000..86ce578cc --- /dev/null +++ b/src/lib/parquet/src/Flow/Parquet/Exception/DataConversionException.php @@ -0,0 +1,7 @@ +schema()->columnsFlat() as $column) { foreach ($this->viewChunksPages($column) as $pageHeader) { - yield new ColumnPageHeader($column, $pageHeader); + yield $pageHeader; } } } @@ -350,7 +349,7 @@ private function readStruct(NestedColumn $structColumn, bool $isCollection = fal } /** - * @return \Generator + * @return \Generator */ private function viewChunksPages(FlatColumn $column) : \Generator { @@ -359,7 +358,7 @@ private function viewChunksPages(FlatColumn $column) : \Generator foreach ($this->getColumnChunks($column) as $columnChunks) { foreach ($columnChunks as $columnChunk) { foreach ($viewer->view($columnChunk, $column, $this->stream) as $pageHeader) { - yield $pageHeader; + yield new ColumnPageHeader($column, $columnChunk, $pageHeader); } } } diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/ColumnPageHeader.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/ColumnPageHeader.php index 00e6bbdc9..fca41d2c9 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/ColumnPageHeader.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/ColumnPageHeader.php @@ -3,12 +3,14 @@ namespace Flow\Parquet\ParquetFile; use Flow\Parquet\ParquetFile\Page\PageHeader; +use Flow\Parquet\ParquetFile\RowGroup\ColumnChunk; use Flow\Parquet\ParquetFile\Schema\FlatColumn; final class ColumnPageHeader { public function __construct( public readonly FlatColumn $column, + public readonly ColumnChunk $columnChunk, public readonly PageHeader $pageHeader, ) { } diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/Header/DictionaryPageHeader.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/Header/DictionaryPageHeader.php index c8d6e6d92..d8dec273c 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/Header/DictionaryPageHeader.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/Header/DictionaryPageHeader.php @@ -29,6 +29,15 @@ public function encoding() : Encodings return $this->encoding; } + public function toThrift() : \Flow\Parquet\Thrift\DictionaryPageHeader + { + return new \Flow\Parquet\Thrift\DictionaryPageHeader([ + 'encoding' => $this->encoding->value, + 'num_values' => $this->valuesCount, + 'is_sorted' => false, + ]); + } + public function valuesCount() : int { return $this->valuesCount; diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/PageHeader.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/PageHeader.php index bd5136f0f..6e7b28baf 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/PageHeader.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/PageHeader.php @@ -118,7 +118,7 @@ public function toThrift() : \Flow\Parquet\Thrift\PageHeader 'crc' => null, 'data_page_header' => $this->dataPageHeader?->toThrift(), 'data_page_header_v2' => null, - 'dictionary_page_header' => null, + 'dictionary_page_header' => $this->dictionaryPageHeader?->toThrift(), 'index_page_header' => null, ]); } diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroup/ColumnChunk.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroup/ColumnChunk.php index 949bafd3c..c96edf755 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroup/ColumnChunk.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroup/ColumnChunk.php @@ -124,11 +124,6 @@ public function pageOffset() : int return $offset; } - public function rootName() : string - { - return $this->path[0]; - } - public function totalCompressedSize() : int { return $this->totalCompressedSize; diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder.php index 532f19fa8..e797b4adc 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder.php @@ -49,10 +49,9 @@ public function flush(int $fileOffset) : RowGroupContainer $chunkContainers = []; foreach ($this->chunkBuilders as $chunkBuilder) { - foreach ($chunkBuilder->flush($fileOffset) as $chunkContainer) { - $fileOffset += \strlen($chunkContainer->binaryBuffer); - $chunkContainers[] = $chunkContainer; - } + $chunkContainer = $chunkBuilder->flush($fileOffset); + $fileOffset += \strlen($chunkContainer->binaryBuffer); + $chunkContainers[] = $chunkContainer; } $buffer = ''; diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnChunkBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnChunkBuilder.php index 6f438f9ce..9dbbc7884 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnChunkBuilder.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnChunkBuilder.php @@ -3,8 +3,10 @@ namespace Flow\Parquet\ParquetFile\RowGroupBuilder; use Flow\Parquet\Data\DataConverter; +use Flow\Parquet\Exception\RuntimeException; use Flow\Parquet\ParquetFile\Compressions; use Flow\Parquet\ParquetFile\Encodings; +use Flow\Parquet\ParquetFile\Page\Header\Type; use Flow\Parquet\ParquetFile\RowGroup\ColumnChunk; use Flow\Parquet\ParquetFile\Schema\FlatColumn; @@ -21,45 +23,60 @@ public function addRow(mixed $data) : void $this->data[] = $data; } + public function flush(int $fileOffset) : ColumnChunkContainer + { + return $this->createColumnChunkContainer( + (new PagesBuilder($this->dataConverter))->build($this->column, $this->data), + $fileOffset + ); + } + /** - * @return array + * @param array $pageContainers */ - public function flush(int $fileOffset) : array + private function createColumnChunkContainer(array $pageContainers, int $offset) : ColumnChunkContainer { - $offset = $fileOffset; - $columnChunkContainers = []; + $buffer = ''; + $encodings = []; + $valuesCount = 0; + $size = 0; + $dictionaryPageSize = null; + $dictionaryPageOffset = null; + $pageOffset = $offset; - $pageContainer = (new DataPagesBuilder($this->data))->build($this->column, $this->dataConverter); + foreach ($pageContainers as $pageContainer) { + if ($pageContainer->pageHeader->type() === Type::DICTIONARY_PAGE) { + if ($dictionaryPageSize !== null) { + throw new RuntimeException('There can be only one dictionary page in column chunk'); + } - $columnChunkContainers[] = $this->createColumnChunkContainer($pageContainer, $offset); - $offset += $pageContainer->size(); + $dictionaryPageOffset = $pageOffset; + $dictionaryPageSize = $pageContainer->size(); + } - $this->data = []; + $buffer .= $pageContainer->pageHeaderBuffer . $pageContainer->pageBuffer; + $encodings[] = $pageContainer->pageHeader->encoding()->value; + $valuesCount += \count($pageContainer->values); + $size += $pageContainer->size(); + $pageOffset += $pageContainer->size(); + } - return $columnChunkContainers; - } + $encodings = \array_values(\array_unique($encodings)); + $encodings = \array_map(static fn (int $encoding) => Encodings::from($encoding), $encodings); - /** - * @psalm-suppress PossiblyNullArgument - */ - private function createColumnChunkContainer(PageContainer $pageContainer, int $offset) : ColumnChunkContainer - { return new ColumnChunkContainer( - $pageContainer->pageHeaderBuffer . $pageContainer->pageDataBuffer, + $buffer, new ColumnChunk( - $this->column->type(), - Compressions::UNCOMPRESSED, - /** @phpstan-ignore-next-line */ - $pageContainer->pageHeader->dataValuesCount(), - $offset, - $this->column->path(), - [ - Encodings::PLAIN, - ], - \strlen($pageContainer->pageDataBuffer) + \strlen($pageContainer->pageHeaderBuffer), - \strlen($pageContainer->pageDataBuffer) + \strlen($pageContainer->pageHeaderBuffer), - dictionaryPageOffset: null, - dataPageOffset: $offset, + type: $this->column->type(), + codec: Compressions::UNCOMPRESSED, + valuesCount: $valuesCount, + fileOffset: $offset, + path: $this->column->path(), + encodings: $encodings, + totalCompressedSize: $size, + totalUncompressedSize: $size, + dictionaryPageOffset: $dictionaryPageOffset, + dataPageOffset: ($dictionaryPageOffset) ? $offset + $dictionaryPageSize : $offset, indexPageOffset: null, ) ); diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder.php new file mode 100644 index 000000000..1107febbb --- /dev/null +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder.php @@ -0,0 +1,11 @@ +shred($this->rows, $column->maxDefinitionsLevel()); + $shredded = (new Dremel())->shred($rows, $column->maxDefinitionsLevel()); $rleBitPackedHybrid = new RLEBitPackedHybrid(); @@ -46,17 +50,39 @@ public function build(FlatColumn $column, DataConverter $dataConverter) : PageCo $pageWriter->append($definitionsBuffer); } - $valuesBuffer = ''; - $valuesBuffer = $this->writeData($column, $valuesBuffer, $shredded, $dataConverter); - $pageWriter->append($valuesBuffer); + if ($this->dictionary === null) { + $valuesBuffer = ''; + $this->writeData($column, $valuesBuffer, $shredded, $dataConverter); + $pageWriter->append($valuesBuffer); + } else { + $indices = []; + + foreach ($shredded->values as $value) { + $index = \array_search($value, $this->dictionary, true); + + if (!\is_int($index)) { + throw new RuntimeException('Value "' . $value . '" not found in dictionary'); + } + + $indices[] = $index; + } + + $valuesBuffer = ''; + $indicesBitWidth = BitWidth::fromArray($indices); + $indicesWriter = new BinaryBufferWriter($valuesBuffer); + $indicesWriter->writeVarInts32([$indicesBitWidth]); + $rleBitPackedHybrid->encodeHybrid($indicesWriter, $indices); + + $pageWriter->append($valuesBuffer); + } $pageHeader = new PageHeader( Type::DATA_PAGE, \strlen($pageBuffer), \strlen($pageBuffer), dataPageHeader: new DataPageHeader( - Encodings::PLAIN, - $this->valuesCount($this->rows), + $this->dictionary ? Encodings::PLAIN_DICTIONARY : Encodings::PLAIN, + \count($shredded->values), ), dataPageHeaderV2: null, dictionaryPageHeader: null, @@ -66,29 +92,15 @@ public function build(FlatColumn $column, DataConverter $dataConverter) : PageCo return new PageContainer( $pageHeaderBuffer->getBuffer(), $pageBuffer, + $shredded->values, $pageHeader ); } - public function valuesCount(array $rows) : int - { - $valuesCount = 0; - - foreach ($rows as $row) { - if (\is_array($row)) { - $valuesCount += $this->valuesCount($row); - } elseif ($row !== null) { - $valuesCount++; - } - } - - return $valuesCount; - } - /** * @psalm-suppress PossiblyNullArgument */ - private function writeData(FlatColumn $column, string $valuesBuffer, DataShredded $shredded, DataConverter $dataConverter) : string + private function writeData(FlatColumn $column, string &$valuesBuffer, DataShredded $shredded, DataConverter $dataConverter) : void { $values = []; @@ -107,15 +119,16 @@ private function writeData(FlatColumn $column, string $valuesBuffer, DataShredde (new BinaryBufferWriter($valuesBuffer))->writeInts32($values); break; - case null; - (new BinaryBufferWriter($valuesBuffer))->writeInts32($values); + case null: + (new BinaryBufferWriter($valuesBuffer))->writeInts32($values); - break; + break; } break; case PhysicalType::INT64: switch ($column->logicalType()?->name()) { + case LogicalType::TIME: case LogicalType::TIMESTAMP: (new BinaryBufferWriter($valuesBuffer))->writeInts64($values); @@ -166,7 +179,5 @@ private function writeData(FlatColumn $column, string $valuesBuffer, DataShredde default: throw new \RuntimeException('Writing physical type "' . $column->type()->name . '" is not implemented yet'); } - - return $valuesBuffer; } } diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryPageBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryPageBuilder.php new file mode 100644 index 000000000..ad55edd10 --- /dev/null +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageBuilder/DictionaryPageBuilder.php @@ -0,0 +1,144 @@ +writeData($column, $dictionaryBuffer, $dictionary, $dataConverter); + $pageWriter->append($dictionaryBuffer); + + $pageHeader = new PageHeader( + Type::DICTIONARY_PAGE, + \strlen($pageBuffer), + \strlen($pageBuffer), + dataPageHeader: null, + dataPageHeaderV2: null, + dictionaryPageHeader: new DictionaryPageHeader( + Encodings::PLAIN, + \count($dictionary) + ), + ); + $pageHeader->toThrift()->write(new TCompactProtocol($pageHeaderBuffer = new TMemoryBuffer())); + + return new PageContainer( + $pageHeaderBuffer->getBuffer(), + $pageBuffer, + $dictionary, + $pageHeader + ); + } + + /** + * @psalm-suppress PossiblyNullArgument + */ + private function writeData(FlatColumn $column, string &$valuesBuffer, array $rawValues, DataConverter $dataConverter) : string + { + $values = []; + + foreach ($rawValues as $value) { + $values[] = $dataConverter->toParquetType($column, $value); + } + + switch ($column->type()) { + case PhysicalType::BOOLEAN: + (new BinaryBufferWriter($valuesBuffer))->writeBooleans($values); + + break; + case PhysicalType::INT32: + switch ($column->logicalType()?->name()) { + case LogicalType::DATE: + (new BinaryBufferWriter($valuesBuffer))->writeInts32($values); + + break; + case null; + (new BinaryBufferWriter($valuesBuffer))->writeInts32($values); + + break; + } + + break; + case PhysicalType::INT64: + switch ($column->logicalType()?->name()) { + case LogicalType::TIMESTAMP: + (new BinaryBufferWriter($valuesBuffer))->writeInts64($values); + + break; + case null: + (new BinaryBufferWriter($valuesBuffer))->writeInts64($values); + + break; + } + + break; + case PhysicalType::FLOAT: + (new BinaryBufferWriter($valuesBuffer))->writeFloats($values); + + break; + case PhysicalType::DOUBLE: + (new BinaryBufferWriter($valuesBuffer))->writeDoubles($values); + + break; + case PhysicalType::FIXED_LEN_BYTE_ARRAY: + switch($column->logicalType()?->name()) { + case LogicalType::DECIMAL: + /** @phpstan-ignore-next-line */ + (new BinaryBufferWriter($valuesBuffer))->writeDecimals($values, $column->typeLength(), $column->precision(), $column->scale()); + + break; + + default: + throw new \RuntimeException('Writing logical type "' . ($column->logicalType()?->name() ?: 'UNKNOWN') . '" is not implemented yet'); + } + + break; + case PhysicalType::BYTE_ARRAY: + switch ($column->logicalType()?->name()) { + case LogicalType::JSON: + case LogicalType::UUID: + case LogicalType::STRING: + (new BinaryBufferWriter($valuesBuffer))->writeStrings($values); + + break; + + default: + throw new \RuntimeException('Writing logical type "' . ($column->logicalType()?->name() ?: 'UNKNOWN') . '" is not implemented yet'); + } + + break; + + default: + throw new \RuntimeException('Writing physical type "' . $column->type()->name . '" is not implemented yet'); + } + + return $valuesBuffer; + } +} diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageContainer.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageContainer.php index 7e9e2cdb9..c2db97412 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageContainer.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PageContainer.php @@ -8,13 +8,14 @@ final class PageContainer { public function __construct( public readonly string $pageHeaderBuffer, - public readonly string $pageDataBuffer, + public readonly string $pageBuffer, + public readonly array $values, public readonly PageHeader $pageHeader ) { } public function size() : int { - return \strlen($this->pageHeaderBuffer) + \strlen($this->pageDataBuffer); + return \strlen($this->pageHeaderBuffer) + \strlen($this->pageBuffer); } } diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PagesBuilder.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PagesBuilder.php new file mode 100644 index 000000000..8131af6b8 --- /dev/null +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/PagesBuilder.php @@ -0,0 +1,33 @@ + + */ + public function build(FlatColumn $column, array $rows) : array + { + if ($column->logicalType()?->name() === LogicalType::STRING) { + $dictionaryPageContainer = (new DictionaryPageBuilder())->build($column, $this->dataConverter, $rows); + + return [ + $dictionaryPageContainer, + (new DataPageBuilder($dictionaryPageContainer->values))->build($column, $this->dataConverter, $rows), + ]; + } + + return [(new DataPageBuilder())->build($column, $this->dataConverter, $rows)]; + } +} diff --git a/src/lib/parquet/src/Flow/Parquet/Writer.php b/src/lib/parquet/src/Flow/Parquet/Writer.php index ba21cedcc..126f2d1d8 100644 --- a/src/lib/parquet/src/Flow/Parquet/Writer.php +++ b/src/lib/parquet/src/Flow/Parquet/Writer.php @@ -3,6 +3,7 @@ namespace Flow\Parquet; use Flow\Parquet\Data\DataConverter; +use Flow\Parquet\Exception\InvalidArgumentException; use Flow\Parquet\Exception\RuntimeException; use Flow\Parquet\ParquetFile\Metadata; use Flow\Parquet\ParquetFile\RowGroupBuilder; @@ -26,8 +27,9 @@ public function __construct(private Options $options = new Options()) */ public function write(string $path, Schema $schema, iterable $rows) : void { + // This will be later replaced with append if (\file_exists($path)) { - \unlink($path); + throw new InvalidArgumentException("File {$path} already exists"); } $stream = \fopen($path, 'wb'); diff --git a/src/lib/parquet/tests/Flow/Parquet/Tests/Fixtures/primitives.parquet b/src/lib/parquet/tests/Flow/Parquet/Tests/Fixtures/primitives.parquet index 7ca0e5efa..1769d1f19 100644 Binary files a/src/lib/parquet/tests/Flow/Parquet/Tests/Fixtures/primitives.parquet and b/src/lib/parquet/tests/Flow/Parquet/Tests/Fixtures/primitives.parquet differ diff --git a/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/ListsWritingTest.php b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/ListsWritingTest.php new file mode 100644 index 000000000..fd0d9d5d2 --- /dev/null +++ b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/ListsWritingTest.php @@ -0,0 +1,63 @@ + \array_map(static fn ($i) => $faker->numberBetween(0, Consts::PHP_INT32_MAX), \range(1, \random_int(2, 10))), + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertSame( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + } + + public function test_writing_list_of_strings() : void + { + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(NestedColumn::list('list_of_strings', ListElement::string())); + + $faker = Factory::create(); + $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array { + return [ + [ + 'list_of_strings' => \array_map(static fn ($i) => $faker->text(10), \range(1, \random_int(2, 10))), + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertSame( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + } +} diff --git a/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/SimpleTypesReadingTest.php b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/SimpleTypesReadingTest.php index 3b2df0569..5cc892499 100644 --- a/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/SimpleTypesReadingTest.php +++ b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/SimpleTypesReadingTest.php @@ -407,7 +407,7 @@ public function test_reading_time_column() : void $count = 0; foreach ($file->values(['time']) as $row) { - $this->assertIsInt($row['time']); + $this->assertInstanceOf(\DateInterval::class, $row['time']); $count++; } $this->assertSame(100, $count); @@ -426,7 +426,7 @@ public function test_reading_time_nullable_column() : void foreach ($file->values(['time_nullable']) as $rowIndex => $row) { if ($rowIndex % 2 === 0) { - $this->assertIsInt($row['time_nullable']); + $this->assertInstanceOf(\DateInterval::class, $row['time_nullable']); } else { $this->assertNull($row['time_nullable']); } diff --git a/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/SimpleTypesWritingTest.php b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/SimpleTypesWritingTest.php new file mode 100644 index 000000000..f2e741402 --- /dev/null +++ b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/SimpleTypesWritingTest.php @@ -0,0 +1,372 @@ + (bool) $i % 2 == 0, + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertSame( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + } + + public function test_writing_bool_nullable_column() : void + { + $this->markTestSkipped('Not implemented yet'); + } + + public function test_writing_date_column() : void + { + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::date('date')); + + $faker = Factory::create(); + + $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array { + return [ + [ + 'date' => \DateTimeImmutable::createFromMutable($faker->dateTimeThisYear)->setTime(0, 0, 0, 0), + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + } + + public function test_writing_date_nullable_column() : void + { + $this->markTestSkipped('Not implemented yet'); + } + + public function test_writing_decimal_column() : void + { + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::decimal('decimal')); + + $faker = Factory::create(); + + $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array { + return [ + [ + 'decimal' => \round($faker->randomFloat(5), 2), + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + } + + public function test_writing_decimal_nullable_column() : void + { + $this->markTestSkipped('Not implemented yet'); + } + + public function test_writing_double_column() : void + { + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::double('double')); + + $faker = Factory::create(); + + $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array { + return [ + [ + 'double' => $faker->randomFloat(), + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + } + + public function test_writing_double_nullable_column() : void + { + $this->markTestSkipped('Not implemented yet'); + } + + public function test_writing_enum_column() : void + { + $this->markTestSkipped('Not implemented yet'); + } + + public function test_writing_float_column() : void + { + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::float('float')); + + $inputData = \array_merge(...\array_map(static function (int $i) : array { + return [ + [ + 'float' => 10.25, + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + } + + public function test_writing_float_nullable_column() : void + { + $this->markTestSkipped('Not implemented yet'); + } + + public function test_writing_int32_column() : void + { + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::int32('int32')); + + $faker = Factory::create(); + + $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array { + return [ + [ + 'int32' => $faker->numberBetween(0, Consts::PHP_INT32_MAX), + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + } + + public function test_writing_int32_nullable_column() : void + { + $this->markTestSkipped('Not implemented yet'); + } + + public function test_writing_int64() : void + { + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::int64('int64')); + + $faker = Factory::create(); + + $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array { + return [ + [ + 'int64' => $faker->numberBetween(0, Consts::PHP_INT64_MAX), + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + } + + public function test_writing_int64_nullable_column() : void + { + $this->markTestSkipped('Not implemented yet'); + } + + public function test_writing_json_column() : void + { + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::json('json')); + + $faker = Factory::create(); + + $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array { + return [ + [ + 'json' => \json_encode(['street' => $faker->streetName, 'city' => $faker->city, 'country' => $faker->country, 'zip' => $faker->postcode], JSON_THROW_ON_ERROR), + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + } + + public function test_writing_json_nullable_column() : void + { + $this->markTestSkipped('Not implemented yet'); + } + + public function test_writing_string_column() : void + { + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::string('string')); + + $faker = Factory::create(); + + $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array { + return [ + [ + 'string' => $faker->text(50), + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + } + + public function test_writing_string_nullable_column() : void + { + $this->markTestSkipped('Not implemented yet'); + } + + public function test_writing_time_column() : void + { + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::time('time')); + + $inputData = \array_merge(...\array_map(static function (int $i) : array { + return [ + [ + 'time' => (new \DateTimeImmutable('2023-01-01 00:00:00 UTC'))->diff(new \DateTimeImmutable('2023-01-01 15:45:00 UTC')), + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + } + + public function test_writing_time_nullable_column() : void + { + $this->markTestSkipped('Not implemented yet'); + } + + public function test_writing_timestamp_column() : void + { + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::dateTime('dateTime')); + + $faker = Factory::create(); + + $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array { + return [ + [ + 'dateTime' => $faker->dateTimeThisYear, + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + } + + public function test_writing_timestamp_nullable_column() : void + { + $this->markTestSkipped('Not implemented yet'); + } + + public function test_writing_uuid_column() : void + { + $path = \sys_get_temp_dir() . '/test-writer' . \uniqid('parquet-test-', true) . '.parquet'; + + $writer = new Writer(); + $schema = Schema::with(FlatColumn::uuid('uuid')); + + $faker = Factory::create(); + + $inputData = \array_merge(...\array_map(static function (int $i) use ($faker) : array { + return [ + [ + 'uuid' => $faker->uuid, + ], + ]; + }, \range(1, 100))); + + $writer->write($path, $schema, $inputData); + + $this->assertEquals( + $inputData, + \iterator_to_array((new Reader())->read($path)->values()) + ); + } + + public function test_writing_uuid_nullable_column() : void + { + $this->markTestSkipped('Not implemented yet'); + } +} diff --git a/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/WriterTest.php b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/WriterTest.php deleted file mode 100644 index 9be263134..000000000 --- a/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/WriterTest.php +++ /dev/null @@ -1,88 +0,0 @@ - $faker->boolean, - 'int32' => $faker->numberBetween(0, Consts::PHP_INT32_MAX), - 'int64' => $faker->numberBetween(0, PHP_INT_MAX), - 'float' => 10.25, - 'double' => $faker->randomFloat(), - 'decimal' => \round($faker->randomFloat(5), 2), - 'string' => $faker->text(50), - 'date' => \DateTimeImmutable::createFromMutable($faker->dateTime)->setTime(0, 0, 0, 0), - 'datetime' => \DateTimeImmutable::createFromMutable($faker->dateTime), - 'list_of_datetimes' => [ - \DateTimeImmutable::createFromMutable($faker->dateTime), - \DateTimeImmutable::createFromMutable($faker->dateTime), - \DateTimeImmutable::createFromMutable($faker->dateTime), - ], - 'map_of_ints' => [ - 'a' => $faker->numberBetween(0, Consts::PHP_INT32_MAX), - 'b' => $faker->numberBetween(0, Consts::PHP_INT32_MAX), - 'c' => $faker->numberBetween(0, Consts::PHP_INT32_MAX), - ], - 'list_of_strings' => \array_map(static fn (int $i) => $faker->text(50), \range(0, \random_int(1, 10))), - 'struct_flat' => [ - 'id' => $i, - 'name' => 'name_' . \str_pad((string) $i, 5, '0', STR_PAD_LEFT), - ], - ], - ]; - }, \range(1, 100)); - - $inputData = \array_merge(...$inputData); - - $writer->write($path, $schema, $inputData); - - $reader = new Reader(); - $file = $reader->read($path); - - $this->assertEquals( - $inputData, - \iterator_to_array($file->values()), - ); - } -}