diff --git a/.github/workflows/ci-tests.yaml b/.github/workflows/ci-tests.yaml index 3c3b9178d..500f4d83b 100644 --- a/.github/workflows/ci-tests.yaml +++ b/.github/workflows/ci-tests.yaml @@ -71,7 +71,7 @@ jobs: with: node-version: "20" - name: "Install Docker (MacOS X)" - uses: douglascamata/setup-docker-macos-action@v1-alpha.15 + uses: douglascamata/setup-docker-macos-action@v1-alpha.16 with: colima-additional-options: '--mount /private/var/folders:w --mount-type virtiofs' if: ${{ startsWith(matrix.on, 'macos-') }} @@ -249,7 +249,7 @@ jobs: with: node-version: "20" - name: "Install Docker (MacOs X)" - uses: douglascamata/setup-docker-macos-action@v1-alpha.15 + uses: douglascamata/setup-docker-macos-action@v1-alpha.16 with: colima-additional-options: '--mount /private/var/folders:w --mount-type virtiofs' if: ${{ startsWith(matrix.on, 'macos-') }} diff --git a/streamflow/cwl/translator.py b/streamflow/cwl/translator.py index 4de438c1f..0190ca0cb 100644 --- a/streamflow/cwl/translator.py +++ b/streamflow/cwl/translator.py @@ -1252,38 +1252,39 @@ def _process_input_value( _process_input_value(path_processor, output_directory, target, v) for v in value ] + elif isinstance( + value, (get_args(cwl_utils.parser.File), get_args(cwl_utils.parser.Directory)) + ): + if value.path: + value.path = _remap_path( + path_processor=path_processor, + path=value.path, + old_dir=output_directory, + new_dir=target.workdir, + ) + if value.location: + value.location = _remap_path( + path_processor=path_processor, + path=value.location, + old_dir=output_directory, + new_dir=target.workdir, + ) + if value.secondaryFiles: + value.secondaryFiles = [ + _process_input_value(path_processor, output_directory, target, sf) + for sf in value.secondaryFiles + ] + if isinstance(value, get_args(cwl_utils.parser.Directory)) and value.listing: + value.listing = [ + _process_input_value(path_processor, output_directory, target, sf) + for sf in value["listing"] + ] + return value elif isinstance(value, MutableMapping): - if utils.get_token_class(value) in ["File", "Directory"]: - if "location" in value: - value["location"] = _remap_path( - path_processor=path_processor, - path=value["location"], - old_dir=output_directory, - new_dir=target.workdir, - ) - if "path" in value: - value["path"] = _remap_path( - path_processor=path_processor, - path=value["path"], - old_dir=output_directory, - new_dir=target.workdir, - ) - if "secondaryFiles" in value: - value["secondaryFiles"] = [ - _process_input_value(path_processor, output_directory, target, sf) - for sf in value["secondaryFiles"] - ] - if "listing" in value: - value["listing"] = [ - _process_input_value(path_processor, output_directory, target, sf) - for sf in value["listing"] - ] - return value - else: - return { - k: _process_input_value(path_processor, output_directory, target, v) - for k, v in value.items() - } + return { + k: _process_input_value(path_processor, output_directory, target, v) + for k, v in value.items() + } else: return value diff --git a/tests/test_translator.py b/tests/test_translator.py index 1ba25399f..f88f71776 100644 --- a/tests/test_translator.py +++ b/tests/test_translator.py @@ -4,22 +4,34 @@ import tempfile from collections.abc import MutableMapping from pathlib import PurePosixPath -from typing import Any +from typing import Any, cast +import cwl_utils.parser import pytest from cwltool.tests.util import get_data from streamflow.config.config import WorkflowConfig from streamflow.config.validator import SfValidator from streamflow.core import utils +from streamflow.core.config import BindingConfig from streamflow.core.context import StreamFlowContext +from streamflow.core.deployment import Target from streamflow.cwl.runner import main +from streamflow.cwl.step import CWLInputInjectorStep, CWLTransferStep from streamflow.cwl.token import CWLFileToken from streamflow.cwl.translator import CWLTranslator from streamflow.cwl.workflow import CWLWorkflow +from streamflow.data.remotepath import StreamFlowPath from streamflow.deployment.utils import get_binding_config from streamflow.workflow.executor import StreamFlowExecutor +from streamflow.workflow.port import JobPort +from streamflow.workflow.step import DeployStep, ScheduleStep from streamflow.workflow.token import TerminationToken +from tests.utils.deployment import ( + get_docker_deployment_config, + get_location, + get_service, +) from tests.utils.workflow import CWL_VERSION @@ -30,8 +42,8 @@ def _create_file(content: MutableMapping[Any, Any]) -> str: return temp_config.name -def _get_workflow_config(): - streamflow_config = { +def _get_streamflow_config() -> MutableMapping[str, Any]: + return { "version": "v1.0", "workflows": { "test": { @@ -40,62 +52,12 @@ def _get_workflow_config(): "file": "cwl/main.cwl", "settings": "cwl/config.yaml", }, - "bindings": [ - { - "step": "/compute_1", - "target": [ - {"deployment": "wrapper_1"}, - { - "deployment": "wrapper_2", - "workdir": "/other/remote/workdir_2", - }, - {"deployment": "wrapper_3"}, - {"deployment": "wrapper_4"}, - ], - }, - { - "port": "/model", - "target": { - "deployment": "awesome", - "workdir": "/remote/workdir/models", - }, - }, - ], } }, - "deployments": { - "awesome": { - "type": "docker", - "config": {"image": "busybox"}, - }, - "handsome": { - "type": "docker", - "config": {"image": "busybox"}, - "workdir": "/remote/workdir", - }, - "wrapper_1": { - "type": "docker", - "config": {"image": "busybox"}, - "wraps": {"deployment": "handsome", "service": "boost"}, - }, - "wrapper_2": { - "type": "docker", - "config": {"image": "busybox"}, - "wraps": {"deployment": "handsome", "service": "boost"}, - "workdir": "/myremote/workdir", - }, - "wrapper_3": { - "type": "docker", - "config": {"image": "busybox"}, - "wraps": "wrapper_1", - }, - "wrapper_4": { - "type": "docker", - "config": {"image": "busybox"}, - "wraps": "awesome", - }, - }, } + + +def _get_workflow_config(streamflow_config) -> WorkflowConfig: SfValidator().validate(streamflow_config) return WorkflowConfig( list(streamflow_config["workflows"].keys())[0], streamflow_config @@ -105,23 +67,48 @@ def _get_workflow_config(): @pytest.mark.asyncio async def test_inject_remote_input(context: StreamFlowContext) -> None: """Test injection of remote input data through the port targets in the StreamFlow file""" - cwl_workflow_path = "/path/to/local/cwl/wf_description" - relative_data_path = "relative/path/to/data" - input_data = { - "class": "Directory", - "location": f"file://{cwl_workflow_path}/{relative_data_path}", + + # Create remote file + docker_config = get_docker_deployment_config() + location = await get_location(context, docker_config.type) + remote_workdir = await StreamFlowPath( + "home", context=context, location=location + ).resolve() + remote_path = remote_workdir / "data" + await remote_path.mkdir() + remote_path = remote_path / "file1.txt" + relative_path = os.path.relpath(remote_path, remote_workdir) + await remote_path.write_text("StreamFlow") + assert await remote_path.exists() + + # Create input data and call the `CWLTranslator` inject method + cwl_workflow_path = os.getcwd() + input_data = cwl_utils.parser.cwl_v1_2.File( + path=f"file://{cwl_workflow_path}/{relative_path}" + ) + port_name = "model" + streamflow_config = _get_streamflow_config() + streamflow_config["workflows"]["test"].setdefault("bindings", []).append( + { + "port": "/model", + "target": { + "deployment": docker_config.name, + "workdir": "/home", + }, + } + ) + streamflow_config.setdefault("deployments", {})[docker_config.name] = { + "type": docker_config.type, + "config": docker_config.config, } - workflow_config = _get_workflow_config() - remote_workdir = next( - iter(workflow_config.get(PurePosixPath("/model"), "port")["targets"]) - )["workdir"] + workflow_config = _get_workflow_config(streamflow_config) translator = CWLTranslator( context=context, name=utils.random_name(), output_directory=tempfile.gettempdir(), - cwl_definition=None, # cwltool.process.Process, - cwl_inputs={"model": input_data}, + cwl_definition=None, # CWL object + cwl_inputs={port_name: input_data}, cwl_inputs_path=None, workflow_config=workflow_config, ) @@ -133,47 +120,132 @@ async def test_inject_remote_input(context: StreamFlowContext) -> None: ) translator._inject_input( workflow=workflow, - port_name="model", - global_name="/model", + port_name=port_name, + global_name=f"/{port_name}", port=workflow.create_port(), output_directory=cwl_workflow_path, - value=translator.cwl_inputs["model"], + value=translator.cwl_inputs[port_name], + ) + + # Add a transfer step in the workflow + injector_schedule_step = next( + iter( + step + for step in workflow.steps.values() + if isinstance(step, ScheduleStep) and step.input_directory is not None + ) + ) + input_injector_step = next( + iter(s for s in workflow.steps.values() if isinstance(s, CWLInputInjectorStep)) + ) + docker_target = Target( + deployment=docker_config, + service=get_service(context, docker_config.type), + workdir=docker_config.workdir, + ) + binding_config = BindingConfig(targets=[docker_target]) + schedule_step = workflow.create_step( + cls=ScheduleStep, + name=posixpath.join(f"{port_name}", "__schedule__"), + job_prefix=f"{port_name}", + connector_ports={ + docker_config.name: next( + iter(s for s in workflow.steps.values() if isinstance(s, DeployStep)) + ).get_output_port() + }, + binding_config=binding_config, ) + transfer_step = workflow.create_step( + cls=CWLTransferStep, + name=posixpath.join(os.sep, port_name, "__transfer__", port_name), + job_port=schedule_step.get_output_port(), + ) + transfer_step.add_input_port(port_name, input_injector_step.get_output_port()) + transfer_step.add_output_port(port_name, workflow.create_port()) # Check input tokens - input_tokens = workflow.steps["/model-injector"].get_input_port("model").token_list - assert input_tokens[0].value == input_data + input_tokens = input_injector_step.get_input_port(port_name).token_list + assert ( + input_tokens[0].value["class"] == "File" + and input_tokens[0].value["path"] == input_data.path + ) assert isinstance(input_tokens[1], TerminationToken) # Execute injector steps executor = StreamFlowExecutor(workflow) await executor.run() + job = await cast( + JobPort, + injector_schedule_step.get_output_port("__job__"), + ).get_job(port_name) - # Check output tokens - output_tokens = ( - workflow.steps["/model-injector"].get_output_port("model").token_list - ) + # Check output tokens of input injector step + output_tokens = input_injector_step.get_output_port(port_name).token_list assert isinstance(output_tokens[0], CWLFileToken) assert isinstance(output_tokens[1], TerminationToken) - job = ( - workflow.steps["/model-injector/__schedule__"] - .get_output_port("__job__") - .token_list[0] - .value - ) - assert ( - len({job.input_directory, job.output_directory, job.tmp_directory}) == 1 - and job.input_directory == remote_workdir - ) - assert output_tokens[0].value["class"] == "Directory" - assert output_tokens[0].value["path"] == os.path.join( - remote_workdir, relative_data_path - ) + assert len( + {job.input_directory, job.output_directory, job.tmp_directory} + ) == 1 and job.input_directory == str(remote_workdir) + assert output_tokens[0].value["class"] == "File" + assert output_tokens[0].value["path"] == str(remote_workdir / relative_path) + + # Check output tokens of transfer step + output_tokens = transfer_step.get_output_port(port_name).token_list + assert output_tokens[0].value["checksum"] == f"sha1${await remote_path.checksum()}" def test_workdir_inheritance() -> None: """Test the workdir inheritance of deployments, wrapped deployments and targets""" - workflow_config = _get_workflow_config() + streamflow_config = _get_streamflow_config() + streamflow_config["workflows"]["test"].setdefault("bindings", []).append( + { + "step": "/compute_1", + "target": [ + {"deployment": "wrapper_1"}, + { + "deployment": "wrapper_2", + "workdir": "/other/remote/workdir_2", + }, + {"deployment": "wrapper_3"}, + {"deployment": "wrapper_4"}, + ], + }, + ) + streamflow_config.setdefault("deployments", {}) + streamflow_config["deployments"] |= { + "awesome": { + "type": "docker", + "config": {"image": "busybox"}, + }, + "handsome": { + "type": "docker", + "config": {"image": "busybox"}, + "workdir": "/remote/workdir", + }, + "wrapper_1": { + "type": "docker", + "config": {"image": "busybox"}, + "wraps": {"deployment": "handsome", "service": "boost"}, + }, + "wrapper_2": { + "type": "docker", + "config": {"image": "busybox"}, + "wraps": {"deployment": "handsome", "service": "boost"}, + "workdir": "/myremote/workdir", + }, + "wrapper_3": { + "type": "docker", + "config": {"image": "busybox"}, + "wraps": "wrapper_1", + }, + "wrapper_4": { + "type": "docker", + "config": {"image": "busybox"}, + "wraps": "awesome", + }, + } + + workflow_config = _get_workflow_config(streamflow_config) workdir_deployment_1 = workflow_config.deployments["handsome"]["workdir"] binding_config = get_binding_config("/compute_1", "step", workflow_config)