# -*- coding: utf-8 -*-
"""
chanjo.store.core
~~~~~~~~~~~~~~~~~~
"""
from __future__ import division
import os
from sqlalchemy import create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.sql.expression import ClauseElement
from .models import (BASE, Gene, Transcript, Exon, Sample)
[docs]class Store(object):
"""SQLAlchemy-based database object.
Bundles functionality required to setup and interact with various
related genomic interval elements.
.. versionchanged:: 2.1.0
Lazy-loadable, all "init" arguments optional.
Examples:
>>> chanjo_db = Store('data/elements.sqlite3')
>>> chanjo_db.set_up()
.. note::
For testing pourposes use ``:memory:`` as the ``path`` argument to
set up in-memory (temporary) database.
Args:
uri (Optional[str]): path/URI to the database to connect to
debug (Optional[bool]): whether to output logging information
Attributes:
uri (str): path/URI to the database to connect to
engine (class): SQLAlchemy engine, defines what database to use
session (class): SQLAlchemy ORM session, manages persistance
query (method): SQLAlchemy ORM query builder method
classes (dict): bound ORM classes
"""
def __init__(self, uri=None, debug=False):
super(Store, self).__init__()
self.uri = uri
if uri:
self.connect(uri, debug=debug)
# ORM class shortcuts to enable fetching models dynamically
self.classes = {'gene': Gene, 'transcript': Transcript,
'exon': Exon, 'sample': Sample}
[docs] def connect(self, db_uri, debug=False):
"""Configure connection to a SQL database.
.. versionadded:: 2.1.0
Args:
db_uri (str): path/URI to the database to connect to
debug (Optional[bool]): whether to output logging information
"""
kwargs = {'echo': debug, 'convert_unicode': True}
# connect to the SQL database
if 'mysql' in db_uri:
kwargs['pool_recycle'] = 3600
elif '://' not in db_uri:
# expect only a path to a sqlite database
db_path = os.path.abspath(os.path.expanduser(db_uri))
db_uri = "sqlite:///{}".format(db_path)
self.engine = create_engine(db_uri, **kwargs)
# make sure the same engine is propagated to the BASE classes
BASE.metadata.bind = self.engine
# start a session
self.session = scoped_session(sessionmaker(bind=self.engine))
# shortcut to query method
self.query = self.session.query
return self
@property
def dialect(self):
"""Return database dialect name used for the current connection.
Dynamic attribute.
Returns:
str: name of dialect used for database connection
"""
return self.engine.dialect.name
[docs] def set_up(self):
"""Initialize a new database with the default tables and columns.
Returns:
Store: self
"""
# create the tables
BASE.metadata.create_all(self.engine)
return self
[docs] def tear_down(self):
"""Tear down a database (tables and columns).
Returns:
Store: self
"""
# drop/delete the tables
BASE.metadata.drop_all(self.engine)
return self
[docs] def get_or_create(self, model, **kwargs):
"""Get or create a record in the database."""
try:
query = self.query(model).filter_by(**kwargs)
instance = query.first()
if instance:
return instance, False
else:
self.session.begin(nested=True)
try:
params = dict((key, value) for key, value
in kwargs.iteritems()
if not isinstance(value, ClauseElement))
instance = model(**params)
self.session.add(instance)
self.session.commit()
return instance, True
except IntegrityError as error:
self.session.rollback()
instance = query.one()
return instance, False
except Exception as exception:
raise exception
[docs] def save(self):
"""Manually persist changes made to various elements. Chainable.
.. versionchanged:: 2.1.2
Flush session before commit.
Returns:
Store: ``self`` for chainability
"""
# commit/persist dirty changes to the database
self.session.flush()
self.session.commit()
return self
[docs] def get(self, typ, type_id):
"""Fetch a specific element or ORM class.
Calls itself recursively when asked to fetch an element.
Args:
typ (str): element key or 'class'
type_id (str): element id or ORM model id
Returns:
model: element or ORM class
Examples:
>>> gene = db.get('gene', 'GIT1')
"""
if typ == 'class':
return self.classes.get(type_id, None)
# get an ORM class (recursive)
klass = self.get('class', typ)
# return the requested element object (or ``None``) if not found
return self.session.query(klass).get(type_id)
[docs] def find(self, klass_id, query=None, attrs=None):
"""Fetch one or more elements based on the query.
If the 'query' parameter is a string :meth:`~chanjo.Store.find`
will fetch one element; just like `get`. If query is a list it will
match element ids to items in that list and return a list of
elements. If 'query' is ``None`` all elements of that class will
be returned.
Args:
klass_id (str): type of element to find
query (str/list, optional): element Id(s)
attrs (list, optional): list of columns to fetch
Returns:
object/list: element(s) from the database
"""
# get an ORM class
klass = self.get('class', klass_id)
if attrs is not None:
params = [getattr(klass, attr) for attr in attrs]
else:
params = (klass,)
if query is None:
# return all `klass_id` elements in the database
return self.query(*params).all()
elif isinstance(query, list):
# return all `klass_id` elements in the database
return self.query(*params).filter(klass.id.in_(query)).all()
elif isinstance(query, str):
# call 'get' to return the single element
return self.get(klass_id, query)
else:
raise ValueError("'query' must be 'None', 'list', or 'str'")
[docs] def add(self, elements):
"""Add one or more new elements and commit the changes. Chainable.
Args:
elements (orm/list): new ORM object instance or list of such
Returns:
Store: ``self`` for chainability
"""
if isinstance(elements, BASE):
# Add the record to the session object
self.session.add(elements)
elif isinstance(elements, list):
# Add all records to the session object
self.session.add_all(elements)
return self
[docs] def create(self, class_id, *args, **kwargs):
r"""Create a new instance of an ORM element.
If attributes are supplied as a tuple they must be in the correct
order. Supplying a `dict` doesn't require the attributes to be in
any particular order.
Args:
class_id (str): choice between "superblock", "block", "interval"
\*args (tuple): list the element attributes in the *correct order*
\**kwargs (dict): element attributes in whatever order you like
Returns:
orm: new ORM instance object
"""
if args:
# unpack tuple
return self.get('class', class_id)(*args)
elif kwargs:
# unpack dictionary
return self.get('class', class_id)(**kwargs)
else:
error_msg = 'Submit attributes as arguments or keyword arguments'
raise TypeError(error_msg)