рдЕрдЬрд╡рд╛рдЗрди рдереНрд░реЙрдЯрд▓рд┐рдВрдЧ - рдХрддрд╛рд░реЛрдВ рдХреЗ рд▓рд┐рдП рджрд░ рд╕реАрдорд╛ рдирд┐рд░реНрдзрд╛рд░рд┐рдд рдХрд░рдирд╛

рдЗрд╕ рд▓реЗрдЦ рдореЗрдВ, рдореИрдВ рджрд┐рдЦрд╛рдКрдВрдЧрд╛ рдХрд┐ рд╡рд┐рддрд░рд┐рдд рдХрд╛рд░реНрдп рдХрддрд╛рд░реЛрдВ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рддреЗ рд╕рдордп рдЖрдиреЗ рд╡рд╛рд▓реА рд╕рдорд╕реНрдпрд╛рдУрдВ рдореЗрдВ рд╕реЗ рдПрдХ рдХреЛ рдХреИрд╕реЗ рд╣рд▓ рдХрд┐рдпрд╛ рдЬрд╛рдП - рдПрдХ рдХрддрд╛рд░ рдХреЗ рдкреНрд░рд╡рд╛рд╣ рдХреЛ рдирд┐рдпрдВрддреНрд░рд┐рдд рдХрд░рдирд╛, рдпрд╛, рдПрдХ рд╕рд░рд▓ рднрд╛рд╖рд╛ рдореЗрдВ, рдЗрд╕рдХреА рджрд░ рд╕реАрдорд╛ рдирд┐рд░реНрдзрд╛рд░рд┐рдд рдХрд░рдирд╛ред рдПрдХ рдЙрджрд╛рд╣рд░рдг рдХреЗ рд░реВрдк рдореЗрдВ, рдореИрдВ рд▓реЗ рд▓реЗрдВрдЧреЗ рдЕрдЬрдЧрд░ рдФрд░ рдХреА рдореЗрд░реА рдкрд╕рдВрджреАрджрд╛ рдЧреБрдЪреНрдЫрд╛ рдЕрдЬрд╡рд╛рдЗрди + RabbitMQ , рд╣рд╛рд▓рд╛рдВрдХрд┐ рдПрд▓реНрдЧреЛрд░рд┐рдереНрдо рд╣реИ рдХрд┐ рдореИрдВ рдЙрдкрдпреЛрдЧ рдЗрди рдЙрдкрдХрд░рдгреЛрдВ рдкрд░ рдирд┐рд░реНрднрд░ рдирд╣реАрдВ рдХрд░рддрд╛ рдФрд░ рдХрд┐рд╕реА рднреА рдЕрдиреНрдп рдвреЗрд░ рдкрд░ рд▓рд╛рдЧреВ рдХрд┐рдпрд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИред


рдЕрдЬрд╡рд╛рдЗрди + RabbitMQ


рддреЛ рд╕рдорд╕реНрдпрд╛ рдХреНрдпрд╛ рд╣реИ?


тАЛ , . , 99.9% / , - 403 500. , , ? тАж , , - .


тАЛ , 1 , , . Celery, N ( ), M , .


What's in the box


тАЛ , , , throttling celery, rate_limit Task. , , , , , :


rate limit .
, . - .

   #        API  60 req/min
   #    
   @app.task(rate_limit='30/m')
   def get_github_api1():
       ...

   @app.task(rate_limit='30/m')
   def get_github_api2():
       ...

, .

тАЛ , , . , , - 60 get_github_api1() 0 get_github_api2() тАФ 30 , 60. , , , . , .


Bringing decision


Token Bucket


тАЛ Token Bucket тАФ , . 2 : , , , ; . , .
тАЛ , RabbitMQ.


рдЯреЛрдХрди рдмрд╛рд▓реНрдЯреА


Wrting some code


, . main.py :


from celery import Celery
from kombu import Queue

app = Celery('Test app', broker='amqp://guest@localhost//')

# 1      1     
app.conf.task_queues = [
    Queue('github'),
    #      2,    
    #        rate limit'a
    Queue('github_tokens', max_length=2)
]

#       
#     ,         
@app.task
def token():
    return 1

#     
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    #     1   
    #    rate limit   github - 60   
    sender.add_periodic_task(1.0, token.signature(queue='github_tokens'))

Rabbit, 1 :


docker run -d --rm --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

celery beat тАФ celery, .


celery -A main beat --loglevel=info

:


[2020-03-22 22:49:00,992: INFO/MainProcess] Scheduler: Sending due task main.token() (main.token)

, ''. . , github. main.py:


#       
def rate_limit(task, task_group):
    #      
    with task.app.connection_for_read() as conn:
        #  
        msg = conn.default_channel.basic_get(task_group+'_tokens', no_ack=True)
        #  None -  ,  
        if msg is None:
            #    1 
            task.retry(countdown=1)

#  print    
#    max_retries=None,    
# ,    
@app.task(bind=True)
def get_github_api1(self, max_retries=None):
    rate_limit(self, 'github')
    print ('Called Api 1')

@app.task(bind=True)
def get_github_api2(self, max_retries=None):
    rate_limit(self, 'github')
    print ('Called Api 2')

, . beat 8 :


celery -A main worker - 8 -Q github

, producer.py:


from main import get_github_api1, get_github_api2

tasks = [get_github_api1, get_github_api2]

for  i in range(100):
    #    
    tasks[i % 2].apply_async(queue='github')

тАФ python producer.py, :


[2020-03-23 13:04:15,017: WARNING/ForkPoolWorker-3] Called Api 2
[2020-03-23 13:04:16,053: WARNING/ForkPoolWorker-8] Called Api 2
[2020-03-23 13:04:17,112: WARNING/ForkPoolWorker-1] Called Api 2
[2020-03-23 13:04:18,187: WARNING/ForkPoolWorker-1] Called Api 1
... (96 more lines)

, 8 , , , . , , , rate limit , - , , . .


Putting it all together


тАЛ , ( ). , :


from celery import Celery
from kombu import Queue
from queue import Empty
from functools import wraps

app = Celery('hello', broker='amqp://guest@localhost//')

task_queues = [
    Queue('github'),
    Queue('google')
]

#    
rate_limits = {
    'github': 60,
    'google': 100
}

#        ,    
task_queues += [Queue(name+'_tokens', max_length=2) for name, limit in rate_limits.items()]

app.conf.task_queues = task_queues

@app.task
def token():
    return 1

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    #       
    for name, limit in rate_limits.items():
        sender.add_periodic_task(60 / limit, token.signature(queue=name+'_tokens'))

#     ?
def rate_limit(task_group):
    def decorator_func(func):
        @wraps(func)
        def function(self, *args, **kwargs):
            with self.app.connection_for_read() as conn:
                #         :
                #       
                #     , .    
                #    
                with conn.SimpleQueue(task_group+'_tokens', no_ack=True, queue_opts={'max_length':2}) as queue:
                    try:
                        #    -     
                        #    ,     retry()
                        # ,     
                        queue.get(block=True, timeout=5)
                        return func(self, *args, **kwargs)
                    except Empty:
                        self.retry(countdown=1)
        return function
    return decorator_func

#   -    , ? ;)
@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api1(self):
    print ('Called github Api 1')

@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api2(self):
    print ('Called github Api 2')

@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api1(self):
    print ('Called Google Api 1')

@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api1(self):
    print ('Called Google Api 2')

google 100/, github тАФ 60/. , , throttling, 50 . , .


Moving further


тАЛ , , - , . ;)? , . , 1, ( , ), '' , '' , , . , ) , !


P.s. , ;)


All Articles