from __future__ import annotations
import atexit
import queue
import time
from concurrent.futures import Executor, Future
from functools import update_wrapper
from queue import Queue
from threading import RLock
from typing import Any, Callable, Dict, Optional, Sequence
from koapy.backend.kiwoom_open_api_plus.utils.pyside2.QSlotLikeExecutor import (
QSlotLikeCallable,
)
from koapy.compat.pyside2.QtCore import QObject
from koapy.utils.logging.Logging import Logging
from koapy.utils.logging.pyside2.QThreadLogging import QThreadLogging
from koapy.utils.rate_limiting.RateLimiter import RateLimiter
class QRateLimitedExecutorRunnable(Logging):
    """Pair a callable and its arguments with the ``Future`` that reports the outcome.

    ``run()`` invokes the callable and stores either its return value or the
    raised exception on the future; ``cancel()`` and ``result()`` simply
    forward to that future.
    """

    def __init__(
        self,
        future: Future,
        fn: Callable,
        args: Optional[Sequence[Any]] = None,
        kwargs: Optional[Dict[str, Any]] = None,
    ):
        """Store the future, the callable and the arguments to call it with.

        Args:
            future: Future that receives the result or exception of ``fn``.
            fn: Callable to invoke from ``run()``.
            args: Positional arguments for ``fn``; a falsy value means none.
            kwargs: Keyword arguments for ``fn``; a falsy value means none.
        """
        self._future = future
        self._fn = fn
        # Falsy inputs (None or empty containers) are replaced by fresh empties.
        self._args = args if args else ()
        self._kwargs = kwargs if kwargs else {}

    def run(self):
        """Call the stored callable and publish its outcome on the future.

        Does nothing when the future was cancelled before it started running.
        Any exception raised by the callable (``BaseException`` included) is
        logged and set on the future instead of propagating.
        """
        started = self._future.set_running_or_notify_cancel()
        if not started:
            return

        try:
            outcome = self._fn(*self._args, **self._kwargs)
        except BaseException as error:  # pylint: disable=broad-except
            # self.logger is presumably provided by the Logging base class.
            self.logger.exception(
                "Exception while running QRateLimitedExecutorRunnable"
            )
            self._future.set_exception(error)
            # break a reference cycle with the exception 'error'
            self = None  # pylint: disable=self-cls-assignment
            return

        self._future.set_result(outcome)

    def cancel(self):
        """Try to cancel the underlying future and return what it reports."""
        cancelled = self._future.cancel()
        return cancelled

    def result(self):
        """Return the result of the underlying future, as ``Future.result()`` does."""
        value = self._future.result()
        return value
class QRateLimitedExecutorDecoratedFunction(Logging):
    """Callable wrapper that consults a rate limiter before every call.

    The wrapped object must offer ``directCall`` and ``queuedCall``. Before
    delegating to either one, the limiter is asked how long to wait, the
    current thread sleeps for that long, and the call is recorded in the
    limiter's history. Attributes not found on the wrapper itself are looked
    up on the wrapped object.
    """

    def __init__(
        self,
        func: QSlotLikeCallable,
        limiter: RateLimiter,
        executor: QRateLimitedExecutor,
    ):
        """Store the collaborators and copy the wrapped function's metadata.

        Args:
            func: Wrapped callable exposing ``directCall`` and ``queuedCall``.
            limiter: Rate limiter providing ``check_sleep_seconds`` and
                ``add_call_history``.
            executor: Executor whose ``submit`` is used by ``queuedCall``.
        """
        self._func = func
        self._limiter = limiter
        self._executor = executor
        # updated=[] copies __name__/__doc__/etc. without touching __dict__.
        update_wrapper(self, self._func, updated=[])

    def _checkAndSleepIfNecessary(self, *args, **kwargs):
        """Sleep for as long as the limiter requests, then record the call."""
        sleep_seconds = self._limiter.check_sleep_seconds(*args, **kwargs)
        if sleep_seconds > 0:
            # Only waits longer than one second are logged.
            if sleep_seconds > 1:
                self.logger.debug(
                    "Rate limiting function call %s(...), sleeping for %f seconds...",
                    self._func.__name__,
                    sleep_seconds,
                )
            time.sleep(sleep_seconds)
        self._limiter.add_call_history(*args, **kwargs)

    def _directCallFn(self, *args, **kwargs):
        """Apply rate limiting, then return ``self._func.directCall(...)``."""
        self._checkAndSleepIfNecessary(*args, **kwargs)
        return self._func.directCall(*args, **kwargs)

    def directCall(self, *args, **kwargs):
        """Rate-limit and invoke the wrapped function in the calling thread."""
        return self._directCallFn(*args, **kwargs)

    def _queuedCallFn(self, *args, **kwargs):
        """Apply rate limiting, then wait for ``self._func.queuedCall(...)``."""
        self._checkAndSleepIfNecessary(*args, **kwargs)
        return self._func.queuedCall(*args, **kwargs).result()

    def queuedCall(self, *args, **kwargs):
        """Submit the rate-limited call to the executor.

        Returns:
            Whatever ``self._executor.submit`` returns for the queued call.
        """
        return self._executor.submit(self._queuedCallFn, *args, **kwargs)

    def __call__(self, *args, **kwargs):
        """Calling the wrapper is equivalent to ``directCall``."""
        return self.directCall(*args, **kwargs)

    def __getattr__(self, name):
        """Fall back to the wrapped function for attributes missing here.

        Raises:
            AttributeError: If ``_func`` itself is requested but not set yet.
        """
        # __getattr__ runs only after normal lookup failed. If '_func' is the
        # missing name (e.g. an instance created via __new__ by copy/pickle),
        # reading self._func below would re-enter this method indefinitely.
        if name == "_func":
            raise AttributeError(name)
        return getattr(self._func, name)
