Source code for koapy.backend.daishin_cybos_plus.proxy.CybosPlusDispatchProxy

from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from typing import Any, Callable, Optional

try:
    from typing import ParamSpec
except ImportError:
    from typing_extensions import ParamSpec

import grpc

from koapy.backend.daishin_cybos_plus.stub import (
    CpForeDib,
    CpForeTrade,
    CpSysDib,
    CpTrade,
    CpUtil,
    DsCbo1,
)
from koapy.common import DispatchProxyService_pb2, DispatchProxyService_pb2_grpc
from koapy.common.DispatchProxyServiceMessageUtils import AssignValue, ExtractValue
from koapy.common.EventInstance import EventInstance
from koapy.utils.logging.Logging import Logging

[docs]P = ParamSpec("P")
[docs]CLSIDToClassMap = { str(getattr(mod, name).CLSID): getattr(mod, name) for mod in [ CpForeDib, CpForeTrade, CpSysDib, CpTrade, CpUtil, DsCbo1, ] for name in dir(mod) if hasattr(getattr(mod, name), "CLSID") }
[docs]class CybosPlusDispatchProxyMethod: def __init__(self, proxy, name): self._proxy = proxy self._name = name def __call__(self, *args, **kwargs): return self._proxy._CallMethod(self._name, args)
[docs]class CybosPlusDispatchProxyEventInstance(EventInstance[P]): def __init__(self, proxy, name): super().__init__() self._proxy = proxy self._name = name self._future = None
[docs] def connect(self, slot: Callable[P, Any]): with self._lock: if slot not in self._slots: self._slots.append(slot) if len(self._slots) > 0 and self._future is None: self._future, _ = self._proxy._ConnectEvent(self._name)
[docs] def disconnect(self, slot: Optional[Callable[P, Any]] = None): super().disconnect(slot)
[docs]class CybosPlusDispatchProxy(Logging): def __init__( self, iid: str, stub: DispatchProxyService_pb2_grpc.DispatchProxyServiceStub, timeout: Optional[int] = None, max_workers: Optional[int] = None, ): self._iid = iid self._stub = stub self._timeout = timeout self._max_workers = max_workers self._class = CLSIDToClassMap[self._iid] self._properties = [ name for name in dir(self._class) if isinstance(getattr(self._class, name), property) and not name.startswith("On") ] self._methods = [ name for name in dir(self._class) if hasattr(getattr(self._class, name), "__call__") ] self._events = [ name for name in dir(self._class) if isinstance(getattr(self._class, name), property) and name.startswith("On") ] self._method_instances = {} self._event_instances = {} for method in self._methods: self._method_instances[method] = CybosPlusDispatchProxyMethod(self, method) for event in self._events: self._event_instances[event] = CybosPlusDispatchProxyEventInstance( self, event ) self._executor = ThreadPoolExecutor(max_workers=self._max_workers) def _GetAttr(self, name): request = DispatchProxyService_pb2.GetAttrRequest() request.iid = self._iid request.name = name should_stop = False while not should_stop: try: response = self._stub.GetAttr(request, timeout=self._timeout) except grpc.RpcError as error: # error = typing.cast(grpc.Call, error) # pylint: disable=no-member if error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: self.logger.warning( "getattr(self, %r) timed out, retrying...", name ) else: raise else: should_stop = True result = ExtractValue(response.value) return result def _SetAttr(self, name, value): request = DispatchProxyService_pb2.SetAttrRequest() request.iid = self._iid request.name = name request.value = value should_stop = False while not should_stop: try: response = self._stub.SetAttr(request, timeout=self._timeout) except grpc.RpcError as error: # error = typing.cast(grpc.Call, error) # pylint: disable=no-member if error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: self.logger.warning( "setattr(self, %r, %r) timed out, retrying...", name, value ) else: raise else: should_stop = True return response and None def _CallMethod(self, name, args): request = DispatchProxyService_pb2.CallMethodRequest() request.iid = self._iid request.name = name for arg in args: AssignValue(request.arguments.add().value, arg) should_stop = False while not should_stop: try: response = self._stub.CallMethod(request, timeout=self._timeout) except grpc.RpcError as error: # error = typing.cast(grpc.Call, error) # pylint: disable=no-member if error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: self.logger.warning( "Method call %s(%s) timed out, retrying...", name, ", ".join(map(repr, args)), ) else: raise else: should_stop = True result = ExtractValue(response.return_value) return result def _ConnectEvent(self, name): request = DispatchProxyService_pb2.ConnectEventRequest() request.establish_request.iid = self._iid request.establish_request.name = name event_instance = self._event_instances[name] request_queue = Queue() sentinel = object() request_iterator = iter(request_queue.get, sentinel) request_queue.put(request) def put_sentinel(): request_queue.put(sentinel) response_iterator = self._stub.ConnectEvent(request_iterator) def submit(): for response in response_iterator: args = [ExtractValue(arg.value) for arg in response.arguments] event_instance(*args) request = DispatchProxyService_pb2.ConnectEventRequest() request.ack_request # pylint: disable=pointless-statement request_queue.put(request) future = self._executor.submit(submit) return future, put_sentinel def __getattr__(self, name): if name in self._properties: return self._GetAttr(name) elif name in self._method_instances: return self._method_instances[name] elif name in self._event_instances: return self._event_instances[name] else: return super().__getattribute__(name) def __setattr__(self, name, value): if "_properties" in self.__dict__ and name in self.__dict__["_properties"]: self._SetAttr(name, value) else: super().__setattr__(name, value)