Retrays Tertunda oleh RabbitMQ

Nama saya Aleksey Kazakov, saya seorang pemimpin teknis dari tim Komunikasi Klien di DomKlik. Dalam artikel ini saya ingin berbagi dengan Anda "resep" yang memungkinkan kami menerapkan retays tertunda saat menggunakan broker pesan RabbitMQ


rabbit_retry

pengantar


RabbitMQ. .


schema_0


  • A RabbitMQ-virtual_host (service_a_vh):
    • RabbitMQ-exchange (service_a_inner_exch), - A,
    • RabbitMQ-queue (service_a_input_q), A,
    • service_a_input_q service_a_inner_exch.
  • B, service_a_vh, service_a_inner_exch , .

B . RabbitMQ-exchange, A , RabbitMQ-routing_key . «» .


RabbitMQ .



// , , . . A HTTP E. E //. HTTP- E - , ?


RabbitMQ reject with requeue, . , (~100 ) consumer, E ( ).



1) , .


:


  • consumer , , E .
  • , ( ), .

2) RabbitMQ-dead_letter_exchange consumer-.


:


  • consumer .
  • — , .

3) , consumer .


:


  • , .


, consumer . , « — ».


, RabbitMQ.


, RabbitMQ : x-message-ttl, , , «».


, Python, .


skema_1


dead_letter_queue service_a_inner_exch. «» , dead_letter_queue dead letter exchange service_a_inner_exch. . , dead_letter_queue exchange.


:


  • B service_a_inner_exch,
  • service_a_input_q,
  • A reject,
  • dead_letter_exchange,
  • dead_letter_queue,
  • 5 «»,
  • «» dead letter exchange dead_letter_queue, service_a_inner_exch.

«», , , dead letter exchange. .


Python 3.6.2 pika==0.10.0.


publisher.py
import pika

import settings

def init_rmq():
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=settings.RMQ_HOST,
        port=settings.RMQ_PORT,
        virtual_host=settings.RMQ_VHOST,
        credentials=pika.PlainCredentials(settings.RMQ_USERNAME, settings.RMQ_PASSWORD),
    ))
    channel = connection.channel()

    channel.exchange_declare(exchange=settings.RMQ_INPUT_EXCHANGE, exchange_type='fanout')

    return channel, connection

if __name__ == '__main__':
    channel, connection = init_rmq()

    channel.basic_publish(exchange=settings.RMQ_INPUT_EXCHANGE, routing_key='', body='message from rmq')
    connection.close()

consumer.py
import logging

import pika

import settings

logger = logging.getLogger(__name__)

def init_rmq():
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=settings.RMQ_HOST,
        port=settings.RMQ_PORT,
        virtual_host=settings.RMQ_VHOST,
        credentials=pika.PlainCredentials(settings.RMQ_USERNAME, settings.RMQ_PASSWORD),
    ))
    channel = connection.channel()

    #  service_a_inner_exch
    channel.exchange_declare(exchange=settings.RMQ_INPUT_EXCHANGE, exchange_type='fanout')

    #  dead_letter_exchange
    channel.exchange_declare(exchange=settings.RMQ_DEAD_EXCHANGE, exchange_type='fanout')

    #  service_a_input_q
    channel.queue_declare(
        queue=settings.RMQ_INPUT_QUEUE,
        durable=True,
        arguments={
            #      service_a_input_q
            #  nack-    dead_letter_exchange
            'x-dead-letter-exchange': settings.RMQ_DEAD_EXCHANGE,
        }
    )

    #    "" 
    channel.queue_declare(
        queue=settings.RMQ_DEAD_QUEUE,
        durable=True,
        arguments={
            #      service_a_input_q
            #  nack-    dead_letter_exchange
            'x-message-ttl': settings.RMQ_DEAD_TTL,
            #   ,    "" 
            #    dead letter exchange
            'x-dead-letter-exchange': settings.RMQ_INPUT_EXCHANGE,
        }
    )
    #   ""   dead_letter_exchange
    channel.queue_bind(
        exchange=settings.RMQ_DEAD_EXCHANGE,
        queue=settings.RMQ_DEAD_QUEUE,
    )

    #      exchange
    channel.queue_bind(settings.RMQ_INPUT_QUEUE, settings.RMQ_INPUT_EXCHANGE)

    return channel

def callback(ch, method, properties, body):
    logger.info('Processing message `%s`', body)
    if can_retry(properties):
        logger.warning('Retrying message')
        # requeue=False      ,   dead letter exchange
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        return

    logger.error('Can`t retry, drop message')
    ch.basic_ack(delivery_tag=method.delivery_tag)

def can_retry(properties):
    """
     x-death      dead letter exchange.
        ,  ""  .
    """
    deaths = (properties.headers or {}).get('x-death')
    if not deaths:
        return True
    if deaths[0]['count'] >= settings.RETRY_COUNT:
        return False
    return True

if __name__ == '__main__':
    channel = init_rmq()

    logger.info('Consuming.')
    channel.basic_consume(
        queue=settings.RMQ_INPUT_QUEUE, consumer_callback=callback,
    )
    channel.start_consuming()

settings.py
import logging.config

RMQ_HOST = ''
RMQ_PORT = 5672
RMQ_VHOST = ''
RMQ_USERNAME = ''
RMQ_PASSWORD = ''
RMQ_INPUT_EXCHANGE = ''
RMQ_INPUT_QUEUE = ''
RMQ_DEAD_EXCHANGE = ''
RMQ_DEAD_QUEUE = ''
RMQ_DEAD_TTL = 60 * 1000  # 1 
RETRY_COUNT = 2

dict_config = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'detailed': {
            'class': 'logging.Formatter',
            'format': '%(asctime)s %(levelname)s %(name)s: %(message)s'
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'INFO',
            'formatter': 'detailed',
        },
    },
    'root': {
        'level': 'INFO',
        'handlers': ['console']
    },
}

logging.config.dictConfig(dict_config)

settings.py RabbitMQ, consumer.py publisher.py :


...
2020-05-02 12:16:32,260 INFO __main__: Consuming.
2020-05-02 12:16:35,233 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:16:35,233 WARNING __main__: Retrying message
2020-05-02 12:17:35,241 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:17:35,241 WARNING __main__: Retrying message
2020-05-02 12:18:35,249 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:18:35,250 ERROR __main__: Can`t retry, drop message
...

.. , , , .


.


dead letter queue . dead letter exchange:



. consumer-


service_a_inner_exch , consumer-, . , A_another, service_a_another_input_q, service_a_inner_exch. «» , . , exchange dead_inner_exch, .


schema_2



. , RabbitMQ.


7 , E . : RabbitMQ 3.6.12, 4 RPS , 40 RPS.


, - .


All Articles