From d517e8747d223e9678c505264e3093045b0d3edc Mon Sep 17 00:00:00 2001 From: sfc-gh-mvashishtha Date: Fri, 17 Jan 2025 23:04:24 -0800 Subject: [PATCH] Track engine and storage format at BasePandasDataset level. Signed-off-by: sfc-gh-mvashishtha --- modin/config/envvars.py | 2 +- .../dataframe/pandas/dataframe/dataframe.py | 4 +- modin/numpy/arr.py | 6 ++ modin/pandas/base.py | 42 +++++++----- modin/pandas/dataframe.py | 39 ++++++++--- modin/pandas/indexing.py | 20 +++--- modin/pandas/io.py | 4 +- modin/pandas/series.py | 28 ++++++-- per_dataframe_engine_demo.ipynb | 67 +++++++++++++++++++ 9 files changed, 167 insertions(+), 45 deletions(-) create mode 100644 per_dataframe_engine_demo.ipynb diff --git a/modin/config/envvars.py b/modin/config/envvars.py index 49ddb6a90a0..428bfd3a027 100644 --- a/modin/config/envvars.py +++ b/modin/config/envvars.py @@ -174,7 +174,7 @@ class DataFrameVariable(EnvironmentVariable, type=str): def get(cls) -> Any: return super().get() -class Engine(DataFrameVariable, type=str): +class Engine(EnvironmentVariable, type=str): """Distribution engine to run queries by.""" varname = "MODIN_ENGINE" diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index e401a4a5d65..4cf288f1c0f 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -1707,7 +1707,7 @@ def astype(self, col_dtypes, errors: str = "raise"): new_dtypes = self_dtypes.copy() # Update the new dtype series to the proper pandas dtype new_dtype = pandas.api.types.pandas_dtype(dtype) - if self._getEngineConfig().get() == "Dask" and hasattr(dtype, "_is_materialized"): + if self._engine == "Dask" and hasattr(dtype, "_is_materialized"): # FIXME: https://github.com/dask/distributed/issues/8585 _ = dtype._materialize_categories() @@ -1736,7 +1736,7 @@ def astype_builder(df): if not (col_dtypes == self_dtypes).all(): new_dtypes = self_dtypes.copy() new_dtype = pandas.api.types.pandas_dtype(col_dtypes) - if self._getEngineConfig().get() == "Dask" and hasattr(new_dtype, "_is_materialized"): + if self._engine == "Dask" and hasattr(new_dtype, "_is_materialized"): # FIXME: https://github.com/dask/distributed/issues/8585 _ = new_dtype._materialize_categories() if isinstance(new_dtype, pandas.CategoricalDtype): diff --git a/modin/numpy/arr.py b/modin/numpy/arr.py index ab40ecab2cc..c424ad3f4cd 100644 --- a/modin/numpy/arr.py +++ b/modin/numpy/arr.py @@ -151,6 +151,12 @@ class array(object): provide functionality. 
""" + # TODO(hybrid-execution): save the possible engine / storage format values + # to a constant and restrict the types of _engine and _storage_format to + # those values + _engine: str + _storage_format: str + def __init__( self, object=None, diff --git a/modin/pandas/base.py b/modin/pandas/base.py index bd6e9908670..6754778ad54 100644 --- a/modin/pandas/base.py +++ b/modin/pandas/base.py @@ -19,7 +19,7 @@ import pickle as pkl import re import warnings -from functools import cached_property +from functools import cached_property, wraps from typing import ( TYPE_CHECKING, Any, @@ -193,6 +193,17 @@ def _get_repr_axis_label_indexer(labels, num_for_repr): [] if back_repr_num == 0 else list(all_positions[-back_repr_num:]) ) +def _ensure_engine_and_storage_format_set_post_init(init): + @wraps(init) + def wrapped_init(self, *args, **kwargs): + init(self, *args, **kwargs) + assert self._engine is not None, ( + f"Internal error: must initialize {type(self)} with engine" + ) + assert self._engine is not None, ( + f"Internal error: must initialize {type(self)} with storage format" + ) + return wrapped_init @_inherit_docstrings(pandas.DataFrame, apilink=["pandas.DataFrame", "pandas.Series"]) class BasePandasDataset(ClassLogger): @@ -209,20 +220,15 @@ class BasePandasDataset(ClassLogger): _pandas_class = pandas.core.generic.NDFrame _query_compiler: BaseQueryCompiler _siblings: list[BasePandasDataset] - _engine_override: Engine = None - _storage_override: StorageFormat = None + # TODO(hybrid-execution): save the possible engine / storage format values + # to a constant and restrict the types of _engine and _storage_format to + # those values + _engine: str = None + _storage_format: str = None - def _getEngineConfig(self) -> Engine: - if self._engine_override is not None: - return self._engine_override - else: - return Engine - - def _getStorageConfig(self) -> Engine: - if self._storage_override is not None: - return self._storage_override - else: - return StorageFormat + engine = property(lambda self: self._engine) + + storage_format = property(lambda self: self._storage_format) @cached_property def _is_dataframe(self) -> bool: @@ -242,7 +248,7 @@ def _is_dataframe(self) -> bool: @abc.abstractmethod def _create_or_update_from_compiler( - self, new_query_compiler: BaseQueryCompiler, inplace: bool = False + self, new_query_compiler: BaseQueryCompiler, new_engine: str, new_storage_format: str, inplace: bool = False ) -> Self | None: """ Return or update a ``DataFrame`` or ``Series`` with given `new_query_compiler`. @@ -317,7 +323,7 @@ def _build_repr_df( indexer = row_indexer return self.iloc[indexer]._query_compiler.to_pandas() - def _update_inplace(self, new_query_compiler: BaseQueryCompiler) -> None: + def _update_inplace(self, new_query_compiler: BaseQueryCompiler, new_storage_format: str, new_engine: str) -> None: """ Update the current DataFrame inplace. 
@@ -331,6 +337,8 @@ def _update_inplace(self, new_query_compiler: BaseQueryCompiler) -> None:
         for sib in self._siblings:
             sib._query_compiler = new_query_compiler
         old_query_compiler.free()
+        self._engine = new_engine
+        self._storage_format = new_storage_format
 
     def _validate_other(
         self,
@@ -538,7 +546,7 @@ def _binary_op(self, op, other, **kwargs) -> Self:
         if not self._is_dataframe and op in series_specialize_list:
             op = "series_" + op
         new_query_compiler = getattr(self._query_compiler, op)(other, **kwargs)
-        return self._create_or_update_from_compiler(new_query_compiler)
+        return self._create_or_update_from_compiler(new_query_compiler, self.engine, self.storage_format)
 
     def _default_to_pandas(self, op, *args, reason: str = None, **kwargs):
         """
diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py
index 4f47c9374e9..9530530398d 100644
--- a/modin/pandas/dataframe.py
+++ b/modin/pandas/dataframe.py
@@ -57,7 +57,7 @@
 from pandas.io.formats.info import DataFrameInfo
 from pandas.util._validators import validate_bool_kwarg
 
-from modin.config import PersistentPickle
+from modin.config import Engine, PersistentPickle, StorageFormat
 from modin.error_message import ErrorMessage
 from modin.logging import disable_logging
 from modin.pandas import Categorical
@@ -72,7 +72,7 @@
 )
 
 from .accessor import CachedAccessor, SparseFrameAccessor
