Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet - added support for GZIP and SNAPPY compressions #678

Merged
merged 2 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"ext-json": "*",
"ext-mbstring": "*",
"ext-xmlreader": "*",
"ext-zlib": "*",
"amphp/process": "^2",
"amphp/socket": "^2",
"codename/parquet": "~0.6.2 || ~0.7.0",
Expand Down
1 change: 1 addition & 0 deletions src/lib/parquet/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"require": {
"php": "~8.1 || ~8.2",
"ext-bcmath": "*",
"ext-zlib": "*",
"flow-php/dremel": "1.x-dev",
"flow-php/snappy": "1.x-dev"
},
Expand Down
8 changes: 8 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/Option.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ enum Option
*/
case DICTIONARY_PAGE_SIZE;

/**
* Compression level for GZIP codec. This option is going to be passed to gzencode function when Compression is set to GZIP.
* Lower level means faster compression, but bigger file size.
*
* Default value is 9
*/
case GZIP_COMPRESSION_LEVEL;

/**
* When this option is set to true, reader will try to convert INT96 logical type to DateTimeImmutable object.
* Some parquet writers due to historical reasons might still use INT96 to store timestamps with nanoseconds precision
Expand Down
1 change: 1 addition & 0 deletions src/lib/parquet/src/Flow/Parquet/Options.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public function __construct()
Option::ROW_GROUP_SIZE_BYTES->name => Consts::MB_SIZE * 128,
Option::DICTIONARY_PAGE_SIZE->name => Consts::MB_SIZE,
Option::DICTIONARY_PAGE_MIN_CARDINALITY_RATION->name => 0.4,
Option::GZIP_COMPRESSION_LEVEL->name => 9,
];
}

Expand Down
3 changes: 2 additions & 1 deletion src/lib/parquet/src/Flow/Parquet/ParquetFile.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public function __construct(
private $stream,
private readonly ByteOrder $byteOrder,
private readonly DataConverter $dataConverter,
private readonly Options $options,
private readonly LoggerInterface $logger = new NullLogger()
) {
}
Expand Down Expand Up @@ -90,7 +91,7 @@ public function readChunks(FlatColumn $column, ?int $limit = null) : \Generator
{
$reader = new WholeChunkReader(
new DataBuilder($this->dataConverter, $this->logger),
new PageReader($column, $this->byteOrder, $this->logger),
new PageReader($column, $this->byteOrder, $this->options, $this->logger),
$this->logger
);

Expand Down
29 changes: 29 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/Codec.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,38 @@
namespace Flow\Parquet\ParquetFile;

use Flow\Parquet\Exception\RuntimeException;
use Flow\Parquet\Option;
use Flow\Parquet\Options;

final class Codec
{
/**
 * @param Options $options option bag; compress() reads Option::GZIP_COMPRESSION_LEVEL from it
 */
public function __construct(private readonly Options $options)
{
}

/**
 * Compresses a raw buffer with the requested codec.
 *
 * UNCOMPRESSED returns $data untouched, SNAPPY delegates to snappy_compress() and GZIP
 * delegates to gzencode() using the level stored under Option::GZIP_COMPRESSION_LEVEL.
 *
 * @throws RuntimeException when the codec is not supported yet, or when the underlying
 *                          compression function reports failure by returning false
 */
public function compress(string $data, Compressions $compression) : string
{
    /**
     * @var false|string $result
     *
     * @psalm-suppress PossiblyInvalidArgument
     */
    $result = match ($compression) {
        Compressions::UNCOMPRESSED => $data,
        Compressions::SNAPPY => \snappy_compress($data),
        /** @phpstan-ignore-next-line */
        Compressions::GZIP => \gzencode($data, $this->options->get(Option::GZIP_COMPRESSION_LEVEL)),
        default => throw new RuntimeException('Compression ' . $compression->name . ' is not supported yet')
    };

    if ($result === false) {
        // This is the compression path, so the error must not talk about decompression.
        throw new RuntimeException('Failed to compress data');
    }

    return $result;
}

