PMG Queue

pmg/queue is a production ready queue framework that powers many internal projects at PMG.

It’s simple and extensible a number of features we’ve found to be the most useful including automatic retries and multi-queue support.

Contents

Messages

Messages are objects that implement the PMG\Queue\Message interface. These objects are meant to be serializable and contain everything you need for a handler to do its job.

A message to send an alert to a user might look something like this:

Example Message

<?php
use PMG\Queue\Message;

final class SendAlert implements Message
{
    private $userId;

    public function __construct($userId)
    {
        $this->userId = $userId;
    }

    public function getUserId()
    {
        return $this->userId;
    }
}

Because messages are serialized to be put in a persistent backend they shouldn’t include objects that require state. In the example above the message just contains a user’s identifier rather than the full object. Tlhe handler would then look up the user.

See Consumers and Producers for more information about handlers and messages fit into the system as a whole.

Producers

Producers add messages to a driver backed for the consumer to pick up and handle.

interface Producer
Namespace:PMG\Queue
send(PMG\Queue\Message $message)

Send a message to a driver backend.

Parameters:
  • $message – The message to send into the queue
Throws:

PMG\Queue\Exception\QueueNotFound if the message can’t be routed to an appropriate queue.

The default producer implementation takes a driver and a router as its constructor arguments and uses the router (explained below) to send its messages into a drivers specific queue.

<?php

use PMG\Queue\DefaultProducer;
use PMG\Queue\Router\SimpleRouter;

$router = new SimpleRouter('queueName');

/** @var PMG\Queue\Driver $driver */
$producer = new DefaultProdicer($driver, $router);

Routers

pmg/queue is built with multi-queue support in in mind. To accomplish that on the producer side of things an implementation of PMG\Queue\Router is used.

interface Router
Namespace:PMG\Queue
queueFor(PMG\Queue\Message $message)

Looks a queue name for a given message.

Parameters:
  • $message – the message to route
Returns:

A string queue name if found, null otherwise.

Return type:

string or null

Routing all Message to a Single Queue

Use PMG\Queue\SimpleRouter, which takes a queue name in the constructor and always returns it.

<?php
use PMG\Queue\Router\SimpleRouter;

// all message will go in the "queueName" queue
$router = new SimpleRouter('queueName');
Routing Messages Based on Their Name

Use PMG\Queue\MappingRouter, which takes a map of message name => queue name pairs to its constructor.

<?php

use PMG\Queue\Router\MappingRouter;

$router = new MappingRouter([
    // the `SendAlert` message will go into the `Alerts` queue
    'SendAlert' => 'Alerts',
]);
Falling Back to a Default Queue

To avoid QueueNotFound exceptions, it’s often a good idea to use PMG\Queue\Router\FallbackRouter.

<?php

use PMG\Queue\DefaultProducer;
use PMG\Queue\SimpleMessage;
use PMG\Queue\Router\FallbackRouter;
use PMG\Queue\Router\MappingRouter;

$router = new FallbackRouter(new MappingRouter([
    'SendAlert' => 'Alerts',
]), 'defaultQueue');

$producer = new DefaultProducer($driver, $router);

// goes into the `Alerts` queue
$producer->send(new SimpleMessage('SendAlert'));

// goes into `defaultQueue`
$producer->send(new SimpleMessage('OtherThing'));

Consumers

Implementations of PMG\Queue\Consumer pull message out of a driver backend and handle (process) them in some way. The default consumer accomplishes this a message handler.

In all cases $queueName in the consume should correspond to queues into which your producer put messages.

interface Consumer
Namespace:PMG\Queue
run($queueName)

Consume and handle messages from $queueName indefinitely.

Parameters:
  • $queueName (string) – The queue from which the messages will be processed.
Throws:

PMG\Queue\Exception\DriverError If some things goes wrong with the underlying driver. Generally this happens if the persistent backend goes down or is unreachable. Without the driver the consumer can’t do its work.

Returns:

An exit code

Return type:

int

once($queueName)

Consume and handle a single message from $queueName

Parameters:
  • $queueName (string) – The queue from which the messages will be processed.
Throws:

PMG\Queue\Exception\DriverError If some things goes wrong with the underlying driver. Generally this happens if the persistent backend goes down or is unreachable. Without the driver the consumer can’t do its work.