class QRateLimitedExecutor(QThreadLogging, Executor):
    """Executor that runs submitted callables one at a time from a queue.

    Work items are consumed by ``run()`` in submission order. Presumably
    ``QThreadLogging`` is a ``QThread`` subclass, so ``run()`` would execute in
    that thread once it is started -- confirm against the base class.
    """

    def __init__(self, limiter: RateLimiter, parent: Optional[QObject] = None):
        """Set up the work queue and register ``shutdown`` to run at exit.

        Args:
            limiter: Rate limiter handed to functions wrapped by ``wrapCallable``.
            parent: Optional parent object forwarded to ``QThreadLogging``.
        """
        QThreadLogging.__init__(self, parent)
        Executor.__init__(self)

        self._parent = parent
        self._limiter = limiter

        # Unique marker queued by shutdown() to wake up and stop run().
        self._sentinel = object()
        self._runnable_queue: Queue = Queue()

        self._shutdown_lock = RLock()
        self._shutdown = False

        # Make sure the executor is shut down when the interpreter exits.
        atexit.register(self.shutdown)

    def __del__(self):
        """Remove the exit hook installed by ``__init__``."""
        atexit.unregister(self.shutdown)

    def run(self):
        """Process queued work items until a sentinel arrives after shutdown."""
        while True:
            item = self._runnable_queue.get()
            if item is self._sentinel:
                # A sentinel ends the loop only once shutdown was requested.
                if self._shutdown:
                    return
                continue
            item.run()

    def submit(self, fn: Callable, /, *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` for execution.

        Returns:
            A ``Future`` that will hold the call's result or exception.

        Raises:
            RuntimeError: If ``shutdown`` has already been called.
        """
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError("Cannot schedule new futures after shutdown")
            pending = Future()
            self._runnable_queue.put(
                QRateLimitedExecutorRunnable(pending, fn, args, kwargs)
            )
            return pending

    def shutdown(
        self, wait: bool = True, cancel_futures: bool = False
    ):  # pylint: disable=arguments-differ
        """Refuse further submissions and tell ``run()`` to stop.

        Args:
            wait: When true, call ``self.wait()`` after signalling (presumably
                blocks until the thread has finished -- confirm).
            cancel_futures: When true, drain the queue and cancel every
                work item that has not started yet.
        """
        with self._shutdown_lock:
            self._shutdown = True
            if cancel_futures:
                while True:
                    try:
                        queued = self._runnable_queue.get_nowait()
                    except queue.Empty:
                        break
                    # Sentinels left by earlier shutdown calls have no future.
                    if queued is self._sentinel:
                        continue
                    queued.cancel()
            # Wake up run() so it can observe the shutdown flag.
            self._runnable_queue.put(self._sentinel)
        if wait:
            self.wait()

    def wrapCallable(self, func: Callable):
        """Wrap ``func`` so its calls are rate limited with this executor's limiter."""
        return QRateLimitedExecutorDecoratedFunction(func, self._limiter, self)