SOMA ingestion integration #589

Merged · 7 commits · Jul 10, 2024
6 changes: 4 additions & 2 deletions src/tiledb/cloud/soma/__init__.py
@@ -1,10 +1,12 @@
from .ingest import build_ingest_workflow_graph
from .ingest import ingest
from .ingest import ingest_h5ad
from .ingest import run_ingest_workflow
from .mapper import build_collection_mapper_workflow_graph
from .mapper import run_collection_mapper_workflow

__all__ = [
"build_ingest_workflow_graph",
"ingest",
"ingest_h5ad",
"run_ingest_workflow",
"build_collection_mapper_workflow_graph",
"run_collection_mapper_workflow",
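
For orientation, here is a minimal import sketch of the resulting public API. It is not part of the diff; without +/- markers it is not visible above which two lines are the deletions, but `ingest` and `ingest_h5ad` are the entry points exercised by this PR's tests:

# Hypothetical sketch, not part of the diff: importing the SOMA entry points
# that the rest of this PR defines and tests.
from tiledb.cloud.soma import ingest
from tiledb.cloud.soma import ingest_h5ad
from tiledb.cloud.soma import run_ingest_workflow
from tiledb.cloud.soma import run_collection_mapper_workflow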
240 changes: 106 additions & 134 deletions src/tiledb/cloud/soma/ingest.py
@@ -7,138 +7,14 @@
import tiledb
from tiledb.cloud import dag
from tiledb.cloud._common import functions
from tiledb.cloud.utilities import as_batch
from tiledb.cloud.utilities import get_logger_wrapper
from tiledb.cloud.utilities import run_dag

_DEFAULT_RESOURCES = {"cpu": "8", "memory": "8Gi"}
"""Default resource size; equivalent to a "large" UDF container."""


def run_ingest_workflow(
*,
output_uri: str,
input_uri: str,
measurement_name: str,
pattern: Optional[str] = None,
extra_tiledb_config: Optional[Dict[str, object]] = None,
platform_config: Optional[Dict[str, object]] = None,
ingest_mode: str = "write",
resources: Optional[Dict[str, object]] = None,
namespace: Optional[str] = None,
access_credentials_name: Optional[str] = None,
logging_level: int = logging.INFO,
dry_run: bool = False,
) -> Dict[str, str]:
"""Starts a workflow to ingest H5AD data into SOMA.

:param output_uri: The output URI to write to. This will probably look like
``tiledb://namespace/some://storage/uri``.
:param input_uri: The URI of the H5AD file(s) to read from. These are read
using TileDB VFS, so any path supported (and accessible) will work. If the
``input_uri`` passes ``vfs.is_file``, it's ingested. If the ``input_uri``
passes ``vfs.is_dir``, then all first-level entries are ingested. In the
directory case, an input file is skipped if ``pattern`` is provided
and doesn't match that file. Also in the directory case, each entry's
basename is appended to the ``output_uri`` to form the entry's output URI.
For example, if ``a.h5ad`` and ``b.h5ad`` are present within ``input_uri`` of
``s3://bucket/h5ads/`` and ``output_uri`` is
``tiledb://namespace/s3://bucket/somas``, then
``tiledb://namespace/s3://bucket/somas/a`` and
``tiledb://namespace/s3://bucket/somas/b`` are written.
:param measurement_name: The name of the Measurement within the Experiment
to store the data.
:param pattern: As described for ``input_uri``.
:param extra_tiledb_config: Extra configuration for TileDB.
:param platform_config: The SOMA ``platform_config`` value to pass in,
if any.
:param ingest_mode: One of the ingest modes supported by
``tiledbsoma.io.read_h5ad``.
:param resources: A specification for the amount of resources to provide
to the UDF executing the ingestion process, to override the default.
:param namespace: An alternate namespace to run the ingestion process under.
:param access_credentials_name: If provided, the name of the credentials
to pass to the executing UDF.
:param dry_run: If set to ``True``, performs the input-path
traversal without ingesting data.
:return: A dictionary of ``{"status": "started", "graph_id": ...}``,
with the UUID of the graph on the server side, which can be used to
manage execution and monitor progress.
"""

grf = build_ingest_workflow_graph(
output_uri=output_uri,
input_uri=input_uri,
measurement_name=measurement_name,
pattern=pattern,
extra_tiledb_config=extra_tiledb_config,
platform_config=platform_config,
ingest_mode=ingest_mode,
resources=resources,
namespace=namespace,
access_credentials_name=access_credentials_name,
logging_level=logging_level,
dry_run=dry_run,
)
grf.compute()
# On discussion with the cloud team:
# * In batch mode this does add call latency beyond grf.server_graph_uuid
# * However, the cloud UI cannot populate a DAG candelabra without us doing so.
# This is a necessary choice.
the_node = next(iter(grf.nodes.values()))
real_graph_uuid = the_node.result()
return {
"status": "started",
"graph_id": str(real_graph_uuid),
}


def build_ingest_workflow_graph(
*,
output_uri: str,
input_uri: str,
measurement_name: str,
pattern: Optional[str] = None,
extra_tiledb_config: Optional[Dict[str, object]] = None,
platform_config: Optional[Dict[str, object]] = None,
ingest_mode: str = "write",
resources: Optional[Dict[str, object]] = None,
namespace: Optional[str] = None,
access_credentials_name: Optional[str] = None,
logging_level: int = logging.INFO,
dry_run: bool = False,
) -> dag.DAG:
"""
Same signature as ``run_ingest_workflow``, but returns the graph object
directly.
"""

grf = dag.DAG(
name="ingest-h5ad-launcher",
mode=dag.Mode.BATCH,
namespace=namespace,
)
grf.submit(
_run_ingest_workflow_udf_byval,
output_uri=output_uri,
input_uri=input_uri,
measurement_name=measurement_name,
pattern=pattern,
extra_tiledb_config=extra_tiledb_config,
platform_config=platform_config,
ingest_mode=ingest_mode,
resources=resources,
namespace=namespace,
access_credentials_name=access_credentials_name,
carry_along={
"resources": _DEFAULT_RESOURCES if resources is None else resources,
"namespace": namespace,
"access_credentials_name": access_credentials_name,
},
logging_level=logging_level,
dry_run=dry_run,
)
return grf


def run_ingest_workflow_udf(
*,
output_uri: str,
@@ -272,9 +148,9 @@ def ingest_h5ad(
output_uri: str,
input_uri: str,
measurement_name: str,
extra_tiledb_config: Optional[Dict[str, object]],
platform_config: Optional[Dict[str, object]],
ingest_mode: str,
extra_tiledb_config: Optional[Dict[str, object]] = None,
platform_config: Optional[Dict[str, object]] = None,
ingest_mode: str = "write",
logging_level: int = logging.INFO,
dry_run: bool = False,
) -> None:
@@ -344,6 +220,7 @@ def __getattr__(self, name: str) -> object:

with _hack_patch_anndata_byval():
input_data = anndata.read_h5ad(_FSPathWrapper(input_file, input_uri), "r")

output_uri = io.from_anndata(
experiment_uri=output_uri,
anndata=input_data,
@@ -355,12 +232,107 @@ def __getattr__(self, name: str) -> object:
logging.info("Successfully wrote data from %s to %s", input_uri, output_uri)


# Until we fully get this version of tiledb.cloud deployed server-side, we must
# refer to all functions by value rather than by reference -- which is a fancy way
# of saying these functions _will not work at all_ until and unless they are
# checked into tiledb-cloud-py and deployed server-side. _All_ dev work _must_
# use this idiom.

Inline review thread on the lines above:

Collaborator (johnkerl): Removing this makes it impossible for me to locally test -- unless as_batch does this for me -- ?
@JohnMoutafis please confirm ... 🙏

Contributor, Author (JohnMoutafis): @johnkerl as_batch will run a function as a batch UDF on TileDB Cloud.
Have you tried running either run_ingest_workflow or ingest from your local?

Collaborator (johnkerl): @JohnMoutafis I need to be able to edit a function and run it and have the remote be running my changed code, not the latest tagged version.
I haven't yet tried locally cloning this PR of yours as an experiment. I'll do so, before approving this PR, to verify that editable execution hasn't been broken by this PR.
Thanks!

Collaborator (johnkerl): I did pip install -v -e ., edited locally (adding a print statement), and ran an ingest -- and confirmed that I did not see my mods being picked up by the remote executor. (More details in Slack.)

def run_ingest_workflow(
*,
output_uri: str,
input_uri: str,
measurement_name: str,
pattern: Optional[str] = None,
extra_tiledb_config: Optional[Dict[str, object]] = None,
platform_config: Optional[Dict[str, object]] = None,
ingest_mode: str = "write",
resources: Optional[Dict[str, object]] = None,
namespace: Optional[str] = None,
access_credentials_name: Optional[str] = None,
logging_level: int = logging.INFO,
dry_run: bool = False,
) -> Dict[str, str]:
"""Starts a workflow to ingest H5AD data into SOMA.

:param output_uri: The output URI to write to. This will probably look like
``tiledb://namespace/some://storage/uri``.
:param input_uri: The URI of the H5AD file(s) to read from. These are read
using TileDB VFS, so any path supported (and accessible) will work. If the
``input_uri`` passes ``vfs.is_file``, it's ingested. If the ``input_uri``
passes ``vfs.is_dir``, then all first-level entries are ingested. In the
directory case, an input file is skipped if ``pattern`` is provided
and doesn't match that file. Also in the directory case, each entry's
basename is appended to the ``output_uri`` to form the entry's output URI.
For example, if ``a.h5ad`` and ``b.h5ad`` are present within ``input_uri`` of
``s3://bucket/h5ads/`` and ``output_uri`` is
``tiledb://namespace/s3://bucket/somas``, then
``tiledb://namespace/s3://bucket/somas/a`` and
``tiledb://namespace/s3://bucket/somas/b`` are written.
:param measurement_name: The name of the Measurement within the Experiment
to store the data.
:param pattern: As described for ``input_uri``.
:param extra_tiledb_config: Extra configuration for TileDB.
:param platform_config: The SOMA ``platform_config`` value to pass in,
if any.
:param ingest_mode: One of the ingest modes supported by
``tiledbsoma.io.read_h5ad``.
:param resources: A specification for the amount of resources to provide
to the UDF executing the ingestion process, to override the default.
:param namespace: An alternate namespace to run the ingestion process under.
:param access_credentials_name: If provided, the name of the credentials
to pass to the executing UDF.
:param dry_run: If set to ``True``, performs the input-path
traversal without ingesting data.
:return: A dictionary of ``{"status": "started", "graph_id": ...}``,
with the UUID of the graph on the server side, which can be used to
manage execution and monitor progress.
"""

# Graph init
grf = dag.DAG(
name="ingest-h5ad-launcher",
namespace=namespace,
mode=dag.Mode.BATCH,
)

# Step 1: Ingest workflow UDF
grf.submit(
_run_ingest_workflow_udf_byval,
output_uri=output_uri,
input_uri=input_uri,
measurement_name=measurement_name,
pattern=pattern,
extra_tiledb_config=extra_tiledb_config,
platform_config=platform_config,
ingest_mode=ingest_mode,
resources=resources,
namespace=namespace,
access_credentials_name=access_credentials_name,
carry_along={
"resources": _DEFAULT_RESOURCES if resources is None else resources,
"namespace": namespace,
"access_credentials_name": access_credentials_name,
},
logging_level=logging_level,
dry_run=dry_run,
)

# Start the ingestion process
verbose = logging_level == logging.DEBUG
run_dag(grf, debug=verbose)

# Get the initial graph node UUID
the_node = next(iter(grf.nodes.values()))
real_graph_uuid = the_node.result()
return {
"status": "started",
"graph_id": str(real_graph_uuid),
}


# FIXME: Until we fully get this version of tiledb.cloud deployed server-side,
# we must refer to all functions by value rather than by reference
# -- which is a fancy way of saying these functions _will not work at all_ until
# and unless they are checked into tiledb-cloud-py and deployed server-side.
# _All_ dev work _must_ use this idiom.
_ingest_h5ad_byval = functions.to_register_by_value(ingest_h5ad)
_run_ingest_workflow_byval = functions.to_register_by_value(run_ingest_workflow)
_run_ingest_workflow_udf_byval = functions.to_register_by_value(run_ingest_workflow_udf)
_hack_patch_anndata_byval = functions.to_register_by_value(_hack_patch_anndata)

ingest = as_batch(_run_ingest_workflow_byval)
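
To make the new entry points concrete, here is a hypothetical usage sketch; it is not part of the diff, and the URIs, namespace, and credential name are illustrative placeholders. Per the review thread above, `ingest` is the `as_batch`-wrapped form of `run_ingest_workflow`, submitted as a batch UDF on TileDB Cloud, and should accept the same keyword arguments:

# Hypothetical usage sketch -- the URIs, namespace, and credential name are
# placeholders, not values from this PR.
import logging

from tiledb.cloud import soma

# Directory case: every first-level entry under input_uri is ingested, and
# each entry's basename is appended to output_uri to form its output URI.
result = soma.run_ingest_workflow(
    output_uri="tiledb://my-namespace/s3://my-bucket/somas",
    input_uri="s3://my-bucket/h5ads/",
    measurement_name="RNA",
    pattern="pbmc",  # directory entries not matching this are skipped
    namespace="my-namespace",
    access_credentials_name="my-creds",
    logging_level=logging.INFO,
)

# The returned graph UUID identifies the server-side DAG, which can be used
# to monitor progress and manage execution.
print(result)  # {"status": "started", "graph_id": "..."}

A single-file `input_uri` (one passing `vfs.is_file`) is ingested directly to `output_uri`, and passing `dry_run=True` walks the input paths without writing any data.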
54 changes: 53 additions & 1 deletion tests/test_soma_mapper.py → tests/test_soma.py
@@ -1,11 +1,63 @@
import logging
import sys
import unittest

import tiledb.cloud
import tiledb.cloud.soma


class SOMAMapperTest(unittest.TestCase):
class TestSOMAIngestion(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.test_file_path = "s3://tiledb-unittest/soma-ingestion-test/pbmc3k.h5ad"

(
cls.namespace,
cls.storage_path,
cls.acn,
) = tiledb.cloud.groups._default_ns_path_cred()
cls.namespace = cls.namespace.rstrip("/")
cls.storage_path = cls.storage_path.rstrip("/")
cls.array_name = tiledb.cloud._common.testonly.random_name("soma-test")
cls.destination = (
f"tiledb://{cls.namespace}/{cls.storage_path}/{cls.array_name}"
)

return super().setUpClass()

# TODO: Allow test to run when VFS access is enabled
@unittest.skip("Fails until unittest user obtains VFS access.")
def test_ingest_h5ad(self):
tiledb.cloud.soma.ingest_h5ad(
output_uri=self.destination,
input_uri=self.test_file_path,
measurement_name="RNA",
logging_level=logging.DEBUG,
)

array_uri = f"tiledb://{self.namespace}/{self.array_name}"
array_info = tiledb.cloud.array.info(array_uri)
self.assertEqual(array_info.name, self.array_name)
self.assertEqual(array_info.namespace, self.namespace)
tiledb.cloud.array.delete_array(array_uri)

# TODO: Allow test to run when VFS access is enabled
@unittest.skip("Fails until unittest user obtains VFS access.")
def test_ingest_h5ad_dry_run(self):
with self.assertLogs(level=logging.INFO) as lg:
tiledb.cloud.soma.ingest_h5ad(
output_uri=self.destination,
input_uri=self.test_file_path,
measurement_name="RNA",
logging_level=logging.DEBUG,
dry_run=True,
)
self.assertEqual(
f"Dry run for {self.test_file_path} to {self.destination}", lg.output[0]
)


class TestSOMAMapper(unittest.TestCase):
def __init__(self, foo):
super().__init__(foo)
self.maxDiff = None
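
Both new ingestion tests are marked ``@unittest.skip`` until the unittest user obtains VFS access, so a local run of the renamed module should report them as skipped rather than failed. A minimal sketch of such a run with the standard-library loader (hypothetical; any unittest runner works):

# Hypothetical local run of tests/test_soma.py -- the two ingestion tests
# are expected to be reported as skipped until VFS access is available.
import unittest

suite = unittest.defaultTestLoader.loadTestsFromName("tests.test_soma")
unittest.TextTestRunner(verbosity=2).run(suite)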