Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Entry factory moved from extractors to FlowContext #616

Merged
merged 1 commit into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions UPGRADE.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ Please follow the instructions for your specific version to ensure a smooth upgr

---

## Upgrading from 0.4.x to 0.5.x

### 1) Entry factory moved from extractors to `FlowContext`

To improve code quality and reduce code coupling `EntryFactory` was removed from all constructors of extractors, in favor of passing it into `FlowContext` & re-using same entry factory in a whole pipeline.

---

## Upgrading from 0.3.x to 0.4.x

### 1) Transformers replaced with expressions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,9 @@

final class AvroExtractor implements Extractor
{
/**
* @param Path $path
*/
public function __construct(
private readonly Path $path,
private readonly int $rowsInBach = 1000,
private readonly Row\EntryFactory $entryFactory = new Row\Factory\NativeEntryFactory()
private readonly int $rowsInBach = 1000
) {
}

Expand Down Expand Up @@ -56,15 +52,15 @@ public function extract(FlowContext $context) : \Generator
}

if (\count($rows) >= $this->rowsInBach) {
yield array_to_rows($rows, $this->entryFactory);
yield array_to_rows($rows, $context->entryFactory());
/** @var array<Row> $rows */
$rows = [];
}
}
}

if ([] !== $rows) {
yield array_to_rows($rows, $this->entryFactory);
yield array_to_rows($rows, $context->entryFactory());
}
}
}
12 changes: 2 additions & 10 deletions src/adapter/etl-adapter-avro/src/Flow/ETL/DSL/Avro.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
use Flow\ETL\Extractor;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Loader;
use Flow\ETL\Row\EntryFactory;
use Flow\ETL\Row\Factory\NativeEntryFactory;
use Flow\ETL\Row\Schema;

