Source code for koapy.backend.kiwoom_open_api_plus.grpc.event.KiwoomOpenApiPlusRealEventHandler

import queue
import threading

from time import sleep

import grpc

from koapy.backend.kiwoom_open_api_plus.core.KiwoomOpenApiPlusError import (
    KiwoomOpenApiPlusError,
    KiwoomOpenApiPlusNegativeReturnCodeError,
)
from koapy.backend.kiwoom_open_api_plus.core.KiwoomOpenApiPlusRealType import (
    KiwoomOpenApiPlusRealType,
)
from koapy.backend.kiwoom_open_api_plus.grpc import KiwoomOpenApiPlusService_pb2
from koapy.backend.kiwoom_open_api_plus.grpc.event.KiwoomOpenApiPlusEventHandlerForGrpc import (
    KiwoomOpenApiPlusEventHandlerForGrpc,
)
from koapy.backend.kiwoom_open_api_plus.utils.queue.QueueBasedBufferedIterator import (
    QueueBasedBufferedIterator,
)
from koapy.utils.itertools import chunk
from koapy.utils.logging.Logging import Logging


class KiwoomOpenApiPlusRealEventHandler(KiwoomOpenApiPlusEventHandlerForGrpc, Logging):
    """Relay ``OnReceiveRealData`` events for a fixed list of codes.

    The codes of the request are split into groups of at most
    ``_num_codes_per_screen`` entries. On entering the handler every group is
    registered on its own screen through ``SetRealReg``; each matching
    ``OnReceiveRealData`` event is then pushed to the observer as a
    ``ListenResponse``.
    """

    # Upper bound on the number of codes registered on a single screen.
    _num_codes_per_screen = 100
    # Option type handed to SetRealReg when the request leaves opt_type empty.
    _default_opt_type = "0"

    def __init__(self, control, request, context, screen_manager):
        """Copy the request fields and assign a screen slot to each code group.

        Args:
            control: Object exposing the ``SetRealReg``, ``SetRealRemove`` and
                ``GetCommRealData`` calls used by this handler.
            request: Request providing ``screen_no``, ``code_list``,
                ``fid_list``, ``opt_type`` and ``flags``.
            context: Forwarded to the parent constructor.
            screen_manager: Provides ``borrow_screen`` and ``return_screen``.
        """
        super().__init__(control, context)

        self._request = request
        self._screen_manager = screen_manager

        self._screen_no = request.screen_no
        self._code_list = request.code_list
        self._fid_list = request.fid_list
        self._opt_type = request.opt_type

        flags = request.flags
        self._infer_fids = flags.infer_fids
        self._readable_names = flags.readable_names
        self._fast_parse = flags.fast_parse

        self._code_lists = list(chunk(self._code_list, self._num_codes_per_screen))

        group_count = len(self._code_lists)
        given_count = len(self._screen_no)
        if given_count == 0:
            # No screen requested: every group gets a placeholder.
            self._screen_nos = [None] * group_count
        elif given_count < group_count:
            self.logger.warning("Given screen nos are not sufficient.")
            # Keep the given screens and pad the rest with placeholders.
            screen_nos = list(self._screen_no)
            screen_nos.extend([None] * (group_count - given_count))
            self._screen_nos = screen_nos
        else:
            self._screen_nos = self._screen_no

        self._fid_list_joined = ";".join(map(str, self._fid_list))
        self._opt_type_final = self._opt_type or self._default_opt_type

    def on_enter(self):
        """Borrow a screen for every code group and register the group on it.

        Cleanup callbacks (returning the screen, ``SetRealRemove`` per code)
        are added before ``SetRealReg`` is issued, and the registration result
        is checked with ``KiwoomOpenApiPlusError.try_or_raise``.
        """
        for requested_screen_no, codes in zip(self._screen_nos, self._code_lists):
            joined_codes = ";".join(codes)
            screen_no = self._screen_manager.borrow_screen(requested_screen_no)
            self.add_callback(self._screen_manager.return_screen, screen_no)
            remove_call = self.control.SetRealRemove.queuedCall
            for code in codes:
                self.add_callback(remove_call, screen_no, code)
            registration = self.control.SetRealReg.queuedCall(
                screen_no,
                joined_codes,
                self._fid_list_joined,
                self._opt_type_final,
            )
            KiwoomOpenApiPlusError.try_or_raise(registration.result())

    def OnReceiveRealData(self, code, realtype, realdata):
        """Turn a real data event into a ``ListenResponse`` for the observer.

        Events whose ``code`` is not in ``self._code_list`` are ignored.

        Args:
            code: Code the event belongs to.
            realtype: Real type name; used to look up fids when
                ``infer_fids`` is set.
            realdata: Raw payload; split on tabs when both ``infer_fids`` and
                ``fast_parse`` are set.
        """
        if code not in self._code_list:
            return

        response = KiwoomOpenApiPlusService_pb2.ListenResponse()
        response.name = "OnReceiveRealData"
        for argument in (code, realtype, realdata):
            response.arguments.add().string_value = argument

        infer_fids = self._infer_fids
        if infer_fids:
            fids = KiwoomOpenApiPlusRealType.get_fids_by_realtype_name(realtype)
        else:
            fids = self._fid_list

        if self._readable_names:
            get_name = KiwoomOpenApiPlusRealType.Fid.get_name_by_fid
            # The second argument is the stringified fid itself.
            names = [get_name(fid, str(fid)) for fid in fids]
        else:
            names = [str(fid) for fid in fids]

        if infer_fids and self._fast_parse:
            values = realdata.split("\t")
        else:
            values = [self.control.GetCommRealData(code, fid) for fid in fids]

        assert len(names) == len(values)

        single_data = response.single_data
        single_data.names.extend(names)
        single_data.values.extend(values)
        self.observer.on_next(response)

    def OnEventConnect(self, errcode):
        """Report a negative connect code to the observer as an error.

        Args:
            errcode: Code of the connect event; non-negative codes are ignored.
        """
        if errcode < 0:
            self.observer.on_error(KiwoomOpenApiPlusNegativeReturnCodeError(errcode))
            return
