Nama saya Aleksey Kazakov, saya seorang pemimpin teknis dari tim Komunikasi Klien di DomKlik. Dalam artikel ini saya ingin berbagi dengan Anda "resep" yang memungkinkan kami menerapkan percobaan ulang (retry) tertunda saat menggunakan broker pesan RabbitMQ.
Pengantar
RabbitMQ. .

- A RabbitMQ-virtual_host (service_a_vh):
- RabbitMQ-exchange (service_a_inner_exch), - A,
- RabbitMQ-queue (service_a_input_q), A,
- service_a_input_q service_a_inner_exch.
- B, service_a_vh, service_a_inner_exch , .
B . RabbitMQ-exchange, A , RabbitMQ-routing_key . «» .
RabbitMQ .
// , , . . A HTTP E. E //. HTTP- E - , ?
RabbitMQ reject with requeue, . , (~100 ) consumer, E ( ).
1) , .
:
2) RabbitMQ-dead_letter_exchange consumer-.
:
3) , consumer .
:
, consumer . , « — ».
, RabbitMQ.
, RabbitMQ : x-message-ttl
, , , «».
, Python, .

dead_letter_queue service_a_inner_exch. «» , dead_letter_queue dead letter exchange service_a_inner_exch. . , dead_letter_queue exchange.
:
- B service_a_inner_exch,
- service_a_input_q,
- A reject,
- dead_letter_exchange,
- dead_letter_queue,
- 5 «»,
- «» dead letter exchange dead_letter_queue, service_a_inner_exch.
«», , , dead letter exchange. .
Python 3.6.2 pika==0.10.0.
publisher.py

import pika
import settings
def init_rmq():
    """Open a blocking RabbitMQ connection and declare the input exchange.

    Host, port, virtual host, credentials and the exchange name are all
    read from ``settings``.

    Returns:
        A ``(channel, connection)`` tuple. The connection is handed back
        so that the caller can close it when it is done publishing.
    """
    credentials = pika.PlainCredentials(settings.RMQ_USERNAME, settings.RMQ_PASSWORD)
    parameters = pika.ConnectionParameters(
        host=settings.RMQ_HOST,
        port=settings.RMQ_PORT,
        virtual_host=settings.RMQ_VHOST,
        credentials=credentials,
    )
    conn = pika.BlockingConnection(parameters)
    chan = conn.channel()
    # Same declaration as on the consumer side, so whichever process
    # starts first creates the exchange and the other one just confirms it.
    chan.exchange_declare(exchange=settings.RMQ_INPUT_EXCHANGE, exchange_type='fanout')
    return chan, conn
if __name__ == '__main__':
    # Script entry point: publish one test message and exit.
    channel, connection = init_rmq()
    try:
        # The input exchange is declared as fanout in init_rmq(), so the
        # routing key plays no role in delivery and is left empty.
        channel.basic_publish(exchange=settings.RMQ_INPUT_EXCHANGE, routing_key='', body='message from rmq')
    finally:
        # Release the connection even when publishing raises; previously a
        # failed publish left the connection open until interpreter exit.
        connection.close()
consumer.py

import logging
import pika
import settings
# Module-level logger, named after this module (``__main__`` when the file
# is run as a script).
logger = logging.getLogger(__name__)
def init_rmq():
    """Connect to RabbitMQ and declare the delayed-retry topology.

    Using the names configured in ``settings`` this declares:

    * the input exchange and the dead-letter exchange, both ``fanout``;
    * the durable input queue, whose ``x-dead-letter-exchange`` is the
      dead-letter exchange;
    * the durable dead-letter queue, which has an ``x-message-ttl`` of
      ``settings.RMQ_DEAD_TTL`` and whose ``x-dead-letter-exchange`` is
      the input exchange;
    * the binding of each queue to its own exchange.

    Returns:
        The channel on which everything was declared.
    """
    credentials = pika.PlainCredentials(settings.RMQ_USERNAME, settings.RMQ_PASSWORD)
    parameters = pika.ConnectionParameters(
        host=settings.RMQ_HOST,
        port=settings.RMQ_PORT,
        virtual_host=settings.RMQ_VHOST,
        credentials=credentials,
    )
    conn = pika.BlockingConnection(parameters)
    chan = conn.channel()

    for exchange_name in (settings.RMQ_INPUT_EXCHANGE, settings.RMQ_DEAD_EXCHANGE):
        chan.exchange_declare(exchange=exchange_name, exchange_type='fanout')

    # Messages rejected from the input queue (requeue=False) are
    # dead-lettered to the dead-letter exchange.
    input_queue_args = {
        'x-dead-letter-exchange': settings.RMQ_DEAD_EXCHANGE,
    }
    chan.queue_declare(
        queue=settings.RMQ_INPUT_QUEUE,
        durable=True,
        arguments=input_queue_args,
    )

    # Nothing consumes the dead-letter queue: messages wait there until the
    # TTL expires and are then dead-lettered back to the input exchange,
    # which is what produces the retry delay.
    dead_queue_args = {
        'x-message-ttl': settings.RMQ_DEAD_TTL,
        'x-dead-letter-exchange': settings.RMQ_INPUT_EXCHANGE,
    }
    chan.queue_declare(
        queue=settings.RMQ_DEAD_QUEUE,
        durable=True,
        arguments=dead_queue_args,
    )

    chan.queue_bind(queue=settings.RMQ_DEAD_QUEUE, exchange=settings.RMQ_DEAD_EXCHANGE)
    chan.queue_bind(queue=settings.RMQ_INPUT_QUEUE, exchange=settings.RMQ_INPUT_EXCHANGE)
    return chan
