Let merge_datasets_as_delayed merge >2 datasets and filter by predicates #235

Open
mlondschien opened this issue Mar 3, 2020 · 6 comments · May be fixed by #243

Comments

@mlondschien
Contributor

It would be nice to be able to supply kartothek.io.dask.delayed.merge_datasets_as_delayed with a list of dataset_uuids to merge an arbitrary number of datasets.

This could be implemented by

  • defining a new kartothek.io.dask.delayed.concat_datasets_as_delayed or similar which takes a list of dataset_uuids as an argument, or
  • allowing kartothek.io.dask.delayed.merge_datasets_as_delayed to be supplied with a list for left

to not break existing usages of the function.

The match_how="left" would need to be replaced by match_how="first". I am not sure how to translate match_how="prefix". What is a typical use case here? Additionally, it would be nice to supply a merge_func via merge_tasks that takes more than two dataframes as input. This would require a similar change as above: either defining a MetaPartition.concat_dataframes or similar method, or allowing MetaPartition.merge_dataframes to be supplied with a list for left.
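
To make the proposal concrete, here is a purely illustrative sketch of what such a signature could look like. The name concat_datasets_as_delayed, the merge_func type and the match_how default are assumptions taken from this issue, not existing kartothek API:

```python
# Hypothetical sketch only, not existing kartothek API.
from typing import Callable, List, Optional

import pandas as pd


def concat_datasets_as_delayed(
    dataset_uuids: List[str],  # arbitrary number of datasets to align
    store,                     # store factory, as in the existing functions
    merge_func: Optional[Callable[[List[pd.DataFrame]], pd.DataFrame]] = None,
    match_how: str = "first",  # would replace match_how="left"
):
    """Align the partitions of all datasets and apply ``merge_func`` to the
    list of DataFrames of each aligned partition group."""
    ...
```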

Questions concerning the current implementation of merge_datasets_as_delayed:

  • For merging with match_how="exact", the keys of the partitions are compared to match partitions. However, since the keys include the name of the stored parquet files, they will never match. What is the idea here?
  • It is annoying to have to supply the names of the tables in merge_tasks. Why not supply the merge_func with all available dataframes if no labels (currently left and right) are supplied?

Additionally, it would be nice to be able to supply merge_datasets_as_delayed with a predicates argument that is used to filter partitions before merging and then applied via filter_df_from_predicates after merging.
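
For illustration, the post-merge filtering step could look roughly like this. filter_df_from_predicates already exists in kartothek.serialization; the column names, predicate values and merged_df here are made up:

```python
import pandas as pd

from kartothek.serialization import filter_df_from_predicates

# kartothek predicates are in disjunctive normal form: an outer list of
# OR-ed clauses, each clause a list of AND-ed (column, op, value) tuples.
predicates = [[("country", "==", "DE"), ("value", ">", 0)]]

# Hypothetical result of merging the aligned partitions of the datasets.
merged_df = pd.DataFrame({"country": ["DE", "DE", "US"], "value": [1, -1, 5]})

filtered = filter_df_from_predicates(merged_df, predicates)  # keeps only the first row
```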

@fjetter
Collaborator

fjetter commented Mar 10, 2020

Firstly, thanks for the interest and sorry for the delayed response.

Secondly, the entire currently existing alignment logic is about as old as the library itself and was never really refactored. I'm open to breaking some eggs here (we have a few other things which should be addressed in terms of UX, so a friendly, breaking release would be appropriate in the near future anyhow).

The keys of the partitions are compared to match partitions. However, since the keys include the name of the stored parquet files, they will never match. What is the idea here?

As I said, this implementation dates back a long time. In the very first iterations we used hard-coded file names (e.g. partition_0, partition_1) which were provided by the user application. In that context this makes a lot of sense and obviously allows for the easiest alignment.
I consider providing these labels bad practice and want to remove it in the next breaking release, so we can safely consider this merge option deprecated.

It is annoying to have to supply the names of the tables in merge_tasks

Similar to the labels, I would encourage people not to use multi-table datasets and to stick to a single table per dataset. If we remove this feature, this question also becomes obsolete.

it would be nice to be able to supply merge_datasets_as_delayed with a predicates argument

Definitely.

Why not supply the merge_func with all available dataframes

As a first iteration I would propose to stick to a simple deep join, i.e. merge(merge(df1, df2), df3), ..., and not include this in the interface. I'm curious: do you need a custom merge function, or would this suit your needs?
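
For illustration, such a fixed deep join over an arbitrary number of frames can be expressed as a fold over pd.merge. The frames and the join key below are made up:

```python
from functools import reduce

import pandas as pd

# Made-up frames sharing a join key.
df1 = pd.DataFrame({"key": [1, 2], "a": [10, 20]})
df2 = pd.DataFrame({"key": [1, 2], "b": [30, 40]})
df3 = pd.DataFrame({"key": [1, 2], "c": [50, 60]})

# Fold the list with pd.merge: merge(merge(df1, df2), df3), ...
merged = reduce(lambda left, right: pd.merge(left, right, on="key"), [df1, df2, df3])
```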

Full disclosure: we do have an in-house implementation of a more advanced multi-dataset alignment. The alignment logic is essentially based on "match partition_keys" and does not allow more advanced alignment. We intend to push this out as OSS as well, but the timeline is, best case, a few weeks. I'd be curious if this would suit your needs.

Can you elaborate a bit more how you want to use this?

@fjetter
Collaborator

fjetter commented Mar 10, 2020

Depending on what your datasets look like (partitioning and indexing), #226 could also be interesting for you. If a simple join axis suffices, you could let dask figure out the join once you have an index.
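
To sketch that idea with plain dask.dataframe (not kartothek API; how the dask DataFrames are obtained is left out here): once each dataset is available as a dask DataFrame indexed on the join column, dask can plan the join on its own.

```python
import dask.dataframe as dd
import pandas as pd

# Stand-ins for two datasets that were already read as dask DataFrames and
# share the join column as their index.
ddf_left = dd.from_pandas(pd.DataFrame({"a": [1, 2]}, index=[10, 20]), npartitions=1)
ddf_right = dd.from_pandas(pd.DataFrame({"b": [3, 4]}, index=[10, 20]), npartitions=1)

# With a known index, dask plans the join itself; no custom alignment logic needed.
joined = ddf_left.join(ddf_right, how="inner")
```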

@fjetter
Collaborator

fjetter commented Mar 10, 2020

I am currently shocked, we forgot to put the tests for the merge pipeline in the upstream package 😱 (that's embarrassing)

@xhochy
Contributor

xhochy commented Mar 11, 2020

I am currently shocked, we forgot to put the tests for the merge pipeline in the upstream package 😱 (that's embarrassing)

No, there are tests in https://github.com/JDASoftwareGroup/kartothek/blob/9314d66b2b35a64282945c6b6ae24d8bb5a51ed0/kartothek/io/testing/merge.py and https://github.com/JDASoftwareGroup/kartothek/blob/9314d66b2b35a64282945c6b6ae24d8bb5a51ed0/tests/io/dask/delayed/test_merge.py

They are very basic but cover at least the majority of the code, enough to show that #239 breaks these tests.

As a first iteration I would propose to stick to a simple deep join, i.e. merge(merge(df1, df2), df3), ..., and not include this in the interface. I'm curious: do you need a custom merge function, or would this suit your needs?

Supplying all DataFrames to the merge function is an enormous performance benefit. If you don't do a plain merge but can apply other optimizations, like using only pd.concat (you need to be very careful with that), this saves a lot of time in loading.
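
A toy illustration of why receiving all frames at once helps (the frames below are made up): when the inputs are known to be aligned, a single column-wise pd.concat replaces the chain of pairwise merges.

```python
import pandas as pd

# Made-up frames that are already aligned on an identical index, e.g. because
# the datasets share the same partitioning and row order.
df1 = pd.DataFrame({"a": [1, 2]}, index=[10, 20])
df2 = pd.DataFrame({"b": [3, 4]}, index=[10, 20])
df3 = pd.DataFrame({"c": [5, 6]}, index=[10, 20])

# One column-wise concat instead of len(dfs) - 1 pairwise hash joins.
# If the alignment assumption is violated this silently produces wrong rows,
# hence the "very careful" caveat above.
combined = pd.concat([df1, df2, df3], axis=1)
```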

@fjetter
Collaborator

fjetter commented Mar 11, 2020

If you don't do a plain merge but can apply other optimizations, like using only pd.concat (you need to be very careful with that), this saves a lot of time in loading.

Agreed, but I would argue this doesn't need to be part of the public interface, does it? I guess it should be possible to find a suitable fast join method which can be used for everything.

@xhochy linked a pull request Mar 11, 2020 that will close this issue
@mlondschien
Contributor Author

No worries about taking your time to respond. I guess @xhochy (who is now implementing the merge functionality much better than I could) can answer most of your questions. Simply put, the issue we were facing is that the combined dataset is too big to fit into memory before subselecting columns and applying predicates. We have around 1000 partitions per dataset with only one parquet file per partition, so sequentially loading chunks (or a subset of columns), aligning them (basically pd.concat) and applying predicates seems like the optimal solution (and is basically what is done in #243).
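
Spelled out as a rough sketch of that per-partition pattern (load_partition is a hypothetical placeholder for reading one parquet file per dataset; filter_df_from_predicates is kartothek's existing filter helper; names and data are made up):

```python
import pandas as pd

from kartothek.serialization import filter_df_from_predicates


def load_partition(dataset_uuid, partition_label):
    """Hypothetical placeholder: in reality this would read the single parquet
    file of ``partition_label`` in dataset ``dataset_uuid`` from the store."""
    return pd.DataFrame({dataset_uuid: [1, -2, 3]})


def merge_partition(dataset_uuids, partition_label, predicates):
    # Load the matching partition of every dataset, align column-wise and
    # filter; memory usage stays bounded by a single partition group.
    dfs = [load_partition(uuid, partition_label) for uuid in dataset_uuids]
    merged = pd.concat(dfs, axis=1)
    return filter_df_from_predicates(merged, predicates)


result = merge_partition(["ds_left", "ds_right"], "part_0", [[("ds_left", ">", 0)]])
```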
