Reintentos diferidos con RabbitMQ

Mi nombre es Aleksey Kazakov y soy líder técnico del equipo de comunicaciones con clientes en DomKlik. En este artículo quiero compartir con ustedes una «receta» que nos permitió implementar reintentos diferidos usando el broker de mensajes RabbitMQ.


conejo_retry

Introducción


RabbitMQ. .


esquema_0


  • A RabbitMQ-virtual_host (service_a_vh):
    • RabbitMQ-exchange (service_a_inner_exch), - A,
    • RabbitMQ-queue (service_a_input_q), A,
    • service_a_input_q service_a_inner_exch.
  • B, service_a_vh, service_a_inner_exch , .

B . RabbitMQ-exchange, A , RabbitMQ-routing_key . «» .


RabbitMQ .



// , , . . A HTTP E. E //. HTTP- E - , ?


RabbitMQ reject with requeue, . , (~100 ) consumer, E ( ).



1) , .


:


  • consumer , , E .
  • , ( ), .

2) RabbitMQ-dead_letter_exchange consumer-.


:


  • consumer .
  • — , .

3) , consumer .


:


  • , .


, consumer . , « — ».


, RabbitMQ.


, RabbitMQ : x-message-ttl, , , «».


, Python, .


Esquema 1


dead_letter_queue service_a_inner_exch. «» , dead_letter_queue dead letter exchange service_a_inner_exch. . , dead_letter_queue exchange.


:


  • B service_a_inner_exch,
  • service_a_input_q,
  • A reject,
  • dead_letter_exchange,
  • dead_letter_queue,
  • 5 «»,
  • «» dead letter exchange dead_letter_queue, service_a_inner_exch.

«», , , dead letter exchange. .


Versiones utilizadas: Python 3.6.2 y pika==0.10.0.


publisher.py
import pika

import settings

def init_rmq():
    """Connect to RabbitMQ and declare the exchange the publisher writes to.

    All connection parameters and the exchange name are read from the
    ``settings`` module.

    Returns:
        A ``(channel, connection)`` tuple. The caller owns the connection
        and is expected to close it.
    """
    credentials = pika.PlainCredentials(
        settings.RMQ_USERNAME,
        settings.RMQ_PASSWORD,
    )
    parameters = pika.ConnectionParameters(
        host=settings.RMQ_HOST,
        port=settings.RMQ_PORT,
        virtual_host=settings.RMQ_VHOST,
        credentials=credentials,
    )
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()

    # Make sure the fanout input exchange exists before publishing to it.
    channel.exchange_declare(
        exchange=settings.RMQ_INPUT_EXCHANGE,
        exchange_type='fanout',
    )

    return channel, connection

if __name__ == '__main__':
    # Publish a single test message to the input exchange and exit.
    channel, connection = init_rmq()

    try:
        # The exchange is declared as fanout, so the routing key is left empty.
        channel.basic_publish(
            exchange=settings.RMQ_INPUT_EXCHANGE,
            routing_key='',
            body='message from rmq',
        )
    finally:
        # Close the connection even when publishing fails, so the socket
        # is not left open until interpreter shutdown.
        connection.close()

consumer.py
import logging

import pika

import settings

logger = logging.getLogger(__name__)

def init_rmq():
    """Connect to RabbitMQ and declare the delayed-retry topology.

    The following entities are declared (all names come from ``settings``):

    * the fanout input exchange and the fanout dead-letter exchange;
    * the durable input queue, whose dead-letter exchange is the
      dead-letter exchange;
    * the durable "dead" (delay) queue, which has a message TTL and whose
      dead-letter exchange is the input exchange.

    The dead queue is bound to the dead-letter exchange and the input queue
    to the input exchange, which closes the retry loop.

    Returns:
        The opened channel.
    """
    credentials = pika.PlainCredentials(
        settings.RMQ_USERNAME,
        settings.RMQ_PASSWORD,
    )
    parameters = pika.ConnectionParameters(
        host=settings.RMQ_HOST,
        port=settings.RMQ_PORT,
        virtual_host=settings.RMQ_VHOST,
        credentials=credentials,
    )
    channel = pika.BlockingConnection(parameters).channel()

    # Input exchange (service_a_inner_exch in the article's diagram).
    channel.exchange_declare(
        exchange=settings.RMQ_INPUT_EXCHANGE,
        exchange_type='fanout',
    )

    # Dead-letter exchange that receives messages rejected by the consumer.
    channel.exchange_declare(
        exchange=settings.RMQ_DEAD_EXCHANGE,
        exchange_type='fanout',
    )

    # Input queue (service_a_input_q): messages rejected without requeue
    # are forwarded to the dead-letter exchange.
    input_queue_args = {
        'x-dead-letter-exchange': settings.RMQ_DEAD_EXCHANGE,
    }
    channel.queue_declare(
        queue=settings.RMQ_INPUT_QUEUE,
        durable=True,
        arguments=input_queue_args,
    )

    # Queue for "dead" messages: nothing consumes from it, so a message
    # stays here until its TTL runs out and is then dead-lettered back to
    # the input exchange.
    dead_queue_args = {
        'x-message-ttl': settings.RMQ_DEAD_TTL,
        'x-dead-letter-exchange': settings.RMQ_INPUT_EXCHANGE,
    }
    channel.queue_declare(
        queue=settings.RMQ_DEAD_QUEUE,
        durable=True,
        arguments=dead_queue_args,
    )

    # Bind the "dead" queue to the dead-letter exchange.
    channel.queue_bind(
        queue=settings.RMQ_DEAD_QUEUE,
        exchange=settings.RMQ_DEAD_EXCHANGE,
    )

    # Bind the input queue to the input exchange.
    channel.queue_bind(
        queue=settings.RMQ_INPUT_QUEUE,
        exchange=settings.RMQ_INPUT_EXCHANGE,
    )

    return channel

