Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PERF-#7230: Don't preserve bad partition for merge #7229

Merged
merged 1 commit into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion modin/core/storage_formats/pandas/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,23 @@

"""Contains implementations for Merge/Join."""

from __future__ import annotations

from typing import TYPE_CHECKING

import pandas
from pandas.core.dtypes.common import is_list_like
from pandas.errors import MergeError

from modin.config import MinPartitionSize, NPartitions
from modin.core.dataframe.base.dataframe.utils import join_columns
from modin.core.dataframe.pandas.metadata import ModinDtypes

from .utils import merge_partitioning

if TYPE_CHECKING:
from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler

Check warning on line 31 in modin/core/storage_formats/pandas/merge.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/merge.py#L31

Added line #L31 was not covered by tests


# TODO: add methods for 'join' here
class MergeImpl:
Expand Down Expand Up @@ -93,7 +101,9 @@
).reset_index(drop=True)

@classmethod
def row_axis_merge(cls, left, right, kwargs):
def row_axis_merge(
cls, left: PandasQueryCompiler, right: PandasQueryCompiler, kwargs: dict
):
"""
Execute merge using row-axis implementation.

Expand Down Expand Up @@ -164,6 +174,16 @@
left, right, on, left_on, right_on, kwargs.get("suffixes", ("_x", "_y"))
)

# We rebalance when the ratio of the number of existing partitions to
# the ideal number of partitions is smaller than this threshold. The
# threshold is a heuristic that may need to be tuned for performance.
if (
left._modin_frame._partitions.shape[0] < 0.3 * NPartitions.get()
# require enough rows to avoid empty partitions after repartitioning; note that taking len() here can materialize the index
and len(left._modin_frame) > NPartitions.get() * MinPartitionSize.get()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this change slow down some of our benchmarks?

):
left = left.repartition(axis=0)

new_left = left.__constructor__(
left._modin_frame.broadcast_apply_full_axis(
axis=1,
Expand Down
35 changes: 35 additions & 0 deletions modin/tests/core/storage_formats/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,41 @@ def test_merge_partitioning(
)


def test_merge_with_bad_partitioning():
    """Check that merging a badly partitioned left frame triggers a repartition.

    The left frame is built with a single 256x256 partition and the right
    frame with a 4x4 grid of 32x32 partitions.

    See https://github.com/modin-project/modin/pull/7229
    """
    left_scheme = [256]
    right_scheme = [32, 32, 32, 32]
    left_size = sum(left_scheme)
    right_size = sum(right_scheme)

    # Square frames: every row is the same ``arange`` of the total size.
    left_rows = [np.arange(left_size) for _ in range(left_size)]
    right_rows = [np.arange(right_size) for _ in range(right_size)]

    left = construct_modin_df_by_scheme(
        pandas.DataFrame(left_rows),
        {"row_lengths": left_scheme, "column_widths": left_scheme},
    )
    right = construct_modin_df_by_scheme(
        pandas.DataFrame(right_rows),
        {"row_lengths": right_scheme, "column_widths": right_scheme},
    )

    # Make sure both frames really have the requested partitioning
    # before exercising the merge.
    for modin_df, scheme in ((left, left_scheme), (right, right_scheme)):
        frame = modin_df._query_compiler._modin_frame
        assert frame.row_lengths == frame.column_widths == scheme

    # The patched ``repartition`` only needs to return some query compiler;
    # its contents are just a dummy value.
    dummy_qc = pd.DataFrame([1, 2, 3, 4])._query_compiler
    patcher = mock.patch.object(
        left._query_compiler, "repartition", return_value=dummy_qc
    )
    with patcher as repartition:
        _ = left.merge(right)
        repartition.assert_called_once_with(axis=0)
Comment on lines +842 to +846
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do we test here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We test that, when the left frame has bad partitioning, `repartition` is called while performing the merge operation.



def test_groupby_with_empty_partition():
# see #5461 for details
md_df = construct_modin_df_by_scheme(
Expand Down
Loading