-
Notifications
You must be signed in to change notification settings - Fork 653
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FEAT-#6492: Add from_map
feature to create dataframe
#7215
Conversation
Signed-off-by: Igoshev, Iaroslav <[email protected]>
Signed-off-by: Igoshev, Iaroslav <[email protected]>
6175037
to
139d3b5
Compare
Signed-off-by: Igoshev, Iaroslav <[email protected]>
Signed-off-by: Igoshev, Iaroslav <[email protected]>
@@ -258,3 +261,66 @@ def func(df, **kw): # pragma: no cover | |||
UnidistWrapper.materialize( | |||
[part.list_of_blocks[0] for row in result for part in row] | |||
) | |||
|
|||
@classmethod | |||
def from_map(cls, func, iterable, *args, **kwargs): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to use already implemented functions with num_splits=1
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't quite get what would you like use use instead. Please elaborate. We are adding a new from_map
by analogy with other io functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose we can't use anything from existing functionality as every method of a Modin Dataframe assumes there is a dataframe with partitions to apply a function to.
@@ -1109,6 +1109,36 @@ def from_dask(dask_obj) -> DataFrame: | |||
return ModinObjects.DataFrame(query_compiler=FactoryDispatcher.from_dask(dask_obj)) | |||
|
|||
|
|||
def from_map(func, iterable, *args, **kwargs) -> DataFrame: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Documentation needs to be updated I suppose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't have docs for such methods as from_pandas, from_ray, from_dask, etc. Do you think we should update docs on this matter in a separate issue in one go?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ок
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@YarShev are you going to do this before release?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that would be great - #7256.
[ | ||
[ | ||
cls.frame_partition_cls( | ||
deploy_map_func.remote(func, obj, *args, **kwargs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest to use RayWrraper.deploy
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And corresponding wrappers for other engines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RayWrapper.deploy deploys a function that can return any object but here we intentionally wrap a result in a pandas DataFrame if the user hasn't done so. I would leave the changes as is. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To reduce the likelihood of error, we need to either have all launch options in one place, or use only one method. There is a tendency that launching functions becomes more difficult due to additional parameters. A good example is resources=RayTaskCustomResources.get()
, which is currently not taken into account here.
We can move this function to engine_wrapper.py
and call it inside Raywrapper.deploy
using an additional parameter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Re-used *.deploy.
Signed-off-by: Igoshev, Iaroslav <[email protected]>
partitions = np.array( | ||
[ | ||
[ | ||
cls.frame_partition_cls( | ||
DaskWrapper.deploy( | ||
func, | ||
f_args=(obj,) + args, | ||
f_kwargs=kwargs, | ||
return_pandas_df=True, | ||
) | ||
) | ||
] | ||
for obj in iterable | ||
] | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on the information required to perform this task, it seems that a more appropriate level at which to define the function would be a partition manager, for example somewhere around:
def create_partition_from_metadata(cls, **metadata): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would leave it here. Imagine a case when iterable is a list files.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Imagine a case when iterable is a list files.
We'll be abstracting from the parameters just like we're doing now, so I don't see any difference.
|
||
|
||
@pytest.mark.skipif( | ||
condition=Engine.get() not in ("Ray", "Dask", "Unidist"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be more correct to limit it not by engines, but by storage format: pandas
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PandasOnPython wouldn't work. Let's leave as is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PandasOnPython wouldn't work.
As far as I can see, there are no restrictions on its operation. We just need to add essentially the same code as for the other engines.
What do these changes do?
flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
git commit -s
docs/development/architecture.rst
is up-to-date