Source code for koapy.utils.store.sqlalchemy.Symbol

import pandas as pd

from sqlalchemy import Column, ForeignKey, Integer, String, UniqueConstraint, func
from sqlalchemy.orm import aliased, object_session, relationship
from sqlalchemy.orm.exc import NoResultFound

from .Base import Base
from .SnapshotAssociation import SnapshotAssociation
from .Version import Version


[docs]class Symbol(Base): __tablename__ = "symbols"
[docs] id = Column(Integer, primary_key=True)
[docs] name = Column(String, index=True, nullable=False)
[docs] library_id = Column(Integer, ForeignKey("libraries.id"), nullable=False)
[docs] library = relationship("Library", back_populates="symbols")
[docs] versions = relationship( "Version", back_populates="symbol", order_by="Version.version" )
__table_args__ = (UniqueConstraint("library_id", "name"),)
[docs] def get_versions(self, deleted=False): """ versions = self.versions if not deleted: versions = [version for version in versions if not version.deleted] """ session = object_session(self) versions = session.query(Version).with_parent(self) version = aliased(Version, alias=versions.subquery()) versions = session.query(version) if not deleted: versions = versions.filter(version.deleted != True) versions = versions.order_by(version.version) versions = versions.all() return versions
[docs] def get_latest_version(self, deleted=False): """ versions = self.versions if len(versions) == 0: raise NoResultFound latest_version = self.versions[-1] if not deleted and latest_version.deleted: raise NoResultFound """ session = object_session(self) latest_version = session.query(Version, func.max(Version.version)).with_parent( self ) version = aliased(Version, alias=latest_version.subquery()) latest_version = session.query(version) if not deleted: latest_version = latest_version.filter(version.deleted != True) latest_version = latest_version.one() return latest_version
[docs] def create_new_version( self, table_name=None, user_metadata=None, pandas_metadata=None, deleted=None ): try: latest_version = self.get_latest_version() except NoResultFound: next_version = 0 else: next_version = latest_version.version + 1 new_version = Version( version=next_version, table_name=table_name, user_metadata=user_metadata, pandas_metadata=pandas_metadata, deleted=deleted, ) self.versions.append(new_version) return new_version
[docs] def get_version_by_number(self, version_number, deleted=False): session = object_session(self) version = ( session.query(Version) .with_parent(self) .filter(Version.version == version_number) ) if not deleted: version = version.filter(Version.deleted != True) version = version.one() return version
[docs] def get_prunable_versions(self, keep_mins=120): session = object_session(self) versions = session.query(Version).with_parent(self) latest_version = self.get_latest_version(deleted=True) keep_mins_seconds = keep_mins * 60 latest_timestamp = latest_version.timestamp oldest_timestamp = latest_timestamp - pd.Timedelta(keep_mins_seconds, unit="s") prunable_verions = ( versions.outerjoin(SnapshotAssociation) .filter(SnapshotAssociation.version_id == None) .filter(Version.timestamp < oldest_timestamp) ) prunable_verions = prunable_verions.all() return prunable_verions