Skip to content

Commit

Permalink
Merge pull request #6 from krakphp/v0.3
Browse files Browse the repository at this point in the history
Overall updates to 0.3.0 system
  • Loading branch information
ragboyjr authored Mar 17, 2017
2 parents e236a32 + c1142c9 commit 457f7ce
Show file tree
Hide file tree
Showing 29 changed files with 364 additions and 409 deletions.
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,22 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

## [0.3.0] - 2017-03-16
### Added

- Tight Cargo Integration, removed pimple integration
- Kernel implements Dispatch interface
- Simple `complete` and `failed` methods to return specific job results
- New FailJob stacks to properly handle the failing of jobs.
- Consume file locking to allow for scheduled invoking of the `job:consumer` command.
- New default Stub QueueManager

### Changed

- Updated ttl behavior to not actually kill the loop until the queue is empty.
- Updated dependencies
- Removed `Queue::fail` as to move that logic to the FailJob module.

## [0.2.0] - 2017-01-19
### Added

Expand Down
135 changes: 61 additions & 74 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,38 @@ Install with composer at `krak/job`.

### Create the Kernel

The kernel is the core manager of the Job library. It holds the configuration and acts like a factory for the various components.
The kernel is the core manager of the Job library. It's simply a Cargo\\Container decorator with helper methods.

```php
<?php

$kernel = Krak\Job\createKernel(new \Predis\Client());
$kernel = new Krak\Job\Kernel();
```

You can also pass an optional Container instance if you want to use a special container.

#### Configuring the Kernel

You can configure the kernel by wrapping any of the services defined in the container. In addition to the configuration provided by the Cargo\\Container, the kernel provides helper methods to ease customization.

```php
// configure the scheduling loop
$kernel->config([
'name' => 'Jobs Queue',
'sleep' => 10,
'ttl' => 50,
'max_jobs' => 10,
'max_retry' => 3,
]);
// configure the queue manager
$kernel->queueManager(function($qm, $c) {
return Krak\Job\createQueueManager(new Predis\Client());
});
// configure the consumer stack
$kernel->wrap('krak.job.pipeline.consumer', function($consumer) {
return $consumer->push(myConsumer());
});
// and so on...
```

### Define a Job
Expand All @@ -36,6 +62,7 @@ Every Job must implement the empty interface `Krak\Job\Job` and have a `handle`
namespace Acme\Jobs;

use Krak\Job\Job;
use Acme\ServiceA;

class ProcessJob implements Job
{
Expand All @@ -45,21 +72,25 @@ class ProcessJob implements Job
$this->id = $id;
}

public function handle() {
public function handle(ServiceA $service) {
process($this->id);
}
}
```

Arguments will automatically wired into the handle method using the AutoArgs package. The Job instance will be serialized, so make sure that the properties of the Job are serializable. It'd also be a good idea to keep the amount of data in a job as small as possible.

### Dispatch a Job

Dispatching jobs is easy using the `Krak\Job\Dispath`.
Dispatching jobs is easy using the `Krak\Job\Dispatch`.

```php
<?php

use Krak\Job;

// use the kernel to create a dispatch instance
$dispatch = $kernel->createDispatch();
$dispatch = $kernel['dispatch']; // or $kernel[Job\Dispatch::class];

$dispatch->wrap(new Acme\Jobs\ProcessJob(1))
->onQueue('process') // this is optional
Expand All @@ -71,17 +102,7 @@ $dispatch->wrap(new Acme\Jobs\ProcessJob(1))

In order to start consuming jobs, you need to do a few things:

1. Create your jobs.yml file to configure the scheduling of the queues.

```yaml
name: "Jobs Scheduler"
queue: "process"
ttl: 120 # optional, the max time the scheduler will run for before exiting
sleep: 30 # sleep for 30 seconds after every loop
max_jobs: 10 # optional, the max number of jobs to run at once
```
2. Register the Commands with your Symfony/Laravel Console Application
1. Register the Commands with your Symfony Console Application

