In this article I'll show how to solve one of the problems that come up when using distributed task queues: limiting the throughput of a queue or, in simpler terms, setting its rate limit. As an example I'll use Python and my favorite combination of Celery + RabbitMQ, although the algorithm itself does not depend on these tools and can be implemented on any other stack.

So what's the problem?
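Practically every public API (99.9% of them, to put a number on it) limits how many requests you can send per second or per minute, and once you go over the limit it starts answering with 403 or 500 errors instead of data. So how do you stay within the limit?
With a single process this is easy: count your requests and wait when needed. With Celery, though, you have N workers (each possibly running several processes) on M machines, and none of them knows how many requests the others have already made.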
What's in the box
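The first thing that comes up when you search for throttling in Celery is the built-in rate_limit option of a Task. It caps how often tasks of one type are executed and is set right in the task decorator. For example, for two tasks that call the GitHub API: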
@app.task(rate_limit='30/m')
def get_github_api1():
    ...

@app.task(rate_limit='30/m')
def get_github_api2():
    ...
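This has two problems. First, each task type gets its own limit, while GitHub counts all of our requests together. Suppose the API allows 60 requests per minute and we split that budget evenly, 30/m per task. If in some minute there are 60 calls to get_github_api1() and 0 calls to get_github_api2(), only 30 of them will run, although the API would have accepted all 60. Second, according to the Celery documentation, rate_limit is enforced per worker instance rather than globally, so every additional worker raises the real limit instead of sharing it.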
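The arithmetic behind the first problem, as a small sketch. The 60 and 30 figures come from the example above; the two helper functions are hypothetical and only count how many calls fit into one minute under each scheme.

```python
# Numbers from the example: a shared budget of 60 calls/minute,
# split evenly between two task types (30/m each).
PER_TASK_LIMIT = 30
SHARED_LIMIT = 60


def executed_with_per_task_limits(demand):
    """Calls that run in one minute when every task type has its own cap."""
    return sum(min(calls, PER_TASK_LIMIT) for calls in demand.values())


def executed_with_shared_limit(demand):
    """Calls that run in one minute when all task types share one budget."""
    return min(sum(demand.values()), SHARED_LIMIT)


demand = {"get_github_api1": 60, "get_github_api2": 0}
print(executed_with_per_task_limits(demand))  # 30
print(executed_with_shared_limit(demand))     # 60
```

Splitting the budget only works when demand happens to be split the same way; a shared budget adapts to whatever mix of tasks arrives.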
Finding a solution
Token Bucket
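Token Bucket is a classic rate-limiting algorithm. It has 2 parts: a bucket that receives new tokens at a fixed rate and can hold only a limited number of them (any extra tokens are thrown away); and consumers, each of which must take a token from the bucket before doing its work and has to wait if the bucket is empty. The refill rate therefore sets the maximum average rate of work, while the bucket size bounds how large a burst can be.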
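To make the algorithm concrete before moving it into RabbitMQ, here is a minimal single-process sketch. The TokenBucket and FakeClock classes are hypothetical illustrations, not part of Celery or kombu; the fake clock only makes the example deterministic.

```python
import time


class TokenBucket:
    """Minimal in-memory token bucket (works inside one process only)."""

    def __init__(self, rate, capacity, clock=time.monotonic):
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # maximum number of stored tokens
        self.clock = clock
        self.tokens = 0.0         # start empty, like a freshly declared queue
        self.last = clock()

    def _refill(self):
        now = self.clock()
        # Add tokens for the elapsed time; anything above capacity is discarded.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now

    def try_acquire(self):
        """Take one token if there is one; never blocks."""
        self._refill()
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


class FakeClock:
    """Manually advanced clock, so the example is deterministic."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


clock = FakeClock()
bucket = TokenBucket(rate=1.0, capacity=2, clock=clock)
print(bucket.try_acquire())                      # False: no tokens yet
clock.now = 5.0                                  # 5 s pass, but only 2 tokens fit
print([bucket.try_acquire() for _ in range(3)])  # [True, True, False]
```

This version cannot coordinate several workers, because each process would have its own bucket; that is exactly why the bucket has to live somewhere all workers can reach.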
As the bucket we'll use a queue in RabbitMQ.

Writing some code
Let's start with the bucket itself. Create main.py with the following contents:
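from celery import Celery
from kombu import Queue

app = Celery('Test app', broker='amqp://guest@localhost//')

# 'github' holds the real tasks; 'github_tokens' is the token bucket.
# max_length=2 caps how many tokens can pile up in it.
app.conf.task_queues = [
    Queue('github'),
    Queue('github_tokens', max_length=2)
]

@app.task
def token():
    return 1

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Publish one token into the bucket every second (60 tokens per minute).
    sender.add_periodic_task(1.0, token.signature(queue='github_tokens'))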
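The max_length=2 argument is what turns a plain queue into a bucket with a fixed capacity. The sketch below models it with a bounded collections.deque, assuming RabbitMQ's default overflow behaviour ("drop-head"), in which the oldest message is discarded when a full queue receives a new one; the token@... strings are made up for illustration.

```python
from collections import deque

# Stand-in for the 'github_tokens' queue declared with max_length=2.
# Assumes RabbitMQ's default "drop-head" overflow: when the queue is full,
# the oldest message is dropped to make room for the new one.
bucket = deque(maxlen=2)

for second in range(5):   # beat publishes one token per second
    bucket.append(f"token@{second}s")

print(list(bucket))       # ['token@3s', 'token@4s']
print(len(bucket))        # 2
```

However long the workers stay idle, at most 2 tokens wait for them, so a burst after a quiet period is limited to 2 extra calls.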
Start RabbitMQ; with Docker it takes 1 command:
docker run -d --rm --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
Next, start celery beat, the Celery scheduler, which will publish a token into the bucket every second:
celery -A main beat --loglevel=info
Its log should show lines like this:
[2020-03-22 22:49:00,992: INFO/MainProcess] Scheduler: Sending due task main.token() (main.token)
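The 'bucket' is filling up. Now let's write the tasks that take tokens from the github bucket before doing their work. Add the following to main.py: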
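def rate_limit(task, task_group):
    with task.app.connection_for_read() as conn:
        # Try to take one token from the bucket without waiting.
        msg = conn.default_channel.basic_get(task_group + '_tokens', no_ack=True)
        if msg is None:
            # No token available: retry() raises, so the task body below
            # is skipped and the task runs again in one second.
            task.retry(countdown=1)

# max_retries=None lets a task wait for a token as many times as needed.
@app.task(bind=True, max_retries=None)
def get_github_api1(self):
    rate_limit(self, 'github')
    print('Called Api 1')

@app.task(bind=True, max_retries=None)
def get_github_api2(self):
    rate_limit(self, 'github')
    print('Called Api 2')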
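The check inside rate_limit can be modelled without a broker. In this sketch (an illustration only), a stdlib queue.Queue plays the role of the github_tokens queue, get_nowait() plays the role of basic_get(..., no_ack=True), and a hypothetical Retry exception stands in for the one Celery's task.retry() raises.

```python
import queue


class Retry(Exception):
    """Hypothetical stand-in for the exception raised by Celery's task.retry()."""


def rate_limit(tokens):
    """Consume one token, or signal that the task must be retried."""
    try:
        tokens.get_nowait()   # like basic_get(..., no_ack=True) returning a message
    except queue.Empty:       # like basic_get(...) returning None
        raise Retry("no token, try again in 1 second")


def get_github_api(tokens):
    rate_limit(tokens)        # raises Retry before the "API call" if the bucket is empty
    return "Called Api"


tokens = queue.Queue()
tokens.put(1)                 # one token in the bucket
print(get_github_api(tokens)) # Called Api
```

A second call with the bucket now empty raises Retry, which is the point where the real task would be rescheduled.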
Without stopping beat, start a worker with 8 processes that listens on the github queue:
celery -A main worker -c 8 -Q github
Now create producer.py, which enqueues 100 tasks, alternating between the two:
from main import get_github_api1, get_github_api2

tasks = [get_github_api1, get_github_api2]

for i in range(100):
    tasks[i % 2].apply_async(queue='github')
Run it with python producer.py, and the worker log shows:
[2020-03-23 13:04:15,017: WARNING/ForkPoolWorker-3] Called Api 2
[2020-03-23 13:04:16,053: WARNING/ForkPoolWorker-8] Called Api 2
[2020-03-23 13:04:17,112: WARNING/ForkPoolWorker-1] Called Api 2
[2020-03-23 13:04:18,187: WARNING/ForkPoolWorker-1] Called Api 1
... (96 more lines)
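Even though 8 worker processes are running, the tasks are executed roughly once per second, which is exactly the rate at which beat puts tokens into the bucket. A process that finds the bucket empty does not call the API; it simply retries a second later. Because every worker, on any machine, takes its tokens from the same RabbitMQ queue, the limit is shared rather than multiplied.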
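Why 8 processes still produce only one call per second can be checked with a toy simulation. This is a simplified model, not Celery's actual scheduling: beat adds one token per second to a bucket of size 2, each second up to workers pending tasks try to take a token, and those that find the bucket empty retry the next second. The simulate function is hypothetical.

```python
from collections import deque


def simulate(workers, tasks, capacity=2):
    """Return the second at which each task gets its token and runs.

    Simplified model: one token per second, a bucket capped at `capacity`,
    and tasks without a token retry in the following second.
    """
    bucket = deque(maxlen=capacity)
    pending = tasks
    run_at = []
    second = 0
    while pending:
        second += 1
        bucket.append(second)                  # beat publishes one token
        for _ in range(min(workers, pending)):
            if not bucket:
                break                          # no token left: retry next second
            bucket.popleft()
            run_at.append(second)
            pending -= 1
    return run_at


print(simulate(workers=8, tasks=100)[:4])      # [1, 2, 3, 4]
print(simulate(workers=1, tasks=100) == simulate(workers=8, tasks=100))  # True
```

In this model the throughput depends only on the token rate; adding workers changes nothing, which matches the timestamps in the log above.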
Putting it all together
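Finally, let's generalize the approach to several groups of tasks (queues), each with its own limit, and move the token check into a decorator. The complete main.py: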
from celery import Celery
from kombu import Queue
from queue import Empty
from functools import wraps

app = Celery('hello', broker='amqp://guest@localhost//')

task_queues = [
    Queue('github'),
    Queue('google')
]

# Allowed requests per minute for each group of tasks.
rate_limits = {
    'github': 60,
    'google': 100
}

# One bucket queue (<group>_tokens) per group, holding at most 2 tokens.
task_queues += [Queue(name + '_tokens', max_length=2) for name in rate_limits]
app.conf.task_queues = task_queues

@app.task
def token():
    return 1

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # One token every 60/limit seconds, i.e. `limit` tokens per minute.
    for name, limit in rate_limits.items():
        sender.add_periodic_task(60 / limit, token.signature(queue=name + '_tokens'))

def rate_limit(task_group):
    def decorator_func(func):
        @wraps(func)
        def function(self, *args, **kwargs):
            with self.app.connection_for_read() as conn:
                # queue_opts must match the declaration above (max_length=2),
                # otherwise RabbitMQ refuses to redeclare the queue.
                with conn.SimpleQueue(task_group + '_tokens', no_ack=True,
                                      queue_opts={'max_length': 2}) as queue:
                    try:
                        # Wait up to 5 seconds for a token.
                        queue.get(block=True, timeout=5)
                    except Empty:
                        # Still no token: retry() raises, so func is not called.
                        self.retry(countdown=1)
                    return func(self, *args, **kwargs)
        return function
    return decorator_func

@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api1(self):
    print('Called github Api 1')

@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api2(self):
    print('Called github Api 2')

@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api1(self):
    print('Called Google Api 1')

@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api2(self):
    print('Called Google Api 2')
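Here the google group is limited to 100 requests per minute and the github group to 60 per minute. In about 50 lines of code we get throttling that is shared by all workers, no matter how many of them there are or which machines they run on.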
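How the per-minute limits from rate_limits turn into beat intervals, as a quick check of the 60 / limit expression used in setup_periodic_tasks:

```python
# Per-minute limits from the final main.py.
rate_limits = {"github": 60, "google": 100}

# add_periodic_task(60 / limit, ...) publishes one token every 60/limit seconds.
intervals = {name: 60 / limit for name, limit in rate_limits.items()}
print(intervals)  # {'github': 1.0, 'google': 0.6}
```

So the github bucket gets a token every second and the google bucket every 0.6 seconds, which over a minute adds up to 60 and 100 tokens respectively.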
Going further
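This implementation is deliberately simple, and there is room to take it further. One knob worth knowing about is the bucket size: with max_length set to 1, no tokens pile up while the workers are idle, so there are no bursts at all, while a larger max_length lets idle time turn into a short burst of requests. Try it on your own stack!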