Source code for koapy.backtrader.KiwoomOpenApiPlusEventStreamer

import datetime
import threading

import rx

from exchange_calendars import get_calendar
from rx import operators as ops
from rx.core.typing import Observer
from rx.scheduler import ThreadPoolScheduler
from rx.subject import Subject

from koapy.backend.kiwoom_open_api_plus.core.KiwoomOpenApiPlusRealType import (
    KiwoomOpenApiPlusRealType,
)
from koapy.backend.kiwoom_open_api_plus.grpc import KiwoomOpenApiPlusService_pb2
from koapy.backend.kiwoom_open_api_plus.utils.queue.QueueBasedBufferedIterator import (
    QueueBasedBufferedIterator,
)
from koapy.backend.kiwoom_open_api_plus.utils.queue.QueueBasedIterableObserver import (
    QueueBasedIterableObserver,
)
from koapy.utils.logging.Logging import Logging


class KiwoomOpenApiPlusPriceEventChannel(Logging):
    """Fan one bidirectional real-data call out into per-code price observables.

    A single ``BidirectionalRealCall`` is opened on the stub. Its responses are
    pumped on a thread pool into an internal subject; ``get_observable_for_code``
    derives a filtered, dict-converted subject from that stream for each code.
    """

    # Timezone of the "XKRX" exchange calendar; used to turn a time-of-day
    # string into an absolute timestamp.
    _krx_timezone = get_calendar("XKRX").tz

    def __init__(self, stub):
        """Open the bidirectional call on ``stub`` and start pumping responses.

        Args:
            stub: Service stub exposing ``BidirectionalRealCall``.
        """
        self._stub = stub
        self._fid_list = KiwoomOpenApiPlusRealType.get_fids_by_realtype_name("주식시세")

        # Requests pushed into the observer are handed to the call through
        # its iterator.
        self._request_observer = QueueBasedIterableObserver()
        self._request_iterator = iter(self._request_observer)
        self._response_iterator = self._stub.BidirectionalRealCall(
            self._request_iterator
        )

        # Responses are buffered and re-published through a subject so that
        # several per-code pipelines can share them.
        self._response_subject = Subject()
        self._response_scheduler_max_workers = 8
        self._response_scheduler = ThreadPoolScheduler(
            self._response_scheduler_max_workers
        )
        self._buffered_response_iterator = QueueBasedBufferedIterator(
            self._response_iterator
        )
        self._response_observable = rx.from_iterable(
            self._buffered_response_iterator, self._response_scheduler
        )
        self._response_subscription = self._response_observable.subscribe(
            self._response_subject
        )

        # code -> (subject, subscription feeding that subject)
        self._subjects_by_code = {}

        self.initialize()

    def close(self):
        """Dispose every subscription and stop/cancel the underlying call.

        Safe to call on a partially constructed instance: ``__del__`` invokes
        this even when ``__init__`` raised before all attributes were set, so
        each resource is looked up defensively.
        """
        subjects_by_code = getattr(self, "_subjects_by_code", {})
        for _subject, subscription in list(subjects_by_code.values()):
            subscription.dispose()

        response_subscription = getattr(self, "_response_subscription", None)
        if response_subscription is not None:
            response_subscription.dispose()

        buffered_iterator = getattr(self, "_buffered_response_iterator", None)
        if buffered_iterator is not None:
            buffered_iterator.stop()

        response_iterator = getattr(self, "_response_iterator", None)
        if response_iterator is not None:
            response_iterator.cancel()

    def __del__(self):
        self.close()

    def initialize(self):
        """Send the initialize request carrying this channel's FID list."""
        request = KiwoomOpenApiPlusService_pb2.BidirectionalRealRequest()
        request.initialize_request.fid_list.extend(
            self._fid_list
        )  # pylint: disable=no-member
        self._request_observer.on_next(request)

    def register_code(self, code):
        """Send a register request for ``code`` using this channel's FID list.

        Args:
            code: Code to register for real events.
        """
        request = KiwoomOpenApiPlusService_pb2.BidirectionalRealRequest()
        request.register_request.code_list.extend(
            [code]
        )  # pylint: disable=no-member
        # Reuse the FID list resolved in __init__ so the initialize and
        # register requests cannot drift apart.
        request.register_request.fid_list.extend(
            self._fid_list
        )  # pylint: disable=no-member
        self._request_observer.on_next(request)
        self.logger.debug("Registering code %s for real events", code)

    def is_for_code(self, response, code):
        """Return whether the first argument of ``response`` equals ``code``."""
        return response.arguments[0].string_value == code

    def filter_for_code(self, code):
        """Return an rx operator keeping only responses for ``code``."""
        return ops.filter(lambda response: self.is_for_code(response, code))

    def is_valid_price_event(self, response):
        """Return whether ``response`` carries FIDs "20", "27" and "28"."""
        return all(name in response.single_data.names for name in ["20", "27", "28"])

    def filter_price_event(self):
        """Return an rx operator keeping only valid price events."""
        return ops.filter(self.is_valid_price_event)

    def time_to_timestamp(self, fid20):
        """Convert an ``HHMMSS`` string into a timestamp in microseconds.

        The time of day is combined with today's date in the exchange
        timezone.

        Args:
            fid20: Time of day formatted as ``%H%M%S``.

        Returns:
            float: POSIX timestamp multiplied by 10**6.

        Raises:
            ValueError: If ``fid20`` does not match ``%H%M%S``.
        """
        tz = self._krx_timezone
        today = datetime.datetime.now(tz).date()
        time_of_day = datetime.datetime.strptime(fid20, "%H%M%S").time()
        naive = datetime.datetime.combine(today, time_of_day)
        # pytz timezones must be attached with localize(); tzinfo objects
        # without that method (e.g. zoneinfo.ZoneInfo) are attached directly.
        localize = getattr(tz, "localize", None)
        if localize is not None:
            aware = localize(naive)
        else:
            aware = naive.replace(tzinfo=tz)
        return aware.timestamp() * (10**6)

    def event_to_dict(self, response):
        """Convert a price response into a ``time``/``bid``/``ask`` dict.

        FID "20" supplies the time, "28" the bid and "27" the ask. Prices are
        passed through ``abs`` (presumably because raw values can carry a sign
        prefix -- confirm against the data source).
        """
        single_data = dict(zip(response.single_data.names, response.single_data.values))
        return {
            "time": self.time_to_timestamp(single_data["20"]),
            "bid": abs(float(single_data["28"])),
            "ask": abs(float(single_data["27"])),
        }

    def convert_to_dict(self):
        """Return an rx operator mapping responses through ``event_to_dict``."""
        return ops.map(self.event_to_dict)

    def get_observable_for_code(self, code):
        """Register ``code`` and return the subject emitting its price dicts.

        The register request is sent on every call; the filtered pipeline and
        its subject are created once per code and cached.

        Args:
            code: Code whose price events are wanted.

        Returns:
            The cached ``Subject`` for ``code``.
        """
        self.register_code(code)
        if code not in self._subjects_by_code:
            subject = Subject()
            subscription = self._response_subject.pipe(
                self.filter_for_code(code),
                self.filter_price_event(),
                self.convert_to_dict(),
            ).subscribe(subject)
            self._subjects_by_code[code] = (subject, subscription)
        subject, _subscription = self._subjects_by_code[code]
        return subject
