Source code for koapy.backend.kiwoom_open_api_plus.grpc.KiwoomOpenApiPlusServiceClientSideSignalConnector

import atexit
import threading
import time

from koapy.backend.kiwoom_open_api_plus.grpc import KiwoomOpenApiPlusService_pb2
from koapy.backend.kiwoom_open_api_plus.grpc.KiwoomOpenApiPlusServiceMessageUtils import (
    convert_arguments_from_protobuf_to_python,
)
from koapy.backend.kiwoom_open_api_plus.utils.queue.QueueBasedIterableObserver import (
    QueueBasedIterableObserver,
)


[docs]class KiwoomOpenApiPlusServiceClientSideSignalConnector: def __init__(self, stub, name, executor): self._stub = stub self._name = name self._executor = executor self._lock = threading.RLock() self._observers = {} atexit.register(self.shutdown) def __del__(self): atexit.unregister(self.shutdown) def _stop_observer(self, observer): request = KiwoomOpenApiPlusService_pb2.BidirectionalListenRequest() request.stop_listen_request.time = time.time() observer.on_next(request) observer.on_completed() def _get_observer(self, callback, default=None): return self._observers.setdefault(self._name, {}).get(callback, default) def _remove_observer(self, callback): with self._lock: observer = self._get_observer(callback) if observer: self._stop_observer(observer) del self._observers[self._name][callback] return observer def _add_observer(self, callback): with self._lock: self._remove_observer(callback) observer = QueueBasedIterableObserver() self._observers[self._name][callback] = observer return observer
[docs] def shutdown(self): with self._lock: for _name, observers in self._observers.items(): for _callback, observer in observers.items(): self._stop_observer(observer)
[docs] def connect(self, callback): with self._lock: observer = self._add_observer(callback) def fn(): request = KiwoomOpenApiPlusService_pb2.BidirectionalListenRequest() request.listen_request.slots.append(self._name) observer.on_next(request) observer_iterator = iter(observer) response_iterator = self._stub.BidirectionalListen(observer_iterator) for response in response_iterator: args = convert_arguments_from_protobuf_to_python(response.arguments) callback(*args) request = KiwoomOpenApiPlusService_pb2.BidirectionalListenRequest() request.handled_request.time = time.time() observer.on_next(request) _future = self._executor.submit(fn)
[docs] def disconnect(self, callback): with self._lock: self._remove_observer(callback)