Celery throttling: setting a rate limit for queues

In this article I will show how to solve one of the problems that arise when using distributed task queues: throttling the throughput of a queue or, in simpler terms, setting its rate limit. As an example I will use Python and my favorite combination of Celery + RabbitMQ, although the algorithm I use does not depend on these tools and can be implemented on any other stack.


Celery + RabbitMQ


So what is the problem?


​ , . , 99.9% / , - 403 500. , , ? … , , - .


​ , 1 , , . Celery, N ( ), M , .


What's in the box


Celery does offer throttling out of the box: the rate_limit option of a Task. It has drawbacks, however:


rate limit .
, . - .

   # The API allows 60 req/min in total,
   # so each of the two tasks is given half of that
   @app.task(rate_limit='30/m')
   def get_github_api1():
       ...

   @app.task(rate_limit='30/m')
   def get_github_api2():
       ...

, .

For example, if there are 60 calls to get_github_api1() and 0 calls to get_github_api2(), the throughput is 30 calls per minute, not 60.
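Splitting one 60 req/min limit into two 30/m task limits wastes capacity whenever the load is uneven. The arithmetic can be checked with a small sketch. The helper names `per_task_throughput` and `shared_throughput` are hypothetical and only model the caps; they are not part of Celery.

```python
def per_task_throughput(demand, per_task_limit):
    """Calls per minute that get through when each task type has its own cap."""
    return sum(min(calls, per_task_limit) for calls in demand.values())


def shared_throughput(demand, shared_limit):
    """Calls per minute that get through when all task types share one cap."""
    return min(sum(demand.values()), shared_limit)


# Uneven load: every call in this minute goes to the first task
demand = {"get_github_api1": 60, "get_github_api2": 0}

print(per_task_throughput(demand, 30))  # 30
print(shared_throughput(demand, 60))    # 60
```

With separate caps, half of the allowed API budget stays unused, which is the case a shared limit for the whole task group is meant to avoid.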


Coming up with a solution


Token Bucket


​ Token Bucket — , . 2 : , , , ; . , .
​ , RabbitMQ.
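As a reference point, here is a minimal in-memory sketch of the general token bucket idea: tokens are added at a fixed interval up to a capacity, and each call must take one token. The class `TokenBucket` and its parameters are hypothetical names for illustration. The article itself keeps the tokens in a RabbitMQ queue rather than in process memory, and time is passed in explicitly here so the example is deterministic.

```python
class TokenBucket:
    def __init__(self, capacity, refill_interval, now=0.0):
        self.capacity = capacity                # maximum number of stored tokens
        self.refill_interval = refill_interval  # seconds between new tokens
        self.tokens = 0
        self.last_refill = now

    def _refill(self, now):
        # Add one token per full interval elapsed, never exceeding capacity
        new_tokens = int((now - self.last_refill) // self.refill_interval)
        if new_tokens:
            self.tokens = min(self.capacity, self.tokens + new_tokens)
            self.last_refill += new_tokens * self.refill_interval

    def try_take(self, now):
        """Return True and consume a token if one is available at time `now`."""
        self._refill(now)
        if self.tokens > 0:
            self.tokens -= 1
            return True
        return False


bucket = TokenBucket(capacity=2, refill_interval=1.0)
results = [bucket.try_take(now=t) for t in (0.0, 1.0, 1.1, 5.0, 5.1, 5.2)]
print(results)  # [False, True, False, True, True, False]
```

After the idle gap before t=5.0 only two tokens are available, not four: the capacity bounds the size of a burst.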


Token bucket


Writing some code


Let's start with the file main.py:


from celery import Celery
from kombu import Queue

app = Celery('Test app', broker='amqp://guest@localhost//')

# One queue for the tasks and one queue for the tokens
app.conf.task_queues = [
    Queue('github'),
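    # Cap the token queue at 2 messages so that tokens cannot pile up;
    # a backlog of tokens would let tasks burst past the rate limit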
    Queue('github_tokens', max_length=2)
]

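# This task plays the role of a token.
# No worker ever runs it; its messages are only taken from the queue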
@app.task
def token():
    return 1

# Schedule the periodic issuing of tokens
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Issue 1 token per second:
    # the rate limit for the github group is 60 calls per minute
    sender.add_periodic_task(1.0, token.signature(queue='github_tokens'))
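The effect of max_length=2 can be pictured with a bounded deque. This is only an analogy, assuming RabbitMQ's default overflow behavior for length-limited queues (the oldest message is dropped when the queue is full); `token_queue` here is a hypothetical stand-in, not a broker object.

```python
from collections import deque

# A deque with maxlen=2 discards the oldest item when a third is appended
token_queue = deque(maxlen=2)

# No task takes tokens for 10 seconds while one token is issued per second
for second in range(10):
    token_queue.append(second)

print(list(token_queue))  # [8, 9]
```

However long the consumers stay idle, at most two tokens are waiting, so at most two calls can run back to back once they resume.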

Start RabbitMQ with a single command:


docker run -d --rm --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

Now start celery beat, the Celery process that sends periodic tasks on schedule:


celery -A main beat --loglevel=info

Its log should contain lines like this:


[2020-03-22 22:49:00,992: INFO/MainProcess] Scheduler: Sending due task main.token() (main.token)

Tokens are now being issued. Next, add the tasks of the github group to main.py:


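# Take a token from the task group's token queue before doing the work
def rate_limit(task, task_group):
    # Get a broker connection from the app
    with task.app.connection_for_read() as conn:
        # Try to fetch one token message from the "<group>_tokens" queue
        msg = conn.default_channel.basic_get(task_group+'_tokens', no_ack=True)
        # None means the queue is empty: no token is available right now
        if msg is None:
            # Retry the task in 1 second
            task.retry(countdown=1)

# print stands in for the real API call.
# max_retries=None is set on the task so that it is retried
# until a token becomes available, however long that takes
@app.task(bind=True, max_retries=None)
def get_github_api1(self):
    rate_limit(self, 'github')
    print('Called Api 1')

@app.task(bind=True, max_retries=None)
def get_github_api2(self):
    rate_limit(self, 'github')
    print('Called Api 2')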
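The take-a-token-or-retry logic can be tried without a broker. The sketch below is a simplified simulation under stated assumptions: `RetryLater` is a hypothetical stand-in for the exception raised by Celery's retry(), a `queue.Queue` replaces the RabbitMQ token queue, and one loop iteration models one second.

```python
import queue


class RetryLater(Exception):
    """Hypothetical stand-in for the exception raised by task.retry()."""


def rate_limit(tokens):
    # Take one token without blocking; ask for a retry if none is left
    try:
        tokens.get_nowait()
    except queue.Empty:
        raise RetryLater()


def simulate(seconds, pending_calls):
    tokens = queue.Queue()
    calls_per_second = []
    for _ in range(seconds):
        # "beat": issue one token per second, keeping at most 2 stored
        if tokens.qsize() < 2:
            tokens.put(1)
        done = 0
        # Every pending call tries once during this second
        for _ in range(pending_calls):
            try:
                rate_limit(tokens)
            except RetryLater:
                continue  # no token: the call stays pending
            done += 1
        pending_calls -= done
        calls_per_second.append(done)
    return calls_per_second


print(simulate(seconds=5, pending_calls=100))  # [1, 1, 1, 1, 1]
```

No matter how many calls are waiting, only as many get through as tokens were issued.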

Leave beat running and start a worker with 8 processes:


celery -A main worker -c 8 -Q github

To fill the queue with tasks, create producer.py:


from main import get_github_api1, get_github_api2

tasks = [get_github_api1, get_github_api2]

for i in range(100):
    # Alternate between the two tasks
    tasks[i % 2].apply_async(queue='github')

Run it with python producer.py and watch the worker log:


[2020-03-23 13:04:15,017: WARNING/ForkPoolWorker-3] Called Api 2
[2020-03-23 13:04:16,053: WARNING/ForkPoolWorker-8] Called Api 2
[2020-03-23 13:04:17,112: WARNING/ForkPoolWorker-1] Called Api 2
[2020-03-23 13:04:18,187: WARNING/ForkPoolWorker-1] Called Api 1
... (96 more lines)

, 8 , , , . , , , rate limit , - , , . .


Putting it all together


​ , ( ). , :


from celery import Celery
from kombu import Queue
from queue import Empty
from functools import wraps

app = Celery('hello', broker='amqp://guest@localhost//')

task_queues = [
    Queue('github'),
    Queue('google')
]

# Rate limits per task group, in calls per minute
rate_limits = {
    'github': 60,
    'google': 100
}

# Create a token queue for every task group that has a rate limit
task_queues += [Queue(name+'_tokens', max_length=2) for name, limit in rate_limits.items()]

app.conf.task_queues = task_queues

@app.task
def token():
    return 1

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Schedule token issuing for every task group
    for name, limit in rate_limits.items():
        sender.add_periodic_task(60 / limit, token.signature(queue=name+'_tokens'))

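# Decorator that applies the rate limit of a task group to a task
def rate_limit(task_group):
    def decorator_func(func):
        @wraps(func)
        def function(self, *args, **kwargs):
            with self.app.connection_for_read() as conn:
                # Open the group's token queue through kombu's SimpleQueue.
                # queue_opts repeats max_length=2 so that the declaration
                # matches the queue declared in task_queues above
                with conn.SimpleQueue(task_group+'_tokens', no_ack=True, queue_opts={'max_length':2}) as queue:
                    try:
                        # Wait up to 5 seconds for a token, then run the task.
                        # If no token arrives in that time, fall back to retry()
                        queue.get(block=True, timeout=5)
                        return func(self, *args, **kwargs)
                    except Empty:
                        self.retry(countdown=1)
        return function
    return decorator_func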

# Usage: put the decorator under @app.task and name the task group
@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api1(self):
    print('Called github Api 1')

@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api2(self):
    print('Called github Api 2')

@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api1(self):
    print('Called Google Api 1')

@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api2(self):
    print('Called Google Api 2')

The tasks of the google group are now limited to 100 calls per minute in total, and the tasks of the github group to 60.
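The token interval for each group follows directly from its per-minute limit, as in the 60 / limit expression of the listing. A small sketch of that arithmetic, where `token_interval` is a hypothetical helper name:

```python
rate_limits = {"github": 60, "google": 100}


def token_interval(calls_per_minute):
    """Seconds between tokens for a given per-minute limit."""
    return 60 / calls_per_minute


intervals = {name: token_interval(limit) for name, limit in rate_limits.items()}
print(intervals)  # {'github': 1.0, 'google': 0.6}
```

So beat issues a github token every second and a google token every 0.6 seconds.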


Moving further


​ , , - , . ;)? , . , 1, ( , ), '' , '' , , . , ) , !


P.s. , ;)