/**
Expand All @@ -20,14 +18,10 @@ class Avro
{
/**
* @param array<Path|string>|Path|string $path
* @param int $rows_in_batch
*
* @return Extractor
*/
final public static function from(
Path|string|array $path,
int $rows_in_batch = 1000,
EntryFactory $entry_factory = new NativeEntryFactory()
) : Extractor {
if (\is_array($path)) {
/** @var array<Extractor> $extractors */
Expand All @@ -36,8 +30,7 @@ final public static function from(
foreach ($path as $next_path) {
$extractors[] = new AvroExtractor(
\is_string($next_path) ? Path::realpath($next_path) : $next_path,
$rows_in_batch,
$entry_factory
$rows_in_batch
);
}

Expand All @@ -46,8 +39,7 @@ final public static function from(

return new AvroExtractor(
\is_string($path) ? Path::realpath($path) : $path,
$rows_in_batch,
$entry_factory
$rows_in_batch
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@
use Flow\ETL\Filesystem\Stream\Mode;
use Flow\ETL\FlowContext;
use Flow\ETL\Row;
use Flow\ETL\Row\EntryFactory;

use Flow\ETL\Row\Factory\NativeEntryFactory;

final class CSVExtractor implements Extractor
{
Expand All @@ -27,8 +24,7 @@ public function __construct(
private readonly string $separator = ',',
private readonly string $enclosure = '"',
private readonly string $escape = '\\',
private readonly int $charactersReadInLine = 1000,
private readonly EntryFactory $entryFactory = new NativeEntryFactory()
private readonly int $charactersReadInLine = 1000
) {
}

Expand Down Expand Up @@ -94,7 +90,7 @@ public function extract(FlowContext $context) : \Generator
}

if (\count($rows) >= $this->rowsInBatch) {
yield array_to_rows($rows, $this->entryFactory);
yield array_to_rows($rows, $context->entryFactory());

/** @var array<Row> $rows */
$rows = [];
Expand All @@ -104,7 +100,7 @@ public function extract(FlowContext $context) : \Generator
}

if ([] !== $rows) {
yield array_to_rows($rows, $this->entryFactory);
yield array_to_rows($rows, $context->entryFactory());
}

if ($stream->isOpen()) {
Expand Down
29 changes: 3 additions & 26 deletions src/adapter/etl-adapter-csv/src/Flow/ETL/DSL/CSV.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,18 @@

use Flow\ETL\Adapter\CSV\CSVExtractor;
use Flow\ETL\Adapter\CSV\CSVLoader;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Extractor;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Loader;
use Flow\ETL\Row\EntryFactory;
use Flow\ETL\Row\Factory\NativeEntryFactory;

class CSV
{
/**
* @param array<Path|string>|Path|string $uri
* @param int $rows_in_batch
* @param bool $with_header
* @param bool $empty_to_null
* @param string $delimiter
* @param string $enclosure
* @param string $escape
* @param int<0, max> $characters_read_in_line
* @param EntryFactory $entry_factory
*
* @throws \Flow\ETL\Exception\InvalidArgumentException
*
* @return Extractor
* @throws InvalidArgumentException
*/
final public static function from(
string|Path|array $uri,
Expand All @@ -37,8 +27,7 @@ final public static function from(
string $delimiter = ',',
string $enclosure = '"',
string $escape = '\\',
int $characters_read_in_line = 1000,
EntryFactory $entry_factory = new NativeEntryFactory()
int $characters_read_in_line = 1000
) : Extractor {
if (\is_array($uri)) {
$extractors = [];
Expand All @@ -53,7 +42,6 @@ final public static function from(
$enclosure,
$escape,
$characters_read_in_line,
$entry_factory
);
}

Expand All @@ -69,20 +57,9 @@ final public static function from(
$enclosure,
$escape,
$characters_read_in_line,
$entry_factory
);
}

/**
* @param Path|string $uri
* @param bool $with_header
* @param string $separator
* @param string $enclosure
* @param string $escape
* @param string $new_line_separator
*
* @return Loader
*/
final public static function to(
string|Path $uri,
bool $with_header = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Extractor;
use Flow\ETL\FlowContext;
use Flow\ETL\Row;

final class DbalLimitOffsetExtractor implements Extractor
{
Expand All @@ -19,7 +18,6 @@ public function __construct(
private readonly QueryBuilder $queryBuilder,
private readonly int $pageSize = 1000,
private readonly ?int $maximum = null,
private readonly Row\EntryFactory $entryFactory = new Row\Factory\NativeEntryFactory()
) {
}

Expand All @@ -32,7 +30,6 @@ public static function table(
array $orderBy,
int $pageSize = 1000,
?int $maximum = null,
Row\EntryFactory $entryFactory = new Row\Factory\NativeEntryFactory()
) : self {
if (!\count($orderBy)) {
throw new InvalidArgumentException('There must be at least one column to order by, zero given');
Expand All @@ -51,7 +48,6 @@ public static function table(
$queryBuilder,
$pageSize,
$maximum,
$entryFactory
);
}

Expand Down Expand Up @@ -98,7 +94,7 @@ public function extract(FlowContext $context) : \Generator
}
}

yield array_to_rows($rows, $this->entryFactory);
yield array_to_rows($rows, $context->entryFactory());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
use Doctrine\DBAL\Types\Type;
use Flow\ETL\Extractor;
use Flow\ETL\FlowContext;
use Flow\ETL\Row;

final class DbalQueryExtractor implements Extractor
{
Expand All @@ -27,7 +26,6 @@ public function __construct(
private readonly string $query,
ParametersSet $parametersSet = null,
private readonly array $types = [],
private readonly Row\EntryFactory $entryFactory = new Row\Factory\NativeEntryFactory()
) {
$this->parametersSet = $parametersSet ?: new ParametersSet([]);
}
Expand All @@ -36,9 +34,9 @@ public function __construct(
* @param array<string, mixed>|list<mixed> $parameters
* @param array<int, null|int|string|Type>|array<string, null|int|string|Type> $types
*/
public static function single(Connection $connection, string $query, array $parameters = [], array $types = [], Row\EntryFactory $entryFactory = new Row\Factory\NativeEntryFactory()) : self
public static function single(Connection $connection, string $query, array $parameters = [], array $types = []) : self
{
return new self($connection, $query, new ParametersSet($parameters), $types, $entryFactory);
return new self($connection, $query, new ParametersSet($parameters), $types);
}

public function extract(FlowContext $context) : \Generator
Expand All @@ -50,7 +48,7 @@ public function extract(FlowContext $context) : \Generator
$rows[] = $row;
}

yield array_to_rows($rows, $this->entryFactory);
yield array_to_rows($rows, $context->entryFactory());
}
}
}
14 changes: 0 additions & 14 deletions src/adapter/etl-adapter-doctrine/src/Flow/ETL/DSL/Dbal.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Extractor;
use Flow\ETL\Loader;
use Flow\ETL\Row\EntryFactory;
use Flow\ETL\Row\Factory\NativeEntryFactory;

class Dbal
{
Expand All @@ -47,7 +45,6 @@ final public static function dataframe_factory(
* @param array<OrderBy>|OrderBy $order_by
* @param int $page_size
* @param null|int $maximum
* @param EntryFactory $entry_factory
*
* @throws InvalidArgumentException
*
Expand All @@ -59,23 +56,20 @@ final public static function from_limit_offset(
array|OrderBy $order_by,
int $page_size = 1000,
?int $maximum = null,
EntryFactory $entry_factory = new NativeEntryFactory()
) : Extractor {
return DbalLimitOffsetExtractor::table(
$connection,
\is_string($table) ? new Table($table) : $table,
$order_by instanceof OrderBy ? [$order_by] : $order_by,
$page_size,
$maximum,
$entry_factory
);
}

/**
* @param Connection $connection
* @param int $page_size
* @param null|int $maximum
* @param EntryFactory $entry_factory
*
* @throws InvalidArgumentException
*
Expand All @@ -86,14 +80,12 @@ final public static function from_limit_offset_qb(
QueryBuilder $queryBuilder,
int $page_size = 1000,
?int $maximum = null,
EntryFactory $entry_factory = new NativeEntryFactory()
) : Extractor {
return new DbalLimitOffsetExtractor(
$connection,
$queryBuilder,
$page_size,
$maximum,
$entry_factory
);
}

Expand All @@ -102,7 +94,6 @@ final public static function from_limit_offset_qb(
* @param string $query
* @param null|ParametersSet $parameters_set - each one parameters array will be evaluated as new query
* @param array<int, null|int|string|Type>|array<string, null|int|string|Type> $types
* @param EntryFactory $entry_factory
*
* @return Extractor
*/
Expand All @@ -111,14 +102,12 @@ final public static function from_queries(
string $query,
ParametersSet $parameters_set = null,
array $types = [],
EntryFactory $entry_factory = new NativeEntryFactory()
) : Extractor {
return new DbalQueryExtractor(
$connection,
$query,
$parameters_set,
$types,
$entry_factory
);
}

Expand All @@ -127,7 +116,6 @@ final public static function from_queries(
* @param string $query
* @param array<string, mixed>|list<mixed> $parameters
* @param array<int, null|int|string|Type>|array<string, null|int|string|Type> $types
* @param EntryFactory $entry_factory
*
* @return Extractor
*/
Expand All @@ -136,14 +124,12 @@ final public static function from_query(
string $query,
array $parameters = [],
array $types = [],
EntryFactory $entry_factory = new NativeEntryFactory()
) : Extractor {
return DbalQueryExtractor::single(
$connection,
$query,
$parameters,
$types,
$entry_factory
);
}

Expand Down
Loading
Loading