diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6639cdb7..bf84647e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -17,19 +17,19 @@ repos: types_or: [ python, pyi ] args: [--ignore-missing-imports, --scripts-are-modules] - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.1.11 + rev: v0.1.14 hooks: - id: ruff args: [ --fix, --exit-non-zero-on-fix ] - id: ruff-format - repo: https://github.com/PyCQA/bandit - rev: 1.7.6 + rev: 1.7.7 hooks: - id: bandit args: [-c, pyproject.toml] additional_dependencies: ["bandit[toml]"] - repo: https://github.com/jsh9/pydoclint - rev: 0.3.8 + rev: 0.3.9 hooks: - id: pydoclint - repo: https://github.com/jazzband/pip-tools diff --git a/README.md b/README.md index 36544242..b05c30ad 100644 --- a/README.md +++ b/README.md @@ -59,14 +59,14 @@ local_path.write_text("Hello, lakeFS!") # Upload to lakeFS and create a commit fs = LakeFSFileSystem() # will auto-discover config from ~/.lakectl.yaml -repo_path = f"{REPO}/{BRANCH}/{local_path.name}" -with fs.transaction as tx: - fs.put(str(local_path), repo_path) - tx.commit(REPO, BRANCH, "Add demo data") +# Upload a file on a temporary transaction branch +with fs.transaction(repository=REPO, base_branch=BRANCH) as tx: + fs.put(local_path, f"{REPO}/{tx.branch.id}/{local_path.name}") + tx.commit(message="Add demo data") # Read back committed file -f = fs.open(repo_path, "rt") +f = fs.open(f"{REPO}/{BRANCH}/demo.txt", "rt") print(f.readline()) # "Hello, lakeFS!" ``` diff --git a/docs/_code/duckdb_example.py b/docs/_code/duckdb_example.py index b4460fbd..f42ce37e 100644 --- a/docs/_code/duckdb_example.py +++ b/docs/_code/duckdb_example.py @@ -5,11 +5,9 @@ fs = LakeFSFileSystem() duckdb.register_filesystem(fs) -with fs.transaction as tx: - tx.create_branch("quickstart", "italian-lakes", "main") - +with fs.transaction("quickstart", "main") as tx: lakes = duckdb.read_parquet("lakefs://quickstart/main/lakes.parquet") italian_lakes = duckdb.sql("SELECT * FROM lakes where Country='Italy'") - italian_lakes.to_csv("lakefs://quickstart/italian-lakes/italian_lakes.csv") + italian_lakes.to_csv(f"lakefs://quickstart/{tx.branch.id}/italian_lakes.csv") - tx.commit("quickstart", "italian-lakes", "Add Italian lakes") + tx.commit(message="Add Italian lakes") diff --git a/docs/_code/pandas_example.py b/docs/_code/pandas_example.py index 93aa0aa1..2a44190a 100644 --- a/docs/_code/pandas_example.py +++ b/docs/_code/pandas_example.py @@ -4,11 +4,9 @@ fs = LakeFSFileSystem() -with fs.transaction as tx: - tx.create_branch("quickstart", "german-lakes", "main") - - lakes = pd.read_parquet("lakefs://quickstart/main/lakes.parquet") +with fs.transaction("quickstart", "main") as tx: + lakes = pd.read_parquet(f"lakefs://quickstart/{tx.branch.id}/lakes.parquet") german_lakes = lakes.query('Country == "Germany"') - german_lakes.to_csv("lakefs://quickstart/german-lakes/german_lakes.csv") + german_lakes.to_csv(f"lakefs://quickstart/{tx.branch.id}/german_lakes.csv") - tx.commit("quickstart", "german-lakes", "Add German lakes") + tx.commit(message="Add German lakes") diff --git a/docs/_code/polars_example.py b/docs/_code/polars_example.py index ebeb56ff..f585d12b 100644 --- a/docs/_code/polars_example.py +++ b/docs/_code/polars_example.py @@ -4,13 +4,11 @@ fs = LakeFSFileSystem() -with fs.transaction as tx: - tx.create_branch("quickstart", "us-lakes", "main") - - lakes = pl.read_parquet("lakefs://quickstart/main/lakes.parquet") +with fs.transaction("quickstart", "main") as tx: + lakes = pl.read_parquet(f"lakefs://quickstart/{tx.branch.id}/lakes.parquet") us_lakes = lakes.filter(pl.col("Country") == "United States of America") - with fs.open("lakefs://quickstart/us-lakes/us_lakes.csv", "wb") as f: + with fs.open(f"lakefs://quickstart/{tx.branch.id}/us_lakes.csv", "wb") as f: us_lakes.write_csv(f) - tx.commit("quickstart", "us-lakes", "Add US lakes") + tx.commit(message="Add US lakes") diff --git a/docs/_code/pyarrow_example.py b/docs/_code/pyarrow_example.py index b7a19b10..2b4c45f0 100644 --- a/docs/_code/pyarrow_example.py +++ b/docs/_code/pyarrow_example.py @@ -6,17 +6,15 @@ fs = LakeFSFileSystem() -with fs.transaction as tx: - tx.create_branch("quickstart", "partitioned-data", "main") - - lakes_table = pq.read_table("quickstart/main/lakes.parquet", filesystem=fs) +with fs.transaction("quickstart", "main") as tx: + lakes_table = pq.read_table(f"quickstart/{tx.branch.id}/lakes.parquet", filesystem=fs) ds.write_dataset( lakes_table, - "quickstart/partitioned-data/lakes", + f"quickstart/{tx.branch.id}/lakes", filesystem=fs, format="csv", partitioning=ds.partitioning(pa.schema([lakes_table.schema.field("Country")])), ) - tx.commit("quickstart", "partitioned-data", "Add partitioned lakes data set") + tx.commit("Add partitioned lakes data set") diff --git a/docs/_code/quickstart.py b/docs/_code/quickstart.py index 3d830e3f..8582d632 100644 --- a/docs/_code/quickstart.py +++ b/docs/_code/quickstart.py @@ -12,9 +12,9 @@ fs = LakeFSFileSystem() # will auto-discover credentials from ~/.lakectl.yaml repo_path = f"{REPO}/{BRANCH}/{local_path.name}" -with fs.transaction as tx: - fs.put(str(local_path), repo_path) - tx.commit(REPO, BRANCH, "Add demo data") +with fs.transaction(REPO, BRANCH) as tx: + fs.put(str(local_path), f"{REPO}/{tx.branch.id}/{local_path.name}") + tx.commit(message="Add demo data") # Read back the file contents f = fs.open(repo_path, "rt") @@ -30,7 +30,8 @@ print(fs.ls(f"{REPO}/{BRANCH}/")) # Delete uploaded file from the repository (and commit) -with fs.transaction as tx: - fs.rm(repo_path) - tx.commit(REPO, BRANCH, "Delete demo data") +with fs.transaction(REPO, BRANCH) as tx: + fs.rm(f"{REPO}/{tx.branch.id}/{local_path.name}") + tx.commit(message="Delete demo data") + local_path.unlink() diff --git a/docs/guides/transactions.md b/docs/guides/transactions.md index 7a961cdc..4676e1e7 100644 --- a/docs/guides/transactions.md +++ b/docs/guides/transactions.md @@ -2,39 +2,14 @@ In addition to file operations, you can carry out versioning operations in your Python code using file system *transactions*. -A transaction is essentially a context manager that collects all file uploads, defers them, and executes the uploads on completion of the transaction. -They are an "all or nothing" proposition: If an error occurs during the transaction, none of the queued files are uploaded. +Transactions in lakeFS-spec behave similarly to the transactions in the [high-level lakeFS SDK](https://docs.lakefs.io/integrations/python.html#transactions): +Both approaches create an ephemeral branch for a transaction, perform the operations in the context block on that ephemeral branch, and optionally merge it back into the source branch upon exiting the context manager. -!!! info - The transactions in lakeFS-spec are different from the transactions in the [high-level lakeFS SDK](https://docs.lakefs.io/integrations/python.html#transactions), which were added in v0.2.0. - - *High-level lakeFS SDK* transactions create an ephemeral branch, perform the operations in the context block on that ephemeral branch, and merge it back into the source branch upon exiting the context manager. - - *lakeFS-spec* transactions collect the versioning operations and perform them one by one directly on the source branch once the context manager is exited. - This allows for a more fine-grained control over the applied versioning operations. For example, multiple commits can be created in a single lakeFS-spec transaction. +They are an "all or nothing" proposition: If an error occurs during the transaction, the base branch is left unchanged. The lakeFS-spec transaction inherits from fsspec transactions. For more information on fsspec transactions, see the [official documentation](https://filesystem-spec.readthedocs.io/en/latest/features.html#transactions). -The main features of the lakeFS file system transaction are: - -## Atomicity - -If an exception occurs anywhere during the transaction, all queued file uploads and versioning operations are discarded: - -```python -from lakefs_spec import LakeFSFileSystem - -fs = LakeFSFileSystem() - -with fs.transaction as tx: - fs.put_file("my-file.txt", "repo/main/my-file.txt") - tx.commit("repo", "main", message="Add my-file.txt") - raise ValueError("oops!") -``` - -The above code will not produce a commit on `main`, since the `ValueError` prompts a discard of the full upload queue. - -## Versioning helpers +## Versioning operations The lakeFS file system's transaction is the intended place for conducting versioning operations between file transfers. The following is an example of file uploads with commit creations, with a tag being applied at the end. @@ -44,39 +19,47 @@ from lakefs_spec import LakeFSFileSystem fs = LakeFSFileSystem() -with fs.transaction as tx: - fs.put_file("train-data.txt", "repo/main/train-data.txt") - tx.commit("repo", "main", message="Add training data") - fs.put_file("test-data.txt", "repo/main/test-data.txt") - sha = tx.commit("repo", "main", message="Add test data") - tx.tag("repo", sha, tag="My train-test split") +with fs.transaction("repo", "main") as tx: + fs.put_file("train-data.txt", f"repo/{tx.branch.id}/train-data.txt") + tx.commit(message="Add training data") + fs.put_file("test-data.txt", f"repo/{tx.branch.id}/test-data.txt") + sha = tx.commit(message="Add test data") + tx.tag(sha, name="My train-test split") ``` -The full list of supported lakeFS versioning operations: +The full list of supported lakeFS versioning operations (by default, these operations target the transaction branch): -* [`commit`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.commit), for creating commits on a branch, optionally with attached metadata. -* [`create_branch`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.create_branch), for creating a new branch. -* [`merge`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.merge), for merging a given branch into another branch. -* [`revert`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.revert), for reverting a previous commit on a branch. +* [`commit`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.commit), for creating a commit, optionally with attached metadata. +* [`merge`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.merge), for merging a given branch. +* [`revert`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.revert), for reverting a previous commit. * [`rev_parse`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.rev_parse), for parsing revisions like branch/tag names and SHA fragments into full commit SHAs. * [`tag`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.tag), for creating a tag pointing to a commit. -### Reusing resources created in transactions +## Lifecycle of ephemeral transaction branches + +You can control the lifecycle for a transaction branch with the `delete` argument: + +* By default (`delete="onsuccess`), the branch is deleted after successful completion, and left over in case of failure for debugging purposes. +* If `delete="always"`, the branch is unconditionally deleted after the transaction regardless of its status. +* Similarly, if `delete="never"`, the branch is unconditionally left in place after the transaction. + +Additionally, the `automerge` keyword controls whether the transaction branch is merged after successful completion of the transaction. +It has no effect if an error occurs over the course of the transaction. + +## Error handling -Some transaction versioning helpers create new objects in the lakeFS instance that are not known before said helpers are actually executed. -An example of this is a commit SHA, which is only available once created by the lakeFS server. -In the above example, a commit is created directly after a file upload, but its actual SHA identifier will not be available until the transaction is complete. -After the transaction is completed, you can reuse the computed value (a [`Placeholder`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.Placeholder) object) in your code like you would any other lakeFS server result: +Since all files are uploaded to a short-lived transaction branch, no commit on the target branch happens in case of an exception: ```python -with fs.transaction as tx: - fs.put_file("my-file.txt", "repo/branch/my-file.txt") - sha = tx.commit("repo", "branch", message="Add my-file.txt") +from lakefs_spec import LakeFSFileSystem -# after transaction completion, just use the SHA value as normal. -fs.get_file(f"repo/{sha.id}/my-file.txt", "my-new-file.txt") -``` +fs = LakeFSFileSystem() -## Thread safety +with fs.transaction("repo", "main", delete="onsuccess") as tx: + fs.put_file("my-file.txt", f"repo/{tx.branch.id}/my-file.txt") + tx.commit(message="Add my-file.txt") + raise ValueError("oops!") +``` -Through its use of `collections.deque` as a store for uploads, upload queueing and file transfers are thread-safe. +The above code will not modify the `main` branch, since the `ValueError` prevents the merge of the transaction branch. +Note that you can examine the contents of the transaction branch due to `delete="onsuccess"` (the default behavior), which prevents deletion of the branch in case of failure for debugging purposes. diff --git a/docs/tutorials/demo_data_science_project.py b/docs/tutorials/demo_data_science_project.py index e71d954b..aee6a05b 100644 --- a/docs/tutorials/demo_data_science_project.py +++ b/docs/tutorials/demo_data_science_project.py @@ -159,11 +159,12 @@ def _maybe_urlretrieve(url: str, filename: str) -> str: """ # %% -NEW_BRANCH_NAME = "transform-raw-data" +NEW_BRANCH = lakefs.Branch(REPO_NAME, "transform-raw-data", client=fs.client) +NEW_BRANCH.create("main") -with fs.transaction as tx: - fs.put(outfile, f"{REPO_NAME}/{NEW_BRANCH_NAME}/weather-2010.json") - tx.commit(repository=REPO_NAME, branch=NEW_BRANCH_NAME, message="Add 2010 weather data") +with fs.transaction(REPO_NAME, NEW_BRANCH) as tx: + fs.put(outfile, f"{REPO_NAME}/{tx.branch.id}/weather-2010.json") + tx.commit(message="Add 2010 weather data") # %% [markdown] """ @@ -214,9 +215,9 @@ def transform_json_weather_data(filepath): """ # %% -with fs.transaction as tx: - df.to_csv(f"lakefs://{REPO_NAME}/main/weather_2010.csv") - tx.commit(repository=REPO_NAME, branch="main", message="Update weather data") +with fs.transaction(REPO_NAME, "main") as tx: + df.to_csv(f"lakefs://{REPO_NAME}/{tx.branch.id}/weather_2010.csv") + tx.commit(message="Update weather data") # %% [markdown] """ @@ -241,16 +242,13 @@ def transform_json_weather_data(filepath): """ # %% -TRAINING_BRANCH = "training" +TRAINING_BRANCH = lakefs.Branch(REPO_NAME, "training", client=fs.client) +TRAINING_BRANCH.create("main") -with fs.transaction as tx: - train.to_csv(f"lakefs://{REPO_NAME}/{TRAINING_BRANCH}/train_weather.csv") - test.to_csv(f"lakefs://{REPO_NAME}/{TRAINING_BRANCH}/test_weather.csv") - tx.commit( - repository=REPO_NAME, - branch=TRAINING_BRANCH, - message="Add train-test split of 2010 weather data", - ) +with fs.transaction(REPO_NAME, TRAINING_BRANCH) as tx: + train.to_csv(f"lakefs://{REPO_NAME}/{tx.branch.id}/train_weather.csv") + test.to_csv(f"lakefs://{REPO_NAME}/{tx.branch.id}/test_weather.csv") + tx.commit(message="Add train-test split of 2010 weather data") # %% [markdown] """ @@ -298,9 +296,9 @@ def transform_json_weather_data(filepath): new_data = transform_json_weather_data(outfile) -with fs.transaction as tx: - new_data.to_csv(f"lakefs://{REPO_NAME}/main/weather_2020.csv") - tx.commit(repository=REPO_NAME, branch="main", message="Add 2020 weather data") +with fs.transaction(REPO_NAME, "main") as tx: + new_data.to_csv(f"lakefs://{REPO_NAME}/{tx.branch.id}/weather_2020.csv") + tx.commit(message="Add 2020 weather data") # Remove leftover temporary files from previous `urlretrieve` calls urllib.request.urlcleanup() @@ -320,14 +318,10 @@ def transform_json_weather_data(filepath): print(f"Updated train data shape: {train_df.shape}") print(f"Updated test data shape: {test_df.shape}") -with fs.transaction as tx: - train_df.to_csv(f"lakefs://{REPO_NAME}/{TRAINING_BRANCH}/train_weather.csv") - test_df.to_csv(f"lakefs://{REPO_NAME}/{TRAINING_BRANCH}/test_weather.csv") - tx.commit( - repository=REPO_NAME, - branch=TRAINING_BRANCH, - message="Add train-test split of 2010 and 2020 data", - ) +with fs.transaction(REPO_NAME, TRAINING_BRANCH) as tx: + train_df.to_csv(f"lakefs://{REPO_NAME}/{tx.branch.id}/train_weather.csv") + test_df.to_csv(f"lakefs://{REPO_NAME}/{tx.branch.id}/test_weather.csv") + tx.commit(message="Add train-test split of 2010 and 2020 data") # %% [markdown] """ @@ -364,7 +358,7 @@ def transform_json_weather_data(filepath): # %% # access the data of the previous commit with a lakefs ref expression, in this case the same as in git. -previous_commit = repo.ref(f"{TRAINING_BRANCH}~").get_commit() +previous_commit = repo.ref(f"{TRAINING_BRANCH.id}~").get_commit() fixed_commit_id = previous_commit.id print(fixed_commit_id) @@ -409,9 +403,9 @@ def transform_json_weather_data(filepath): # %% -with fs.transaction as tx: - # the `tag` result is simply the tag name, in this case 'train-test-split-2010'. - tag = tx.tag(repository=REPO_NAME, ref=fixed_commit_id, tag="train-test-split-2010") +with fs.transaction(REPO_NAME, "main") as tx: + # returns the tag as a lakeFS object. + tag = tx.tag(fixed_commit_id, name="train-test-split-2010") # %% [markdown] @@ -424,7 +418,7 @@ def transform_json_weather_data(filepath): train_from_commit = pd.read_csv( f"lakefs://{REPO_NAME}/{fixed_commit_id}/train_weather.csv", index_col=0 ) -train_from_tag = pd.read_csv(f"lakefs://{REPO_NAME}/{tag}/train_weather.csv", index_col=0) +train_from_tag = pd.read_csv(f"lakefs://{REPO_NAME}/{tag.id}/train_weather.csv", index_col=0) # %% [markdown] """ diff --git a/pyproject.toml b/pyproject.toml index b81e12ff..ac98f7f5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ classifiers = [ "Typing :: Typed", ] -dependencies = ["fsspec>=2023.6.0", "lakefs>=0.2.0", "wrapt"] +dependencies = ["fsspec>=2023.6.0", "lakefs>=0.2.0"] dynamic = ["version"] @@ -133,4 +133,7 @@ style = 'numpy' exclude = '\.git|venv' [tool.coverage.report] -exclude_also = ["@overload"] +exclude_also = [ + "@overload", + "raise NotImplementedError" +] diff --git a/requirements-dev.txt b/requirements-dev.txt index 1c9e58df..c8a5cd55 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -40,7 +40,6 @@ typing-extensions==4.9.0 tzdata==2023.4 urllib3==2.0.7 virtualenv==20.25.0 -wrapt==1.16.0 # The following packages are considered to be unsafe in a requirements file: # setuptools diff --git a/requirements-docs.txt b/requirements-docs.txt index db964e4d..160d859d 100644 --- a/requirements-docs.txt +++ b/requirements-docs.txt @@ -152,7 +152,6 @@ webcolors==1.13 webencodings==0.5.1 websocket-client==1.7.0 widgetsnbextension==4.0.9 -wrapt==1.16.0 zipp==3.17.0 # The following packages are considered to be unsafe in a requirements file: diff --git a/src/lakefs_spec/spec.py b/src/lakefs_spec/spec.py index 82cae4fa..ed9e81d5 100644 --- a/src/lakefs_spec/spec.py +++ b/src/lakefs_spec/spec.py @@ -28,7 +28,7 @@ from lakefs_spec.transaction import LakeFSTransaction from lakefs_spec.util import md5_checksum, parse -logger = logging.getLogger(__name__) +logger = logging.getLogger("lakefs-spec") class LakeFSFileSystem(AbstractFileSystem): @@ -137,7 +137,7 @@ def _strip_protocol(cls, path): return spath @property - def transaction(self): + def transaction(self) -> LakeFSTransaction: """ A context manager within which file uploads and versioning operations are deferred to a queue, and carried out during when exiting the context. @@ -150,12 +150,10 @@ def transaction(self): return self._transaction def start_transaction(self): - """ - Prepare a lakeFS file system transaction without entering the transaction context yet. - """ - self._intrans = True - self._transaction = LakeFSTransaction(self) - return self.transaction + raise NotImplementedError( + "lakeFS transactions should only be used as a context manager via" + " `with LakeFSFileSystem.transaction as tx:`" + ) @contextmanager def wrapped_api_call( @@ -647,7 +645,7 @@ def open( ac = kwargs.pop("autocommit", not self._intrans) if not ac and "r" not in mode: - self.transaction.files.append(handler) + self._transaction.files.append(handler) return handler diff --git a/src/lakefs_spec/transaction.py b/src/lakefs_spec/transaction.py index 6645a607..392b0d16 100644 --- a/src/lakefs_spec/transaction.py +++ b/src/lakefs_spec/transaction.py @@ -5,17 +5,16 @@ from __future__ import annotations import logging +import random +import string from collections import deque -from dataclasses import dataclass -from functools import partial -from typing import TYPE_CHECKING, Callable, Generic, TypeVar +from typing import TYPE_CHECKING, Literal, TypeVar import lakefs -import wrapt -from fsspec.spec import AbstractBufferedFile from fsspec.transaction import Transaction from lakefs.branch import Branch, Reference from lakefs.client import Client +from lakefs.exceptions import ServerException from lakefs.object import ObjectWriter from lakefs.reference import Commit, ReferenceType from lakefs.repository import Repository @@ -23,37 +22,16 @@ T = TypeVar("T") -logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) +logger = logging.getLogger("lakefs-spec") if TYPE_CHECKING: # pragma: no cover from lakefs_spec import LakeFSFileSystem - VersioningOpTuple = tuple[Callable[[Client], None], str | "Placeholder" | None] - -@dataclass -class Placeholder(Generic[T], wrapt.ObjectProxy): - """A generic placeholder for a value computed by the lakeFS server in a versioning operation during a transaction.""" - - def __init__(self, wrapped: T | None = None): - super().__init__(wrapped) - - @property - def available(self) -> bool: - """Whether the wrapped value is available, i.e. already computed.""" - return self.__wrapped__ is not None - - @property - def value(self): - if self.__wrapped__ is None: - raise RuntimeError("placeholder unfilled") - return self.__wrapped__ - - @value.setter - def value(self, val: T) -> None: - """Fill in the placeholder. Not meant to be called directly except in the completion of the transaction.""" - self.__wrapped__ = val +def _ensurebranch(b: str | Branch, repository: str, client: Client) -> Branch: + if isinstance(b, str): + return Branch(repository, b, client=client) + return b class LakeFSTransaction(Transaction): @@ -66,263 +44,219 @@ class LakeFSTransaction(Transaction): The lakeFS file system associated with the transaction. """ - def __init__(self, fs: "LakeFSFileSystem"): + def __init__( + self, + fs: "LakeFSFileSystem", + ): super().__init__(fs=fs) self.fs: "LakeFSFileSystem" - self.files: deque[AbstractBufferedFile | VersioningOpTuple] = deque(self.files) + self.files: deque[ObjectWriter] = deque(self.files) - def __enter__(self): - self.fs._intrans = True - return self + self.repository: str | None = None + self.base_branch: Branch | None = None + self.automerge: bool = False + self.delete: Literal["onsuccess", "always", "never"] = "onsuccess" + self._ephemeral_branch: Branch | None = None - def commit( + def __call__( self, repository: str | Repository, - branch: str | Branch, - message: str, - metadata: dict[str, str] | None = None, - ) -> Placeholder[Reference]: + base_branch: str | Branch = "main", + branch_name: str | None = None, + automerge: bool = True, + delete: Literal["onsuccess", "always", "never"] = "onsuccess", + ) -> "LakeFSTransaction": """ - Create a commit on a branch in a repository with a commit message and attached metadata. + Creates an ephemeral branch, conducts all uploads and operations on that branch, + and optionally merges it back into the source branch. - Parameters - ---------- repository: str | Repository - The repository to create the commit in. - branch: str | Branch - The name of the branch to commit on. - message: str - The commit message to attach to the newly created commit. - metadata: dict[str, str] | None - Optional metadata to enrich the created commit with (author, e-mail, etc.). - - Returns - ------- - Placeholder[Reference] - A placeholder for the commit created by the dispatched ``commit`` API call. + The repository in which to conduct the transaction. + base_branch: str | Branch + The branch on which the transaction operations should be based. + automerge: bool + Automatically merge the ephemeral branch into the base branch after successful + transaction completion. + delete: Literal["onsuccess", "always", "never"] + Cleanup policy / deletion handling for the ephemeral branch after the transaction. + + If ``"onsuccess"``, the branch is deleted if the transaction succeeded, + or left over if an error occurred. + + If ``"always"``, the ephemeral branch is always deleted after transaction regardless of success + or failure. + + If ``"never"``, the transaction branch is always left in the repository. """ - # bind all arguments to the client helper function, and then add it to the file-/callstack. - - def commit_op( - client: Client, - repo_: str | Repository, - ref_: str | Branch, - message_: str, - metadata_: dict[str, str] | None, - ) -> Reference: - repo_id = repo_.id if isinstance(repo_, Repository) else repo_ - ref_id = ref_.id if isinstance(ref_, Branch) else ref_ - b = lakefs.Branch(repo_id, ref_id, client=client) - - diff = list(b.uncommitted()) - - if not diff: - logger.warning(f"No changes to commit on branch {branch!r}.") - return b.head - return b.commit(message_, metadata_) - - op = partial(commit_op, repo_=repository, ref_=branch, message_=message, metadata_=metadata) - p: Placeholder[Reference] = Placeholder() - self.files.append((op, p)) - # return a placeholder for the reference. - return p - - def complete(self, commit: bool = True) -> None: - """ - Finish the transaction by unwinding the file/versioning op stack via - 1. Committing or discarding in case of a file, and - 2. Conducting versioning operations using the file system's client. + if isinstance(repository, str): + self.repository = repository + else: + self.repository = repository.id - No operations happen and all files are discarded if ``commit == False``, - which is the case, e.g., if an exception happens in the context manager. + repo = lakefs.Repository(self.repository, client=self.fs.client) + try: + _ = repo.metadata + except ServerException: + raise ValueError(f"repository {self.repository!r} does not exist") from None - Parameters - ---------- - commit: bool - Whether to conduct operations queued in the transaction. - """ + # base branch needs to be a lakefs.Branch, since it is being diffed + # with the ephemeral branch in __exit__. + self.base_branch = _ensurebranch(base_branch, self.repository, self.fs.client) + + self.automerge = automerge + self.delete = delete + + ephem_name = branch_name or "transaction-" + "".join(random.choices(string.digits, k=6)) # nosec: B311 + self._ephemeral_branch = Branch(self.repository, ephem_name, client=self.fs.client) + return self + + def __enter__(self): + logger.debug( + f"Creating ephemeral branch {self._ephemeral_branch.id!r} " + f"from branch {self.base_branch.id!r}." + ) + self._ephemeral_branch.create(self.base_branch, exist_ok=False) + self.fs._intrans = True + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + success = exc_type is None while self.files: # fsspec base class calls `append` on the file, which means we # have to pop from the left to preserve order. f = self.files.popleft() - if isinstance(f, ObjectWriter): - if not commit: - f.discard() - else: - # client helper + return value case. - op, retval = f - if commit: - result = op(self.fs.client) - # if the transaction member returns a placeholder, - # fill it with the result of the client helper. - if isinstance(retval, Placeholder): - retval.value = result - - def create_branch( - self, repository: str | Repository, name: str, source: str | Branch, exist_ok: bool = True - ) -> str: + if not success: + f.discard() + + self.fs._intrans = False + self.fs._transaction = None + + if success and self.automerge: + if any(self.base_branch.diff(self._ephemeral_branch)): + self._ephemeral_branch.merge_into(self.base_branch) + if self.delete == "always" or (success and self.delete == "onsuccess"): + self._ephemeral_branch.delete() + + @property + def branch(self): + return self._ephemeral_branch + + def commit(self, message: str, metadata: dict[str, str] | None = None) -> Reference: """ - Create a branch ``name`` in a repository, branching off ``source_branch``. + Create a commit on this transaction's ephemeral branch with a commit message + and attached metadata. Parameters ---------- - repository: str | Repository - Repository name. - name: str - Name of the branch to be created. - source: str | Branch - Name of the branch (or branch object) that the new branch is created from. - exist_ok: bool - Ignore creation errors if the branch already exists. + message: str + The commit message to attach to the newly created commit. + metadata: dict[str, str] | None + Optional metadata to enrich the created commit with (author, e-mail, ...). Returns ------- - str - The requested branch name. + Reference + The created commit. """ - def create_branch_op( - client: Client, - repo_: str | Repository, - branch_: str, - source_: str | Branch, - exist_ok_: bool, - ) -> Branch: - repo_id = repo_.id if isinstance(repo_, Repository) else repo_ - return lakefs.Branch(repo_id, branch_, client=client).create( - source_, exist_ok=exist_ok_ - ) - - op = partial( - create_branch_op, - repo_=repository, - branch_=name, - source_=source, - exist_ok_=exist_ok, - ) - self.files.append((op, name)) - return name + diff = list(self.branch.uncommitted()) + + if not diff: + logger.warning(f"No changes to commit on branch {self.branch.id!r}.") + return self.branch.head - def merge( - self, repository: str | Repository, source_ref: str | Branch, into: str | Branch - ) -> None: + return self.branch.commit(message, metadata=metadata) + + def merge(self, source_ref: str | Branch, into: str | Branch) -> Commit: """ Merge a branch into another branch in a repository. + In case the branch contains no changes relevant to the target branch, + no merge happens, and the tip of the target branch is returned instead. + Parameters ---------- - repository: str | Repository - Name of the repository. source_ref: str | Branch - Source reference containing the changes to merge. Can be a branch name or partial commit SHA. + Source reference containing the changes to merge. + Can be a branch name or partial commit SHA. into: str | Branch Target branch into which the changes will be merged. - """ - def merge_op( - client: Client, repo_: str | Branch, ref_: str | Branch, into_: str | Branch - ) -> None: - repo_id = repo_.id if isinstance(repo_, Repository) else repo_ - ref_id = ref_.id if isinstance(ref_, Branch) else ref_ - lakefs.Branch(repo_id, ref_id, client=client).merge_into(into_) + Returns + ------- + Commit + Either the created merge commit, or the head commit of the target branch. + """ + source = _ensurebranch(source_ref, self.repository, self.fs.client) + dest = _ensurebranch(into, self.repository, self.fs.client) - op = partial(merge_op, repo_=repository, ref_=source_ref, into_=into) - self.files.append((op, None)) - return None + if any(dest.diff(source)): + source.merge_into(dest) + return dest.head.get_commit() - def revert( - self, repository: str | Repository, branch: str | Branch, parent_number: int = 1 - ) -> None: + def revert(self, branch: str | Branch, ref: ReferenceType, parent_number: int = 1) -> Commit: """ Revert a previous commit on a branch. Parameters ---------- - repository: str | Repository - Name of the repository. branch: str | Branch Branch on which the commit should be reverted. + ref: ReferenceType + The reference to revert. parent_number: int - If there are multiple parents to a commit, specify to which parent the commit should be reverted. - ``parent_number = 1`` (the default) refers to the first parent commit of the current ``branch`` tip. + If there are multiple parents to a commit, specify to which parent + the commit should be reverted. ``parent_number = 1`` (the default) + refers to the first parent commit of the current ``branch`` tip. + + Returns + ------- + Commit + The created revert commit. """ - def revert_op( - client: Client, repo_: str | Repository, branch_: str | Branch, parent_: int - ) -> None: - repo_id = repo_.id if isinstance(repo_, Repository) else repo_ - branch_id = branch_.id if isinstance(branch_, Branch) else branch_ - lakefs.Branch(repo_id, branch_id, client=client).revert(branch_id, parent_) + b = _ensurebranch(branch, self.repository, self.fs.client) - op = partial(revert_op, repo_=repository, branch_=branch, parent_=parent_number) - self.files.append((op, None)) - return None + ref_id = ref if isinstance(ref, str) else ref.id + b.revert(ref_id, parent_number=parent_number) + return b.head.get_commit() - def rev_parse( - self, repository: str | Repository, ref: ReferenceType, parent: int = 0 - ) -> Placeholder[Commit]: + def rev_parse(self, ref: ReferenceType) -> Commit: """ - Parse a given reference or any of its parents in a repository. + Parse a given lakeFS reference expression and obtain its corresponding commit. Parameters ---------- - repository: str | Repository - Name of the repository. ref: ReferenceType Reference object to resolve, can be a branch, commit SHA, or tag. - parent: int - Optionally parse a parent of ``ref`` instead of ``ref`` itself as indicated by the number. - Must be non-negative. ``parent = 0`` (the default) refers to ``ref`` itself. Returns ------- - Placeholder[Commit] - A placeholder for the commit created by the dispatched ``rev_parse`` operation. + Commit + The commit referenced by the expression ``ref``. """ - def rev_parse_op( - client: Client, repo_: str | Repository, ref_: ReferenceType, parent_: int - ) -> Commit: - repo_id = repo_.id if isinstance(repo_, Repository) else repo_ - ref_id = ref_.id if isinstance(ref_, Reference) else ref_ - - commits = list(lakefs.Reference(repo_id, ref_id, client=client).log(parent_ + 1)) - if len(commits) <= parent: - raise ValueError( - f"unable to fetch revision {ref_id}~{parent_}: " - f"ref {ref_id!r} only has {len(commits)} parents" - ) - return commits[parent_] - - p: Placeholder[Commit] = Placeholder() - op = partial(rev_parse_op, repo_=repository, ref_=ref, parent_=parent) - self.files.append((op, p)) - return p - - def tag(self, repository: str | Repository, ref: ReferenceType, tag: str) -> str: + ref_id = ref.id if isinstance(ref, Reference) else ref + reference = lakefs.Reference(self.repository, ref_id, client=self.fs.client) + return reference.get_commit() + + def tag(self, ref: ReferenceType, name: str) -> Tag: """ Create a tag referencing a commit in a repository. Parameters ---------- - repository: str | Repository - Name of the repository. ref: ReferenceType - Commit SHA or placeholder for a reference or commit object to which the new tag will point. - tag: str + Commit SHA or placeholder for a reference or commit object + to which the new tag will point. + name: str Name of the tag to be created. Returns ------- - str - The name of the requested tag. + Tag + The requested tag. """ - def tag_op(client: Client, repo_: str | Repository, ref_: ReferenceType, tag_: str) -> Tag: - repo_id = repo_.id if isinstance(repo_, Repository) else repo_ - ref_id = ref_.id if isinstance(ref_, Commit) else ref_ - return lakefs.Tag(repo_id, tag_, client=client).create(ref_id) - - self.files.append((partial(tag_op, repo_=repository, ref_=ref, tag_=tag), tag)) - return tag + return lakefs.Tag(self.repository, name, client=self.fs.client).create(ref) diff --git a/tests/test_put_file.py b/tests/test_put_file.py index 1cc0dd2b..575b1750 100644 --- a/tests/test_put_file.py +++ b/tests/test_put_file.py @@ -18,24 +18,24 @@ def test_no_change_postcommit( random_file = random_file_factory.make() lpath = str(random_file) - rpath = f"{repository.id}/{temp_branch.id}/{random_file.name}" + message = f"Add file {random_file.name}" - with fs.transaction as tx: - fs.put(lpath, rpath, precheck=False, autocommit=False) - tx.commit(repository, temp_branch, message=f"Add file {random_file.name}") + with fs.transaction(repository, temp_branch) as tx: + fs.put(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}") + tx.commit(message=message) - commits = list(temp_branch.log()) - latest_commit = commits[0] # commit log is ordered branch-tip-first - assert latest_commit.message == f"Add file {random_file.name}" + commits = list(temp_branch.log(max_amount=2)) + current_head = temp_branch.head.get_commit() + assert commits[0].message.startswith("Merge") + assert commits[1].message == message # put the same file again, this time the diff is empty - with fs.transaction as tx: - fs.put(lpath, rpath, precheck=False, autocommit=False) - tx.commit(repository, temp_branch, message=f"Add file {random_file.name}") + with fs.transaction(repository, temp_branch) as tx: + fs.put(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}", precheck=False) + tx.commit(message=f"Add file {random_file.name}") # check that no other commit has happened. - commits = list(temp_branch.log()) - assert commits[0] == latest_commit + assert temp_branch.head.get_commit() == current_head def test_implicit_branch_creation( diff --git a/tests/test_rm.py b/tests/test_rm.py index 9349be37..84755425 100644 --- a/tests/test_rm.py +++ b/tests/test_rm.py @@ -15,22 +15,22 @@ def test_rm( assert not fs.exists(path) -def test_rm_with_postcommit( +def test_rm_with_transaction( fs: LakeFSFileSystem, repository: Repository, temp_branch: Branch, ) -> None: path = f"{repository.id}/{temp_branch.id}/README.md" - msg = "Remove file README.md" + message = "Remove file README.md" - with fs.transaction as tx: - fs.rm(path) - tx.commit(repository, temp_branch, message=msg) - assert not fs.exists(path) + with fs.transaction(repository, temp_branch, automerge=True) as tx: + fs.rm(f"{repository.id}/{tx.branch.id}/README.md") + tx.commit(message=message) - commits = list(temp_branch.log()) - latest_commit = commits[0] - assert latest_commit.message == msg + commits = list(temp_branch.log(max_amount=2)) + assert not fs.exists(path) + assert commits[-1].message == message + assert commits[0].message.startswith("Merge") def test_rm_recursive( diff --git a/tests/test_transactions.py b/tests/test_transactions.py index a58528af..3952d754 100644 --- a/tests/test_transactions.py +++ b/tests/test_transactions.py @@ -1,14 +1,11 @@ -from pathlib import Path from typing import Any import pytest from lakefs.branch import Branch -from lakefs.reference import Commit, Reference from lakefs.repository import Repository from lakefs_spec import LakeFSFileSystem -from lakefs_spec.transaction import Placeholder -from tests.util import RandomFileFactory, put_random_file_on_branch, with_counter +from tests.util import RandomFileFactory def test_transaction_commit( @@ -24,38 +21,32 @@ def test_transaction_commit( message = f"Add file {random_file.name}" - with fs.transaction as tx: - fs.put_file(lpath, rpath) + with fs.transaction(repository, temp_branch) as tx: + fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}") assert len(tx.files) == 1 # sha is a placeholder for the actual SHA created on transaction completion. - sha = tx.commit(repository, temp_branch, message=message) - # stack contains the file to upload, and the commit op. - assert len(tx.files) == 2 - assert not sha.available + sha = tx.commit(message=message) - assert sha.available - - commits = list(temp_branch.log()) - latest_commit = commits[0] - assert latest_commit.message == message - assert latest_commit.id == sha.id + # HEAD should be the merge commit. + head_tilde = list(temp_branch.log(max_amount=2))[-1] + assert head_tilde.message == message + assert head_tilde.id == sha.id def test_transaction_tag(fs: LakeFSFileSystem, repository: Repository) -> None: try: # tag gets created on exit of the context. - with fs.transaction as tx: - sha = tx.rev_parse(repository, "main") - tag = tx.tag(repository, ref=sha, tag="v2") - - assert sha.available + # in this test, initialize with the repo name. + with fs.transaction(repository.id) as tx: + sha = tx.rev_parse("main") + tag = tx.tag(sha, "v2") tags = list(repository.tags()) assert len(tags) > 0 - assert tags[0].id == tag + assert tags[0].id == tag.id assert tags[0].get_commit().id == sha.id finally: - repository.tag(tag).delete() + tag.delete() def test_transaction_merge( @@ -68,24 +59,23 @@ def test_transaction_merge( random_file = random_file_factory.make() with temporary_branch_context("transaction-merge-test") as new_branch: - resource = f"{repository.id}/{new_branch.id}/{random_file.name}" message = "Commit new file" - with fs.transaction as tx: - # stage a file on new_branch... - fs.put_file(str(random_file), resource) + with fs.transaction(repository, new_branch) as tx: + tbname = tx.branch.id + lpath = str(random_file) + # stage a file on the transaction branch... + fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}") # ... commit it with the above message - tx.commit(repository, new_branch, message) + tx.commit(message=message) # ... and merge it into temp_branch. - tx.merge(repository, new_branch, into=temp_branch) + tx.merge(tx.branch, into=temp_branch) - # at last, verify temp_branch@HEAD is the merge commit. - commits = list(temp_branch.log()) - assert len(commits) > 2 - latest_commit = commits[0] - assert latest_commit.message == f"Merge {new_branch.id!r} into {temp_branch.id!r}" - second_latest_commit = commits[1] - assert second_latest_commit.message == message + head, head_tilde = list(temp_branch.log(max_amount=2)) + # HEAD should be the merge commit of the transaction branch. + assert head.message.startswith(f"Merge {tbname!r}") + # HEAD~ should be the commit message. + assert head_tilde.message == message def test_transaction_revert( @@ -97,37 +87,16 @@ def test_transaction_revert( random_file = random_file_factory.make() lpath = str(random_file) - rpath = f"{repository.id}/{temp_branch.id}/{random_file.name}" - message = f"Add file {random_file.name}" - with fs.transaction as tx: - fs.put_file(lpath, rpath, autocommit=False) - tx.commit(repository, temp_branch, message=message) - tx.revert(repository, temp_branch) - - commits = list(temp_branch.log()) - assert len(commits) > 1 - latest_commit = commits[0] - assert latest_commit.message == f"Revert {temp_branch.id}" - - -def test_transaction_branch(fs: LakeFSFileSystem, repository: Repository) -> None: - branch = "new-hello" - - try: - with fs.transaction as tx: - tx.create_branch(repository, branch, source="main") - - assert branch in [b.id for b in list(repository.branches())] - finally: - repository.branch(branch).delete() - + with fs.transaction(repository, temp_branch, automerge=True) as tx: + fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}") + tx.commit(message=message) + revert_commit = tx.revert(temp_branch, temp_branch.head) -def test_transaction_entry(fs: LakeFSFileSystem) -> None: - fs.start_transaction() - assert fs._intrans - assert fs._transaction is not None + # first commit should be the merge commit + assert temp_branch.head.get_commit().message.startswith("Merge") + assert revert_commit.message.startswith("Revert") def test_transaction_failure( @@ -140,42 +109,40 @@ def test_transaction_failure( lpath = str(random_file) rpath = f"{repository.id}/{temp_branch.id}/{random_file.name}" - message = f"Add file {random_file.name}" - fs.client, counter = with_counter(fs.client) try: - with fs.transaction as tx: - fs.put_file(lpath, rpath) - tx.commit(repository, temp_branch, message=message) + with fs.transaction(repository, temp_branch) as tx: + fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}") + tx.commit(message=message) raise RuntimeError("something went wrong") except RuntimeError: pass # assert that no commit happens because of the exception. - assert counter.count("commits_api.commit") == 0 + assert not fs.exists(rpath) -def test_placeholder_representations( - random_file_factory: RandomFileFactory, +def test_transaction_no_automerge( fs: LakeFSFileSystem, repository: Repository, temp_branch: Branch, ) -> None: - with fs.transaction as tx: - rpath = put_random_file_on_branch(random_file_factory, fs, repository, temp_branch) - message = f"Add file {Path(rpath).name}" - sha = tx.commit(repository, temp_branch, message=message) + currhead = temp_branch.head.get_commit() - assert isinstance(sha, Reference) - commits = list(temp_branch.log()) - latest_commit = commits[0] - assert sha.id == latest_commit.id - assert repr(sha.id) == repr(latest_commit.id) + with fs.transaction(repository, temp_branch, automerge=False, delete="never") as tx: + transaction_branch = tx.branch + try: + # assert no merge commit is created on temp_branch. + assert currhead == next(temp_branch.log()) + # assert the transaction branch still exists. + assert transaction_branch.id in [b.id for b in repository.branches()] + finally: + transaction_branch.delete() -def test_unfilled_placeholder_error() -> None: - p: Placeholder[Commit] = Placeholder() - with pytest.raises(RuntimeError): - _ = p.value +def test_transaction_bad_repo(fs: LakeFSFileSystem) -> None: + with pytest.raises(ValueError, match="repository .* does not exist"): + with fs.transaction(repository="REEEE"): + pass