Source code for koapy.backend.kiwoom_open_api_plus.grpc.KiwoomOpenApiPlusServiceClient

import inspect

from concurrent.futures import ThreadPoolExecutor

import grpc

from koapy.backend.kiwoom_open_api_plus.grpc import KiwoomOpenApiPlusService_pb2_grpc
from koapy.backend.kiwoom_open_api_plus.grpc.KiwoomOpenApiPlusServiceClientStubWrapper import (
    KiwoomOpenApiPlusServiceClientStubWrapper,
)
from koapy.config import config


[docs]class KiwoomOpenApiPlusServiceClient: def __init__( self, host=None, port=None, credentials=None, thread_pool=None, check_timeout=None, **kwargs, ): if host is None: host = config.get_string( "koapy.backend.kiwoom_open_api_plus.grpc.host", "localhost" ) host = config.get_string( "koapy.backend.kiwoom_open_api_plus.grpc.client.host", host ) if port is None: port = config.get_int("koapy.backend.kiwoom_open_api_plus.grpc.port") port = config.get_int( "koapy.backend.kiwoom_open_api_plus.grpc.client.port", port ) if check_timeout is None: check_timeout = config.get_int( "koapy.backend.kiwoom_open_api_plus.grpc.client.is_ready.timeout", 10 ) self._host = host self._port = port self._credentials = credentials self._thread_pool = thread_pool self._check_timeout = check_timeout self._kwargs = kwargs self._target = self._host + ":" + str(self._port) if self._credentials is None: channel_signature = inspect.signature(grpc.insecure_channel) channel_params = list(channel_signature.parameters.keys()) channel_kwargs = { k: v for k, v in self._kwargs.items() if k in channel_params } channel_bound_arguments = channel_signature.bind_partial(**channel_kwargs) channel_bound_arguments.arguments["target"] = self._target self._channel = grpc.insecure_channel( *channel_bound_arguments.args, **channel_bound_arguments.kwargs, ) else: channel_signature = inspect.signature(grpc.secure_channel) channel_params = list(channel_signature.parameters.keys()) channel_kwargs = { k: v for k, v in self._kwargs.items() if k in channel_params } channel_bound_arguments = channel_signature.bind_partial(**channel_kwargs) channel_bound_arguments.arguments["target"] = self._target channel_bound_arguments.arguments["credentials"] = self._credentials self._channel = grpc.secure_channel( *channel_bound_arguments.args, **channel_bound_arguments.kwargs, ) if self._thread_pool is None: thread_pool_signature = inspect.signature(ThreadPoolExecutor) thread_pool_params = list(thread_pool_signature.parameters.keys()) thread_pool_kwargs = { k: v for k, v in self._kwargs.items() if k in thread_pool_params } thread_pool_bound_arguments = thread_pool_signature.bind( **thread_pool_kwargs ) if thread_pool_bound_arguments.arguments.get("max_workers") is None: max_workers = config.get_int( "koapy.backend.kiwoom_open_api_plus.grpc.client.max_workers", 8, ) thread_pool_bound_arguments.arguments["max_workers"] = max_workers self._thread_pool = ThreadPoolExecutor( *thread_pool_bound_arguments.args, **thread_pool_bound_arguments.kwargs, ) self._stub = KiwoomOpenApiPlusService_pb2_grpc.KiwoomOpenApiPlusServiceStub( self._channel ) self._stub_wrapped = KiwoomOpenApiPlusServiceClientStubWrapper( self._stub, self._thread_pool )
[docs] def is_ready(self, timeout=None): if timeout is None: timeout = self._check_timeout try: grpc.channel_ready_future(self._channel).result(timeout=timeout) return True except grpc.FutureTimeoutError: return False
[docs] def get_grpc_stub(self): return self._stub
[docs] def get_stub(self): return self._stub_wrapped
[docs] def close(self): return self._channel.close()
def __getattr__(self, name): return getattr(self._stub_wrapped, name) def __enter__(self): assert self.is_ready(), "Client is not ready" return self def __exit__(self, exc_type, exc_value, traceback): self.close()