Source code for koapy.backend.daishin_cybos_plus.proxy.CybosPlusEntrypointProxy

import threading

import grpc

from requests.structures import CaseInsensitiveDict

from koapy.backend.daishin_cybos_plus.core.CybosPlusEntrypointMixin import (
    CybosPlusEntrypointMixin,
)
from koapy.backend.daishin_cybos_plus.proxy import (
    CybosPlusProxyService_pb2,
    CybosPlusProxyService_pb2_grpc,
)
from koapy.backend.daishin_cybos_plus.proxy.CybosPlusProxyServiceMessageUtils import (
    AssignPrimitive,
    ExtractPrimitive,
)
from koapy.utils.logging.Logging import Logging


class CybosPlusDispatchProxyMethod:
    """Callable bound to one named method of a dispatch proxy.

    Calling an instance forwards the positional arguments to the owning
    proxy's ``_InvokeMethod(name, args)`` and returns whatever that returns.
    """

    def __init__(self, proxy, name):
        """Remember the owning proxy and the method name to invoke on it."""
        self._name = name
        self._proxy = proxy

    def __call__(self, *args, **kwargs):
        """Invoke the bound method and return its result.

        Only positional arguments are forwarded as a tuple; ``kwargs`` is
        accepted but is not passed on to the proxy.
        """
        invoke = self._proxy._InvokeMethod
        return invoke(self._name, args)
class CybosPlusDispatchProxy(Logging):
    """Client-side stand-in for one remote dispatch object, addressed by ProgID.

    On construction the service's ``Dispatch`` RPC is called for the ProgID;
    the response's ``properties`` and ``methods`` collections then drive
    attribute access:

    * a name listed in ``properties`` is fetched with the ``Property`` RPC
      and its value is returned;
    * a name listed in ``methods`` yields a ``CybosPlusDispatchProxyMethod``
      which, when called, issues the ``Method`` RPC;
    * any other name raises ``AttributeError``.
    """

    def __init__(self, proxy, progid):
        """Bind to ``progid`` and fetch its dispatch description.

        Args:
            proxy: Object providing the gRPC ``_stub`` and the per-call
                ``_timeout`` used by this proxy.
            progid: Identifier sent as ``prog`` in every request.
        """
        self._proxy = proxy
        self._progid = progid
        self._is_trade_related = self._progid.startswith("CpTrade.")

        request = CybosPlusProxyService_pb2.DispatchRequest()
        request.prog = self._progid
        self._dispatch = self._proxy._stub.Dispatch(request)

    def _CallWithRetry(self, rpc, request, on_timeout):
        """Issue ``rpc`` with ``request`` until it completes; return the response.

        Each attempt uses ``self._proxy._timeout`` as its deadline. When an
        attempt fails with ``DEADLINE_EXCEEDED``, ``on_timeout()`` is called
        (to log) and the request is sent again, with no upper bound on the
        number of attempts. Any other ``grpc.RpcError`` is re-raised.
        """
        while True:
            response_future = rpc.future(request, timeout=self._proxy._timeout)
            try:
                return response_future.result()
            except grpc.RpcError:
                if response_future.code() != grpc.StatusCode.DEADLINE_EXCEEDED:
                    raise
                on_timeout()

    def _GetProperty(self, name):
        """Fetch the remote property ``name`` and return it as a primitive."""
        request = CybosPlusProxyService_pb2.PropertyRequest()
        request.prog = self._progid
        request.name = name

        def log_retry():
            self.logger.warning("Property %s lookup failed, retrying...", name)

        response = self._CallWithRetry(self._proxy._stub.Property, request, log_retry)
        return ExtractPrimitive(response.value)

    def _InvokeMethod(self, name, args):
        """Call the remote method ``name`` with ``args``; return its result.

        Args:
            name: Name of the remote method.
            args: Iterable of positional arguments; each one is written into
                the request with ``AssignPrimitive``.
        """
        request = CybosPlusProxyService_pb2.MethodRequest()
        request.prog = self._progid
        request.name = name
        for arg in args:
            AssignPrimitive(request.arguments.add().value, arg)

        def log_retry():
            self.logger.warning(
                "Method invocation %s(%s) timed out, retrying...",
                name,
                ", ".join(map(repr, args)),
            )

        response = self._CallWithRetry(self._proxy._stub.Method, request, log_retry)
        return ExtractPrimitive(response.return_value)

    def __getattr__(self, name):
        """Resolve ``name`` as a remote property or method.

        Raises:
            AttributeError: If ``name`` is neither a listed property nor a
                listed method, or if the dispatch description is not set.
        """
        # __getattr__ only runs after normal lookup has failed, so reading
        # self._dispatch here would re-enter this method without bound on an
        # instance that never finished __init__ (e.g. one built via __new__).
        # Read it from the instance dict instead.
        dispatch = self.__dict__.get("_dispatch")
        if dispatch is not None:
            if name in dispatch.properties:
                return self._GetProperty(name)
            if name in dispatch.methods:
                return CybosPlusDispatchProxyMethod(self, name)
        raise AttributeError(
            "'{}' object has no attribute '{}'".format(type(self).__name__, name)
        )
