Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecation warnings added to I/O pipelines #490

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions kartothek/io/dask/bag.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,25 @@ def read_dataset_as_metapartitions_bag(
dask.bag.Bag:
A dask.bag object containing the metapartions.
"""
if label_filter is not None:
warnings.warn(
"The keyword `label_filter` is deprecated and will be removed in the next major release.",
DeprecationWarning
)

if load_dataset_metadata is not False:
warnings.warn(
"The keyword `load_dataset_metadata` is deprecated and will be removed in the next major release.",
DeprecationWarning
)

if concat_partitions_on_primary_index is not False:
warnings.warn(
"The keyword `concat_partitions_on_primary_index` is deprecated and will be removed in the next major "
"release.",
DeprecationWarning
)

ds_factory = _ensure_factory(
dataset_uuid=dataset_uuid,
store=store,
Expand Down
17 changes: 13 additions & 4 deletions kartothek/io_components/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,13 @@ def dispatch_metapartitions_from_factory(

:meta private:
"""
if dispatch_metadata:
if label_filter is not None:
warnings.warn(
"The keyword `label_filter` is deprecated and will be removed in the next major release.",
DeprecationWarning
)

if dispatch_metadata:
warnings.warn(
"The dispatch of metadata and index information as part of the MetaPartition instance is deprecated. "
"The future behaviour will be that this metadata is not dispatched. To set the future behaviour, "
Expand All @@ -70,9 +75,11 @@ def dispatch_metapartitions_from_factory(
"`concat_partitions_on_primary_index` is deprecated and will be removed in the next major release. "
"Please only provide the `dispatch_by` argument. "
)

if concat_partitions_on_primary_index:
warnings.warn(
"The keyword `concat_partitions_on_primary_index` is deprecated and will be removed in the next major release. Use `dispatch_by=dataset_factory.partition_keys` to achieve the same behavior instead.",
"The keyword `concat_partitions_on_primary_index` is deprecated and will be removed in the next major "
"release. Use `dispatch_by=dataset_factory.partition_keys` to achieve the same behavior instead.",
DeprecationWarning,
)
dispatch_by = dataset_factory.partition_keys
Expand All @@ -81,8 +88,10 @@ def dispatch_metapartitions_from_factory(
set(dataset_factory.index_columns)
):
raise RuntimeError(
f"Dispatch columns must be indexed.\nRequested index: {dispatch_by} but available index columns: {sorted(dataset_factory.index_columns)}"
f"Dispatch columns must be indexed.\nRequested index: {dispatch_by} "
f"but available index columns: {sorted(dataset_factory.index_columns)}"
)

check_predicates(predicates)

# Determine which indices need to be loaded.
Expand All @@ -102,7 +111,7 @@ def dispatch_metapartitions_from_factory(
list(index_cols), predicates=predicates
)

if label_filter:
if label_filter is not None:
base_df = base_df[base_df.index.map(label_filter)]

indices_to_dispatch = {
Expand Down
22 changes: 22 additions & 0 deletions tests/io/dask/bag/test_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,25 @@ def test_read_dataset_as_dataframes_partition_size(store_factory, metadata_versi
dataset_uuid="partitioned_uuid", store=store_factory, partition_size=2
)
assert bag.npartitions == 2


@pytest.mark.parametrize(
    "option,value",
    [
        ("concat_partitions_on_primary_index", True),
        ("label_filter", lambda x: True),
        ("load_dataset_metadata", True),
    ],
)
def test_read_dataset_as_metapartitions_bag_deprecated_arguments(store_factory, option, value):
    """Passing a deprecated keyword emits a DeprecationWarning naming that keyword."""
    # A minimal dataset is enough; we only care about the warning, not the data.
    store_dataframes_as_dataset(
        store=store_factory,
        dataset_uuid="partitioned_uuid",
        dfs=[pd.DataFrame({"A": [1, 1], "B": [10, 10]})],
    )

    kwargs = {option: value}

    # `match` checks the warning message mentions the deprecated keyword.
    with pytest.warns(DeprecationWarning, match=option):
        read_dataset_as_metapartitions_bag(
            dataset_uuid="partitioned_uuid", store=store_factory, **kwargs
        )
25 changes: 24 additions & 1 deletion tests/io_components/test_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
import pandas as pd
import pytest

from kartothek.core.factory import _ensure_factory
from kartothek.io.eager import store_dataframes_as_dataset
from kartothek.io_components.metapartition import SINGLE_TABLE, MetaPartition
from kartothek.io_components.read import dispatch_metapartitions
from kartothek.io_components.read import dispatch_metapartitions, dispatch_metapartitions_from_factory


def test_dispatch_metapartitions(dataset, store_session):
Expand Down Expand Up @@ -279,3 +280,25 @@ def test_dispatch_metapartitions_complex_or_predicates(store_factory):
}
)
pd.testing.assert_frame_equal(actual, expected)


def test_dispatch_metapartitions_from_factory_deprecated_arguments(store):
    """Using the deprecated `label_filter` keyword emits a DeprecationWarning."""
    # Build a small partitioned dataset to dispatch over.
    frames = [
        pd.DataFrame({"p": [0], "x": [0]}),
        pd.DataFrame({"p": [0], "x": [1]}),
    ]
    store_dataframes_as_dataset(
        dfs=frames,
        dataset_uuid="test",
        store=store,
        partition_on=["p"],
    )

    factory = _ensure_factory(
        dataset_uuid="test",
        store=store,
        factory=None,
        load_dataset_metadata=False,
    )

    # The warning fires when the generator is advanced, so pull one item
    # inside the pytest.warns context.
    with pytest.warns(DeprecationWarning):
        mp_iter = dispatch_metapartitions_from_factory(
            dataset_factory=factory,
            label_filter=lambda x: True,
        )
        next(mp_iter)