Consumers

Implementations of PMG\Queue\Consumer pull messages out of a driver backend and handle (process) them in some way. The default consumer accomplishes this with a message handler.

In all cases, the $queueName passed to the consumer should correspond to a queue into which your producer puts messages.

interface Consumer
Namespace: PMG\Queue
run($queueName)

Consume and handle messages from $queueName indefinitely.

Parameters:
  • $queueName (string) – The queue from which the messages will be processed.
Throws:

PMG\Queue\Exception\DriverError If something goes wrong with the underlying driver. Generally this happens if the persistent backend goes down or is unreachable. Without the driver the consumer can’t do its work.

Returns:

An exit code

Return type:

int

once($queueName)

Consume and handle a single message from $queueName.

Parameters:
  • $queueName (string) – The queue from which the messages will be processed.
Throws:

PMG\Queue\Exception\DriverError If something goes wrong with the underlying driver. Generally this happens if the persistent backend goes down or is unreachable. Without the driver the consumer can’t do its work.

Returns:

True or false to indicate if the message was handled successfully. null if no message was handled.

Return type:

boolean or null

stop($code)

When called on a running consumer, this tells it to stop gracefully on its next iteration.

Parameters:
  • $code (int) – The exit code to return from run
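
The three return values of once (true, false, null) are enough to drive a bounded loop, for example in a cron job that should exit when the queue is empty rather than run indefinitely. The sketch below is an illustration only: drainQueue and its $limit parameter are hypothetical names that are not part of pmg/queue, and the Consumer type hint is left off so the function runs without the library installed. It also assumes the driver returns promptly when the queue is empty; a blocking driver may make once wait instead.

```php
<?php

/**
 * Hypothetical helper (not part of pmg/queue): call once() until the consumer
 * reports that no message was handled, or until $limit messages were attempted.
 *
 * $consumer is expected to be a PMG\Queue\Consumer; the type hint is omitted
 * so this sketch can run standalone.
 *
 * @return int[] counts keyed by 'handled' and 'failed'
 */
function drainQueue($consumer, $queueName, $limit = 100)
{
    $counts = ['handled' => 0, 'failed' => 0];

    for ($i = 0; $i < $limit; $i++) {
        $result = $consumer->once($queueName);

        if (null === $result) {
            break; // null: no message was handled, treat the queue as empty
        }

        // true counts as handled, false as failed
        $counts[$result ? 'handled' : 'failed']++;
    }

    return $counts;
}
```

The $limit bound matters because a failed message may be put back in the queue for a retry, so the same message can be seen more than once within a single drain.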

The script to run your consumer might look something like this. Check out the handlers documentation for more information about what $handler is below.

<?php

use PMG\Queue\DefaultConsumer;
use PMG\Queue\Driver\MemoryDriver;

$driver = new MemoryDriver();

/** @var PMG\Queue\MessageHandler $handler */
$consumer = new DefaultConsumer($driver, $handler);

exit($consumer->run(isset($argv[1]) ? $argv[1] : 'defaultQueue'));

Retrying Messages

When a message fails, either because the MessageHandler throws an exception or because it returns false, the consumer puts it back in the queue to be retried, up to 5 times by default. This behavior can be adjusted by providing a RetrySpec as the third argument to DefaultConsumer’s constructor. pmg/queue provides a few by default.

Retry specs look at PMG\Queue\Envelope instances, not raw messages. See the internals documentation for more info about them.

interface RetrySpec
Namespace: PMG\Queue
canRetry(PMG\Queue\Envelope $env)

Inspects an envelope to see if it can be retried again.

Parameters:
  • $env – The message envelope to check
Returns:

true if the message can be retried, false otherwise.

Return type:

boolean
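
Because the interface has a single method, a custom spec can be very small. The sketch below is one possible illustration, written against the interface exactly as documented above: AllOfSpec is a hypothetical class that is not shipped with pmg/queue, and it allows a retry only when every spec it wraps allows one. It assumes pmg/queue is installed and autoloaded.

```php
<?php

use PMG\Queue\Envelope;
use PMG\Queue\RetrySpec;

/**
 * Hypothetical composite spec (not part of pmg/queue): a message may be
 * retried only when every wrapped spec agrees.
 */
final class AllOfSpec implements RetrySpec
{
    /** @var RetrySpec[] */
    private $specs;

    /**
     * @param RetrySpec[] $specs the specs that must all allow a retry
     */
    public function __construct(array $specs)
    {
        $this->specs = $specs;
    }

    public function canRetry(Envelope $env)
    {
        foreach ($this->specs as $spec) {
            if (!$spec->canRetry($env)) {
                return false; // a single veto stops the retry
            }
        }

        return true; // also true when no specs were given
    }
}
```

An instance of such a class would be passed as the third constructor argument of DefaultConsumer, in the same way as the specs that pmg/queue provides.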

Limited Retries

Use PMG\Queue\Retry\LimitedSpec to cap the number of times a message is retried.

<?php

use PMG\Queue\DefaultConsumer;
use PMG\Queue\Retry\LimitedSpec;

// five retries by default. This is what the consumer does automatically
$retry = new LimitedSpec();

// Or limit to a specific number of retries
$retry = new LimitedSpec(2);

// $driver and $handler as above
$consumer = new DefaultConsumer($driver, $handler, $retry);

Never Retry a Message

Sometimes you don’t want to retry a message. For those cases, use PMG\Queue\Retry\NeverSpec.

<?php

use PMG\Queue\DefaultConsumer;
use PMG\Queue\Retry\NeverSpec;

$retry = new NeverSpec();

// $driver and $handler as above
$consumer = new DefaultConsumer($driver, $handler, $retry);

Logging

When something goes wrong, DefaultConsumer logs it with a PSR-3 Logger implementation. The default is to use a NullLogger, but you can provide your own logger as the fourth argument to DefaultConsumer’s constructor.

<?php

use PMG\Queue\DefaultConsumer;

$monolog = new Monolog\Logger('yourApp');

// $driver, $handler, $retry as above
$consumer = new DefaultConsumer($driver, $handler, $retry, $monolog);

Build Custom Consumers

To keep things simple, extend PMG\Queue\AbstractConsumer so that you only have to implement the once method. Here’s an example that decorates another Consumer with events.

<?php

use PMG\Queue\AbstractConsumer;
use PMG\Queue\Consumer;
use PMG\Queue\Message;
use Symfony\Component\EventDispatcher\Event;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

final class EventingConsumer extends AbstractConsumer
{
    /** @var Consumer */
    private $wrapped;

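    /** @var EventDispatcherInterface */
    private $events;

    // takes the consumer to decorate and the dispatcher used to fire events
    public function __construct(Consumer $wrapped, EventDispatcherInterface $events)
    {
        $this->wrapped = $wrapped;
        $this->events = $events;
    }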

    public function once($queueName)
    {
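        $this->events->dispatch('queue:before_once', new Event());
        // pass the wrapped consumer's result through so callers still get true, false or null
        $result = $this->wrapped->once($queueName);
        $this->events->dispatch('queue:after_once', new Event());

        return $result;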
    }
}