Source code for koapy.backend.kiwoom_open_api_plus.grpc.event.KiwoomOpenApiPlusLoginEventHandler

from google.protobuf.json_format import MessageToDict

from koapy.backend.kiwoom_open_api_plus.core.KiwoomOpenApiPlusError import (
    KiwoomOpenApiPlusError,
    KiwoomOpenApiPlusNegativeReturnCodeError,
)
from koapy.backend.kiwoom_open_api_plus.grpc import KiwoomOpenApiPlusService_pb2
from koapy.backend.kiwoom_open_api_plus.grpc.event.KiwoomOpenApiPlusEventHandlerForGrpc import (
    KiwoomOpenApiPlusEventHandlerForGrpc,
)


class KiwoomOpenApiPlusLoginEventHandler(KiwoomOpenApiPlusEventHandlerForGrpc):
    """Event handler that starts a login on the control and relays the result.

    On entering, the handler asks the control to connect (optionally driving
    the login with credentials taken from the request). When the control
    reports ``OnEventConnect``, the error code is forwarded to the observer
    as a ``ListenResponse``.
    """

    def __init__(self, control, request, context):
        """Store the request and extract its optional credentials.

        Args:
            control: Control object passed through to the base handler and
                used later to issue the connect/login calls.
            request: Request message; its ``credentials`` field is converted
                to a plain dict (original proto field names kept) when set.
            context: Passed through to the base handler.
        """
        super().__init__(control, context)
        self._request = request

        # Keep None when the request carries no credentials so that
        # on_enter() can tell the two login flows apart.
        credentials = None
        if request.HasField("credentials"):
            credentials = MessageToDict(
                request.credentials, preserving_proto_field_name=True
            )
        self._credentials = credentials

    def on_enter(self):
        """Issue the connect request, logging in with credentials if present.

        ``KiwoomOpenApiPlusError.try_or_raise`` is applied to the return
        value of ``CommConnect()`` in both flows.
        """
        has_credentials = self._credentials is not None

        # Auto login is turned off before connecting only when explicit
        # credentials are going to be used for the login.
        if has_credentials:
            self.control.DisableAutoLogin()

        KiwoomOpenApiPlusError.try_or_raise(self.control.CommConnect())

        if has_credentials:
            self.control.LoginUsingPywinauto(self._credentials)

    def OnEventConnect(self, errcode):
        """Forward the connect result to the observer and complete the stream.

        Args:
            errcode: Result code of the connect event; a negative value is
                additionally reported through ``observer.on_error``.
        """
        if errcode < 0:
            self.observer.on_error(KiwoomOpenApiPlusNegativeReturnCodeError(errcode))
            # NOTE(review): execution continues after on_error(), so on_next()
            # and on_completed() are still invoked for negative codes. Confirm
            # whether the observer ignores calls after an error or whether a
            # `return` is intended here.

        message = KiwoomOpenApiPlusService_pb2.ListenResponse(name="OnEventConnect")
        argument = message.arguments.add()
        argument.long_value = errcode

        self.observer.on_next(message)
        self.observer.on_completed()