From a4716369f1c8d1431de428c5426e16161afeb2f1 Mon Sep 17 00:00:00 2001 From: sfc-gh-mvashishtha Date: Fri, 17 Jan 2025 22:54:02 -0800 Subject: [PATCH] Track engine and storage format at query compiler level. Signed-off-by: sfc-gh-mvashishtha --- modin/config/__init__.py | 1 - modin/config/envvars.py | 13 ++- .../dataframe/pandas/dataframe/dataframe.py | 9 ++- .../storage_formats/pandas/query_compiler.py | 28 ++++++- .../pandas/query_compiler_caster.py | 15 ++++ modin/pandas/base.py | 14 ++-- per_dataframe_engine_demo.ipynb | 81 +++++++++++++++++++ 7 files changed, 140 insertions(+), 21 deletions(-) create mode 100644 per_dataframe_engine_demo.ipynb diff --git a/modin/config/__init__.py b/modin/config/__init__.py index 66ac1576134..d38596eff5c 100644 --- a/modin/config/__init__.py +++ b/modin/config/__init__.py @@ -60,7 +60,6 @@ __all__ = [ "EnvironmentVariable", - "DataFrameVariable" # dataframe specific configuration "Parameter", "ValueSource", "context", diff --git a/modin/config/envvars.py b/modin/config/envvars.py index 49ddb6a90a0..67fcf5f7f7e 100644 --- a/modin/config/envvars.py +++ b/modin/config/envvars.py @@ -32,6 +32,7 @@ ValueSource, ) + class EnvironmentVariable(Parameter, type=str, abstract=True): """Base class for environment variables-based configuration.""" @@ -169,13 +170,10 @@ class IsDebug(EnvironmentVariable, type=bool): varname = "MODIN_DEBUG" -class DataFrameVariable(EnvironmentVariable, type=str): - @classmethod - def get(cls) -> Any: - return super().get() - -class Engine(DataFrameVariable, type=str): - """Distribution engine to run queries by.""" +class Engine(EnvironmentVariable, type=str): + # TODO(hybrid-execution): We should probably rename this to DefaultEngine + # to prevent confusing it with the per-dataframe engine. + """The default engine for new dataframes.""" varname = "MODIN_ENGINE" choices = ("Ray", "Dask", "Python", "Unidist") @@ -196,6 +194,7 @@ def _get_default(cls) -> str: str """ from modin.utils import MIN_DASK_VERSION, MIN_RAY_VERSION, MIN_UNIDIST_VERSION + # If there's a custom engine, we don't need to check for any engine # dependencies. Return the default "Python" engine. if IsDebug.get() or cls.has_custom_engine: diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index e401a4a5d65..b8e0e329f1b 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -33,7 +33,6 @@ from pandas.core.indexes.api import Index, RangeIndex from modin.config import ( - Engine, IsRayCluster, MinColumnPartitionSize, MinRowPartitionSize, @@ -1677,7 +1676,9 @@ def copy(self): ) @lazy_metadata_decorator(apply_axis="both") - def astype(self, col_dtypes, errors: str = "raise"): + def astype( + self, col_dtypes, engine: str, storage_format: str, errors: str = "raise" + ): """ Convert the columns dtypes to given dtypes. @@ -1707,7 +1708,7 @@ def astype(self, col_dtypes, errors: str = "raise"): new_dtypes = self_dtypes.copy() # Update the new dtype series to the proper pandas dtype new_dtype = pandas.api.types.pandas_dtype(dtype) - if self._getEngineConfig().get() == "Dask" and hasattr(dtype, "_is_materialized"): + if engine == "Dask" and hasattr(dtype, "_is_materialized"): # FIXME: https://github.com/dask/distributed/issues/8585 _ = dtype._materialize_categories() @@ -1736,7 +1737,7 @@ def astype_builder(df): if not (col_dtypes == self_dtypes).all(): new_dtypes = self_dtypes.copy() new_dtype = pandas.api.types.pandas_dtype(col_dtypes) - if self._getEngineConfig().get() == "Dask" and hasattr(new_dtype, "_is_materialized"): + if engine == "Dask" and hasattr(new_dtype, "_is_materialized"): # FIXME: https://github.com/dask/distributed/issues/8585 _ = new_dtype._materialize_categories() if isinstance(new_dtype, pandas.CategoricalDtype): diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index b62f2aa4474..4cbeec83bd5 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -45,7 +45,7 @@ from pandas.core.indexing import check_bool_indexer from pandas.errors import DataError -from modin.config import CpuCount, RangePartitioning +from modin.config import CpuCount, Engine, RangePartitioning, StorageFormat from modin.core.dataframe.algebra import ( Binary, Fold, @@ -292,9 +292,20 @@ class PandasQueryCompiler(BaseQueryCompiler, QueryCompilerCaster): _modin_frame: PandasDataframe _shape_hint: Optional[str] - def __init__(self, modin_frame: PandasDataframe, shape_hint: Optional[str] = None): + def __init__( + self, + modin_frame: PandasDataframe, + storage_format: str, + engine: str, + shape_hint: Optional[str] = None, + ): self._modin_frame = modin_frame self._shape_hint = shape_hint + self._storage_format = storage_format + self._engine = engine + + storage_format = property(lambda self: self._storage_format) + engine = property(lambda self: self._engine) @property def lazy_row_labels(self): @@ -373,7 +384,11 @@ def to_pandas(self): @classmethod def from_pandas(cls, df, data_cls): - return cls(data_cls.from_pandas(df)) + return cls( + data_cls.from_pandas(df), + engine=Engine.get(), + storage_format=StorageFormat.get(), + ) @classmethod def from_arrow(cls, at, data_cls): @@ -2315,7 +2330,12 @@ def astype(self, col_dtypes, errors: str = "raise"): # layer. This query compiler assumes there won't be any errors due to # invalid type keys. return self.__constructor__( - self._modin_frame.astype(col_dtypes, errors=errors), + self._modin_frame.astype( + col_dtypes, + errors=errors, + engine=self.engine, + storage_format=self.storage_format, + ), shape_hint=self._shape_hint, ) diff --git a/modin/core/storage_formats/pandas/query_compiler_caster.py b/modin/core/storage_formats/pandas/query_compiler_caster.py index 211860a8427..55b3439659d 100644 --- a/modin/core/storage_formats/pandas/query_compiler_caster.py +++ b/modin/core/storage_formats/pandas/query_compiler_caster.py @@ -21,6 +21,7 @@ import functools import inspect +from functools import cached_property from types import FunctionType, MethodType from typing import Any, Dict, Tuple, TypeVar @@ -54,6 +55,20 @@ def __init_subclass__( super().__init_subclass__(**kwargs) apply_argument_cast(cls) + # TODO(hybrid-execution): This is a trick to propagate the storage + # format and engine through nearly all query compiler methods. Since + # the query compiler caster tries to convert other query compiler + # inputs to the type of this query compiler, we can assume that the + # output query compiler has the same type as the current query + # compiler. Caveat: the query compiler caster looks at type only, but + # doesn't differentiate between two query compilers with different + # storage formats / engines + cls.__constructor__ = cached_property( + lambda self: functools.partial( + type(self), engine=self.engine, storage_format=self.storage_format + ) + ) + def cast_nested_args_to_current_qc_type(arguments, current_qc): """ diff --git a/modin/pandas/base.py b/modin/pandas/base.py index bd6e9908670..6945423b953 100644 --- a/modin/pandas/base.py +++ b/modin/pandas/base.py @@ -31,7 +31,6 @@ Union, ) -from modin.config.envvars import Engine, StorageFormat import numpy as np import pandas import pandas.core.generic @@ -75,6 +74,7 @@ ) from modin import pandas as pd +from modin.config.envvars import Engine, StorageFormat from modin.error_message import ErrorMessage from modin.logging import ClassLogger, disable_logging from modin.pandas.accessor import CachedAccessor, ModinAPI @@ -211,19 +211,23 @@ class BasePandasDataset(ClassLogger): _siblings: list[BasePandasDataset] _engine_override: Engine = None _storage_override: StorageFormat = None - + def _getEngineConfig(self) -> Engine: if self._engine_override is not None: return self._engine_override - else: + else: return Engine - + def _getStorageConfig(self) -> Engine: if self._storage_override is not None: return self._storage_override - else: + else: return StorageFormat + engine = property(lambda self: self._query_compiler.engine) + + storage_format = property(lambda self: self._query_compiler.storage_format) + @cached_property def _is_dataframe(self) -> bool: """ diff --git a/per_dataframe_engine_demo.ipynb b/per_dataframe_engine_demo.ipynb new file mode 100644 index 00000000000..56a8599ccdc --- /dev/null +++ b/per_dataframe_engine_demo.ipynb @@ -0,0 +1,81 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2025-01-17 22:53:54,581\tINFO worker.py:1821 -- Started a local Ray instance.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "('Ray', 'Pandas')\n", + "['Ray', 'Ray', 'Ray', 'Ray']\n", + "('Python', 'Pandas')\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "FutureWarning: The behavior of array concatenation with empty entries is deprecated. In a future version, this will no longer exclude empty items when determining the result dtype. To retain the old behavior, exclude the empty entries before the concat operation.\n" + ] + } + ], + "source": [ + "import modin.pandas as pd\n", + "from modin.config import StorageFormat, Engine\n", + "\n", + "StorageFormat.put('Pandas')\n", + "Engine.put('ray')\n", + "\n", + "ray_df = pd.DataFrame([[0, 1], [2, 3]])\n", + "\n", + "print((ray_df.engine, ray_df.storage_format))\n", + "\n", + "# We should propagate engine through all operations\n", + "print([df.engine for df in\n", + " (\n", + " ray_df.sort_values(0),\n", + " ray_df.astype(str),\n", + " ray_df.groupby(0).sum(),\n", + " ray_df * ray_df,\n", + " )\n", + "])\n", + "\n", + "# Make a python dataframe\n", + "Engine.put('Python')\n", + "python_df = pd.DataFrame([[4, 5], [6, 7]])\n", + "print((python_df.engine, python_df.storage_format))" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "modin-dev", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.21" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}