Verzögerte Retrays von RabbitMQ

Mein Name ist Aleksey Kazakov, ich bin technischer Leiter des Client Communications-Teams bei DomKlik. In diesem Artikel möchte ich Ihnen ein „Rezept“ vorstellen, mit dem wir verzögerte Retrays bei Verwendung des RabbitMQ- Nachrichtenbrokers implementieren konnten


rabbit_retry

Einführung


RabbitMQ. .


schema_0


  • A RabbitMQ-virtual_host (service_a_vh):
    • RabbitMQ-exchange (service_a_inner_exch), - A,
    • RabbitMQ-queue (service_a_input_q), A,
    • service_a_input_q service_a_inner_exch.
  • B, service_a_vh, service_a_inner_exch , .

B . RabbitMQ-exchange, A , RabbitMQ-routing_key . «» .


RabbitMQ .



// , , . . A HTTP E. E //. HTTP- E - , ?


RabbitMQ reject with requeue, . , (~100 ) consumer, E ( ).



1) , .


:


  • consumer , , E .
  • , ( ), .

2) RabbitMQ-dead_letter_exchange consumer-.


:


  • consumer .
  • — , .

3) , consumer .


:


  • , .


, consumer . , « — ».


, RabbitMQ.


, RabbitMQ : x-message-ttl, , , «».


, Python, .


Schema 1


dead_letter_queue service_a_inner_exch. «» , dead_letter_queue dead letter exchange service_a_inner_exch. . , dead_letter_queue exchange.


:


  • B service_a_inner_exch,
  • service_a_input_q,
  • A reject,
  • dead_letter_exchange,
  • dead_letter_queue,
  • 5 «»,
  • «» dead letter exchange dead_letter_queue, service_a_inner_exch.

«», , , dead letter exchange. .


Python 3.6.2 pika==0.10.0.


publisher.py
import pika

import settings

def init_rmq():
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=settings.RMQ_HOST,
        port=settings.RMQ_PORT,
        virtual_host=settings.RMQ_VHOST,
        credentials=pika.PlainCredentials(settings.RMQ_USERNAME, settings.RMQ_PASSWORD),
    ))
    channel = connection.channel()

    channel.exchange_declare(exchange=settings.RMQ_INPUT_EXCHANGE, exchange_type='fanout')

    return channel, connection

if __name__ == '__main__':
    channel, connection = init_rmq()

    channel.basic_publish(exchange=settings.RMQ_INPUT_EXCHANGE, routing_key='', body='message from rmq')
    connection.close()

consumer.py
import logging

import pika

import settings

logger = logging.getLogger(__name__)

def init_rmq():
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=settings.RMQ_HOST,
        port=settings.RMQ_PORT,
        virtual_host=settings.RMQ_VHOST,
        credentials=pika.PlainCredentials(settings.RMQ_USERNAME, settings.RMQ_PASSWORD),
    ))
    channel = connection.channel()

    #  service_a_inner_exch
    channel.exchange_declare(exchange=settings.RMQ_INPUT_EXCHANGE, exchange_type='fanout')

    #  dead_letter_exchange
    channel.exchange_declare(exchange=settings.RMQ_DEAD_EXCHANGE, exchange_type='fanout')

    #  service_a_input_q
    channel.queue_declare(
        queue=settings.RMQ_INPUT_QUEUE,
        durable=True,
        arguments={
            #      service_a_input_q
            #  nack-    dead_letter_exchange
            'x-dead-letter-exchange': settings.RMQ_DEAD_EXCHANGE,
        }
    )

    #    "" 
    channel.queue_declare(
        queue=settings.RMQ_DEAD_QUEUE,
        durable=True,
        arguments={
            #      service_a_input_q
            #  nack-    dead_letter_exchange
            'x-message-ttl': settings.RMQ_DEAD_TTL,
            #   ,    "" 
            #    dead letter exchange
            'x-dead-letter-exchange': settings.RMQ_INPUT_EXCHANGE,
        }
    )
    #   ""   dead_letter_exchange
    channel.queue_bind(
        exchange=settings.RMQ_DEAD_EXCHANGE,
        queue=settings.RMQ_DEAD_QUEUE,
    )

    #      exchange
    channel.queue_bind(settings.RMQ_INPUT_QUEUE, settings.RMQ_INPUT_EXCHANGE)

    return channel

def callback(ch, method, properties, body):
    logger.info('Processing message `%s`', body)
    if can_retry(properties):
        logger.warning('Retrying message')
        # requeue=False      ,   dead letter exchange
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        return

    logger.error('Can`t retry, drop message')
    ch.basic_ack(delivery_tag=method.delivery_tag)

def can_retry(properties):
    """
     x-death      dead letter exchange.
        ,  ""  .
    """
    deaths = (properties.headers or {}).get('x-death')
    if not deaths:
        return True
    if deaths[0]['count'] >= settings.RETRY_COUNT:
        return False
    return True

if __name__ == '__main__':
    channel = init_rmq()

    logger.info('Consuming.')
    channel.basic_consume(
        queue=settings.RMQ_INPUT_QUEUE, consumer_callback=callback,
    )
    channel.start_consuming()

settings.py
import logging.config

RMQ_HOST = ''
RMQ_PORT = 5672
RMQ_VHOST = ''
RMQ_USERNAME = ''
RMQ_PASSWORD = ''
RMQ_INPUT_EXCHANGE = ''
RMQ_INPUT_QUEUE = ''
RMQ_DEAD_EXCHANGE = ''
RMQ_DEAD_QUEUE = ''
RMQ_DEAD_TTL = 60 * 1000  # 1 
RETRY_COUNT = 2

dict_config = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'detailed': {
            'class': 'logging.Formatter',
            'format': '%(asctime)s %(levelname)s %(name)s: %(message)s'
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'INFO',
            'formatter': 'detailed',
        },
    },
    'root': {
        'level': 'INFO',
        'handlers': ['console']
    },
}

logging.config.dictConfig(dict_config)

settings.py RabbitMQ, consumer.py publisher.py :


...
2020-05-02 12:16:32,260 INFO __main__: Consuming.
2020-05-02 12:16:35,233 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:16:35,233 WARNING __main__: Retrying message
2020-05-02 12:17:35,241 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:17:35,241 WARNING __main__: Retrying message
2020-05-02 12:18:35,249 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:18:35,250 ERROR __main__: Can`t retry, drop message
...

.. , , , .


.


dead letter queue . dead letter exchange:



. consumer-


service_a_inner_exch , consumer-, . , A_another, service_a_another_input_q, service_a_inner_exch. «» , . , exchange dead_inner_exch, .


Schema_2



. , RabbitMQ.


7 , E . : RabbitMQ 3.6.12, 4 RPS , 40 RPS.


, - .


All Articles