From 50f94700a0579c54c47db2e28f575e246d5e8ae8 Mon Sep 17 00:00:00 2001
From: Ilia Zaitcev
Date: Wed, 4 Aug 2021 14:43:22 +0200
Subject: [PATCH] Deprecation warnings added to I/O pipelines

---
 kartothek/io/dask/bag.py         | 19 +++++++++++++++++++
 kartothek/io_components/read.py  | 17 +++++++++++++----
 tests/io/dask/bag/test_read.py   | 22 ++++++++++++++++++++++
 tests/io_components/test_read.py | 25 ++++++++++++++++++++++++-
 4 files changed, 78 insertions(+), 5 deletions(-)

diff --git a/kartothek/io/dask/bag.py b/kartothek/io/dask/bag.py
index a76d218f..68611316 100644
--- a/kartothek/io/dask/bag.py
+++ b/kartothek/io/dask/bag.py
@@ -79,6 +79,25 @@ def read_dataset_as_metapartitions_bag(
         dask.bag.Bag:
             A dask.bag object containing the metapartions.
     """
+    if label_filter is not None:
+        warnings.warn(
+            "The keyword `label_filter` is deprecated and will be removed in the next major release.",
+            DeprecationWarning
+        )
+
+    if load_dataset_metadata is not False:
+        warnings.warn(
+            "The keyword `load_dataset_metadata` is deprecated and will be removed in the next major release.",
+            DeprecationWarning
+        )
+
+    if concat_partitions_on_primary_index is not False:
+        warnings.warn(
+            "The keyword `concat_partitions_on_primary_index` is deprecated and will be removed in the next major "
+            "release.",
+            DeprecationWarning
+        )
+
     ds_factory = _ensure_factory(
         dataset_uuid=dataset_uuid,
         store=store,
diff --git a/kartothek/io_components/read.py b/kartothek/io_components/read.py
index b4b36c81..b72c1847 100644
--- a/kartothek/io_components/read.py
+++ b/kartothek/io_components/read.py
@@ -55,8 +55,13 @@ def dispatch_metapartitions_from_factory(

     :meta private:
     """
-    if dispatch_metadata:
+    if label_filter is not None:
+        warnings.warn(
+            "The keyword `label_filter` is deprecated and will be removed in the next major release.",
+            DeprecationWarning
+        )

+    if dispatch_metadata:
         warnings.warn(
             "The dispatch of metadata and index information as part of the MetaPartition instance is deprecated. "
             "The future behaviour will be that this metadata is not dispatched. To set the future behaviour, "
@@ -70,9 +75,11 @@ def dispatch_metapartitions_from_factory(
             "`concat_partitions_on_primary_index` is deprecated and will be removed in the next major release. "
             "Please only provide the `dispatch_by` argument. "
         )
+
     if concat_partitions_on_primary_index:
         warnings.warn(
-            "The keyword `concat_partitions_on_primary_index` is deprecated and will be removed in the next major release. Use `dispatch_by=dataset_factory.partition_keys` to achieve the same behavior instead.",
+            "The keyword `concat_partitions_on_primary_index` is deprecated and will be removed in the next major "
+            "release. Use `dispatch_by=dataset_factory.partition_keys` to achieve the same behavior instead.",
             DeprecationWarning,
         )
         dispatch_by = dataset_factory.partition_keys
@@ -81,8 +88,10 @@ def dispatch_metapartitions_from_factory(
             set(dataset_factory.index_columns)
         ):
             raise RuntimeError(
-                f"Dispatch columns must be indexed.\nRequested index: {dispatch_by} but available index columns: {sorted(dataset_factory.index_columns)}"
+                f"Dispatch columns must be indexed.\nRequested index: {dispatch_by} "
+                f"but available index columns: {sorted(dataset_factory.index_columns)}"
             )
+
     check_predicates(predicates)

     # Determine which indices need to be loaded.
@@ -102,7 +111,7 @@ def dispatch_metapartitions_from_factory(
         list(index_cols), predicates=predicates
     )

-    if label_filter:
+    if label_filter is not None:
         base_df = base_df[base_df.index.map(label_filter)]

     indices_to_dispatch = {
diff --git a/tests/io/dask/bag/test_read.py b/tests/io/dask/bag/test_read.py
index 23d9af07..0dafcbed 100644
--- a/tests/io/dask/bag/test_read.py
+++ b/tests/io/dask/bag/test_read.py
@@ -65,3 +65,25 @@ def test_read_dataset_as_dataframes_partition_size(store_factory, metadata_versi
         dataset_uuid="partitioned_uuid", store=store_factory, partition_size=2
     )
     assert bag.npartitions == 2
+
+
+@pytest.mark.parametrize("option,value", [
+    ("concat_partitions_on_primary_index", True),
+    ("label_filter", lambda x: True),
+    ("load_dataset_metadata", True),
+])
+def test_read_dataset_as_metapartitions_bag_deprecated_arguments(store_factory, option, value):
+    store_dataframes_as_dataset(
+        store=store_factory,
+        dataset_uuid="partitioned_uuid",
+        dfs=[pd.DataFrame({"A": [1, 1], "B": [10, 10]})]
+    )
+
+    deprecated = {option: value}
+
+    with pytest.warns(DeprecationWarning, match=option):
+        read_dataset_as_metapartitions_bag(
+            dataset_uuid="partitioned_uuid",
+            store=store_factory,
+            **deprecated
+        )
diff --git a/tests/io_components/test_read.py b/tests/io_components/test_read.py
index ed967052..7d7c6542 100644
--- a/tests/io_components/test_read.py
+++ b/tests/io_components/test_read.py
@@ -6,9 +6,10 @@
 import pandas as pd
 import pytest

+from kartothek.core.factory import _ensure_factory
 from kartothek.io.eager import store_dataframes_as_dataset
 from kartothek.io_components.metapartition import SINGLE_TABLE, MetaPartition
-from kartothek.io_components.read import dispatch_metapartitions
+from kartothek.io_components.read import dispatch_metapartitions, dispatch_metapartitions_from_factory


 def test_dispatch_metapartitions(dataset, store_session):
@@ -279,3 +280,25 @@ def test_dispatch_metapartitions_complex_or_predicates(store_factory):
         }
     )
     pd.testing.assert_frame_equal(actual, expected)
+
+
+def test_dispatch_metapartitions_from_factory_deprecated_arguments(store):
+    store_dataframes_as_dataset(
+        dfs=[pd.DataFrame({"p": [0], "x": [0]}), pd.DataFrame({"p": [0], "x": [1]})],
+        dataset_uuid="test",
+        store=store,
+        partition_on=["p"],
+    )
+
+    ds_factory = _ensure_factory(
+        dataset_uuid="test",
+        store=store,
+        factory=None,
+        load_dataset_metadata=False,
+    )
+
+    with pytest.warns(DeprecationWarning):
+        next(dispatch_metapartitions_from_factory(
+            dataset_factory=ds_factory,
+            label_filter=lambda x: True,
+        ))