# Source code for koapy.utils.rate_limiting.RateLimiter
import collections
import threading
import time
from typing import List
from koapy.utils.logging.Logging import Logging
class RateLimiter:
    """Base rate limiter that never throttles.

    ``check_sleep_seconds`` always reports that no wait is needed and
    ``add_call_history`` records nothing. Subclasses override those two
    hooks to implement an actual limiting policy.
    """

    def check_sleep_seconds(self, *args, **kwargs):
        """Return the number of seconds to wait before the next call.

        All arguments are accepted and ignored here; the result is always
        ``0``, meaning "no wait required".
        """
        return 0

    def add_call_history(self, *args, **kwargs):
        """Record that a call has been made.

        The base implementation keeps no history, so this does nothing.
        """

    def sleep_if_necessary(self, *args, **kwargs):
        """Sleep for the duration reported by ``check_sleep_seconds``.

        The arguments are forwarded unchanged to ``check_sleep_seconds``.
        Nothing happens when the reported duration is zero or negative.
        """
        sleep_seconds = self.check_sleep_seconds(*args, **kwargs)
        if sleep_seconds > 0:
            time.sleep(sleep_seconds)
class TimeWindowRateLimiter(RateLimiter, Logging):
    """Rate limiter allowing at most ``calls`` recorded calls per ``period``.

    Timestamps of the most recent ``calls`` calls are kept in a bounded
    deque. Once the deque is full, the next call has to wait until the
    oldest recorded timestamp is at least ``period`` seconds old.
    Access to the history is guarded by a re-entrant lock.
    """

    def __init__(self, period, calls):
        """Create a limiter.

        Args:
            period: Length of the time window, in seconds.
            calls: Maximum number of calls allowed within one window.

        Raises:
            ValueError: If ``calls`` is not positive.
        """
        super().__init__()
        # A zero-length history can never be inspected: check_sleep_seconds
        # would fail with IndexError on the first use. Negative values are
        # already rejected by deque(maxlen=...), so reject zero up front too.
        if calls <= 0:
            raise ValueError("calls must be a positive integer")
        self._period = period
        self._calls = calls
        self._lock = threading.RLock()
        # Monotonic clock: unaffected by system clock adjustments.
        self._clock = time.monotonic
        # maxlen makes the deque drop the oldest timestamp automatically.
        self._call_history = collections.deque(maxlen=self._calls)

    def check_sleep_seconds(self, *args, **kwargs):
        """Return the seconds to wait before the next call is allowed.

        Returns ``0`` while fewer than ``calls`` calls have been recorded.
        Otherwise returns the time left until the oldest recorded call
        falls out of the window; that value is zero or negative when the
        window has already elapsed. The arguments are ignored.
        """
        with self._lock:
            if len(self._call_history) < self._calls:
                return 0
            now = self._clock()
            oldest = self._call_history[0]
            return oldest + self._period - now

    def add_call_history(self, *args, **kwargs):
        """Record the current clock reading as a call. Arguments are ignored."""
        with self._lock:
            self._call_history.append(self._clock())

    def sleep_if_necessary(self, *args, **kwargs):
        """Sleep until the next call is allowed, if a wait is required.

        The arguments are forwarded to ``check_sleep_seconds``.
        """
        with self._lock:
            sleep_seconds = self.check_sleep_seconds(*args, **kwargs)
            # NOTE(review): the lock stays held while sleeping, so other
            # threads using this limiter block until the sleep finishes --
            # confirm this serialization is intended.
            if sleep_seconds > 0:
                time.sleep(sleep_seconds)
class CompositeTimeWindowRateLimiter(RateLimiter):
    """Rate limiter that combines several limiters into one.

    A call is allowed only when every wrapped limiter allows it, so the
    reported wait is the longest wait among them. Recording a call records
    it on every wrapped limiter.
    """

    def __init__(self, limiters: List[RateLimiter]):
        """Create a composite limiter.

        Args:
            limiters: The limiters to combine. The list is stored as given,
                not copied.
        """
        super().__init__()
        self._lock = threading.RLock()
        self._limiters = limiters

    def check_sleep_seconds(self, *args, **kwargs):
        """Return the largest wait reported by the wrapped limiters.

        The arguments are forwarded to each limiter. With no wrapped
        limiters the result is ``0`` (no wait), matching the base class.
        """
        with self._lock:
            return max(
                (
                    limiter.check_sleep_seconds(*args, **kwargs)
                    for limiter in self._limiters
                ),
                # Without a default, max() raises ValueError on an empty list.
                default=0,
            )

    def add_call_history(self, *args, **kwargs):
        """Record a call on every wrapped limiter, forwarding the arguments."""
        with self._lock:
            for limiter in self._limiters:
                limiter.add_call_history(*args, **kwargs)

    def sleep_if_necessary(self, *args, **kwargs):
        """Sleep for the longest wait required by any wrapped limiter.

        The arguments are forwarded to ``check_sleep_seconds``.
        """
        with self._lock:
            sleep_seconds = self.check_sleep_seconds(*args, **kwargs)
            # NOTE(review): the lock stays held while sleeping, so other
            # threads using this composite block until the sleep finishes --
            # confirm this serialization is intended.
            if sleep_seconds > 0:
                time.sleep(sleep_seconds)