Celery throttling: setting a rate limit for queues

In this article I will show how to solve one of the problems that come up when using distributed task queues: throttling the throughput of a queue or, in simpler terms, setting a rate limit for it. As an example I will use Python and my favorite combination of Celery + RabbitMQ, although the algorithm does not depend on these tools and can be implemented on any other stack.

Celery + RabbitMQ

So what's the problem?

What's in the box

# both tasks call the same API, which allows 60 req/min
@app.task
def get_github_api1():
    ...

@app.task
def get_github_api2():
    ...
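Each task may look harmless on its own, but the 60 req/min budget is shared, so what counts is the combined number of calls in any 60-second window. The helper `max_calls_in_window` below is a hypothetical illustration (not part of the article's code) and the timestamps are made up: two tasks that each call once per second add up to twice the allowed rate.

```python
from bisect import bisect_right


def max_calls_in_window(timestamps, window=60.0):
    """Largest number of calls that fall inside any window (t - window, t]."""
    ts = sorted(timestamps)
    best = 0
    for i, t in enumerate(ts):
        # first index whose timestamp is strictly later than t - window
        first = bisect_right(ts, t - window)
        best = max(best, i + 1 - first)
    return best


# Hypothetical call times (seconds): each task alone calls once per second.
api1 = [float(k) for k in range(60)]   # 0.0, 1.0, ..., 59.0
api2 = [k + 0.5 for k in range(60)]    # 0.5, 1.5, ..., 59.5

print(max_calls_in_window(api1))         # 60  -> within the limit
print(max_calls_in_window(api1 + api2))  # 120 -> twice the shared limit
```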

Finding a solution

Token Bucket

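In a token bucket, tokens are added to a bucket at a fixed rate, the bucket holds at most a fixed number of them, and every request must take a token first; when the bucket is empty, the request has to wait. In the setup that follows, a RabbitMQ queue plays the bucket and Celery beat adds the tokens. Purely as an in-process illustration, here is a minimal single-threaded sketch; the class name, the injectable clock and starting with a full bucket are assumptions of this sketch, not part of the article.

```python
import time


class TokenBucket:
    """Single-threaded token bucket: refills at `rate` tokens per second
    and never holds more than `capacity` tokens."""

    def __init__(self, rate, capacity, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.tokens = float(capacity)  # assumption: start with a full bucket
        self.last = clock()

    def _refill(self):
        now = self.clock()
        # add the tokens earned since the last check, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now

    def try_acquire(self):
        """Take one token if available; return False when the bucket is empty."""
        self._refill()
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


class FakeClock:
    """Manually advanced clock so the example is deterministic."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


clock = FakeClock()
bucket = TokenBucket(rate=1.0, capacity=2, clock=clock)  # 60 tokens per minute
print([bucket.try_acquire() for _ in range(3)])  # [True, True, False]
clock.now = 1.0                                   # one second later
print(bucket.try_acquire())                       # True
```

The capacity bounds the burst: however long the bucket sits unused, at most two requests can go through back to back.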

Writing some code

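from celery import Celery
from kombu import Queue

app = Celery('Test app', broker='amqp://guest@localhost//')

app.conf.task_queues = [
    # queue for the tasks that call the GitHub API
    Queue('github'),
    # the token bucket: RabbitMQ keeps at most 2 tokens in this queue
    Queue('github_tokens', max_length=2),
]

@app.task
def token():
    # the payload does not matter, only the presence of the message
    return 1

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # celery beat sends one token to 'github_tokens' every second
    sender.add_periodic_task(1.0, token.signature(queue='github_tokens'))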
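Why `max_length=2`: when a RabbitMQ queue reaches its maximum length, RabbitMQ by default drops messages from the head of the queue (the oldest ones). So the `github_tokens` queue never holds more than two tokens, no matter how long nobody consumes them, which is exactly the capacity of the bucket. Python's `collections.deque` with `maxlen` discards from the opposite end on `append`, so it models the same drop-oldest behaviour in memory; this is only an illustration, not how RabbitMQ works internally.

```python
from collections import deque

# In-memory stand-in for the 'github_tokens' queue declared with max_length=2.
tokens = deque(maxlen=2)

# Beat publishes one token per second; nobody consumes them for 5 seconds.
for second in range(5):
    tokens.append(f"token@{second}s")

print(list(tokens))  # ['token@3s', 'token@4s'] (older tokens were dropped)

# A burst of 3 API calls arrives: only 2 tokens are available.
granted = 0
for _ in range(3):
    if tokens:
        tokens.popleft()
        granted += 1
print(granted)  # 2
```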

docker run -d --rm --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

celery -A main beat --loglevel=info


[2020-03-22 22:49:00,992: INFO/MainProcess] Scheduler: Sending due task main.token() (main.token)

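Beat now sends a token to the 'github_tokens' queue every second. Next, the tasks that call github have to take a token before they run.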

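def rate_limit(task, task_group):
    # open a connection to the broker
    with task.app.connection_for_read() as conn:
        # try to take a token without waiting; no_ack=True removes it from the queue at once
        msg = conn.default_channel.basic_get(task_group+'_tokens', no_ack=True)
        # None means the bucket is empty: reschedule the task and try again later
        if msg is None:
            task.retry(countdown=1)

# max_retries=None lets the task retry until it gets a token
@app.task(bind=True, max_retries=None)
def get_github_api1(self):
    rate_limit(self, 'github')
    print('Called Api 1')

@app.task(bind=True, max_retries=None)
def get_github_api2(self):
    rate_limit(self, 'github')
    print('Called Api 2')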
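Stripped of Celery and RabbitMQ, `rate_limit` is a gate: take one token or give up and try later. The sketch below models that with the standard library only. `queue.Queue` stands in for the `github_tokens` queue and the hypothetical `RetryLater` exception stands in for the `Retry` exception that Celery's `Task.retry()` raises. A `timeout` of 0 mirrors the non-blocking `basic_get` call above; a positive timeout waits for a token instead. Unlike the RabbitMQ queue, a plain `queue.Queue` does not drop old tokens.

```python
import queue
from functools import wraps


class RetryLater(Exception):
    """Hypothetical stand-in for the Retry exception raised by Celery's Task.retry()."""


def token_gate(tokens, timeout=0.0):
    """Allow the wrapped call only after taking one token from `tokens`.

    timeout=0 gives up immediately when the queue is empty (like basic_get);
    a positive timeout waits up to that many seconds for a token.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                tokens.get(block=True, timeout=timeout)
            except queue.Empty:
                # no token: the caller should reschedule this call
                raise RetryLater(func.__name__) from None
            return func(*args, **kwargs)
        return wrapper
    return decorator


github_tokens = queue.Queue()  # hypothetical in-process token queue


@token_gate(github_tokens)
def get_github_api1():
    return 'Called Api 1'


github_tokens.put(1)          # one token available
print(get_github_api1())      # Called Api 1
try:
    get_github_api1()         # the bucket is now empty
except RetryLater:
    print('no token, retry later')
```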

celery -A main worker -c 8 -Q github


from main import get_github_api1, get_github_api2

tasks = [get_github_api1, get_github_api2]

for i in range(100):
    tasks[i % 2].apply_async(queue='github')

[2020-03-23 13:04:15,017: WARNING/ForkPoolWorker-3] Called Api 2
[2020-03-23 13:04:16,053: WARNING/ForkPoolWorker-8] Called Api 2
[2020-03-23 13:04:17,112: WARNING/ForkPoolWorker-1] Called Api 2
[2020-03-23 13:04:18,187: WARNING/ForkPoolWorker-1] Called Api 1
... (96 more lines)
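The calls arrive roughly one per second even though several worker processes (ForkPoolWorker-1, -3, -8) handle them. A small, hypothetical helper can measure the gaps between calls from such a log; here it is fed only the four lines shown above, since the remaining 96 are not reproduced.

```python
import re
from datetime import datetime

LOG = """\
[2020-03-23 13:04:15,017: WARNING/ForkPoolWorker-3] Called Api 2
[2020-03-23 13:04:16,053: WARNING/ForkPoolWorker-8] Called Api 2
[2020-03-23 13:04:17,112: WARNING/ForkPoolWorker-1] Called Api 2
[2020-03-23 13:04:18,187: WARNING/ForkPoolWorker-1] Called Api 1
"""

# "[YYYY-MM-DD HH:MM:SS,mmm: LEVEL/Process] Called ..."
LINE = re.compile(r"^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}): [^\]]*\] Called ")


def call_times(log):
    """Return the datetime of every 'Called ...' line in a worker log."""
    times = []
    for line in log.splitlines():
        match = LINE.match(line)
        if match:
            times.append(datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S,%f"))
    return times


times = call_times(LOG)
gaps = [(later - earlier).total_seconds() for earlier, later in zip(times, times[1:])]
print(gaps)  # [1.036, 1.059, 1.075]
```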

Putting it all together

from celery import Celery
from kombu import Queue
from queue import Empty
from functools import wraps

app = Celery('hello', broker='amqp://guest@localhost//')

# queues for the tasks themselves
task_queues = [
    Queue('github'),
    Queue('google'),
]

# allowed requests per minute for each task group
rate_limits = {
    'github': 60,
    'google': 100,
}

# one token queue (a bucket of capacity 2) per task group
task_queues += [Queue(name+'_tokens', max_length=2) for name in rate_limits]

app.conf.task_queues = task_queues

@app.task
def token():
    return 1

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # refill each bucket with `limit` tokens per minute
    for name, limit in rate_limits.items():
        sender.add_periodic_task(60 / limit, token.signature(queue=name+'_tokens'))

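def rate_limit(task_group):
    def decorator_func(func):
        @wraps(func)
        def function(self, *args, **kwargs):
            with self.app.connection_for_read() as conn:
                # queue_opts must match the declared token queue (max_length=2),
                # otherwise RabbitMQ refuses to redeclare it with other arguments
                with conn.SimpleQueue(task_group+'_tokens', no_ack=True, queue_opts={'max_length': 2}) as queue:
                    try:
                        # wait up to 5 seconds for a token
                        queue.get(block=True, timeout=5)
                    except Empty:
                        # no token arrived in time: reschedule the task
                        self.retry(countdown=1)
            # a token was taken: run the actual task
            return func(self, *args, **kwargs)
        return function
    return decorator_func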

@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api1(self):
    print('Called github Api 1')

@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api2(self):
    print('Called github Api 2')

@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api1(self):
    print('Called Google Api 1')

@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api2(self):
    print('Called Google Api 2')
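The beat interval `60 / limit` turns a per-minute limit into the number of seconds between two tokens. A quick check of that arithmetic for the two groups configured in `rate_limits`:

```python
rate_limits = {
    'github': 60,
    'google': 100,
}

# seconds between two tokens = 60 seconds / allowed requests per minute
intervals = {name: 60 / limit for name, limit in rate_limits.items()}
print(intervals)  # {'github': 1.0, 'google': 0.6}

# tokens produced per minute at that interval
per_minute = {name: round(60 / interval) for name, interval in intervals.items()}
print(per_minute)  # {'github': 60, 'google': 100}
```

Because each token queue keeps up to two tokens, a group that has been idle can make up to two extra calls on top of this steady rate.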

Going further