class KiwoomOpenApiPlusOrderEventChannel(Logging):
    """Turn the ``OrderListen`` response stream into order-event dicts.

    Inherits ``Logging`` because ``event_to_dict`` reports unexpected input
    through ``self.logger``.
    """

    def __init__(self, stub):
        """Start listening for order events on ``stub``.

        Args:
            stub: Service stub exposing ``OrderListen``.
        """
        self._stub = stub

        request = KiwoomOpenApiPlusService_pb2.ListenRequest()
        self._response_iterator = self._stub.OrderListen(request)

        # Responses are buffered and pumped on a thread pool into a subject.
        self._response_subject = Subject()
        self._response_scheduler_max_workers = 8
        self._response_scheduler = ThreadPoolScheduler(
            self._response_scheduler_max_workers
        )
        self._buffered_response_iterator = QueueBasedBufferedIterator(
            self._response_iterator
        )
        self._response_observable = rx.from_iterable(
            self._buffered_response_iterator, self._response_scheduler
        )
        self._response_subscription = self._response_observable.subscribe(
            self._response_subject
        )

        # Public stream: filtered chejan responses converted to dicts.
        self._observable = Subject()
        self._subscription = self._response_subject.pipe(
            self.filter_chejan_response(),
            self.convert_to_dict(),
        ).subscribe(self._observable)

    def close(self):
        """Dispose the subscriptions and stop/cancel the underlying call.

        Safe to call on a partially constructed instance: ``__del__`` invokes
        this even when ``__init__`` raised before all attributes were set.
        """
        subscription = getattr(self, "_subscription", None)
        if subscription is not None:
            subscription.dispose()

        response_subscription = getattr(self, "_response_subscription", None)
        if response_subscription is not None:
            response_subscription.dispose()

        buffered_iterator = getattr(self, "_buffered_response_iterator", None)
        if buffered_iterator is not None:
            buffered_iterator.stop()

        response_iterator = getattr(self, "_response_iterator", None)
        if response_iterator is not None:
            response_iterator.cancel()

    def __del__(self):
        self.close()

    def is_chejan_response(self, response):
        """Return whether ``response`` is an order event worth forwarding.

        Only ``OnReceiveChejanData`` events whose first argument is "0" are
        considered. Among those, the event is accepted when it is a limit
        order acceptance, any buy/sell fill, or a cancel confirmation.
        """
        # Check the event name before touching the arguments so that other
        # events are rejected without indexing into ``arguments``.
        if response.name != "OnReceiveChejanData":
            return False
        if response.arguments[0].string_value != "0":
            return False

        data = dict(zip(response.single_data.names, response.single_data.values))
        order_type = data["주문구분"]
        hoga_type = data["매매구분"]
        status = data["주문상태"]

        is_buy_or_sell = order_type in ["+매수", "-매도"]
        market_order_created_and_filled = (
            is_buy_or_sell and hoga_type in ["시장가"] and status in ["체결"]
        )
        # The original-order event that follows a cancel confirmation seems
        # to arrive with 819 set to "1", so only "0" counts as a new order.
        limit_order_created = (
            is_buy_or_sell
            and hoga_type in ["보통"]
            and status in ["접수"]
            and data["819"] in ["0"]
        )
        order_filled = is_buy_or_sell and status in ["체결"]
        order_canceled = order_type in ["매수취소", "매도취소"] and status in ["확인"]

        return any(
            [
                market_order_created_and_filled,
                limit_order_created,
                order_filled,
                order_canceled,
            ]
        )

    def filter_chejan_response(self):
        """Return an rx operator keeping only accepted chejan responses."""
        return ops.filter(self.is_chejan_response)

    @staticmethod
    def _fill_details(data):
        """Return ``(order_no, units, side, price)`` parsed from a fill event."""
        order_no = data["주문번호"]
        units = data["단위체결량"]
        side = {
            "+매수": "buy",
            "-매도": "sell",
        }[data["주문구분"]]
        price = data["단위체결가"]
        return order_no, int(units), side, abs(float(price))

    def event_to_dict(self, response):
        """Convert a chejan response into an order-event dict.

        Returns:
            dict: One of ``ORDER_CANCEL``, ``LIMIT_ORDER_CREATE``,
            ``ORDER_FILLED`` or ``MARKET_ORDER_CREATE`` payloads, or an empty
            dict when the event maps to none of them.
        """
        result = {}
        data = dict(zip(response.single_data.names, response.single_data.values))
        original_order_no = data["원주문번호"]

        if int(original_order_no):
            # A non-zero original order number refers to an earlier order;
            # only the confirmation ("확인") is translated.
            status = data["주문상태"]
            if status == "확인":
                reject_reason = data["거부사유"]
                result = {
                    "type": "ORDER_CANCEL",
                    "orderId": original_order_no,
                    "reason": reject_reason if int(reject_reason) else "ORDER_FILLED",
                }
            return result

        status = data["주문상태"]
        hoga_type = data["매매구분"]
        if status == "접수":
            if hoga_type == "보통":
                result = {
                    "type": "LIMIT_ORDER_CREATE",
                    "id": data["주문번호"],
                }
            elif hoga_type == "시장가":
                # Market orders are reported when they fill, not on acceptance.
                pass
            else:
                self.logger.warning("Unexpected hoga type %s", hoga_type)
        elif status == "체결":
            if hoga_type == "보통":
                order_no, units, side, price = self._fill_details(data)
                result = {
                    "type": "ORDER_FILLED",
                    "orderId": order_no,
                    "units": units,
                    "side": side,
                    "price": price,
                }
            elif hoga_type == "시장가":
                order_no, units, side, price = self._fill_details(data)
                result = {
                    "type": "MARKET_ORDER_CREATE",
                    "id": order_no,
                    "tradeOpened": {"id": order_no},
                    "units": units,
                    "side": side,
                    "price": price,
                }
            else:
                self.logger.warning("Unexpected hoga type %s", hoga_type)
        else:
            self.logger.warning("Unexcpected status %s", status)
        return result

    def convert_to_dict(self):
        """Return an rx operator mapping responses through ``event_to_dict``."""
        return ops.map(self.event_to_dict)

    def get_observable(self):
        """Return the subject emitting converted order events."""
        return self._observable
