Message Handlers

A message handler is used by DefaultConsumer to do the actual work of processing a message. Handlers implement PMG\Queue\MessageHandler, whose handle method accepts a message and a set of options from the consumer as its arguments.

Every single message goes through a single handler. It’s up to that handler to figure out how to deal with each message appropriately.

interface MessageHandler
Namespace: PMG\Queue

An object that can handle (process or act upon) a single message.

handle(PMG\Queue\Message $handle, array $options=[])
Parameters:
  • $handle – The message to handle.
  • $options – A set of options from the consumer.
Returns: A boolean indicating whether the message was handled successfully.
Return type: boolean
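
As a rough illustration of the interface described above, here is a minimal sketch of a hand-written handler. The Message and MessageHandler interfaces declared in it are local stand-ins that mirror the documented signature so the snippet runs without the library installed; in a real project you would import PMG\Queue\Message and PMG\Queue\MessageHandler instead. SendAlert and RecordingHandler are hypothetical names used only for this example.

```php
<?php

// Stand-ins for PMG\Queue\Message and PMG\Queue\MessageHandler (assumption:
// they only mirror the signature documented above so this runs on its own).
interface Message
{
    public function getName();
}

interface MessageHandler
{
    public function handle(Message $handle, array $options = []);
}

// Hypothetical message used only for this illustration.
final class SendAlert implements Message
{
    public function getName()
    {
        return 'SendAlert';
    }
}

// Hypothetical handler: records every message name it sees and reports
// success only for the names it was told to recognize.
final class RecordingHandler implements MessageHandler
{
    private $known;
    private $seen = [];

    public function __construct(array $known)
    {
        $this->known = $known;
    }

    public function handle(Message $handle, array $options = [])
    {
        $this->seen[] = $handle->getName();

        // true means "handled successfully", false means it was not.
        return in_array($handle->getName(), $this->known, true);
    }

    public function seen()
    {
        return $this->seen;
    }
}

$handler = new RecordingHandler(['SendAlert']);
$handled = $handler->handle(new SendAlert());
```

Because the consumer only relies on the handle method, any object with this shape can be passed to it as the handler.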

Callable Handler

The simplest handler is CallableHandler, which wraps a callable and invokes it with each message.

<?php

use PMG\Queue\DefaultConsumer;
use PMG\Queue\Message;
use PMG\Queue\Driver\MemoryDriver;
use PMG\Queue\Handler\CallableHandler;

$handler = new CallableHandler(function (Message $msg) {
    switch ($msg->getName()) {
        case 'SendAlert':
            sendAnAlertSomehow($msg);
            break;
        case 'OtherMessage':
            handleOtherMessageSomehow($msg);
            break;
    }
});

$consumer = new DefaultConsumer(new MemoryDriver(), $handler);

Multiple Handlers with Mapping Handler

The above switch statement is a lot of boilerplate, so PMG provides a mapping handler that looks up callables for a message based on its name. For example, here’s a callable for the send alert message.

<?php

final class SendAlertHandler
{
    private $users;
    private $mailer;

    public function __construct(UserRepository $users, \Swift_Mailer $mailer)
    {
        $this->users = $users;
        $this->mailer = $mailer;
    }

    public function __invoke(SendAlert $message)
    {
        $user = $this->users->getByIdentifierOrError($message->getUserId());

        $this->mailer->send(
            \Swift_Message::newInstance()
                ->setTo([$user->getEmail()])
                ->setFrom(['help@example.com'])
                ->setSubject('Hello')
                ->setBody('World')
        );
    }
}

Now pull in the mapping handler with composer require pmg/queue-mapping-handler and we can integrate the callable above with it.

<?php

use PMG\Queue\DefaultConsumer;
use PMG\Queue\Handler\MappingHandler;

$handler = MappingHandler::fromArray([
    'SendAlert' => new SendAlertHandler(/*...*/),
    //'OtherMessage' => new OtherMessageHandler()
    // etc
]);

/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);
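
To make the idea of name-based lookup concrete, here is a minimal sketch of a dispatcher that maps message names to callables. It is not the library's implementation: NameDispatcher and NamedMessage are hypothetical, Message is a local stand-in for PMG\Queue\Message, and returning false for an unmapped name is an assumption of this sketch rather than documented behavior of the mapping handler.

```php
<?php

// Stand-in for PMG\Queue\Message so the sketch runs without the library.
interface Message
{
    public function getName();
}

// Hypothetical message whose name is supplied at construction time.
final class NamedMessage implements Message
{
    private $name;

    public function __construct($name)
    {
        $this->name = $name;
    }

    public function getName()
    {
        return $this->name;
    }
}

// Hypothetical dispatcher: finds the callable registered for a message's
// name and invokes it with the message.
final class NameDispatcher
{
    private $callables;

    public function __construct(array $callables)
    {
        $this->callables = $callables;
    }

    public function dispatch(Message $message)
    {
        $name = $message->getName();

        if (!isset($this->callables[$name])) {
            // Assumption of this sketch: unmapped names count as unhandled.
            return false;
        }

        call_user_func($this->callables[$name], $message);

        return true;
    }
}

$log = [];
$dispatcher = new NameDispatcher([
    'SendAlert' => function (Message $message) use (&$log) {
        $log[] = 'alert sent';
    },
]);

$first = $dispatcher->dispatch(new NamedMessage('SendAlert'));
$second = $dispatcher->dispatch(new NamedMessage('Unknown'));
```

The array passed to the constructor plays the same role as the switch statement in the callable handler example: each case becomes a key, and each branch becomes its own callable.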

Using Tactician to Handle Messages

Tactician is a command bus from The PHP League. You can use it to do message handling with the queue.

composer require pmg/queue-tactician

Use the same command bus with each message.

<?php

use League\Tactician\CommandBus;
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Handler\TacticianHandler;

$handler = new TacticianHandler(new CommandBus(/* ... */));

/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);

Alternatively, you can create a new command bus to handle each message with CreatingTacticianHandler. This is useful if you’re using forked child processes to handle messages.

<?php

use League\Tactician\CommandBus;
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Handler\CreatingTacticianHandler;

$handler = new CreatingTacticianHandler(function () {
    return new CommandBus(/* ... */);
});

/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);

Handling Messages in Separate Processes

To handle messages in a forked process, use the PcntlForkingHandler decorator.

<?php

use PMG\Queue\Handler\MappingHandler;
use PMG\Queue\Handler\PcntlForkingHandler;

// create an actual handler
$realHandler = MappingHandler::fromArray([
    // ...
]);

// decorate it with the forking handler
$handler = new PcntlForkingHandler($realHandler);

Forking is useful for memory management, but requires some consideration. For instance, database connections might need to be re-opened in the forked process. In such cases, the best bet is to simply create the resources on demand. That’s why the CreatingTacticianHandler above takes a factory callable.
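
One way to create resources on demand is to wrap their construction in a factory and defer the call until first use. The sketch below is only an illustration under that assumption: LazyResource is a hypothetical helper, not part of the library, and a plain stdClass stands in for a real connection so the snippet runs without a database.

```php
<?php

// Hypothetical wrapper: defers creating a resource until it is first
// requested, then reuses the same instance within the current process.
final class LazyResource
{
    private $factory;
    private $instance = null;

    public function __construct(callable $factory)
    {
        $this->factory = $factory;
    }

    public function get()
    {
        if (null === $this->instance) {
            $this->instance = call_user_func($this->factory);
        }

        return $this->instance;
    }
}

$created = 0;
$connection = new LazyResource(function () use (&$created) {
    $created++;

    // A real factory might open a database connection here; a plain
    // object keeps the sketch runnable anywhere.
    return new stdClass();
});

// Nothing has been created yet at this point.
$before = $created;

// The first call builds the resource, the second reuses it.
$a = $connection->get();
$b = $connection->get();
```

This only helps if the parent process never calls get() before forking. If it does, the child inherits the already-created resource, which is the situation the factory is meant to avoid.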

If a process fails to fork, a PMG\Queue\Exception\CouldNotFork exception is thrown and the consumer exits with an unsuccessful status code. Your process manager (supervisord, upstart, systemd, etc.) should be configured to restart the consumer when that happens.