Source code for koapy.backend.daishin_cybos_plus.proxy.CybosPlusProxyService
from concurrent.futures import ThreadPoolExecutor
import grpc
from koapy.backend.daishin_cybos_plus.proxy import CybosPlusProxyService_pb2_grpc
from koapy.backend.daishin_cybos_plus.proxy.CybosPlusProxyServiceServicer import (
CybosPlusProxyServiceServicer,
)
[docs]class CybosPlusProxyService:
def __init__(self, host=None, port=None, max_workers=None):
if host is None:
host = "localhost"
if port is None:
port = 3031
if max_workers is None:
max_workers = None
self._host = host
self._port = port
self._max_workers = max_workers
self._address = self._host + ":" + str(self._port)
self._servicer = CybosPlusProxyServiceServicer()
self._executor = ThreadPoolExecutor(max_workers=self._max_workers)
self._server = grpc.server(self._executor)
CybosPlusProxyService_pb2_grpc.add_CybosPlusProxyServiceServicer_to_server(
self._servicer, self._server
)
self._server.add_insecure_port(self._address)
def __getattr__(self, name):
return getattr(self._server, name)
[docs]def main():
service = CybosPlusProxyService()
service.start()
service.wait_for_termination()
if __name__ == "__main__":
main()