Skip to content

Commit

Permalink
MAINT: remove hard dependency on odo
Browse files Browse the repository at this point in the history
Odo is currently a hard dependency of warp_prism to convert the sqlalchemy types
into a numpy dtypes. Odo is no longer actively maintained and breaks with newer
versions of pandas. This change reimplements the needed functionality in
warp_prism directly without using odo. This PR does leave the odo edge
registration code so that existing users don't see a change in functionality.
  • Loading branch information
Joe Jevnik committed Jan 8, 2020
1 parent dbd61bf commit ca832a4
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 16 deletions.
5 changes: 2 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,16 @@
),
],
install_requires=[
'datashape',
'numpy',
'pandas',
'sqlalchemy',
'psycopg2',
'odo',
'toolz',
'networkx<=1.11',
],
extras_require={
'dev': [
'odo',
'networkx<=1.11',
'flake8==3.3.0',
'pycodestyle==2.3.1',
'pyflakes==1.5.0',
Expand Down
120 changes: 107 additions & 13 deletions warp_prism/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from io import BytesIO
import numbers

from datashape import discover
from datashape.predicates import istabular
import numpy as np
from odo import convert
import pandas as pd
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql as _postgresql
from sqlalchemy.ext.compiler import compiles
from toolz import keymap

from ._warp_prism import (
raw_to_arrays as _raw_to_arrays,
Expand All @@ -18,7 +16,7 @@
__version__ = '0.1.1'


_typeid_map = keymap(np.dtype, _raw_typeid_map)
_typeid_map = {np.dtype(k): v for k, v in _raw_typeid_map.items()}
_object_type_id = _raw_typeid_map['object']


Expand Down Expand Up @@ -66,14 +64,107 @@ def _compile_copy_to_binary_postgres(element, compiler, **kwargs):
)


types = {np.dtype(k): v for k, v in {
'i8': sa.BigInteger,
'i4': sa.Integer,
'i2': sa.SmallInteger,
'f4': sa.REAL,
'f8': sa.FLOAT,
'O': sa.Text,
'M8[D]': sa.Date,
'M8[us]': sa.DateTime,
'?': sa.Boolean,
"m8[D]": sa.Interval(second_precision=0, day_precision=9),
"m8[h]": sa.Interval(second_precision=0, day_precision=0),
"m8[m]": sa.Interval(second_precision=0, day_precision=0),
"m8[s]": sa.Interval(second_precision=0, day_precision=0),
"m8[ms]": sa.Interval(second_precision=3, day_precision=0),
"m8[us]": sa.Interval(second_precision=6, day_precision=0),
"m8[ns]": sa.Interval(second_precision=9, day_precision=0),
}.items()}

_revtypes = dict(map(reversed, types.items()))
_revtypes.update({
sa.DATETIME: np.dtype('M8[us]'),
sa.TIMESTAMP: np.dtype('M8[us]'),
sa.FLOAT: np.dtype('f8'),
sa.DATE: np.dtype('M8[D]'),
sa.BIGINT: np.dtype('i8'),
sa.INTEGER: np.dtype('i4'),
sa.BIGINT: np.dtype('i8'),
sa.types.NullType: np.dtype('O'),
sa.REAL: np.dtype('f4'),
sa.Float: np.dtype('f8'),
})

_precision_types = {
sa.Float,
_postgresql.base.DOUBLE_PRECISION,
}


def _precision_to_dtype(precision):
if isinstance(precision, numbers.Integral):
if 1 <= precision <= 24:
return np.dtype('f4')
elif 25 <= precision <= 53:
return np.dtype('f8')
raise ValueError('%s is not a supported precision' % precision)


_units_of_power = {
0: 's',
3: 'ms',
6: 'us',
9: 'ns'
}


def _discover_type(type_):
if isinstance(type_, sa.Interval):
if type_.second_precision is None and type_.day_precision is None:
return np.dtype('m8[us]')
elif type_.second_precision == 0 and type_.day_precision == 0:
return np.dtype('m8[s]')

if (type_.second_precision in _units_of_power and
not type_.day_precision):
unit = _units_of_power[type_.second_precision]
elif type_.day_precision > 0:
unit = 'D'
else:
raise ValueError(
'Cannot infer INTERVAL type_e with parameters'
'second_precision=%d, day_precision=%d' %
(type_.second_precision, type_.day_precision),
)
return np.dtype('m8[%s]' % unit)
if type(type_) in _precision_types and type_.precision is not None:
return _precision_to_dtype(type_.precision)
if type_ in _revtypes:
return _revtypes[type_]
if type(type_) in _revtypes:
return _revtypes[type(type_)]
if isinstance(type_, sa.Numeric):
raise ValueError('Cannot adapt numeric type to numpy dtype')
if isinstance(type_, (sa.String, sa.Unicode)):
return np.dtype('O')
else:
for k, v in _revtypes.items():
if isinstance(k, type) and (isinstance(type_, k) or
hasattr(type_, 'impl') and
isinstance(type_.impl, k)):
return v
if k == type_:
return v
raise NotImplementedError('No SQL-numpy match for type %s' % type_)


def _warp_prism_types(query):
for name, dtype in discover(query).measure.fields:
for col in query.columns:
dtype = _discover_type(col.type)
try:
np_dtype = getattr(dtype, 'ty', dtype).to_numpy_dtype()
if np_dtype.kind == 'U':
yield _object_type_id
else:
yield _typeid_map[np_dtype]
yield _typeid_map[dtype]
except KeyError:
raise TypeError(
'warp_prism cannot query columns of type %s' % dtype,
Expand Down Expand Up @@ -136,7 +227,7 @@ def to_arrays(query, *, bind=None):
return {column_names[n]: v for n, v in enumerate(out)}


null_values = keymap(np.dtype, {
null_values = {np.dtype(k): v for k, v in {
'float32': np.nan,
'float64': np.nan,
'int16': np.nan,
Expand All @@ -145,7 +236,7 @@ def to_arrays(query, *, bind=None):
'bool': np.nan,
'datetime64[ns]': np.datetime64('nat', 'ns'),
'object': None,
})
}.items()}

# alias because ``to_dataframe`` shadows this name
_default_null_values_for_type = null_values
Expand Down Expand Up @@ -216,6 +307,9 @@ def register_odo_dataframe_edge():
If the selectable is not in a postgres database, it will fallback to the
default odo edge.
"""
from odo import convert
from datashape.predicates import istabular

# estimating 8 times faster
df_cost = convert.graph.edge[sa.sql.Select][pd.DataFrame]['cost'] / 8

Expand Down

0 comments on commit ca832a4

Please sign in to comment.