اختناق الكرفس - تحديد حد معدل قوائم الانتظار

في هذه المقالة سأوضح كيفية حل إحدى المشاكل التي تنشأ عند استخدام قوائم انتظار المهام الموزعة - اختناق عرض النطاق الترددي لقائمة الانتظار ، أو ، في لغة أبسط ، تعيين حد المعدل الخاص بها. كمثال ، سوف آخذ python ومجموعة حفلي المفضلة من Celery + RabbitMQ ، على الرغم من أن الخوارزمية التي أستخدمها لا تعتمد على هذه الأدوات ويمكن تنفيذها على أي مكدس آخر.


كرفس + أرنب


إذا ما هي المشكلة؟


​ , . , 99.9% / , - 403 500. , , ? … , , - .


​ , 1 , , . Celery, N ( ), M , .


What's in the box


​ , , , throttling celery, rate_limit Task. , , , , , :


rate limit .
, . - .

   #        API  60 req/min
   #    
   @app.task(rate_limit='30/m')
   def get_github_api1():
       ...

   @app.task(rate_limit='30/m')
   def get_github_api2():
       ...

, .

​ , , . , , - 60 get_github_api1() 0 get_github_api2() — 30 , 60. , , , . , .


Bringing decision


Token Bucket


Token Bucket — , . 2 : , , , ; . , .
​ , RabbitMQ.


دلو الرمز المميز


Wrting some code


, . main.py :


from celery import Celery
from kombu import Queue

app = Celery('Test app', broker='amqp://guest@localhost//')

# 1      1     
app.conf.task_queues = [
    Queue('github'),
    #      2,    
    #        rate limit'a
    Queue('github_tokens', max_length=2)
]

#       
#     ,         
@app.task
def token():
    return 1

#     
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    #     1   
    #    rate limit   github - 60   
    sender.add_periodic_task(1.0, token.signature(queue='github_tokens'))

Rabbit, 1 :


docker run -d --rm --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

celery beatcelery, .


celery -A main beat --loglevel=info

:


[2020-03-22 22:49:00,992: INFO/MainProcess] Scheduler: Sending due task main.token() (main.token)

, ''. . , github. main.py:


#       
def rate_limit(task, task_group):
    #      
    with task.app.connection_for_read() as conn:
        #  
        msg = conn.default_channel.basic_get(task_group+'_tokens', no_ack=True)
        #  None -  ,  
        if msg is None:
            #    1 
            task.retry(countdown=1)

#  print    
#    max_retries=None,    
# ,    
@app.task(bind=True)
def get_github_api1(self, max_retries=None):
    rate_limit(self, 'github')
    print ('Called Api 1')

@app.task(bind=True)
def get_github_api2(self, max_retries=None):
    rate_limit(self, 'github')
    print ('Called Api 2')

, . beat 8 :


celery -A main worker - 8 -Q github

, producer.py:


from main import get_github_api1, get_github_api2

tasks = [get_github_api1, get_github_api2]

for  i in range(100):
    #    
    tasks[i % 2].apply_async(queue='github')

python producer.py, :


[2020-03-23 13:04:15,017: WARNING/ForkPoolWorker-3] Called Api 2
[2020-03-23 13:04:16,053: WARNING/ForkPoolWorker-8] Called Api 2
[2020-03-23 13:04:17,112: WARNING/ForkPoolWorker-1] Called Api 2
[2020-03-23 13:04:18,187: WARNING/ForkPoolWorker-1] Called Api 1
... (96 more lines)

, 8 , , , . , , , rate limit , - , , . .


Putting it all together


​ , ( ). , :


from celery import Celery
from kombu import Queue
from queue import Empty
from functools import wraps

app = Celery('hello', broker='amqp://guest@localhost//')

task_queues = [
    Queue('github'),
    Queue('google')
]

#    
rate_limits = {
    'github': 60,
    'google': 100
}

#        ,    
task_queues += [Queue(name+'_tokens', max_length=2) for name, limit in rate_limits.items()]

app.conf.task_queues = task_queues

@app.task
def token():
    return 1

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    #       
    for name, limit in rate_limits.items():
        sender.add_periodic_task(60 / limit, token.signature(queue=name+'_tokens'))

#     ?
def rate_limit(task_group):
    def decorator_func(func):
        @wraps(func)
        def function(self, *args, **kwargs):
            with self.app.connection_for_read() as conn:
                #         :
                #       
                #     , .    
                #    
                with conn.SimpleQueue(task_group+'_tokens', no_ack=True, queue_opts={'max_length':2}) as queue:
                    try:
                        #    -     
                        #    ,     retry()
                        # ,     
                        queue.get(block=True, timeout=5)
                        return func(self, *args, **kwargs)
                    except Empty:
                        self.retry(countdown=1)
        return function
    return decorator_func

#   -    , ? ;)
@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api1(self):
    print ('Called github Api 1')

@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api2(self):
    print ('Called github Api 2')

@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api1(self):
    print ('Called Google Api 1')

@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api1(self):
    print ('Called Google Api 2')

google 100/, github — 60/. , , throttling, 50 . , .


Moving further


​ , , - , . ;)? , . , 1, ( , ), '' , '' , , . , ) , !


P.s. , ;)


All Articles