Returns:

True or false to indicate if the message was handled successfully. null if no message was handled.

Return type:

boolean or null

stop($code)

Used on a running consumer this will tell it to gracefully stop on its next iteration.

Parameters:
  • $code (int) – The exit code to return from run

The script to run your consumer might look something like this. Check out the handlers documentation for more information about what $handler is below.

<?php

use PMG\Queue\DefaultConsumer;
use PMG\Queue\Driver\MemoryDriver;

$driver = new MemoryDriver();

/** @var PMG\Queue\MessageHandler $handler */
$consumer = new DefaultConsumer($driver, $handler);

exit($consumer->run(isset($argv[1]) ? $argv[1] : 'defaultQueue'));

Retrying Messages

When a message fails – by throwing an exception or returns false from a MessageHandler – the consumer puts it back in the queue to retry up to 5 times by default. This behavior can be adjusted by providing a RetrySpec as the third argument to DefaultConsumers constructor. pmg/queue provides a few by default.

Retry specs look at PMG\Queue\Envelope instances, not raw messages. See the internals documentation for more info about them.

interface RetrySpec
Namespace:PMG\Queue
canRetry(PMG\Queue\Envelope $env)

Inspects an envelop to see if it can retry again.

Parameters:
  • $env – The message envelope to check
Returns:

true if the message can be retried, false otherwise.

Return type:

boolean

Limited Retries

Use PMG\\Queue\\Retry\\LimitedSpec.

<?php

use PMG\Queue\DefaultConsumer;
use PMG\Queue\Retry\LimitedSpec;

// five retries by default. This is what the consumer does automatically
$retry = new LimitedSpec();

// Or limit to a specific number of retries
$retry = new LimitedSpec(2);

// $driver and $handler as above
$consumer = new DefaultConsumer($driver, $handler, $retry);
Never Retry a Message

Sometimes you don’t want to retry a message, for those cases use PMG\\Queue\\Retry\\NeverSpec.

<?php

use PMG\Queue\DefaultConsumer;
use PMG\Queue\Retry\NeverSpec;

$retry = new NeverSpec();

// $driver and $handler as above
$consumer = new DefaultConsumer($driver, $handler, $retry);

Logging

When something goes wrong DefaultConsumer logs it with a PSR-3 Logger implementation. The default is to use a NullLogger, but you can provide your own logger as the fourth argument to DefaultConsumer‘s constructor.

<?php

use PMG\Queue\DefaultConsumer;

$monolog = new Monolog\Logger('yourApp');

// $driver, $handler, $retry as above
$consumer = new DefaultConsumer($driver, $handler, $retry, $monolog);

Build Custom Consumers

Extend PMG\\Queue\\AbstractConsumer to make things easy and only have to implement the once method. Here’s an example that decorates another Consumer with events.

<?php

use PMG\Queue\AbstractConsumer;
use PMG\Queue\Consumer;
use PMG\Queue\Message;
use Symfony\Component\EventDispatcher\Event;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

final class EventingConsumer extends AbstractConsumer
{
    /** @var Consumer */
    private $wrapped;

    /** @var EventDispatcherInterface $events */

    // constructor that takes a consumer and dispatcher to set the props ^

    public function once($queueName)
    {
        $this->events->dispatch('queue:before_once', new Event());
        $this->wrapped->once($queueName);
        $this->events->disaptch('queue:after_once', new Event());
    }
}

Message Handlers

A message handler is used by DefaultConsumer to do the actual work of processing a message. Handlers implement PMG\Queue\MessageHandler which accepts a message and a set of options from the the consumer as its arguments.

Every single message goes through a single handler. It’s up to that handler to figure out how to deal with each message appropriately.

interface MessageHandler
Namespace:PMG\Queue

An object that can handle (process or act upon) a single message.

handle(PMG\Queue\Message $handle, array $options=[])
Parameters:
  • $handle – The message to handle.
  • $options – A set of options from the consumer.
Returns:

A boolean indicated whether the message was handled successfully.

Return type:

boolean

Callable Handler

The simplest handler could just be a callable that invokes the provided callback with the message.

<?php

