Source code for koapy.backend.daishin_cybos_plus.proxy.CybosPlusEntrypointProxy
from threading import RLock
import grpc
from koapy.backend.daishin_cybos_plus.core.CybosPlusEntrypointMixin import (
CybosPlusEntrypointMixin,
)
from koapy.backend.daishin_cybos_plus.proxy.CybosPlusDispatchProxy import (
CybosPlusDispatchProxy,
)
from koapy.common import DispatchProxyService_pb2, DispatchProxyService_pb2_grpc
[docs]class CybosPlusEntrypointProxy(CybosPlusEntrypointMixin):
def __init__(self, host=None, port=None):
if host is None:
host = "localhost"
if port is None:
port = 3031
self._host = host
self._port = port
self._address = self._host + ":" + str(self._port)
self._channel = grpc.insecure_channel(self._address)
self._stub = DispatchProxyService_pb2_grpc.DispatchProxyServiceStub(
self._channel
)
grpc.channel_ready_future(self._channel).result(timeout=5)
self._lock = RLock()
self._dispatch_proxies = {}
def __getitem__(self, name):
if name not in self._dispatch_proxies:
with self._lock:
if name not in self._dispatch_proxies:
request = DispatchProxyService_pb2.GetDispatchRequest()
request.iid = name
response = self._stub.GetDispatch(request)
iid = response.iid
proxy = CybosPlusDispatchProxy(iid, self._stub)
self._dispatch_proxies[name] = proxy
proxy = self._dispatch_proxies[name]
return proxy