In this article I will show how to solve one of the problems that come up when using distributed task queues: controlling the throughput of a queue or, to put it more simply, setting a rate limit. As an example I'll use Python and my favourite combination, Celery + RabbitMQ, although the algorithm does not depend on these tools and can be implemented on any other stack.

So, what's the problem?
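Many external APIs limit how many requests a client may make per unit of time. If you exceed the limit, you get errors such as 403 or 500 instead of data.
Suppose the API allows one request per second, while in Celery you have N workers (processes) running M different tasks that all call this API. How do you make all of them together stay under the limit?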
What's in the box
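The first thing that comes to mind when searching for throttling in Celery is the built-in rate_limit attribute of Task. It has two drawbacks, though:
The rate limit is enforced per worker, not globally.
It is set per task, not for a group of tasks that share one limit.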
   
   
   # Each task gets its own limit, and every worker enforces it separately
   @app.task(rate_limit='30/m')
   def get_github_api1():
       ...

   @app.task(rate_limit='30/m')
   def get_github_api2():
       ...
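Here each task gets its own limit of 30 requests per minute.
Suppose the two tasks together may make at most 60 requests per minute, so each gets 30/m. As soon as the load is uneven, say 60 calls of get_github_api1() and 0 of get_github_api2(), get_github_api1 is still capped at 30 per minute, although 60 would be allowed. On top of that, every worker applies the limit on its own, so with several workers the total can exceed the API limit. The built-in rate_limit therefore does not solve our problem.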
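To make the arithmetic concrete, here is a small worked example in plain Python (no Celery involved). The function names and demand numbers are hypothetical; the model assumes a budget of 60 requests per minute, split either as 30/m per task or shared by both tasks.

```python
def served_per_task_limits(demand, per_task_limit):
    """Requests served in one minute when every task has its own cap."""
    return sum(min(d, per_task_limit) for d in demand.values())


def served_shared_limit(demand, shared_limit):
    """Requests served in one minute when all tasks draw from one budget."""
    return min(sum(demand.values()), shared_limit)


def effective_global_limit(per_worker_limit, workers):
    """Celery's rate_limit is applied per worker, so the caps add up."""
    return per_worker_limit * workers


# Uneven load: only get_github_api1 is being called.
demand = {"get_github_api1": 60, "get_github_api2": 0}

print(served_per_task_limits(demand, 30))  # 30: half the budget is unused
print(served_shared_limit(demand, 60))     # 60: the whole budget is usable
# Two tasks at 30/m on 8 workers could together reach 8 * 60 calls per minute.
print(effective_global_limit(30 * 2, 8))   # 480
```

A shared budget is exactly what the token bucket below provides.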
Finding a solution
Token Bucket
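Token Bucket is a classic algorithm for limiting the rate of operations. It is built on two operations: tokens are added to the bucket at a fixed rate up to a maximum capacity, and any token that does not fit is discarded; to perform an action you first have to take a token from the bucket, and if the bucket is empty, you wait. The average rate is therefore capped by the refill rate, while short bursts up to the bucket capacity are still possible.
In our case the bucket will be a RabbitMQ queue: its maximum length is the bucket capacity, celery beat periodically publishes token messages into it, and every task must take a token from it before calling the API.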
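A minimal in-process sketch of the algorithm, just to make the two operations concrete. The class name and the injectable fake clock are illustrative assumptions; unlike the RabbitMQ-based version in this article, it only works inside a single process, and it refills lazily from the elapsed time instead of relying on a periodic producer.

```python
import time


class TokenBucket:
    """In-process token bucket: refills `rate` tokens per second, up to `capacity`."""

    def __init__(self, rate, capacity, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.tokens = 0.0            # start empty, like a freshly declared queue
        self.last = clock()

    def _refill(self):
        now = self.clock()
        # Add tokens for the elapsed time; anything above capacity is discarded.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now

    def try_acquire(self):
        """Take one token if available; return False instead of waiting."""
        self._refill()
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


# Usage with a fake clock so the behaviour is deterministic.
now = [0.0]
bucket = TokenBucket(rate=1, capacity=2, clock=lambda: now[0])
now[0] = 10.0                                    # 10 s idle, but only 2 tokens are kept
print([bucket.try_acquire() for _ in range(3)])  # [True, True, False]
```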

Writing some code
First, declare the queues and the token producer in main.py:
from celery import Celery
from kombu import Queue
app = Celery('Test app', broker='amqp://guest@localhost//')
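app.conf.task_queues = [
    # Queue for the tasks that call the GitHub API
    Queue('github'),
    # The token bucket: RabbitMQ keeps at most 2 messages in this queue
    # and by default drops the oldest one when a new token arrives
    Queue('github_tokens', max_length=2)
]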
@app.task
def token():
    return 1
@app.on_after_configure.connect
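def setup_periodic_tasks(sender, **kwargs):
    # Every second, put one token into the bucket. No worker consumes
    # 'github_tokens', so these messages just sit there as tokens
    sender.add_periodic_task(1.0, token.signature(queue='github_tokens'))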
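Why max_length=2 matters: the queue length is the bucket capacity, so after an idle period at most two calls can run back to back. The sketch below models this with collections.deque. It is only an analogy, assuming RabbitMQ's default overflow behaviour (drop-head), under which the oldest message is discarded when a new one arrives at a full queue; beat_tick and take_token are hypothetical helper names.

```python
from collections import deque

# Stand-in for the 'github_tokens' queue: appending to a full deque
# silently discards the element at the other end, i.e. the oldest token.
bucket = deque(maxlen=2)


def beat_tick(bucket, token_id):
    """What celery beat does once per second: publish one token message."""
    bucket.append(token_id)


def take_token(bucket):
    """What a consumer does: take the oldest token, or get None if the bucket is empty."""
    return bucket.popleft() if bucket else None


# Nobody consumes for 5 seconds: only the two newest tokens survive.
for second in range(5):
    beat_tick(bucket, second)
print(list(bucket))                            # [3, 4]

# A burst of 3 consumers: two get a token, the third has to retry.
print([take_token(bucket) for _ in range(3)])  # [3, 4, None]
```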
Start RabbitMQ, for example with a single Docker command:
docker run -d --rm --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
Next, start celery beat, Celery's scheduler for periodic tasks:
celery -A main beat --loglevel=info
Every second the log should show a line like this:
[2020-03-22 22:49:00,992: INFO/MainProcess] Scheduler: Sending due task main.token() (main.token)
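So beat now puts a token into the bucket every second. Next comes the consumer side: the tasks of the github group have to take a token before calling the API. Add to main.py: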
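def rate_limit(task, task_group):
    # Open a broker connection for reading
    with task.app.connection_for_read() as conn:
        # Try to take one token without waiting;
        # no_ack=True removes the message from the queue right away
        msg = conn.default_channel.basic_get(task_group+'_tokens', no_ack=True)
        # basic_get returns None when the queue is empty
        if msg is None:
            # No token available: re-queue the task and try again in 1 second
            task.retry(countdown=1)
# max_retries=None: retry as often as needed (the default gives up after 3 retries)
@app.task(bind=True, max_retries=None)
def get_github_api1(self):
    rate_limit(self, 'github')
    print('Called Api 1')
@app.task(bind=True, max_retries=None)
def get_github_api2(self):
    rate_limit(self, 'github')
    print('Called Api 2')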
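The control flow of rate_limit() can be tried without a broker. In this sketch a stdlib queue.Queue stands in for the token queue and Retry is a hypothetical stand-in for the exception that task.retry() raises; only the logic is the same: take a token, or give up immediately and retry later.

```python
import queue


class Retry(Exception):
    """Hypothetical stand-in for the exception raised by Celery's task.retry()."""

    def __init__(self, countdown):
        super().__init__(f"retry in {countdown}s")
        self.countdown = countdown


def rate_limit(tokens):
    """Take a token without waiting, like basic_get(..., no_ack=True)."""
    try:
        tokens.get_nowait()              # consume one token if there is one
    except queue.Empty:
        raise Retry(countdown=1) from None  # no token: give up now, retry in 1 s


def get_github_api1(tokens):
    rate_limit(tokens)
    return "Called Api 1"


tokens = queue.Queue()                   # stands in for the 'github_tokens' queue
tokens.put(1)                            # beat has published one token
print(get_github_api1(tokens))           # Called Api 1
try:
    get_github_api1(tokens)              # the bucket is empty now
except Retry as exc:
    print(exc.countdown)                 # 1
```

Because the check never blocks, a worker process is never tied up waiting for a token; the price is an extra retry message whenever the bucket is empty.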
Now start a worker with 8 processes alongside beat:
celery -A main worker -c 8 -Q github
Then create some tasks with producer.py:
from main import get_github_api1, get_github_api2
tasks = [get_github_api1, get_github_api2]
for i in range(100):
    # Alternate between the two tasks; both go to the 'github' queue
    tasks[i % 2].apply_async(queue='github')
Run it with python producer.py and look at the worker log:
[2020-03-23 13:04:15,017: WARNING/ForkPoolWorker-3] Called Api 2
[2020-03-23 13:04:16,053: WARNING/ForkPoolWorker-8] Called Api 2
[2020-03-23 13:04:17,112: WARNING/ForkPoolWorker-1] Called Api 2
[2020-03-23 13:04:18,187: WARNING/ForkPoolWorker-1] Called Api 1
... (96 more lines)
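Even though the worker runs 8 processes, the API is called roughly once per second, and this single limit is shared by both tasks instead of being applied to each one separately.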
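A simplified tick-based model shows why the number of worker processes does not matter: each second exactly one token enters the bucket, so at most one call per second can succeed no matter how many processes compete for it. The simulate() function is a hypothetical idealisation that ignores network latency and retry timing.

```python
def simulate(workers, seconds, capacity=2, tokens_per_second=1):
    """Count API calls in an idealised, tick-based model of the setup above.

    Every second beat adds a token (tokens beyond `capacity` are dropped),
    then each worker process tries to take one token and call the API.
    """
    tokens = 0
    calls = 0
    for _ in range(seconds):
        tokens = min(capacity, tokens + tokens_per_second)
        for _ in range(workers):
            if tokens > 0:
                tokens -= 1
                calls += 1
    return calls


print(simulate(workers=1, seconds=100))  # 100
print(simulate(workers=8, seconds=100))  # 100: more workers, same rate
```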
Putting it all together
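Finally, let's generalize this to several task groups (queues), each with its own limit, and move the token check into a decorator. The complete code: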
from celery import Celery
from kombu import Queue
from queue import Empty
from functools import wraps
app = Celery('hello', broker='amqp://guest@localhost//')
task_queues = [
    Queue('github'),
    Queue('google')
]
rate_limits = {
    'github': 60,
    'google': 100
}
# One token bucket (at most 2 tokens) per task group
task_queues += [Queue(name+'_tokens', max_length=2) for name in rate_limits]
app.conf.task_queues = task_queues
@app.task
def token():
    return 1
@app.on_after_configure.connect
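def setup_periodic_tasks(sender, **kwargs):
    # Limits are per minute, so one token every 60 / limit seconds
    # (1 s for github, 0.6 s for google)
    for name, limit in rate_limits.items():
        sender.add_periodic_task(60 / limit, token.signature(queue=name+'_tokens'))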
def rate_limit(task_group):
    def decorator_func(func):
        @wraps(func)
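        def function(self, *args, **kwargs):
            with self.app.connection_for_read() as conn:
                # SimpleQueue declares the queue, so its arguments must match
                # the declaration in task_queues (max_length=2); otherwise
                # RabbitMQ rejects the declaration with PRECONDITION_FAILED
                with conn.SimpleQueue(task_group+'_tokens', no_ack=True, queue_opts={'max_length':2}) as queue:
                    try:
                        # Wait up to 5 seconds for a token; when one arrives
                        # it is consumed (no_ack=True) and the task runs
                        queue.get(block=True, timeout=5)
                        return func(self, *args, **kwargs)
                    except Empty:
                        # Still no token: retry the task in 1 second
                        self.retry(countdown=1)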
        return function
    return decorator_func
@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api1(self):
    print('Called github Api 1')
@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api2(self):
    print('Called github Api 2')
@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api1(self):
    print('Called Google Api 1')
@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api2(self):
    print('Called Google Api 2')
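The decorator's control flow (block for a token with a timeout, otherwise fall back to a retry) can be reproduced with the standard library alone. In this sketch a queue.Queue replaces the SimpleQueue on the token queue, Retry is a hypothetical stand-in for self.retry(), and the timeout is shortened to 0.5 s for the demo.

```python
import functools
import queue
import threading


class Retry(Exception):
    """Hypothetical stand-in for the exception raised by Celery's self.retry()."""


def rate_limit(tokens, timeout):
    """Decorator: wait up to `timeout` seconds for a token, otherwise retry."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                tokens.get(block=True, timeout=timeout)  # blocking wait for a token
            except queue.Empty:
                raise Retry() from None                  # no token in time: retry later
            return func(*args, **kwargs)
        return wrapper
    return decorator


github_tokens = queue.Queue()


@rate_limit(github_tokens, timeout=0.5)
def get_github_api1():
    return "Called github Api 1"


# A token arrives 0.1 s after the call starts waiting, so the call succeeds.
threading.Timer(0.1, github_tokens.put, args=(1,)).start()
print(get_github_api1())         # Called github Api 1
print(get_github_api1.__name__)  # get_github_api1 (preserved by functools.wraps)
```

Waiting for a token instead of retrying at once presumably reduces retry traffic; the trade-off is that a worker process stays busy while it waits.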
Here google is limited to 100 requests per minute and github to 60 requests per minute.
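The beat interval in setup_periodic_tasks() is 60 / limit seconds because the limits are per minute. The helpers below (hypothetical, not part of the article's code) compute these intervals plus a rough upper bound for a time window: since the bucket holds up to max_length tokens, a group that was idle can briefly make that many extra calls.

```python
def token_intervals(rate_limits):
    """Seconds between two tokens per group; limits are in calls per minute."""
    return {name: 60 / limit for name, limit in rate_limits.items()}


def approx_max_calls(limit_per_minute, capacity, window_seconds):
    """Rough upper bound on calls in a window: a full bucket plus the new tokens."""
    return capacity + window_seconds * limit_per_minute / 60


rate_limits = {"github": 60, "google": 100}
print(token_intervals(rate_limits))  # {'github': 1.0, 'google': 0.6}
print(approx_max_calls(60, 2, 60))   # 62.0
```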
Moving further
​ , , - , . ;)? , . , 1, ( , ), '' , '' , , . , ) , !
P.s. , ;)