use PMG\Queue\DefaultConsumer;
use PMG\Queue\Message;
use PMG\Queue\Driver\MemoryDriver;
use PMG\Queue\Handler\CallableHandler;

$handler = new CallableHandler(function (Message $msg) {
    switch ($msg->getName()) {
        case 'SendAlert':
            sendAnAlertSomehow($msg);
            break;
        case 'OtherMessage':
            handleOtherMessageSomehow($msg);
            break;
    }
});

$consumer = new DefaultConsumer(new MemoryDriver(), $handler);

Multiple Handlers with Mapping Handler

The above switch statement is a lot of boilerplaint, so PMG provies a mapping handler that looks up callables for a message based on its name. For example, here’s a callable for the send alert message.

<?php

final class SendAlertHandler
{
    private $users;
    private $mailer;

    public function __construct(UserRepository $users, \Swift_Mailer $mailer)
    {
        $this->users = $users;
        $this->mailer = $mailer;
    }

    public function __invoke(SendAlert $message)
    {
        $user = $this->users->getByIdentifierOrError($message->getUserId());

        $this->mailer->send(
            \Swift_Message::newInstance()
                ->setTo([$user->getEmail()])
                ->setFrom(['help@example.com'])
                ->setSubject('Hello')
                ->setBody('World')
        );
    }
}

Now pull in the mapping handler with composer require pmg/queue-mapping-handler and we can integrate the callable above with it.

<?php

use PMG\Queue\DefaultConsumer;
use PMG\Queue\Handler\MappingHandler;

$handler = MappingHandler::fromArray([
    'SendAlert' => new SendAlertHandler(/*...*/),
    //'OtherMessage' => new OtherMessageHandler()
    // etc
]);

/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);

Using Tactician to Handle Messages

Tactician is a command bus from The PHP League. You can use it to do message handling with the queue.

composer install pmg/queue-tactician

Use the same command bus with each message.

<?php

use League\Tactician\CommandBus;
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Handler\TaticianHandler;

$handler = new TacticianHandler(new CommandBus(/* ... */));

/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);

Alternative, you can create a new command bus to handle each message with CreatingTacticianHandler. This is useful if you’re using forking child processes to handle messages.

<?php

use League\Tactician\CommandBus;
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Handler\CreatingTaticianHandler;

$handler = new TacticianHandler(function () {
    return new CommandBus(/* ... */);
});

/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);

Handling Messages in Separate Processes

To handle messages in a forked process use the PcntlForkingHandler decorator.

<?php

use PMG\Queue\Handler\MappingHandler;
use PMG\Queue\Handler\PcntlForkingHandler;

// create an actual handler
$realHandler = MappingHandler::fromArray([
    // ...
]);

// decorate it with the forking handler
$handler = new PcntlForkingHandler($realHandler);

Forking is useful for memory management, but requires some consideration. For instance, database connections might need to be re-opened in the forked process. In such cases, the best bet is to simply create the resources on demand. that’s why the TaticianHandler above takes a factory callable by default.

Drivers & Internals

Behind the scenes consumers and producers use driver and envelopes to do their work.

Drivers

Drivers are the queue backend hidden behind the PMG\Queue\Driver interface. pmg/queue comes with two drivers built in: memory and pheanstalk (beanstalkd).

Drivers have method for enqueuing and dequeueing messages as well as methods for acknowledging a message is complete, retrying a message, or marking a message as failed.

Envelopes

Envelopes wrap up messages to allow drivers to add additional metadata. One example of such metadata is a retry count that the consumers may use to determine if a message should be retried. The pheanstalk driver implements its own envelop class so it can track the beanstalkd job identifier for the message.

Drivers are free to do whatever they need to do as long as their envelope implements PMG\Queue\Envelope.

Driver Implementations

The core pmg/queue library provides a in memory driver and PMG maintains a driver for beanstalkd that uses the pheanstalk library.

The Memory Driver & Testing

The memory driver is provided to make prototyping and testing easy. It uses SplQueue instances and only keeps messages in memory.

<?php
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Driver\MemoryDriver;

// ...

$driver = new MemoryDriver();

// $handler instanceof PMG\Queue\MessageHandler
$consumer = new DefaultConsumer($driver, $handler);

