Source code for koapy.backend.daishin_cybos_plus.core.CybosPlusEntrypointMixin

import datetime
import logging
import subprocess
import time

import pandas as pd

from koapy.backend.daishin_cybos_plus.core.CybosPlusTypeLibSpec import INSTALLATION_PATH
from koapy.backend.daishin_cybos_plus.stub import CpSysDib, CpUtil, DsCbo1
from koapy.config import get_32bit_executable
from koapy.utils.ctypes import is_admin
from koapy.utils.itertools import chunk
from koapy.utils.logging.Logging import Logging
from koapy.utils.subprocess import function_to_subprocess_args


[docs]class CybosPlusEntrypointMixin(Logging):
[docs] def GetConnectState(self): cybos: CpUtil.CpCybos = self["CpUtil.CpCybos"] return cybos.IsConnect
@classmethod
[docs] def ConnectUsingPywinauto_Impl(cls, credentials=None): """ 아래 구현을 참고해 작성함 https://github.com/ippoeyeslhw/cppy/blob/master/cp_luncher.py """ import pywinauto is_in_development = False if credentials is None: from koapy.config import config credentials = config.get("koapy.backend.daishin_cybos_plus.credentials") userid = credentials.get("user_id") password = credentials.get("user_password") cert = credentials.get("cert_password") auto_account_password = credentials.get("auto_account_password") auto_cert_password = credentials.get("auto_cert_password") price_check_only = credentials.get("price_check_only") account_passwords = credentials.get("account_passwords") cls.logger.info("Starting CYBOS Starter application") app = pywinauto.Application(allow_magic_lookup=False) desktop = pywinauto.Desktop(allow_magic_lookup=False) starter_path = INSTALLATION_PATH / "STARTER" / "ncStarter.exe" starter_command = f"{starter_path} /prj:cp" app.start(starter_command) from koapy.utils.pywinauto import wait_any ask_stop = desktop.window(title="ncStarter") starter = desktop.window(title="CYBOS Starter") def wait_ask_stop_ready(): ask_stop.wait("ready", timeout=1) return 0 def wait_starter_ready(): starter.wait("ready", timeout=1) return 1 cls.logger.info("Waiting for CYBOS Starter login screen") try: index = wait_any( [ wait_ask_stop_ready, wait_starter_ready, ], timeout=30, ) except pywinauto.timings.TimeoutError: cls.logger.exception("Failed to find login screen") raise else: if index == 0: cls.logger.info("Existing program found, shutting down") if is_in_development: ask_stop.print_control_identifiers() if ask_stop["Static2"].window_text().endswith("종료하시겠습니까?"): ask_stop["Button1"].click() try: cls.logger.info( "Wating for possible failure message on shutdown" ) ask_stop.wait_not("ready", timeout=10) except pywinauto.timings.TimeoutError as err: cls.logger.error("Failed to shutdown existing program") if is_in_development: ask_stop.print_control_identifiers() if ask_stop["Static2"].window_text().endswith("종료할 수 없습니다."): ask_stop["Button"].click() raise RuntimeError("Cannot stop existing program") from err else: cls.logger.info("Successfully shutdown existing program") try: starter.wait("ready", timeout=30) except pywinauto.timings.TimeoutError: cls.logger.exception("Failed to find login screen") raise else: cls.logger.info("CYBOS Starter login screen found") elif index == 1: cls.logger.info("CYBOS Starter login screen found") if is_in_development: starter.print_control_identifiers() if userid: cls.logger.info("Putting user id") starter["Edit1"].set_text(userid) if password: cls.logger.info("Putting user password") starter["Edit2"].set_text(password) else: raise RuntimeError("No user password given") if not price_check_only: if cert: cls.logger.info("Putting cert password") starter["Edit3"].set_text(cert) else: raise RuntimeError("No cert password given") if price_check_only: if starter["Edit3"].is_enabled(): cls.logger.info("Checking price check only option") starter["Button6"].check_by_click() else: if not starter["Edit3"].is_enabled(): starter["Button6"].uncheck_by_click() if auto_account_password: cls.logger.info("Checking auto account password option") starter["Button4"].check() # check dosen't work try: confirm = desktop.window(title="대신증권") confirm.wait("ready", timeout=5) except pywinauto.timings.TimeoutError: pass else: if is_in_development: confirm.print_control_identifiers() confirm["Button"].click() else: starter["Button4"].uncheck() # uncheck dosen't work if auto_cert_password: cls.logger.info("Checking auto cert password option") starter["Button5"].check() # check dosen't work try: confirm = desktop.window(title="대신증권") confirm.wait("ready", timeout=5) except pywinauto.timings.TimeoutError: pass else: if is_in_development: confirm.print_control_identifiers() confirm["Button"].click() else: starter["Button5"].uncheck() # uncheck dosen't work cls.logger.info("Clicking login button") starter["Button1"].click() cls.logger.info("Setting account passwords if necessary") should_stop = False while not should_stop: try: cls.logger.info("Waiting for account password setting screen") account_password = desktop.window(title="종합계좌 비밀번호 확인 입력") account_password.wait("ready", timeout=5) except pywinauto.timings.TimeoutError: cls.logger.info("Account password setting screen not found") should_stop = True else: cls.logger.info("Account password setting screen found") if is_in_development: account_password.print_control_identifiers() account_no = ( account_password["Static"].window_text().split(":")[-1].strip() ) if account_no in account_passwords: account_password["Edit"].set_text(account_passwords[account_no]) else: raise RuntimeError( "No account password given for account %s" % account_no ) account_password["Button1"].click() try: cls.logger.info("Waiting for starter screen to be closed") starter.wait_not("visible", timeout=60) except pywinauto.timings.TimeoutError: cls.logger.warning("Could not wait for starter screen to be closed") else: cls.logger.info("Starter screen is closed") try: cls.logger.info("Waiting for notice window") notice = desktop.window(title="공지사항") notice.wait("ready", timeout=60) except pywinauto.timings.TimeoutError: cls.logger.info("No notice window found") else: cls.logger.info("Closing notice window") notice.close() try: cls.logger.info("Waiting for the notice window to be closed") notice.wait_not("visible", timeout=60) except pywinauto.timings.TimeoutError: cls.logger.warning("Could not wait for notice screen to be closed") else: cls.logger.info("Notice window is closed")
@classmethod
[docs] def ConnectUsingPywinauto_RunScriptInSubprocess(cls, credentials=None): import json def main(): import json import sys from koapy.backend.daishin_cybos_plus.core.CybosPlusEntrypoint import ( CybosPlusEntrypoint, ) credentials = json.load(sys.stdin) CybosPlusEntrypoint.ConnectUsingPywinauto_Impl(credentials) executable = get_32bit_executable() args = function_to_subprocess_args(main, executable=executable) return subprocess.run( args, input=json.dumps(credentials), text=True, check=True, )
[docs] def ConnectUsingPywinauto(self, credentials=None): return self.ConnectUsingPywinauto_RunScriptInSubprocess(credentials)
[docs] def Connect(self, credentials=None): assert is_admin(), "Connect() method requires to be run as administrator" self.ConnectUsingPywinauto(credentials) if self.GetConnectState() == 0: self.logger.error("Failed to start and connect to CYBOS Plus") else: self.logger.info("Succesfully connected to CYBOS Plus") return self.GetConnectState()
[docs] def CommConnect(self, credentials=None): return self.Connect(credentials)
[docs] def EnsureConnected(self, credentials=None): errcode = 0 if self.GetConnectState() == 0: self.Connect(credentials) if self.GetConnectState() == 0: raise RuntimeError("CYBOS Plus is not running, please start CYBOS Plus") return errcode
[docs] def GetCodeListByMarketAsList(self, market): """ 0: 구분없음 1: 거래소 2: 코스닥 3: 프리보드 4: KRX """ codemgr: CpUtil.CpCodeMgr = self["CpUtil.CpCodeMgr"] codes = codemgr.GetStockListByMarket(market) codes = list(codes) return codes
[docs] def GetKospiCodeList(self): codes = self.GetCodeListByMarketAsList(1) return codes
[docs] def GetKosdaqCodeList(self): codes = self.GetCodeListByMarketAsList(2) return codes
[docs] def GetGeneralCodeList( self, include_preferred_stock=False, include_etn=False, include_etf=False, include_mutual_fund=False, include_reits=False, include_kosdaq=False, ): codes = self.GetCodeListByMarketAsList(1) codemgr: CpUtil.CpCodeMgr = self["CpUtil.CpCodeMgr"] if not include_preferred_stock: codes = [code for code in codes if code.endswith("0")] if not include_etn: codes = [code for code in codes if not code.startswith("Q")] if not include_etf: codes = [ code for code in codes if codemgr.GetStockSectionKind(code) not in [10, 12] ] if not include_mutual_fund: codes = [ code for code in codes if codemgr.GetStockSectionKind(code) not in [2] ] if not include_reits: codes = [ code for code in codes if codemgr.GetStockSectionKind(code) not in [3] ] if include_kosdaq: codes += self.GetCodeListByMarketAsList(2) codes = sorted(codes) return codes
[docs] def GetStockDataAsDataFrame( self, code, chart_type, interval, start_date=None, end_date=None, adjusted_price=False, adjustement_only=False, ): """ http://cybosplus.github.io/cpsysdib_rtf_1_/stockchart.htm """ chart: CpSysDib.StockChart = self["CpSysDib.StockChart"] cybos: CpUtil.CpCybos = self["CpUtil.CpCybos"] needs_time = chart_type in ["m", "T"] if len(code) == 6 and not code.startswith("A"): code = "A" + code fids = [0] if needs_time: fids += [1] if not adjustement_only: fids += [2, 3, 4, 5, 8, 9] else: assert chart_type == "D" adjusted_price = True if adjusted_price and chart_type == "D": fids += [18, 19] sorted_fids = sorted(fids) field_indexes = [sorted_fids.index(i) for i in fids] """ maximum_value_count = 20000 num_fis = len(fids) expected_count = math.floor(maximum_value_count / num_fids) - 1 request_count = expected_count + 1 """ date_format_arg = "%Y%m%d%H%M%S" date_format_input = "%Y%m%d" if start_date is not None: if isinstance(start_date, str): start_date_len = len(start_date) if start_date_len == 14: start_date = datetime.datetime.strptime(start_date, date_format_arg) elif start_date_len == 8: start_date = datetime.datetime.strptime( start_date, date_format_input ) else: raise ValueError if not isinstance(start_date, datetime.datetime): raise ValueError start_date = int(start_date.strftime(date_format_input)) else: start_date = 0 if end_date is not None: if isinstance(end_date, str): end_date_len = len(end_date) if end_date_len == 14: end_date = datetime.datetime.strptime(end_date, date_format_arg) elif end_date_len == 8: end_date = datetime.datetime.strptime(end_date, date_format_input) else: raise ValueError if not isinstance(end_date, datetime.datetime): raise ValueError else: end_date = datetime.datetime.min end_date = int(end_date.strftime(date_format_input)) chart.SetInputValue(0, code) chart.SetInputValue(1, ord("1")) chart.SetInputValue(2, start_date) chart.SetInputValue(3, end_date) chart.SetInputValue(5, fids) chart.SetInputValue(6, ord(chart_type)) chart.SetInputValue(7, int(interval)) chart.SetInputValue(9, ord("1") if adjusted_price else ord("0")) dataframes = [] should_stop = False while not should_stop: limit_remain_count = cybos.GetLimitRemainCount(1) if limit_remain_count == 0: limit_remain_time_millis = cybos.GetLimitRemainTime(1) if limit_remain_time_millis > 0: limit_remain_time_seconds = limit_remain_time_millis / 1000 self.logger.debug( "Sleeping for %f seconds", limit_remain_time_seconds ) time.sleep(limit_remain_time_seconds) chart.BlockRequest() records = [] received_count = chart.GetHeaderValue(3) for i in range(received_count): record = [chart.GetDataValue(j, i) for j in field_indexes] records.append(record) names = chart.GetHeaderValue(2) names = [names[i] for i in field_indexes] df = pd.DataFrame.from_records(records, columns=names) if df.shape[0] > 0 and self.logger.isEnabledFor(logging.DEBUG): if needs_time: dates = df["날짜"].astype(int).astype(str) times = df["시간"].astype(int).astype(str).str.rjust(4, "0") datetimes = dates.str.cat(times) datetimes = pd.to_datetime(datetimes, format="%Y%m%d%H%M") else: dates = df["날짜"].astype(int).astype(str) datetimes = dates datetimes = pd.to_datetime(datetimes, format="%Y%m%d") self.logger.debug( "Received data from %s to %s for code %s", datetimes.iloc[0], datetimes.iloc[-1], code, ) should_stop = not chart.Continue dataframes.append(df) df = pd.concat(dataframes) df.reset_index(drop=True, inplace=True) return df
[docs] def GetDailyStockDataAsDataFrame( self, code, start_date=None, end_date=None, adjusted_price=False ): return self.GetStockDataAsDataFrame( code, "D", 1, start_date, end_date, adjusted_price=adjusted_price )
[docs] def GetMinuteStockDataAsDataFrame( self, code, interval, start_date=None, end_date=None, adjusted_price=False ): return self.GetStockDataAsDataFrame( code, "m", interval, start_date, end_date, adjusted_price=adjusted_price )
[docs] def GetDailyAdjustmentRatioAsDataFrame(self, code, start_date=None, end_date=None): return self.GetStockDataAsDataFrame( code, "D", 1, start_date, end_date, adjustement_only=True )
[docs] def GetCurrentStockDataAsDataFrame(self, codes): """ http://cybosplus.github.io/cpdib_rtf_1_/stockmst2.htm """ stock: DsCbo1.StockMst2 = self["DsCbo1.StockMst2"] cybos: CpUtil.CpCybos = self["CpUtil.CpCybos"] names = [ "종목코드", "종목명", "시간", "현재가", "전일대비", "상태구분", "시가", "고가", "저가", "매도호가", "매수호가", "거래량", "거래대금", "총매도잔량", "총매수잔량", "매도잔량", "매수잔량", "상장주식수", "외국인보유비율", "전일종가", "전일거래량", "체결강도", "순간체결량", "체결가비교", "호가비교", "동시호가구분", "예상체결가", "예상체결가 전일대비", "예상체결가 상태구분", "예상체결가 거래량", ] fids = list(range(len(names))) dataframes = [] for code_chunk in chunk(codes, 110): stock.SetInputValue(0, ",".join(code_chunk)) limit_remain_count = cybos.GetLimitRemainCount(1) if limit_remain_count == 0: limit_remain_time_millis = cybos.GetLimitRemainTime(1) if limit_remain_time_millis > 0: limit_remain_time_seconds = limit_remain_time_millis / 1000 self.logger.debug( "Sleeping for %f seconds", limit_remain_time_seconds ) time.sleep(limit_remain_time_seconds) stock.BlockRequest() received_count = stock.GetHeaderValue(0) records = [] for i in range(received_count): record = [stock.GetDataValue(j, i) for j in fids] records.append(record) df = pd.DataFrame.from_records(records, columns=names) dataframes.append(df) df = pd.concat(dataframes) df.reset_index(drop=True, inplace=True) return df