Source code for koapy.backend.daishin_cybos_plus.proxy.CybosPlusDispatchProxyService

import sys

from concurrent.futures import ThreadPoolExecutor

import grpc
import pythoncom

from koapy.backend.daishin_cybos_plus.proxy.CybosPlusDispatchProxyServiceServicer import (
    CybosPlusDispatchProxyServiceServicer,
)
from koapy.common import DispatchProxyService_pb2_grpc


[docs]class CybosPlusDispatchProxyService: def __init__(self, host=None, port=None, max_workers=None): if host is None: host = "localhost" if port is None: port = 3031 if max_workers is None: max_workers = None self._host = host self._port = port self._max_workers = max_workers self._address = self._host + ":" + str(self._port) self._servicer = CybosPlusDispatchProxyServiceServicer() def initializer(): flags = getattr(sys, "coinit_flags", pythoncom.COINIT_MULTITHREADED) pythoncom.CoInitializeEx(flags) self._executor = ThreadPoolExecutor( max_workers=self._max_workers, initializer=initializer, ) self._server = grpc.server(self._executor) DispatchProxyService_pb2_grpc.add_DispatchProxyServiceServicer_to_server( self._servicer, self._server ) self._server.add_insecure_port(self._address) def __getattr__(self, name): return getattr(self._server, name)
[docs]def main(): service = CybosPlusDispatchProxyService() service.start() service.wait_for_termination()
if __name__ == "__main__": main()