The memory driver isn’t extrodinary useful outside of testing. For instance, while doing end to end tests, you may want to switch out your producers library to use the memory driver then verify the expected messages when into it.

<?php
use PMG\Queue\Driver\MemoryDriver;

class SomeTest extends \PHPUnit_Framework_TestCase
{
    const TESTQ = 'TestQueue';

    /** @var MemoryDriver $driver */
    private $driver;

    public function testSomething()
    {
        // imagine some stuff happened before this, now we need to verify that

        $envelope = $this->driver->dequeue(self::TESTQ);

        $this->assertNotNull($envelope);
        $msg = $envelope->unwrap();
        $this->assertInstanceOf(SendAlert::class, $msg);
        $this->assertEquals(123, $msg->getUserId());
    }

}
Pheanstalk Driver

The pheanstalk driver is backed by beanstalkd and is a persistent driver: messages persist across multiple requests or queue runs.

To use it, use composer to install pmg/queue-pheanstalk and pass an instance of Pheanstalk\Pheanstalk and a serializer to its constructor.

<?php
use Pheanstalk\Pheanstalk;
use PMG\Queue\Driver\PheanstalkDriver;
use PMG\Queue\Driver\Serializer\NativeSerializer;

$driver = new PheanstalkDriver(
    new Pheanstalk('localhost', 11300),
    new NativeSerializer('this is a key used to sign messages')
);

See the pheanstalk driver repository for more information and examples.

Serializers

Persistent drivers require some translation from envelopes and messages to something the persistent backend can store. Similarly, whatever is stored in the queue backend needs to be turned back into a message. Serializers make that happen.

All serializers implements PMG\Queue\Serializer\Serializer and one implementation is provied by default: NativeSerializer.

NativeSerializer uses PHP’s build in serialize and unserialize functions. Serialized envelopes are base64 encoded and signed (via an HMAC) with a key given to NativeSerializer in its constructor. The signature is a way to authenticate the message (make sure it came from a source known to use).

<?php
use PMG\Queue\Serializer\NativeSerializer;

$serializer = new NativeSerializer('this is the key');

// ...
Allowed Classes in PHP 7

NativeSerializer supports PHP 7’s allowed_classes option in unserialize to whitelist classes. Just pass an array of message class names as the second argument to NativeSerializer‘s constructor.

Because drivers have their own envelope classes, the pheanstalk driver (or any other drivers that extend PMG\Queue\Driver\AbstractPersistanceDriver) provides a static allowedClasses method that returns an array of envelope classes to whitelist.

<?php
use PMG\Queue\Serializer\NativeSerializer;
use PMG\Queue\Driver\PheanstalkDriver;

$serializer = new NativeSerializer('YourSecretKeyHere', array_merge([
    // your message classes
    SendAlert::class,
    // ...
], PheanstalkDriver::allowedClasses()));

Implementing Your Own Drivers

Persistent drivers are not required to use serializers (or anything else), but if they do PMG\Queue\Driver\AbstractPersistanceDriver provides helpers for the usage of serializers.

Installation & Examples

You should require the driver library of your choice with composer rather than pmg/queue directly. If you’re planning to use beanstalkd as your backend:

composer require pmg/queue-pheanstalk:~1.0

See the core examples directory on the pheanstalk examples for some code samples on gluing everything together.

READ THIS: Glossary & Core Concepts

  • A message is a serializable object that goes into the queue for later processing.
  • A producer adds messages to the queue backend via a driver and a router.
  • A consumer pulls messages out of the queue via driver and executes them with handlers and executors.
  • A driver is PHP representation of the queue backend. There are two built in: memory and beanstalkd. Drivers implement PMG\Queue\Driver.
  • A driver is PHP representation of the queue backend. There is an in memory driver included in this library as an example (and for testing), and an implementation of a beanstalkd driver available.
  • A router looks up the correct queue name for a message based on its name.
  • An executor runs the message handler. This is a simple abstraction to allow folks to fork and run jobs if they desire.
  • A handler is a callable that does the work defined by a message.
  • handler resolvers find handlers based on the message name.
  • An envelope is used internally to wrap up messages with retry information as well as metadata specific to drivers. Users need not worry about this unless they are implementing their own driver.