def callback(ch, method, properties, body):
    """Handle one delivery from the input queue.

    No real processing happens here: every delivery is treated as a
    failure. While retries remain, the message is rejected without
    requeue; once they are exhausted, it is acknowledged, which drops it.

    Args:
        ch: Channel the message was delivered on.
        method: Delivery metadata; only ``delivery_tag`` is used.
        properties: Message properties, passed on to ``can_retry``.
        body: Raw message body, used for logging only.
    """
    logger.info('Processing message `%s`', body)
    if not can_retry(properties):
        logger.error('Can`t retry, drop message')
        # Acking removes the message for good, ending the retry loop.
        ch.basic_ack(delivery_tag=method.delivery_tag)
        return
    logger.warning('Retrying message')
    # requeue=False: do not put the message back on the input queue.
    ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
def can_retry(properties):
    """Tell whether the message may go through the retry loop once more.

    The decision is based on the ``x-death`` header, which RabbitMQ adds
    to a message when it passes through a dead letter exchange. A message
    without that header (or with an empty one) can always be retried.
    Otherwise the ``count`` of the first ``x-death`` entry is compared
    with ``settings.RETRY_COUNT``.

    Args:
        properties: Message properties; ``headers`` may be ``None``.

    Returns:
        ``True`` if another retry is allowed, ``False`` otherwise.
    """
    headers = properties.headers or {}
    death_records = headers.get('x-death')
    if death_records:
        return death_records[0]['count'] < settings.RETRY_COUNT
    return True
if __name__ == '__main__':
    # Script entry point: declare the topology and consume until stopped.
    channel = init_rmq()
    logger.info('Consuming.')
    # `consumer_callback` is the keyword used by pika 0.10.0, the version
    # this example targets.
    channel.basic_consume(
        queue=settings.RMQ_INPUT_QUEUE, consumer_callback=callback,
    )
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop this script: cancel the consumer
        # cleanly instead of dying with a traceback.
        channel.stop_consuming()
settings.py

import logging.config
# --- Broker connection (placeholders: fill in for your environment) ---
RMQ_HOST = ''
RMQ_PORT = 5672
RMQ_VHOST = ''
RMQ_USERNAME = ''
RMQ_PASSWORD = ''

# --- Names of the exchanges and queues (placeholders as well) ---
RMQ_INPUT_EXCHANGE = ''
RMQ_INPUT_QUEUE = ''
RMQ_DEAD_EXCHANGE = ''
RMQ_DEAD_QUEUE = ''

# --- Retry policy ---
# Used as `x-message-ttl` (milliseconds) of the dead-letter queue,
# i.e. the pause between two attempts.
RMQ_DEAD_TTL = 60 * 1000
# Threshold that `can_retry` compares the `x-death` count against.
RETRY_COUNT = 2

# --- Logging: everything at INFO and above goes to the console ---
dict_config = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'detailed': {
            'class': 'logging.Formatter',
            'format': '%(asctime)s %(levelname)s %(name)s: %(message)s',
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'INFO',
            'formatter': 'detailed',
        },
    },
    'root': {
        'level': 'INFO',
        'handlers': ['console'],
    },
}

# Applied at import time, so importing `settings` is enough to set up logging.
logging.config.dictConfig(dict_config)
settings.py RabbitMQ, consumer.py publisher.py :
...
2020-05-02 12:16:32,260 INFO __main__: Consuming.
2020-05-02 12:16:35,233 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:16:35,233 WARNING __main__: Retrying message
2020-05-02 12:17:35,241 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:17:35,241 WARNING __main__: Retrying message
2020-05-02 12:18:35,249 INFO __main__: Processing message `b'message from rmq'`
2020-05-02 12:18:35,250 ERROR __main__: Can`t retry, drop message
...
.. , , , .
.
dead letter queue . dead letter exchange:
. consumer-
service_a_inner_exch , consumer-, . , A_another, service_a_another_input_q, service_a_inner_exch. «» , . , exchange dead_inner_exch, .

. , RabbitMQ.
7 , E . : RabbitMQ 3.6.12, 4 RPS , 40 RPS.
, - .