Source code for koapy.utils.store.SQLiteFileLibrary

import pandas as pd
import pytz

from sqlalchemy import create_engine, inspect, select
from sqlalchemy.schema import DropTable, MetaData, Table

from .sqlalchemy.Timestamp import Timestamp


class SQLiteFileLibrary:
    """Store pandas DataFrames as tables ("symbols") of a single SQLite file.

    Every symbol corresponds to one table of the database the engine points
    at.  Nothing about index names or column timezones is persisted next to
    the data; ``read_as_dataframe`` therefore relies on fixed conventions
    (a ``"Date"`` column that becomes the index).
    """

    def __init__(self, filename):
        """Create a SQLAlchemy engine for the SQLite database at *filename*."""
        self._engine = create_engine("sqlite:///" + filename)
        self._inspector = inspect(self._engine)

    def _refresh_inspector(self):
        """Replace and return the inspector so reflection results are current.

        An inspector caches what it has already reflected, so reusing the one
        created in ``__init__`` could hide tables written or dropped later.
        """
        self._inspector = inspect(self._engine)
        return self._inspector

    @staticmethod
    def _to_utc(value):
        """Coerce *value* to a UTC ``pd.Timestamp``.

        Naive values are interpreted in ``Timestamp.local_timezone`` before
        being converted.
        """
        value = pd.Timestamp(value)
        if Timestamp.is_naive(value):
            value = value.tz_localize(Timestamp.local_timezone)
        return value.astimezone(Timestamp.utc)

    def list_symbols(self):
        """Return the names of all tables currently in the database."""
        return self._refresh_inspector().get_table_names()

    def has_symbol(self, symbol):
        """Return whether a table named *symbol* currently exists."""
        return self._refresh_inspector().has_table(symbol)

    def write(self, symbol, data):
        """Write *data* to the table *symbol*, replacing any existing table.

        Unnamed index levels are stored as ``index_<position>``.  Datetime
        columns are stored with the project's ``Timestamp`` column type,
        constructed with the column's timezone (``False`` when the dtype
        carries none).

        Returns:
            The *data* frame that was passed in, unchanged.
        """
        index_table_names = [
            name if name is not None else "index_%d" % i
            for i, name in enumerate(data.index.names)
        ]
        # Only regular columns are inspected here; index levels keep the
        # default type mapping chosen by pandas.
        datetime_columns = [
            (name, dtype)
            for name, dtype in data.dtypes.items()
            if pd.api.types.is_datetime64_any_dtype(dtype)
        ]
        to_sql_kwargs = {
            "index_label": index_table_names,
            "dtype": {
                name: Timestamp(getattr(dtype, "tz", False))
                for name, dtype in datetime_columns
            },
            "if_exists": "replace",
        }
        data.to_sql(symbol, self._engine, **to_sql_kwargs)
        return data

    def read_as_dataframe(
        self, symbol, time_column=None, start_time=None, end_time=None
    ):
        """Load the table *symbol* as a DataFrame indexed by ``"Date"``.

        The ``"Date"`` values are treated as naive UTC, converted to
        ``Timestamp.local_timezone`` and returned timezone-naive.

        NOTE(review): ``time_column``, ``start_time`` and ``end_time`` are
        accepted for signature parity with ``read_as_cursor`` but are not
        applied here; the whole table is always loaded.
        """
        index_col = ["Date"]
        parse_dates = ["Date"]

        # Load every field as an ordinary column first.  Asking pandas for
        # ``index_col`` up front would move "Date" into the index, and the
        # column lookup below would then fail with KeyError.
        data = pd.read_sql_table(symbol, self._engine, parse_dates=parse_dates)

        timezone = Timestamp.local_timezone
        if isinstance(timezone, str):
            timezone = pytz.timezone(timezone)

        for column_name in parse_dates:
            localized = (
                data[column_name].dt.tz_localize(Timestamp.utc).dt.tz_convert(timezone)
            )
            # No per-column timezone is stored, so hand back naive local times.
            data[column_name] = localized.dt.tz_localize(None)

        data.set_index(index_col, inplace=True)
        return data

    def read_as_cursor(self, symbol, time_column=None, start_time=None, end_time=None):
        """Execute a SELECT over the table *symbol* and return the result.

        Rows are ordered by *time_column* when one is given.  ``start_time``
        and ``end_time`` are inclusive bounds on that column; when a bound is
        given without *time_column*, the table's first column is used.  Bounds
        are normalised to UTC before comparison.
        """
        records = Table(symbol, MetaData(), autoload_with=self._engine)
        statement = select(records)

        has_bounds = start_time is not None or end_time is not None
        if time_column is None and has_bounds:
            time_column = 0  # fall back to the first column of the table

        if time_column is not None:
            time_column = records.columns[
                time_column
            ]  # pylint: disable=unsubscriptable-object
            statement = statement.order_by(time_column)

        if start_time is not None:
            statement = statement.where(time_column >= self._to_utc(start_time))
        if end_time is not None:
            statement = statement.where(time_column <= self._to_utc(end_time))

        # NOTE(review): ``Engine.execute`` is the legacy connectionless API;
        # it is kept because the returned cursor's lifetime depends on it.
        return self._engine.execute(statement)

    def read(self, *args, **kwargs):
        """Alias for :meth:`read_as_dataframe`."""
        return self.read_as_dataframe(*args, **kwargs)

    def delete(self, symbol):
        """Drop the table *symbol*.

        The table is reflected first, so a missing table is presumably still
        reported by SQLAlchemy at that point; verify before relying on silent
        deletes.
        """
        table = Table(symbol, MetaData(), autoload_with=self._engine)
        # ``if_exists`` belongs to the DDL construct; passed to ``execute`` it
        # would be treated as a statement parameter and never reach the DROP.
        with self._engine.begin() as connection:
            connection.execute(DropTable(table, if_exists=True))