Track engine and storage format at BasePandasDataset level.
Signed-off-by: sfc-gh-mvashishtha <[email protected]>
sfc-gh-mvashishtha committed Jan 18, 2025
1 parent e59ea4e commit d517e87
Showing 9 changed files with 167 additions and 45 deletions.
2 changes: 1 addition & 1 deletion modin/config/envvars.py
@@ -174,7 +174,7 @@ class DataFrameVariable(EnvironmentVariable, type=str):
def get(cls) -> Any:
return super().get()

-class Engine(DataFrameVariable, type=str):
+class Engine(EnvironmentVariable, type=str):
"""Distribution engine to run queries by."""

varname = "MODIN_ENGINE"
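For reference, a minimal sketch of how the `Engine` variable touched above is consumed; `Engine.get()` and `Engine.put()` are existing `modin.config` APIs, and the `"Dask"` value is only an example.

```python
import modin.config as cfg

# Equivalent to exporting MODIN_ENGINE=Dask before import; the value must be
# one of the engines Modin knows about ("Ray", "Dask", "Python", ...).
cfg.Engine.put("Dask")
print(cfg.Engine.get())  # -> "Dask"
```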
4 changes: 2 additions & 2 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -1707,7 +1707,7 @@ def astype(self, col_dtypes, errors: str = "raise"):
new_dtypes = self_dtypes.copy()
# Update the new dtype series to the proper pandas dtype
new_dtype = pandas.api.types.pandas_dtype(dtype)
-if self._getEngineConfig().get() == "Dask" and hasattr(dtype, "_is_materialized"):
+if self._engine == "Dask" and hasattr(dtype, "_is_materialized"):
# FIXME: https://github.com/dask/distributed/issues/8585
_ = dtype._materialize_categories()

@@ -1736,7 +1736,7 @@ def astype_builder(df):
if not (col_dtypes == self_dtypes).all():
new_dtypes = self_dtypes.copy()
new_dtype = pandas.api.types.pandas_dtype(col_dtypes)
-if self._getEngineConfig().get() == "Dask" and hasattr(new_dtype, "_is_materialized"):
+if self._engine == "Dask" and hasattr(new_dtype, "_is_materialized"):
# FIXME: https://github.com/dask/distributed/issues/8585
_ = new_dtype._materialize_categories()
if isinstance(new_dtype, pandas.CategoricalDtype):
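A standalone, hedged mirror of the guard these two hunks rewrite. It assumes only what the diff itself uses: Modin's lazy categorical dtype proxies expose `_is_materialized` and `_materialize_categories`, while plain pandas dtypes fall through untouched.

```python
import pandas

def materialize_if_needed(dtype, engine: str):
    # On Dask, force lazy categorical proxies to resolve their categories up
    # front to avoid https://github.com/dask/distributed/issues/8585.
    if engine == "Dask" and hasattr(dtype, "_is_materialized"):
        dtype._materialize_categories()
    return pandas.api.types.pandas_dtype(dtype)

# Plain dtypes have no _is_materialized attribute and pass straight through.
print(materialize_if_needed("category", engine="Dask"))  # -> category
```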
6 changes: 6 additions & 0 deletions modin/numpy/arr.py
@@ -151,6 +151,12 @@ class array(object):
provide functionality.
"""

+    # TODO(hybrid-execution): save the possible engine / storage format values
+    # to a constant and restrict the types of _engine and _storage_format to
+    # those values
+    _engine: str
+    _storage_format: str

def __init__(
self,
object=None,
42 changes: 25 additions & 17 deletions modin/pandas/base.py
@@ -19,7 +19,7 @@
import pickle as pkl
import re
import warnings
-from functools import cached_property
+from functools import cached_property, wraps
from typing import (
TYPE_CHECKING,
Any,
@@ -193,6 +193,17 @@ def _get_repr_axis_label_indexer(labels, num_for_repr):
[] if back_repr_num == 0 else list(all_positions[-back_repr_num:])
)

+def _ensure_engine_and_storage_format_set_post_init(init):
+    @wraps(init)
+    def wrapped_init(self, *args, **kwargs):
+        init(self, *args, **kwargs)
+        assert self._engine is not None, (
+            f"Internal error: must initialize {type(self)} with engine"
+        )
+        assert self._storage_format is not None, (
+            f"Internal error: must initialize {type(self)} with storage format"
+        )
+    return wrapped_init

@_inherit_docstrings(pandas.DataFrame, apilink=["pandas.DataFrame", "pandas.Series"])
class BasePandasDataset(ClassLogger):
@@ -209,20 +220,15 @@ class BasePandasDataset(ClassLogger):
_pandas_class = pandas.core.generic.NDFrame
_query_compiler: BaseQueryCompiler
_siblings: list[BasePandasDataset]
-    _engine_override: Engine = None
-    _storage_override: StorageFormat = None
+    # TODO(hybrid-execution): save the possible engine / storage format values
+    # to a constant and restrict the types of _engine and _storage_format to
+    # those values
+    _engine: str = None
+    _storage_format: str = None

-    def _getEngineConfig(self) -> Engine:
-        if self._engine_override is not None:
-            return self._engine_override
-        else:
-            return Engine
-
-    def _getStorageConfig(self) -> Engine:
-        if self._storage_override is not None:
-            return self._storage_override
-        else:
-            return StorageFormat
+    engine = property(lambda self: self._engine)
+
+    storage_format = property(lambda self: self._storage_format)

@cached_property
def _is_dataframe(self) -> bool:
@@ -242,7 +248,7 @@ def _is_dataframe(self) -> bool:

@abc.abstractmethod
def _create_or_update_from_compiler(
-self, new_query_compiler: BaseQueryCompiler, inplace: bool = False
+self, new_query_compiler: BaseQueryCompiler, new_engine: str, new_storage_format: str, inplace: bool = False
) -> Self | None:
"""
Return or update a ``DataFrame`` or ``Series`` with given `new_query_compiler`.
@@ -317,7 +323,7 @@ def _build_repr_df(
indexer = row_indexer
return self.iloc[indexer]._query_compiler.to_pandas()

-def _update_inplace(self, new_query_compiler: BaseQueryCompiler) -> None:
+def _update_inplace(self, new_query_compiler: BaseQueryCompiler, new_storage_format: str, new_engine: str) -> None:
"""
Update the current DataFrame inplace.
@@ -331,6 +337,8 @@ def _update_inplace(self, new_query_compiler: BaseQueryCompiler) -> None:
for sib in self._siblings:
sib._query_compiler = new_query_compiler
old_query_compiler.free()
+self._storage_format = new_storage_format
+self._engine = new_engine

def _validate_other(
self,
@@ -538,7 +546,7 @@ def _binary_op(self, op, other, **kwargs) -> Self:
if not self._is_dataframe and op in series_specialize_list:
op = "series_" + op
new_query_compiler = getattr(self._query_compiler, op)(other, **kwargs)
-return self._create_or_update_from_compiler(new_query_compiler)
+return self._create_or_update_from_compiler(new_query_compiler, self.engine, self.storage_format)

def _default_to_pandas(self, op, *args, reason: str = None, **kwargs):
"""
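A self-contained sketch of the post-init guard this file adds; `Frame` and its arguments are illustrative stand-ins for `BasePandasDataset` subclasses, not part of the diff.

```python
from functools import wraps

def ensure_execution_set_post_init(init):
    # Same shape as _ensure_engine_and_storage_format_set_post_init above:
    # run the wrapped __init__, then verify both attributes were populated.
    @wraps(init)
    def wrapped_init(self, *args, **kwargs):
        init(self, *args, **kwargs)
        assert self._engine is not None, "must initialize with engine"
        assert self._storage_format is not None, "must initialize with storage format"
    return wrapped_init

class Frame:  # illustrative stand-in for a BasePandasDataset subclass
    _engine = None
    _storage_format = None

    @ensure_execution_set_post_init
    def __init__(self, engine, storage_format):
        self._engine = engine
        self._storage_format = storage_format

Frame("Ray", "Pandas")    # passes both post-init checks
# Frame(None, "Pandas")   # would raise: must initialize with engine
```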
39 changes: 31 additions & 8 deletions modin/pandas/dataframe.py
@@ -57,7 +57,7 @@
from pandas.io.formats.info import DataFrameInfo
from pandas.util._validators import validate_bool_kwarg

-from modin.config import PersistentPickle
+from modin.config import PersistentPickle, StorageFormat, Engine
from modin.error_message import ErrorMessage
from modin.logging import disable_logging
from modin.pandas import Categorical
@@ -72,7 +72,7 @@
)

from .accessor import CachedAccessor, SparseFrameAccessor
-from .base import _ATTRS_NO_LOOKUP, BasePandasDataset
+from .base import _ATTRS_NO_LOOKUP, BasePandasDataset, _ensure_engine_and_storage_format_set_post_init
from .groupby import DataFrameGroupBy
from .iterator import PartitionIterator
from .series import Series
@@ -137,6 +137,7 @@ class DataFrame(BasePandasDataset):

_pandas_class = pandas.DataFrame

+@_ensure_engine_and_storage_format_set_post_init
def __init__(
self,
data=None,
@@ -145,6 +146,8 @@ def __init__(
dtype=None,
copy=None,
query_compiler: BaseQueryCompiler = None,
+engine: str = None,
+storage_format: str = None
) -> None:
from modin.numpy import array

@@ -153,6 +156,8 @@
self._siblings = []
if isinstance(data, (DataFrame, Series)):
self._query_compiler = data._query_compiler.copy()
+self._engine = data.engine
+self._storage_format = data.storage_format
if index is not None and any(i not in data.index for i in index):
raise NotImplementedError(
"Passing non-existant columns or index values to constructor not"
@@ -185,22 +190,26 @@ def __init__(
self._query_compiler = data.loc[index, columns]._query_compiler
elif isinstance(data, array):
self._query_compiler = data._query_compiler.copy()
+self._engine = data.engine
+self._storage_format = data.storage_format
if copy is not None and not copy:
data._add_sibling(self)
if columns is not None and not isinstance(columns, pandas.Index):
columns = pandas.Index(columns)
if columns is not None:
obj_with_new_columns = self.set_axis(columns, axis=1, copy=False)
-self._update_inplace(obj_with_new_columns._query_compiler)
+self._update_inplace(obj_with_new_columns._query_compiler, new_engine=obj_with_new_columns.engine, new_storage_format=obj_with_new_columns.storage_format)
if index is not None:
obj_with_new_index = self.set_axis(index, axis=0, copy=False)
-self._update_inplace(obj_with_new_index._query_compiler)
+self._update_inplace(obj_with_new_index._query_compiler, new_engine=obj_with_new_index.engine, new_storage_format=obj_with_new_index.storage_format)
if dtype is not None:
casted_obj = self.astype(dtype, copy=False)
self._query_compiler = casted_obj._query_compiler
# Check type of data and use appropriate constructor
elif query_compiler is None:
distributed_frame = from_non_pandas(data, index, columns, dtype)
+self._storage_format = StorageFormat.get()
+self._engine = Engine.get()
if distributed_frame is not None:
self._query_compiler = distributed_frame._query_compiler
return
@@ -227,6 +236,11 @@
data = {key: value for key, value in data.items() if key in columns}

if len(data) and all(isinstance(v, Series) for v in data.values()):
+if (
+    len(set(v.storage_format for v in data.values())) > 1
+    or len(set(v.engine for v in data.values())) > 1
+):
+    raise NotImplementedError("multiple executions in input data")
from .general import concat

new_qc = concat(
@@ -258,7 +272,16 @@ def __init__(
)
self._query_compiler = from_pandas(pandas_df)._query_compiler
else:
+assert engine is not None, (
+    "When initializing dataframe with query compiler, must provide engine"
+)
+assert storage_format is not None, (
+    "When initializing dataframe with query compiler, must provide storage format"
+)
self._query_compiler = query_compiler
+self._engine = engine
+self._storage_format = storage_format

def __repr__(self) -> str:
"""
@@ -2641,7 +2664,7 @@ def __setattr__(self, key, value) -> None:
# __dict__
# - `_siblings`, which Modin initializes before it appears in __dict__.
if key in ("_query_compiler", "_siblings") or key in self.__dict__:
if key in ("_query_compiler", "_siblings", "_engine", "_storage_format") or key in self.__dict__:
pass
# we have to check for the key in `dir(self)` first in order not to trigger columns computation
elif key not in dir(self) and key in self:
Expand Down Expand Up @@ -2983,7 +3006,7 @@ def reindex_like(
)

def _create_or_update_from_compiler(
-self, new_query_compiler, inplace=False
+self, new_query_compiler: BaseQueryCompiler, new_engine: str, new_storage_format: str, inplace: bool = False
) -> Union[DataFrame, None]:
"""
Return or update a ``DataFrame`` with given `new_query_compiler`.
@@ -3004,9 +3027,9 @@
new_query_compiler, self._query_compiler.__class__.__bases__
), "Invalid Query Compiler object: {}".format(type(new_query_compiler))
if not inplace:
-return self.__constructor__(query_compiler=new_query_compiler)
+return self.__constructor__(query_compiler=new_query_compiler, engine=new_engine, storage_format=new_storage_format)
else:
-self._update_inplace(new_query_compiler=new_query_compiler)
+self._update_inplace(new_query_compiler=new_query_compiler, new_storage_format=new_storage_format, new_engine=new_engine)

def _get_numeric_data(self, axis: int) -> DataFrame:
"""
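A hedged usage sketch of what the constructor changes above provide; it assumes a Modin build with this patch applied, and the asserted values follow the local `MODIN_ENGINE` / `MODIN_STORAGE_FORMAT` configuration.

```python
import modin.pandas as pd
from modin.config import Engine, StorageFormat

# Plain construction stamps the frame with the globally configured execution.
df = pd.DataFrame({"a": [1, 2, 3]})
assert df.engine == Engine.get()
assert df.storage_format == StorageFormat.get()

# Frames built from existing Modin frames inherit the parent's execution.
derived = pd.DataFrame(df)
assert (derived.engine, derived.storage_format) == (df.engine, df.storage_format)
```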
20 changes: 12 additions & 8 deletions modin/pandas/indexing.py
@@ -379,9 +379,9 @@ def _get_pandas_object_from_qc_view(
common ``Indexer`` object or range and ``np.ndarray`` only.
"""
if ndim == 2:
-return self.df.__constructor__(query_compiler=qc_view)
+return self.df.__constructor__(query_compiler=qc_view, engine=self.df.engine, storage_format=self.df.storage_format)
if isinstance(self.df, Series) and not row_scalar:
-return self.df.__constructor__(query_compiler=qc_view)
+return self.df.__constructor__(query_compiler=qc_view, engine=self.df.engine, storage_format=self.df.storage_format)

if isinstance(self.df, Series):
axis = 0
@@ -400,7 +400,7 @@
else 1 if col_scalar or col_multiindex_full_lookup else 0
)

-res_df = self.df.__constructor__(query_compiler=qc_view)
+res_df = self.df.__constructor__(query_compiler=qc_view, engine=self.df.engine, storage_format=self.df.storage_format)
return res_df.squeeze(axis=axis)

def _setitem_positional(self, row_lookup, col_lookup, item, axis=None):
Expand Down Expand Up @@ -573,7 +573,8 @@ def _handle_boolean_masking(self, row_loc, col_loc):
extra_log=f"Only ``modin.pandas.Series`` boolean masks are acceptable, got: {type(row_loc)}",
)
masked_df = self.df.__constructor__(
-query_compiler=self.qc.getitem_array(row_loc._query_compiler)
+query_compiler=self.qc.getitem_array(row_loc._query_compiler),
+engine=self.df.engine, storage_format=self.df.storage_format
)
if isinstance(masked_df, Series):
assert col_loc == slice(None)
@@ -789,8 +790,10 @@ def _loc(df):
df.loc[key] = item
return df

+result_df = self.df._default_to_pandas(_loc)
self.df._update_inplace(
-new_query_compiler=self.df._default_to_pandas(_loc)._query_compiler
+new_query_compiler=result_df._query_compiler,
+new_engine=result_df.engine, new_storage_format=result_df.storage_format
)
return
row_loc, col_loc, ndims = self._parse_row_and_column_locators(key)
@@ -807,7 +810,7 @@
if is_scalar(row_loc) or len(row_loc) == 1:
index = self.qc.index.insert(len(self.qc.index), row_loc)
self.qc = self.qc.reindex(labels=index, axis=0, fill_value=0)
-self.df._update_inplace(new_query_compiler=self.qc)
+self.df._update_inplace(new_query_compiler=self.qc, new_engine=self.df.engine, new_storage_format=self.df.storage_format)
self._set_item_existing_loc(row_loc, col_loc, item)
else:
self._set_item_existing_loc(row_loc, col_loc, item)
Expand Down Expand Up @@ -845,7 +848,7 @@ def _setitem_with_new_columns(self, row_loc, col_loc, item):
if not common_label_loc[i]:
columns = columns.insert(len(columns), col_loc[i])
self.qc = self.qc.reindex(labels=columns, axis=1, fill_value=np.nan)
-self.df._update_inplace(new_query_compiler=self.qc)
+self.df._update_inplace(new_query_compiler=self.qc, new_engine=self.df.engine, new_storage_format=self.df.storage_format)
self._set_item_existing_loc(row_loc, np.array(col_loc), item)

def _set_item_existing_loc(self, row_loc, col_loc, item):
Expand Down Expand Up @@ -1061,7 +1064,8 @@ def _iloc(df):
return df

self.df._update_inplace(
-new_query_compiler=self.df._default_to_pandas(_iloc)._query_compiler
+new_query_compiler=self.df._default_to_pandas(_iloc)._query_compiler,
+new_engine=self.df.engine, new_storage_format=self.df.storage_format
)
return
row_loc, col_loc, _ = self._parse_row_and_column_locators(key)
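A hedged check of what the indexing changes preserve, again assuming a build with this patch: views produced through `loc`, `iloc`, and boolean masks are rebuilt with the parent frame's execution rather than the global defaults.

```python
import modin.pandas as pd

df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})

# Boolean masking goes through _handle_boolean_masking above, which now
# forwards the parent's engine and storage format to the new frame.
view = df.loc[df["a"] > 1, ["b"]]
assert view.engine == df.engine
assert view.storage_format == df.storage_format
```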
4 changes: 2 additions & 2 deletions modin/pandas/io.py
@@ -64,7 +64,7 @@
from pandas.io.parsers import TextFileReader
from pandas.io.parsers.readers import _c_parser_defaults

-from modin.config import ModinNumpy
+from modin.config import ModinNumpy, StorageFormat, Engine
from modin.error_message import ErrorMessage
from modin.logging import ClassLogger, enable_logging
from modin.utils import (
Expand Down Expand Up @@ -991,7 +991,7 @@ def from_pandas(df) -> DataFrame:
"""
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

-return ModinObjects.DataFrame(query_compiler=FactoryDispatcher.from_pandas(df))
+return ModinObjects.DataFrame(query_compiler=FactoryDispatcher.from_pandas(df), engine=Engine.get(), storage_format=StorageFormat.get())


def from_arrow(at) -> DataFrame:
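A short round-trip sketch of the `from_pandas` change, under the same assumption of a patched build.

```python
import pandas
from modin.config import Engine, StorageFormat
from modin.pandas.io import from_pandas

# Conversion from pandas now stamps the result with the configured execution.
mdf = from_pandas(pandas.DataFrame({"x": [1, 2, 3]}))
assert mdf.engine == Engine.get()
assert mdf.storage_format == StorageFormat.get()
```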