Skip to content

Commit

Permalink
Parquet - added support for GZIP and SNAPPY compressions
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech committed Oct 30, 2023
1 parent 758e68c commit 42ab5b0
Show file tree
Hide file tree
Showing 19 changed files with 337 additions and 41 deletions.
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"ext-json": "*",
"ext-mbstring": "*",
"ext-xmlreader": "*",
"ext-zlib": "*",
"amphp/process": "^2",
"amphp/socket": "^2",
"codename/parquet": "~0.6.2 || ~0.7.0",
Expand Down
8 changes: 8 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/Option.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ enum Option
*/
case DICTIONARY_PAGE_SIZE;

/**
* Compression level for GZIP codec. This option is going to be passed to gzcompress function when Compression is set to GZIP.
* Lower level means faster compression, but bigger file size.
*
* Default value is 9
*/
case GZIP_COMPRESSION_LEVEL;

/**
* When this option is set to true, reader will try to convert INT96 logical type to DateTimeImmutable object.
* Some parquet writers due to historical reasons might still use INT96 to store timestamps with nanoseconds precision
Expand Down
1 change: 1 addition & 0 deletions src/lib/parquet/src/Flow/Parquet/Options.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public function __construct()
Option::ROW_GROUP_SIZE_BYTES->name => Consts::MB_SIZE * 128,
Option::DICTIONARY_PAGE_SIZE->name => Consts::MB_SIZE,
Option::DICTIONARY_PAGE_MIN_CARDINALITY_RATION->name => 0.4,
Option::GZIP_COMPRESSION_LEVEL->name => 9,
];
}

Expand Down
3 changes: 2 additions & 1 deletion src/lib/parquet/src/Flow/Parquet/ParquetFile.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public function __construct(
private $stream,
private readonly ByteOrder $byteOrder,
private readonly DataConverter $dataConverter,
private readonly Options $options,
private readonly LoggerInterface $logger = new NullLogger()
) {
}
Expand Down Expand Up @@ -90,7 +91,7 @@ public function readChunks(FlatColumn $column, ?int $limit = null) : \Generator
{
$reader = new WholeChunkReader(
new DataBuilder($this->dataConverter, $this->logger),
new PageReader($column, $this->byteOrder, $this->logger),
new PageReader($column, $this->byteOrder, $this->options, $this->logger),
$this->logger
);

Expand Down
29 changes: 29 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/Codec.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,38 @@
namespace Flow\Parquet\ParquetFile;

use Flow\Parquet\Exception\RuntimeException;
use Flow\Parquet\Option;
use Flow\Parquet\Options;

final class Codec
{
public function __construct(
private readonly Options $options
) {
}

public function compress(string $data, Compressions $compression) : string
{
/**
* @var false|string $result
*
* @psalm-suppress PossiblyInvalidArgument
*/
$result = match ($compression) {
Compressions::UNCOMPRESSED => $data,
Compressions::SNAPPY => \snappy_compress($data),
/** @phpstan-ignore-next-line */
Compressions::GZIP => \gzencode($data, $this->options->get(Option::GZIP_COMPRESSION_LEVEL)),
default => throw new RuntimeException('Compression ' . $compression->name . ' is not supported yet')
};

if ($result === false) {
throw new RuntimeException('Failed to decompress data');
}

return $result;
}

