Source code for koapy.backend.kiwoom_open_api_plus.grpc.event.KiwoomOpenApiPlusEventHandlers

from koapy.backend.kiwoom_open_api_plus.core.KiwoomOpenApiPlusError import (
    KiwoomOpenApiPlusError,
)
from koapy.backend.kiwoom_open_api_plus.core.KiwoomOpenApiPlusRealType import (
    KiwoomOpenApiPlusRealType,
)
from koapy.backend.kiwoom_open_api_plus.core.KiwoomOpenApiPlusTrInfo import (
    KiwoomOpenApiPlusTrInfo,
)
from koapy.backend.kiwoom_open_api_plus.grpc import KiwoomOpenApiPlusService_pb2
from koapy.backend.kiwoom_open_api_plus.grpc.event.KiwoomOpenApiPlusEventHandlerForGrpc import (
    KiwoomOpenApiPlusEventHandlerForGrpc,
)
from koapy.utils.logging.Logging import Logging
from koapy.utils.notimplemented import isimplemented


[docs]class KiwoomOpenApiPlusLazyAllEventHandler( KiwoomOpenApiPlusEventHandlerForGrpc, Logging ):
[docs] def OnReceiveTrData( self, scrnno, rqname, trcode, recordname, prevnext, _datalength, _errorcode, _message, _splmmsg, ): response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnReceiveTrData" response.arguments.add().string_value = scrnno response.arguments.add().string_value = rqname response.arguments.add().string_value = trcode response.arguments.add().string_value = recordname response.arguments.add().string_value = prevnext self.observer.on_next(response)
[docs] def OnReceiveRealData(self, code, realtype, realdata): response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnReceiveRealData" response.arguments.add().string_value = code response.arguments.add().string_value = realtype response.arguments.add().string_value = realdata self.observer.on_next(response)
[docs] def OnReceiveMsg(self, scrnno, rqname, trcode, msg): response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnReceiveMsg" response.arguments.add().string_value = scrnno response.arguments.add().string_value = rqname response.arguments.add().string_value = trcode response.arguments.add().string_value = msg self.observer.on_next(response)
[docs] def OnReceiveChejanData(self, gubun, itemcnt, fidlist): response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnReceiveChejanData" response.arguments.add().string_value = gubun response.arguments.add().long_value = itemcnt response.arguments.add().string_value = fidlist self.observer.on_next(response)
[docs] def OnEventConnect(self, errcode): response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnEventConnect" response.arguments.add().long_value = errcode self.observer.on_next(response)
[docs] def OnReceiveRealCondition( self, code, condition_type, condition_name, condition_index ): response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnReceiveRealCondition" response.arguments.add().string_value = code response.arguments.add().string_value = condition_type response.arguments.add().string_value = condition_name response.arguments.add().string_value = condition_index self.observer.on_next(response)
[docs] def OnReceiveTrCondition( self, scrnno, codelist, condition_name, condition_index, prevnext ): response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnReceiveTrCondition" response.arguments.add().string_value = scrnno response.arguments.add().string_value = codelist response.arguments.add().string_value = condition_name response.arguments.add().long_value = condition_index response.arguments.add().long_value = prevnext self.observer.on_next(response)
[docs] def OnReceiveConditionVer(self, ret, msg): response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnReceiveConditionVer" response.arguments.add().long_value = ret response.arguments.add().string_value = msg self.observer.on_next(response)
[docs]class KiwoomOpenApiPlusEagerAllEventHandler( KiwoomOpenApiPlusEventHandlerForGrpc, Logging ):
[docs] def OnReceiveTrData( self, scrnno, rqname, trcode, recordname, prevnext, _datalength, _errorcode, _message, _splmmsg, ): response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnReceiveTrData" response.arguments.add().string_value = scrnno response.arguments.add().string_value = rqname response.arguments.add().string_value = trcode response.arguments.add().string_value = recordname response.arguments.add().string_value = prevnext repeat_cnt = self.control.GetRepeatCnt(trcode, recordname) trinfo = KiwoomOpenApiPlusTrInfo.get_trinfo_by_code(trcode) if trinfo is None: self.logger.error("Cannot find names for trcode %s", trinfo) single_names = trinfo.get_single_output_names() multi_names = trinfo.get_multi_output_names() if len(single_names) > 0: values = [ self.control.GetCommData(trcode, recordname, 0, name).strip() for name in single_names ] response.single_data.names.extend(single_names) response.single_data.values.extend(values) if repeat_cnt > 0 and len(multi_names) > 0: rows = [ [ self.control.GetCommData(trcode, recordname, i, name).strip() for name in multi_names ] for i in range(repeat_cnt) ] response.multi_data.names.extend(multi_names) for row in rows: response.multi_data.values.add().values.extend(row) self.observer.on_next(response)
[docs] def OnReceiveRealData(self, code, realtype, realdata): response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnReceiveRealData" response.arguments.add().string_value = code response.arguments.add().string_value = realtype response.arguments.add().string_value = realdata fids = KiwoomOpenApiPlusRealType.get_fids_by_realtype_name(realtype) if fids is None: self.logger.error("Cannot find fids for realtype %s", realtype) names = [ KiwoomOpenApiPlusRealType.Fid.get_name_by_fid(fid, str(fid)) for fid in fids ] values = [self.control.GetCommRealData(code, fid) for fid in fids] assert len(names) == len(values) response.single_data.names.extend(names) response.single_data.values.extend(values) self.observer.on_next(response)
[docs] def OnReceiveMsg(self, scrnno, rqname, trcode, msg): response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnReceiveMsg" response.arguments.add().string_value = scrnno response.arguments.add().string_value = rqname response.arguments.add().string_value = trcode response.arguments.add().string_value = msg self.observer.on_next(response)
[docs] def OnReceiveChejanData(self, gubun, itemcnt, fidlist): response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnReceiveChejanData" response.arguments.add().string_value = gubun response.arguments.add().long_value = itemcnt response.arguments.add().string_value = fidlist fids = fidlist.rstrip(";") fids = fids.split(";") if fids else [] fids = [int(fid) for fid in fids] assert itemcnt == len(fids) names = [ KiwoomOpenApiPlusRealType.Fid.get_name_by_fid(fid, str(fid)) for fid in fids ] values = [self.control.GetChejanData(fid).strip() for fid in fids] response.single_data.names.extend(names) response.single_data.values.extend(values) self.observer.on_next(response)
[docs] def OnEventConnect(self, errcode): response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnEventConnect" response.arguments.add().long_value = errcode self.observer.on_next(response)
[docs] def OnReceiveRealCondition( self, code, condition_type, condition_name, condition_index ): response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnReceiveRealCondition" response.arguments.add().string_value = code response.arguments.add().string_value = condition_type response.arguments.add().string_value = condition_name response.arguments.add().string_value = condition_index self.observer.on_next(response)
[docs] def OnReceiveTrCondition( self, scrnno, codelist, condition_name, condition_index, prevnext ): response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnReceiveTrCondition" response.arguments.add().string_value = scrnno response.arguments.add().string_value = codelist response.arguments.add().string_value = condition_name response.arguments.add().long_value = condition_index response.arguments.add().long_value = prevnext self.observer.on_next(response)
[docs] def OnReceiveConditionVer(self, ret, msg): response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnReceiveConditionVer" response.arguments.add().long_value = ret response.arguments.add().string_value = msg self.observer.on_next(response)
[docs]class KiwoomOpenApiPlusAllEventHandler(KiwoomOpenApiPlusEagerAllEventHandler): pass
[docs]class KiwoomOpenApiPlusLazySomeEventHandler(KiwoomOpenApiPlusLazyAllEventHandler): def __init__(self, control, request, context): super().__init__(control, context) self._request = request
[docs] def slots(self): names = self.names() slots = [getattr(self, name) for name in names] names_and_slots_implemented = [ (name, slot) for name, slot in zip(names, slots) if isimplemented(slot) and name in self._request.slots ] return names_and_slots_implemented
[docs]class KiwoomOpenApiPlusEagerSomeEventHandler(KiwoomOpenApiPlusEagerAllEventHandler): def __init__(self, control, request, context): super().__init__(control, context) self._request = request
[docs] def slots(self): names = self.names() slots = [getattr(self, name) for name in names] names_and_slots_implemented = [ (name, slot) for name, slot in zip(names, slots) if isimplemented(slot) and name in self._request.slots ] return names_and_slots_implemented
[docs]class KiwoomOpenApiPlusSomeEventHandler(KiwoomOpenApiPlusEagerSomeEventHandler): pass
[docs]class KiwoomOpenApiPlusSomeBidirectionalEventHandler( KiwoomOpenApiPlusLazySomeEventHandler ): def __init__(self, control, request_iterator, context): self._request_iterator = request_iterator self._first_request = next(self._request_iterator) assert self._first_request.HasField("listen_request") self._request = request = self._first_request.listen_request super().__init__(control, request, context)
[docs] def await_handled(self): request = next(self._request_iterator) if request.HasField("handled_request"): pass elif request.HasField("stop_listen_request"): self.observer.on_completed() else: raise KiwoomOpenApiPlusError("Unexpected request")
[docs] def OnReceiveTrData( self, scrnno, rqname, trcode, recordname, prevnext, _datalength, _errorcode, _message, _splmmsg, ): super().OnReceiveTrData( scrnno, rqname, trcode, recordname, prevnext, _datalength, _errorcode, _message, _splmmsg, ) self.await_handled()
[docs] def OnReceiveRealData(self, code, realtype, realdata): super().OnReceiveRealData(code, realtype, realdata) self.await_handled()
[docs] def OnReceiveMsg(self, scrnno, rqname, trcode, msg): super().OnReceiveMsg(scrnno, rqname, trcode, msg) self.await_handled()
[docs] def OnReceiveChejanData(self, gubun, itemcnt, fidlist): super().OnReceiveChejanData(gubun, itemcnt, fidlist) self.await_handled()
[docs] def OnEventConnect(self, errcode): super().OnEventConnect(errcode) self.await_handled()
[docs] def OnReceiveRealCondition( self, code, condition_type, condition_name, condition_index ): super().OnReceiveRealCondition( code, condition_type, condition_name, condition_index ) self.await_handled()
[docs] def OnReceiveTrCondition( self, scrnno, codelist, condition_name, condition_index, prevnext ): super().OnReceiveTrCondition( scrnno, codelist, condition_name, condition_index, prevnext ) self.await_handled()
[docs] def OnReceiveConditionVer(self, ret, msg): super().OnReceiveConditionVer(ret, msg) self.await_handled()