From RabbitMqBundle to PhpEnqueue with Symfony Messenger

Today I will explain how I migrated from RabbitMqBundle to PhpEnqueue using the Messenger component of Symfony.

Stack

  • PHP 7.2
  • Symfony 4.1

Starting Situation

I already have RabbitMQ installed and working, following the simple tutorial "Symfony 4 and RabbitMQ".

So my configuration file contains the following:

#config/packages/old_sound_rabbit_mq.yaml

old_sound_rabbit_mq:
    enable_collector: false
    connections:
        default:
            url: '%env(RABBITMQ_URL)%'
            lazy: true

    producers:
        mailer:
            connection: default
            exchange_options:
                name: mailer
                type: direct
    consumers:
        mailer:
            connection:       default
            exchange_options: {name: 'mailer', type: direct}
            queue_options:    {name: 'mailer'}
            callback:         App\Consumer\Mailer\MailerConsumer

And I have my producer service:

<?php

namespace App\Service\Mailer;

use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;

class MailerProducerService
{
    /**
     * @var ProducerInterface
     */
    private $producer;

    /**
     * MailerProducerService constructor.
     *
     * @param ProducerInterface $producer
     */
    public function __construct(ProducerInterface $producer)
    {
        $this->producer = $producer;
    }

    /**
     * @param array  $data
     */
    public function send(array $data) 
    {
        // Wrap the payload in an array: json_encode('data' => $data) is a syntax error
        $this->producer->publish(json_encode(['data' => $data]));
    }
}

And my consumer:

<?php

namespace App\Consumer\Mailer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;

class MailerConsumer implements ConsumerInterface
{
    private $logger;

    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    public function execute(AMQPMessage $msg)
    {
        $data = json_decode($msg->getBody(), true);
        
        // Do something with the message received
        // ...
    }
}
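
To make the shape of the message body concrete, here is a minimal sketch of the round trip between the producer and the consumer, in plain PHP and without the bundle. The payload values are hypothetical. Note that the producer wraps the array under a `data` key, so the consumer has to read that key after decoding.

```php
<?php

// Hypothetical payload, standing in for whatever the application passes to send().
$data = ['to' => 'user@example.com', 'subject' => 'Welcome'];

// Producer side: wrap the payload under a "data" key and serialize it to JSON.
$body = json_encode(['data' => $data]);

// Consumer side: decode the body to an associative array, as execute() does.
$decoded = json_decode($body, true);

// The original array is found under the "data" key.
var_dump($decoded['data'] === $data);
```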

Step 1: Replace OldSoundRabbitMqBundle with EnqueueBundle

Reading Max Kotliar’s article, we realize that the migration to EnqueueBundle is really simple.

  • Install packages:
$ composer require enqueue/enqueue-bundle enqueue/amqp-lib
  • Set configuration:
#config/packages/enqueue.yaml
enqueue:
    transport:
        default: '%env(ENQUEUE_DSN)%'
    client: ~
#.env
...
...
###> enqueue/enqueue-bundle ###
ENQUEUE_DSN=amqp://guest:guest@queue:5672
###< enqueue/enqueue-bundle ###
...
...
  • Replace Producer call
<?php

namespace App\Service\Mailer;

use Enqueue\Client\ProducerInterface;

class MailerProducerService
{
    /**
     * @var ProducerInterface
     */
    private $producer;

    public function __construct(ProducerInterface $producer) 
    {
        $this->producer = $producer;
    }

    public function send($data)
    {
        $this->producer->sendCommand('mailer.processor', $data);
    }
}
  • Replace Consumer by Processor
<?php

namespace App\Processor\Mailer;

use Enqueue\Client\CommandSubscriberInterface;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrProcessor;
use Psr\Log\LoggerInterface;

class MailerProcessor implements PsrProcessor, CommandSubscriberInterface
{
    private $logger;

    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    public function process(PsrMessage $message, PsrContext $context)
    {
        $body = json_decode($message->getBody(), true);

        try {
             // Do something with the message received     
        } catch (\Exception $e) {
            $this->logger->error($e->getMessage());

            return self::REQUEUE;
        }

        return self::ACK;
    }

    public static function getSubscribedCommand()
    {
        return [
            'processorName' => 'mailer.processor',
            'queueName' => 'emails',
            'queueNameHardcoded' => true,
            'exclusive' => true
        ];
    }
}
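
The try/catch in the processor boils down to a simple rule: acknowledge the message when the work succeeds, ask for redelivery when it throws. The sketch below isolates that rule in plain PHP so it can be run without a broker. The names `RESULT_ACK`, `RESULT_REQUEUE` and `processBody()` are assumptions made for this example only; a real processor returns the constants of the processor interface, as shown above.

```php
<?php

// Stand-in result values for this sketch only; a real processor returns
// self::ACK and self::REQUEUE from the processor interface.
const RESULT_ACK = 'ack';
const RESULT_REQUEUE = 'requeue';

/**
 * Hypothetical helper mirroring the try/catch of MailerProcessor::process():
 * decode the body, run the work, acknowledge on success, requeue on failure.
 */
function processBody(string $body, callable $work): string
{
    $payload = json_decode($body, true);

    try {
        $work($payload);
    } catch (\Exception $e) {
        return RESULT_REQUEUE;
    }

    return RESULT_ACK;
}

// Work that succeeds: the message is acknowledged.
$ok = processBody('{"to":"user@example.com"}', function (array $payload): void {
    // pretend the mail was sent
});

// Work that throws: the message is put back on the queue.
$failed = processBody('{"to":"user@example.com"}', function (array $payload): void {
    throw new \RuntimeException('SMTP server unreachable');
});

var_dump($ok === RESULT_ACK, $failed === RESULT_REQUEUE);
```

One thing to keep in mind with this rule: a message that fails every time will be redelivered again and again, so it may be worth rejecting it after a number of attempts.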

It’s cool, but we can do better. We can use the Messenger component of Symfony to make everything easier.

Step 2: Use Messenger component of Symfony

  • Install packages
composer require messenger enqueue/messenger-adapter
  • Update messenger.yaml
#config/packages/messenger.yaml
framework:
    messenger:
        transports:
            mailer: enqueue://default?queue[name]=mailer
        routing:
            App\Command\Mailer\SendMail: mailer

This tells Messenger to dispatch all messages of type SendMail to the mailer transport defined in transports.
SendMail is a message object, that is, a plain PHP object that can be handled by a handler.
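
The routing section is essentially a map from message class to transport name. The following sketch reproduces that lookup in plain PHP to show what the configuration expresses; it is not how Messenger implements it. `resolveTransport()` and the `ResizeImage` class are hypothetical and exist only for this illustration.

```php
<?php

// Minimal stand-ins for message classes; ResizeImage is hypothetical and unrouted.
final class SendMail {}
final class ResizeImage {}

// Same shape as the "routing" section: message class => transport name.
$routing = [
    SendMail::class => 'mailer',
];

/**
 * Hypothetical lookup: returns the transport name for a message,
 * or null when its class has no routing entry.
 */
function resolveTransport(object $message, array $routing): ?string
{
    return $routing[get_class($message)] ?? null;
}

var_dump(resolveTransport(new SendMail(), $routing));    // routed to the mailer transport
var_dump(resolveTransport(new ResizeImage(), $routing)); // no routing entry
```

As far as I know, a message without a routing entry is not sent to any transport and is handled synchronously by its handler.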

  • Create Command (message object)
<?php

namespace App\Command\Mailer;

final class SendMail
{
    private $data;

    public function __construct(array $data)
    {
        $this->data = $data;
    }

    public function getData()
    {
        return $this->data;
    }
}
  • Create CommandHandler (our new consumer)
<?php

namespace App\Command\Mailer;

use Enqueue\MessengerAdapter\Exception\RequeueMessageException;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Psr\Log\LoggerInterface;

final class SendMailHandler implements MessageHandlerInterface
{
    /**
     * @var LoggerInterface
     */
    private $logger;

    public function __construct(LoggerInterface $logger) 
    {
        $this->logger = $logger;
    }

    public function __invoke(SendMail $sendMail)
    {
        $data = $sendMail->getData();

        try {
             // Do something with the message received  
        } catch (\Exception $e) {
            $this->logger->error($e->getMessage());
            throw new RequeueMessageException('Error sending mail');
        }
    }
}
  • Use Bus to enqueue message
<?php

namespace App\Service\Mailer;

use App\Command\Mailer\SendMail;
use Symfony\Component\Messenger\MessageBusInterface;

class MailerProducerService
{
    /**
     * @var MessageBusInterface
     */
    private $bus;

    public function __construct(MessageBusInterface $bus)
    {
        $this->bus = $bus;
    }

    public function send($data)
    {
        $sendMailCommand = new SendMail($data);
        $this->bus->dispatch($sendMailCommand);
    }
}
  • In order to process the message, we simply run the command
php bin/console messenger:consume-messages mailer

This way we no longer need the Processor or the Consumer. A message sent to the queue is handled automatically by the Handler which, in other words, becomes our new consumer.
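
Because the handler is just an invokable object, the whole flow can be imitated without a broker, which is handy for a quick test. The sketch below is a simplification under stated assumptions: an `SplQueue` stands in for RabbitMQ, a plain loop stands in for the worker command, and `RecordingSendMailHandler` is a hypothetical handler that records the data instead of sending a mail.

```php
<?php

// Message object, same shape as the SendMail command above.
final class SendMail
{
    private $data;

    public function __construct(array $data)
    {
        $this->data = $data;
    }

    public function getData(): array
    {
        return $this->data;
    }
}

// Hypothetical invokable handler: records the data instead of sending a mail.
final class RecordingSendMailHandler
{
    public $handled = [];

    public function __invoke(SendMail $sendMail): void
    {
        $this->handled[] = $sendMail->getData();
    }
}

// In-memory queue standing in for the broker.
$queue = new \SplQueue();

// "Producer" side: enqueue a message object.
$queue->enqueue(new SendMail(['to' => 'user@example.com']));

// "Worker" side: drain the queue and pass each message to the handler.
$handler = new RecordingSendMailHandler();
while (!$queue->isEmpty()) {
    $handler($queue->dequeue());
}

var_dump(count($handler->handled));
```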

So now we just have to remove old_sound:

composer remove php-amqplib/rabbitmq-bundle

 

Conclusions

In my previous article we saw how easy it is to use CQRS with the Messenger component of Symfony. Today we have seen how to make the process asynchronous simply by using PhpEnqueue and its adapter for Messenger.

Special thanks to my friend and colleague Pierre Escobar, a great mind and a great Symfony expert who always helps me discover interesting new things.

 
