Skip to content

Commit

Permalink
Added possibility to easily identify schema root in Parquet Schema (#627
Browse files Browse the repository at this point in the history
)
  • Loading branch information
norberttech authored Oct 22, 2023
1 parent 427b0b5 commit b9c4e63
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 52 deletions.
68 changes: 27 additions & 41 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static function fromThrift(array $schemaElements) : self
public static function with(Column ...$columns) : self
{
return new self(
NestedColumn::struct(
NestedColumn::schemaRoot(
'schema',
$columns,
)
Expand All @@ -62,44 +62,13 @@ public function columns() : array

public function get(string $flatPath) : Column
{
if (\array_key_exists($flatPath, $this->cache)) {
return $this->cache[$flatPath];
}

$getByFlatPath = static function (string $flatPath, array $columns) use (&$getByFlatPath) : ?Column {
/** @var Column $column */
foreach ($columns as $column) {
if ($column instanceof FlatColumn) {
if ($column->flatPath() === $flatPath) {
return $column;
}
} else {
/** @var NestedColumn $column */
if ($column->flatPath() === $flatPath) {
return $column;
}

/**
* @var null|NestedColumn $nestedColumn
*
* @psalm-suppress MixedFunctionCall
*/
$nestedColumn = $getByFlatPath($flatPath, $column->children());

if ($nestedColumn !== null) {
return $nestedColumn;
}
}
if (!\count($this->cache)) {
foreach ($this->columns() as $column) {
$this->cache($column);
}
}

return null;
};

$column = $getByFlatPath($flatPath, $this->schemaRoot->children());

if ($column instanceof Column) {
$this->cache[$flatPath] = $column;

if (\array_key_exists($flatPath, $this->cache)) {
return $this->cache[$flatPath];
}

Expand All @@ -125,6 +94,17 @@ public function toDDL() : array
]];
}

/**
 * Stores the given column in the lookup cache under its flat path and, when the
 * column is a nested one, repeats the same for each of its children recursively.
 */
private function cache(Column $column) : void
{
    $this->cache[$column->flatPath()] = $column;

    if (!$column instanceof NestedColumn) {
        return;
    }

    foreach ($column->children() as $descendant) {
        $this->cache($descendant);
    }
}

/**
* @param array<Column> $columns
*/
Expand All @@ -147,6 +127,7 @@ private function generateDDL(array $columns) : array
private static function processSchema(array $schemaElements, int &$index = 0) : array
{
$element = $schemaElements[$index];
$schemaRoot = $index === 0;
$index++;

if ($element->num_children) {
Expand All @@ -157,10 +138,15 @@ private static function processSchema(array $schemaElements, int &$index = 0) :
}

return [
NestedColumn::fromThrift(
$element,
$children,
),
$schemaRoot
? NestedColumn::schemaRoot(
$element->name,
$children,
)
: NestedColumn::fromThrift(
$element,
$children,
),
];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public function name() : string;

public function normalize() : array;

/**
 * Returns the nested column this column belongs to, or null when it has no parent.
 *
 * The return type is NestedColumn (not just Column) because callers read
 * NestedColumn-only state, such as the schemaRoot flag, from the parent.
 */
public function parent() : ?NestedColumn;

public function repetition() : ?Repetition;

Expand Down
14 changes: 11 additions & 3 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/FlatColumn.php
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,24 @@ public function ddl() : array

/**
 * Builds the dot-separated path of this column, ordered from the top-most
 * ancestor down to the column's own name.
 *
 * The schema root never appears in the path: climbing stops as soon as an
 * ancestor flagged as schemaRoot is reached, so a direct child of the root is
 * addressed by its bare name. A column without any parent also resolves to
 * its bare name.
 */
public function flatPath() : string
{
    $path = [$this->name];
    $parent = $this->parent();

    // The root is excluded by the loop condition itself, so no segment must be
    // shifted off the path afterwards (doing so would drop a real ancestor).
    while ($parent !== null && !$parent->schemaRoot) {
        $path[] = $parent->name();
        $parent = $parent->parent();
    }

    return \implode('.', \array_reverse($path));
}
Expand Down Expand Up @@ -281,7 +289,7 @@ public function normalize() : array
];
}

/**
 * Returns the parent stored on this column, or null when none is set.
 *
 * Typed as NestedColumn so that flatPath() can read the parent's schemaRoot flag.
 */
public function parent() : ?NestedColumn
{
    return $this->parent;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public function __construct(
private readonly ?Repetition $repetition,
private readonly array $children,
private readonly ?LogicalType $logicalType = null,
public readonly bool $schemaRoot = false
) {
foreach ($children as $child) {
$child->setParent($this);
Expand Down Expand Up @@ -82,6 +83,14 @@ public static function map(string $name, MapKey $key, MapValue $value) : self
);
}

/**
 * Creates the top-level column of a schema: a REQUIRED nested column without
 * a logical type that carries the schemaRoot flag.
 *
 * @param array<Column> $children
 */
public static function schemaRoot(string $name, array $children) : self
{
    return new self(
        $name,
        Repetition::REQUIRED,
        $children,
        schemaRoot: true,
    );
}

/**
* @param array<Column> $children
*/
Expand All @@ -103,6 +112,24 @@ public function children() : array
return $this->children;
}

/**
 * Collects every non-nested column found beneath this column, however deep,
 * keyed by that column's flat path.
 *
 * Nested children only contribute their own descendants; they are not part of
 * the result themselves. When two columns resolve to the same flat path, the
 * one visited last wins.
 *
 * @return array<string, Column>
 */
public function childrenFlat() : array
{
    $flat = [];

    foreach ($this->children as $child) {
        if (!$child instanceof self) {
            $flat[$child->flatPath()] = $child;

            continue;
        }

        // Entries are copied one by one instead of going through array_merge():
        // PHP stores an integer-like path such as "7" as an integer key and
        // array_merge() renumbers integer keys, which would silently replace
        // that path with 0, 1, 2, ... It also avoids re-copying the whole
        // accumulator for every nested child.
        foreach ($child->childrenFlat() as $path => $column) {
            $flat[$path] = $column;
        }
    }

    return $flat;
}

public function ddl() : array
{
$ddlArray = [
Expand All @@ -120,16 +147,24 @@ public function ddl() : array

/**
 * Builds the dot-separated path of this nested column, ordered from the
 * top-most ancestor down to the column's own name.
 *
 * The schema root never appears in the path: climbing stops as soon as an
 * ancestor flagged as schemaRoot is reached, so a direct child of the root is
 * addressed by its bare name. A column without any parent also resolves to
 * its bare name.
 */
public function flatPath() : string
{
    $path = [$this->name];
    $parent = $this->parent();

    // The root is excluded by the loop condition itself, so no segment must be
    // shifted off the path afterwards (doing so would drop a real ancestor).
    while ($parent !== null && !$parent->schemaRoot) {
        $path[] = $parent->name();
        $parent = $parent->parent();
    }

    return \implode('.', \array_reverse($path));
}
Expand Down Expand Up @@ -321,7 +356,7 @@ public function normalizeChildren() : array
return $normalized;
}

/**
 * Returns the nested column stored as this column's parent, or null when none is set.
 *
 * Typed as self so that flatPath() can read the parent's schemaRoot flag.
 */
public function parent() : ?self
{
    return $this->parent;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php declare(strict_types=1);

namespace Flow\Parquet\Tests\Unit\ParquetFile\Schema;

use Flow\Parquet\ParquetFile\Schema;
use PHPUnit\Framework\TestCase;

/**
 * Unit tests for flat-path resolution of nested Parquet schema columns.
 */
final class NestedColumnTest extends TestCase
{
    /**
     * Non-nested columns placed under three levels of nesting are keyed by
     * their full dotted path, in declaration order.
     */
    public function test_children_flat() : void
    {
        $innermost = Schema\NestedColumn::create('nested', [
            Schema\FlatColumn::int32('int'),
            Schema\FlatColumn::string('string'),
            Schema\FlatColumn::boolean('bool'),
        ]);
        $middle = Schema\NestedColumn::create('nested', [$innermost]);
        $column = Schema\NestedColumn::create('nested', [$middle]);

        $expectedPaths = [
            'nested.nested.nested.int',
            'nested.nested.nested.string',
            'nested.nested.nested.bool',
        ];

        $this->assertSame($expectedPaths, \array_keys($column->childrenFlat()));
    }

    /**
     * Columns attached directly to the schema root are addressed by their bare name.
     */
    public function test_flat_path_for_direct_root_child() : void
    {
        $schema = Schema::with(
            Schema\FlatColumn::int32('int'),
            Schema\FlatColumn::string('string'),
            Schema\FlatColumn::boolean('bool'),
        );

        foreach (['int', 'string', 'bool'] as $name) {
            $this->assertSame($name, $schema->get($name)->flatPath());
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ public function test_calculating_repetition_and_definition_for_nested_fields() :
),
);

$this->assertSame(2, $schema->get('int')->maxDefinitionsLevel());
$this->assertSame(1, $schema->get('int')->maxDefinitionsLevel());
$this->assertSame(0, $schema->get('int')->maxRepetitionsLevel());
$this->assertSame(3, $schema->get('nested.int')->maxDefinitionsLevel());
$this->assertSame(2, $schema->get('nested.int')->maxDefinitionsLevel());
$this->assertSame(0, $schema->get('nested.int')->maxRepetitionsLevel());
$this->assertSame(4, $schema->get('nested.nested.bool')->maxDefinitionsLevel());
$this->assertSame(3, $schema->get('nested.nested.bool')->maxDefinitionsLevel());
$this->assertSame(0, $schema->get('nested.nested.bool')->maxRepetitionsLevel());
$this->assertSame(5, $schema->get('nested.list_of_ints.list.element')->maxDefinitionsLevel());
$this->assertSame(4, $schema->get('nested.list_of_ints.list.element')->maxDefinitionsLevel());
$this->assertSame(1, $schema->get('nested.list_of_ints.list.element')->maxRepetitionsLevel());
}
}

0 comments on commit b9c4e63

Please sign in to comment.