class KiwoomOpenApiPlusBidirectionalRealEventHandler(
    KiwoomOpenApiPlusRealEventHandler, Logging
):
    """Real data event handler whose registered codes change at runtime.

    Instead of a single request, the handler consumes a stream of requests on
    a background thread. Each request may register codes, remove codes,
    (re)initialize the fid list and flags, or stop the handler. The handler
    keeps its own code/screen bookkeeping in ``_screen_by_code``,
    ``_code_list_by_screen`` and ``_code_list``.
    """

    def __init__(self, control, request_iterator, context, screen_manager):
        """Set up empty bookkeeping and wrap the incoming request stream.

        Args:
            control: Object exposing ``SetRealReg``, ``SetRealRemove`` and
                ``DisconnectRealData``.
            request_iterator: Iterator of incoming requests; wrapped in a
                ``QueueBasedBufferedIterator`` so it can be polled with a
                timeout.
            context: Forwarded to the parent constructor.
            screen_manager: Provides ``borrow_screen`` and ``return_screen``.
        """
        # The parent is initialized with an empty request; the real settings
        # arrive later through the request stream.
        request = KiwoomOpenApiPlusService_pb2.RealRequest()
        super().__init__(control, request, context, screen_manager)

        self._request_iterator = request_iterator
        self._buffered_request_iterator = QueueBasedBufferedIterator(
            self._request_iterator
        )

        self._fid_list = []
        self._fid_list_joined = ";".join(str(fid) for fid in self._fid_list)

        # code -> screen, screen -> codes, and the flat list of all codes.
        self._screen_by_code = {}
        self._code_list_by_screen = {}
        self._code_list = []

        self._request_iterator_consumer = None
        self._request_iterator_consumer_should_stop = False
        # Polling interval (seconds) used when waiting for the next request.
        self._request_iterator_consumer_timeout = 2.0

    def register_code(self, code, fid_list=None):
        """Register ``code`` through ``SetRealReg`` on a suitable screen.

        A code that is already registered is registered again on its current
        screen with option type ``"1"``. A new code goes to the first managed
        screen holding fewer than ``_num_codes_per_screen`` codes (option type
        ``"1"``), or to a freshly borrowed screen (option type ``"0"``).

        Args:
            code: Code to register.
            fid_list: Optional fids for this registration; when empty, the
                handler-wide ``_fid_list_joined`` is used.
        """
        if code in self._screen_by_code:
            screen_no = self._screen_by_code[code]
            opt_type = "1"
        else:
            screen_no = None
            opt_type = "0"
            for existing_screen_no, screen_codes in self._code_list_by_screen.items():
                if len(screen_codes) < self._num_codes_per_screen:
                    screen_no = existing_screen_no
                    opt_type = "1"
                    break
            if screen_no is None:
                screen_no = self._screen_manager.borrow_screen()
                opt_type = "0"
            # Record the code only when it is new. Appending an already
            # registered code again would leave duplicates behind after
            # remove_code() and trip the emptiness check in
            # remove_all_screens().
            self._screen_by_code[code] = screen_no
            self._code_list_by_screen.setdefault(screen_no, []).append(code)
            self._code_list.append(code)

        if fid_list:
            fid_list_joined = ";".join(str(fid) for fid in fid_list)
        else:
            fid_list_joined = self._fid_list_joined

        self.logger.debug(
            "Registering code %s to screen %s with type %s", code, screen_no, opt_type
        )

        def try_to_register(retry=2, timeout=3.0):
            """Issue SetRealReg, retrying up to ``retry`` times on error."""
            retry_count = 0

            def call():
                KiwoomOpenApiPlusError.try_or_raise(
                    self.control.SetRealReg.queuedCall(
                        screen_no, code, fid_list_joined, opt_type
                    ),
                    except_callback=on_error,
                )

            def on_error(e):
                nonlocal retry_count
                if isinstance(e, KiwoomOpenApiPlusError):
                    error_message = (
                        f"Failed to register {code=}. Reason: {e} ({e.code})."
                    )
                else:
                    error_message = f"Failed to register {code=}. Reason: {e}."
                if retry_count >= retry:
                    # Out of retries: report and give up.
                    self.logger.warning(
                        f"{error_message} Kiwoom server does not allow register."
                    )
                    return
                retry_count += 1
                self.logger.warning(
                    f"{error_message} Retrying ({retry_count}/{retry}) in {timeout} ..."
                )
                sleep(timeout)
                call()

            call()

        try_to_register()

    def remove_code(self, code):
        """Unregister ``code`` and drop it from the bookkeeping.

        A code that is not managed by this handler only produces a warning.
        """
        if code not in self._screen_by_code:
            self.logger.warning(
                "Given code %s is not in managed code list and cannot be removed", code
            )
            return
        screen_no = self._screen_by_code[code]
        self.logger.debug("Removing code %s from screen %s", code, screen_no)
        self.control.SetRealRemove.queuedCall(screen_no, code)
        self._screen_by_code.pop(code)
        self._code_list_by_screen[screen_no].remove(code)
        self._code_list.remove(code)

    def remove_all_codes(self):
        """Unregister every managed code."""
        # Iterate over a copy: remove_code() mutates self._code_list.
        for code in list(self._code_list):
            self.remove_code(code)

    def remove_all_screens(self):
        """Unregister all codes, then disconnect and return every screen."""
        self.remove_all_codes()
        for screen_no in list(self._code_list_by_screen.keys()):
            # Ensure disconnection although the codes were already removed
            # in remove_all_codes().
            self.control.DisconnectRealData(screen_no)
            self._screen_manager.return_screen(screen_no)
            code_list = self._code_list_by_screen.pop(screen_no)
            assert len(code_list) == 0

    def consume_request_iterator(self):
        """Process incoming requests until stopped or the stream fails.

        The loop ends when the stop flag is set, a ``grpc.RpcError`` is raised
        while waiting for a request, or a stop request is received.

        Raises:
            KiwoomOpenApiPlusError: If a request carries none of the known
                request fields.
        """
        while not self._request_iterator_consumer_should_stop:
            try:
                request = self._buffered_request_iterator.next(
                    timeout=self._request_iterator_consumer_timeout
                )
            except queue.Empty:
                # Nothing arrived within the timeout; re-check the stop flag.
                continue
            except grpc.RpcError:
                break

            if request.HasField("register_request"):
                register_request = request.register_request
                fid_list = register_request.fid_list
                for code in register_request.code_list:
                    self.register_code(code, fid_list)
            elif request.HasField("remove_request"):
                for code in request.remove_request.code_list:
                    self.remove_code(code)
            elif request.HasField("stop_request"):
                self.stop()
                break
            elif request.HasField("initialize_request"):
                initialize_request = request.initialize_request
                self._fid_list = initialize_request.fid_list
                self._fid_list_joined = ";".join(str(fid) for fid in self._fid_list)
                # NOTE(review): the original assigned
                # initialize_request.flags.infer_fids here and overwrote it
                # with True on the very next line, so the requested flag is
                # effectively ignored. Behaviour is kept as-is; possibly
                # intentional, since fid lists given per register request are
                # not tracked for use in OnReceiveRealData. Confirm before
                # honouring the flag.
                self._infer_fids = True
                self._readable_names = initialize_request.flags.readable_names
                self._fast_parse = initialize_request.flags.fast_parse
                self.remove_all_codes()
            else:
                raise KiwoomOpenApiPlusError("Unexpected request")

    def stop_request_iterator_consumer(self):
        """Signal the consumer thread to stop and wait for it to finish."""
        consumer = self._request_iterator_consumer
        if consumer is not None and consumer.is_alive():
            self._request_iterator_consumer_should_stop = True
            consumer.join()

    def start_request_iterator_consumer(self):
        """(Re)start the daemon thread running ``consume_request_iterator``."""
        self.stop_request_iterator_consumer()
        self._request_iterator_consumer_should_stop = False
        self._request_iterator_consumer = threading.Thread(
            target=self.consume_request_iterator, daemon=True
        )
        self._request_iterator_consumer.start()

    def on_enter(self):
        """Start consuming requests; codes are registered as requests arrive."""
        self.start_request_iterator_consumer()

    def on_exit(self, exc_type=None, exc_value=None, traceback=None):
        """Stop the consumer thread and release all codes and screens."""
        self.stop_request_iterator_consumer()
        self.remove_all_screens()

    def OnReceiveRealData(self, code, realtype, realdata):
        """Forward a real data event for a currently registered code.

        Delegates to the parent implementation, which reads the attributes
        this class keeps up to date (``_code_list``, ``_fid_list`` and the
        parsing flags), instead of duplicating its body.
        """
        super().OnReceiveRealData(code, realtype, realdata)