Source code for koapy.backend.kiwoom_open_api_plus.grpc.event.KiwoomOpenApiPlusOrderEventHandler

from koapy.backend.kiwoom_open_api_plus.core.KiwoomOpenApiPlusError import (
    KiwoomOpenApiPlusError,
    KiwoomOpenApiPlusNegativeReturnCodeError,
)
from koapy.backend.kiwoom_open_api_plus.core.KiwoomOpenApiPlusRealType import (
    KiwoomOpenApiPlusRealType,
)
from koapy.backend.kiwoom_open_api_plus.grpc import KiwoomOpenApiPlusService_pb2
from koapy.backend.kiwoom_open_api_plus.grpc.event.KiwoomOpenApiPlusEventHandlerForGrpc import (
    KiwoomOpenApiPlusEventHandlerForGrpc,
)
from koapy.utils.logging.Logging import Logging


[docs]class KiwoomOpenApiPlusBaseOrderEventHandler( KiwoomOpenApiPlusEventHandlerForGrpc, Logging ): def __init__(self, control, context): super().__init__(control, context) self._single_names = ["주문번호"] self._multi_names = [] self._is_stop_condition = lambda row: False self._inputs = {} self._listen_msg = True
[docs] def ResponseForOnReceiveMsg(self, scrnno, rqname, trcode, msg): response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnReceiveMsg" response.arguments.add().string_value = scrnno response.arguments.add().string_value = rqname response.arguments.add().string_value = trcode response.arguments.add().string_value = msg return response
[docs] def OnReceiveMsg(self, scrnno, rqname, trcode, msg): if self._listen_msg: response = self.ResponseForOnReceiveMsg(scrnno, rqname, trcode, msg) self.observer.on_next(response) # 아래는 개발과정에서 확인 용도 # 키움에서도 경고하지만 메시지의 코드로 판단하는건 위험함 (게다가 모의투자만 해당) if trcode == "KOA_NORMAL_BUY_KP_ORD": if msg == "[00Z218] 모의투자 장종료 상태입니다": self.logger.warning("Market is closed") elif msg == "[00Z217] 모의투자 장시작전입니다": self.logger.warning("Market is before opening") elif msg == "[00Z237] 모의투자 단가를 입력하지 않는 호가입니다.": self.logger.warning("Price is given but should not") elif msg == "[00Z112] 모의투자 정상처리 되었습니다": self.logger.debug("Order processed successfully") elif trcode == "KOA_NORMAL_KP_CANCEL": if msg == "[00Z924] 모의투자 취소수량이 취소가능수량을 초과합니다": self.logger.warning("Not enough amount to cancel")
[docs] def ResponseForOnReceiveTrData( self, scrnno, rqname, trcode, recordname, prevnext, datalength, errorcode, message, splmmsg, ): response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnReceiveTrData" response.arguments.add().string_value = scrnno response.arguments.add().string_value = rqname response.arguments.add().string_value = trcode response.arguments.add().string_value = recordname response.arguments.add().string_value = prevnext should_stop = prevnext in ["", "0"] # pylint: disable=unused-variable repeat_cnt = self.control.GetRepeatCnt(trcode, recordname) if len(self._single_names) > 0: values = [ self.control.GetCommData(trcode, recordname, 0, name).strip() for name in self._single_names ] response.single_data.names.extend(self._single_names) response.single_data.values.extend(values) if repeat_cnt > 0 and len(self._multi_names) > 0: rows = [ [ self.control.GetCommData(trcode, recordname, i, name).strip() for name in self._multi_names ] for i in range(repeat_cnt) ] response.multi_data.names.extend(self._multi_names) for row in rows: if self._is_stop_condition(row): should_stop = True break response.multi_data.values.add().values.extend(row) return response
[docs] def OnReceiveTrData( self, scrnno, rqname, trcode, recordname, prevnext, datalength, errorcode, message, splmmsg, ): order_no = self.control.GetCommData(trcode, recordname, 0, "주문번호").strip() if order_no: response = self.ResponseForOnReceiveTrData( scrnno, rqname, trcode, recordname, prevnext, datalength, errorcode, message, splmmsg, ) self.observer.on_next(response) should_stop = prevnext in ["", "0"] if not should_stop: self.logger.warning("Unexpected to have prevnext for order tr data.")
[docs] def ResponseForOnReceiveChejanData(self, gubun, itemcnt, fidlist): fids = fidlist.rstrip(";") fids = fids.split(";") if fids else [] fids = [int(fid) for fid in fids] assert itemcnt == len(fids) names = [ KiwoomOpenApiPlusRealType.Fid.get_name_by_fid(fid, str(fid)) for fid in fids ] values = [self.control.GetChejanData(fid).strip() for fid in fids] response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnReceiveChejanData" response.arguments.add().string_value = gubun response.arguments.add().long_value = itemcnt response.arguments.add().string_value = fidlist response.single_data.names.extend(names) response.single_data.values.extend(values) return response
[docs] def OnReceiveChejanData(self, gubun, itemcnt, fidlist): response = self.ResponseForOnReceiveChejanData(gubun, itemcnt, fidlist) self.observer.on_next(response)
[docs] def OnEventConnect(self, errcode): if errcode < 0: error = KiwoomOpenApiPlusNegativeReturnCodeError(errcode) self.observer.on_error(error) return
[docs]class KiwoomOpenApiPlusAllOrderEventHandler(KiwoomOpenApiPlusBaseOrderEventHandler): pass
[docs]class KiwoomOpenApiPlusOrderEventHandler( KiwoomOpenApiPlusBaseOrderEventHandler, Logging ): def __init__(self, control, request, context, screen_manager): super().__init__(control, context) self._request = request self._screen_manager = screen_manager self._rqname = request.request_name self._scrnno = request.screen_no self._accno = request.account_no self._ordertype = request.order_type self._code = request.code self._qty = request.quantity self._price = request.price self._hogagb = request.quote_type self._orgorderno = request.original_order_no self._order_no = None self._should_stop = False
[docs] def on_enter(self): self._scrnno = self._screen_manager.borrow_screen(self._scrnno) self.add_callback(self._screen_manager.return_screen, self._scrnno) self.add_callback(self.control.DisconnectRealData, self._scrnno) KiwoomOpenApiPlusError.try_or_raise( self.control.RateLimitedSendOrder.queuedCall( self._rqname, self._scrnno, self._accno, self._ordertype, self._code, self._qty, self._price, self._hogagb, self._orgorderno, ), except_callback=self.observer.on_error, )
[docs] def OnReceiveMsg(self, scrnno, rqname, trcode, msg): if (rqname, scrnno) == (self._rqname, self._scrnno): response = self.ResponseForOnReceiveMsg(scrnno, rqname, trcode, msg) self.observer.on_next(response)
[docs] def OnReceiveTrData( self, scrnno, rqname, trcode, recordname, prevnext, datalength, errorcode, message, splmmsg, ): if (rqname, scrnno) == (self._rqname, self._scrnno): response = self.ResponseForOnReceiveTrData( scrnno, rqname, trcode, recordname, prevnext, datalength, errorcode, message, splmmsg, ) self.observer.on_next(response) self._order_no = self.control.GetCommData( trcode, recordname, 0, "주문번호" ).strip() if not self._order_no: e = KiwoomOpenApiPlusError("Cannot specify order no") self.observer.on_error(e) return should_stop = prevnext in ["", "0"] if not should_stop: self.logger.warning("Unexpected to have prevnext for order tr data.")
[docs] def OnReceiveChejanData(self, gubun, itemcnt, fidlist): # TODO: 정정 케이스에 대해 테스트 해보지 않음 # TODO: 취소를 취소하는 케이스 같은건 고려하지 않음 # TODO: 서로 같은 원주문을 정정 혹은 취소하는 케이스 사이에는 이벤트 전파가 필요할지 모르겠음 accno = self.control.GetChejanData(9201).strip() code = self.control.GetChejanData(9001).strip() if accno == self._accno and code.endswith( self._code ): # code 비교시에 앞에 prefix 가 붙어오기 때문에 endswith 으로 비교해야됨 if gubun == "0": # 접수와 체결시 (+ 취소 확인) order_no = self.control.GetChejanData(9203).strip() original_order_no = self.control.GetChejanData(904).strip() status = self.control.GetChejanData(913).strip() scrnno = self.control.GetChejanData(920).strip() is_last = self.control.GetChejanData(819).strip() == "1" if order_no in [self._order_no, self._orgorderno] or self._order_no in [ order_no, original_order_no, ]: response = self.ResponseForOnReceiveChejanData( gubun, itemcnt, fidlist ) self.observer.on_next(response) if ( order_no == self._order_no ): # 자기 주문 처리하는 입장 OR 취소 및 정정 당한 뒤 원주문 정보를 받는 입장 if is_last and self._should_stop: # 취소 확인 이후 원주문 정보 받고 종료 (타) self.observer.on_completed() return elif status == "접수": pass elif status == "체결": orders_left = self.control.GetChejanData(902).strip() orders_left = int(orders_left) if orders_left.isdigit() else 0 if orders_left == 0: self._should_stop = True # 미체결수량이 더 이상 없다면 이후 잔고 이벤트 후 종료 elif status == "확인": self._should_stop = True # 취소 확인 이후 원주문 정보 받고 종료 (자) else: e = KiwoomOpenApiPlusError( "Unexpected order status: %s" % status ) self.observer.on_error(e) return elif order_no == self._orgorderno: # 취소하는 입장에서 원주문 정보 받는 케이스 if is_last and self._should_stop: # 취소 확인 이후 원주문 정보 받고 종료 (자) self.observer.on_completed() return elif status in ["접수", "체결"]: pass else: e = KiwoomOpenApiPlusError( "Unexpected order status: %s" % status ) self.observer.on_error(e) return elif self._order_no == original_order_no: # 취소 혹은 정정 당하는 케이스 if status == "접수": pass elif status == "확인": self._should_stop = True # 취소 확인 이후 원주문 정보 받고 종료 (타) else: e = KiwoomOpenApiPlusError( "Unexpected order status: %s" % status ) self.observer.on_error(e) return elif gubun == "1": # 국내주식 잔고전달 response = self.ResponseForOnReceiveChejanData(gubun, itemcnt, fidlist) self.observer.on_next(response) if self._should_stop: # 미체결수량이 더 이상 없다면 잔고 이벤트 후 종료 self.observer.on_completed() return elif gubun == "4": # 파생 잔고전달 response = self.ResponseForOnReceiveChejanData(gubun, itemcnt, fidlist) self.observer.on_next(response) if self._should_stop: # 미체결수량이 더 이상 없다면 잔고 이벤트 후 종료 self.observer.on_completed() return else: e = KiwoomOpenApiPlusError("Unexpected gubun value: %s" % gubun) self.observer.on_error(e) return