public function decompress(string $data, Compressions $compression) : string
{
/** @var false|string $result */
Expand Down
6 changes: 4 additions & 2 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/PageReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Flow\Parquet\ByteOrder;
use Flow\Parquet\Exception\RuntimeException;
use Flow\Parquet\Options;
use Flow\Parquet\ParquetFile\Page\ColumnData;
use Flow\Parquet\ParquetFile\Page\Dictionary;
use Flow\Parquet\ParquetFile\Page\PageHeader;
Expand All @@ -16,6 +17,7 @@ final class PageReader
public function __construct(
private readonly FlatColumn $column,
private readonly ByteOrder $byteOrder,
private readonly Options $options,
private readonly LoggerInterface $logger = new NullLogger()
) {
}
Expand All @@ -29,7 +31,7 @@ public function readData(PageHeader $pageHeader, Compressions $codec, ?Dictionar
{
return (new DataCoder($this->byteOrder, $this->logger))
->decodeData(
(new Codec())
(new Codec($this->options))
->decompress(
/** @phpstan-ignore-next-line */
\fread($stream, $pageHeader->compressedPageSize()),
Expand Down Expand Up @@ -59,7 +61,7 @@ public function readDictionary(PageHeader $pageHeader, Compressions $codec, $str

return (new DataCoder($this->byteOrder, $this->logger))
->decodeDictionary(
(new Codec())
(new Codec($this->options))
->decompress(
/** @phpstan-ignore-next-line */
\fread($stream, $pageHeader->compressedPageSize()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ public static function fromThrift(\Flow\Parquet\Thrift\ColumnChunk $thrift) : se
);
}

// public function __debugInfo() : ?array
// {
// return $this->normalize();
// }

public function codec() : Compressions
{
return $this->codec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ final class RowGroupBuilder

public function __construct(
private readonly Schema $schema,
private readonly Compressions $compression,
private readonly Options $options,
private readonly DataConverter $dataConverter,
private readonly PageSizeCalculator $calculator
) {
$this->flattener = new Flattener();

$this->chunkBuilders = $this->createColumnChunkBuilders($this->schema);
$this->chunkBuilders = $this->createColumnChunkBuilders($this->schema, $this->compression);
$this->statistics = RowGroupStatistics::fromBuilders($this->chunkBuilders);
}

Expand Down Expand Up @@ -78,7 +79,7 @@ public function flush(int $fileOffset) : RowGroupContainer
new RowGroup($chunks, $this->statistics->rowsCount())
);

$this->chunkBuilders = $this->createColumnChunkBuilders($this->schema);
$this->chunkBuilders = $this->createColumnChunkBuilders($this->schema, $this->compression);
$this->statistics = RowGroupStatistics::fromBuilders($this->chunkBuilders);

return $rowGroupContainer;
Expand All @@ -102,12 +103,12 @@ public function statistics() : RowGroupStatistics
/**
* @return array<string, ColumnChunkBuilder>
*/
private function createColumnChunkBuilders(Schema $schema) : array
private function createColumnChunkBuilders(Schema $schema, Compressions $compression) : array
{
$builders = [];

foreach ($schema->columnsFlat() as $column) {
$builders[$column->flatPath()] = new ColumnChunkBuilder($column, $this->dataConverter, $this->calculator, $this->options);
$builders[$column->flatPath()] = new ColumnChunkBuilder($column, $compression, $this->dataConverter, $this->calculator, $this->options);
}

return $builders;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ final class ColumnChunkBuilder

public function __construct(
private readonly FlatColumn $column,
private readonly Compressions $compression,
private readonly DataConverter $dataConverter,
private readonly PageSizeCalculator $calculator,
private readonly Options $options
Expand All @@ -31,23 +32,24 @@ public function addRow(mixed $row) : void

public function flush(int $fileOffset) : ColumnChunkContainer
{
$pageContainers = (new PagesBuilder($this->dataConverter, $this->calculator, $this->options))->build($this->column, $this->rows, $this->statistics);
$pageContainers = (new PagesBuilder($this->dataConverter, $this->compression, $this->calculator, $this->options))
->build($this->column, $this->rows, $this->statistics);

$this->statistics->reset();

return new ColumnChunkContainer(
$pageContainers->buffer(),
new ColumnChunk(
type: $this->column->type(),
codec: Compressions::UNCOMPRESSED,
codec: $this->compression,
valuesCount: $pageContainers->valuesCount(),
fileOffset: $fileOffset,
path: $this->column->path(),
encodings: $pageContainers->encodings(),
totalCompressedSize: $pageContainers->size(),
totalUncompressedSize: $pageContainers->size(),
totalCompressedSize: $pageContainers->compressedSize(),
totalUncompressedSize: $pageContainers->uncompressedSize(),
dictionaryPageOffset: ($pageContainers->dictionaryPageContainer()) ? $fileOffset : null,
dataPageOffset: ($pageContainers->dictionaryPageContainer()) ? $fileOffset + $pageContainers->dictionaryPageContainer()->totalSize() : $fileOffset,
dataPageOffset: ($pageContainers->dictionaryPageContainer()) ? $fileOffset + $pageContainers->dictionaryPageContainer()->totalCompressedSize() : $fileOffset,
indexPageOffset: null,
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
use Flow\Dremel\Dremel;
use Flow\Parquet\BinaryWriter\BinaryBufferWriter;
use Flow\Parquet\Data\DataConverter;
use Flow\Parquet\Options;
use Flow\Parquet\ParquetFile\Codec;
use Flow\Parquet\ParquetFile\Compressions;
use Flow\Parquet\ParquetFile\Data\RLEBitPackedHybrid;
use Flow\Parquet\ParquetFile\Encodings;
use Flow\Parquet\ParquetFile\Page\Header\DataPageHeader;
Expand All @@ -19,6 +22,8 @@ final class DataPageBuilder
{
public function __construct(
private readonly DataConverter $dataConverter,
private readonly Compressions $compression,
private readonly Options $options,
) {
}

Expand All @@ -45,9 +50,11 @@ public function build(FlatColumn $column, array $rows, ?array $dictionary = null
$pageWriter->append((new PlainValuesPacker($this->dataConverter))->packValues($column, $shredded->values));
}

$compressedBuffer = (new Codec($this->options))->compress($pageBuffer, $this->compression);

$pageHeader = new PageHeader(
Type::DATA_PAGE,
\strlen($pageBuffer),
\strlen($compressedBuffer),
\strlen($pageBuffer),
dataPageHeader: new DataPageHeader(
$dictionary && $indices ? Encodings::RLE_DICTIONARY : Encodings::PLAIN,
Expand All @@ -60,7 +67,7 @@ public function build(FlatColumn $column, array $rows, ?array $dictionary = null

return new PageContainer(
$pageHeaderBuffer->getBuffer(),
$pageBuffer,
$compressedBuffer,
$shredded->values,
null,
$pageHeader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

use Flow\Parquet\BinaryWriter\BinaryBufferWriter;
use Flow\Parquet\Data\DataConverter;
use Flow\Parquet\Options;
use Flow\Parquet\ParquetFile\Codec;
use Flow\Parquet\ParquetFile\Compressions;
use Flow\Parquet\ParquetFile\Encodings;
use Flow\Parquet\ParquetFile\Page\Header\DictionaryPageHeader;
use Flow\Parquet\ParquetFile\Page\Header\Type;
Expand All @@ -15,8 +18,11 @@

final class DictionaryPageBuilder
{
public function __construct(private readonly DataConverter $dataConverter)
{
public function __construct(
private readonly DataConverter $dataConverter,
private readonly Compressions $compression,
private readonly Options $options,
) {
}

public function build(FlatColumn $column, array $rows) : PageContainer
Expand All @@ -27,9 +33,11 @@ public function build(FlatColumn $column, array $rows) : PageContainer
$pageWriter = new BinaryBufferWriter($pageBuffer);
$pageWriter->append((new PlainValuesPacker($this->dataConverter))->packValues($column, $dictionary->dictionary));

$compressedBuffer = (new Codec($this->options))->compress($pageBuffer, $this->compression);

$pageHeader = new PageHeader(
Type::DICTIONARY_PAGE,
\strlen($pageBuffer),
\strlen($compressedBuffer),
\strlen($pageBuffer),
dataPageHeader: null,
dataPageHeaderV2: null,
Expand All @@ -42,7 +50,7 @@ public function build(FlatColumn $column, array $rows) : PageContainer

return new PageContainer(
$pageHeaderBuffer->getBuffer(),
$pageBuffer,
$compressedBuffer,
$dictionary->indices,
$dictionary->dictionary,
$pageHeader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@ public function headerSize() : int
return \strlen($this->pageHeaderBuffer);
}

public function totalSize() : int
public function totalCompressedSize() : int
{
return $this->headerSize() + $this->dataSize();
return $this->headerSize() + $this->pageHeader->compressedPageSize();
}

public function totalUncompressedSize() : int
{
return $this->headerSize() + $this->pageHeader->uncompressedPageSize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,21 @@ public function buffer() : string
return $buffer;
}

public function compressedSize() : int
{
$size = 0;

if ($this->dictionaryPageContainer) {
$size += $this->dictionaryPageContainer->totalCompressedSize();
}

foreach ($this->dataPageContainers as $pageContainer) {
$size += $pageContainer->totalCompressedSize();
}

return $size;
}

public function dataPageContainers() : array
{
return $this->dataPageContainers;
Expand Down Expand Up @@ -82,16 +97,16 @@ public function encodings() : array
return \array_map(static fn (int $encoding) => Encodings::from($encoding), $encodings);
}

public function size() : int
public function uncompressedSize() : int
{
$size = 0;

if ($this->dictionaryPageContainer) {
$size += $this->dictionaryPageContainer->totalSize();
$size += $this->dictionaryPageContainer->totalUncompressedSize();
}

foreach ($this->dataPageContainers as $pageContainer) {
$size += $pageContainer->totalSize();
$size += $pageContainer->totalUncompressedSize();
}

return $size;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Flow\Parquet\Data\DataConverter;
use Flow\Parquet\Option;
use Flow\Parquet\Options;
use Flow\Parquet\ParquetFile\Compressions;
use Flow\Parquet\ParquetFile\RowGroupBuilder\PageBuilder\DataPageBuilder;
use Flow\Parquet\ParquetFile\RowGroupBuilder\PageBuilder\DictionaryPageBuilder;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;
Expand All @@ -13,6 +14,7 @@ final class PagesBuilder
{
public function __construct(
private readonly DataConverter $dataConverter,
private readonly Compressions $compression,
private readonly PageSizeCalculator $pageSizeCalculator,
private readonly Options $options
) {
Expand All @@ -23,13 +25,13 @@ public function build(FlatColumn $column, array $rows, ColumnChunkStatistics $st
$containers = new PageContainers();

if ($statistics->cardinalityRation() <= $this->options->get(Option::DICTIONARY_PAGE_MIN_CARDINALITY_RATION)) {
$dictionaryPageContainer = (new DictionaryPageBuilder($this->dataConverter))->build($column, $rows);
$dictionaryPageContainer = (new DictionaryPageBuilder($this->dataConverter, $this->compression, $this->options))->build($column, $rows);

if ($dictionaryPageContainer->dataSize() <= $this->options->get(Option::DICTIONARY_PAGE_SIZE)) {
$containers->add($dictionaryPageContainer);

$containers->add(
(new DataPageBuilder($this->dataConverter))->build($column, $rows, $dictionaryPageContainer->dictionary, $dictionaryPageContainer->values)
(new DataPageBuilder($this->dataConverter, $this->compression, $this->options))->build($column, $rows, $dictionaryPageContainer->dictionary, $dictionaryPageContainer->values)
);

return $containers;
Expand All @@ -39,7 +41,7 @@ public function build(FlatColumn $column, array $rows, ColumnChunkStatistics $st

/* @phpstan-ignore-next-line */
foreach (\array_chunk($rows, $this->pageSizeCalculator->rowsPerPage($column, $statistics)) as $rowsChunk) {
$containers->add((new DataPageBuilder($this->dataConverter))->build($column, $rowsChunk));
$containers->add((new DataPageBuilder($this->dataConverter, $this->compression, $this->options))->build($column, $rowsChunk));
}

return $containers;
Expand Down
Loading

0 comments on commit 42ab5b0

Please sign in to comment.