Meu nome Ă© Alexey Kazakov, sou lĂder tĂ©cnico da equipe de comunicação com clientes da DomKlik. Neste artigo, quero compartilhar com vocĂŞ uma “receita” que nos permitiu implementar retrays atrasados ​​ao usar o broker de mensagens RabbitMQ
Introdução
RabbitMQ. .

- A RabbitMQ-virtual_host (service_a_vh):
- RabbitMQ-exchange (service_a_inner_exch), - A,
- RabbitMQ-queue (service_a_input_q), A,
- service_a_input_q service_a_inner_exch.
- B, service_a_vh, service_a_inner_exch , .
B . RabbitMQ-exchange, A , RabbitMQ-routing_key . «» .
RabbitMQ .
// , , . . A HTTP E. E //. HTTP- E - , ?
RabbitMQ reject with requeue, . , (~100 ) consumer, E ( ).
1) , .
:
2) RabbitMQ-dead_letter_exchange consumer-.
:
3) , consumer .
:
, consumer . , « — ».
, RabbitMQ.
, RabbitMQ : x-message-ttl
, , , «».
, Python, .

dead_letter_queue service_a_inner_exch. «» , dead_letter_queue dead letter exchange service_a_inner_exch. . , dead_letter_queue exchange.
:
- B service_a_inner_exch,
- service_a_input_q,
- A reject,
- dead_letter_exchange,
- dead_letter_queue,
- 5 «»,
- «» dead letter exchange dead_letter_queue, service_a_inner_exch.
«», , , dead letter exchange. .
Python 3.6.2 pika==0.10.0.
publisher.pyimport pika
import settings
def init_rmq():
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=settings.RMQ_HOST,
port=settings.RMQ_PORT,
virtual_host=settings.RMQ_VHOST,
credentials=pika.PlainCredentials(settings.RMQ_USERNAME, settings.RMQ_PASSWORD),
))
channel = connection.channel()
channel.exchange_declare(exchange=settings.RMQ_INPUT_EXCHANGE, exchange_type='fanout')
return channel, connection
if __name__ == '__main__':
channel, connection = init_rmq()
channel.basic_publish(exchange=settings.RMQ_INPUT_EXCHANGE, routing_key='', body='message from rmq')
connection.close()
consumer.pyimport logging
import pika
import settings
logger = logging.getLogger(__name__)
def init_rmq():
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=settings.RMQ_HOST,
port=settings.RMQ_PORT,
virtual_host=settings.RMQ_VHOST,
credentials=pika.PlainCredentials(settings.RMQ_USERNAME, settings.RMQ_PASSWORD),
))
channel = connection.channel()
channel.exchange_declare(exchange=settings.RMQ_INPUT_EXCHANGE, exchange_type='fanout')
channel.exchange_declare(exchange=settings.RMQ_DEAD_EXCHANGE, exchange_type='fanout')
channel.queue_declare(
queue=settings.RMQ_INPUT_QUEUE,
durable=True,
arguments={
'x-dead-letter-exchange': settings.RMQ_DEAD_EXCHANGE,
}
)
channel.queue_declare(
queue=settings.RMQ_DEAD_QUEUE,
durable=True,
arguments={
'x-message-ttl': settings.RMQ_DEAD_TTL,
'x-dead-letter-exchange': settings.RMQ_INPUT_EXCHANGE,
}
)
channel.queue_bind(
exchange=settings.RMQ_DEAD_EXCHANGE,
queue=settings.RMQ_DEAD_QUEUE,
)
channel.queue_bind(settings.RMQ_INPUT_QUEUE, settings.RMQ_INPUT_EXCHANGE)
return channel
def callback(ch, method, properties, body):
logger.info('Processing message `%s`', body)
if can_retry(properties):
logger.warning('Retrying message')
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
return
logger.error('Can`t retry, drop message')
ch.basic_ack(delivery_tag=method.delivery_tag)
def can_retry(properties):
"""
x-death dead letter exchange.
, "" .
"""
deaths = (properties.headers or {}).get('x-death')
if not deaths:
return True
if deaths[0]['count'] >= settings.RETRY_COUNT:
return False
return True
if __name__ == '__main__':
channel = init_rmq()
logger.info('Consuming.')
channel.basic_consume(
queue=settings.RMQ_INPUT_QUEUE, consumer_callback=callback,
)
channel.start_consuming()
settings.pyimport logging.config
RMQ_HOST = ''
RMQ_PORT = 5672
RMQ_VHOST = ''
RMQ_USERNAME = ''
RMQ_PASSWORD = ''
RMQ_INPUT_EXCHANGE = ''
RMQ_INPUT_QUEUE = ''
RMQ_DEAD_EXCHANGE = ''
RMQ_DEAD_QUEUE = ''
RMQ_DEAD_TTL = 60 * 1000
RETRY_COUNT = 2
dict_config = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'detailed': {
'class': 'logging.Formatter',
'format': '%(asctime)s %(levelname)s %(name)s: %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO',
'formatter': 'detailed',
},
},
'root': {
'level': 'INFO',
'handlers': ['console']
},
}
logging.config.dictConfig(dict_config)
settings.py RabbitMQ, consumer.py publisher.py :
...
2020-05-02 12:16:32,260 INFO __main__: Consuming.
2020-05-02 12:16:35,233 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:16:35,233 WARNING __main__: Retrying message
2020-05-02 12:17:35,241 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:17:35,241 WARNING __main__: Retrying message
2020-05-02 12:18:35,249 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:18:35,250 ERROR __main__: Can`t retry, drop message
...
.. , , , .
.
dead letter queue . dead letter exchange:
. consumer-
service_a_inner_exch , consumer-, . , A_another, service_a_another_input_q, service_a_inner_exch. «» , . , exchange dead_inner_exch, .

. , RabbitMQ.
7 , E . : RabbitMQ 3.6.12, 4 RPS , 40 RPS.
, - .