Вот самый простой алгоритм , если вы хотите просто отбрасывать сообщения, когда они приходят слишком быстро (вместо того, чтобы ставить их в очередь, что имеет смысл, потому что очередь может стать произвольно большой):
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate / per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
discard_message();
else:
forward_message();
allowance -= 1.0;
В этом решении нет структур данных, таймеров и т. Д., И оно работает чисто :). Чтобы увидеть это, «допуск» растет со скоростью не более 5/8 единиц в секунду, то есть не более пяти единиц в восемь секунд. Каждое пересылаемое сообщение удерживает одну единицу, поэтому вы не можете отправлять более пяти сообщений каждые восемь секунд.
Обратите внимание, что это rate
должно быть целое число, т.е. без ненулевой десятичной части, иначе алгоритм не будет работать правильно (фактическая скорость не будет rate/per
). Например rate=0.5; per=1.0;
, не работает, потому allowance
что никогда не будет расти до 1,0. Но rate=1.0; per=2.0;
работает отлично.
allowance
. Размер ковша естьrate
.allowance += …
Линия является оптимизацией добавления маркеров каждой скорости ÷ за секунды.Используйте этот декоратор @RateLimited (ratepersec) перед вашей функцией, которая ставит в очередь.
По сути, он проверяет, прошло ли 1 / скорость в секундах с последнего раза, и если нет, ждет оставшуюся часть времени, в противном случае он не ждет. Это эффективно ограничивает вас, чтобы оценить / сек. Декоратор может быть применен к любой функции, которую вы хотите ограничить по скорости.
В вашем случае, если вы хотите максимум 5 сообщений за 8 секунд, используйте @RateLimited (0.625) перед функцией sendToQueue.
источник
time.clock()
в моей системе недостаточно разрешения, поэтому я адаптировал код и переключился на использованиеtime.time()
time.clock()
, который измеряет истекшее время процессора. Процессорное время может работать намного быстрее или намного медленнее, чем «фактическое» время. Вы хотите использоватьtime.time()
вместо этого, который измеряет время стены («фактическое» время).Token Bucket довольно прост в реализации.
Начните с ведра с 5 жетонов.
Каждые 5/8 секунд: если в ведре меньше 5 жетонов, добавьте один.
Каждый раз, когда вы хотите отправить сообщение: если в корзине есть токен ≥1, выньте один токен и отправьте сообщение. В противном случае, подождите / отбросьте сообщение / что угодно.
(очевидно, в реальном коде вы использовали бы целочисленный счетчик вместо реальных токенов, и вы можете оптимизировать каждый шаг 5/8, сохраняя временные метки)
Повторное чтение вопроса, если ограничение скорости полностью сбрасывается каждые 8 секунд, то вот модификация:
Начните с отметки
last_send
времени, давно, например, в эпоху. Кроме того, начните с того же ведра с 5 жетонами.Ударьте правило каждые 5/8 секунд.
Каждый раз, когда вы отправляете сообщение: сначала проверьте, если
last_send
≥ 8 секунд назад. Если так, заполните ведро (установите это к 5 жетонам). Во-вторых, если в корзине есть токены, отправьте сообщение (в противном случае отбросьте / подождите / и т.д.). В-третьих, установитьlast_send
сейчас.Это должно работать для этого сценария.
Я на самом деле написал бот IRC, используя такую стратегию (первый подход). Это на Perl, а не на Python, но вот некоторый код для иллюстрации:
Первая часть здесь посвящена добавлению токенов в корзину. Вы можете увидеть оптимизацию добавления токенов на основе времени (от 2-й до последней строки), а затем последняя строка ограничивает содержимое сегмента до максимума (MESSAGE_BURST)
$ conn - это структура данных, которая передается. Это внутри метода, который выполняется регулярно (он рассчитывает, когда в следующий раз ему будет что-то делать, и спит либо так долго, либо до тех пор, пока не получит сетевой трафик). Следующая часть метода обрабатывает отправку. Это довольно сложно, потому что сообщения имеют приоритеты, связанные с ними.
Это первая очередь, которая запускается несмотря ни на что. Даже если это приведет к тому, что наше соединение погибнет от наводнения. Используется для чрезвычайно важных вещей, например, для ответа на PING сервера. Далее остальные очереди:
Наконец, статус корзины сохраняется обратно в структуру данных $ conn (на самом деле чуть позже в методе; сначала он вычисляет, как скоро у него будет больше работы)
Как видите, реальный код обработки сегментов очень мал - около четырех строк. Остальная часть кода является приоритетной обработкой очереди. У бота есть приоритетные очереди, так что, например, кто-то, общаясь с ним, не может помешать ему выполнять свои важные обязанности по кик-бану.
источник
чтобы заблокировать обработку, пока сообщение не может быть отправлено, таким образом, помещая в очередь дальнейшие сообщения, красивое решение antti также может быть изменено следующим образом:
он просто ждет, пока не будет достаточно разрешения для отправки сообщения. чтобы не начинать с двухкратной ставки, скидка также может быть инициализирована с 0.
источник
(1-allowance) * (per/rate)
, вы должны добавить ту же суммуlast_check
.Сохраните время, когда были отправлены последние пять строк. Удерживайте сообщения в очереди до тех пор, пока пятое самое последнее сообщение (если оно существует) не пройдет как минимум 8 секунд в прошлом (с last_five в виде массива раз):
источник
Одним из решений является прикрепление метки времени к каждому элементу очереди и удаление элемента по истечении 8 секунд. Вы можете выполнять эту проверку каждый раз, когда очередь добавляется.
Это работает, только если вы ограничите размер очереди до 5 и откажетесь от любых добавлений, пока очередь заполнена.
источник
Если кто-то все еще заинтересован, я использую этот простой вызываемый класс в сочетании с временным хранилищем значений ключа LRU, чтобы ограничить частоту запросов на IP. Использует deque, но может быть переписан для использования со списком.
источник
Просто реализация Python кода из принятого ответа.
источник
Как насчет этого:
источник
Мне нужна была вариация в Scala. Вот:
Вот как это можно использовать:
источник