Message Handlers

A message handler is used by DefaultConsumer to do the actual work of processing a message. Handlers implement PMG\Queue\MessageHandler, whose handle method accepts a message and a set of options from the consumer as its arguments.

Every single message goes through a single handler. It’s up to that handler to figure out how to deal with each message appropriately.

interface MessageHandler
Namespace: PMG\Queue

An object that can handle (process or act upon) a single message.

handle(PMG\Queue\Message $handle, array $options=[])
Parameters:
  • $handle – The message to handle.
  • $options – A set of options from the consumer.
Returns:

A boolean indicating whether the message was handled successfully.

Return type:

boolean
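
As a minimal sketch of what implementing this interface might look like, the example below declares stand-in Message and MessageHandler interfaces so it runs without the library installed. Their shapes are assumptions based on the signature documented above, and SendAlert and RecordingHandler are hypothetical names used only for illustration. In a real project the class would implement PMG\Queue\MessageHandler directly.

```php
<?php

// Stand-ins so this sketch is self-contained. In a real project these are
// PMG\Queue\Message and PMG\Queue\MessageHandler from the library; the
// shapes here are assumptions taken from the documentation above.
interface Message
{
    public function getName();
}

interface MessageHandler
{
    public function handle(Message $handle, array $options = []);
}

// Hypothetical message, for illustration only.
final class SendAlert implements Message
{
    public function getName()
    {
        return 'SendAlert';
    }
}

// Hypothetical handler that records the name of every message it receives.
final class RecordingHandler implements MessageHandler
{
    public $seen = [];

    public function handle(Message $handle, array $options = [])
    {
        $this->seen[] = $handle->getName();

        return true; // report success back to the consumer
    }
}

$handler = new RecordingHandler();
$ok = $handler->handle(new SendAlert());
```

The handler returns true to signal success, matching the boolean return type described above.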

Callable Handler

The simplest handler is CallableHandler, which invokes the provided callback with the message.

<?php

use PMG\Queue\DefaultConsumer;
use PMG\Queue\Message;
use PMG\Queue\Driver\MemoryDriver;
use PMG\Queue\Handler\CallableHandler;

$handler = new CallableHandler(function (Message $msg) {
    switch ($msg->getName()) {
        case 'SendAlert':
            sendAnAlertSomehow($msg);
            break;
        case 'OtherMessage':
            handleOtherMessageSomehow($msg);
            break;
    }
});

$consumer = new DefaultConsumer(new MemoryDriver(), $handler);

Multiple Handlers with Mapping Handler

The switch statement above is a lot of boilerplate, so PMG provides a mapping handler that looks up callables for a message based on its name. For example, here’s a callable for the send alert message.

<?php

final class SendAlertHandler
{
    private $users;
    private $mailer;

    public function __construct(UserRepository $users, \Swift_Mailer $mailer)
    {
        $this->users = $users;
        $this->mailer = $mailer;
    }

    public function __invoke(SendAlert $message)
    {
        $user = $this->users->getByIdentifierOrError($message->getUserId());

        $this->mailer->send(
            \Swift_Message::newInstance()
                ->setTo([$user->getEmail()])
                ->setFrom(['help@example.com'])
                ->setSubject('Hello')
                ->setBody('World')
        );
    }
}

Now pull in the mapping handler with composer require pmg/queue-mapping-handler and we can integrate the callable above with it.

<?php

use PMG\Queue\DefaultConsumer;
use PMG\Queue\Handler\MappingHandler;

$handler = MappingHandler::fromArray([
    'SendAlert' => new SendAlertHandler(/*...*/),
    //'OtherMessage' => new OtherMessageHandler()
    // etc
]);

/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);
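
Conceptually, name-based mapping is a lookup table from message name to callable. The sketch below illustrates that idea in plain PHP; it is not the library's implementation. dispatchByName is a hypothetical helper, the message is a plain array rather than a real message object, and returning false for an unmapped name is a choice made for this sketch, not documented MappingHandler behaviour.

```php
<?php

// Hypothetical helper: look up a callable by message name and invoke it.
// Returns false when no callable is registered for the name.
function dispatchByName(array $map, string $name, $message): bool
{
    if (!isset($map[$name])) {
        return false;
    }

    $map[$name]($message);

    return true;
}

$log = [];

// Message name => callable, the same shape passed to fromArray above.
$map = [
    'SendAlert' => function ($message) use (&$log) {
        $log[] = 'alert for user ' . $message['userId'];
    },
];

$handled = dispatchByName($map, 'SendAlert', ['userId' => 42]);
$missed = dispatchByName($map, 'OtherMessage', []);
```

Each callable only needs to know about its own message type, which is what removes the switch statement from the handler.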

Using Tactician to Handle Messages

Tactician is a command bus from The PHP League. You can use it to do message handling with the queue.

composer require pmg/queue-tactician

Use the same command bus with each message.

<?php

use League\Tactician\CommandBus;
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Handler\TacticianHandler;

$handler = new TacticianHandler(new CommandBus(/* ... */));

/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);

Alternatively, you can create a new command bus to handle each message with CreatingTacticianHandler. This is useful if you’re forking child processes to handle messages.

<?php

use League\Tactician\CommandBus;
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Handler\CreatingTacticianHandler;

$handler = new CreatingTacticianHandler(function () {
    return new CommandBus(/* ... */);
});

/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);

Handling Messages in Separate Processes

To handle messages in a forked process use the PcntlForkingHandler decorator.

<?php

use PMG\Queue\Handler\MappingHandler;
use PMG\Queue\Handler\PcntlForkingHandler;

// create an actual handler
$realHandler = MappingHandler::fromArray([
    // ...
]);

// decorate it with the forking handler
$handler = new PcntlForkingHandler($realHandler);

Forking is useful for memory management, but requires some consideration. For instance, database connections might need to be re-opened in the forked process. In such cases, the best bet is to simply create the resources on demand. That’s why the CreatingTacticianHandler above takes a factory callable.
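
One way to create resources on demand is to wrap them in a factory that runs only on first use. The sketch below assumes nothing about the library: LazyResource is a hypothetical class, and an ArrayObject stands in for a real database connection. If the wrapper is built in the parent process but get() is first called inside the forked child, the resource is created in the child rather than inherited from the parent.

```php
<?php

// Hypothetical wrapper: defers building a resource until it is first needed.
final class LazyResource
{
    private $factory;
    private $instance;

    public function __construct(callable $factory)
    {
        $this->factory = $factory;
    }

    public function get()
    {
        if (null === $this->instance) {
            // First use: run the factory and remember the result.
            $this->instance = call_user_func($this->factory);
        }

        return $this->instance;
    }
}

$created = 0;

$connection = new LazyResource(function () use (&$created) {
    $created++;

    // Stand-in for opening a real database connection.
    return new ArrayObject(['connected' => true]);
});

$before = $created;           // nothing has been created yet
$first = $connection->get();  // the factory runs here
$second = $connection->get(); // the cached instance is reused
```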