def callback(ch, method, properties, body):
    """Handle one delivery by always treating it as a failed attempt.

    This demo handler never processes a message successfully: while
    ``can_retry`` allows it, the message is rejected without requeue so it
    goes through the dead-letter exchange; otherwise it is acknowledged,
    which drops it.

    Args:
        ch: Channel the message was delivered on.
        method: Delivery metadata; only ``delivery_tag`` is used.
        properties: Message properties, passed on to ``can_retry``.
        body: Raw message payload (only logged).
    """
    logger.info('Processing message `%s`', body)

    if not can_retry(properties):
        # Retry budget exhausted: ack so the broker discards the message.
        logger.error('Can`t retry, drop message')
        ch.basic_ack(delivery_tag=method.delivery_tag)
        return

    logger.warning('Retrying message')
    # requeue=False keeps the message out of the input queue and sends it
    # to the queue's dead-letter exchange instead.
    ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

def can_retry(properties):
    """Tell whether the message may go through another retry cycle.

    The ``x-death`` header appears on messages that have passed through a
    dead letter exchange. A message without that header (or without any
    headers) can always be retried; otherwise the ``count`` of the first
    ``x-death`` entry must be below ``settings.RETRY_COUNT``.

    Args:
        properties: Message properties exposing a ``headers`` mapping
            (may be ``None``).

    Returns:
        ``True`` if the message can be retried, ``False`` otherwise.
    """
    headers = properties.headers or {}
    death_records = headers.get('x-death')
    if death_records:
        return death_records[0]['count'] < settings.RETRY_COUNT
    return True

if __name__ == '__main__':
    # Declare the topology, subscribe to the input queue and block while
    # deliveries are dispatched to ``callback``.
    channel = init_rmq()

    logger.info('Consuming.')
    channel.basic_consume(
        queue=settings.RMQ_INPUT_QUEUE, consumer_callback=callback,
    )
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop this script: cancel the consumer
        # cleanly instead of exiting with a traceback.
        logger.info('Interrupted, stopping consumer.')
        channel.stop_consuming()

settings.py
import logging.config

# --- RabbitMQ connection; the empty strings are placeholders to fill in ---
RMQ_HOST = ''
RMQ_PORT = 5672
RMQ_VHOST = ''
RMQ_USERNAME = ''
RMQ_PASSWORD = ''

# --- Names of the exchanges and queues declared by the scripts ---
RMQ_INPUT_EXCHANGE = ''
RMQ_INPUT_QUEUE = ''
RMQ_DEAD_EXCHANGE = ''
RMQ_DEAD_QUEUE = ''

# --- Retry policy ---
_MS_PER_MINUTE = 60 * 1000
# Passed as 'x-message-ttl' of the dead queue: 1 minute, in milliseconds.
RMQ_DEAD_TTL = 1 * _MS_PER_MINUTE
# Compared against the 'x-death' count to decide when to stop retrying.
RETRY_COUNT = 2

# --- Logging: everything at INFO and above goes to a console handler ---
_LOG_FORMAT = '%(asctime)s %(levelname)s %(name)s: %(message)s'
_LOG_LEVEL = 'INFO'

dict_config = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'detailed': {'class': 'logging.Formatter', 'format': _LOG_FORMAT},
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'level': _LOG_LEVEL,
            'formatter': 'detailed',
        },
    },
    'root': {'level': _LOG_LEVEL, 'handlers': ['console']},
}

# Applied at import time, so importing this module configures logging.
logging.config.dictConfig(dict_config)

Tras rellenar settings.py con los parámetros de RabbitMQ y ejecutar consumer.py y publisher.py, se obtiene la siguiente salida:


...
2020-05-02 12:16:32,260 INFO __main__: Consuming.
2020-05-02 12:16:35,233 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:16:35,233 WARNING __main__: Retrying message
2020-05-02 12:17:35,241 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:17:35,241 WARNING __main__: Retrying message
2020-05-02 12:18:35,249 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:18:35,250 ERROR __main__: Can`t retry, drop message
...

.. , , , .


.


dead letter queue . dead letter exchange:



. consumer-


service_a_inner_exch , consumer-, . , A_another, service_a_another_input_q, service_a_inner_exch. «» , . , exchange dead_inner_exch, .


esquema_2



. , RabbitMQ.


7 , E . : RabbitMQ 3.6.12, 4 RPS , 40 RPS.


, - .


All Articles