Skip to content

Commit

Permalink
Track engine and storage format at query compiler level.
Browse files Browse the repository at this point in the history
Signed-off-by: sfc-gh-mvashishtha <[email protected]>
  • Loading branch information
sfc-gh-mvashishtha committed Jan 18, 2025
1 parent e59ea4e commit a471636
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 21 deletions.
1 change: 0 additions & 1 deletion modin/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@

__all__ = [
"EnvironmentVariable",
"DataFrameVariable" # dataframe specific configuration
"Parameter",
"ValueSource",
"context",
Expand Down
13 changes: 6 additions & 7 deletions modin/config/envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
ValueSource,
)


class EnvironmentVariable(Parameter, type=str, abstract=True):
"""Base class for environment variables-based configuration."""

Expand Down Expand Up @@ -169,13 +170,10 @@ class IsDebug(EnvironmentVariable, type=bool):
varname = "MODIN_DEBUG"


class DataFrameVariable(EnvironmentVariable, type=str):
@classmethod
def get(cls) -> Any:
return super().get()

class Engine(DataFrameVariable, type=str):
"""Distribution engine to run queries by."""
class Engine(EnvironmentVariable, type=str):
# TODO(hybrid-execution): We should probably rename this to DefaultEngine
# to prevent confusing it with the per-dataframe engine.
"""The default engine for new dataframes."""

varname = "MODIN_ENGINE"
choices = ("Ray", "Dask", "Python", "Unidist")
Expand All @@ -196,6 +194,7 @@ def _get_default(cls) -> str:
str
"""
from modin.utils import MIN_DASK_VERSION, MIN_RAY_VERSION, MIN_UNIDIST_VERSION

# If there's a custom engine, we don't need to check for any engine
# dependencies. Return the default "Python" engine.
if IsDebug.get() or cls.has_custom_engine:
Expand Down
9 changes: 5 additions & 4 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from pandas.core.indexes.api import Index, RangeIndex

from modin.config import (
Engine,
IsRayCluster,
MinColumnPartitionSize,
MinRowPartitionSize,
Expand Down Expand Up @@ -1677,7 +1676,9 @@ def copy(self):
)

@lazy_metadata_decorator(apply_axis="both")
def astype(self, col_dtypes, errors: str = "raise"):
def astype(
self, col_dtypes, engine: str, storage_format: str, errors: str = "raise"
):
"""
Convert the columns dtypes to given dtypes.
Expand Down Expand Up @@ -1707,7 +1708,7 @@ def astype(self, col_dtypes, errors: str = "raise"):
new_dtypes = self_dtypes.copy()
# Update the new dtype series to the proper pandas dtype
new_dtype = pandas.api.types.pandas_dtype(dtype)
if self._getEngineConfig().get() == "Dask" and hasattr(dtype, "_is_materialized"):
if engine == "Dask" and hasattr(dtype, "_is_materialized"):

Check warning on line 1711 in modin/core/dataframe/pandas/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

modin/core/dataframe/pandas/dataframe/dataframe.py#L1711

Added line #L1711 was not covered by tests
# FIXME: https://github.com/dask/distributed/issues/8585
_ = dtype._materialize_categories()

Expand Down Expand Up @@ -1736,7 +1737,7 @@ def astype_builder(df):
if not (col_dtypes == self_dtypes).all():
new_dtypes = self_dtypes.copy()
new_dtype = pandas.api.types.pandas_dtype(col_dtypes)
if self._getEngineConfig().get() == "Dask" and hasattr(new_dtype, "_is_materialized"):
if engine == "Dask" and hasattr(new_dtype, "_is_materialized"):

Check warning on line 1740 in modin/core/dataframe/pandas/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

modin/core/dataframe/pandas/dataframe/dataframe.py#L1740

Added line #L1740 was not covered by tests
# FIXME: https://github.com/dask/distributed/issues/8585
_ = new_dtype._materialize_categories()
if isinstance(new_dtype, pandas.CategoricalDtype):
Expand Down
28 changes: 24 additions & 4 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from pandas.core.indexing import check_bool_indexer
from pandas.errors import DataError