```php
<?php
Expand All @@ -96,7 +117,7 @@ In order to start consuming jobs, you need to do a few things:
3. Start the consumer

```bash
./bin/console job:consume jobs.yml -vvv
./bin/console job:consume -vvv
```

You can change the verbosity level to suite your needs
Expand Down Expand Up @@ -137,69 +158,35 @@ The Dispatch is a very simple class/interface designed to wrap Job's into `Wrapp

### Queue

The Queuing module handles the actual queueing implementations.

## Cookbook

### Configuring the Kernel
The Queuing module handles the actual queueing implementations. There are two main components: Queue Managers and Queues.

```php
<?php
$kernel = Krak\Job\createKernel();
$kernel->producer(function($stack) {
// add any middleware
$stack->push(autoQueueNameProduce('Acme\Jobs\'));
return $stack;
});
$kernel->consumer(function() {});
$kernel->scheduleLoop(function() {});
```
**Supported Queues**

### Pimple Integration
- Redis
- Stub

You can easily integrate your Job Kernel with pimple which allows you to use pimple services as middleware in any middleware stack and also allows for better invocation of the Job handler.

```php
<?php
$kernel = Krak\Job\createKernel(/* ... */);
$kernel = new Krak\Job\Kernel\PimpleKernel($kernel, new Pimple\Container());
```

Now, with this wrapped kernel, you can do the following:

```php
<?php
$kernel->producer(function($stack) {
$stack->push('some-pimple-service-id-of-a-middleware');
return $stack;
});
// also, you can use the AutoArgs functionality in your job handlers.
$container[AcmeProcessor::class] = function() {};
class AcmeJob implements Krak\Job\Job {
public function handle(AcmeProcessor $processor, Pimple\Container $container) {
}
}
```

