Skip to content

Commit

Permalink
Introduce new transaction model based on ephemeral branches (#258)
Browse files Browse the repository at this point in the history
This is a breaking change, as it requires users to give arguments to the return object of `LakeFSFileSystem.transaction`.

Changes the `LakeFSTransaction` class to operate on a repository+branch-scoped ephemeral branch.

File uploads, removals, and commits happen on that ephemeral branch. After transaction exit, the created branch is optionally merged back into the base branch.

The base branch (from which we branch off in the transaction) must exist before start of the transaction.

Some transaction APIs (merge, revert, tag) can also work on branches other than the transaction's ephemeral branch - this behavior might not be well-defined, and its removal is up for debate.

Similarly, on puts and deletes, one currently has to specify the transaction branch directly via the `LakeFSTransaction.branch` property. This can be made nicer by allowing resource paths relative to the transaction's repo and branch in special functions like e.g. `LakeFSTransaction.put()` (deferring to `fs.put()`), `LakeFSTransaction.rm()` (deferring to `fs.rm()`), and so on.

This pull request turns the LakeFSTransaction into a callable, which needs to be called with a repository name at minimum to ensure transaction execution.
  • Loading branch information
nicholasjng authored Feb 2, 2024
1 parent cc7d289 commit 1586b02
Show file tree
Hide file tree
Showing 17 changed files with 329 additions and 459 deletions.
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ repos:
types_or: [ python, pyi ]
args: [--ignore-missing-imports, --scripts-are-modules]
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.1.11
rev: v0.1.14
hooks:
- id: ruff
args: [ --fix, --exit-non-zero-on-fix ]
- id: ruff-format
- repo: https://github.com/PyCQA/bandit
rev: 1.7.6
rev: 1.7.7
hooks:
- id: bandit
args: [-c, pyproject.toml]
additional_dependencies: ["bandit[toml]"]
- repo: https://github.com/jsh9/pydoclint
rev: 0.3.8
rev: 0.3.9
hooks:
- id: pydoclint
- repo: https://github.com/jazzband/pip-tools
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ local_path.write_text("Hello, lakeFS!")

# Upload to lakeFS and create a commit
fs = LakeFSFileSystem() # will auto-discover config from ~/.lakectl.yaml
repo_path = f"{REPO}/{BRANCH}/{local_path.name}"

with fs.transaction as tx:
fs.put(str(local_path), repo_path)
tx.commit(REPO, BRANCH, "Add demo data")
# Upload a file on a temporary transaction branch
with fs.transaction(repository=REPO, base_branch=BRANCH) as tx:
fs.put(local_path, f"{REPO}/{tx.branch.id}/{local_path.name}")
tx.commit(message="Add demo data")

# Read back committed file
f = fs.open(repo_path, "rt")
f = fs.open(f"{REPO}/{BRANCH}/demo.txt", "rt")
print(f.readline()) # "Hello, lakeFS!"
```

Expand Down
8 changes: 3 additions & 5 deletions docs/_code/duckdb_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@
fs = LakeFSFileSystem()
duckdb.register_filesystem(fs)

with fs.transaction as tx:
tx.create_branch("quickstart", "italian-lakes", "main")

with fs.transaction("quickstart", "main") as tx:
lakes = duckdb.read_parquet("lakefs://quickstart/main/lakes.parquet")
italian_lakes = duckdb.sql("SELECT * FROM lakes where Country='Italy'")
italian_lakes.to_csv("lakefs://quickstart/italian-lakes/italian_lakes.csv")
italian_lakes.to_csv(f"lakefs://quickstart/{tx.branch.id}/italian_lakes.csv")

tx.commit("quickstart", "italian-lakes", "Add Italian lakes")
tx.commit(message="Add Italian lakes")
10 changes: 4 additions & 6 deletions docs/_code/pandas_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@

fs = LakeFSFileSystem()

with fs.transaction as tx:
tx.create_branch("quickstart", "german-lakes", "main")

lakes = pd.read_parquet("lakefs://quickstart/main/lakes.parquet")
with fs.transaction("quickstart", "main") as tx:
lakes = pd.read_parquet(f"lakefs://quickstart/{tx.branch.id}/lakes.parquet")
german_lakes = lakes.query('Country == "Germany"')
german_lakes.to_csv("lakefs://quickstart/german-lakes/german_lakes.csv")
german_lakes.to_csv(f"lakefs://quickstart/{tx.branch.id}/german_lakes.csv")

tx.commit("quickstart", "german-lakes", "Add German lakes")
tx.commit(message="Add German lakes")
10 changes: 4 additions & 6 deletions docs/_code/polars_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@

fs = LakeFSFileSystem()

with fs.transaction as tx:
tx.create_branch("quickstart", "us-lakes", "main")

lakes = pl.read_parquet("lakefs://quickstart/main/lakes.parquet")
with fs.transaction("quickstart", "main") as tx:
lakes = pl.read_parquet(f"lakefs://quickstart/{tx.branch.id}/lakes.parquet")
us_lakes = lakes.filter(pl.col("Country") == "United States of America")

with fs.open("lakefs://quickstart/us-lakes/us_lakes.csv", "wb") as f:
with fs.open(f"lakefs://quickstart/{tx.branch.id}/us_lakes.csv", "wb") as f:
us_lakes.write_csv(f)

tx.commit("quickstart", "us-lakes", "Add US lakes")
tx.commit(message="Add US lakes")
10 changes: 4 additions & 6 deletions docs/_code/pyarrow_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@

fs = LakeFSFileSystem()

with fs.transaction as tx:
tx.create_branch("quickstart", "partitioned-data", "main")

lakes_table = pq.read_table("quickstart/main/lakes.parquet", filesystem=fs)
with fs.transaction("quickstart", "main") as tx:
lakes_table = pq.read_table(f"quickstart/{tx.branch.id}/lakes.parquet", filesystem=fs)

ds.write_dataset(
lakes_table,
"quickstart/partitioned-data/lakes",
f"quickstart/{tx.branch.id}/lakes",
filesystem=fs,
format="csv",
partitioning=ds.partitioning(pa.schema([lakes_table.schema.field("Country")])),
)

tx.commit("quickstart", "partitioned-data", "Add partitioned lakes data set")
tx.commit("Add partitioned lakes data set")
13 changes: 7 additions & 6 deletions docs/_code/quickstart.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
fs = LakeFSFileSystem() # will auto-discover credentials from ~/.lakectl.yaml
repo_path = f"{REPO}/{BRANCH}/{local_path.name}"

with fs.transaction as tx:
fs.put(str(local_path), repo_path)
tx.commit(REPO, BRANCH, "Add demo data")
with fs.transaction(REPO, BRANCH) as tx:
fs.put(str(local_path), f"{REPO}/{tx.branch.id}/{local_path.name}")
tx.commit(message="Add demo data")

# Read back the file contents
f = fs.open(repo_path, "rt")
Expand All @@ -30,7 +30,8 @@
print(fs.ls(f"{REPO}/{BRANCH}/"))

# Delete uploaded file from the repository (and commit)
with fs.transaction as tx:
fs.rm(repo_path)
tx.commit(REPO, BRANCH, "Delete demo data")
with fs.transaction(REPO, BRANCH) as tx:
fs.rm(f"{REPO}/{tx.branch.id}/{local_path.name}")
tx.commit(message="Delete demo data")

local_path.unlink()
89 changes: 36 additions & 53 deletions docs/guides/transactions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,14 @@

In addition to file operations, you can carry out versioning operations in your Python code using file system *transactions*.

A transaction is essentially a context manager that collects all file uploads, defers them, and executes the uploads on completion of the transaction.
They are an "all or nothing" proposition: If an error occurs during the transaction, none of the queued files are uploaded.
Transactions in lakeFS-spec behave similarly to the transactions in the [high-level lakeFS SDK](https://docs.lakefs.io/integrations/python.html#transactions):
Both approaches create an ephemeral branch for a transaction, perform the operations in the context block on that ephemeral branch, and optionally merge it back into the source branch upon exiting the context manager.

!!! info
The transactions in lakeFS-spec are different from the transactions in the [high-level lakeFS SDK](https://docs.lakefs.io/integrations/python.html#transactions), which were added in v0.2.0.

*High-level lakeFS SDK* transactions create an ephemeral branch, perform the operations in the context block on that ephemeral branch, and merge it back into the source branch upon exiting the context manager.

*lakeFS-spec* transactions collect the versioning operations and perform them one by one directly on the source branch once the context manager is exited.
This allows for a more fine-grained control over the applied versioning operations. For example, multiple commits can be created in a single lakeFS-spec transaction.
They are an "all or nothing" proposition: If an error occurs during the transaction, the base branch is left unchanged.

The lakeFS-spec transaction inherits from fsspec transactions. For more information on fsspec transactions, see the [official documentation](https://filesystem-spec.readthedocs.io/en/latest/features.html#transactions).

The main features of the lakeFS file system transaction are:

## Atomicity

If an exception occurs anywhere during the transaction, all queued file uploads and versioning operations are discarded:

```python
from lakefs_spec import LakeFSFileSystem

fs = LakeFSFileSystem()

with fs.transaction as tx:
fs.put_file("my-file.txt", "repo/main/my-file.txt")
tx.commit("repo", "main", message="Add my-file.txt")
raise ValueError("oops!")
```

The above code will not produce a commit on `main`, since the `ValueError` prompts a discard of the full upload queue.

## Versioning helpers
## Versioning operations

The lakeFS file system's transaction is the intended place for conducting versioning operations between file transfers.
The following is an example of file uploads with commit creations, with a tag being applied at the end.
Expand All @@ -44,39 +19,47 @@ from lakefs_spec import LakeFSFileSystem

fs = LakeFSFileSystem()

with fs.transaction as tx:
fs.put_file("train-data.txt", "repo/main/train-data.txt")
tx.commit("repo", "main", message="Add training data")
fs.put_file("test-data.txt", "repo/main/test-data.txt")
sha = tx.commit("repo", "main", message="Add test data")
tx.tag("repo", sha, tag="My train-test split")
with fs.transaction("repo", "main") as tx:
fs.put_file("train-data.txt", f"repo/{tx.branch.id}/train-data.txt")
tx.commit(message="Add training data")
fs.put_file("test-data.txt", f"repo/{tx.branch.id}/test-data.txt")
sha = tx.commit(message="Add test data")
tx.tag(sha, name="My train-test split")
```

The full list of supported lakeFS versioning operations:
The full list of supported lakeFS versioning operations (by default, these operations target the transaction branch):

* [`commit`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.commit), for creating commits on a branch, optionally with attached metadata.
* [`create_branch`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.create_branch), for creating a new branch.
* [`merge`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.merge), for merging a given branch into another branch.
* [`revert`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.revert), for reverting a previous commit on a branch.
* [`commit`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.commit), for creating a commit, optionally with attached metadata.
* [`merge`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.merge), for merging a given branch.
* [`revert`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.revert), for reverting a previous commit.
* [`rev_parse`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.rev_parse), for parsing revisions like branch/tag names and SHA fragments into full commit SHAs.
* [`tag`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.tag), for creating a tag pointing to a commit.

### Reusing resources created in transactions
## Lifecycle of ephemeral transaction branches

You can control the lifecycle for a transaction branch with the `delete` argument:

* By default (`delete="onsuccess"`), the branch is deleted after successful completion, and left over in case of failure for debugging purposes.
* If `delete="always"`, the branch is unconditionally deleted after the transaction regardless of its status.
* Similarly, if `delete="never"`, the branch is unconditionally left in place after the transaction.

Additionally, the `automerge` keyword controls whether the transaction branch is merged after successful completion of the transaction.
It has no effect if an error occurs over the course of the transaction.

## Error handling

Some transaction versioning helpers create new objects in the lakeFS instance that are not known before said helpers are actually executed.
An example of this is a commit SHA, which is only available once created by the lakeFS server.
In the above example, a commit is created directly after a file upload, but its actual SHA identifier will not be available until the transaction is complete.
After the transaction is completed, you can reuse the computed value (a [`Placeholder`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.Placeholder) object) in your code like you would any other lakeFS server result:
Since all files are uploaded to a short-lived transaction branch, no commit on the target branch happens in case of an exception:

```python
with fs.transaction as tx:
fs.put_file("my-file.txt", "repo/branch/my-file.txt")
sha = tx.commit("repo", "branch", message="Add my-file.txt")
from lakefs_spec import LakeFSFileSystem

# after transaction completion, just use the SHA value as normal.
fs.get_file(f"repo/{sha.id}/my-file.txt", "my-new-file.txt")
```
fs = LakeFSFileSystem()

## Thread safety
with fs.transaction("repo", "main", delete="onsuccess") as tx:
fs.put_file("my-file.txt", f"repo/{tx.branch.id}/my-file.txt")
tx.commit(message="Add my-file.txt")
raise ValueError("oops!")
```

Through its use of `collections.deque` as a store for uploads, upload queueing and file transfers are thread-safe.
The above code will not modify the `main` branch, since the `ValueError` prevents the merge of the transaction branch.
Note that you can examine the contents of the transaction branch due to `delete="onsuccess"` (the default behavior), which prevents deletion of the branch in case of failure for debugging purposes.
58 changes: 26 additions & 32 deletions docs/tutorials/demo_data_science_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,12 @@ def _maybe_urlretrieve(url: str, filename: str) -> str:
"""

# %%
NEW_BRANCH_NAME = "transform-raw-data"
NEW_BRANCH = lakefs.Branch(REPO_NAME, "transform-raw-data", client=fs.client)
NEW_BRANCH.create("main")

with fs.transaction as tx:
fs.put(outfile, f"{REPO_NAME}/{NEW_BRANCH_NAME}/weather-2010.json")
tx.commit(repository=REPO_NAME, branch=NEW_BRANCH_NAME, message="Add 2010 weather data")
with fs.transaction(REPO_NAME, NEW_BRANCH) as tx:
fs.put(outfile, f"{REPO_NAME}/{tx.branch.id}/weather-2010.json")
tx.commit(message="Add 2010 weather data")

# %% [markdown]
"""
Expand Down Expand Up @@ -214,9 +215,9 @@ def transform_json_weather_data(filepath):
"""

# %%
with fs.transaction as tx:
df.to_csv(f"lakefs://{REPO_NAME}/main/weather_2010.csv")
tx.commit(repository=REPO_NAME, branch="main", message="Update weather data")
with fs.transaction(REPO_NAME, "main") as tx:
df.to_csv(f"lakefs://{REPO_NAME}/{tx.branch.id}/weather_2010.csv")
tx.commit(message="Update weather data")

# %% [markdown]
"""
Expand All @@ -241,16 +242,13 @@ def transform_json_weather_data(filepath):
"""

# %%
TRAINING_BRANCH = "training"
TRAINING_BRANCH = lakefs.Branch(REPO_NAME, "training", client=fs.client)
TRAINING_BRANCH.create("main")

with fs.transaction as tx:
train.to_csv(f"lakefs://{REPO_NAME}/{TRAINING_BRANCH}/train_weather.csv")
test.to_csv(f"lakefs://{REPO_NAME}/{TRAINING_BRANCH}/test_weather.csv")
tx.commit(
repository=REPO_NAME,
branch=TRAINING_BRANCH,
message="Add train-test split of 2010 weather data",
)
with fs.transaction(REPO_NAME, TRAINING_BRANCH) as tx:
train.to_csv(f"lakefs://{REPO_NAME}/{tx.branch.id}/train_weather.csv")
test.to_csv(f"lakefs://{REPO_NAME}/{tx.branch.id}/test_weather.csv")
tx.commit(message="Add train-test split of 2010 weather data")

# %% [markdown]
"""
Expand Down Expand Up @@ -298,9 +296,9 @@ def transform_json_weather_data(filepath):

new_data = transform_json_weather_data(outfile)

with fs.transaction as tx:
new_data.to_csv(f"lakefs://{REPO_NAME}/main/weather_2020.csv")
tx.commit(repository=REPO_NAME, branch="main", message="Add 2020 weather data")
with fs.transaction(REPO_NAME, "main") as tx:
new_data.to_csv(f"lakefs://{REPO_NAME}/{tx.branch.id}/weather_2020.csv")
tx.commit(message="Add 2020 weather data")

# Remove leftover temporary files from previous `urlretrieve` calls
urllib.request.urlcleanup()
Expand All @@ -320,14 +318,10 @@ def transform_json_weather_data(filepath):
print(f"Updated train data shape: {train_df.shape}")
print(f"Updated test data shape: {test_df.shape}")

with fs.transaction as tx:
train_df.to_csv(f"lakefs://{REPO_NAME}/{TRAINING_BRANCH}/train_weather.csv")
test_df.to_csv(f"lakefs://{REPO_NAME}/{TRAINING_BRANCH}/test_weather.csv")
tx.commit(
repository=REPO_NAME,
branch=TRAINING_BRANCH,
message="Add train-test split of 2010 and 2020 data",
)
with fs.transaction(REPO_NAME, TRAINING_BRANCH) as tx:
train_df.to_csv(f"lakefs://{REPO_NAME}/{tx.branch.id}/train_weather.csv")
test_df.to_csv(f"lakefs://{REPO_NAME}/{tx.branch.id}/test_weather.csv")
tx.commit(message="Add train-test split of 2010 and 2020 data")

# %% [markdown]
"""
Expand Down Expand Up @@ -364,7 +358,7 @@ def transform_json_weather_data(filepath):
# %%

# access the data of the previous commit with a lakefs ref expression, in this case the same as in git.
previous_commit = repo.ref(f"{TRAINING_BRANCH}~").get_commit()
previous_commit = repo.ref(f"{TRAINING_BRANCH.id}~").get_commit()
fixed_commit_id = previous_commit.id
print(fixed_commit_id)

Expand Down Expand Up @@ -409,9 +403,9 @@ def transform_json_weather_data(filepath):


# %%
with fs.transaction as tx:
# the `tag` result is simply the tag name, in this case 'train-test-split-2010'.
tag = tx.tag(repository=REPO_NAME, ref=fixed_commit_id, tag="train-test-split-2010")
with fs.transaction(REPO_NAME, "main") as tx:
# returns the tag as a lakeFS object.
tag = tx.tag(fixed_commit_id, name="train-test-split-2010")


# %% [markdown]
Expand All @@ -424,7 +418,7 @@ def transform_json_weather_data(filepath):
train_from_commit = pd.read_csv(
f"lakefs://{REPO_NAME}/{fixed_commit_id}/train_weather.csv", index_col=0
)
train_from_tag = pd.read_csv(f"lakefs://{REPO_NAME}/{tag}/train_weather.csv", index_col=0)
train_from_tag = pd.read_csv(f"lakefs://{REPO_NAME}/{tag.id}/train_weather.csv", index_col=0)

# %% [markdown]
"""
Expand Down
7 changes: 5 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ classifiers = [
"Typing :: Typed",
]

dependencies = ["fsspec>=2023.6.0", "lakefs>=0.2.0", "wrapt"]
dependencies = ["fsspec>=2023.6.0", "lakefs>=0.2.0"]

dynamic = ["version"]

Expand Down Expand Up @@ -133,4 +133,7 @@ style = 'numpy'
exclude = '\.git|venv'

[tool.coverage.report]
exclude_also = ["@overload"]
exclude_also = [
"@overload",
"raise NotImplementedError"
]
1 change: 0 additions & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ typing-extensions==4.9.0
tzdata==2023.4
urllib3==2.0.7
virtualenv==20.25.0
wrapt==1.16.0

# The following packages are considered to be unsafe in a requirements file:
# setuptools
Loading

0 comments on commit 1586b02

Please sign in to comment.