from modin.config import CpuCount, RangePartitioning
from modin.config import CpuCount, Engine, RangePartitioning, StorageFormat
from modin.core.dataframe.algebra import (
Binary,
Fold,
Expand Down Expand Up @@ -292,9 +292,20 @@ class PandasQueryCompiler(BaseQueryCompiler, QueryCompilerCaster):
_modin_frame: PandasDataframe
_shape_hint: Optional[str]

def __init__(self, modin_frame: PandasDataframe, shape_hint: Optional[str] = None):
def __init__(
self,
modin_frame: PandasDataframe,
storage_format: str,
engine: str,
shape_hint: Optional[str] = None,
):
self._modin_frame = modin_frame
self._shape_hint = shape_hint
self._storage_format = storage_format
self._engine = engine

Check warning on line 305 in modin/core/storage_formats/pandas/query_compiler.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler.py#L304-L305

Added lines #L304 - L305 were not covered by tests

storage_format = property(lambda self: self._storage_format)
engine = property(lambda self: self._engine)

@property
def lazy_row_labels(self):
Expand Down Expand Up @@ -373,7 +384,11 @@ def to_pandas(self):

@classmethod
def from_pandas(cls, df, data_cls):
return cls(data_cls.from_pandas(df))
return cls(

Check warning on line 387 in modin/core/storage_formats/pandas/query_compiler.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler.py#L387

Added line #L387 was not covered by tests
data_cls.from_pandas(df),
engine=Engine.get(),
storage_format=StorageFormat.get(),
)

@classmethod
def from_arrow(cls, at, data_cls):
Expand Down Expand Up @@ -2315,7 +2330,12 @@ def astype(self, col_dtypes, errors: str = "raise"):
# layer. This query compiler assumes there won't be any errors due to
# invalid type keys.
return self.__constructor__(
self._modin_frame.astype(col_dtypes, errors=errors),
self._modin_frame.astype(
col_dtypes,
errors=errors,
engine=self.engine,
storage_format=self.storage_format,
),
shape_hint=self._shape_hint,
)

Expand Down
15 changes: 15 additions & 0 deletions modin/core/storage_formats/pandas/query_compiler_caster.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import functools
import inspect
from functools import cached_property
from types import FunctionType, MethodType
from typing import Any, Dict, Tuple, TypeVar

Expand Down Expand Up @@ -54,6 +55,20 @@ def __init_subclass__(
super().__init_subclass__(**kwargs)
apply_argument_cast(cls)

# TODO(hybrid-execution): This is a trick to propagate the storage
# format and engine through nearly all query compiler methods. Since
# the query compiler caster tries to convert other query compiler
# inputs to the type of this query compiler, we can assume that the
# output query compiler has the same type as the current query
# compiler. Caveat: the query compiler caster looks at type only, so
# it doesn't differentiate between two query compilers that have
# different storage formats / engines.
cls.__constructor__ = cached_property(
lambda self: functools.partial(
type(self), engine=self.engine, storage_format=self.storage_format
)
)


def cast_nested_args_to_current_qc_type(arguments, current_qc):
"""
Expand Down
14 changes: 9 additions & 5 deletions modin/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
Union,
)

from modin.config.envvars import Engine, StorageFormat
import numpy as np
import pandas
import pandas.core.generic
Expand Down Expand Up @@ -75,6 +74,7 @@
)

from modin import pandas as pd
from modin.config.envvars import Engine, StorageFormat
from modin.error_message import ErrorMessage
from modin.logging import ClassLogger, disable_logging
from modin.pandas.accessor import CachedAccessor, ModinAPI
Expand Down Expand Up @@ -211,19 +211,23 @@ class BasePandasDataset(ClassLogger):
_siblings: list[BasePandasDataset]
_engine_override: Engine = None
_storage_override: StorageFormat = None

def _getEngineConfig(self) -> Engine:
if self._engine_override is not None:
return self._engine_override
else:
else:
return Engine

def _getStorageConfig(self) -> Engine:
if self._storage_override is not None:
return self._storage_override
else:
else:
return StorageFormat

engine = property(lambda self: self._query_compiler.engine)

storage_format = property(lambda self: self._query_compiler.storage_format)

@cached_property
def _is_dataframe(self) -> bool:
"""
Expand Down
81 changes: 81 additions & 0 deletions per_dataframe_engine_demo.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2025-01-17 22:53:54,581\tINFO worker.py:1821 -- Started a local Ray instance.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"('Ray', 'Pandas')\n",
"['Ray', 'Ray', 'Ray', 'Ray']\n",
"('Python', 'Pandas')\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"FutureWarning: The behavior of array concatenation with empty entries is deprecated. In a future version, this will no longer exclude empty items when determining the result dtype. To retain the old behavior, exclude the empty entries before the concat operation.\n"
]
}
],
"source": [
"import modin.pandas as pd\n",
"from modin.config import StorageFormat, Engine\n",
"\n",
"StorageFormat.put('Pandas')\n",
"Engine.put('ray')\n",
"\n",
"ray_df = pd.DataFrame([[0, 1], [2, 3]])\n",
"\n",
"print((ray_df.engine, ray_df.storage_format))\n",
"\n",
"# We should propagate engine through all operations\n",
"print([df.engine for df in\n",
" (\n",
" ray_df.sort_values(0),\n",
" ray_df.astype(str),\n",
" ray_df.groupby(0).sum(),\n",
" ray_df * ray_df,\n",
" )\n",
"])\n",
"\n",
"# Make a python dataframe\n",
"Engine.put('Python')\n",
"python_df = pd.DataFrame([[4, 5], [6, 7]])\n",
"print((python_df.engine, python_df.storage_format))"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "modin-dev",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.21"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

0 comments on commit a471636

Please sign in to comment.