Message Handlers¶
A message handler is used by DefaultConsumer
to do the actual work of
processing a message. Handlers implement PMG\Queue\MessageHandler
which
accepts a message and a set of options from the the consumer as its arguments.
Every single message goes through a single handler. It’s up to that handler to figure out how to deal with each message appropriately.
-
interface
MessageHandler
¶ Namespace: PMG\Queue An object that can handle (process or act upon) a single message.
-
handle
(PMG\Queue\Message $handle, array $options=[])¶ Parameters: - $handle – The message to handle.
- $options – A set of options from the consumer.
Returns: A boolean indicated whether the message was handled successfully.
Return type: boolean
-
Callable Handler¶
The simplest handler could just be a callable that invokes the provided callback with the message.
<?php
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Message;
use PMG\Queue\Driver\MemoryDriver;
use PMG\Queue\Handler\CallableHandler;
$handler = new CallableHandler(function (Message $msg) {
switch ($msg->getName()) {
case 'SendAlert':
sendAnAlertSomehow($msg);
break;
case 'OtherMessage':
handleOtherMessageSomehow($msg);
break;
}
});
$consumer = new DefaultConsumer(new MemoryDriver(), $handler);
Multiple Handlers with Mapping Handler¶
The above switch statement is a lot of boilerplaint, so PMG provies a mapping handler that looks up callables for a message based on its name. For example, here’s a callable for the send alert message.
<?php
final class SendAlertHandler
{
private $users;
private $mailer;
public function __construct(UserRepository $users, \Swift_Mailer $mailer)
{
$this->users = $users;
$this->mailer = $mailer;
}
public function __invoke(SendAlert $message)
{
$user = $this->users->getByIdentifierOrError($message->getUserId());
$this->mailer->send(
\Swift_Message::newInstance()
->setTo([$user->getEmail()])
->setFrom(['help@example.com'])
->setSubject('Hello')
->setBody('World')
);
}
}
Now pull in the mapping handler with composer require pmg/queue-mapping-handler
and we can integrate the callable above with it.
<?php
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Handler\MappingHandler;
$handler = MappingHandler::fromArray([
'SendAlert' => new SendAlertHandler(/*...*/),
//'OtherMessage' => new OtherMessageHandler()
// etc
]);
/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);
Using Tactician to Handle Messages¶
Tactician is a command bus from The PHP League. You can use it to do message handling with the queue.
composer install pmg/queue-tactician
Use the same command bus with each message.
<?php
use League\Tactician\CommandBus;
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Handler\TaticianHandler;
$handler = new TacticianHandler(new CommandBus(/* ... */));
/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);
Alternative, you can create a new command bus to handle each message with CreatingTacticianHandler. This is useful if you’re using forking child processes to handle messages.
<?php
use League\Tactician\CommandBus;
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Handler\CreatingTaticianHandler;
$handler = new TacticianHandler(function () {
return new CommandBus(/* ... */);
});
/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);
Handling Messages in Separate Processes¶
To handle messages in a forked process use the PcntlForkingHandler
decorator.
<?php
use PMG\Queue\Handler\MappingHandler;
use PMG\Queue\Handler\PcntlForkingHandler;
// create an actual handler
$realHandler = MappingHandler::fromArray([
// ...
]);
// decorate it with the forking handler
$handler = new PcntlForkingHandler($realHandler);
Forking is useful for memory management, but requires some consideration. For
instance, database connections might need to be re-opened in the forked process.
In such cases, the best bet is to simply create the resources on demand. that’s
why the TaticianHandler
above takes a factory callable by default.
In cases where a process fails to fork, a PMG\Queue\Exception\CouldNotFork
exception will be thrown and the consumer will exit with an unsuccessful status
code. Your process manager (supervisord, upstart, systemd, etc) should be
configured to restart the consumer when that happens.