Source code for koapy.backend.kiwoom_open_api_plus.utils.pyside2.QThreadPoolExecutor

import atexit

from concurrent.futures import Executor, Future
from threading import RLock
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, overload

from koapy.compat.pyside2.QtCore import QObject, QRunnable, QThreadPool


[docs]class QThreadPoolExecutorRunnable(QRunnable): def __init__( self, future: Future, fn: Callable[..., Any], args: Union[Tuple[Any], List[Any]], kwargs: Dict[str, Any], ): super().__init__() self._future = future self._fn = fn self._args = args self._kwargs = kwargs
[docs] def run(self): # pylint: disable=broad-except,self-cls-assignment if not self._future.set_running_or_notify_cancel(): return try: result = self._fn(*self._args, **self._kwargs) except BaseException as exc: self._future.set_exception(exc) # break a reference cycle with the exception 'exc' self = None else: self._future.set_result(result)
[docs]class QThreadPoolExecutor(QObject, Executor): # pylint: disable=arguments-differ @overload def __init__(self, thread_pool: QThreadPool, parent: Optional[QObject]): ... @overload def __init__(self, parent: Optional[QObject]): ... def __init__(self, *args, **kwargs): thread_pool: Optional[QThreadPool] = None parent: Optional[QObject] = None args = list(args) kwargs = dict(kwargs) if len(args) > 0 and isinstance(args[0], QThreadPool): thread_pool = args.pop(0) elif "thread_pool" in kwargs: thread_pool = kwargs.pop("thread_pool") if len(args) > 0: parent = args[0] elif "parent" in kwargs: parent = kwargs["parent"] if thread_pool is None: thread_pool = QThreadPool(self) self._thread_pool = thread_pool self._parent = parent QObject.__init__(self, *args, **kwargs) Executor.__init__(self) self._shutdown = False self._shutdown_lock = RLock() atexit.register(self.shutdown) def __del__(self): atexit.unregister(self.shutdown)
[docs] def submit(self, fn, *args, **kwargs): with self._shutdown_lock: if self._shutdown: raise RuntimeError("Cannot schedule new futures after shutdown") future = Future() runnable = QThreadPoolExecutorRunnable(future, fn, args, kwargs) runnable.setAutoDelete(True) self._thread_pool.start(runnable) return future
[docs] def shutdown(self, wait=True, cancel_futures=False): with self._shutdown_lock: self._shutdown = True if cancel_futures: self._thread_pool.clear() if wait: self._thread_pool.waitForDone()