Add merge_many_datasets_as_delayed #243

Open

xhochy wants to merge 4 commits into master

Conversation

xhochy (Contributor) commented Mar 11, 2020

This implements a merge that works on multiple datasets. For the moment, I have kept the code separate from the existing merge. Partition alignment depends on `match_how`, since in some cases we can speed it up greatly by using e.g. the indices DataFrames.

Fixes #235

Missing:

  • predicate support
  • columns support
  • exact matching
  • merge with existing code

codecov bot commented Mar 11, 2020

Codecov Report

Merging #243 into master will decrease coverage by 0.08%.
The diff coverage is 86.02%.


@@            Coverage Diff             @@
##           master     #243      +/-   ##
==========================================
- Coverage   89.73%   89.64%   -0.09%     
==========================================
  Files          39       39              
  Lines        3720     3795      +75     
  Branches      901      927      +26     
==========================================
+ Hits         3338     3402      +64     
- Misses        224      230       +6     
- Partials      158      163       +5
Impacted Files Coverage Δ
kartothek/core/factory.py 85.29% <100%> (+4.73%) ⬆️
kartothek/io/dask/delayed.py 100% <100%> (ø) ⬆️
kartothek/io_components/read.py 89.28% <71.42%> (-3.45%) ⬇️
kartothek/io_components/metapartition.py 90.86% <73.68%> (-0.52%) ⬇️
kartothek/io_components/merge.py 92.94% <87.23%> (-7.06%) ⬇️
kartothek/core/common_metadata.py 94.73% <0%> (-0.66%) ⬇️
kartothek/io/eager.py 76.96% <0%> (-0.13%) ⬇️
kartothek/core/dataset.py 87.78% <0%> (-0.04%) ⬇️
kartothek/io_components/utils.py 84.45% <0%> (ø) ⬆️
... and 1 more


Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 65948dc...3bfa595.

from kartothek.io_components.utils import _instantiate_store, _make_callable

if TYPE_CHECKING:
    from simplekv import KeyValueStore
Collaborator commented:
Suggested change:
-    from simplekv import KeyValueStore
+    from simplekv import KeyValueStore  # noqa: F401

linting fails: kartothek/io_components/merge.py:14:5: F401 'simplekv.KeyValueStore' imported but unused

Parameters
----------
dataset_uuids : List[str]
match_how : Union[str, Callable]
    Define the partition label matching scheme.
Contributor commented:
Why is the whole thing label-based and not index-based?

    Available implementations are:
Contributor commented:
Related to the question above: do we really need different string-based join modes?

    * first : The partitions of the first dataset are considered to be the base
      partitions and **all** partitions of the remaining datasets are
      joined to the partitions of the first dataset. This should only be
      used if all but the first dataset contain very few partitions.
Contributor commented:
What does "few" mean? What happens if this is not the case? Please give the user more guidance and try to provide a more failure-proof API.

    explicit instructions for a specific merge.
    Each dict should contain key/values:

    * 'output_label' : The table for the merged dataframe
Contributor commented:
What about the `tables` key from the example below?

    * 'merge_func' : A callable with signature
      `merge_func(dfs, merge_kwargs)` to
      handle the data preprocessing and merging.
    * 'merge_kwargs' : The kwargs to be passed to the `merge_func`
Contributor commented:
Not required; use a partial instead.

    If False (default), the partition labels of the dataset with fewer
    partitions are interpreted as prefixes.
merge_tasks : List[Dict]
    A list of merge tasks. Each item in this list is a dictionary giving
Contributor commented:
Does a merge task drop/consume its input tables? If not, I think this might be a memory issue.

Successfully merging this pull request may close these issues.

Let merge_datasets_as_delayed merge >2 datasets and filter by predicates