RabbitMQ and Symfony, how use multiple consumers.

I have a stack with many entities like microservices, BDD, ecc…, and i want use RabbitMQ for notify a change to every entities of my stack.

For do that i want use a single producer but n queues that are consumed by consumers. Every consumer will notify the related entity.

https://www.rabbitmq.com/tutorials/tutorial-three-python.html

 

I am not be going into details of how RabbitMQ works. I strongly suggest you to read the documentation.

Assuming that you have already installed RabbitMQ on your server you can follow the tutorial Symfony 4 and RabbitMQ for install package php-amqplib/rabbitmq-bundle into your project.

//app/config/config.yml
old_sound_rabbit_mq:
    connections:
        default:
            host:     %rabbitmq_host%
            port:     %rabbitmq_port%
            user:     %rabbitmq_user%
            password: %rabbitmq_password%
            vhost:    %rabbitmq_vhost%
            lazy:     %rabbitmq_lazy%
            connection_timeout: 10
            read_write_timeout: 10

    consumers:
        simple_consumer:
            connection:         default
            callback:           my_service_callback
            exchange_options:
                name: %rabbitmq_exchange_1%
                type: direct
            queue_options:
                name: %rabbitmq_queue_1%

    multiple_consumers:
        remove_contact:
            connection: default
            exchange_options:
                name: %rabbitmq_remove_contact_exchange%
                type: 'fanout'
            queues:
                remove_contact_from_source_1:
                    name: %rabbitmq_remove_contact_1%
                    callback: 'my_multiple_consumer_callback_1'
                remove_contact_from_source_2:
                    name: %rabbitmq_remove_contact_2%
                    callback: 'my_multiple_consumer_callback_1'
                remove_contact_from_source_3:
                    name: %rabbitmq_remove_contact_3%
                    callback: 'my_multiple_consumer_callback_1'
    producers:
        simple_producer:
            connection: default
            exchange_options:
                name: %rabbitmq_exchange_1%
                type: direct
        remove_contact:
            connection: default
            exchange_options:
              name: %rabbitmq_remove_contact_exchange%
              type: 'fanout'

It should be noted that the type of exchange becomes ‘fanout’: The fanout exchange is very simple. It just broadcasts all the messages it receives to all the queues it knows. And that’s exactly what we need.

Now when i call from my Controller (or from another place) the producer remove_contact, the exchange defined into configuration of producer, will push the message into 3 queues (remove_contact_from_source_1, remove_contact_from_source_2, remove_contact_from_source_3).

$this->get('old_sound_rabbit_mq.remove_contact')->publish($message);

Now every consumer can access the message entered in his queue.

<?php

namespace Acme\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
...

class MyMultipleConsumer1 implements ConsumerInterface
{
     ...
     ...

    /**
     * @param AMQPMessage $msg
     * @return mixed|void
     * @throws \Exception
     */
    public function execute(AMQPMessage $msg)
    {
         //$msg will be an instance of `PhpAmqpLib\Message\AMQPMessage` with the $msg->body being the data sent over RabbitMQ.
         $message = json_decode($msg->body, true);

         //Logic here
    }  
}
<?php

namespace Acme\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
...

class MyMultipleConsumer2 implements ConsumerInterface
{
     ...
     ...

    /**
     * @param AMQPMessage $msg
     * @return mixed|void
     * @throws \Exception
     */
    public function execute(AMQPMessage $msg)
    {
         //$msg will be an instance of `PhpAmqpLib\Message\AMQPMessage` with the $msg->body being the data sent over RabbitMQ.
         $message = json_decode($msg->body, true);
     
         //Logic here
    }  
}

 

<?php

namespace Acme\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
...

class MyMultipleConsumer3 implements ConsumerInterface
{
     ...
     ...

    /**
     * @param AMQPMessage $msg
     * @return mixed|void
     * @throws \Exception
     */
    public function execute(AMQPMessage $msg)
    {
         //$msg will be an instance of `PhpAmqpLib\Message\AMQPMessage` with the $msg->body being the data sent over RabbitMQ.
         $message = json_decode($msg->body, true);
         
         //Logic here
    }  
}

 

References

Advertisements

One thought on “RabbitMQ and Symfony, how use multiple consumers.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s