
Support setting a snapshot property in same commit as spark.sql #368

Closed
brianfromoregon opened this issue Feb 5, 2024 · 19 comments

@brianfromoregon

Feature Request / Improvement

This cookbook has a Java snippet that updates a snapshot property atomically with a SQL MERGE INTO.

import java.util.Map;
import org.apache.iceberg.spark.CommitMetadata;

// Update the target table and set the watermark in the same commit
CommitMetadata.withCommitProperties(
    Map.of("watermark:raw.log_source", endSnapshotId),
    () -> {
        spark.sql("MERGE INTO ...");
        return 0;
    }, RuntimeException.class);

It would be great if pyiceberg allowed me to similarly wrap my spark.sql call and add snapshot properties.

@Fokko
Contributor

Fokko commented Feb 5, 2024

Thanks for raising this @brianfromoregon!

I think it would be a great addition. We need to extend the .append and .overwrite APIs to allow passing in a map of properties, which then needs to be passed in when constructing the Summary:

summary=Summary(operation=self._operation, **ssc.build()),
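To make the idea concrete, here is a rough standalone illustration (not PyIceberg's actual internals; the function and argument names are made up for the sketch): the caller-supplied map would simply be layered on top of the fields the summary collector already produces.

# Rough illustration only; build_summary and its arguments are hypothetical.
def build_summary(operation: str, collected: dict, snapshot_properties: dict) -> dict:
    # Caller-supplied properties are added on top of the collected metrics.
    return {"operation": operation, **collected, **snapshot_properties}

print(build_summary("append",
                    {"added-records": "3"},
                    {"watermark:raw.log_source": "123456789"}))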

Are you interested in contributing this? :)

@brianfromoregon
Author

brianfromoregon commented Feb 6, 2024

Beyond writing snapshot summary fields, this issue is also requesting the ability to write those fields in the same snapshot as one created by spark.sql. That would take changes beyond what you describe, right @Fokko? Ideally I'd have a single transaction to (1) read a summary field, (2) run spark.sql, and (3) write a summary field.

@Fokko
Copy link
Contributor

Fokko commented Feb 6, 2024

@brianfromoregon That's not possible. Spark will create the snapshot, and those are immutable. So you cannot update those afterward in PyIceberg.

@brianfromoregon
Author

@Fokko Interesting, that makes sense. So what does the linked cookbook code mean when it says "in the same commit"?

@Fokko
Copy link
Contributor

Fokko commented Feb 6, 2024

@brianfromoregon In the cookbook example it will be in the same commit, which results in a single snapshot. I was under the impression that you also want to replicate this on the Python side :)

@brianfromoregon
Author

@Fokko Yes, I am using Python. So this is possible from Java but impossible from Python; interesting, I wonder why.

@Fokko
Contributor

Fokko commented Feb 6, 2024

Because it is not part of the API, so we need to extend it :) In Python, you would append an Arrow table to the Iceberg table and set the properties in the same commit (snapshot).
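As a sketch of what that could look like from the caller's side (the snapshot_properties keyword shown here is the API later added in #419, so check it against your installed PyIceberg version; the catalog and table names are illustrative):

# Sketch only: append an Arrow table and set a snapshot property in one commit.
import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")              # illustrative catalog name
table = catalog.load_table("raw.log_source")   # illustrative table name

df = pa.table({"id": pa.array([1, 2, 3], type=pa.int64())})
# Data and the watermark property land in the same commit, i.e. the same snapshot.
table.append(df, snapshot_properties={"watermark:raw.log_source": "123456789"})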

@brianfromoregon
Author

OK, agreed. So my intention was to have this issue represent extending the API to allow same-commit semantics like the Java cookbook, and then issue #367 represent the (simpler) change to allow setting snapshot properties in general.

@sungwy sungwy added this to the PyIceberg 0.7.0 release milestone Feb 7, 2024
@ajosephides

As @brianfromoregon has mentioned, I also understood the issue raised to "represent extending the API to allow same commit semantics like the java".

@Fokko
Contributor

Fokko commented Feb 8, 2024

I would love that, and this is what I suggested in #368 (comment)

@Gowthami03B
Contributor

@brianfromoregon @Fokko can I take a stab at this?

@Gowthami03B
Contributor

Gowthami03B commented Feb 14, 2024

#419

@corleyma

corleyma commented Feb 29, 2024

I think there's still some confusion here, since there are two possible interpretations of "represent extending the API to allow same commit semantics like the java":

  • Interpretation 1: allow pyiceberg to write both snapshot properties and table updates (append/overwrite) in the same transaction (pyiceberg is doing both updates); this is the interpretation implemented in Change Append/Overwrite API to accept snapshot properties #419.
  • Interpretation 2: Allow pyiceberg and pyspark to participate in the same transaction, with pyiceberg writing snapshot properties and pyspark executing SQL to update the table. This is what the java cookbook example is showing, using the java iceberg API with the java spark API. The advantage of interpretation 2 over interpretation 1 is that you could do everything supported in PySpark (MERGE INTO, distributed processing of table updates, etc.) that isn't possible (yet) in pyiceberg.

I think interpretation 2 is what @brianfromoregon was getting at, and I don't know how feasible it is... but I think both capabilities are nice, so it's great to have #419, and if interpretation 2 is possible, that would also be really useful. An alternative to interpretation 2 would be some other way to set snapshot properties in PySpark without using pyiceberg, and I don't think that exists either.

@brianfromoregon
Author

Hi @corleyma, my thinking was that Issue 367 is meant to represent "Interpretation 1" and this issue 368 is meant to represent "Interpretation 2". Fully agreed that both features are useful!

@sungwy
Collaborator

sungwy commented Mar 26, 2024

Hi @brianfromoregon and @corleyma, from my understanding of PyIceberg and PySpark Iceberg, I'm not sure if allowing the two separate clients to participate in the same transaction will be possible any time soon. Currently, Transactions are designed as classes, and they are available only to the specific client that's building them.

This feature request implies that the transaction should be shared between the two separate clients which would need either:

  1. the Transaction class to be exchanged in a way that can be understood by both Spark and Python within the same machine (presumably the Spark driver)
  2. or a Transaction that is sent to an intelligent Catalog backend that doesn't commit it immediately but stages the transaction, so that the transaction can be looked up with a unique identifier and built upon by separate clients until it is committed.

Is there a specific use case you are thinking of that requires both PySpark-Iceberg and PyIceberg? We know PyIceberg is still evolving, but it is growing fast and we will reach rough feature parity in the near future. After that, the choice of client would really depend on the use case: would it require the built-in distributed capabilities of Spark, or would we want to perform simpler transactions through PyIceberg?

@Fokko - do you have any thoughts on this topic?

@sungwy sungwy removed this from the PyIceberg 0.7.0 release milestone Mar 26, 2024
@brianfromoregon
Author

Hi @syun64, thanks for chiming in!

My batch app stores historical data; there is always a date column. It runs for each date and inserts data for that date. Sometimes there is legitimately no data available for a particular date: no matter how many times it runs, there will never be data. Other times the app has an error or fails to run and needs to be re-run for a date. I'm trying to allow my app to differentiate between missing dates and present-but-empty dates so it does not constantly try re-running for dates that will never produce data. When I was using raw parquet files, I would simply write an empty file for a date to represent present-but-empty. Asking in Slack, I learned that Iceberg does not support this concept (for example, no empty partitions allowed), so instead I am aiming to use metadata (snapshot properties) to store the date ranges that are reflected in the stored data.

In order to implement this with snapshot properties I want my writer to do the following transactionally:

  1. Fetch the current snapshot's dateranges property.
  2. Modify that dateranges value to include the dates which are about to be written.
  3. Merge the new data and update the dateranges snapshot property, in the same new snapshot.

If another concurrent writer were to write its own new snapshot between step 1 and 3, I would want my writer to throw an exception and then I'll try again at step 1 starting from the latest snapshot.

Today I use PySpark Iceberg for writing because PyIceberg does not yet support partitioned writes. PyIceberg is getting partitioned writes soon, and I am excited to try it! But until then I'm using PySpark for writing and want some way to accomplish steps 1-3 from a Python client. I hope this explains my goal and motivation.

Another approach I had in mind was to be able to read and write snapshot properties from a PySpark SQL query. That is appealing because it would be a single-client solution, which would also allow my non-Python clients to perform writes that honor this dateranges property.

@sungwy
Collaborator

sungwy commented Mar 26, 2024

> In order to implement this with snapshot properties I want my writer to do the following transactionally:
>
>   1. Fetch the current snapshot's dateranges property.
>   2. Modify that dateranges value to include the dates which are about to be written.
>   3. Merge the new data and update the dateranges snapshot property, in the same new snapshot.
>
> If another concurrent writer were to write its own new snapshot between step 1 and 3, I would want my writer to throw an exception and then I'll try again at step 1 starting from the latest snapshot.
>
> Another approach I had in mind was to be able to read and write snapshot properties from PySpark SQL query. That is appealing because it would be a single-client solution which would also allow my non-python clients to perform writes that honor this dateranges property.

I think you should be able to do this today: keep track of the Iceberg table snapshot you are looking at to do task (1), then write with a snapshot property and an isolation setting validated from the snapshot you started your sequence of operations from, so that your commit fails if a concurrent commit has been made since then.

https://iceberg.apache.org/docs/1.5.0/spark-configuration/#write-options

"isolation-level", "validate-from-snapshot-id" and "snapshot-property" are probably the write options you want to use to achieve your goal in PySpark. Let me know if that works for you!

@github-actions

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

@github-actions github-actions bot added the stale label Sep 23, 2024
@sungwy
Collaborator

sungwy commented Sep 23, 2024

Snapshot properties can now be specified in the PyIceberg Table APIs:

#419

@sungwy sungwy closed this as completed Sep 23, 2024