-from .base import _ATTRS_NO_LOOKUP, BasePandasDataset
+from .base import _ATTRS_NO_LOOKUP, BasePandasDataset, _ensure_engine_and_storage_format_set_post_init
 from .groupby import DataFrameGroupBy
 from .iterator import PartitionIterator
 from .series import Series
@@ -137,6 +137,7 @@ class DataFrame(BasePandasDataset):
 
     _pandas_class = pandas.DataFrame
 
+    @_ensure_engine_and_storage_format_set_post_init
     def __init__(
         self,
         data=None,
@@ -145,6 +146,8 @@ def __init__(
         dtype=None,
         copy=None,
         query_compiler: BaseQueryCompiler = None,
+        engine: str = None,
+        storage_format: str = None,
     ) -> None:
         from modin.numpy import array
 
@@ -153,6 +156,8 @@ def __init__(
         self._siblings = []
         if isinstance(data, (DataFrame, Series)):
             self._query_compiler = data._query_compiler.copy()
+            self._engine = data.engine
+            self._storage_format = data.storage_format
             if index is not None and any(i not in data.index for i in index):
                 raise NotImplementedError(
                     "Passing non-existant columns or index values to constructor not"
@@ -185,22 +190,26 @@ def __init__(
             self._query_compiler = data.loc[index, columns]._query_compiler
         elif isinstance(data, array):
             self._query_compiler = data._query_compiler.copy()
+            self._engine = data.engine
+            self._storage_format = data.storage_format
             if copy is not None and not copy:
                 data._add_sibling(self)
             if columns is not None and not isinstance(columns, pandas.Index):
                 columns = pandas.Index(columns)
             if columns is not None:
                 obj_with_new_columns = self.set_axis(columns, axis=1, copy=False)
-                self._update_inplace(obj_with_new_columns._query_compiler)
+                self._update_inplace(obj_with_new_columns._query_compiler, obj_with_new_columns.engine, obj_with_new_columns.storage_format)
             if index is not None:
                 obj_with_new_index = self.set_axis(index, axis=0, copy=False)
-                self._update_inplace(obj_with_new_index._query_compiler)
+                self._update_inplace(obj_with_new_index._query_compiler, obj_with_new_index.engine, obj_with_new_index.storage_format)
             if dtype is not None:
                 casted_obj = self.astype(dtype, copy=False)
                 self._query_compiler = casted_obj._query_compiler
         # Check type of data and use appropriate constructor
         elif query_compiler is None:
             distributed_frame = from_non_pandas(data, index, columns, dtype)
+            self._storage_format = StorageFormat.get()
+            self._engine = Engine.get()
             if distributed_frame is not None:
                 self._query_compiler = distributed_frame._query_compiler
                 return
@@ -227,6 +236,11 @@ def __init__(
                 data = {key: value for key, value in data.items() if key in columns}
 
             if len(data) and all(isinstance(v, Series) for v in data.values()):
+                if (
+                    len(set(v.storage_format for v in data.values())) > 1
+                    or len(set(v.engine for v in data.values())) > 1
+                ):
+                    raise NotImplementedError("Cannot mix multiple engines or storage formats in input data")
                 from .general import concat
 
                 new_qc = concat(
@@ -258,7 +272,16 @@ def __init__(
             )
             self._query_compiler = from_pandas(pandas_df)._query_compiler
         else:
+            assert engine is not None, (
+                "When initializing DataFrame with a query compiler, must provide engine"
+            )
+            assert storage_format is not None, (
+                "When initializing DataFrame with a query compiler, must provide storage format"
+            )
             self._query_compiler = query_compiler
+            self._engine = engine
+            self._storage_format = storage_format
+
 
     def __repr__(self) -> str:
         """
@@ -2641,7 +2664,7 @@ def __setattr__(self, key, value) -> None:
         #   __dict__
         # - `_siblings`, which Modin initializes before it appears in
         #   __dict__.
-        if key in ("_query_compiler", "_siblings") or key in self.__dict__:
+        if key in ("_query_compiler", "_siblings", "_engine", "_storage_format") or key in self.__dict__:
             pass
         # we have to check for the key in `dir(self)` first in order not to trigger columns computation
         elif key not in dir(self) and key in self:
@@ -2983,7 +3006,7 @@ def reindex_like(
         )
 
     def _create_or_update_from_compiler(
-        self, new_query_compiler, inplace=False
+        self, new_query_compiler: BaseQueryCompiler, new_engine: str, new_storage_format: str, inplace: bool = False
     ) -> Union[DataFrame, None]:
         """
         Return or update a ``DataFrame`` with given `new_query_compiler`.
@@ -3004,9 +3027,9 @@ def _create_or_update_from_compiler(
             new_query_compiler, self._query_compiler.__class__.__bases__
         ), "Invalid Query Compiler object: {}".format(type(new_query_compiler))
         if not inplace:
-            return self.__constructor__(query_compiler=new_query_compiler)
+            return self.__constructor__(query_compiler=new_query_compiler, engine=new_engine, storage_format=new_storage_format)
         else:
-            self._update_inplace(new_query_compiler=new_query_compiler)
+            self._update_inplace(new_query_compiler=new_query_compiler, new_engine=new_engine, new_storage_format=new_storage_format)
 
     def _get_numeric_data(self, axis: int) -> DataFrame:
         """
diff --git a/modin/pandas/indexing.py b/modin/pandas/indexing.py
index eb1808f0235..e4c335abe21 100644
--- a/modin/pandas/indexing.py
+++ b/modin/pandas/indexing.py
@@ -379,9 +379,9 @@ def _get_pandas_object_from_qc_view(
         common ``Indexer`` object or range and ``np.ndarray`` only.
""" if ndim == 2: - return self.df.__constructor__(query_compiler=qc_view) + return self.df.__constructor__(query_compiler=qc_view, engine=self.df.engine, storage_format=self.df.storage_format) if isinstance(self.df, Series) and not row_scalar: - return self.df.__constructor__(query_compiler=qc_view) + return self.df.__constructor__(query_compiler=qc_view, engine=self.df.engine, storage_format=self.df.storage_format) if isinstance(self.df, Series): axis = 0 @@ -400,7 +400,7 @@ def _get_pandas_object_from_qc_view( else 1 if col_scalar or col_multiindex_full_lookup else 0 ) - res_df = self.df.__constructor__(query_compiler=qc_view) + res_df = self.df.__constructor__(query_compiler=qc_view, engine=self.df.engine, storage_format=self.df.storage_format) return res_df.squeeze(axis=axis) def _setitem_positional(self, row_lookup, col_lookup, item, axis=None): @@ -573,7 +573,8 @@ def _handle_boolean_masking(self, row_loc, col_loc): extra_log=f"Only ``modin.pandas.Series`` boolean masks are acceptable, got: {type(row_loc)}", ) masked_df = self.df.__constructor__( - query_compiler=self.qc.getitem_array(row_loc._query_compiler) + query_compiler=self.qc.getitem_array(row_loc._query_compiler), + engine=self.df.engine, storage_format=self.df.storage_format ) if isinstance(masked_df, Series): assert col_loc == slice(None) @@ -789,8 +790,10 @@ def _loc(df): df.loc[key] = item return df + result_df = self.df._default_to_pandas(_loc) self.df._update_inplace( - new_query_compiler=self.df._default_to_pandas(_loc)._query_compiler + new_query_compiler=result_df._query_compiler, + engine=result_df.engine, storage_format=result_df.storage_format ) return row_loc, col_loc, ndims = self._parse_row_and_column_locators(key) @@ -807,7 +810,7 @@ def _loc(df): if is_scalar(row_loc) or len(row_loc) == 1: index = self.qc.index.insert(len(self.qc.index), row_loc) self.qc = self.qc.reindex(labels=index, axis=0, fill_value=0) - self.df._update_inplace(new_query_compiler=self.qc) + self.df._update_inplace(new_query_compiler=self.qc, engine=self.df.engine, storage_format=self.df.storage_format ) self._set_item_existing_loc(row_loc, col_loc, item) else: self._set_item_existing_loc(row_loc, col_loc, item) @@ -845,7 +848,7 @@ def _setitem_with_new_columns(self, row_loc, col_loc, item): if not common_label_loc[i]: columns = columns.insert(len(columns), col_loc[i]) self.qc = self.qc.reindex(labels=columns, axis=1, fill_value=np.nan) - self.df._update_inplace(new_query_compiler=self.qc) + self.df._update_inplace(new_query_compiler=self.qc, engine=self.df.engine, storage_format=self.df.storage_format ) self._set_item_existing_loc(row_loc, np.array(col_loc), item) def _set_item_existing_loc(self, row_loc, col_loc, item): @@ -1061,7 +1064,8 @@ def _iloc(df): return df self.df._update_inplace( - new_query_compiler=self.df._default_to_pandas(_iloc)._query_compiler + new_query_compiler=self.df._default_to_pandas(_iloc)._query_compiler, + engine=self.df.engine, storage_format=self.df.storage_format ) return row_loc, col_loc, _ = self._parse_row_and_column_locators(key) diff --git a/modin/pandas/io.py b/modin/pandas/io.py index 508d1b2a4d5..a3b28042a69 100644 --- a/modin/pandas/io.py +++ b/modin/pandas/io.py @@ -64,7 +64,7 @@ from pandas.io.parsers import TextFileReader from pandas.io.parsers.readers import _c_parser_defaults -from modin.config import ModinNumpy +from modin.config import ModinNumpy, StorageFormat, Engine from modin.error_message import ErrorMessage from modin.logging import ClassLogger, enable_logging from modin.utils import 
@@ -991,7 +991,7 @@ def from_pandas(df) -> DataFrame:
     """
     from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher
 
-    return ModinObjects.DataFrame(query_compiler=FactoryDispatcher.from_pandas(df))
+    return ModinObjects.DataFrame(query_compiler=FactoryDispatcher.from_pandas(df), engine=Engine.get(), storage_format=StorageFormat.get())
 
 
 def from_arrow(at) -> DataFrame:
diff --git a/modin/pandas/series.py b/modin/pandas/series.py
index 11188e85879..2dfc63c8c74 100644
--- a/modin/pandas/series.py
+++ b/modin/pandas/series.py
@@ -31,7 +31,7 @@
 from pandas.io.formats.info import SeriesInfo
 from pandas.util._validators import validate_bool_kwarg
 
-from modin.config import PersistentPickle
+from modin.config import Engine, PersistentPickle, StorageFormat
 from modin.logging import disable_logging
 from modin.pandas.io import from_pandas, to_pandas
 from modin.utils import (
@@ -41,7 +41,7 @@
 )
 
 from .accessor import CachedAccessor, SparseAccessor
-from .base import _ATTRS_NO_LOOKUP, BasePandasDataset
+from .base import _ATTRS_NO_LOOKUP, BasePandasDataset, _ensure_engine_and_storage_format_set_post_init
 from .iterator import PartitionIterator
 from .series_utils import (
     CategoryMethods,
@@ -99,6 +99,7 @@ class Series(BasePandasDataset):
     _pandas_class = pandas.Series
     __array_priority__ = pandas.Series.__array_priority__
 
+    @_ensure_engine_and_storage_format_set_post_init
     def __init__(
         self,
         data=None,
@@ -108,6 +109,8 @@ def __init__(
         copy=None,
         fastpath=lib.no_default,
         query_compiler: BaseQueryCompiler = None,
+        engine: str = None,
+        storage_format: str = None,
     ) -> None:
         from modin.numpy import array
 
@@ -116,6 +119,8 @@ def __init__(
         self._siblings = []
         if isinstance(data, type(self)):
             query_compiler = data._query_compiler.copy()
+            self._engine = data.engine
+            self._storage_format = data.storage_format
             if index is not None:
                 if any(i not in data.index for i in index):
                     raise NotImplementedError(
@@ -124,6 +129,8 @@ def __init__(
                     )
                 query_compiler = data.loc[index]._query_compiler
         if isinstance(data, array):
+            self._engine = data.engine
+            self._storage_format = data.storage_format
             if data._ndim == 2:
                 raise ValueError("Data must be 1-dimensional")
             query_compiler = data._query_compiler.copy()
@@ -159,6 +166,13 @@ def __init__(
                 )
             )
             query_compiler = from_pandas(pandas_df)._query_compiler
+            self._engine = Engine.get()
+            self._storage_format = StorageFormat.get()
+        elif engine is not None and storage_format is not None:
+            self._engine = engine
+            self._storage_format = storage_format
+        else:
+            assert self._engine is not None and self._storage_format is not None
         self._query_compiler = query_compiler.columnarize()
         if name is not None:
             self.name = name
@@ -2545,7 +2559,7 @@ def _get_numeric_data(self, axis: int) -> Series:
         """
         return self
 
-    def _update_inplace(self, new_query_compiler) -> None:
+    def _update_inplace(self, new_query_compiler, new_engine: str, new_storage_format: str) -> None:
         """
         Update the current Series in-place using `new_query_compiler`.
 
@@ -2554,7 +2568,7 @@
         new_query_compiler : BaseQueryCompiler
             QueryCompiler to use to manage the data.
""" - super(Series, self)._update_inplace(new_query_compiler=new_query_compiler) + super(Series, self)._update_inplace(new_query_compiler=new_query_compiler, new_storage_format=storage_format, new_engine=engine) # Propagate changes back to parent so that column in dataframe had the same contents if self._parent is not None: if self._parent_axis == 0: @@ -2563,7 +2577,7 @@ def _update_inplace(self, new_query_compiler) -> None: self._parent[self.name] = self def _create_or_update_from_compiler( - self, new_query_compiler, inplace=False + self, new_query_compiler: BaseQueryCompiler, new_engine: str, new_storage_format: str, inplace: bool = False ) -> Union[Series, None]: """ Return or update a Series with given `new_query_compiler`. @@ -2585,12 +2599,12 @@ def _create_or_update_from_compiler( or type(new_query_compiler) in self._query_compiler.__class__.__bases__ ), "Invalid Query Compiler object: {}".format(type(new_query_compiler)) if not inplace and new_query_compiler.is_series_like(): - return self.__constructor__(query_compiler=new_query_compiler) + return self.__constructor__(query_compiler=new_query_compiler, engine=new_engine, storage_format=new_storage_format) elif not inplace: # This can happen with things like `reset_index` where we can add columns. from .dataframe import DataFrame - return DataFrame(query_compiler=new_query_compiler) + return DataFrame(query_compiler=new_query_compiler, engine=new_engine, storage_format=new_storage_format) else: self._update_inplace(new_query_compiler=new_query_compiler) diff --git a/per_dataframe_engine_demo.ipynb b/per_dataframe_engine_demo.ipynb new file mode 100644 index 00000000000..a2885e31090 --- /dev/null +++ b/per_dataframe_engine_demo.ipynb @@ -0,0 +1,67 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "('Ray', 'Pandas')\n", + "('Python', 'Pandas')\n" + ] + } + ], + "source": [ + "import modin.pandas as pd\n", + "from modin.config import StorageFormat, Engine\n", + "\n", + "StorageFormat.put('Pandas')\n", + "Engine.put('ray')\n", + "\n", + "ray_df = pd.DataFrame([[0, 1], [2, 3]])\n", + "\n", + "print((ray_df.engine, ray_df.storage_format))\n", + "\n", + "# We should propagate engine through all operations,\n", + "# but we don't support these yet.\n", + "# print([df.engine for df in\n", + "# (\n", + "# ray_df.sort_values(0),\n", + "# ray_df.astype(str),\n", + "# ray_df.groupby(0).sum(),\n", + "# ray_df * ray_df,\n", + "# )\n", + "# ])\n", + "\n", + "# Make a python dataframe\n", + "Engine.put('Python')\n", + "python_df = pd.DataFrame([[4, 5], [6, 7]])\n", + "print((python_df.engine, python_df.storage_format))" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "modin-dev", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.21" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}