Internally, it uses [Krak\\AutoArgs](https://github.com/krakphp/auto-args) to implement the auto arguments.
## Cookbook

### Async Scheduling

To perform schedule multiple queues at a time, you can create a jobs.yml file like this.
To perform schedule multiple queues at a time, update the kernel config like this:

```yaml
name: "Master Scheduler"
sleep: 5
schedulers:
- queue: "jobs"
max_jobs: 10
- queue: "jobs1"
```php
$kernel->config([
'name' => 'Master Scheduler',
'sleep' => 10,
'ttl' => 50,
'schedulers' => [
[
'queue' => 'emails',
'max_jobs' => 20,
],
[
'queue' => 'orders',
'max_retry' => 3,
]
]
]);
```

This will create a master scheduler that will then manage two schedulers which manage a different queue.
This will create a master scheduler that will then manage two schedulers which manage a different queue. This will launch two separate processes that manage each queue, so the processing of each queue will be completely asynchronous.
10 changes: 6 additions & 4 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@
"files": ["src/inc.php"]
},
"require": {
"krak/auto-args": "^0.1.0",
"krak/mw": "^0.4.1",
"krak/auto-args": "^0.3.0",
"krak/cargo": "^0.2.0",
"krak/mw": "^0.5.0",
"nikic/iter": "^1.4",
"psr/log": "^1.0",
"symfony/process": "^2.8|^3.0",
"symfony/yaml": "^3.2"
"symfony/filesystem": "^3.2",
"symfony/process": "^2.8|^3.0"
},
"require-dev": {
"krak/php-inc": "^0.1.3",
"monolog/monolog": "^1.22",
"peridot-php/peridot": "^1.18",
"pimple/pimple": "^3.0",
Expand Down
18 changes: 15 additions & 3 deletions example/simple.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,19 @@ public function __construct($id) {
public function handle() {}
}

$kernel = Krak\Job\createKernel(new Predis\Client());
$dispatch = $kernel->createDispatch();
$kernel = new Krak\Job\Kernel();
$kernel->config([
'queue' => 'jobs',
'sleep' => 10,
]);
$kernel->queueManager(function() {
return Krak\Job\createQueueManager(new Predis\Client());
});

Krak\Job\registerConsole($app, $kernel);
if ($argv[1] == 'dispatch') {
$kernel->dispatch(new AlertJob(1));
} else {
$app = new Symfony\Component\Console\Application();
Krak\Job\registerConsole($app, $kernel);
$app->run();
}
20 changes: 13 additions & 7 deletions src/Console/ConsumeCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,30 @@
Symfony\Component\Console\Command\Command,
Symfony\Component\Console\Input,
Symfony\Component\Console\Output,
Symfony\Component\Console\Logger\ConsoleLogger,
Symfony\Component\Yaml\Yaml;
Symfony\Component\Console\Logger\ConsoleLogger;
use Symfony\Component\Filesystem\LockHandler;

class ConsumeCommand extends Command
{
protected function configure() {
$this->setName('job:consume')
->setDescription('Consumes the jobs by starting scheudlers/workers according to config')
->addArgument(
'config-path',
Input\InputArgument::REQUIRED,
'The path to the jobs.yml file'
'instance-name',
Input\InputArgument::OPTIONAL,
'An identifier for the scheduler instance. Only one instance of a scheduler can be running at a time. Defaults to "scheduler"'
);
}

protected function execute(Input\InputInterface $input, Output\OutputInterface $output) {
$config_path = $input->getArgument('config-path');
$options = Yaml::parse(file_get_contents($config_path));
$instance_name = $input->getArgument('instance-name') ?: 'scheduler';

$lock_handler = new LockHandler($instance_name.'.lock');
if (!$lock_handler->lock()) {
$output->writeln('<error>A scheduler of this instance is already running.</error>');
return 0;
}
$options = $this->getHelper('krak_job')->getKernel()['krak.job.config'];

$bin = $this->getBinFromArgv($_SERVER['argv']);
$options['worker_cmd'] = $bin . ' job:worker ' .$this->getVerbosityString($output);
Expand Down
7 changes: 4 additions & 3 deletions src/Console/SchedulerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ protected function execute(Input\InputInterface $input, Output\OutputInterface $
}
$options = json_decode(stream_get_contents($input->getStream()), true);

$scheduler = $this->getHelper('krak_job')->getKernel()->createScheduler();
$scheduler = $this->getHelper('krak_job')->getKernel()[Job\Scheduler::class];

$logger = new ConsoleLogger($output);
$logger = new PrefixLogger($logger, $this->getPrefixFromOptions($options));

$logger->error('1,2,3');
$logger->error('3,4,5');
$logger->info("Starting Scheduler");
$scheduler->run($output, $logger, $options);
$logger->info("Starting Stopped");
$logger->info("Stopping Scheduler");
}

private function getPrefixFromOptions(array $options) {
Expand Down
2 changes: 1 addition & 1 deletion src/Console/WorkerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ protected function configure() {
}

protected function execute(Input\InputInterface $input, Output\OutputInterface $output) {
$worker = $this->getHelper('krak_job')->getKernel()->createWorker();
$worker = $this->getHelper('krak_job')->getKernel()[Job\Worker::class];
$output->write($worker->work(file_get_contents('php://stdin')));
}
}
4 changes: 3 additions & 1 deletion src/Dispatch.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ interface Dispatch
{
/** @return WrappedJobBuilder */
public function wrap(Job $job);
/** wraps and dispatches a job */
public function dispatch(Job $job);
/** Dispatches a wrapped job */
public function dispatch(WrappedJob $job);
public function dispatchWrappedJob(WrappedJob $job);
}
6 changes: 5 additions & 1 deletion src/Dispatch/ProducerDispatch.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ public function wrap(Job\Job $job) {
return new $cls($this, $job);
}

public function dispatch(Job\WrappedJob $job) {
public function dispatch(Job\Job $job) {
return $this->wrap($job)->dispatch();
}

public function dispatchWrappedJob(Job\WrappedJob $job) {
$produce = $this->produce;
return $produce($job);
}
Expand Down
Loading

0 comments on commit 457f7ce

Please sign in to comment.