public function decompress(string $data, Compressions $compression) : string
{
/** @var false|string $result */
Expand Down
6 changes: 4 additions & 2 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/PageReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Flow\Parquet\ByteOrder;
use Flow\Parquet\Exception\RuntimeException;
use Flow\Parquet\Options;
use Flow\Parquet\ParquetFile\Page\ColumnData;
use Flow\Parquet\ParquetFile\Page\Dictionary;
use Flow\Parquet\ParquetFile\Page\PageHeader;
Expand All @@ -16,6 +17,7 @@ final class PageReader
/**
 * @param Options $options passed to the Codec instances built in readData() and readDictionary()
 * @param LoggerInterface $logger defaults to a NullLogger when omitted
 */
public function __construct(
    private readonly FlatColumn $column,
    private readonly ByteOrder $byteOrder,
    private readonly Options $options,
    private readonly LoggerInterface $logger = new NullLogger(),
) {
}
Expand All @@ -29,7 +31,7 @@ public function readData(PageHeader $pageHeader, Compressions $codec, ?Dictionar
{
return (new DataCoder($this->byteOrder, $this->logger))
->decodeData(
(new Codec())
(new Codec($this->options))
->decompress(
/** @phpstan-ignore-next-line */
\fread($stream, $pageHeader->compressedPageSize()),
Expand Down Expand Up @@ -59,7 +61,7 @@ public function readDictionary(PageHeader $pageHeader, Compressions $codec, $str

return (new DataCoder($this->byteOrder, $this->logger))
->decodeDictionary(
(new Codec())
(new Codec($this->options))
->decompress(
/** @phpstan-ignore-next-line */
\fread($stream, $pageHeader->compressedPageSize()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ public static function fromThrift(\Flow\Parquet\Thrift\ColumnChunk $thrift) : se
);
}

// public function __debugInfo() : ?array
// {
// return $this->normalize();
// }

public function codec() : Compressions
{
return $this->codec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ final class RowGroupBuilder

public function __construct(
private readonly Schema $schema,
private readonly Compressions $compression,
private readonly Options $options,
private readonly DataConverter $dataConverter,
private readonly PageSizeCalculator $calculator
) {
$this->flattener = new Flattener();

$this->chunkBuilders = $this->createColumnChunkBuilders($this->schema);
$this->chunkBuilders = $this->createColumnChunkBuilders($this->schema, $this->compression);
$this->statistics = RowGroupStatistics::fromBuilders($this->chunkBuilders);
}

Expand Down Expand Up @@ -78,7 +79,7 @@ public function flush(int $fileOffset) : RowGroupContainer
new RowGroup($chunks, $this->statistics->rowsCount())
);

$this->chunkBuilders = $this->createColumnChunkBuilders($this->schema);
$this->chunkBuilders = $this->createColumnChunkBuilders($this->schema, $this->compression);
$this->statistics = RowGroupStatistics::fromBuilders($this->chunkBuilders);

return $rowGroupContainer;
Expand All @@ -102,12 +103,12 @@ public function statistics() : RowGroupStatistics
/**
* @return array<string, ColumnChunkBuilder>
*/
private function createColumnChunkBuilders(Schema $schema) : array
private function createColumnChunkBuilders(Schema $schema, Compressions $compression) : array
{
$builders = [];

foreach ($schema->columnsFlat() as $column) {
$builders[$column->flatPath()] = new ColumnChunkBuilder($column, $this->dataConverter, $this->calculator, $this->options);
$builders[$column->flatPath()] = new ColumnChunkBuilder($column, $compression, $this->dataConverter, $this->calculator, $this->options);
}

return $builders;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ final class ColumnChunkBuilder

public function __construct(
private readonly FlatColumn $column,
private readonly Compressions $compression,
private readonly DataConverter $dataConverter,
private readonly PageSizeCalculator $calculator,
private readonly Options $options
Expand All @@ -31,23 +32,24 @@ public function addRow(mixed $row) : void

public function flush(int $fileOffset) : ColumnChunkContainer
{
$pageContainers = (new PagesBuilder($this->dataConverter, $this->calculator, $this->options))->build($this->column, $this->rows, $this->statistics);
$pageContainers = (new PagesBuilder($this->dataConverter, $this->compression, $this->calculator, $this->options))
->build($this->column, $this->rows, $this->statistics);

$this->statistics->reset();

return new ColumnChunkContainer(
$pageContainers->buffer(),
new ColumnChunk(
type: $this->column->type(),
codec: Compressions::UNCOMPRESSED,
codec: $this->compression,
valuesCount: $pageContainers->valuesCount(),
fileOffset: $fileOffset,
path: $this->column->path(),
encodings: $pageContainers->encodings(),
totalCompressedSize: $pageContainers->size(),
totalUncompressedSize: $pageContainers->size(),
totalCompressedSize: $pageContainers->compressedSize(),
totalUncompressedSize: $pageContainers->uncompressedSize(),
dictionaryPageOffset: ($pageContainers->dictionaryPageContainer()) ? $fileOffset : null,
dataPageOffset: ($pageContainers->dictionaryPageContainer()) ? $fileOffset + $pageContainers->dictionaryPageContainer()->totalSize() : $fileOffset,
dataPageOffset: ($pageContainers->dictionaryPageContainer()) ? $fileOffset + $pageContainers->dictionaryPageContainer()->totalCompressedSize() : $fileOffset,
indexPageOffset: null,
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
use Flow\Dremel\Dremel;
use Flow\Parquet\BinaryWriter\BinaryBufferWriter;
use Flow\Parquet\Data\DataConverter;
use Flow\Parquet\Options;
use Flow\Parquet\ParquetFile\Codec;
use Flow\Parquet\ParquetFile\Compressions;
use Flow\Parquet\ParquetFile\Data\RLEBitPackedHybrid;
use Flow\Parquet\ParquetFile\Encodings;
use Flow\Parquet\ParquetFile\Page\Header\DataPageHeader;
Expand All @@ -19,6 +22,8 @@ final class DataPageBuilder
{
/**
 * @param DataConverter $dataConverter handed to PlainValuesPacker when packing column values
 * @param Compressions $compression codec applied to the page buffer in build()
 * @param Options $options passed to the Codec that performs the compression
 */
public function __construct(
    private readonly DataConverter $dataConverter,
    private readonly Compressions $compression,
    private readonly Options $options
) {
}

Expand All @@ -45,9 +50,11 @@ public function build(FlatColumn $column, array $rows, ?array $dictionary = null
$pageWriter->append((new PlainValuesPacker($this->dataConverter))->packValues($column, $shredded->values));
}

$compressedBuffer = (new Codec($this->options))->compress($pageBuffer, $this->compression);

$pageHeader = new PageHeader(
Type::DATA_PAGE,
\strlen($pageBuffer),
\strlen($compressedBuffer),
\strlen($pageBuffer),
dataPageHeader: new DataPageHeader(
$dictionary && $indices ? Encodings::RLE_DICTIONARY : Encodings::PLAIN,
Expand All @@ -60,7 +67,7 @@ public function build(FlatColumn $column, array $rows, ?array $dictionary = null

return new PageContainer(
$pageHeaderBuffer->getBuffer(),
$pageBuffer,
$compressedBuffer,
$shredded->values,
null,
$pageHeader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

use Flow\Parquet\BinaryWriter\BinaryBufferWriter;
use Flow\Parquet\Data\DataConverter;
use Flow\Parquet\Options;
use Flow\Parquet\ParquetFile\Codec;
use Flow\Parquet\ParquetFile\Compressions;
use Flow\Parquet\ParquetFile\Encodings;
use Flow\Parquet\ParquetFile\Page\Header\DictionaryPageHeader;
use Flow\Parquet\ParquetFile\Page\Header\Type;
Expand All @@ -15,8 +18,11 @@

final class DictionaryPageBuilder
{
public function __construct(private readonly DataConverter $dataConverter)
{
public function __construct(
private readonly DataConverter $dataConverter,
private readonly Compressions $compression,
private readonly Options $options,
) {
}

public function build(FlatColumn $column, array $rows) : PageContainer
Expand All @@ -27,9 +33,11 @@ public function build(FlatColumn $column, array $rows) : PageContainer
$pageWriter = new BinaryBufferWriter($pageBuffer);
$pageWriter->append((new PlainValuesPacker($this->dataConverter))->packValues($column, $dictionary->dictionary));

$compressedBuffer = (new Codec($this->options))->compress($pageBuffer, $this->compression);

$pageHeader = new PageHeader(
Type::DICTIONARY_PAGE,
\strlen($pageBuffer),
\strlen($compressedBuffer),
\strlen($pageBuffer),
dataPageHeader: null,
dataPageHeaderV2: null,
Expand All @@ -42,7 +50,7 @@ public function build(FlatColumn $column, array $rows) : PageContainer

return new PageContainer(
$pageHeaderBuffer->getBuffer(),
$pageBuffer,
$compressedBuffer,
$dictionary->indices,
$dictionary->dictionary,
$pageHeader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@ public function headerSize() : int
return \strlen($this->pageHeaderBuffer);
}

public function totalSize() : int
public function totalCompressedSize() : int
{
return $this->headerSize() + $this->dataSize();
return $this->headerSize() + $this->pageHeader->compressedPageSize();
}

/**
 * Length of the serialized page header plus the uncompressed page size recorded in that header.
 */
public function totalUncompressedSize() : int
{
    $payloadSize = $this->pageHeader->uncompressedPageSize();

    return $this->headerSize() + $payloadSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,21 @@ public function buffer() : string
return $buffer;
}

/**
 * Sums totalCompressedSize() over the dictionary page (when one is present) and every data page.
 */
public function compressedSize() : int
{
    // Start from the dictionary page contribution, or zero when there is no dictionary page.
    $total = $this->dictionaryPageContainer
        ? $this->dictionaryPageContainer->totalCompressedSize()
        : 0;

    foreach ($this->dataPageContainers as $dataPage) {
        $total += $dataPage->totalCompressedSize();
    }

    return $total;
}

public function dataPageContainers() : array
{
return $this->dataPageContainers;
Expand Down Expand Up @@ -82,16 +97,16 @@ public function encodings() : array
return \array_map(static fn (int $encoding) => Encodings::from($encoding), $encodings);
}

public function size() : int
public function uncompressedSize() : int
{
$size = 0;

if ($this->dictionaryPageContainer) {
$size += $this->dictionaryPageContainer->totalSize();
$size += $this->dictionaryPageContainer->totalUncompressedSize();
}

foreach ($this->dataPageContainers as $pageContainer) {
$size += $pageContainer->totalSize();
$size += $pageContainer->totalUncompressedSize();
}

return $size;
Expand Down
Loading