Meu nome Ă© Alexey Kazakov, sou lĂder tĂ©cnico da equipe de comunicação com clientes da DomKlik. Neste artigo, quero compartilhar com vocĂŞ uma “receita” que nos permitiu implementar retrays atrasados ​​ao usar o broker de mensagens RabbitMQ
Introdução
RabbitMQ. .

- A RabbitMQ-virtual_host (service_a_vh):
 - RabbitMQ-exchange (service_a_inner_exch), - A,
- RabbitMQ-queue (service_a_input_q), A,
- service_a_input_q service_a_inner_exch.
 
- B, service_a_vh, service_a_inner_exch , .
B . RabbitMQ-exchange, A , RabbitMQ-routing_key . «» .
RabbitMQ .
// , , . . A HTTP E. E //. HTTP- E - , ?
RabbitMQ reject with requeue, . , (~100 ) consumer, E ( ).
1) , .
:
2) RabbitMQ-dead_letter_exchange consumer-.
:
3) , consumer .
:
, consumer . , « — ».
, RabbitMQ.
, RabbitMQ : x-message-ttl, , , «».
, Python, .

dead_letter_queue service_a_inner_exch. «» , dead_letter_queue dead letter exchange service_a_inner_exch. . , dead_letter_queue exchange.
:
- B service_a_inner_exch,
- service_a_input_q,
- A reject,
- dead_letter_exchange,
- dead_letter_queue,
- 5 «»,
- «» dead letter exchange dead_letter_queue, service_a_inner_exch.
«», , , dead letter exchange. .
Python 3.6.2 pika==0.10.0.
publisher.pyimport pika
import settings
def init_rmq():
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=settings.RMQ_HOST,
        port=settings.RMQ_PORT,
        virtual_host=settings.RMQ_VHOST,
        credentials=pika.PlainCredentials(settings.RMQ_USERNAME, settings.RMQ_PASSWORD),
    ))
    channel = connection.channel()
    channel.exchange_declare(exchange=settings.RMQ_INPUT_EXCHANGE, exchange_type='fanout')
    return channel, connection
if __name__ == '__main__':
    channel, connection = init_rmq()
    channel.basic_publish(exchange=settings.RMQ_INPUT_EXCHANGE, routing_key='', body='message from rmq')
    connection.close()
 consumer.pyimport logging
import pika
import settings
logger = logging.getLogger(__name__)
def init_rmq():
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=settings.RMQ_HOST,
        port=settings.RMQ_PORT,
        virtual_host=settings.RMQ_VHOST,
        credentials=pika.PlainCredentials(settings.RMQ_USERNAME, settings.RMQ_PASSWORD),
    ))
    channel = connection.channel()
    
    channel.exchange_declare(exchange=settings.RMQ_INPUT_EXCHANGE, exchange_type='fanout')
    
    channel.exchange_declare(exchange=settings.RMQ_DEAD_EXCHANGE, exchange_type='fanout')
    
    channel.queue_declare(
        queue=settings.RMQ_INPUT_QUEUE,
        durable=True,
        arguments={
            
            
            'x-dead-letter-exchange': settings.RMQ_DEAD_EXCHANGE,
        }
    )
    
    channel.queue_declare(
        queue=settings.RMQ_DEAD_QUEUE,
        durable=True,
        arguments={
            
            
            'x-message-ttl': settings.RMQ_DEAD_TTL,
            
            
            'x-dead-letter-exchange': settings.RMQ_INPUT_EXCHANGE,
        }
    )
    
    channel.queue_bind(
        exchange=settings.RMQ_DEAD_EXCHANGE,
        queue=settings.RMQ_DEAD_QUEUE,
    )
    
    channel.queue_bind(settings.RMQ_INPUT_QUEUE, settings.RMQ_INPUT_EXCHANGE)
    return channel
def callback(ch, method, properties, body):
    logger.info('Processing message `%s`', body)
    if can_retry(properties):
        logger.warning('Retrying message')
        
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        return
    logger.error('Can`t retry, drop message')
    ch.basic_ack(delivery_tag=method.delivery_tag)
def can_retry(properties):
    """
     x-death      dead letter exchange.
        ,  ""  .
    """
    deaths = (properties.headers or {}).get('x-death')
    if not deaths:
        return True
    if deaths[0]['count'] >= settings.RETRY_COUNT:
        return False
    return True
if __name__ == '__main__':
    channel = init_rmq()
    logger.info('Consuming.')
    channel.basic_consume(
        queue=settings.RMQ_INPUT_QUEUE, consumer_callback=callback,
    )
    channel.start_consuming()
 settings.pyimport logging.config
RMQ_HOST = ''
RMQ_PORT = 5672
RMQ_VHOST = ''
RMQ_USERNAME = ''
RMQ_PASSWORD = ''
RMQ_INPUT_EXCHANGE = ''
RMQ_INPUT_QUEUE = ''
RMQ_DEAD_EXCHANGE = ''
RMQ_DEAD_QUEUE = ''
RMQ_DEAD_TTL = 60 * 1000  
RETRY_COUNT = 2
dict_config = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'detailed': {
            'class': 'logging.Formatter',
            'format': '%(asctime)s %(levelname)s %(name)s: %(message)s'
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'INFO',
            'formatter': 'detailed',
        },
    },
    'root': {
        'level': 'INFO',
        'handlers': ['console']
    },
}
logging.config.dictConfig(dict_config)
 settings.py RabbitMQ, consumer.py publisher.py :
...
2020-05-02 12:16:32,260 INFO __main__: Consuming.
2020-05-02 12:16:35,233 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:16:35,233 WARNING __main__: Retrying message
2020-05-02 12:17:35,241 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:17:35,241 WARNING __main__: Retrying message
2020-05-02 12:18:35,249 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:18:35,250 ERROR __main__: Can`t retry, drop message
...
.. , , , .
.
dead letter queue . dead letter exchange:
. consumer-
service_a_inner_exch , consumer-, . , A_another, service_a_another_input_q, service_a_inner_exch. «» , . , exchange dead_inner_exch, .

. , RabbitMQ.
7 , E . : RabbitMQ 3.6.12, 4 RPS , 40 RPS.
, - .