Allow inserts of records to use ensure or replace rather than insert
timj committed Aug 16, 2023
1 parent 671c2ef commit e2fe6a0
Showing 3 changed files with 53 additions and 10 deletions.
33 changes: 28 additions & 5 deletions python/lsst/daf/butler/datastores/fileDatastore.py
@@ -365,10 +365,20 @@ def _delete_artifact(self, location: Location) -> None:
             raise
         log.debug("Successfully deleted file: %s", location.uri)

-    def addStoredItemInfo(self, refs: Iterable[DatasetRef], infos: Iterable[StoredFileInfo]) -> None:
+    def addStoredItemInfo(
+        self, refs: Iterable[DatasetRef], infos: Iterable[StoredFileInfo], insert_mode: str = "insert"
+    ) -> None:
         # Docstring inherited from GenericBaseDatastore
         records = [info.rebase(ref).to_record() for ref, info in zip(refs, infos, strict=True)]
-        self._table.insert(*records, transaction=self._transaction)
+        match insert_mode:
+            case "insert":
+                self._table.insert(*records, transaction=self._transaction)
+            case "ensure":
+                self._table.ensure(*records, transaction=self._transaction)
case "replace":
self._table.replace(*records, transaction=self._transaction)
case _:
raise ValueError(f"Unknown insert mode of '{insert_mode}'")


     def getStoredItemsInfo(self, ref: DatasetIdRef) -> list[StoredFileInfo]:
         # Docstring inherited from GenericBaseDatastore
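As a rough sketch of what the three insert modes mean for the records table, consider a toy stand-in (the class and record shape below are invented for illustration and are not the butler's table API):

    class ToyTable:
        """Toy stand-in for the datastore records table (illustration only)."""

        def __init__(self) -> None:
            self.rows: dict[str, dict] = {}

        def insert(self, *records: dict) -> None:
            # "insert": a pre-existing row is an error.
            for rec in records:
                if rec["id"] in self.rows:
                    raise ValueError(f"row {rec['id']} already exists")
                self.rows[rec["id"]] = rec

        def ensure(self, *records: dict) -> None:
            # "ensure": keep the existing row and silently skip the new one.
            for rec in records:
                self.rows.setdefault(rec["id"], rec)

        def replace(self, *records: dict) -> None:
            # "replace": overwrite any existing row with the new values.
            for rec in records:
                self.rows[rec["id"]] = rec

    table = ToyTable()
    table.insert({"id": "u1", "path": "a.fits"})
    table.ensure({"id": "u1", "path": "b.fits"})   # no-op: existing row wins
    table.replace({"id": "u1", "path": "c.fits"})  # existing row overwritten
    assert table.rows["u1"]["path"] == "c.fits"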
@@ -1024,6 +1034,7 @@ def _finishIngest(
         record_validation_info: bool = True,
     ) -> None:
         # Docstring inherited from Datastore._finishIngest.
+        uses_uuid_v5 = True
         refsAndInfos = []
         progress = Progress("lsst.daf.butler.datastores.FileDatastore.ingest", level=logging.DEBUG)
         for dataset in progress.wrap(prepData.datasets, desc="Ingesting dataset files"):
@@ -1036,7 +1047,16 @@
                 record_validation_info=record_validation_info,
             )
             refsAndInfos.extend([(ref, info) for ref in dataset.refs])
-        self._register_datasets(refsAndInfos)
+            for ref in dataset.refs:
+                if ref.id.version != 5:
+                    uses_uuid_v5 = False
+
+        insert_mode = "insert"
+        if uses_uuid_v5 and transfer == "direct":
+            # Datasets are immutable, external and use well-defined UUID.
+            # Re-ingest is allowed (use most recent information).
+            insert_mode = "replace"
+        self._register_datasets(refsAndInfos, insert_mode=insert_mode)

     def _calculate_ingested_datastore_name(
         self,
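The version check above matters because deterministic version-5 UUIDs make re-ingest reproducible: the same file maps to the same dataset ID, so replacing the record is safe. A small illustration with the standard uuid module (the namespace and name below are invented and are not the butler's actual ID scheme):

    import uuid

    def all_uuid5(ids: list[uuid.UUID]) -> bool:
        # Mirrors the loop above: any non-v5 ID disables "replace" mode.
        return all(i.version == 5 for i in ids)

    # A version-5 UUID is a hash of a namespace and a name, so the same
    # inputs always reproduce the same ID:
    a = uuid.uuid5(uuid.NAMESPACE_URL, "file:///data/exposure_001.fits")
    b = uuid.uuid5(uuid.NAMESPACE_URL, "file:///data/exposure_001.fits")
    assert a == b and a.version == 5
    assert not all_uuid5([uuid.uuid4()])  # random v4 IDs are not reproducible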
@@ -2305,7 +2325,7 @@ def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None:
             storedInfo = self._write_in_memory_to_artifact(inMemoryDataset, ref)
             artifacts.append((ref, storedInfo))

-        self._register_datasets(artifacts)
+        self._register_datasets(artifacts, insert_mode="insert")

     @transactional
     def trash(self, ref: DatasetRef | Iterable[DatasetRef], ignore_errors: bool = True) -> None:
@@ -2729,7 +2749,10 @@ def transfer_from(
                 "" if len(direct_transfers) == 1 else "s",
             )

-        self._register_datasets(artifacts)
+        # We are overwriting previous datasets that may have already
+        # existed. We therefore should ensure that we force the
+        # datastore records to agree.
+        self._register_datasets(artifacts, insert_mode="replace")

         if already_present:
             n_skipped = len(already_present)
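If the records table were a plain SQL table, "ensure" and "replace" would correspond to the two UPSERT conflict actions. A runnable sketch with sqlite3 (the schema below is invented for illustration; the real table layout may differ):

    import sqlite3

    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE records (id TEXT PRIMARY KEY, path TEXT)")
    con.execute("INSERT INTO records VALUES ('u1', 'old/path.fits')")

    # "ensure": leave the existing row alone.
    con.execute(
        "INSERT INTO records VALUES ('u1', 'ignored.fits') "
        "ON CONFLICT (id) DO NOTHING"
    )
    # "replace": force the stored record to agree with the new values.
    con.execute(
        "INSERT INTO records VALUES ('u1', 'new/path.fits') "
        "ON CONFLICT (id) DO UPDATE SET path = excluded.path"
    )
    print(con.execute("SELECT path FROM records WHERE id = 'u1'").fetchone())
    # ('new/path.fits',)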
26 changes: 22 additions & 4 deletions python/lsst/daf/butler/datastores/genericDatastore.py
@@ -54,7 +54,9 @@ def bridge(self) -> DatastoreRegistryBridge:
         raise NotImplementedError()

     @abstractmethod
-    def addStoredItemInfo(self, refs: Iterable[DatasetRef], infos: Iterable[Any]) -> None:
+    def addStoredItemInfo(
+        self, refs: Iterable[DatasetRef], infos: Iterable[Any], insert_mode: str = "insert"
+    ) -> None:
         """Record internal storage information associated with one or more
         datasets.
@@ -64,6 +66,11 @@ def addStoredItemInfo(self, refs: Iterable[DatasetRef], infos: Iterable[Any]) ->
             The datasets that have been stored.
         infos : sequence of `StoredDatastoreItemInfo`
             Metadata associated with the stored datasets.
+        insert_mode : `str`, optional
+            Mode to use to insert the new records into the table. The
+            options are "insert" (error if pre-existing), "replace" (replace
+            content with new values), and "ensure" (skip if the row already
+            exists).
         """
         raise NotImplementedError()
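A hypothetical caller might pick the mode based on whether a re-run can legitimately encounter rows it has written before (the helper and its arguments below are invented for illustration):

    def record_datasets(datastore, refs, infos, allow_reingest: bool) -> None:
        """Hypothetical helper: tolerate pre-existing rows only on re-ingest."""
        mode = "ensure" if allow_reingest else "insert"
        datastore.addStoredItemInfo(refs, infos, insert_mode=mode)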

@@ -98,7 +105,9 @@ def removeStoredItemInfo(self, ref: DatasetRef) -> None:
         """
         raise NotImplementedError()

-    def _register_datasets(self, refsAndInfos: Iterable[tuple[DatasetRef, StoredDatastoreItemInfo]]) -> None:
+    def _register_datasets(
+        self, refsAndInfos: Iterable[tuple[DatasetRef, StoredDatastoreItemInfo]], insert_mode: str = "insert"
+    ) -> None:
         """Update registry to indicate that one or more datasets have been
         stored.
@@ -108,6 +117,10 @@ def _register_datasets(self, refsAndInfos: Iterable[tuple[DatasetRef, StoredData
             `StoredDatastoreItemInfo`]
             Datasets to register and the internal datastore metadata associated
             with them.
+        insert_mode : `str`, optional
+            Indicate whether the new records should be new ("insert", default),
+            allowed to exist already ("ensure"), or replaced if already
+            present ("replace").
         """
         expandedRefs: list[DatasetRef] = []
         expandedItemInfos = []
@@ -120,8 +133,13 @@ def _register_datasets(self, refsAndInfos: Iterable[tuple[DatasetRef, StoredData
         # disassembled in datastore we have to deduplicate. Since they
         # will have different datasetTypes we can't use a set
         registryRefs = {r.id: r for r in expandedRefs}
-        self.bridge.insert(registryRefs.values())
-        self.addStoredItemInfo(expandedRefs, expandedItemInfos)
+        if insert_mode == "insert":
+            self.bridge.insert(registryRefs.values())
+        else:
+            # There are only two columns and all that matters is the
+            # dataset ID.
+            self.bridge.ensure(registryRefs.values())
+        self.addStoredItemInfo(expandedRefs, expandedItemInfos, insert_mode=insert_mode)

     def _post_process_get(
         self,
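The dict comprehension above is the de-duplication the comment describes: component refs of a disassembled composite share one dataset ID but differ in dataset type, so a dict keyed on ID collapses them where a set could not. A minimal demonstration with a toy ref type (DatasetRef itself carries far more state):

    from dataclasses import dataclass

    @dataclass
    class ToyRef:
        id: str
        datasetType: str

    # Components of a disassembled composite share one dataset ID but have
    # different dataset types:
    refs = [ToyRef("u1", "exposure"), ToyRef("u1", "exposure.wcs"), ToyRef("u2", "calexp")]
    unique = {r.id: r for r in refs}  # later refs win per ID
    assert sorted(unique) == ["u1", "u2"]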
4 changes: 3 additions & 1 deletion python/lsst/daf/butler/datastores/inMemoryDatastore.py
@@ -178,7 +178,9 @@ def bridge(self) -> DatastoreRegistryBridge:
         # Docstring inherited from GenericBaseDatastore.
         return self._bridge

-    def addStoredItemInfo(self, refs: Iterable[DatasetRef], infos: Iterable[StoredMemoryItemInfo]) -> None:
+    def addStoredItemInfo(
+        self, refs: Iterable[DatasetRef], infos: Iterable[StoredMemoryItemInfo], insert_mode: str = "insert"
+    ) -> None:
         # Docstring inherited from GenericBaseDatastore.
         for ref, info in zip(refs, infos, strict=True):
             self.records[ref.id] = info
