Skip to content

Commit

Permalink
Added possibility to encode pages as dictionary (#646)
Browse files Browse the repository at this point in the history
* Added possibility to encode pages as dictionary

* Static analyze fixes
  • Loading branch information
norberttech authored Oct 25, 2023
1 parent 49c243f commit ebb0aa0
Show file tree
Hide file tree
Showing 22 changed files with 767 additions and 181 deletions.
4 changes: 2 additions & 2 deletions src/lib/parquet/resources/python/generators/primitives.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class Color(Enum):
json_col = pd.Series([json.dumps({'key': random.randint(1, 10)}) for _ in range(n_rows)], dtype='string')
date_col = pd.Series([datetime.now().date() + timedelta(days=i) for i in range(n_rows)], dtype='object')
timestamp_col = pd.Series([pd.Timestamp(datetime.now() + timedelta(seconds=i * 10)) for i in range(n_rows)], dtype='datetime64[ns]')
time_col = pd.Series([time(hour=i % 24, minute=(i * 2) % 60, second=(i * 3) % 60) for i in range(n_rows)], dtype='object')
time_col = pd.Series([time(hour=(i+1) % 24, minute=((i+1) * 2) % 60, second=((i+1) * 3) % 60) for i in range(n_rows)], dtype='object')
uuid_col = pd.Series([str(uuid.uuid4()) for _ in range(n_rows)], dtype='string')
enum_col = pd.Series([random.choice(list(Color)).name for _ in range(n_rows)], dtype='string')
float_col = pd.Series([random.uniform(0, 100) for _ in range(n_rows)], dtype='float32')
Expand All @@ -40,7 +40,7 @@ class Color(Enum):
json_nullable_col = pd.Series([json.dumps({'key': random.randint(1, 10)}) if i % 2 == 0 else None for i in range(n_rows)], dtype='string')
date_nullable_col = pd.Series([datetime.now().date() + timedelta(days=i) if i % 2 == 0 else None for i in range(n_rows)], dtype='object')
timestamp_nullable_col = pd.Series([pd.Timestamp(datetime.now() + timedelta(seconds=i * 10)) if i % 2 == 0 else None for i in range(n_rows)], dtype='object')
time_nullable_col = pd.Series([time(hour=i % 24, minute=(i * 2) % 60, second=(i * 3) % 60) if i % 2 == 0 else None for i in range(n_rows)], dtype='object')
time_nullable_col = pd.Series([time(hour=(i+1) % 24, minute=((i+1) * 2) % 60, second=((i+1) * 3) % 60) if i % 2 == 0 else None for i in range(n_rows)], dtype='object')
uuid_nullable_col = pd.Series([str(uuid.uuid4()) if i % 2 == 0 else None for i in range(n_rows)], dtype='string')
enum_nullable_col = pd.Series([random.choice(list(Color)).name if i % 2 == 0 else None for i in range(n_rows)], dtype='string')
float_nullable_col = pd.Series([random.uniform(0, 100) if i % 2 == 0 else None for i in range(n_rows)], dtype='float32')
Expand Down
22 changes: 11 additions & 11 deletions src/lib/parquet/src/Flow/Parquet/Data/Converter/TimeConverter.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public function fromParquetType(mixed $data) : \DateInterval

public function isFor(FlatColumn $column, Options $options) : bool
{
if ($column->type() === PhysicalType::INT32 && $column->logicalType()?->name() === LogicalType::TIME) {
if ($column->type() === PhysicalType::INT64 && $column->logicalType()?->name() === LogicalType::TIME) {
return true;
}

Expand All @@ -35,8 +35,8 @@ public function toParquetType(mixed $data) : int
*/
private function toDateInterval(int $microseconds) : \DateInterval
{
$seconds = (int) \floor($microseconds / 1000000);
$remainingMicroseconds = $microseconds % 1000000;
$seconds = (int) \floor($microseconds / 100000000);
$remainingMicroseconds = $microseconds % 100000000;

$minutes = (int) \floor($seconds / 60);
$remainingSeconds = $seconds % 60;
Expand Down Expand Up @@ -64,7 +64,7 @@ private function toDateInterval(int $microseconds) : \DateInterval
$interval->y = 0;
$interval->m = 0;
$interval->d = 0;
$interval->f = ($remainingMicroseconds / 1000000);
$interval->f = ($remainingMicroseconds / 100000000);

return $interval;
}
Expand All @@ -81,13 +81,13 @@ private function toInt(\DateInterval $interval) : int

$microseconds = 0;

$microseconds += $interval->y * 365 * 24 * 60 * 60 * 1000000; // years to microseconds
$microseconds += $interval->m * 30 * 24 * 60 * 60 * 1000000; // months to microseconds (approx)
$microseconds += $interval->d * 24 * 60 * 60 * 1000000; // days to microseconds
$microseconds += $interval->h * 60 * 60 * 1000000; // hours to microseconds
$microseconds += $interval->i * 60 * 1000000; // minutes to microseconds
$microseconds += $interval->s * 1000000; // seconds to microseconds
$microseconds += (int) (($interval->f) * 1000000); // microseconds
$microseconds += $interval->y * 365 * 24 * 60 * 60 * 100000000; // years to microseconds
$microseconds += $interval->m * 30 * 24 * 60 * 60 * 100000000; // months to microseconds (approx)
$microseconds += $interval->d * 24 * 60 * 60 * 100000000; // days to microseconds
$microseconds += $interval->h * 60 * 60 * 100000000; // hours to microseconds
$microseconds += $interval->i * 60 * 100000000; // minutes to microseconds
$microseconds += $interval->s * 100000000; // seconds to microseconds
$microseconds += (int) (($interval->f) * 100000000); // microseconds

return $microseconds;
}
Expand Down
11 changes: 10 additions & 1 deletion src/lib/parquet/src/Flow/Parquet/Data/DataConverter.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Flow\Parquet\Data\Converter\Int64DateTimeConverter;
use Flow\Parquet\Data\Converter\Int96DateTimeConverter;
use Flow\Parquet\Data\Converter\TimeConverter;
use Flow\Parquet\Exception\DataConversionException;
use Flow\Parquet\Options;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;

Expand Down Expand Up @@ -56,7 +57,15 @@ public function fromParquetType(FlatColumn $column, mixed $data) : mixed
if ($converter->isFor($column, $this->options)) {
$this->cache[$column->flatPath()] = $converter;

return $converter->fromParquetType($data);
try {
return $converter->fromParquetType($data);
} catch (\Throwable $e) {
throw new DataConversionException(
"Failed to convert data from parquet type for column '{$column->flatPath()}'. {$e->getMessage()}",
0,
$e
);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php declare(strict_types=1);

namespace Flow\Parquet\Exception;

final class DataConversionException extends \Flow\ETL\Exception\RuntimeException
{
}
7 changes: 3 additions & 4 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
use Flow\Parquet\ParquetFile\ColumnPageHeader;
use Flow\Parquet\ParquetFile\Data\DataBuilder;
use Flow\Parquet\ParquetFile\Metadata;
use Flow\Parquet\ParquetFile\Page\PageHeader;
use Flow\Parquet\ParquetFile\PageReader;
use Flow\Parquet\ParquetFile\RowGroup\ColumnChunk;
use Flow\Parquet\ParquetFile\Schema;
Expand Down Expand Up @@ -82,7 +81,7 @@ public function pageHeaders() : \Generator
{
foreach ($this->schema()->columnsFlat() as $column) {
foreach ($this->viewChunksPages($column) as $pageHeader) {
yield new ColumnPageHeader($column, $pageHeader);
yield $pageHeader;
}
}
}
Expand Down Expand Up @@ -350,7 +349,7 @@ private function readStruct(NestedColumn $structColumn, bool $isCollection = fal
}

/**
* @return \Generator<PageHeader>
* @return \Generator<ColumnPageHeader>
*/
private function viewChunksPages(FlatColumn $column) : \Generator
{
Expand All @@ -359,7 +358,7 @@ private function viewChunksPages(FlatColumn $column) : \Generator
foreach ($this->getColumnChunks($column) as $columnChunks) {
foreach ($columnChunks as $columnChunk) {
foreach ($viewer->view($columnChunk, $column, $this->stream) as $pageHeader) {
yield $pageHeader;
yield new ColumnPageHeader($column, $columnChunk, $pageHeader);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
namespace Flow\Parquet\ParquetFile;

use Flow\Parquet\ParquetFile\Page\PageHeader;
use Flow\Parquet\ParquetFile\RowGroup\ColumnChunk;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;

final class ColumnPageHeader
{
public function __construct(
public readonly FlatColumn $column,
public readonly ColumnChunk $columnChunk,
public readonly PageHeader $pageHeader,
) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ public function encoding() : Encodings
return $this->encoding;
}

public function toThrift() : \Flow\Parquet\Thrift\DictionaryPageHeader
{
return new \Flow\Parquet\Thrift\DictionaryPageHeader([
'encoding' => $this->encoding->value,
'num_values' => $this->valuesCount,
'is_sorted' => false,
]);
}

public function valuesCount() : int
{
return $this->valuesCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public function toThrift() : \Flow\Parquet\Thrift\PageHeader
'crc' => null,
'data_page_header' => $this->dataPageHeader?->toThrift(),
'data_page_header_v2' => null,
'dictionary_page_header' => null,
'dictionary_page_header' => $this->dictionaryPageHeader?->toThrift(),
'index_page_header' => null,
]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,6 @@ public function pageOffset() : int
return $offset;
}

public function rootName() : string
{
return $this->path[0];
}

public function totalCompressedSize() : int
{
return $this->totalCompressedSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,9 @@ public function flush(int $fileOffset) : RowGroupContainer
$chunkContainers = [];

foreach ($this->chunkBuilders as $chunkBuilder) {
foreach ($chunkBuilder->flush($fileOffset) as $chunkContainer) {
$fileOffset += \strlen($chunkContainer->binaryBuffer);
$chunkContainers[] = $chunkContainer;
}
$chunkContainer = $chunkBuilder->flush($fileOffset);
$fileOffset += \strlen($chunkContainer->binaryBuffer);
$chunkContainers[] = $chunkContainer;
}

$buffer = '';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
namespace Flow\Parquet\ParquetFile\RowGroupBuilder;

use Flow\Parquet\Data\DataConverter;
use Flow\Parquet\Exception\RuntimeException;
use Flow\Parquet\ParquetFile\Compressions;
use Flow\Parquet\ParquetFile\Encodings;
use Flow\Parquet\ParquetFile\Page\Header\Type;
use Flow\Parquet\ParquetFile\RowGroup\ColumnChunk;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;

Expand All @@ -21,45 +23,60 @@ public function addRow(mixed $data) : void
$this->data[] = $data;
}

public function flush(int $fileOffset) : ColumnChunkContainer
{
return $this->createColumnChunkContainer(
(new PagesBuilder($this->dataConverter))->build($this->column, $this->data),
$fileOffset
);
}

/**
* @return array<ColumnChunkContainer>
* @param array<PageContainer> $pageContainers
*/
public function flush(int $fileOffset) : array
private function createColumnChunkContainer(array $pageContainers, int $offset) : ColumnChunkContainer
{
$offset = $fileOffset;
$columnChunkContainers = [];
$buffer = '';
$encodings = [];
$valuesCount = 0;
$size = 0;
$dictionaryPageSize = null;
$dictionaryPageOffset = null;
$pageOffset = $offset;

$pageContainer = (new DataPagesBuilder($this->data))->build($this->column, $this->dataConverter);
foreach ($pageContainers as $pageContainer) {
if ($pageContainer->pageHeader->type() === Type::DICTIONARY_PAGE) {
if ($dictionaryPageSize !== null) {
throw new RuntimeException('There can be only one dictionary page in column chunk');
}

$columnChunkContainers[] = $this->createColumnChunkContainer($pageContainer, $offset);
$offset += $pageContainer->size();
$dictionaryPageOffset = $pageOffset;
$dictionaryPageSize = $pageContainer->size();
}

$this->data = [];
$buffer .= $pageContainer->pageHeaderBuffer . $pageContainer->pageBuffer;
$encodings[] = $pageContainer->pageHeader->encoding()->value;
$valuesCount += \count($pageContainer->values);
$size += $pageContainer->size();
$pageOffset += $pageContainer->size();
}

return $columnChunkContainers;
}
$encodings = \array_values(\array_unique($encodings));
$encodings = \array_map(static fn (int $encoding) => Encodings::from($encoding), $encodings);

/**
* @psalm-suppress PossiblyNullArgument
*/
private function createColumnChunkContainer(PageContainer $pageContainer, int $offset) : ColumnChunkContainer
{
return new ColumnChunkContainer(
$pageContainer->pageHeaderBuffer . $pageContainer->pageDataBuffer,
$buffer,
new ColumnChunk(
$this->column->type(),
Compressions::UNCOMPRESSED,
/** @phpstan-ignore-next-line */
$pageContainer->pageHeader->dataValuesCount(),
$offset,
$this->column->path(),
[
Encodings::PLAIN,
],
\strlen($pageContainer->pageDataBuffer) + \strlen($pageContainer->pageHeaderBuffer),
\strlen($pageContainer->pageDataBuffer) + \strlen($pageContainer->pageHeaderBuffer),
dictionaryPageOffset: null,
dataPageOffset: $offset,
type: $this->column->type(),
codec: Compressions::UNCOMPRESSED,
valuesCount: $valuesCount,
fileOffset: $offset,
path: $this->column->path(),
encodings: $encodings,
totalCompressedSize: $size,
totalUncompressedSize: $size,
dictionaryPageOffset: $dictionaryPageOffset,
dataPageOffset: ($dictionaryPageOffset) ? $offset + $dictionaryPageSize : $offset,
indexPageOffset: null,
)
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php declare(strict_types=1);

namespace Flow\Parquet\ParquetFile\RowGroupBuilder;

use Flow\Parquet\Data\DataConverter;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;

interface PageBuilder
{
public function build(FlatColumn $column, DataConverter $dataConverter, array $rows) : PageContainer;
}
Loading

0 comments on commit ebb0aa0

Please sign in to comment.