рдЗрд╕ рд▓реЗрдЦ рдореЗрдВ, рдореИрдВ рджрд┐рдЦрд╛рдКрдВрдЧрд╛ рдХрд┐ рд╡рд┐рддрд░рд┐рдд рдХрд╛рд░реНрдп рдХрддрд╛рд░реЛрдВ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рддреЗ рд╕рдордп рдЖрдиреЗ рд╡рд╛рд▓реА рд╕рдорд╕реНрдпрд╛рдУрдВ рдореЗрдВ рд╕реЗ рдПрдХ рдХреЛ рдХреИрд╕реЗ рд╣рд▓ рдХрд┐рдпрд╛ рдЬрд╛рдП - рдПрдХ рдХрддрд╛рд░ рдХреЗ рдкреНрд░рд╡рд╛рд╣ рдХреЛ рдирд┐рдпрдВрддреНрд░рд┐рдд рдХрд░рдирд╛, рдпрд╛, рдПрдХ рд╕рд░рд▓ рднрд╛рд╖рд╛ рдореЗрдВ, рдЗрд╕рдХреА рджрд░ рд╕реАрдорд╛ рдирд┐рд░реНрдзрд╛рд░рд┐рдд рдХрд░рдирд╛ред рдПрдХ рдЙрджрд╛рд╣рд░рдг рдХреЗ рд░реВрдк рдореЗрдВ, рдореИрдВ рд▓реЗ рд▓реЗрдВрдЧреЗ рдЕрдЬрдЧрд░ рдФрд░ рдХреА рдореЗрд░реА рдкрд╕рдВрджреАрджрд╛ рдЧреБрдЪреНрдЫрд╛ рдЕрдЬрд╡рд╛рдЗрди + RabbitMQ , рд╣рд╛рд▓рд╛рдВрдХрд┐ рдПрд▓реНрдЧреЛрд░рд┐рдереНрдо рд╣реИ рдХрд┐ рдореИрдВ рдЙрдкрдпреЛрдЧ рдЗрди рдЙрдкрдХрд░рдгреЛрдВ рдкрд░ рдирд┐рд░реНрднрд░ рдирд╣реАрдВ рдХрд░рддрд╛ рдФрд░ рдХрд┐рд╕реА рднреА рдЕрдиреНрдп рдвреЗрд░ рдкрд░ рд▓рд╛рдЧреВ рдХрд┐рдпрд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИред

рддреЛ рд╕рдорд╕реНрдпрд╛ рдХреНрдпрд╛ рд╣реИ?
тАЛ , . , 99.9% / , - 403 500. , , ? тАж , , - .
тАЛ , 1 , , . Celery, N ( ), M , .
What's in the box
тАЛ , , , throttling celery, rate_limit
Task
. , , , , , :
rate limit .
, . - .
@app.task(rate_limit='30/m')
def get_github_api1():
...
@app.task(rate_limit='30/m')
def get_github_api2():
...
, .
тАЛ , , . , , - 60 get_github_api1()
0 get_github_api2()
тАФ 30 , 60. , , , . , .
Bringing decision
Token Bucket
тАЛ Token Bucket тАФ , . 2 : , , , ; . , .
тАЛ , RabbitMQ.

Wrting some code
, . main.py
:
from celery import Celery
from kombu import Queue
app = Celery('Test app', broker='amqp://guest@localhost//')
app.conf.task_queues = [
Queue('github'),
Queue('github_tokens', max_length=2)
]
@app.task
def token():
return 1
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(1.0, token.signature(queue='github_tokens'))
Rabbit, 1 :
docker run -d --rm --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
celery beat
тАФ celery
, .
celery -A main beat --loglevel=info
:
[2020-03-22 22:49:00,992: INFO/MainProcess] Scheduler: Sending due task main.token() (main.token)
, ''. . , github. main.py
:
def rate_limit(task, task_group):
with task.app.connection_for_read() as conn:
msg = conn.default_channel.basic_get(task_group+'_tokens', no_ack=True)
if msg is None:
task.retry(countdown=1)
@app.task(bind=True)
def get_github_api1(self, max_retries=None):
rate_limit(self, 'github')
print ('Called Api 1')
@app.task(bind=True)
def get_github_api2(self, max_retries=None):
rate_limit(self, 'github')
print ('Called Api 2')
, . beat
8 :
celery -A main worker - 8 -Q github
, producer.py
:
from main import get_github_api1, get_github_api2
tasks = [get_github_api1, get_github_api2]
for i in range(100):
tasks[i % 2].apply_async(queue='github')
тАФ python producer.py
, :
[2020-03-23 13:04:15,017: WARNING/ForkPoolWorker-3] Called Api 2
[2020-03-23 13:04:16,053: WARNING/ForkPoolWorker-8] Called Api 2
[2020-03-23 13:04:17,112: WARNING/ForkPoolWorker-1] Called Api 2
[2020-03-23 13:04:18,187: WARNING/ForkPoolWorker-1] Called Api 1
... (96 more lines)
, 8 , , , . , , , rate limit , - , , . .
Putting it all together
тАЛ , ( ). , :
from celery import Celery
from kombu import Queue
from queue import Empty
from functools import wraps
app = Celery('hello', broker='amqp://guest@localhost//')
task_queues = [
Queue('github'),
Queue('google')
]
rate_limits = {
'github': 60,
'google': 100
}
task_queues += [Queue(name+'_tokens', max_length=2) for name, limit in rate_limits.items()]
app.conf.task_queues = task_queues
@app.task
def token():
return 1
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
for name, limit in rate_limits.items():
sender.add_periodic_task(60 / limit, token.signature(queue=name+'_tokens'))
def rate_limit(task_group):
def decorator_func(func):
@wraps(func)
def function(self, *args, **kwargs):
with self.app.connection_for_read() as conn:
with conn.SimpleQueue(task_group+'_tokens', no_ack=True, queue_opts={'max_length':2}) as queue:
try:
queue.get(block=True, timeout=5)
return func(self, *args, **kwargs)
except Empty:
self.retry(countdown=1)
return function
return decorator_func
@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api1(self):
print ('Called github Api 1')
@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api2(self):
print ('Called github Api 2')
@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api1(self):
print ('Called Google Api 1')
@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api1(self):
print ('Called Google Api 2')
google
100/, github
тАФ 60/. , , throttling, 50 . , .
Moving further
тАЛ , , - , . ;)? , . , 1, ( , ), '' , '' , , . , ) , !
P.s. , ;)