diff --git a/.travis.yml b/.travis.yml index a6aa170..721e7cd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,13 +11,14 @@ env: matrix: include: - - php: '7.2' - env: PREFER_LOWEST='--prefer-lowest' - php: '7.3' + env: PREFER_LOWEST='--prefer-lowest' - php: '7.4' env: SYMFONY_VERSION='~4.4.0' - php: '7.4' env: SYMFONY_VERSION='~5.0.0' + - php: '8.0' + env: SYMFONY_VERSION='~5.0.0' fast_finish: true before_install: diff --git a/composer.json b/composer.json index c11d430..40af64e 100644 --- a/composer.json +++ b/composer.json @@ -15,7 +15,7 @@ ], "require": { - "php": "^7.2", + "php": "^7.3 || ^8.0", "symfony/console": "^3.0 || ^4.0 || ^5.0", "symfony/dependency-injection": "^3.0 || ^4.1.12 || ^5.0", "symfony/process": "^3.0 || ^4.0 || ^5.0", diff --git a/src/Configuration.php b/src/Configuration.php new file mode 100644 index 0000000..b36cd05 --- /dev/null +++ b/src/Configuration.php @@ -0,0 +1,99 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Webmozarts\Console\Parallelization; + +use Webmozart\Assert\Assert; +use function ceil; +use function sprintf; + +final class Configuration +{ + private $segmentSize; + private $rounds; + private $batches; + + public function __construct( + bool $numberOfProcessesDefined, + int $numberOfProcesses, + int $numberOfItems, + int $segmentSize, + int $batchSize + ) { + Assert::greaterThan( + $numberOfProcesses, + 0, + sprintf( + 'Expected the number of processes to be 1 or greater. Got "%s"', + $numberOfProcesses + ) + ); + Assert::natural( + $numberOfItems, + sprintf( + 'Expected the number of items to be 0 or greater. Got "%s"', + $numberOfItems + ) + ); + Assert::greaterThan( + $segmentSize, + 0, + sprintf( + 'Expected the segment size to be 1 or greater. Got "%s"', + $segmentSize + ) + ); + Assert::greaterThan( + $batchSize, + 0, + sprintf( + 'Expected the batch size to be 1 or greater. Got "%s"', + $batchSize + ) + ); + + // We always check those (and not the calculated ones) since they come from the command + // configuration so an issue there hints on a misconfiguration which should be fixed. + Assert::greaterThanEq( + $segmentSize, + $batchSize, + sprintf( + 'Expected the segment size ("%s") to be greater or equal to the batch size ("%s")', + $segmentSize, + $batchSize + ) + ); + + $this->segmentSize = 1 === $numberOfProcesses && !$numberOfProcessesDefined + ? $numberOfItems + : $segmentSize + ; + $this->rounds = (int) (1 === $numberOfProcesses ? 1 : ceil($numberOfItems / $segmentSize)); + $this->batches = (int) (ceil($segmentSize / $batchSize) * $this->rounds); + } + + public function getSegmentSize(): int + { + return $this->segmentSize; + } + + public function getNumberOfSegments(): int + { + return $this->rounds; + } + + public function getNumberOfBatches(): int + { + return $this->batches; + } +} diff --git a/src/ItemBatchIterator.php b/src/ItemBatchIterator.php new file mode 100644 index 0000000..5dced0c --- /dev/null +++ b/src/ItemBatchIterator.php @@ -0,0 +1,131 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Webmozarts\Console\Parallelization; + +use Closure; +use Webmozart\Assert\Assert; +use function array_chunk; +use function array_values; +use function count; +use function get_class; +use function gettype; +use function is_numeric; +use function is_object; +use function sprintf; + +final class ItemBatchIterator +{ + private $items; + private $numberOfItems; + private $batchSize; + private $itemsChunks; + + /** + * @param Closure(): list $fetchItems + */ + public static function create(?string $item, Closure $fetchItems, int $batchSize): self + { + if (null !== $item) { + $items = [$item]; + } else { + $items = $fetchItems(); + + Assert::isArray( + $items, + sprintf( + 'Expected the fetched items to be a list of strings. Got "%s"', + gettype($items) + ) + ); + } + + return new self($items, $batchSize); + } + + /** + * @return list + */ + private static function normalizeItems($items): array + { + foreach ($items as $index => $item) { + if (is_numeric($item)) { + $items[$index] = (string) $item; + + continue; + } + + Assert::string( + $item, + sprintf( + 'The items are potentially passed to the child processes via the STDIN. For this reason they are expected to be string values. Got "%s" for the item "%s"', + is_object($item) ? get_class($item) : gettype($item), + $index + ) + ); + } + + return array_values($items); + } + + /** + * @param list $items + * @param int $batchSize + */ + public function __construct(array $items, int $batchSize) + { + Assert::greaterThan( + $batchSize, + 0, + sprintf( + 'Expected the batch size to be 1 or greater. Got "%s"', + $batchSize + ) + ); + + $this->items = self::normalizeItems($items); + $this->itemsChunks = array_chunk( + $this->items, + $batchSize, + false + ); + $this->numberOfItems = count($this->items); + $this->batchSize = $batchSize; + } + + /** + * @return list + */ + public function getItems(): array + { + return $this->items; + } + + public function getNumberOfItems(): int + { + return $this->numberOfItems; + } + + public function getBatchSize(): int + { + return $this->batchSize; + } + + /** + * @return array> + */ + public function getItemBatches(): array + { + return $this->itemsChunks; + } +} diff --git a/src/Parallelization.php b/src/Parallelization.php index 24ae1d2..8fe9287 100644 --- a/src/Parallelization.php +++ b/src/Parallelization.php @@ -18,22 +18,26 @@ use function array_filter; use function array_merge; use function array_slice; +use function explode; +use function getcwd; use function implode; use RuntimeException; +use function realpath; use function sprintf; +use function stream_get_contents; +use const PHP_EOL; use const STDIN; use Symfony\Component\Console\Application; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Helper\ProgressBar; -use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; -use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Terminal; use Symfony\Component\DependencyInjection\ContainerInterface; use Symfony\Component\DependencyInjection\ResettableContainerInterface; use Symfony\Component\Process\PhpExecutableFinder; use Symfony\Contracts\Service\ResetInterface; +use Symfony\Component\Process\Process; use Throwable; use function trim; use Webmozart\Assert\Assert; @@ -92,23 +96,6 @@ private static function detectPhpExecutable(): string return $php; } - /** - * Returns the environment variables that are passed to the child processes. - * - * @param ContainerInterface $container The service containers - * - * @return string[] A list of environment variable names and values - */ - private static function getEnvironmentVariables(ContainerInterface $container): array - { - return [ - 'PATH' => getenv('PATH'), - 'HOME' => getenv('HOME'), - 'SYMFONY_DEBUG' => $container->getParameter('kernel.debug'), - 'SYMFONY_ENV' => $container->getParameter('kernel.environment'), - ]; - } - /** * Returns the working directory for the child process. * @@ -118,7 +105,7 @@ private static function getEnvironmentVariables(ContainerInterface $container): */ private static function getWorkingDirectory(ContainerInterface $container): string { - return dirname($container->getParameter('kernel.root_dir')); + return dirname($container->getParameter('kernel.project_dir')); } /** @@ -128,26 +115,7 @@ private static function getWorkingDirectory(ContainerInterface $container): stri */ protected static function configureParallelization(Command $command): void { - $command - ->addArgument( - 'item', - InputArgument::OPTIONAL, - 'The item to process' - ) - ->addOption( - 'processes', - 'p', - InputOption::VALUE_OPTIONAL, - 'The number of parallel processes to run', - null - ) - ->addOption( - 'child', - null, - InputOption::VALUE_NONE, - 'Set on child processes' - ) - ; + ParallelizationInput::configureParallelization($command); } /** @@ -206,6 +174,23 @@ abstract protected function runSingleCommand( */ abstract protected function getItemName(int $count): string; + /** + * Returns the environment variables that are passed to the child processes. + * + * @param ContainerInterface $container The service containers + * + * @return string[] A list of environment variable names and values + */ + protected function getEnvironmentVariables(ContainerInterface $container): array + { + return [ + 'PATH' => getenv('PATH'), + 'HOME' => getenv('HOME'), + 'SYMFONY_DEBUG' => $container->getParameter('kernel.debug'), + 'SYMFONY_ENV' => $container->getParameter('kernel.environment'), + ]; + } + /** * Method executed at the very beginning of the master process. */ @@ -278,13 +263,15 @@ protected function getBatchSize(): int */ protected function execute(InputInterface $input, OutputInterface $output): int { - if ($input->getOption('child')) { + $parallelizationInput = new ParallelizationInput($input); + + if ($parallelizationInput->isChildProcess()) { $this->executeChildProcess($input, $output); return 0; } - $this->executeMasterProcess($input, $output); + $this->executeMasterProcess($parallelizationInput, $input, $output); return 0; } @@ -297,73 +284,64 @@ protected function execute(InputInterface $input, OutputInterface $output): int * items of the processed data set and terminates. As long as there is data * left to process, new child processes are spawned automatically. */ - protected function executeMasterProcess(InputInterface $input, OutputInterface $output): void - { + protected function executeMasterProcess( + ParallelizationInput $parallelizationInput, + InputInterface $input, + OutputInterface $output + ): void { $this->runBeforeFirstCommand($input, $output); - $numberOfProcessesDefined = null !== $input->getOption('processes'); - $numberOfProcesses = $numberOfProcessesDefined ? (int) $input->getOption('processes') : 1; - $hasItem = (bool) $input->getArgument('item'); - $items = $hasItem ? [$input->getArgument('item')] : $this->fetchItems($input); - $count = count($items); - $segmentSize = 1 === $numberOfProcesses && !$numberOfProcessesDefined ? $count : $this->getSegmentSize(); - $batchSize = $this->getBatchSize(); - $rounds = 1 === $numberOfProcesses ? 1 : ceil($count * 1.0 / $segmentSize); - $batches = ceil($segmentSize * 1.0 / $batchSize) * $rounds; - - Assert::greaterThan( + $isNumberOfProcessesDefined = $parallelizationInput->isNumberOfProcessesDefined(); + $numberOfProcesses = $parallelizationInput->getNumberOfProcesses(); + + $itemBatchIterator = ItemBatchIterator::create( + $parallelizationInput->getItem(), + function () use ($input) { + return $this->fetchItems($input); + }, + $this->getBatchSize() + ); + + $numberOfItems = $itemBatchIterator->getNumberOfItems(); + $batchSize = $itemBatchIterator->getBatchSize(); + + $config = new Configuration( + $isNumberOfProcessesDefined, $numberOfProcesses, - 0, - sprintf( - 'Requires at least one process. Got "%s"', - $input->getOption('processes') - ) + $numberOfItems, + $this->getSegmentSize(), + $batchSize ); - if (!$hasItem && 1 !== $numberOfProcesses) { - // Shouldn't check this when only one item has been specified or - // when no child processes is used - Assert::greaterThanEq( - $segmentSize, - $batchSize, - sprintf( - 'The segment size should always be greater or equal to ' - .'the batch size. Got respectively "%d" and "%d"', - $segmentSize, - $batchSize - ) - ); - } + $segmentSize = $config->getSegmentSize(); + $numberOfSegments = $config->getNumberOfSegments(); + $numberOfBatches = $config->getNumberOfBatches(); $output->writeln(sprintf( 'Processing %d %s in segments of %d, batches of %d, %d %s, %d %s in %d %s', - $count, - $this->getItemName($count), + $numberOfItems, + $this->getItemName($numberOfItems), $segmentSize, $batchSize, - $rounds, - 1 === $rounds ? 'round' : 'rounds', - $batches, - 1 === $batches ? 'batch' : 'batches', + $numberOfSegments, + 1 === $numberOfSegments ? 'round' : 'rounds', + $numberOfBatches, + 1 === $numberOfBatches ? 'batch' : 'batches', $numberOfProcesses, 1 === $numberOfProcesses ? 'process' : 'processes' )); $output->writeln(''); - $progressBar = new ProgressBar($output, $count); + $progressBar = new ProgressBar($output, $numberOfItems); $progressBar->setFormat('debug'); $progressBar->start(); - if ($count <= $segmentSize || (1 === $numberOfProcesses && !$numberOfProcessesDefined)) { + if ($numberOfItems <= $segmentSize + || (1 === $numberOfProcesses && !$parallelizationInput->isNumberOfProcessesDefined()) + ) { // Run in the master process - $itemsChunks = array_chunk( - $items, - $this->getBatchSize(), - false - ); - - foreach ($itemsChunks as $items) { + foreach ($itemBatchIterator->getItemBatches() as $items) { $this->runBeforeBatch($input, $output, $items); foreach ($items as $item) { @@ -382,25 +360,27 @@ protected function executeMasterProcess(InputInterface $input, OutputInterface $ sprintf('The bin/console file could not be found at %s', getcwd())) ; - $commandTemplate = implode( - ' ', - array_merge( - array_filter([ - self::detectPhpExecutable(), - $consolePath, - $this->getName(), - implode(' ', array_slice($input->getArguments(), 1)), - '--child', - ]), - $this->serializeInputOptions($input, ['child', 'processes']) - ) + $commandTemplate = array_merge( + array_filter([ + self::detectPhpExecutable(), + $consolePath, + $this->getName(), + implode(' ', array_slice($input->getArguments(), 1)), + '--child', + ]), + $this->serializeInputOptions($input, ['child', 'processes']) ); + $terminalWidth = (new Terminal())->getWidth(); + // @TODO: can be removed once ProcessLauncher accepts command arrays + $tempProcess = new Process($commandTemplate); + $commandString = $tempProcess->getCommandLine(); + $processLauncher = new ProcessLauncher( - $commandTemplate, + $commandString, self::getWorkingDirectory($this->getContainer()), - self::getEnvironmentVariables($this->getContainer()), + $this->getEnvironmentVariables($this->getContainer()), $numberOfProcesses, $segmentSize, $this->getContainer()->get('logger', ContainerInterface::NULL_ON_INVALID_REFERENCE), @@ -409,7 +389,7 @@ function (string $type, string $buffer) use ($progressBar, $output, $terminalWid } ); - $processLauncher->run($items); + $processLauncher->run($itemBatchIterator->getItems()); } $progressBar->finish(); @@ -418,13 +398,13 @@ function (string $type, string $buffer) use ($progressBar, $output, $terminalWid $output->writeln(''); $output->writeln(sprintf( 'Processed %d %s.', - $count, - $this->getItemName($count) + $numberOfItems, + $this->getItemName($numberOfItems) )); $this->runAfterLastCommand($input, $output); } - + /** * Get the path of the executable Symfony bin console. */ @@ -445,7 +425,7 @@ protected function executeChildProcess( ): void { $advancementChar = self::getProgressSymbol(); - $itemsChunks = array_chunk( + $itemBatchIterator = new ItemBatchIterator( array_filter( explode( PHP_EOL, @@ -455,7 +435,7 @@ protected function executeChildProcess( $this->getBatchSize() ); - foreach ($itemsChunks as $items) { + foreach ($itemBatchIterator->getItemBatches() as $items) { $this->runBeforeBatch($input, $output, $items); foreach ($items as $item) { @@ -527,12 +507,14 @@ private function runTolerantSingleCommand( } } } - + /** * @param string[] $blackListParams + * * @return string[] */ - private function serializeInputOptions(InputInterface $input, array $blackListParams) : array { + private function serializeInputOptions(InputInterface $input, array $blackListParams): array + { $options = array_diff_key( array_filter($input->getOptions()), array_fill_keys($blackListParams, '') @@ -543,19 +525,20 @@ private function serializeInputOptions(InputInterface $input, array $blackListPa $definition = $this->getDefinition(); $option = $definition->getOption($name); - $optionString = ""; + $optionString = ''; if (!$option->acceptValue()) { - $optionString .= ' --' . $name; + $optionString .= '--'.$name; } elseif ($option->isArray()) { foreach ($value as $arrayValue) { - $optionString .= ' --'.$name.'='.$this->quoteOptionValue($arrayValue); + $optionString .= '--'.$name.'='.$this->quoteOptionValue($arrayValue); } } else { - $optionString .= ' --'.$name.'='.$this->quoteOptionValue($value); + $optionString .= '--'.$name.'='.$this->quoteOptionValue($value); } $preparedOptionList[] = $optionString; } + return $preparedOptionList; } diff --git a/src/ParallelizationInput.php b/src/ParallelizationInput.php new file mode 100644 index 0000000..20a29d3 --- /dev/null +++ b/src/ParallelizationInput.php @@ -0,0 +1,129 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Webmozarts\Console\Parallelization; + +use Symfony\Component\Console\Command\Command; +use Symfony\Component\Console\Input\InputArgument; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; +use Webmozart\Assert\Assert; +use function is_numeric; +use function sprintf; + +final class ParallelizationInput +{ + private const ITEM_ARGUMENT = 'item'; + private const PROCESSES_OPTION = 'processes'; + private const CHILD_OPTION = 'child'; + + private $numberOfProcessesDefined; + private $numberOfProcesses = 1; + private $item; + private $childProcess; + + /** + * Adds the command configuration specific to parallelization. + * + * Call this method in your configure() method. + */ + public static function configureParallelization(Command $command): void + { + $command + ->addArgument( + self::ITEM_ARGUMENT, + InputArgument::OPTIONAL, + 'The item to process' + ) + ->addOption( + self::PROCESSES_OPTION, + 'p', + InputOption::VALUE_OPTIONAL, + 'The number of parallel processes to run', + null + ) + ->addOption( + self::CHILD_OPTION, + null, + InputOption::VALUE_NONE, + 'Set on child processes' + ) + ; + } + + public function __construct(InputInterface $input) + { + /** @var string|null $processes */ + $processes = $input->getOption(self::PROCESSES_OPTION); + + $this->numberOfProcessesDefined = null !== $processes; + + if ($this->numberOfProcessesDefined) { + Assert::numeric( + $processes, + sprintf( + 'Expected the number of process defined to be a valid numeric value. Got "%s"', + $processes + ) + ); + + $this->numberOfProcesses = (int) $processes; + + Assert::same( + // We cast it again in string to make sure since it is more convenient to pass an + // int in the tests or when calling the command directly without passing by the CLI + (string) $processes, + (string) $this->numberOfProcesses, + sprintf( + 'Expected the number of process defined to be an integer. Got "%s"', + $processes + ) + ); + } + + /** @var string|null $item */ + $item = $input->getArgument(self::ITEM_ARGUMENT); + + $hasItem = null !== $item; + + if ($hasItem && !is_numeric($item)) { + // Safeguard in case an invalid type is accidentally passed in tests when invoking the + // command directly + Assert::string($item); + } + + $this->item = $hasItem ? (string) $item : null; + + $this->childProcess = (bool) $input->getOption('child'); + } + + public function isNumberOfProcessesDefined(): bool + { + return $this->numberOfProcessesDefined; + } + + public function getNumberOfProcesses(): int + { + return $this->numberOfProcesses; + } + + public function getItem(): ?string + { + return $this->item; + } + + public function isChildProcess(): bool + { + return $this->childProcess; + } +} diff --git a/src/ProcessLauncher.php b/src/ProcessLauncher.php index 82eef3e..ddd22bc 100644 --- a/src/ProcessLauncher.php +++ b/src/ProcessLauncher.php @@ -49,7 +49,7 @@ class ProcessLauncher private $runningProcesses = []; public function __construct( - string $command, + string $command, // @TODO change to array for 2.0 string $workingDirectory, array $environmentVariables, int $processLimit, @@ -128,16 +128,26 @@ public function run(array $items): void */ private function startProcess(InputStream $inputStream): void { - $process = new Process( + $arguments = [ $this->command, $this->workingDirectory, $this->environmentVariables, null, null - ); + ]; + + if(method_exists(Process::class, 'fromShellCommandline')) { + // Symfony >= 4.2 workaround as Symfony 5 requires `Process` to be initiated with an array + // @TODO: can be removed once $this->command was changed to an array + $process = Process::fromShellCommandline(...$arguments); + } else { + $process = new Process(...$arguments); + } $process->setInput($inputStream); - $process->inheritEnvironmentVariables(true); + if(method_exists($process, 'inheritEnvironmentVariables')) { + $process->inheritEnvironmentVariables(true); + } $process->start($this->callback); $this->logger->debug('Command started: '.$this->command); diff --git a/tests/ConfigurationTest.php b/tests/ConfigurationTest.php new file mode 100644 index 0000000..33f684e --- /dev/null +++ b/tests/ConfigurationTest.php @@ -0,0 +1,329 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Webmozarts\Console\Parallelization; + +use InvalidArgumentException; +use PHPUnit\Framework\TestCase; +use function func_get_args; + +/** + * @covers \Webmozarts\Console\Parallelization\Configuration + */ +final class ConfigurationTest extends TestCase +{ + /** + * @dataProvider valuesProvider + */ + public function test_it_can_be_instantiated( + bool $numberOfProcessesDefined, + int $numberOfProcesses, + int $numberOfItems, + int $segmentSize, + int $batchSize, + int $expectedSegmentSize, + int $expectedNumberOfSegments, + int $expectedNumberOfBatches + ): void { + $config = new Configuration( + $numberOfProcessesDefined, + $numberOfProcesses, + $numberOfItems, + $segmentSize, + $batchSize + ); + + $this->assertSame($expectedSegmentSize, $config->getSegmentSize()); + $this->assertSame($expectedNumberOfSegments, $config->getNumberOfSegments()); + $this->assertSame($expectedNumberOfBatches, $config->getNumberOfBatches()); + } + + /** + * @dataProvider invalidValuesProvider + */ + public function test_it_cannot_be_instantiated_with_invalid_values( + int $numberOfProcesses, + int $numberOfItems, + int $segmentSize, + int $batchSize, + string $expectedErrorMessage + ): void { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage($expectedErrorMessage); + + new Configuration( + true, + $numberOfProcesses, + $numberOfItems, + $segmentSize, + $batchSize + ); + } + + public static function valuesProvider(): iterable + { + yield 'empty' => self::createInputArgs( + false, + 1, + 0, + 1, + 1, + 0, + 1, + 1 + ); + + yield 'only one default process: the segment size is the number of items' => self::createInputArgs( + false, + 1, + 50, + 1, + 1, + 50, + 1, + 1 + ); + + yield 'an arbitrary number of processes given: the segment size is the segment size given' => self::createInputArgs( + true, + 7, + 50, + 3, + 1, + 3, + 17, + 51 + ); + + yield 'one process given: the segment size is the segment size given' => self::createInputArgs( + true, + 1, + 50, + 3, + 1, + 3, + 1, + 3 + ); + + // Invalid domain case but we add this test to capture this behaviour nonetheless + yield 'multiple default processes: the segment size is the segment size given' => self::createInputArgs( + true, + 7, + 50, + 3, + 1, + 3, + 17, + 51 + ); + + yield 'there is no rounds if there is no items' => self::createInputArgs( + false, + 1, + 0, + 1, + 1, + 0, + 1, + 1 + ); + + yield 'there is only one round if only one process (default)' => self::createInputArgs( + false, + 1, + 50, + 1, + 1, + 50, + 1, + 1 + ); + + yield 'there is only one round if only one process (arbitrary)' => self::createInputArgs( + true, + 1, + 50, + 1, + 1, + 1, + 1, + 1 + ); + + yield 'there is enough rounds to reach the number of items with the given segment size (half)' => self::createInputArgs( + true, + 2, + 50, + 25, + 1, + 25, + 2, + 50 + ); + + yield 'there is enough rounds to reach the number of items with the given segment size (upper)' => self::createInputArgs( + true, + 2, + 50, + 15, + 1, + 15, + 4, + 60 + ); + + yield 'there is enough rounds to reach the number of items with the given segment size (lower)' => self::createInputArgs( + true, + 2, + 50, + 40, + 1, + 40, + 2, + 80 + ); + + yield 'the batch size used is the batch size given' => self::createInputArgs( + false, + 1, + 0, + 10, + 7, + 0, + 1, + 2 + ); + + yield 'there is enough batches to process all the items of a given segment (half)' => self::createInputArgs( + true, + 2, + 50, + 30, + 15, + 30, + 2, + 4 + ); + + yield 'there is enough batches to process all the items of a given segment (upper)' => self::createInputArgs( + true, + 2, + 50, + 30, + 10, + 30, + 2, + 6 + ); + + yield 'there is enough batches to process all the items of a given segment (lower)' => self::createInputArgs( + true, + 2, + 50, + 30, + 25, + 30, + 2, + 4 + ); + } + + public static function invalidValuesProvider(): iterable + { + yield 'invalid number of processes (limit)' => [ + 0, + 0, + 1, + 1, + 'Expected the number of processes to be 1 or greater. Got "0"', + ]; + + yield 'invalid number of processes' => [ + -1, + 0, + 1, + 1, + 'Expected the number of processes to be 1 or greater. Got "-1"', + ]; + + yield 'invalid number of items (limit)' => [ + 1, + -1, + 1, + 1, + 'Expected the number of items to be 0 or greater. Got "-1"', + ]; + + yield 'invalid number of items' => [ + 1, + -10, + 1, + 1, + 'Expected the number of items to be 0 or greater. Got "-10"', + ]; + + yield 'invalid segment size (limit)' => [ + 1, + 0, + 0, + 1, + 'Expected the segment size to be 1 or greater. Got "0"', + ]; + + yield 'invalid segment size' => [ + 1, + 0, + -1, + 1, + 'Expected the segment size to be 1 or greater. Got "-1"', + ]; + + yield 'invalid batch size (limit)' => [ + 1, + 0, + 1, + 0, + 'Expected the batch size to be 1 or greater. Got "0"', + ]; + + yield 'invalid batch size' => [ + 1, + 0, + 1, + -1, + 'Expected the batch size to be 1 or greater. Got "-1"', + ]; + + yield 'segment size lower than batch size' => [ + 1, + 0, + 1, + 10, + 'Expected the segment size ("1") to be greater or equal to the batch size ("10")', + ]; + } + + private static function createInputArgs( + bool $numberOfProcessesDefined, + int $numberOfProcesses, + int $numberOfItems, + int $segmentSize, + int $batchSize, + int $expectedSegmentSize, + int $expectedNumberOfSegments, + int $expectedNumberOfBatches + ): array { + return func_get_args(); + } +} diff --git a/tests/ItemBatchIteratorTest.php b/tests/ItemBatchIteratorTest.php new file mode 100644 index 0000000..aa0d8b9 --- /dev/null +++ b/tests/ItemBatchIteratorTest.php @@ -0,0 +1,224 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Webmozarts\Console\Parallelization; + +use Closure; +use InvalidArgumentException; +use LogicException; +use PHPUnit\Framework\TestCase; +use stdClass; + +final class ItemBatchIteratorTest extends TestCase +{ + /** + * @dataProvider valuesProvider + * + * @param list $expectedItems + * @param array> $expectedBatches + */ + public function test_it_can_be_instantiated( + array $items, + int $batchSize, + array $expectedItems, + int $expectedNumberOfItems, + array $expectedBatches + ): void { + $iterator = new ItemBatchIterator($items, $batchSize); + + $this->assertBatchItemIteratorStateIs( + $iterator, + $expectedItems, + $expectedNumberOfItems, + $batchSize, + $expectedBatches + ); + } + + /** + * @dataProvider inputProvider + * + * @param Closure(): list $fetchItems + * @param list $expectedItems + * @param array> $expectedBatches + */ + public function test_it_can_be_created_from_an_input( + ?string $item, + Closure $fetchItems, + int $batchSize, + array $expectedItems, + int $expectedNumberOfItems, + array $expectedBatches + ): void { + $iterator = ItemBatchIterator::create($item, $fetchItems, $batchSize); + + $this->assertBatchItemIteratorStateIs( + $iterator, + $expectedItems, + $expectedNumberOfItems, + $batchSize, + $expectedBatches + ); + } + + public function test_it_validates_the_items_provided_by_the_closure(): void + { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Expected the fetched items to be a list of strings. Got "object"'); + + ItemBatchIterator::create( + null, + static function () { + yield from []; + }, + 1 + ); + } + + /** + * @dataProvider invalidValuesProvider + */ + public function test_it_cannot_be_instantiated_with_invalid_data( + array $items, + int $batchSize, + string $expectedErrorMessage + ): void { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage($expectedErrorMessage); + + new ItemBatchIterator($items, $batchSize); + } + + public static function valuesProvider(): iterable + { + yield 'nominal' => [ + ['item0', 'item1', 'item3', 'item4'], + 2, + ['item0', 'item1', 'item3', 'item4'], + 4, + [ + ['item0', 'item1'], + ['item3', 'item4'], + ], + ]; + + yield 'numerical items – items are casted to strings' => [ + ['string item', 10, .5, 0x1A, 0b11111111], + 10, + ['string item', '10', '0.5', '26', '255'], + 5, + [ + ['string item', '10', '0.5', '26', '255'], + ], + ]; + + yield 'less items than batch size' => [ + ['item0', 'item1', 'item3'], + 4, + ['item0', 'item1', 'item3'], + 3, + [ + ['item0', 'item1', 'item3'], + ], + ]; + + yield 'same number of items as batch size' => [ + ['item0', 'item1', 'item3'], + 3, + ['item0', 'item1', 'item3'], + 3, + [ + ['item0', 'item1', 'item3'], + ], + ]; + + yield 'more items than batch size' => [ + ['item0', 'item1', 'item3'], + 2, + ['item0', 'item1', 'item3'], + 3, + [ + ['item0', 'item1'], + ['item3'], + ], + ]; + } + + public static function inputProvider(): iterable + { + yield 'one item: the fetch item closure is not evaluated' => [ + 'item0', + self::createFakeClosure(), + 1, + ['item0'], + 1, + [ + ['item0'], + ], + ]; + + yield 'no item: the fetch item closure is evaluated' => [ + null, + static function (): array { + return ['item0', 'item1']; + }, + 2, + ['item0', 'item1'], + 2, + [ + ['item0', 'item1'], + ], + ]; + } + + public static function invalidValuesProvider(): iterable + { + yield 'invalid batch size' => [ + [], + -1, + 'Expected the batch size to be 1 or greater. Got "-1"', + ]; + + yield 'invalid batch size (limit)' => [ + [], + 0, + 'Expected the batch size to be 1 or greater. Got "0"', + ]; + + yield 'invalid item type' => [ + [new stdClass()], + 1, + 'The items are potentially passed to the child processes via the STDIN. For this reason they are expected to be string values. Got "stdClass"', + ]; + } + + private static function createFakeClosure(): Closure + { + return static function () { + throw new LogicException('Did not expect to be called'); + }; + } + + private function assertBatchItemIteratorStateIs( + ItemBatchIterator $iterator, + array $expectedItems, + int $expectedNumberOfItems, + int $expectedBatchSize, + array $expectedBatches + ): void { + $this->assertSame($expectedItems, $iterator->getItems()); + $this->assertSame($expectedNumberOfItems, $iterator->getNumberOfItems()); + $this->assertSame($expectedBatchSize, $iterator->getBatchSize()); + $this->assertSame($expectedBatches, $iterator->getItemBatches()); + } +} diff --git a/tests/ParallelizationInputTest.php b/tests/ParallelizationInputTest.php new file mode 100644 index 0000000..fd8c5dc --- /dev/null +++ b/tests/ParallelizationInputTest.php @@ -0,0 +1,174 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Webmozarts\Console\Parallelization; + +use InvalidArgumentException; +use PHPUnit\Framework\TestCase; +use Symfony\Component\Console\Command\Command; +use Symfony\Component\Console\Input\ArrayInput; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\StringInput; + +/** + * @covers \Webmozarts\Console\Parallelization\ParallelizationInput + */ +final class ParallelizationInputTest extends TestCase +{ + public function test_it_can_configure_a_command(): void + { + $command = new Command(); + + $initialDefinition = $command->getDefinition(); + + // Sanity check + $this->assertFalse($initialDefinition->hasArgument('item')); + $this->assertFalse($initialDefinition->hasOption('processes')); + $this->assertFalse($initialDefinition->hasOption('child')); + + ParallelizationInput::configureParallelization($command); + + $configuredDefinition = $command->getDefinition(); + + $this->assertTrue($configuredDefinition->hasArgument('item')); + $this->assertTrue($configuredDefinition->hasOption('processes')); + $this->assertTrue($configuredDefinition->hasOption('child')); + } + + /** + * @dataProvider inputProvider + */ + public function test_it_can_be_instantiated( + InputInterface $input, + bool $expectedIsNumberOfProcessesDefined, + int $expectedNumberOfProcesses, + ?string $expectedItem, + bool $expectedIsChildProcess + ): void { + self::bindInput($input); + + $parallelizationInput = new ParallelizationInput($input); + + $this->assertSame( + $expectedIsNumberOfProcessesDefined, + $parallelizationInput->isNumberOfProcessesDefined() + ); + $this->assertSame($expectedNumberOfProcesses, $parallelizationInput->getNumberOfProcesses()); + $this->assertSame($expectedItem, $parallelizationInput->getItem()); + $this->assertSame($expectedIsChildProcess, $parallelizationInput->isChildProcess()); + } + + /** + * @dataProvider invalidNumberOfProcessesProvider + */ + public function test_it_cannot_pass_an_invalid_number_of_processes( + InputInterface $input, + string $expectedErrorMessage + ): void { + self::bindInput($input); + + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage($expectedErrorMessage); + + new ParallelizationInput($input); + } + + public static function inputProvider(): iterable + { + yield 'empty input' => [ + new StringInput(''), + false, + 1, + null, + false, + ]; + + yield 'number of process defined: 1' => [ + new StringInput('--processes=1'), + true, + 1, + null, + false, + ]; + + yield 'number of process defined: 4' => [ + new StringInput('--processes=4'), + true, + 4, + null, + false, + ]; + + yield 'item passed' => [ + new StringInput('item15'), + false, + 1, + 'item15', + false, + ]; + + yield 'integer item passed' => [ + new ArrayInput(['item' => 10]), + false, + 1, + '10', + false, + ]; + + yield 'float item passed' => [ + new ArrayInput(['item' => -.5]), + false, + 1, + '-0.5', + false, + ]; + + yield 'child option' => [ + new StringInput('--child'), + false, + 1, + null, + true, + ]; + + yield 'nominal' => [ + new StringInput('item15 --child --processes 15'), + true, + 15, + 'item15', + true, + ]; + } + + public static function invalidNumberOfProcessesProvider(): iterable + { + yield 'non numeric value' => [ + new StringInput('--processes foo'), + 'Expected the number of process defined to be a valid numeric value. Got "foo"', + ]; + + yield 'non integer value' => [ + new StringInput('--processes 1.5'), + 'Expected the number of process defined to be an integer. Got "1.5"', + ]; + } + + private static function bindInput(InputInterface $input): void + { + $command = new Command(); + + ParallelizationInput::configureParallelization($command); + + $input->bind($command->getDefinition()); + } +} diff --git a/tests/ParallelizationIntegrationTest.php b/tests/ParallelizationIntegrationTest.php index 62bea7f..6f72e33 100644 --- a/tests/ParallelizationIntegrationTest.php +++ b/tests/ParallelizationIntegrationTest.php @@ -78,7 +78,7 @@ public function test_it_can_run_the_command_without_sub_processes(): void if ($this->isSymfony3()) { $this->assertSame( <<<'EOF' -Processing 2 movies in segments of 2, batches of 50, 1 round, 1 batches in 1 process +Processing 2 movies in segments of 2, batches of 50, 1 round, 1 batch in 1 process 0/2 [>---------------------------] 0% < 1 sec/< 1 sec 10.0 MiB 1/2 [==============>-------------] 50% < 1 sec/< 1 sec 10.0 MiB @@ -94,7 +94,7 @@ public function test_it_can_run_the_command_without_sub_processes(): void } else { $this->assertSame( <<<'EOF' -Processing 2 movies in segments of 2, batches of 50, 1 round, 1 batches in 1 process +Processing 2 movies in segments of 2, batches of 50, 1 round, 1 batch in 1 process 0/2 [>---------------------------] 0% < 1 sec/< 1 sec 10.0 MiB 2/2 [============================] 100% < 1 sec/< 1 sec 10.0 MiB @@ -124,7 +124,7 @@ public function test_it_can_run_the_command_with_a_single_sub_processes(): void if ($this->isSymfony3()) { $this->assertSame( <<<'EOF' -Processing 2 movies in segments of 50, batches of 50, 1 round, 1 batches in 1 process +Processing 2 movies in segments of 50, batches of 50, 1 round, 1 batch in 1 process 0/2 [>---------------------------] 0% < 1 sec/< 1 sec 10.0 MiB 1/2 [==============>-------------] 50% < 1 sec/< 1 sec 10.0 MiB @@ -140,7 +140,7 @@ public function test_it_can_run_the_command_with_a_single_sub_processes(): void } else { $this->assertSame( <<<'EOF' -Processing 2 movies in segments of 50, batches of 50, 1 round, 1 batches in 1 process +Processing 2 movies in segments of 50, batches of 50, 1 round, 1 batch in 1 process 0/2 [>---------------------------] 0% < 1 sec/< 1 sec 10.0 MiB 2/2 [============================] 100% < 1 sec/< 1 sec 10.0 MiB @@ -170,7 +170,7 @@ public function test_it_can_run_the_command_with_multiple_processes(): void if ($this->isSymfony3()) { $this->assertSame( <<<'EOF' -Processing 2 movies in segments of 50, batches of 50, 1 rounds, 1 batches in 2 processes +Processing 2 movies in segments of 50, batches of 50, 1 round, 1 batch in 2 processes 0/2 [>---------------------------] 0% < 1 sec/< 1 sec 10.0 MiB 1/2 [==============>-------------] 50% < 1 sec/< 1 sec 10.0 MiB @@ -186,7 +186,7 @@ public function test_it_can_run_the_command_with_multiple_processes(): void } else { $this->assertSame( <<<'EOF' -Processing 2 movies in segments of 50, batches of 50, 1 rounds, 1 batches in 2 processes +Processing 2 movies in segments of 50, batches of 50, 1 round, 1 batch in 2 processes 0/2 [>---------------------------] 0% < 1 sec/< 1 sec 10.0 MiB 2/2 [============================] 100% < 1 sec/< 1 sec 10.0 MiB @@ -216,7 +216,7 @@ public function test_it_can_run_the_command_with_one_process_as_child_process(): if ($this->isSymfony3()) { $this->assertSame( <<<'EOF' -Processing 2 movies in segments of 50, batches of 50, 1 round, 1 batches in 1 process +Processing 2 movies in segments of 50, batches of 50, 1 round, 1 batch in 1 process 0/2 [>---------------------------] 0% < 1 sec/< 1 sec 10.0 MiB 1/2 [==============>-------------] 50% < 1 sec/< 1 sec 10.0 MiB @@ -232,7 +232,7 @@ public function test_it_can_run_the_command_with_one_process_as_child_process(): } else { $this->assertSame( <<<'EOF' -Processing 2 movies in segments of 50, batches of 50, 1 round, 1 batches in 1 process +Processing 2 movies in segments of 50, batches of 50, 1 round, 1 batch in 1 process 0/2 [>---------------------------] 0% < 1 sec/< 1 sec 10.0 MiB 2/2 [============================] 100% < 1 sec/< 1 sec 10.0 MiB