class MetaKiwoomOpenApiPlusEventStreamer(type(Logging), type(Observer)):
    """Metaclass derived from the metaclasses of both ``Logging`` and ``Observer``.

    Lets a single class list ``Observer`` and ``Logging`` as bases by giving
    it a metaclass compatible with each of them.
    """
class KiwoomOpenApiPlusEventStreamer(
    Observer, Logging, metaclass=MetaKiwoomOpenApiPlusEventStreamer
):
    """Observer that forwards every received event into a queue.

    Event channels are cached per stub at class level, so all streamers built
    on the same stub share one price channel and one order channel.
    """

    # stub -> channel caches shared by every instance; guarded by ``_lock``.
    _price_event_channels_by_stub = {}
    _order_event_channels_by_stub = {}
    _lock = threading.RLock()

    def __init__(self, stub, queue):
        """Remember the stub to subscribe through and the queue to fill.

        Args:
            stub: Service stub handed to the event channels.
            queue: Object with a ``put`` method receiving each event.
        """
        super().__init__()
        self._stub = stub
        self._queue = queue

    def on_next(self, value):
        """Put the received ``value`` on the queue."""
        self._queue.put(value)

    def on_error(self, error):
        """Log the stream error."""
        self.logger.error("Streamer.on_error(%s)", error)

    def on_completed(self):
        """Do nothing when a stream completes."""

    def rates(self, code):
        """Subscribe this streamer to price events for ``code``.

        Returns:
            The subscription returned by ``subscribe``.
        """
        with self._lock:
            stub = self._stub
            channel = self._price_event_channels_by_stub.get(stub)
            if channel is None:
                # First request for this stub: create and cache its channel.
                channel = KiwoomOpenApiPlusPriceEventChannel(stub)
                self._price_event_channels_by_stub[stub] = channel
            observable = channel.get_observable_for_code(code)
            subscription = observable.subscribe(self)
            self.logger.debug("Subscribing rates for code %s", code)
            return subscription

    def events(self):
        """Subscribe this streamer to order events.

        Returns:
            The subscription returned by ``subscribe``.
        """
        with self._lock:
            stub = self._stub
            channel = self._order_event_channels_by_stub.get(stub)
            if channel is None:
                # First request for this stub: create and cache its channel.
                channel = KiwoomOpenApiPlusOrderEventChannel(stub)
                self._order_event_channels_by_stub[stub] = channel
            observable = channel.get_observable()
            subscription = observable.subscribe(self)
            self.logger.debug("Subscribing order events")
            return subscription