Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MAINT: remove hard dependency on odo #12

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,4 @@ benchmarks.db
.dir-locals.el

TAGS
.gdb_history
5 changes: 3 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
language: python
sudo: false
python:
- "3.4"
- "3.5"
- "3.6"

Expand All @@ -20,8 +19,10 @@ addons:

install:
- ${CC} --version
- pip install numpy
- pip install numpy pandas sqlalchemy
- python -c "import numpy;print(numpy.__version__)"
- python -c "import pandas;print(pandas.__version__)";
- python -c "import sqlalchemy;print(sqlalchemy.__version__)";
- pip install -e .[dev]

script:
Expand Down
9 changes: 3 additions & 6 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,14 @@
),
],
install_requires=[
'datashape',
'numpy',
'pandas',
'sqlalchemy',
'psycopg2',
'odo',
'toolz',
'networkx<=1.11',
],
extras_require={
'dev': [
'odo',
'pandas==0.18.1',
'networkx<=1.11',
'flake8==3.3.0',
'pycodestyle==2.3.1',
'pyflakes==1.5.0',
Expand Down
254 changes: 6 additions & 248 deletions warp_prism/__init__.py
Original file line number Diff line number Diff line change
@@ -1,251 +1,9 @@
from io import BytesIO
from .query import to_arrays, to_dataframe
from .odo import register_odo_dataframe_edge

from datashape import discover
from datashape.predicates import istabular
import numpy as np
from odo import convert
import pandas as pd
import sqlalchemy as sa
from sqlalchemy.ext.compiler import compiles
from toolz import keymap
__version__ = '0.2.1'

from ._warp_prism import (
raw_to_arrays as _raw_to_arrays,
typeid_map as _raw_typeid_map,
)


__version__ = '0.1.1'


_typeid_map = keymap(np.dtype, _raw_typeid_map)
_object_type_id = _raw_typeid_map['object']


class _CopyToBinary(sa.sql.expression.Executable, sa.sql.ClauseElement):

def __init__(self, element, bind):
self.element = element
self._bind = bind = bind

@property
def bind(self):
return self._bind


def literal_compile(s):
"""Compile a sql expression with bind params inlined as literals.

Parameters
----------
s : Selectable
The expression to compile.

Returns
-------
cs : str
An equivalent sql string.
"""
return str(s.compile(compile_kwargs={'literal_binds': True}))


@compiles(_CopyToBinary, 'postgresql')
def _compile_copy_to_binary_postgres(element, compiler, **kwargs):
selectable = element.element
return compiler.process(
sa.text(
'COPY {stmt} TO STDOUT (FORMAT BINARY)'.format(
stmt=(
compiler.preparer.format_table(selectable)
if isinstance(selectable, sa.Table) else
'({})'.format(literal_compile(selectable))
),
)
),
**kwargs
)


def _warp_prism_types(query):
for name, dtype in discover(query).measure.fields:
try:
np_dtype = getattr(dtype, 'ty', dtype).to_numpy_dtype()
if np_dtype.kind == 'U':
yield _object_type_id
else:
yield _typeid_map[np_dtype]
except KeyError:
raise TypeError(
'warp_prism cannot query columns of type %s' % dtype,
)


def _getbind(selectable, bind):
"""Return an explicitly passed connection or infer the connection from
the selectable.

Parameters
----------
selectable : sa.sql.Selectable
The selectable object being queried.
bind : bind or None
The explicit connection or engine to use to execute the query.

Returns
-------
bind : The bind which should be used to execute the query.
"""
if bind is None:
return selectable.bind

if isinstance(bind, sa.engine.base.Engine):
return bind

return sa.create_engine(bind)


def to_arrays(query, *, bind=None):
"""Run the query returning a the results as np.ndarrays.

Parameters
----------
query : sa.sql.Selectable
The query to run. This can be a select or a table.
bind : sa.Engine, optional
The engine used to create the connection. If not provided
``query.bind`` will be used.

Returns
-------
arrays : dict[str, (np.ndarray, np.ndarray)]
A map from column name to the result arrays. The first array holds the
values and the second array is a boolean mask for NULLs. The values
where the mask is False are 0 interpreted by the type.
"""
# check types before doing any work
types = tuple(_warp_prism_types(query))

buf = BytesIO()
bind = _getbind(query, bind)

stmt = _CopyToBinary(query, bind)
with bind.connect() as conn:
conn.connection.cursor().copy_expert(literal_compile(stmt), buf)
out = _raw_to_arrays(buf.getbuffer(), types)
column_names = query.c.keys()
return {column_names[n]: v for n, v in enumerate(out)}


null_values = keymap(np.dtype, {
'float32': np.nan,
'float64': np.nan,
'int16': np.nan,
'int32': np.nan,
'int64': np.nan,
'bool': np.nan,
'datetime64[ns]': np.datetime64('nat', 'ns'),
'object': None,
})

# alias because ``to_dataframe`` shadows this name
_default_null_values_for_type = null_values


def to_dataframe(query, *, bind=None, null_values=None):
"""Run the query returning a the results as a pd.DataFrame.

Parameters
----------
query : sa.sql.Selectable
The query to run. This can be a select or a table.
bind : sa.Engine, optional
The engine used to create the connection. If not provided
``query.bind`` will be used.
null_values : dict[str, any]
The null values to use for each column. This falls back to
``warp_prism.null_values`` for columns that are not specified.

Returns
-------
df : pd.DataFrame
A pandas DataFrame holding the results of the query. The columns
of the DataFrame will be named the same and be in the same order as the
query.
"""
arrays = to_arrays(query, bind=bind)

if null_values is None:
null_values = {}

for name, (array, mask) in arrays.items():
if array.dtype.kind == 'i':
if not mask.all():
try:
null = null_values[name]
except KeyError:
# no explicit override, cast to float and use NaN as null
array = array.astype('float64')
null = np.nan

array[~mask] = null

arrays[name] = array
continue

if array.dtype.kind == 'M':
# pandas needs datetime64[ns], not ``us`` or ``D``
array = array.astype('datetime64[ns]')

try:
null = null_values[name]
except KeyError:
null = _default_null_values_for_type[array.dtype]

array[~mask] = null
arrays[name] = array

return pd.DataFrame(arrays, columns=[column.name for column in query.c])


def register_odo_dataframe_edge():
"""Register an odo edge for sqlalchemy selectable objects to dataframe.

This edge will have a lower cost that the default edge so it will be
selected as the fasted path.

If the selectable is not in a postgres database, it will fallback to the
default odo edge.
"""
# estimating 8 times faster
df_cost = convert.graph.edge[sa.sql.Select][pd.DataFrame]['cost'] / 8

@convert.register(
pd.DataFrame,
(sa.sql.Select, sa.sql.Selectable),
cost=df_cost,
)
def select_or_selectable_to_frame(el, bind=None, dshape=None, **kwargs):
bind = _getbind(el, bind)

if bind.dialect.name != 'postgresql':
# fall back to the general edge
raise NotImplementedError()

return to_dataframe(el, bind=bind)

# higher priority than df edge so that
# ``odo('select one_column from ...', list)`` returns a list of scalars
# instead of a list of tuples of length 1
@convert.register(
pd.Series,
(sa.sql.Select, sa.sql.Selectable),
cost=df_cost - 1,
)
def select_or_selectable_to_series(el, bind=None, dshape=None, **kwargs):
bind = _getbind(el, bind)

if istabular(dshape) or bind.dialect.name != 'postgresql':
# fall back to the general edge
raise NotImplementedError()

return to_dataframe(el, bind=bind).iloc[:, 0]
__all__ = [
'to_arrays', 'to_dataframe', 'register_odo_dataframe_edge',
]
51 changes: 51 additions & 0 deletions warp_prism/odo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from .query import to_dataframe
from .sql import getbind


def register_odo_dataframe_edge():
"""Register an odo edge for sqlalchemy selectable objects to dataframe.

This edge will have a lower cost that the default edge so it will be
selected as the fasted path.

If the selectable is not in a postgres database, it will fallback to the
default odo edge.
"""
from datashape.predicates import istabular
from odo import convert
import pandas as pd
import sqlalchemy as sa

# estimating 8 times faster
df_cost = convert.graph.edge[sa.sql.Select][pd.DataFrame]['cost'] / 8

@convert.register(
pd.DataFrame,
(sa.sql.Select, sa.sql.Selectable),
cost=df_cost,
)
def select_or_selectable_to_frame(el, bind=None, dshape=None, **kwargs):
bind = getbind(el, bind)

if bind.dialect.name != 'postgresql':
# fall back to the general edge
raise NotImplementedError()

return to_dataframe(el, bind=bind)

# higher priority than df edge so that
# ``odo('select one_column from ...', list)`` returns a list of scalars
# instead of a list of tuples of length 1
@convert.register(
pd.Series,
(sa.sql.Select, sa.sql.Selectable),
cost=df_cost - 1,
)
def select_or_selectable_to_series(el, bind=None, dshape=None, **kwargs):
bind = getbind(el, bind)

if istabular(dshape) or bind.dialect.name != 'postgresql':
# fall back to the general edge
raise NotImplementedError()

return to_dataframe(el, bind=bind).iloc[:, 0]
Loading