class CybosPlusIncompleteProgIDProxy:
    """Namespace for a ProgID prefix; attribute access completes the ProgID.

    ``obj.Name`` returns a ``CybosPlusDispatchProxy`` for the ProgID
    ``"<prefix>.Name"``. One proxy is created per name and kept in a cache,
    so repeated access returns the same object.
    """

    def __init__(self, proxy, prefix):
        """Store the owning proxy and the ProgID prefix.

        Args:
            proxy: Passed unchanged to every ``CybosPlusDispatchProxy`` created.
            prefix: Text placed before the ``.`` in each completed ProgID.
        """
        self._proxy = proxy
        self._prefix = prefix
        self._cache = {}
        self._lock = threading.RLock()

    def __getattr__(self, name):
        """Return the cached dispatch proxy for ``"<prefix>.<name>"``.

        Raises:
            AttributeError: If ``name`` starts with an underscore.
        """
        # Underscore-prefixed names are private/dunder probes (copy, pickle,
        # hasattr, interactive shells), not ProgID components. Rejecting them
        # keeps a lookup of a missing ``_cache`` from re-entering this method
        # without bound, and avoids creating a remote dispatch proxy for them.
        if name.startswith("_"):
            raise AttributeError(
                "'{}' object has no attribute '{}'".format(type(self).__name__, name)
            )
        try:
            return self._cache[name]
        except KeyError:
            pass
        with self._lock:
            # Re-check under the lock: another thread may have filled the
            # entry between the lookup above and acquiring the lock.
            if name not in self._cache:
                progid = "{}.{}".format(self._prefix, name)
                self._cache[name] = CybosPlusDispatchProxy(self._proxy, progid)
            return self._cache[name]
class CybosPlusEntrypointProxy(CybosPlusEntrypointMixin):
    """gRPC client entry point exposing ProgID namespaces as attributes.

    Attribute names are matched case-insensitively against a fixed alias
    table; a match returns a cached ``CybosPlusIncompleteProgIDProxy`` for
    the mapped ProgID prefix, from which dispatch proxies are obtained by
    further attribute access.
    """

    def __init__(self, host=None, port=None):
        """Open an insecure gRPC channel and wait for it to become ready.

        Args:
            host: Server host name; ``"localhost"`` when ``None``.
            port: Server port; ``3031`` when ``None``.

        Waiting for the channel uses a 5 second timeout, so construction
        raises if the channel is not ready in that time.
        """
        if host is None:
            host = "localhost"
        if port is None:
            port = 3031

        self._host = host
        self._port = port
        self._address = self._host + ":" + str(self._port)

        self._channel = grpc.insecure_channel(self._address)
        self._stub = CybosPlusProxyService_pb2_grpc.CybosPlusProxyServiceStub(
            self._channel
        )
        # Per-call deadline, in seconds, read by the dispatch proxies.
        self._timeout = 10

        grpc.channel_ready_future(self._channel).result(timeout=5)

        # Alias -> ProgID prefix. Note that "CpDib" maps to the "DsCbo1"
        # prefix, so both names resolve to the same cached namespace.
        self._attribute_mapping = CaseInsensitiveDict(
            {
                "CpDib": "DsCbo1",
                "CpSysDib": "CpSysDib",
                "CpTrade": "CpTrade",
                "CpUtil": "CpUtil",
                "DsCbo1": "DsCbo1",
            }
        )
        self._cache = {}
        self._lock = threading.RLock()

    def __getattr__(self, name):
        """Return the namespace proxy for the ProgID prefix aliased by ``name``.

        Raises:
            AttributeError: If ``name`` is not a known alias.
        """
        # The underscore test comes first so that a lookup of a missing
        # ``_attribute_mapping`` (instance not fully initialised) raises
        # AttributeError instead of re-entering this method without bound.
        if name.startswith("_") or name not in self._attribute_mapping:
            raise AttributeError(
                "'{}' object has no attribute '{}'".format(type(self).__name__, name)
            )
        prefix = self._attribute_mapping[name]
        if prefix not in self._cache:
            with self._lock:
                # Re-check under the lock in case another thread created
                # the entry while this one was waiting.
                if prefix not in self._cache:
                    self._cache[prefix] = CybosPlusIncompleteProgIDProxy(self, prefix)
        return self._cache[prefix]