Source code for koapy.backend.kiwoom_open_api_plus.grpc.KiwoomOpenApiPlusServiceServer
import inspect
from concurrent.futures import ThreadPoolExecutor
import grpc
from koapy.backend.kiwoom_open_api_plus.grpc import KiwoomOpenApiPlusService_pb2_grpc
from koapy.backend.kiwoom_open_api_plus.grpc.KiwoomOpenApiPlusServiceServicer import (
KiwoomOpenApiPlusServiceServicer,
)
from koapy.config import config
from koapy.utils.logging.Logging import Logging
from koapy.utils.networking import find_free_port_for_host, is_in_private_network
class KiwoomOpenApiPlusServiceServer(Logging):
    """Wrapper that owns a ``grpc.server`` serving a KiwoomOpenApiPlusServiceServicer.

    The wrapper resolves the listen address from arguments and configuration,
    builds (or accepts) the thread pool handed to ``grpc.server``, and keeps
    enough state to rebuild the underlying server so that it can be started
    again after it has been stopped.

    Attributes that are not found on the wrapper are looked up on the
    underlying gRPC server object (see ``__getattr__``).
    """

    def __init__(
        self,
        control,
        host=None,
        port=None,
        credentials=None,
        **kwargs,
    ):
        """Resolve the address, prepare ``grpc.server`` arguments and build the server.

        Args:
            control: Object passed to ``KiwoomOpenApiPlusServiceServicer``.
            host: Host to listen on. When ``None`` it is read from the
                ``...grpc.host`` config key (default ``"localhost"``), which
                is then overridden by ``...grpc.server.host`` if set.
            port: Port to listen on. When ``None`` it is read from the
                ``...grpc.port`` config key (default ``0``), overridden by
                ``...grpc.server.port`` if set. A value of ``0`` means
                "pick a free port on ``host``".
            credentials: Server credentials. When ``None`` an insecure port
                is added, otherwise a secure port with these credentials.
            **kwargs: Extra options. Keys that match ``grpc.server``
                parameters are forwarded to it; keys that match
                ``ThreadPoolExecutor`` parameters are used when this class
                has to create the thread pool itself. Other keys are ignored.
        """
        if host is None:
            host = config.get_string(
                "koapy.backend.kiwoom_open_api_plus.grpc.host", "localhost"
            )
            host = config.get_string(
                "koapy.backend.kiwoom_open_api_plus.grpc.server.host", host
            )
        if port is None:
            port = config.get_int("koapy.backend.kiwoom_open_api_plus.grpc.port", 0)
            port = config.get_int(
                "koapy.backend.kiwoom_open_api_plus.grpc.server.port", port
            )
        if port == 0:
            port = find_free_port_for_host(host)
            self.logger.info(
                "Using one of the free ports, final address would be %s:%d", host, port
            )

        self._control = control
        self._host = host
        self._port = port
        self._credentials = credentials
        self._kwargs = dict(kwargs)

        self._servicer = KiwoomOpenApiPlusServiceServicer(self._control)
        self._address = self._host + ":" + str(self._port)

        # Forward only those kwargs that grpc.server() actually accepts.
        grpc_server_signature = inspect.signature(grpc.server)
        grpc_server_kwargs = {
            k: v
            for k, v in self._kwargs.items()
            if k in grpc_server_signature.parameters
        }
        # bind_partial: thread_pool may legitimately be missing at this point.
        grpc_server_bound_arguments = grpc_server_signature.bind_partial(
            **grpc_server_kwargs
        )

        if grpc_server_bound_arguments.arguments.get("thread_pool") is None:
            # No pool supplied by the caller: build one from the kwargs that
            # ThreadPoolExecutor accepts, defaulting max_workers from config.
            thread_pool_signature = inspect.signature(ThreadPoolExecutor)
            thread_pool_kwargs = {
                k: v
                for k, v in self._kwargs.items()
                if k in thread_pool_signature.parameters
            }
            thread_pool_bound_arguments = thread_pool_signature.bind(
                **thread_pool_kwargs
            )
            if thread_pool_bound_arguments.arguments.get("max_workers") is None:
                max_workers = config.get_int(
                    "koapy.backend.kiwoom_open_api_plus.grpc.server.max_workers", 8
                )
                thread_pool_bound_arguments.arguments["max_workers"] = max_workers
            thread_pool = ThreadPoolExecutor(
                *thread_pool_bound_arguments.args,
                **thread_pool_bound_arguments.kwargs,
            )
            grpc_server_bound_arguments.arguments["thread_pool"] = thread_pool
            self._thread_pool = thread_pool
        else:
            self._thread_pool = grpc_server_bound_arguments.arguments["thread_pool"]

        # Kept so that reinitialize_server() can rebuild an equivalent server.
        self._grpc_server_bound_arguments = grpc_server_bound_arguments

        self._server = None
        self._server_started = False
        self._server_stopped = False

        self.reinitialize_server()

    def reinitialize_server(self):
        """Discard the current gRPC server (if any) and build a fresh one.

        The new server gets the servicer registered and the configured address
        added as an insecure or secure port depending on ``credentials``.
        The new server is NOT started by this method.
        """
        if self._server is not None:
            was_started = self._server_started
            self.stop()
            # Only wait when the server was actually started; waiting without
            # a timeout on a server that never ran has nothing to wait for and
            # can block indefinitely.
            if was_started:
                self.wait_for_termination()

        self._server = grpc.server(
            *self._grpc_server_bound_arguments.args,
            **self._grpc_server_bound_arguments.kwargs,
        )
        self._server_started = False
        self._server_stopped = False

        KiwoomOpenApiPlusService_pb2_grpc.add_KiwoomOpenApiPlusServiceServicer_to_server(
            self._servicer, self._server
        )

        if self._credentials is None:
            if not is_in_private_network(self._host):
                self.logger.warning(
                    "Adding insecure port %s to server, but the address is not private.",
                    self._address,
                )
            self._server.add_insecure_port(self._address)
        else:
            self._server.add_secure_port(self._address, self._credentials)

    def get_host(self):
        """Return the host this server was configured with."""
        return self._host

    def get_port(self):
        """Return the port this server was configured with."""
        return self._port

    def start(self):
        """Start the server; rebuild it first if it was started and then stopped.

        Calling this on an already started (and not stopped) server does nothing.
        """
        if self._server_started and self._server_stopped:
            self.reinitialize_server()
        if not self._server_started:
            self._server.start()
            self._server_started = True

    def wait_for_termination(self, timeout=None):
        """Delegate to the underlying server's ``wait_for_termination``.

        Args:
            timeout: Passed through unchanged to the underlying server.

        Returns:
            Whatever the underlying server returns (per the gRPC documentation,
            a bool telling whether the wait timed out).
        """
        return self._server.wait_for_termination(timeout)

    def is_running(self):
        """Return whether the server has been started and has not terminated.

        For a started server this waits up to one second for termination and
        reports the server as running when that wait times out.
        """
        # A server that was never started also "times out" on the wait below,
        # which would wrongly report it as running.
        if not self._server_started:
            return False
        return self.wait_for_termination(1)

    def stop(self, grace=None):
        """Stop the underlying server.

        Args:
            grace: Passed through unchanged to the underlying server's ``stop``.

        Returns:
            The value returned by the underlying server's ``stop``.
        """
        event = self._server.stop(grace)
        # Only a server that has been started can become "stopped". Flagging a
        # never-started server as stopped would leave both flags set after a
        # later start(), making the following start() call tear down and
        # rebuild a server that is still running.
        if self._server_started:
            self._server_stopped = True
        return event

    def __getattr__(self, name):
        """Fall back to the underlying gRPC server for unknown attributes.

        Raises:
            AttributeError: If there is no underlying server yet, or it does
                not have the requested attribute.
        """
        # __getattr__ runs only after normal lookup failed. Fetch ``_server``
        # without going through __getattr__ again, otherwise a missing
        # ``_server`` (e.g. instance not fully initialized) recurses forever.
        try:
            server = object.__getattribute__(self, "_server")
        except AttributeError:
            server = None
        if server is None:
            raise AttributeError(
                "%r object has no attribute %r" % (type(self).__name__, name)
            )
        return getattr(server, name)

    def __enter__(self):
        """Start the server and return this wrapper."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Stop the server and wait until it has terminated."""
        self.stop()
        self.wait_for_termination()