Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Igoshev, Iaroslav <[email protected]>
  • Loading branch information
YarShev committed Apr 29, 2024
1 parent 9fa2891 commit 7325e4f
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 95 deletions.
20 changes: 17 additions & 3 deletions modin/core/execution/dask/common/engine_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@

from collections import UserDict

import pandas
from dask.distributed import wait
from distributed import Future
from distributed.client import default_client


def _deploy_dask_func(func, *args, **kwargs): # pragma: no cover
def _deploy_dask_func(func, *args, return_pandas_df=None, **kwargs): # pragma: no cover
"""
Wrap `func` to ease calling it remotely.
Expand All @@ -30,6 +31,8 @@ def _deploy_dask_func(func, *args, **kwargs): # pragma: no cover
A local function that we want to call remotely.
*args : iterable
Positional arguments to pass to `func` when calling remotely.
return_pandas_df : bool, optional
Whether to convert the result of `func` to a pandas DataFrame or not.
**kwargs : dict
Keyword arguments to pass to `func` when calling remotely.
Expand All @@ -38,7 +41,10 @@ def _deploy_dask_func(func, *args, **kwargs): # pragma: no cover
distributed.Future or list
Dask identifier of the result being put into distributed memory.
"""
return func(*args, **kwargs)
result = func(*args, **kwargs)
if return_pandas_df and not isinstance(result, pandas.DataFrame):
result = pandas.DataFrame(result)
return result


