Source code for koapy.backtrader.SQLAlchemyData

import datetime

import pandas as pd

from backtrader import date2num
from backtrader.feed import DataBase
from sqlalchemy import MetaData, Table, create_engine, select


[docs]class SQLAlchemyData(DataBase): # pylint: disable=no-member
[docs] params = ( ("url", None), ("engine", None), ("connection", None), ("tablename", None), ("timestampcolumn", 0), ("timestampcolumntimezone", None), ("timestampcolumnsort", False), ("lazy", True), )
def __init__(self): super().__init__() assert self.p.url assert self.p.tablename self._engine = self.p.engine self._should_dispose = False self._cursor = None if self.p.timestampcolumntimezone is None: self.p.timestampcolumntimezone = datetime.datetime.now().astimezone().tzinfo self._started_already = False if not self.p.lazy: self.start() def _dispose_engine(self): if self._engine is not None and self._should_dispose: self._engine.dispose() self._engine = None def _initialize_engine(self): self._dispose_engine() self._engine = self.p.engine self._should_dispose = False if self._engine is None: if self.p.connection is not None: self._engine = create_engine( self.p.url, creator=lambda: self.p.connection ) self._should_dispose = False else: self._engine = create_engine(self.p.url) self._should_dispose = True assert self._engine is not None def _close_cursor(self): if self._cursor is not None: self._cursor.close() self._cursor = None def _initialize_cursor(self): self._close_cursor() metadata = MetaData() table = Table(self.p.tablename, metadata, autoload_with=self._engine) timestampcolumn = table.columns[ self.p.timestampcolumn ] # pylint: disable=unsubscriptable-object statement = select(table) if self.p.timestampcolumnsort: statement = statement.order_by(timestampcolumn) if self.p.fromdate is not None: fromdate = pd.Timestamp(self.p.fromdate, tz=self.p.timestampcolumntimezone) statement = statement.where(timestampcolumn >= fromdate) if self.p.todate is not None: todate = pd.Timestamp(self.p.todate, tz=self.p.timestampcolumntimezone) statement = statement.where(timestampcolumn <= todate) statement = statement.execution_options(stream_results=True) self._cursor = self._engine.execute(statement)
[docs] def start(self): if not self._started_already: self._initialize_engine() self._initialize_cursor() self._started_already = True
[docs] def stop(self): self._close_cursor() self._dispose_engine() self._started_already = False
def _load(self): if self._cursor is None: return False try: date, open_, high, low, close, volume, openinterest = next(self._cursor) except StopIteration: return False else: dt = pd.Timestamp(date) self.lines.datetime[0] = date2num(dt) self.lines.open[0] = open_ self.lines.high[0] = high self.lines.low[0] = low self.lines.close[0] = close self.lines.volume[0] = volume self.lines.openinterest[0] = openinterest return True