RabbitMQ and Symfony: how to use multiple consumers – Part 2

In the previous article I showed how to use multiple consumers with RabbitMQ and Symfony. Now I want any message that one of my consumers fails to process to be moved to a separate error queue, where it can be handled later.

I want the move to the error queue to happen automatically, and I need three different error queues, one for each original consumer.

I think an image explains the intended queue architecture better than words.

So let’s see how the RabbitMQ configuration changes:

multiple_consumers:
    remove_contact:
        connection: default
        exchange_options:
            name: 'remove_contact'
            type: 'fanout'
        queues:
            remove_contact_from_source1:
                callback: 'my_multiple_consumer_callback_1'
                name: '%rabbitmq_remove_contact_1%'
                arguments:
                    x-dead-letter-exchange: ['S', 'remove_contact_errors']
                    x-dead-letter-routing-key: ['S', 'remove_contact_source1_errors']
            remove_contact_from_source2:
                callback: 'my_multiple_consumer_callback_2'
                name: '%rabbitmq_remove_contact_2%'
                arguments:
                    x-dead-letter-exchange: ['S', 'remove_contact_errors']
                    x-dead-letter-routing-key: ['S', 'remove_contact_source2_errors']
            remove_contact_from_source3:
                name: '%rabbitmq_remove_contact_3%'
                callback: 'my_multiple_consumer_callback_3'
                arguments:
                    x-dead-letter-exchange: ['S', 'remove_contact_errors']
                    x-dead-letter-routing-key: ['S', 'remove_contact_source3_errors']
    remove_contact_errors:
        connection: default
        exchange_options:
            name: 'remove_contact_errors'
            type: 'direct'
        queues:
            remove_contact_source1_errors:
                name: 'remove_contact_source1_errors'
                callback: 'my_error_consumer_callback'
                routing_keys:
                    - remove_contact_source1_errors
            remove_contact_source2_errors:
                name: 'remove_contact_source2_errors'
                callback: 'my_error_consumer_callback'
                routing_keys:
                    - remove_contact_source2_errors
            remove_contact_source3_errors:
                name: 'remove_contact_source3_errors'
                callback: 'my_error_consumer_callback'
                routing_keys:
                    - remove_contact_source3_errors
producers:
    remove_contact:
        connection: default
        exchange_options:
            name: 'remove_contact'
            type: 'fanout'
    remove_contact_errors:
        connection: default
        exchange_options:
            name: 'remove_contact_errors'
            type: 'direct'

But what did we do concretely?

We simply applied the concept of a dead letter exchange. Messages from a queue can be ‘dead-lettered’, that is, republished to another exchange, when any of the following events occurs:

  • The message is rejected (basic.reject or basic.nack) with requeue=false,
  • The TTL for the message expires; or
  • The queue length limit is exceeded.
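Whatever the cause, RabbitMQ records each dead-lettering event in an x-death header on the republished message, most recent first, with the original queue and the reason (rejected, expired or maxlen for the three cases above). The sketch below shows how an error consumer could read that header once it is available as a plain PHP array (with php-amqplib, via $msg->get('application_headers')->getNativeData()). The function name latestDeath and the sample queue name are assumptions made for illustration only.

```php
<?php

/**
 * Returns the queue and reason of the most recent dead-lettering event,
 * or null when the headers carry no x-death entry.
 *
 * @param array $headers message headers as a plain PHP array
 */
function latestDeath(array $headers): ?array
{
    if (empty($headers['x-death']) || !is_array($headers['x-death'])) {
        return null;
    }

    // RabbitMQ orders x-death entries most recent first.
    $deaths = array_values($headers['x-death']);
    $death = $deaths[0];

    return [
        'queue' => $death['queue'] ?? null,
        'reason' => $death['reason'] ?? null,
    ];
}

// Illustrative header, as it might look after consumer 1 rejected a message.
// The real queue name depends on the %rabbitmq_remove_contact_1% parameter.
$headers = [
    'x-death' => [
        ['queue' => 'remove_contact_1', 'reason' => 'rejected', 'count' => 1],
    ],
];

$death = latestDeath($headers);
echo $death['queue'] . ' / ' . $death['reason'] . PHP_EOL;
```

Reading the reason this way would let a single error callback treat rejected messages differently from expired ones, if that distinction ever matters.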

So in our case, when consumer 1 rejects the message for any reason

return ConsumerInterface::MSG_REJECT;

the exchange remove_contact_errors is used to republish the message

x-dead-letter-exchange: ['S', 'remove_contact_errors']

which places the message in the queue bound with the routing key declared in the configuration

x-dead-letter-routing-key: ['S', 'remove_contact_source1_errors']
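To put the rejection in context, here is a minimal sketch of what the callback behind my_multiple_consumer_callback_1 could look like. It needs the bundle and php-amqplib installed, so it does not run on its own. The class name, the namespace, the JSON payload and the removeContact method are all assumptions for illustration; only ConsumerInterface, its MSG_ACK and MSG_REJECT constants and AMQPMessage come from the libraries.

```php
<?php

namespace App\Consumer; // hypothetical namespace

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

// Hypothetical class registered as the 'my_multiple_consumer_callback_1' service.
class RemoveContactFromSource1Consumer implements ConsumerInterface
{
    public function execute(AMQPMessage $msg)
    {
        try {
            // Assumption: the producer publishes a JSON-encoded payload.
            $contact = json_decode($msg->getBody(), true, 512, JSON_THROW_ON_ERROR);

            $this->removeContact($contact);
        } catch (\Throwable $e) {
            // Reject without requeue: RabbitMQ dead-letters the message to the
            // exchange and routing key set in this queue's arguments.
            return ConsumerInterface::MSG_REJECT;
        }

        // Acknowledge: the message is removed from the queue.
        return ConsumerInterface::MSG_ACK;
    }

    private function removeContact(array $contact): void
    {
        // Placeholder for the real removal logic of source 1.
        if (!isset($contact['id'])) {
            throw new \InvalidArgumentException('Missing contact id.');
        }
    }
}
```

Catching \Throwable here is a deliberate choice for the sketch: any failure, including a malformed payload, ends up in the error queue instead of stopping the consumer.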

That’s all.