class DaskWrapper:
Expand All @@ -50,6 +56,7 @@ def deploy(
func,
f_args=None,
f_kwargs=None,
return_pandas_df=None,
num_returns=1,
pure=True,
):
Expand All @@ -64,6 +71,8 @@ def deploy(
Positional arguments to pass to ``func``.
f_kwargs : dict, optional
Keyword arguments to pass to ``func``.
return_pandas_df : bool, optional
Whether to convert the result of `func` to a pandas DataFrame or not.
num_returns : int, default: 1
The number of returned objects.
pure : bool, default: True
Expand All @@ -82,7 +91,12 @@ def deploy(
else:
# for the case where type(func) is distributed.Future
remote_task_future = client.submit(
_deploy_dask_func, func, *args, pure=pure, **kwargs
_deploy_dask_func,
func,
*args,
pure=pure,
return_pandas_df=return_pandas_df,
**kwargs,
)
if num_returns != 1:
return [
Expand Down
34 changes: 6 additions & 28 deletions modin/core/execution/dask/implementations/pandas_on_dask/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
"""Module houses class that implements ``BaseIO`` using Dask as an execution engine."""

import numpy as np
import pandas
from distributed.client import default_client

from modin.core.execution.dask.common import DaskWrapper
Expand Down Expand Up @@ -217,40 +216,19 @@ def from_map(cls, func, iterable, *args, **kwargs):
QueryCompiler containing data returned by map function.
"""
func = cls.frame_cls._partition_mgr_cls.preprocess_func(func)
client = default_client()
partitions = np.array(
[
[
cls.frame_partition_cls(
client.submit(deploy_map_func, func, obj, *args, **kwargs)
DaskWrapper.deploy(
func,
f_args=(obj,) + args,
f_kwargs=kwargs,
return_pandas_df=True,
)
)
]
for obj in iterable
]
)
return cls.query_compiler_cls(cls.frame_cls(partitions))


def deploy_map_func(func, obj, *args, **kwargs): # pragma: no cover
"""
Deploy a func to apply to an object.
Parameters
----------
func : callable
Function to map across the iterable object.
obj : object
An object to apply a function to.
*args : tuple
Positional arguments to pass in `func`.
**kwargs : dict
Keyword arguments to pass in `func`.
Returns
-------
pandas.DataFrame
"""
result = func(obj, *args, **kwargs)
if not isinstance(result, pandas.DataFrame):
result = pandas.DataFrame(result)
return result
18 changes: 14 additions & 4 deletions modin/core/execution/ray/common/engine_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from types import FunctionType
from typing import Sequence

import pandas
import ray
from ray.util.client.common import ClientObjectRef

Expand All @@ -30,7 +31,7 @@


@ray.remote
def _deploy_ray_func(func, *args, **kwargs): # pragma: no cover
def _deploy_ray_func(func, *args, return_pandas_df=None, **kwargs): # pragma: no cover
"""
Wrap `func` to ease calling it remotely.
Expand All @@ -40,6 +41,8 @@ def _deploy_ray_func(func, *args, **kwargs): # pragma: no cover
A local function that we want to call remotely.
*args : iterable
Positional arguments to pass to `func` when calling remotely.
return_pandas_df : bool, optional
Whether to convert the result of `func` to a pandas DataFrame or not.
**kwargs : dict
Keyword arguments to pass to `func` when calling remotely.
Expand All @@ -48,7 +51,10 @@ def _deploy_ray_func(func, *args, **kwargs): # pragma: no cover
ray.ObjectRef or list
Ray identifier of the result being put to Plasma store.
"""
return func(*args, **kwargs)
result = func(*args, **kwargs)
if return_pandas_df and not isinstance(result, pandas.DataFrame):
result = pandas.DataFrame(result)
return result


class RayWrapper:
Expand All @@ -57,7 +63,9 @@ class RayWrapper:
_func_cache = {}

@classmethod
def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
def deploy(
cls, func, f_args=None, f_kwargs=None, return_pandas_df=None, num_returns=1
):
"""
Run local `func` remotely.
Expand All @@ -69,6 +77,8 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
Positional arguments to pass to ``func``.
f_kwargs : dict, optional
Keyword arguments to pass to ``func``.
return_pandas_df : bool, optional
Whether to convert the result of `func` to a pandas DataFrame or not.
num_returns : int, default: 1
Amount of return values expected from `func`.
Expand All @@ -81,7 +91,7 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
kwargs = {} if f_kwargs is None else f_kwargs
return _deploy_ray_func.options(
num_returns=num_returns, resources=RayTaskCustomResources.get()
).remote(func, *args, **kwargs)
).remote(func, *args, return_pandas_df=return_pandas_df, **kwargs)

@classmethod
def is_future(cls, item):
Expand Down
31 changes: 3 additions & 28 deletions modin/core/execution/ray/implementations/pandas_on_ray/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import numpy as np
import pandas
import ray
from pandas.io.common import get_handle, stringify_path
from ray.data import from_pandas_refs

Expand Down Expand Up @@ -335,36 +334,12 @@ def from_map(cls, func, iterable, *args, **kwargs):
[
[
cls.frame_partition_cls(
deploy_map_func.remote(func, obj, *args, **kwargs)
RayWrapper.deploy(
func, f_args=(obj,) + args, return_pandas_df=True, **kwargs
)
)
]
for obj in iterable
]
)
return cls.query_compiler_cls(cls.frame_cls(partitions))


@ray.remote
def deploy_map_func(func, obj, *args, **kwargs): # pragma: no cover
"""
Deploy a func to apply to an object.
Parameters
----------
func : callable
Function to map across the iterable object.
obj : object
An object to apply a function to.
*args : tuple
Positional arguments to pass in `func`.
**kwargs : dict
Keyword arguments to pass in `func`.
Returns
-------
pandas.DataFrame
"""
result = func(obj, *args, **kwargs)
if not isinstance(result, pandas.DataFrame):
result = pandas.DataFrame(result)
return result
20 changes: 16 additions & 4 deletions modin/core/execution/unidist/common/engine_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

import asyncio

import pandas
import unidist


@unidist.remote
def _deploy_unidist_func(func, *args, **kwargs): # pragma: no cover
def _deploy_unidist_func(
func, *args, return_pandas_df=None, **kwargs
): # pragma: no cover
"""
Wrap `func` to ease calling it remotely.
Expand All @@ -33,6 +36,8 @@ def _deploy_unidist_func(func, *args, **kwargs): # pragma: no cover
A local function that we want to call remotely.
*args : iterable
Positional arguments to pass to `func` when calling remotely.
return_pandas_df : bool, optional
Whether to convert the result of `func` to a pandas DataFrame or not.
**kwargs : dict
Keyword arguments to pass to `func` when calling remotely.
Expand All @@ -41,14 +46,19 @@ def _deploy_unidist_func(func, *args, **kwargs): # pragma: no cover
unidist.ObjectRef or list[unidist.ObjectRef]
Unidist identifier of the result being put to object store.
"""
return func(*args, **kwargs)
result = func(*args, **kwargs)
if return_pandas_df and not isinstance(result, pandas.DataFrame):
result = pandas.DataFrame(result)
return result


class UnidistWrapper:
"""Mixin that provides means of running functions remotely and getting local results."""

@classmethod
def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
def deploy(
cls, func, f_args=None, f_kwargs=None, return_pandas_df=None, num_returns=1
):
"""
Run local `func` remotely.
Expand All @@ -60,6 +70,8 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
Positional arguments to pass to ``func``.
f_kwargs : dict, optional
Keyword arguments to pass to ``func``.
return_pandas_df : bool, optional
Whether to convert the result of `func` to a pandas DataFrame or not.
num_returns : int, default: 1
Amount of return values expected from `func`.
Expand All @@ -71,7 +83,7 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
args = [] if f_args is None else f_args
kwargs = {} if f_kwargs is None else f_kwargs
return _deploy_unidist_func.options(num_returns=num_returns).remote(
func, *args, **kwargs
func, *args, return_pandas_df=return_pandas_df, **kwargs
)

@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import numpy as np
import pandas
import unidist
from pandas.io.common import get_handle, stringify_path

from modin.core.execution.unidist.common import SignalActor, UnidistWrapper
Expand Down Expand Up @@ -291,36 +290,15 @@ def from_map(cls, func, iterable, *args, **kwargs):
[
[
cls.frame_partition_cls(
deploy_map_func.remote(func, obj, *args, **kwargs)
UnidistWrapper.deploy(
func,
f_args=(obj,) + args,
f_kwargs=kwargs,
return_pandas_df=True,
)
)
]
for obj in iterable
]
)
return cls.query_compiler_cls(cls.frame_cls(partitions))


@unidist.remote
def deploy_map_func(func, obj, *args, **kwargs): # pragma: no cover
"""
Deploy a func to apply to an object.
Parameters
----------
func : callable
Function to map across the iterable object.
obj : object
An object to apply a function to.
*args : tuple
Positional arguments to pass in `func`.
**kwargs : dict
Keyword arguments to pass in `func`.
Returns
-------
pandas.DataFrame
"""
result = func(obj, *args, **kwargs)
if not isinstance(result, pandas.DataFrame):
result = pandas.DataFrame(result)
return result

0 comments on commit 7325e4f

Please sign in to comment.