DM-46776: Add zip creation and ingest #1105

Merged (39 commits, Oct 30, 2024)

Commits
6a4a62b
Add Butler.retrieve_artifacts_zip()
timj Oct 11, 2024
96d86d4
Trust that the datastore knows an artifact exists without checking
timj Oct 14, 2024
2489f41
Write index file with retrieveArtifacts and use it for Zip
timj Oct 15, 2024
cf25c13
Add --zip command-line option to butler retrieve-artifacts
timj Oct 16, 2024
81c827d
Change the internal representation of ZipIndex to remove duplication
timj Oct 16, 2024
fafce2b
Add method to create ZipIndex from zip file
timj Oct 16, 2024
0bafc7a
First attempt at butler ingest_zip
timj Oct 17, 2024
1d6ce51
Strip directory paths when building zip file.
timj Oct 18, 2024
642b90f
Add butler ingest-zip command
timj Oct 18, 2024
d19df2e
Do not remove Zip file until last dataset in Zip is removed
timj Oct 21, 2024
b078e7b
Fragments can be quoted so have to look for unquoted fragment in zip …
timj Oct 22, 2024
b165dab
Add minimal constraints checking for zip ingest
timj Oct 22, 2024
7fd3bcd
Add more diagnostics if JSON/YAML does not parse correctly
timj Oct 22, 2024
244c9b6
Add tests for ingest-zip
timj Oct 22, 2024
fd40aeb
Add support for missing datastore records in trust mode for retrieveA…
timj Oct 23, 2024
a53db14
Add UUID prefix to each file placed in Zip
timj Oct 23, 2024
05bc923
Attempt to prevent repeated copying of the same file for DECam/Zips
timj Oct 23, 2024
9ef21fe
Minor doc string fixes
timj Oct 25, 2024
24c4701
Minor tweaks from review
timj Oct 25, 2024
d06f36a
Rename Zip index file name
timj Oct 25, 2024
372d54c
Use partition rather than split
timj Oct 25, 2024
16afb61
Fix file datastore retrieve artifacts index for DECam data
timj Oct 25, 2024
8141248
Provide defaults for checksum and component
timj Oct 26, 2024
0e8af26
Reorganize index to only have one dict for path in zip
timj Oct 26, 2024
4576a80
Switch to sets just in case there are duplicates
timj Oct 26, 2024
c8cb406
Remove a warning in test code and report warnings from caller
timj Oct 26, 2024
a57e725
Support zip unpacking in retrieve artifacts
timj Oct 28, 2024
97c2d29
Include version information in ZipIndex model
timj Oct 29, 2024
7db9443
Only serialize required dimensions for dataset type and data coordinate
timj Oct 29, 2024
f2cbd09
Ensure that zip unpacking also respects overwrite flag
timj Oct 29, 2024
516d714
Use a protocol for retrieve artifacts callback
timj Oct 29, 2024
0c75a82
Simplify Datastore.retrieveArtifacts return value
timj Oct 29, 2024
13b0f15
Support clobber/no-clobber when writing Zip archive
timj Oct 29, 2024
5082563
Raise rather than use assert
timj Oct 29, 2024
82cf784
Add tests for reading V1 ZipIndex JSON model
timj Oct 29, 2024
b9ee95b
Sort refs when retrieving artifacts
timj Oct 29, 2024
bb9bab4
Enable compression of the index in the zip file
timj Oct 29, 2024
8b8c250
Make SerializedDatasetRefContainerV1 public
timj Oct 29, 2024
c510c7f
Add news fragment
timj Oct 30, 2024
7 changes: 7 additions & 0 deletions doc/changes/DM-46776.feature.rst
@@ -0,0 +1,7 @@
* Added ``Butler.retrieve_artifacts_zip`` and ``QuantumBackedButler.retrieve_artifacts_zip`` methods to retrieve the dataset artifacts and store them into a zip file.
* Added ``Butler.ingest_zip`` to ingest the contents of a Zip file.
* Added ``SerializedDatasetRefContainerV1`` class to allow a collection of ``DatasetRef`` to be serialized efficiently.
  JSON serializations made using this class will continue to be supported by future versions.
* Added ``--zip`` parameter to ``butler retrieve-artifacts``.
* Changed ``Butler.retrieveArtifacts`` to always write a JSON index file describing where the artifacts came from.
* Added a ``butler ingest-zip`` command-line tool for ingesting zip files created by ``butler retrieve-artifacts``.
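A minimal sketch of how these additions fit together, assuming refs is an iterable of resolved DatasetRef already queried from the source repository and that the repository paths are placeholders:

from lsst.daf.butler import Butler

source = Butler("/repo/source")
# Package the artifacts, plus a JSON index describing them, into a single Zip file.
zip_file = source.retrieve_artifacts_zip(refs, destination="/tmp/artifacts", overwrite=True)

# The Zip can later be ingested into another repository; run collections are
# created as needed (see the ingest_zip docstring below).
dest = Butler("/repo/dest", writeable=True)
dest.ingest_zip(zip_file, transfer="auto")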
52 changes: 52 additions & 0 deletions python/lsst/daf/butler/_butler.py
@@ -987,6 +987,39 @@ def find_dataset(
"""
raise NotImplementedError()

@abstractmethod
def retrieve_artifacts_zip(
self,
refs: Iterable[DatasetRef],
destination: ResourcePathExpression,
overwrite: bool = True,
) -> ResourcePath:
"""Retrieve artifacts from a Butler and place in ZIP file.

Parameters
----------
refs : `~collections.abc.Iterable` [ `DatasetRef` ]
The datasets to be included in the zip file.
destination : `lsst.resources.ResourcePathExpression`
Directory to write the new ZIP file. This directory will
also be used as a staging area for the datasets being downloaded
from the datastore.
overwrite : `bool`, optional
If `False` the output Zip will not be written if a file of the
same name is already present in ``destination``.

Returns
-------
zip_file : `lsst.resources.ResourcePath`
The path to the new ZIP file.

Raises
------
ValueError
Raised if there are no refs to retrieve.
"""
raise NotImplementedError()

@abstractmethod
def retrieveArtifacts(
self,
@@ -1202,6 +1235,25 @@ def ingest(
"""
raise NotImplementedError()

@abstractmethod
def ingest_zip(self, zip_file: ResourcePathExpression, transfer: str = "auto") -> None:
"""Ingest a Zip file into this butler.

The Zip file must have been created by `retrieve_artifacts_zip`.

Parameters
----------
zip_file : `lsst.resources.ResourcePathExpression`
Path to the Zip file.
transfer : `str`, optional
Method to use to transfer the Zip into the datastore.

Notes
-----
Run collections are created as needed.
"""
raise NotImplementedError()

@abstractmethod
def export(
self,
176 changes: 174 additions & 2 deletions python/lsst/daf/butler/_dataset_ref.py
@@ -34,13 +34,26 @@
"DatasetIdGenEnum",
"DatasetRef",
"SerializedDatasetRef",
"SerializedDatasetRefContainerV1",
"SerializedDatasetRefContainers",
]

import enum
import logging
import sys
import uuid
from collections.abc import Iterable, Mapping
from typing import TYPE_CHECKING, Any, ClassVar, Literal, Protocol, TypeAlias, runtime_checkable
from typing import (
TYPE_CHECKING,
Annotated,
Any,
ClassVar,
Literal,
Protocol,
Self,
TypeAlias,
runtime_checkable,
)

import pydantic
from lsst.utils.classes import immutable
@@ -50,7 +63,13 @@
from ._dataset_type import DatasetType, SerializedDatasetType
from ._named import NamedKeyDict
from .datastore.stored_file_info import StoredDatastoreItemInfo
from .dimensions import DataCoordinate, DimensionGroup, DimensionUniverse, SerializedDataCoordinate
from .dimensions import (
DataCoordinate,
DimensionGroup,
DimensionUniverse,
SerializedDataCoordinate,
SerializedDataId,
)
from .json import from_json_pydantic, to_json_pydantic
from .persistence_context import PersistenceContextVars

@@ -63,6 +82,9 @@
DatasetDatastoreRecords: TypeAlias = Mapping[str, list[StoredDatastoreItemInfo]]


_LOG = logging.getLogger(__name__)


class AmbiguousDatasetError(Exception):
"""Raised when a `DatasetRef` is not resolved but should be.

@@ -864,3 +886,153 @@ class associated with the dataset type of the other ref can be

Cannot be changed after a `DatasetRef` is constructed.
"""


class MinimalistSerializableDatasetRef(pydantic.BaseModel):
"""Minimal information needed to define a DatasetRef.

The ID is not included and is presumed to be the key to a mapping
to this information.
"""

model_config = pydantic.ConfigDict(frozen=True)

dataset_type_name: str
"""Name of the dataset type."""

run: str
"""Name of the RUN collection."""

data_id: SerializedDataId
"""Data coordinate of this dataset."""


class SerializedDatasetRefContainer(pydantic.BaseModel):
"""Serializable model for a collection of DatasetRef.

Dimension records are not included.
"""

model_config = pydantic.ConfigDict(extra="allow", frozen=True)
container_version: str


class SerializedDatasetRefContainerV1(SerializedDatasetRefContainer):
"""Serializable model for a collection of DatasetRef.

Dimension records are not included.
"""

container_version: Literal["V1"] = "V1"

universe_version: int
"""Dimension universe version."""

universe_namespace: str
"""Dimension universe namespace."""

dataset_types: dict[str, SerializedDatasetType]
"""Dataset types indexed by their name."""

compact_refs: dict[uuid.UUID, MinimalistSerializableDatasetRef]
"""Minimal dataset ref information indexed by UUID."""

def __len__(self) -> int:
"""Return the number of datasets in the container."""
return len(self.compact_refs)

@classmethod
def from_refs(cls, refs: Iterable[DatasetRef]) -> Self:
"""Construct a serializable form from a list of `DatasetRef`.

Parameters
----------
refs : `~collections.abc.Iterable` [ `DatasetRef` ]
The datasets to include in the container.
"""
# The serialized DatasetRef contains a lot of duplicated information.
# We also want to drop dimension records and assume that the records
# are already in the registry.
universe: DimensionUniverse | None = None
dataset_types: dict[str, SerializedDatasetType] = {}
compact_refs: dict[uuid.UUID, MinimalistSerializableDatasetRef] = {}
for ref in refs:
simple_ref = ref.to_simple()
dataset_type = simple_ref.datasetType
assert dataset_type is not None # For mypy
if universe is None:
universe = ref.datasetType.dimensions.universe
if (name := dataset_type.name) not in dataset_types:
dataset_types[name] = dataset_type
data_id = simple_ref.dataId
assert data_id is not None # For mypy
compact_refs[simple_ref.id] = MinimalistSerializableDatasetRef(
dataset_type_name=name, run=simple_ref.run, data_id=data_id.dataId
)
if universe:
universe_version = universe.version
universe_namespace = universe.namespace
else:
# No refs so no universe.
universe_version = 0
universe_namespace = "unknown"
return cls(
universe_version=universe_version,
universe_namespace=universe_namespace,
dataset_types=dataset_types,
compact_refs=compact_refs,
)

def to_refs(self, universe: DimensionUniverse) -> list[DatasetRef]:
"""Construct the original `DatasetRef`.

Parameters
----------
universe : `DimensionUniverse`
The universe to use when constructing the `DatasetRef`.

Returns
-------
refs : `list` [ `DatasetRef` ]
The `DatasetRef` that were serialized.
"""
if not self.compact_refs:
return []

if universe.namespace != self.universe_namespace:
raise RuntimeError(
f"Can not convert to refs in universe {universe.namespace} that were created from "
f"universe {self.universe_namespace}"
)

if universe.version != self.universe_version:
_LOG.warning(
"Universe mismatch when attempting to reconstruct DatasetRef from serialized form. "
"Serialized with version %d but asked to use version %d.",
self.universe_version,
universe.version,
)

# Reconstruct the DatasetType objects.
dataset_types = {
name: DatasetType.from_simple(dtype, universe=universe)
for name, dtype in self.dataset_types.items()
}
refs: list[DatasetRef] = []
for id_, minimal in self.compact_refs.items():
simple_data_id = SerializedDataCoordinate(dataId=minimal.data_id)
data_id = DataCoordinate.from_simple(simple=simple_data_id, universe=universe)
ref = DatasetRef(
id=id_,
run=minimal.run,
datasetType=dataset_types[minimal.dataset_type_name],
dataId=data_id,
)
refs.append(ref)
return refs


SerializedDatasetRefContainers: TypeAlias = Annotated[
SerializedDatasetRefContainerV1,
pydantic.Field(discriminator="container_version"),
]
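For orientation, a sketch of how the new container might be used to serialize a batch of refs and read them back. It assumes refs already exists, that the class is re-exported at the package top level (per the "Make SerializedDatasetRefContainerV1 public" commit), and that butler.dimensions supplies the DimensionUniverse; model_dump_json and model_validate_json are standard Pydantic v2 methods:

from lsst.daf.butler import SerializedDatasetRefContainerV1

# Serialize: only dataset types, run names, UUIDs and data IDs are kept;
# dimension records are deliberately dropped.
container = SerializedDatasetRefContainerV1.from_refs(refs)
json_blob = container.model_dump_json()

# Deserialize: the caller supplies the dimension universe. A universe version
# mismatch only logs a warning, but a namespace mismatch raises RuntimeError.
restored = SerializedDatasetRefContainerV1.model_validate_json(json_blob)
round_tripped = restored.to_refs(universe=butler.dimensions)
assert len(round_tripped) == len(restored)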
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/_dataset_type.py
@@ -676,7 +676,7 @@ def to_simple(self, minimal: bool = False) -> SerializedDatasetType:
"name": self.name,
"storageClass": self._storageClassName,
"isCalibration": self._isCalibration,
"dimensions": list(self._dimensions.names),
"dimensions": list(self._dimensions.required),
}

if self._parentStorageClassName is not None:
4 changes: 2 additions & 2 deletions python/lsst/daf/butler/_formatter.py
@@ -439,8 +439,8 @@ def read(
# direct read from URI option and the contents of the Zip file must
# be extracted.
uri = self.file_descriptor.location.uri
if uri.fragment and uri.fragment.startswith("zip-path="):
_, path_in_zip = uri.fragment.split("=")
if uri.fragment and uri.unquoted_fragment.startswith("zip-path="):
_, _, path_in_zip = uri.unquoted_fragment.partition("=")

# Open the Zip file using ResourcePath.
with uri.open("rb") as fd:
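The switch from split to partition here (commit 372d54c), together with using unquoted_fragment (commit b078e7b), keeps the lookup robust when the fragment is percent-encoded or when the member path itself contains an equals sign. A small illustration with a hypothetical fragment value:

fragment = "zip-path=u/timj/run/calexp_r=2.json"  # hypothetical member name containing '='
_, _, path_in_zip = fragment.partition("=")
# partition() splits only at the first '=', so path_in_zip is "u/timj/run/calexp_r=2.json".
# The previous fragment.split("=") would produce three pieces here, and the
# two-element unpack would raise ValueError.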
35 changes: 34 additions & 1 deletion python/lsst/daf/butler/_quantum_backed.py
@@ -39,7 +39,7 @@
from typing import TYPE_CHECKING, Any

import pydantic
from lsst.resources import ResourcePathExpression
from lsst.resources import ResourcePath, ResourcePathExpression

from ._butler_config import ButlerConfig
from ._config import Config
@@ -51,6 +51,7 @@
from ._storage_class import StorageClass, StorageClassFactory
from .datastore import Datastore
from .datastore.record_data import DatastoreRecordData, SerializedDatastoreRecordData
from .datastores.file_datastore.retrieve_artifacts import retrieve_and_zip
from .dimensions import DimensionUniverse
from .registry.bridge.monolithic import MonolithicDatastoreRegistryBridgeManager
from .registry.databases.sqlite import SqliteDatabase
@@ -498,6 +499,38 @@ def pruneDatasets(
# Point of no return for removing artifacts
self._datastore.emptyTrash()

def retrieve_artifacts_zip(
self,
refs: Iterable[DatasetRef],
destination: ResourcePathExpression,
overwrite: bool = True,
) -> ResourcePath:
"""Retrieve artifacts from the graph and place in ZIP file.

Parameters
----------
refs : `~collections.abc.Iterable` [ `DatasetRef` ]
The datasets to be included in the zip file.
destination : `lsst.resources.ResourcePathExpression`
Directory to write the new ZIP file. This directory will
also be used as a staging area for the datasets being downloaded
from the datastore.
overwrite : `bool`, optional
If `False` the output Zip will not be written if a file of the
same name is already present in ``destination``.

Returns
-------
zip_file : `lsst.resources.ResourcePath`
The path to the new ZIP file.

Raises
------
ValueError
Raised if there are no refs to retrieve.
"""
return retrieve_and_zip(refs, destination, self._datastore.retrieveArtifacts, overwrite)

def extract_provenance_data(self) -> QuantumProvenanceData:
"""Extract provenance information and datastore records from this
butler.
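The QuantumBackedButler variant above delegates the packaging to the shared retrieve_and_zip helper, handing it the datastore's retrieveArtifacts method as the callback, so pipeline execution harnesses can bundle their outputs without a registry connection. A brief usage sketch, assuming qbb and job_refs already exist:

# qbb: an existing QuantumBackedButler; job_refs: DatasetRefs produced by its quanta.
zip_file = qbb.retrieve_artifacts_zip(job_refs, destination="/tmp/job-artifacts")
print(zip_file)  # an lsst.resources.ResourcePath pointing at the new Zip archive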
2 changes: 2 additions & 0 deletions python/lsst/daf/butler/cli/cmd/__init__.py
@@ -35,6 +35,7 @@
"config_validate",
"export_calibs",
"ingest_files",
"ingest_zip",
"prune_datasets",
"query_collections",
"query_data_ids",
@@ -61,6 +62,7 @@
create,
export_calibs,
ingest_files,
ingest_zip,
prune_datasets,
query_collections,
query_data_ids,