Testing code moved from kartothek.io.testing to tests.* #456

Open · wants to merge 70 commits into base: master

Commits (70)
85c1761
LML-98: initial version, supports renaming of datasets
martin-haffner-by Mar 8, 2021
70e9b21
LML-98: Copying and renaming of datasets
martin-haffner-by Mar 16, 2021
8e9c72b
LML-98: bugfix and test
martin-haffner-by Mar 16, 2021
6d966b4
LML-98: modifications
martin-haffner-by Mar 17, 2021
1739fdd
LML-98: modifications
martin-haffner-by Mar 17, 2021
42c5fda
LML-98: adapted to recent changes in master
martin-haffner-by Mar 17, 2021
2ca2a44
LML-98: incresed test cov
martin-haffner-by Mar 18, 2021
d202766
LML-98: implemented copying and renaming for cubes based on dataset c…
martin-haffner-by Mar 22, 2021
e9eeee1
LML-98: fixed types in docstrings
martin-haffner-by Mar 22, 2021
5e72a37
LML-98: refactoring after review
martin-haffner-by Mar 24, 2021
6bef567
LML-98: fixed docstring type issues
martin-haffner-by Mar 24, 2021
7de197c
LML-98: fixed docstring type issues
martin-haffner-by Mar 24, 2021
cc178b3
LDE-714: first attempt
martin-haffner-by Mar 25, 2021
302c1ad
LDE-714: added further read tests
martin-haffner-by Mar 25, 2021
6130677
LDE-714: moved read tests
martin-haffner-by Mar 26, 2021
bd2a0a0
LDE-714: removed reading tests from kartothek.io.testing package
martin-haffner-by Mar 26, 2021
916092a
LDE-174: moved write tests
martin-haffner-by Mar 26, 2021
670b874
LDE-714: moved update tests
martin-haffner-by Mar 26, 2021
12d0d6e
LDE-714: moved delete tests
martin-haffner-by Apr 12, 2021
35b5f86
LDE-714: removed remaining empty update test file
martin-haffner-by Apr 12, 2021
eac6f6a
LDE-714: moved cube copy functions
martin-haffner-by Apr 12, 2021
078a12e
LDE-714: moved tests for append_cube
martin-haffner-by Apr 12, 2021
ffbc75d
LDE-714: moved tests for build_cube
martin-haffner-by Apr 13, 2021
05d14a2
LDE-714: moved tests for cleanup_cube
martin-haffner-by Apr 13, 2021
5e35945
LDE-714: moved tests for delete_cube
martin-haffner-by Apr 13, 2021
2e1ea4e
LDE-714: moved tests for extend_cube
martin-haffner-by Apr 13, 2021
b23e13d
LDE-714: moved tests for stats_cube
martin-haffner-by Apr 13, 2021
3942bb7
LDE-714: moved tests for update_cube
martin-haffner-by Apr 13, 2021
1a1f876
LDE-714: moved tests for index
martin-haffner-by Apr 14, 2021
60ccdf7
LDE-714: moved tests for gc
martin-haffner-by Apr 14, 2021
7f78f90
LDE-714: removed package kartothek.io.testing
martin-haffner-by Apr 14, 2021
679e0cf
LML-98: initial version, supports renaming of datasets
martin-haffner-by Mar 8, 2021
032cabf
LML-98: Copying and renaming of datasets
martin-haffner-by Mar 16, 2021
333034e
LML-98: bugfix and test
martin-haffner-by Mar 16, 2021
eac89e0
LML-98: modifications
martin-haffner-by Mar 17, 2021
da0fb52
LML-98: modifications
martin-haffner-by Mar 17, 2021
d7c64b5
LML-98: adapted to recent changes in master
martin-haffner-by Mar 17, 2021
42caca6
LML-98: incresed test cov
martin-haffner-by Mar 18, 2021
6f4310c
LML-98: implemented copying and renaming for cubes based on dataset c…
martin-haffner-by Mar 22, 2021
a4551b3
LML-98: fixed types in docstrings
martin-haffner-by Mar 22, 2021
ffdca8b
LML-98: refactoring after review
martin-haffner-by Mar 24, 2021
f1ca501
LML-98: fixed docstring type issues
martin-haffner-by Mar 24, 2021
658f9a0
LML-98: fixed docstring type issues
martin-haffner-by Mar 24, 2021
64f4f3f
LML-98: added changelog
martin-haffner-by Mar 26, 2021
4b82178
Fix docs
NeroCorleone Apr 7, 2021
28e4f57
Adress review comments
NeroCorleone Apr 12, 2021
0f8ae9e
Restructure copy cube
NeroCorleone Apr 15, 2021
668a334
Restructure and add tests
NeroCorleone Apr 15, 2021
b249554
LDE-714: first attempt
martin-haffner-by Mar 25, 2021
beb007e
LDE-714: added further read tests
martin-haffner-by Mar 25, 2021
c141a30
LDE-714: moved read tests
martin-haffner-by Mar 26, 2021
f4fff12
LDE-714: removed reading tests from kartothek.io.testing package
martin-haffner-by Mar 26, 2021
65f2253
LDE-174: moved write tests
martin-haffner-by Mar 26, 2021
7f6b55c
LDE-714: moved update tests
martin-haffner-by Mar 26, 2021
a3f77eb
LDE-714: moved delete tests
martin-haffner-by Apr 12, 2021
8534ac5
LDE-714: removed remaining empty update test file
martin-haffner-by Apr 12, 2021
1a71140
LDE-714: moved cube copy functions
martin-haffner-by Apr 12, 2021
454ca31
LDE-714: moved tests for append_cube
martin-haffner-by Apr 12, 2021
43593a7
LDE-714: moved tests for build_cube
martin-haffner-by Apr 13, 2021
a2cd93d
LDE-714: moved tests for cleanup_cube
martin-haffner-by Apr 13, 2021
0dcc8a1
LDE-714: moved tests for delete_cube
martin-haffner-by Apr 13, 2021
f4f410f
LDE-714: moved tests for extend_cube
martin-haffner-by Apr 13, 2021
edd572d
LDE-714: moved tests for stats_cube
martin-haffner-by Apr 13, 2021
d3f8541
LDE-714: moved tests for update_cube
martin-haffner-by Apr 13, 2021
7b30d93
LDE-714: moved tests for index
martin-haffner-by Apr 14, 2021
a837214
LDE-714: moved tests for gc
martin-haffner-by Apr 14, 2021
4eaf479
LDE-714: removed package kartothek.io.testing
martin-haffner-by Apr 14, 2021
8fdb036
LDE-714: rebased and merged
martin-haffner-by Apr 15, 2021
113e833
LDE-714: fixed tests for update
martin-haffner-by Apr 15, 2021
b3da54a
LDE-714: fixed doc issues, fixed unsued import
martin-haffner-by Apr 16, 2021
6 changes: 4 additions & 2 deletions CHANGES.rst
@@ -7,14 +7,16 @@ Kartothek 4.0.2 (2021-04-xx)
============================

* Fix a bug in ``MetaPartition._reconstruct_index_columns`` that would raise an ``IndexError`` when loading few columns of a dataset with many primary indices.
* Add :meth:`~kartothek.io.eager.copy_dataset` to copy and optionally rename datasets within one store or between stores (eager only)
* Add renaming option to :meth:`~kartothek.io.eager_cube.copy_cube`
* Moved testing code from `~kartothek.io.testing` to `tests`


Kartothek 4.0.1 (2021-04-13)
============================

* Fixed dataset corruption after updates when table names other than "table" are used (#445).


Kartothek 4.0.0 (2021-03-17)
============================

@@ -476,7 +478,7 @@ Version 3.1.0 (2019-07-10)
- Ensure binary column names are read as type ``str``:

- Ensure dataframe columns are of type ``str`` in :func:`~kartothek.core.common_metadata.empty_dataframe_from_schema`
- Testing: create :func:`~kartothek.io.testing.read.test_binary_column_metadata` which checks column names stored as
- Testing: create `~kartothek.io.testing.read.test_binary_column_metadata` which checks column names stored as
``bytes`` objects are read as type ``str``

- fix issue where it was possible to add an index to an existing dataset by using update functions and partition indices
41 changes: 41 additions & 0 deletions kartothek/core/dataset.py
@@ -907,6 +907,47 @@ def from_dataset(dataset):
ds_builder.partitions = dataset.partitions
return ds_builder

def modify_uuid(self, target_uuid: str):
"""
Modify the dataset UUID and dependent metadata:
- paths to partition files
- paths to index files

Parameters
----------
target_uuid: str
Modified dataset UUID.

Returns
-------
DatasetMetadataBuilder
modified builder object
"""

# modify file names in partition metadata
modified_partitions = {}
for p_key, p in self.partitions.items():
pdict = p.to_dict()
for table_key, table_file in pdict["files"].items():
if table_file.startswith(f"{self.uuid}/"):
pdict["files"][table_key] = table_file.replace(
self.uuid, target_uuid, 1
)
modified_partitions[p_key] = Partition.from_dict(p_key, pdict)

self.partitions = modified_partitions

for i_key, i in self.indices.items():
if (
isinstance(i, ExplicitSecondaryIndex)
and i.index_storage_key is not None
):
i.index_storage_key = i.index_storage_key.replace(
self.uuid, target_uuid, 1
)
self.uuid = target_uuid
return self

def add_partition(self, name, partition):
"""
Add an (embedded) Partition.
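The new `DatasetMetadataBuilder.modify_uuid` method rewrites the dataset UUID together with all partition file paths and secondary-index storage keys that start with the old UUID. A minimal usage sketch follows; the store URL, dataset names, and surrounding setup are illustrative assumptions, not part of this PR, and the dataset "my_dataset" is assumed to already exist in the store.

from storefact import get_store_from_url

from kartothek.core.dataset import DatasetMetadata, DatasetMetadataBuilder

# Illustrative in-memory store; any simplekv-compatible store would do.
store = get_store_from_url("hmemory://")

# "my_dataset" is assumed to have been written to `store` beforehand.
dm = DatasetMetadata.load_from_store(uuid="my_dataset", store=store)

builder = DatasetMetadataBuilder.from_dataset(dm)
builder = builder.modify_uuid("my_dataset_v2")

# to_json() returns a (storage key, JSON payload) tuple; the key now carries
# the new UUID, e.g. "my_dataset_v2.by-dataset-metadata.json".
key, payload = builder.to_json()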
64 changes: 64 additions & 0 deletions kartothek/io/eager.py
@@ -15,6 +15,8 @@
from kartothek.core.naming import (
DEFAULT_METADATA_STORAGE_FORMAT,
DEFAULT_METADATA_VERSION,
METADATA_BASE_SUFFIX,
METADATA_FORMAT_JSON,
PARQUET_FILE_SUFFIX,
get_partition_file_prefix,
)
@@ -44,6 +46,8 @@
)
from kartothek.io_components.write import raise_if_dataset_exists
from kartothek.serialization import DataFrameSerializer
from kartothek.utils.ktk_adapters import get_dataset_keys
from kartothek.utils.store import copy_rename_keys

__all__ = (
"delete_dataset",
@@ -56,6 +60,7 @@
"update_dataset_from_dataframes",
"build_dataset_indices",
"garbage_collect_dataset",
"copy_dataset",
)


@@ -738,3 +743,62 @@ def garbage_collect_dataset(dataset_uuid=None, store=None, factory=None):
# Given that `nested_files` is a generator with a single element, just
# return the output of `delete_files` on that element.
return delete_files(next(nested_files), store_factory=ds_factory.store_factory)


def copy_dataset(
source_dataset_uuid: str,
store: KeyValueStore,
target_dataset_uuid: Optional[str] = None,
target_store: Optional[KeyValueStore] = None,
):
"""
Copies and optionally renames a dataset, either from one store to another or
within one store.

Parameters
----------
source_dataset_uuid:
UUID of source dataset
store:
Source store
target_dataset_uuid:
UUID of the target dataset. May be the same as source_dataset_uuid if store
and target_store differ. If not given, source_dataset_uuid is used.
target_store:
Target store. May be the same as store if source_dataset_uuid and
target_dataset_uuid differ. If not given, the value of the store parameter is used.
"""
if target_dataset_uuid is None:
target_dataset_uuid = source_dataset_uuid
if target_store is None:
target_store = store

if (source_dataset_uuid == target_dataset_uuid) & (store == target_store):
raise ValueError(
"Cannot copy to a dataset with the same UUID within the same store!"
)

ds_factory_source = _ensure_factory(
dataset_uuid=source_dataset_uuid, store=store, factory=None,
)

# Create a dict of {source key: target key} entries
keys = get_dataset_keys(ds_factory_source.dataset_metadata)
mapped_keys = {
source_key: source_key.replace(source_dataset_uuid, target_dataset_uuid)
for source_key in keys
}

# Create a dict of metadata which has to be changed. This is only the
# <uuid>.by-dataset-metadata.json file
md_transformed = {
f"{source_dataset_uuid}{METADATA_BASE_SUFFIX}{METADATA_FORMAT_JSON}": DatasetMetadataBuilder.from_dataset(
ds_factory_source.dataset_metadata
)
.modify_uuid(target_dataset_uuid)
.to_json()[1]
}

# Copy the keys from one store to another
copy_rename_keys(mapped_keys, store, target_store, md_transformed)
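
A hedged usage sketch of the new eager `copy_dataset` function follows; the store URLs and dataset UUIDs ("source_ds", "source_ds_backup") are illustrative assumptions, and the source dataset is assumed to already exist in the source store.

from storefact import get_store_from_url

from kartothek.io.eager import copy_dataset

src_store = get_store_from_url("hmemory://")
tgt_store = get_store_from_url("hmemory://")

# Copy between two stores, keeping the UUID unchanged.
copy_dataset(
    source_dataset_uuid="source_ds",
    store=src_store,
    target_store=tgt_store,
)

# Copy within a single store; the UUID must change, otherwise a ValueError is raised.
copy_dataset(
    source_dataset_uuid="source_ds",
    store=src_store,
    target_dataset_uuid="source_ds_backup",
)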
129 changes: 117 additions & 12 deletions kartothek/io/eager_cube.py
@@ -1,8 +1,9 @@
"""
Eager IO aka "everything is done locally and immediately".
"""
import logging
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Optional, Sequence, Union
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Union

import pandas as pd
import simplekv
@@ -15,18 +16,21 @@
KTK_CUBE_DF_SERIALIZER,
KTK_CUBE_METADATA_STORAGE_FORMAT,
KTK_CUBE_METADATA_VERSION,
KTK_CUBE_UUID_SEPARATOR,
)
from kartothek.core.cube.cube import Cube
from kartothek.core.dataset import DatasetMetadata
from kartothek.core.typing import StoreFactory
from kartothek.io.eager import (
copy_dataset,
delete_dataset,
store_dataframes_as_dataset,
update_dataset_from_dataframes,
)
from kartothek.io_components.cube.append import check_existing_datasets
from kartothek.io_components.cube.cleanup import get_keys_to_clean
from kartothek.io_components.cube.common import assert_stores_different
from kartothek.io_components.cube.copy import get_copy_keys
from kartothek.io_components.cube.copy import get_copy_keys, get_datasets_to_copy
from kartothek.io_components.cube.query import load_group, plan_query, quick_concat
from kartothek.io_components.cube.remove import (
prepare_metapartitions_for_removal_action,
@@ -53,6 +57,8 @@
from kartothek.utils.pandas import concat_dataframes
from kartothek.utils.store import copy_keys

logger = logging.getLogger()

__all__ = (
"append_to_cube",
"build_cube",
@@ -419,10 +425,57 @@ def delete_cube(cube, store, datasets=None):
store.delete(k)


def copy_cube(cube, src_store, tgt_store, overwrite=False, datasets=None):
def _transform_uuid(
src_uuid: str,
cube_prefix: str,
renamed_cube_prefix: Optional[str],
renamed_datasets: Optional[Dict[str, str]],
):
"""
Transform a uuid from <old cube prefix>++<old dataset> to
<new cube prefix>++<new dataset>
:param src_uuid:
Uuid to transform
:param cube_prefix:
Cube prefix before renaming
:param renamed_cube_prefix:
Optional new cube prefix
:param renamed_datasets:
Optional dict of {old dataset name: new dataset name} entries to rename datasets
"""
tgt_uuid = src_uuid
if renamed_cube_prefix:
tgt_uuid = src_uuid.replace(
f"{cube_prefix}{KTK_CUBE_UUID_SEPARATOR}",
f"{renamed_cube_prefix}{KTK_CUBE_UUID_SEPARATOR}",
)

if renamed_datasets:
for ds_old, ds_new in renamed_datasets.items():
if f"{KTK_CUBE_UUID_SEPARATOR}{ds_old}" in tgt_uuid:
tgt_uuid = tgt_uuid.replace(
f"{KTK_CUBE_UUID_SEPARATOR}{ds_old}",
f"{KTK_CUBE_UUID_SEPARATOR}{ds_new}",
)
return tgt_uuid


def copy_cube(
cube: Cube,
src_store: Union[KeyValueStore, Callable[[], KeyValueStore]],
tgt_store: Union[KeyValueStore, Callable[[], KeyValueStore]],
overwrite: bool = False,
datasets: Union[None, Iterable[str], Dict[str, DatasetMetadata]] = None,
renamed_cube_prefix: Optional[str] = None,
renamed_datasets: Optional[Dict[str, str]] = None,
):
"""
Copy cube from one store to another.

.. warning::
A failing copy operation cannot be rolled back if the `overwrite` flag is enabled
and might leave the overwritten dataset in an inconsistent state.

Parameters
----------
cube: Cube
@@ -433,9 +486,16 @@ def copy_cube(cube, src_store, tgt_store, overwrite=False, datasets=None):
Target KV store.
overwrite: bool
If possibly existing datasets in the target store should be overwritten.
datasets: Union[None, Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]
datasets: Union[None, Iterable[str], Dict[str, DatasetMetadata]]
Datasets to copy, must all be part of the cube. May be either the result of :func:`~kartothek.api.discover.discover_datasets`, a list
of Ktk_cube dataset IDs or ``None`` (in which case the entire cube will be copied).
renamed_cube_prefix: Optional[str]
Optional new cube prefix. If specified, the cube will be renamed while copying.
renamed_datasets: Optional[Dict[str, str]]
Optional dict with {old dataset name: new dataset name} entries. If provided,
the datasets will be renamed accordingly during copying. When the parameter
datasets is specified, the datasets to rename must be a subset of the datasets
to copy.
"""
if callable(src_store):
src_store = src_store()
@@ -445,14 +505,59 @@
src_store, tgt_store, cube.ktk_dataset_uuid(cube.seed_dataset)
)

keys = get_copy_keys(
cube=cube,
src_store=src_store,
tgt_store=tgt_store,
overwrite=overwrite,
datasets=datasets,
)
copy_keys(keys, src_store, tgt_store)
if (renamed_cube_prefix is None) and (renamed_datasets is None):
# If we don't rename the datasets, we can perform an optimized copy operation.
keys = get_copy_keys(
cube=cube,
src_store=src_store,
tgt_store=tgt_store,
overwrite=overwrite,
datasets=datasets,
)
copy_keys(keys, src_store, tgt_store)
else:
datasets_to_copy = get_datasets_to_copy(
cube=cube,
src_store=src_store,
tgt_store=tgt_store,
overwrite=overwrite,
datasets=datasets,
)
copied = [] # type: List[str]
for src_ds_name, src_ds_meta in datasets_to_copy.items():
tgt_ds_uuid = _transform_uuid(
src_uuid=src_ds_meta.uuid,
cube_prefix=cube.uuid_prefix,
renamed_cube_prefix=renamed_cube_prefix,
renamed_datasets=renamed_datasets,
)
try:
copy_dataset(
source_dataset_uuid=src_ds_meta.uuid,
store=src_store,
target_dataset_uuid=tgt_ds_uuid,
target_store=tgt_store,
)
except Exception as e:
if overwrite:
# We can't roll back safely if the target dataset has been partially overwritten.
raise RuntimeError(e)
else:
logger.exception(
f"Copying datasets from source dataset {src_ds_meta.uuid} to target {tgt_ds_uuid} failed."
)

# Roll back the unsuccessful copy operation by deleting all datasets that have been copied so far from the target store.
for ds_uuid in copied:
delete_dataset(dataset_uuid=ds_uuid, store=tgt_store)

logger.info(
"Deleted the copied datasets {} from target store.".format(
", ".join(copied)
)
)
else:
copied.append(tgt_ds_uuid)


def collect_stats(cube, store, datasets=None):
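
To illustrate the extended `copy_cube` signature and the UUID mapping performed by `_transform_uuid`, here is a hedged sketch; the cube definition, store URLs, and dataset names are assumptions for illustration, and the cube is assumed to have been built in `src_store` beforehand.

from storefact import get_store_from_url

from kartothek.core.cube.cube import Cube
from kartothek.io.eager_cube import copy_cube

src_store = get_store_from_url("hmemory://")
tgt_store = get_store_from_url("hmemory://")

cube = Cube(
    dimension_columns=["x"],
    partition_columns=["p"],
    uuid_prefix="my_cube",
)

# Without renaming, copy_cube falls back to the optimized copy_keys path.
copy_cube(cube=cube, src_store=src_store, tgt_store=tgt_store)

# With renaming, each dataset is copied via copy_dataset and its UUID is
# transformed, e.g. "my_cube++seed"   -> "my_cube_v2++seed"
#                   "my_cube++enrich" -> "my_cube_v2++extra".
copy_cube(
    cube=cube,
    src_store=src_store,
    tgt_store=tgt_store,
    renamed_cube_prefix="my_cube_v2",
    renamed_datasets={"enrich": "extra"},
)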