diff --git a/Makefile b/Makefile index 6130a0f2..5807f988 100644 --- a/Makefile +++ b/Makefile @@ -20,7 +20,7 @@ format-check: black --diff --check streamflow tests pyupgrade: - pyupgrade --py3-only --py38-plus $(shell git ls-files | grep .py | grep -v streamflow/cwl/antlr) + pyupgrade --py3-only --py39-plus $(shell git ls-files | grep .py | grep -v streamflow/cwl/antlr) test: python -m pytest -rs ${PYTEST_EXTRA} diff --git a/streamflow/core/config.py b/streamflow/core/config.py index 3f8d87bc..c9780de3 100644 --- a/streamflow/core/config.py +++ b/streamflow/core/config.py @@ -3,7 +3,7 @@ import asyncio import json import posixpath -from collections.abc import MutableSequence, MutableMapping +from collections.abc import MutableMapping, MutableSequence from typing import Any, TYPE_CHECKING, cast from referencing import Registry, Resource diff --git a/streamflow/core/data.py b/streamflow/core/data.py index 8cca6050..72f0e4c1 100644 --- a/streamflow/core/data.py +++ b/streamflow/core/data.py @@ -111,11 +111,6 @@ async def transfer_data( ) -> None: ... -class FileType(Enum): - FILE = 1 - DIRECTORY = 2 - - class StreamWrapper(ABC): def __init__(self, stream: Any): self.stream: Any = stream diff --git a/streamflow/core/provenance.py b/streamflow/core/provenance.py index 4b24ba36..292bde98 100644 --- a/streamflow/core/provenance.py +++ b/streamflow/core/provenance.py @@ -1,7 +1,7 @@ from __future__ import annotations from abc import abstractmethod -from collections.abc import MutableSequence, MutableMapping +from collections.abc import MutableMapping, MutableSequence from typing import TYPE_CHECKING from streamflow.core.persistence import DatabaseLoadingContext diff --git a/streamflow/core/scheduling.py b/streamflow/core/scheduling.py index 9a47e7d1..4455c78c 100644 --- a/streamflow/core/scheduling.py +++ b/streamflow/core/scheduling.py @@ -4,10 +4,10 @@ import os from abc import ABC, abstractmethod from collections.abc import ( - MutableSequence, - MutableMapping, - Iterable, Callable, + Iterable, + MutableMapping, + MutableSequence, MutableSet, ) from typing import TYPE_CHECKING, cast diff --git a/streamflow/core/utils.py b/streamflow/core/utils.py index 4f7ce055..e9370bd9 100644 --- a/streamflow/core/utils.py +++ b/streamflow/core/utils.py @@ -9,7 +9,7 @@ import posixpath import shlex import uuid -from collections.abc import MutableSequence, MutableMapping, Iterable +from collections.abc import Iterable, MutableMapping, MutableSequence from typing import Any, TYPE_CHECKING from streamflow.core.exception import WorkflowExecutionException @@ -242,19 +242,6 @@ async def get_remote_to_remote_write_command( return ["tar", "xf", "-", "-C", posixpath.dirname(dst)] -def get_size(path: str) -> int: - if os.path.isfile(path): - return os.path.getsize(path) if not os.path.islink(path) else 0 - else: - total_size = 0 - for dirpath, _, filenames in os.walk(path): - for f in filenames: - fp = os.path.join(dirpath, f) - if not os.path.islink(fp): - total_size += os.path.getsize(fp) - return total_size - - def get_tag(tokens: Iterable[Token]) -> str: output_tag = "0" for tag in [t.tag for t in tokens]: diff --git a/streamflow/core/workflow.py b/streamflow/core/workflow.py index 23104747..63a75f31 100644 --- a/streamflow/core/workflow.py +++ b/streamflow/core/workflow.py @@ -435,12 +435,15 @@ def retag(self, tag: str) -> Token: async def save(self, context: StreamFlowContext, port_id: int | None = None): async with self.persistence_lock: if not self.persistent_id: - self.persistent_id = await 
context.database.add_token( - port=port_id, - tag=self.tag, - type=type(self), - value=json.dumps(await self._save_value(context)), - ) + try: + self.persistent_id = await context.database.add_token( + port=port_id, + tag=self.tag, + type=type(self), + value=json.dumps(await self._save_value(context)), + ) + except TypeError as e: + raise WorkflowExecutionException from e def update(self, value: Any) -> Token: return self.__class__(tag=self.tag, value=value) diff --git a/streamflow/cwl/combinator.py b/streamflow/cwl/combinator.py index 57df1c50..8ceaf05f 100644 --- a/streamflow/cwl/combinator.py +++ b/streamflow/cwl/combinator.py @@ -1,6 +1,6 @@ from __future__ import annotations -from collections.abc import MutableSequence, MutableMapping, AsyncIterable +from collections.abc import AsyncIterable, MutableMapping, MutableSequence from typing import Any, cast from streamflow.core.context import StreamFlowContext diff --git a/streamflow/cwl/command.py b/streamflow/cwl/command.py index 0df0a92e..ae0093d1 100644 --- a/streamflow/cwl/command.py +++ b/streamflow/cwl/command.py @@ -8,7 +8,7 @@ import shlex import time from asyncio.subprocess import STDOUT -from collections.abc import MutableSequence, MutableMapping +from collections.abc import MutableMapping, MutableSequence from decimal import Decimal from types import ModuleType from typing import Any, IO, cast @@ -56,7 +56,7 @@ ) from streamflow.cwl.step import build_token from streamflow.cwl.workflow import CWLWorkflow -from streamflow.data import remotepath +from streamflow.data.remotepath import StreamFlowPath from streamflow.deployment.connector import LocalConnector from streamflow.deployment.utils import get_path_processor from streamflow.log_handler import logger @@ -64,14 +64,16 @@ from streamflow.workflow.utils import get_token_value -def _adjust_cwl_output(base_path: str, path_processor: ModuleType, value: Any) -> Any: +def _adjust_cwl_output( + base_path: StreamFlowPath, path_processor: ModuleType, value: Any +) -> Any: if isinstance(value, MutableSequence): return [_adjust_cwl_output(base_path, path_processor, v) for v in value] elif isinstance(value, MutableMapping): if utils.get_token_class(value) in ["File", "Directory"]: path = utils.get_path_from_token(value) if not path_processor.isabs(path): - path = path_processor.join(base_path, path) + path = str(base_path / path) value["path"] = path value["location"] = f"file://{path}" return value @@ -154,18 +156,25 @@ async def _check_cwl_output(job: Job, step: Step, result: Any) -> Any: connector = step.workflow.context.scheduler.get_connector(job.name) locations = step.workflow.context.scheduler.get_locations(job.name) path_processor = get_path_processor(connector) - cwl_output_path = path_processor.join(job.output_directory, "cwl.output.json") for location in locations: - if await remotepath.exists(connector, location, cwl_output_path): + cwl_output_path = StreamFlowPath( + job.output_directory, + "cwl.output.json", + context=step.workflow.context, + location=location, + ) + if await cwl_output_path.exists(): # If file exists, use its contents as token value - result = json.loads( - await remotepath.read(connector, location, cwl_output_path) - ) + result = json.loads(await cwl_output_path.read_text()) # Update step output ports at runtime if needed if isinstance(result, MutableMapping): for out_name, out in result.items(): out = _adjust_cwl_output( - base_path=job.output_directory, + base_path=StreamFlowPath( + job.output_directory, + context=step.workflow.context, + location=location, +
), path_processor=path_processor, value=out, ) @@ -290,9 +299,9 @@ async def _prepare_work_dir( dest_path: str | None = None, writable: bool = False, ) -> None: - streamflow_context = options.step.workflow.context - connector = streamflow_context.scheduler.get_connector(options.job.name) - locations = streamflow_context.scheduler.get_locations(options.job.name) + context = options.step.workflow.context + connector = context.scheduler.get_connector(options.job.name) + locations = context.scheduler.get_locations(options.job.name) path_processor = get_path_processor(connector) # Initialize base path to job output directory if present base_path = base_path or options.job.output_directory @@ -341,7 +350,7 @@ async def _prepare_work_dir( dest_path = dest_path or path_processor.join( base_path, path_processor.basename(src_path) ) - await streamflow_context.data_manager.transfer_data( + await context.data_manager.transfer_data( src_location=selected_location.location, src_path=selected_location.path, dst_locations=locations, @@ -366,17 +375,20 @@ async def _prepare_work_dir( await asyncio.gather( *( asyncio.create_task( - remotepath.mkdir(connector, location, dest_path) + StreamFlowPath( + dest_path, context=context, location=location + ).mkdir(mode=0o777, exist_ok=True) ) for location in locations ) ) else: await utils.write_remote_file( - context=streamflow_context, - job=options.job, + context=context, + locations=locations, content=(listing["contents"] if "contents" in listing else ""), path=dest_path, + relpath=path_processor.relpath(dest_path, base_path), ) # If `listing` is present, recursively process folder contents if "listing" in listing: @@ -385,7 +397,9 @@ async def _prepare_work_dir( await asyncio.gather( *( asyncio.create_task( - remotepath.mkdir(connector, location, folder_path) + StreamFlowPath( + folder_path, context=context, location=location + ).mkdir(mode=0o777, exist_ok=True) ) for location in locations ) @@ -467,10 +481,15 @@ async def _prepare_work_dir( # If entry is a string, a new text file must be created with the string as the file contents if isinstance(entry, str): await utils.write_remote_file( - context=streamflow_context, - job=options.job, + context=context, + locations=locations, content=entry, path=dest_path or base_path, + relpath=( + path_processor.relpath(dest_path, base_path) + if dest_path + else base_path + ), ) # If entry is a list elif isinstance(entry, MutableSequence): @@ -488,10 +507,15 @@ async def _prepare_work_dir( # Otherwise, the content should be serialised to JSON else: await utils.write_remote_file( - context=streamflow_context, - job=options.job, + context=context, + locations=locations, content=json.dumps(entry), path=dest_path or base_path, + relpath=( + path_processor.relpath(dest_path, base_path) + if dest_path + else base_path + ), ) # If entry is a dict elif isinstance(entry, MutableMapping): @@ -507,18 +531,28 @@ async def _prepare_work_dir( # Otherwise, the content should be serialised to JSON else: await utils.write_remote_file( - context=streamflow_context, - job=options.job, + context=context, + locations=locations, content=json.dumps(entry), path=dest_path or base_path, + relpath=( + path_processor.relpath(dest_path, base_path) + if dest_path + else base_path + ), ) # Every object different from a string should be serialised to JSON elif entry is not None: await utils.write_remote_file( - context=streamflow_context, - job=options.job, + context=context, + locations=locations, content=json.dumps(entry), path=dest_path or base_path, 
+ relpath=( + path_processor.relpath(dest_path, base_path) + if dest_path + else base_path + ), ) diff --git a/streamflow/cwl/hardware.py b/streamflow/cwl/hardware.py index 9d8d7c0d..4ff91618 100644 --- a/streamflow/cwl/hardware.py +++ b/streamflow/cwl/hardware.py @@ -2,7 +2,7 @@ import math import os -from collections.abc import MutableSequence, MutableMapping +from collections.abc import MutableMapping, MutableSequence from typing import Any, TYPE_CHECKING from streamflow.core.context import StreamFlowContext diff --git a/streamflow/cwl/processor.py b/streamflow/cwl/processor.py index adba57bd..0ede1ed9 100644 --- a/streamflow/cwl/processor.py +++ b/streamflow/cwl/processor.py @@ -3,7 +3,7 @@ import asyncio import json import logging -from collections.abc import MutableSequence, MutableMapping, Callable +from collections.abc import Callable, MutableMapping, MutableSequence from typing import Any, cast import cwl_utils.file_formats @@ -489,7 +489,6 @@ async def _process_command_output( command_output: CWLCommandOutput, connector: Connector | None, context: MutableMapping[str, Any], - output_directory: str, ): connector = self._get_connector(connector, job) locations = await self._get_locations(connector, job) @@ -532,7 +531,6 @@ async def _process_command_output( globpath if isinstance(globpath, MutableSequence) else [globpath] ) # Resolve glob - path_processor = get_path_processor(connector) resolve_tasks = [] for location in locations: for path in globpaths: @@ -557,11 +555,7 @@ async def _process_command_output( if self.target else job.tmp_directory ), - path=( - path_processor.join(output_directory, path) - if not path_processor.isabs(path) - else path - ), + path=cast(str, path), ) ) ) @@ -694,7 +688,10 @@ async def process( hardware=self.workflow.context.scheduler.get_hardware(job.name), ) token_value = await self._process_command_output( - job, command_output, connector, context, output_directory + job, + command_output, + connector, + context, ) if isinstance(token_value, MutableSequence): for value in token_value: diff --git a/streamflow/cwl/requirement/docker/docker.py b/streamflow/cwl/requirement/docker/docker.py index 461c78b6..5450d887 100644 --- a/streamflow/cwl/requirement/docker/docker.py +++ b/streamflow/cwl/requirement/docker/docker.py @@ -3,7 +3,6 @@ import os import posixpath from collections.abc import MutableSequence - from importlib.resources import files from streamflow.core import utils diff --git a/streamflow/cwl/requirement/docker/kubernetes.py b/streamflow/cwl/requirement/docker/kubernetes.py index 9ed7b8b5..8ddc03e8 100644 --- a/streamflow/cwl/requirement/docker/kubernetes.py +++ b/streamflow/cwl/requirement/docker/kubernetes.py @@ -1,8 +1,8 @@ from __future__ import annotations import tempfile - from importlib.resources import files + from jinja2 import Template from streamflow.core import utils diff --git a/streamflow/cwl/requirement/docker/singularity.py b/streamflow/cwl/requirement/docker/singularity.py index 658f896d..6d219360 100644 --- a/streamflow/cwl/requirement/docker/singularity.py +++ b/streamflow/cwl/requirement/docker/singularity.py @@ -3,7 +3,6 @@ import os import posixpath from collections.abc import MutableSequence - from importlib.resources import files from streamflow.core import utils diff --git a/streamflow/cwl/step.py b/streamflow/cwl/step.py index 2e3071bc..3effbbab 100644 --- a/streamflow/cwl/step.py +++ b/streamflow/cwl/step.py @@ -5,7 +5,7 @@ import logging import urllib.parse from abc import ABC -from collections.abc import 
MutableSequence, MutableMapping +from collections.abc import MutableMapping, MutableSequence from typing import Any, cast from streamflow.core.context import StreamFlowContext @@ -29,6 +29,7 @@ ) from streamflow.cwl.workflow import CWLWorkflow from streamflow.data import remotepath +from streamflow.data.remotepath import StreamFlowPath from streamflow.deployment.utils import get_path_processor from streamflow.log_handler import logger from streamflow.workflow.port import JobPort @@ -36,9 +37,9 @@ ConditionalStep, InputInjectorStep, LoopOutputStep, + ScheduleStep, TransferStep, _get_token_ids, - ScheduleStep, ) from streamflow.workflow.token import IterationTerminationToken, ListToken, ObjectToken @@ -146,16 +147,13 @@ async def build_token( async def _download_file(job: Job, url: str, context: StreamFlowContext) -> str: - connector = context.scheduler.get_connector(job.name) locations = context.scheduler.get_locations(job.name) try: filepaths = set( await asyncio.gather( *( asyncio.create_task( - remotepath.download( - connector, location, url, job.input_directory - ) + remotepath.download(context, location, url, job.input_directory) ) for location in locations ) @@ -512,11 +510,21 @@ async def _update_file_token( self, job: Job, token_value: MutableMapping[str, Any], - dest_path: str | None = None, + dest_path: StreamFlowPath | None = None, ) -> MutableMapping[str, Any]: token_class = utils.get_token_class(token_value) - # Get allocation and connector - connector = self.workflow.context.scheduler.get_connector(job.name) + # Get destination coordinates + dst_connector = self.workflow.context.scheduler.get_connector(job.name) + dst_locations = self.workflow.context.scheduler.get_locations(job.name) + indir = StreamFlowPath( + job.input_directory, + context=self.workflow.context, + location=next(iter(dst_locations)), + ) + if dest_path is not None and dest_path.is_relative_to(job.input_directory): + indir /= dest_path.relative_to(indir).parts[0] + else: + indir /= random_name() # Extract location location = token_value.get("location", token_value.get("path")) if location and "://" in location: @@ -528,15 +536,7 @@ async def _update_file_token( location = urllib.parse.unquote(location[7:]) # If basename is explicitly stated in the token, use it as destination path if "basename" in token_value: - path_processor = get_path_processor(connector) - dest_path = dest_path or path_processor.join( - job.input_directory, random_name() - ) - dest_path = path_processor.join(dest_path, token_value["basename"]) - # Get destination coordinates - dst_connector = self.workflow.context.scheduler.get_connector(job.name) - dst_locations = self.workflow.context.scheduler.get_locations(job.name) - path_processor = get_path_processor(dst_connector) + dest_path = (dest_path or indir) / token_value["basename"] # If source data exist, get source locations if location and ( selected_location := self.workflow.context.data_manager.get_source_location( @@ -544,30 +544,28 @@ async def _update_file_token( ) ): # Build destination path - filepath = dest_path or path_processor.join( - job.input_directory, utils.random_name(), selected_location.relpath - ) + filepath = dest_path or (indir / selected_location.relpath) # Perform and transfer await self.workflow.context.data_manager.transfer_data( src_location=selected_location.location, src_path=selected_location.path, dst_locations=dst_locations, - dst_path=filepath, + dst_path=str(filepath), writable=self.writable, ) # Transform token value new_token_value = { "class": 
token_class, - "path": filepath, - "location": "file://" + filepath, - "basename": path_processor.basename(filepath), - "dirname": path_processor.dirname(filepath), + "path": str(filepath), + "location": "file://" + str(filepath), + "basename": filepath.name, + "dirname": str(filepath.parent), } # If token contains a file if token_class == "File": # nosec # Retrieve symbolic link data locations data_locations = self.workflow.context.data_manager.get_data_locations( - path=filepath, + path=str(filepath), deployment=dst_connector.deployment_name, data_type=DataType.SYMBOLIC_LINK, ) @@ -577,15 +575,16 @@ async def _update_file_token( for data_location in data_locations: if ( data_location.name == location.name - and data_location.path == filepath + and data_location.path == str(filepath) ): break else: - checksum = "sha1${}".format( - await remotepath.checksum( - self.workflow.context, dst_connector, location, filepath - ) + loc_path = StreamFlowPath( + str(filepath), + context=self.workflow.context, + location=location, ) + checksum = f"sha1${await loc_path.checksum()}" if checksum != original_checksum: raise WorkflowExecutionException( "Error transferring file {} in location {} to {} in location {}".format( @@ -614,7 +613,7 @@ async def _update_file_token( self._update_file_token( job=job, token_value=element, - dest_path=path_processor.dirname(filepath), + dest_path=filepath.parent, ) ) for element in token_value["secondaryFiles"] @@ -636,15 +635,17 @@ async def _update_file_token( # Otherwise, create elements remotely else: # Build destination path - filepath = dest_path or path_processor.join( - job.input_directory, utils.random_name() - ) + filepath = dest_path or indir # If the token contains a directory, simply create it if token_class == "Directory": # nosec await asyncio.gather( *( asyncio.create_task( - remotepath.mkdir(dst_connector, location, filepath) + StreamFlowPath( + str(filepath), + context=self.workflow.context, + location=location, + ).mkdir(mode=0o777, parents=True, exist_ok=True) ) for location in dst_locations ) @@ -654,20 +655,29 @@ async def _update_file_token( await asyncio.gather( *( asyncio.create_task( - remotepath.mkdir( - dst_connector, - location, - path_processor.dirname(filepath), - ) + StreamFlowPath( + str(filepath.parent), + context=self.workflow.context, + location=location, + ).mkdir(mode=0o777, exist_ok=True) ) for location in dst_locations ) ) await utils.write_remote_file( context=self.workflow.context, - job=job, + locations=dst_locations, content=token_value.get("contents", ""), - path=filepath, + path=str(filepath), + relpath=( + str(filepath.relative_to(job.output_directory)) + if filepath.is_relative_to(job.output_directory) + else ( + str(filepath.relative_to(indir)) + if filepath != indir + else str(indir) + ) + ), ) # Build file token new_token_value = await utils.get_file_token( @@ -676,7 +686,7 @@ async def _update_file_token( cwl_version=cast(CWLWorkflow, self.workflow).cwl_version, locations=dst_locations, token_class=token_class, - filepath=filepath, + filepath=str(filepath), load_contents="contents" in token_value, load_listing=LoadListing.no_listing, ) diff --git a/streamflow/cwl/token.py b/streamflow/cwl/token.py index be9c3441..5877c7a9 100644 --- a/streamflow/cwl/token.py +++ b/streamflow/cwl/token.py @@ -6,7 +6,7 @@ from streamflow.core.data import DataType from streamflow.core.workflow import Token from streamflow.cwl import utils -from streamflow.data import remotepath +from streamflow.data.remotepath import StreamFlowPath from 
streamflow.workflow.token import FileToken @@ -20,16 +20,11 @@ async def _get_file_token_weight(context: StreamFlowContext, value: Any): path=path, data_type=DataType.PRIMARY ) if data_locations: - data_location = list(data_locations)[0] - connector = context.deployment_manager.get_connector( - data_location.deployment - ) - real_path = await remotepath.follow_symlink( - context, connector, data_location.location, data_location.path - ) - weight = await remotepath.size( - connector, data_location.location, real_path + data_location = next(iter(data_locations)) + path = StreamFlowPath( + data_location.path, context=context, location=data_location.location ) + weight = await (await path.resolve()).size() if "secondaryFiles" in value: weight += sum( await asyncio.gather( diff --git a/streamflow/cwl/transformer.py b/streamflow/cwl/transformer.py index 7d7b4ba9..bcb9c593 100644 --- a/streamflow/cwl/transformer.py +++ b/streamflow/cwl/transformer.py @@ -2,7 +2,7 @@ import functools import json -from collections.abc import MutableSequence, MutableMapping +from collections.abc import MutableMapping, MutableSequence from typing import Any, cast from streamflow.core.context import StreamFlowContext diff --git a/streamflow/cwl/translator.py b/streamflow/cwl/translator.py index 59735549..7cf3d503 100644 --- a/streamflow/cwl/translator.py +++ b/streamflow/cwl/translator.py @@ -5,13 +5,12 @@ import os import posixpath import urllib.parse +from collections.abc import MutableMapping, MutableSequence from enum import Enum from pathlib import PurePosixPath from types import ModuleType from typing import ( Any, - MutableMapping, - MutableSequence, cast, get_args, ) diff --git a/streamflow/cwl/utils.py b/streamflow/cwl/utils.py index 41da4be5..1ac71415 100644 --- a/streamflow/cwl/utils.py +++ b/streamflow/cwl/utils.py @@ -15,7 +15,7 @@ from cwl_utils.parser.cwl_v1_2_utils import CONTENT_LIMIT from streamflow.core.context import StreamFlowContext -from streamflow.core.data import DataLocation, DataType, FileType +from streamflow.core.data import DataLocation, DataType from streamflow.core.deployment import Connector, ExecutionLocation from streamflow.core.exception import ( WorkflowDefinitionException, @@ -25,7 +25,7 @@ from streamflow.core.utils import random_name from streamflow.core.workflow import Job, Token, Workflow from streamflow.cwl.expression import DependencyResolver -from streamflow.data import remotepath +from streamflow.data.remotepath import StreamFlowPath from streamflow.deployment.utils import get_path_processor from streamflow.log_handler import logger from streamflow.workflow.utils import get_token_value @@ -38,47 +38,47 @@ async def _check_glob_path( input_directory: str, output_directory: str, tmp_directory: str, - path: str, - realpath: str, + path: StreamFlowPath, + real_path: StreamFlowPath, ) -> None: # Cannot glob outside the job output folder if not ( - realpath.startswith(output_directory) - or realpath.startswith(input_directory) - or realpath.startswith(tmp_directory) + real_path.is_relative_to(input_directory) + or real_path.is_relative_to(output_directory) + or real_path.is_relative_to(tmp_directory) ): - path_processor = get_path_processor(connector) - relative_path = path_processor.relpath(path, output_directory) - base_path = ( - path_processor.normpath(realpath[: -len(relative_path)]) - if realpath.endswith(relative_path) - else path_processor.dirname(realpath) + relative_path = path.relative_to(output_directory) + base_path = StreamFlowPath( + ( + 
str(real_path).removesuffix(str(relative_path)) + if str(real_path).endswith(str(relative_path)) + else real_path.parent + ), + context=workflow.context, + location=location, ) if not await search_in_parent_locations( context=workflow.context, connector=connector, - path=realpath, - relpath=path_processor.relpath(path, output_directory), - base_path=base_path, + path=str(real_path), + relpath=str(relative_path), + base_path=str(base_path), ): - input_dirs = await remotepath.listdir( - connector, location, input_directory, FileType.DIRECTORY + indir = StreamFlowPath( + input_directory, context=workflow.context, location=location ) - for input_dir in input_dirs: - input_path = path_processor.join( - input_dir, path_processor.relpath(path, output_directory) - ) - if await remotepath.exists(connector, location, input_path): - return + async for _, dirnames, _ in indir.walk(): + for dirname in dirnames: + if await (indir / dirname / relative_path).exists(): + return + break raise WorkflowDefinitionException( "Globs outside the job's output folder are not allowed" ) async def _get_contents( - connector: Connector, - location: ExecutionLocation, - path: str, + path: StreamFlowPath, size: int, cwl_version: str, ): @@ -86,7 +86,61 @@ async def _get_contents( raise WorkflowExecutionException( f"Cannot read contents from files larger than {CONTENT_LIMIT / 1024}kB" ) - return await remotepath.head(connector, location, path, CONTENT_LIMIT) + return await path.read_text(n=CONTENT_LIMIT) + + +async def _get_listing( + context: StreamFlowContext, + connector: Connector, + cwl_version: str, + locations: MutableSequence[ExecutionLocation], + dirpath: StreamFlowPath, + load_contents: bool, + recursive: bool, +) -> MutableSequence[MutableMapping[str, Any]]: + listing_tokens = {} + for location in locations: + loc_path = StreamFlowPath(dirpath, context=context, location=location) + async for dirpath, dirnames, filenames in loc_path.walk(): + for dirname in dirnames: + directory = dirpath / dirname + if str(directory) not in listing_tokens: + load_listing = ( + LoadListing.deep_listing + if recursive + else LoadListing.no_listing + ) + listing_tokens[str(directory)] = asyncio.create_task( + get_file_token( # nosec + context=context, + connector=connector, + cwl_version=cwl_version, + locations=locations, + token_class="Directory", + filepath=str(directory), + load_contents=load_contents, + load_listing=load_listing, + ) + ) + for filename in filenames: + file = dirpath / filename + if str(file) not in listing_tokens: + listing_tokens[str(file)] = asyncio.create_task( + get_file_token( # nosec + context=context, + connector=connector, + cwl_version=cwl_version, + locations=locations, + token_class="File", + filepath=str(file), + load_contents=load_contents, + ) + ) + break + return cast( + MutableSequence[MutableMapping[str, Any]], + await asyncio.gather(*listing_tokens.values()), + ) async def _process_secondary_file( @@ -109,7 +163,8 @@ async def _process_secondary_file( elif isinstance(secondary_file, MutableMapping): filepath = get_path_from_token(secondary_file) for location in locations: - if await remotepath.exists(connector, location, filepath): + path = StreamFlowPath(filepath, context=context, location=location) + if await path.exists(): return await get_file_token( context=context, connector=connector, @@ -145,12 +200,9 @@ async def _process_secondary_file( else: # Search file in job locations and build token value for location in locations: - if await remotepath.exists(connector, location, filepath): 
- token_class = ( - "File" - if await remotepath.isfile(connector, location, filepath) - else "Directory" - ) + path = StreamFlowPath(filepath, context=context, location=location) + if await path.exists(): + token_class = "File" if await path.is_file() else "Directory" return await get_file_token( context=context, connector=connector, @@ -185,33 +237,37 @@ async def _register_path( relpath: str, data_type: DataType = DataType.PRIMARY, ) -> DataLocation | None: - if real_path := await remotepath.follow_symlink(context, connector, location, path): + path = StreamFlowPath(path, context=context, location=location) + if real_path := await path.resolve(): if real_path != path: if data_locations := context.data_manager.get_data_locations( - path=real_path, deployment=connector.deployment_name + path=str(real_path), deployment=connector.deployment_name ): data_location = next(iter(data_locations)) else: - path_processor = get_path_processor(connector) - base_path = path_processor.normpath(path[: -len(relpath)]) - if real_path.startswith(base_path): + base_path = StreamFlowPath( + str(path).removesuffix(str(relpath)), + context=context, + location=location, + ) + if real_path.is_relative_to(base_path): data_location = context.data_manager.register_path( location=location, - path=real_path, - relpath=path_processor.relpath(real_path, base_path), + path=str(real_path), + relpath=str(real_path.relative_to(base_path)), ) elif data_locations := await search_in_parent_locations( context=context, connector=connector, - path=real_path, - relpath=path_processor.basename(real_path), + path=str(real_path), + relpath=real_path.name, ): data_location = data_locations[0] else: return None link_location = context.data_manager.register_path( location=location, - path=path, + path=str(path), relpath=relpath, data_type=DataType.SYMBOLIC_LINK, ) @@ -219,7 +275,7 @@ async def _register_path( return data_location else: return context.data_manager.register_path( - location=location, path=path, relpath=relpath, data_type=data_type + location=location, path=str(path), relpath=relpath, data_type=data_type ) return None @@ -448,14 +504,12 @@ async def expand_glob( tmp_directory: str, path: str, ) -> MutableSequence[tuple[str, str]]: - paths = await remotepath.resolve(connector, location, path) or [] + outdir = StreamFlowPath( + output_directory, context=workflow.context, location=location + ) + paths = sorted([p async for p in outdir.glob(path)]) effective_paths = await asyncio.gather( - *( - asyncio.create_task( - remotepath.follow_symlink(workflow.context, connector, location, p) - ) - for p in paths - ) + *(asyncio.create_task(p.resolve()) for p in paths) ) await asyncio.gather( *( @@ -468,24 +522,23 @@ async def expand_glob( output_directory=output_directory, tmp_directory=tmp_directory, path=p, - realpath=ep or p, + real_path=ep or p, ) ) for p, ep in zip(paths, effective_paths) ) ) - return [(p, (ep or p)) for p, ep in zip(paths, effective_paths)] + return [(str(p), str(ep or p)) for p, ep in zip(paths, effective_paths)] async def get_class_from_path( path: str, job: Job, context: StreamFlowContext ) -> str | None: - connector = context.scheduler.get_connector(job.name) locations = context.scheduler.get_locations(job.name) for location in locations: return ( "File" - if await remotepath.isfile(connector, location, path) + if await StreamFlowPath(path, context=context, location=location).is_file() else "Directory" ) raise WorkflowExecutionException( @@ -520,33 +573,29 @@ async def get_file_token( token["format"] = 
file_format token["nameroot"], token["nameext"] = path_processor.splitext(basename) for location in locations: - if real_path := await remotepath.follow_symlink( - context, connector, location, filepath - ): - token["size"] = await remotepath.size(connector, location, real_path) + path = StreamFlowPath(filepath, context=context, location=location) + if real_path := await path.resolve(): + token["size"] = await real_path.size() if load_contents: token["contents"] = await _get_contents( - connector, - location, real_path, token["size"], cwl_version, ) token["checksum"] = "sha1${checksum}".format( - checksum=await remotepath.checksum( - context, connector, location, real_path - ) + checksum=await real_path.checksum() ) break elif token_class == "Directory" and load_listing != LoadListing.no_listing: # nosec for location in locations: - if await remotepath.exists(connector, location, filepath): - token["listing"] = await get_listing( + path = StreamFlowPath(filepath, context=context, location=location) + if await path.exists(): + token["listing"] = await _get_listing( context=context, connector=connector, cwl_version=cwl_version, locations=locations, - dirpath=filepath, + dirpath=path, load_contents=load_contents, recursive=load_listing == LoadListing.deep_listing, ) @@ -554,57 +603,6 @@ async def get_file_token( return token -async def get_listing( - context: StreamFlowContext, - connector: Connector, - cwl_version: str, - locations: MutableSequence[ExecutionLocation], - dirpath: str, - load_contents: bool, - recursive: bool, -) -> MutableSequence[MutableMapping[str, Any]]: - listing_tokens = {} - for location in locations: - directories = await remotepath.listdir( - connector, location, dirpath, FileType.DIRECTORY - ) - for directory in directories: - if directory not in listing_tokens: - load_listing = ( - LoadListing.deep_listing if recursive else LoadListing.no_listing - ) - listing_tokens[directory] = asyncio.create_task( - get_file_token( # nosec - context=context, - connector=connector, - cwl_version=cwl_version, - locations=locations, - token_class="Directory", - filepath=directory, - load_contents=load_contents, - load_listing=load_listing, - ) - ) - files = await remotepath.listdir(connector, location, dirpath, FileType.FILE) - for file in files: - if file not in listing_tokens: - listing_tokens[file] = asyncio.create_task( - get_file_token( # nosec - context=context, - connector=connector, - cwl_version=cwl_version, - locations=locations, - token_class="File", - filepath=file, - load_contents=load_contents, - ) - ) - return cast( - MutableSequence[MutableMapping[str, Any]], - await asyncio.gather(*listing_tokens.values()), - ) - - def get_name( name_prefix: str, cwl_name_prefix: str, @@ -955,14 +953,14 @@ async def update_file_token( load_contents: bool | None, load_listing: LoadListing | None = None, ): - filepath = get_path_from_token(token_value) + filepath = StreamFlowPath( + get_path_from_token(token_value), context=context, location=location + ) # Process contents if get_token_class(token_value) == "File" and load_contents is not None: if load_contents and "contents" not in token_value: token_value |= { "contents": await _get_contents( - connector, - location, filepath, token_value["size"], cwl_version, @@ -979,7 +977,7 @@ async def update_file_token( # If listing is not present or if the token needs a deep listing, process directory contents elif "listing" not in token_value or load_listing == LoadListing.deep_listing: token_value |= { - "listing": await get_listing( + 
"listing": await _get_listing( context=context, connector=connector, cwl_version=cwl_version, @@ -1001,16 +999,19 @@ async def update_file_token( async def write_remote_file( - context: StreamFlowContext, job: Job, content: str, path: str + context: StreamFlowContext, + locations: MutableSequence[ExecutionLocation], + content: str, + path: str, + relpath: str, ): - for location in context.scheduler.get_locations(job.name): - connector = context.deployment_manager.get_connector(location.deployment) - path_processor = get_path_processor(connector) - if not await remotepath.exists(connector, location, path): + for location in locations: + path = StreamFlowPath(path, context=context, location=location) + if not await path.exists(): if logger.isEnabledFor(logging.INFO): logger.info( "Creating {path} {location}".format( - path=path, + path=str(path), location=( "on local file-system" if location.local @@ -1018,11 +1019,9 @@ async def write_remote_file( ), ) ) - await remotepath.write(connector, location, path, content) + await path.write_text(content) context.data_manager.register_path( location=location, - path=path, - relpath=path_processor.relpath( - path_processor.normpath(path), job.output_directory - ), + path=str(path), + relpath=relpath, ) diff --git a/streamflow/cwl/workflow.py b/streamflow/cwl/workflow.py index eb64a0b7..c382b6a8 100644 --- a/streamflow/cwl/workflow.py +++ b/streamflow/cwl/workflow.py @@ -2,7 +2,7 @@ import json from collections.abc import MutableMapping -from typing import TYPE_CHECKING, Any, cast +from typing import Any, TYPE_CHECKING, cast from rdflib import Graph diff --git a/streamflow/data/manager.py b/streamflow/data/manager.py index 8246d718..7bf707f5 100644 --- a/streamflow/data/manager.py +++ b/streamflow/data/manager.py @@ -10,7 +10,7 @@ from streamflow.core.data import DataLocation, DataManager, DataType from streamflow.core.exception import WorkflowExecutionException -from streamflow.data import remotepath +from streamflow.data.remotepath import StreamFlowPath from streamflow.deployment.connector.local import LocalConnector from streamflow.deployment.utils import get_path_processor from streamflow.log_handler import logger @@ -329,9 +329,9 @@ async def transfer_data( await asyncio.gather( *( asyncio.create_task( - remotepath.mkdir( - dst_connector, location, str(Path(dst_path).parent) - ) + StreamFlowPath( + dst_path, context=self.context, location=location + ).parent.mkdir(mode=0o777, exist_ok=True) ) for location in dst_locations ) @@ -349,9 +349,9 @@ async def transfer_data( ) ) if ( - src_realpath := await remotepath.follow_symlink( - self.context, src_connector, src_location, src_path - ) + src_realpath := await StreamFlowPath( + src_path, context=self.context, location=src_location + ).resolve() ) is None: logger.info(f"Remote file system: {repr(self.path_mapper)}") raise WorkflowExecutionException( @@ -361,7 +361,7 @@ async def transfer_data( else: src_path = src_realpath primary_locations = self.path_mapper.get( - path=src_path, data_type=DataType.PRIMARY + path=str(src_path), data_type=DataType.PRIMARY ) copy_tasks = [] remote_locations = [] @@ -394,31 +394,26 @@ async def transfer_data( else: remote_locations.append(dst_location) # If the source path has already been registered - if src_data_locations := self.path_mapper.get(path=src_path): + if src_data_locations := self.path_mapper.get(path=str(src_path)): src_data_location = next(iter(src_data_locations)) # Compute actual destination path - loc_dst_path = ( - 
get_path_processor(dst_connector).join( - dst_path, - get_path_processor(src_connector).basename(src_path), - ) - if await remotepath.isdir( - connector=dst_connector, - location=dst_location, - path=dst_path, - ) - else dst_path + loc_dst_path = StreamFlowPath( + dst_path, context=self.context, location=dst_location ) + if await loc_dst_path.is_dir(): + loc_dst_path /= src_path.name # Register path and data location for parent folder self.register_path(dst_location, str(Path(loc_dst_path).parent)) # Register the new `DataLocation` object dst_data_location = DataLocation( location=dst_location, - path=loc_dst_path, + path=str(loc_dst_path), relpath=src_data_location.relpath, data_type=DataType.PRIMARY, ) - self.path_mapper.put(path=loc_dst_path, data_location=dst_data_location) + self.path_mapper.put( + path=str(loc_dst_path), data_location=dst_data_location + ) data_locations.append(dst_data_location) # If the destination is not writable , map the new `DataLocation` object to the source locations if not writable: @@ -436,7 +431,7 @@ async def transfer_data( _copy( src_connector=src_connector, src_location=src_location, - src=src_path, + src=str(src_path), dst_connector=dst_connector, dst_locations=remote_locations, dst=dst_path, @@ -448,16 +443,14 @@ async def transfer_data( # Mark all destination data locations as available for data_location in data_locations: if not writable: - connector = self.context.deployment_manager.get_connector( - data_location.deployment + loc_path = StreamFlowPath( + data_location.path, + context=self.context, + location=data_location.location, ) data_location.data_type = ( DataType.SYMBOLIC_LINK - if await remotepath.islink( - connector=connector, - location=data_location.location, - path=data_location.path, - ) + if await loc_path.is_symlink() else DataType.PRIMARY ) # Process wrapped locations if any diff --git a/streamflow/data/remotepath.py b/streamflow/data/remotepath.py index 817367ef..99c7af63 100644 --- a/streamflow/data/remotepath.py +++ b/streamflow/data/remotepath.py @@ -4,18 +4,22 @@ import base64 import glob import hashlib +import io import os +import pathlib import posixpath import shutil -from collections.abc import MutableMapping, MutableSequence +import sys +from abc import ABC, abstractmethod +from collections.abc import AsyncIterator, MutableMapping, MutableSequence from email.message import Message -from typing import TYPE_CHECKING +from pathlib import Path, PosixPath, PurePath, PurePosixPath, WindowsPath +from typing import TYPE_CHECKING, cast import aiohttp from aiohttp import ClientResponse -from streamflow.core import utils -from streamflow.core.data import FileType, DataType +from streamflow.core.data import DataType from streamflow.core.exception import WorkflowExecutionException if TYPE_CHECKING: @@ -52,331 +56,410 @@ def _get_filename_from_response(response: ClientResponse, url: str): return url.rsplit("/", 1)[-1] -def _listdir_local(path: str, file_type: FileType | None) -> MutableSequence[str]: - content = [] - dir_content = os.listdir(path) - check = ( - (os.path.isfile if file_type == FileType.FILE else os.path.isdir) - if file_type is not None - else None - ) - for element in dir_content: - element_path = os.path.join(path, element) - if check and check(element_path): - content.append(element_path) - return content - - -async def checksum( +async def _size( context: StreamFlowContext, - connector: Connector, location: ExecutionLocation | None, - path: str, -) -> str | None: - if location.local: - if os.path.isfile(path): - loop 
= asyncio.get_running_loop() - return await loop.run_in_executor( - context.process_executor, _file_checksum_local, path - ) - else: - return None + path: str | MutableSequence[str], +) -> int: + if not path: + return 0 + elif location.local: + if not isinstance(path, MutableSequence): + path = [path] + size = 0 + for p in path: + size += await StreamFlowPath(p, context=context, location=location).size() + return size else: - command = [f'test -f "{path}" && sha1sum "{path}" | awk \'{{print $1}}\''] + command = [ + "".join( + [ + "find -L ", + ( + " ".join([f'"{p}"' for p in path]) + if isinstance(path, MutableSequence) + else f'"{path}"' + ), + " -type f -exec ls -ln {} \\+ | ", + "awk 'BEGIN {sum=0} {sum+=$5} END {print sum}'; ", + ] + ) + ] + connector = context.deployment_manager.get_connector(location.deployment) result, status = await connector.run( location=location, command=command, capture_output=True ) - if status > 1: - raise WorkflowExecutionException( - "{} Command '{}' on location {}: {}".format( - status, command, location, result - ) - ) - return result.strip() + _check_status(command, location, result, status) + result = result.strip().strip("'\"") + return int(result) if result.isdigit() else 0 -async def download( - connector: Connector, - location: ExecutionLocation | None, - url: str, - parent_dir: str, -) -> str: - await mkdir(connector, location, parent_dir) - if location.local: - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - if response.status == 200: - filepath = os.path.join( - parent_dir, _get_filename_from_response(response, url) - ) - with open(filepath, mode="wb") as f: - f.write(await response.read()) - else: - raise Exception( - f"Downloading {url} failed with status {response.status}:\n{response.content}" - ) - else: - async with aiohttp.ClientSession() as session: - async with session.head(url, allow_redirects=True) as response: - if response.status == 200: - filepath = posixpath.join( - parent_dir, _get_filename_from_response(response, url) - ) - else: - raise Exception( - f"Downloading {url} failed with status {response.status}:\n{response.content}" - ) - await connector.run( - location=location, - command=[ - f'if [ command -v curl ]; then curl -L -o "{filepath}" "{url}"; ' - f'else wget -O "{filepath}" "{url}"; fi' - ], - ) - return filepath +class StreamFlowPath(PurePath, ABC): + def __new__( + cls, *args, context: StreamFlowContext, location: ExecutionLocation, **kwargs + ): + if cls is StreamFlowPath: + cls = LocalStreamFlowPath if location.local else RemoteStreamFlowPath + if sys.version_info < (3, 12): + return cls._from_parts(args) + else: + return object.__new__(cls) + @abstractmethod + async def checksum(self) -> str | None: ... -async def exists( - connector: Connector, location: ExecutionLocation | None, path: str -) -> bool: - if location.local: - return os.path.exists(path) - else: - command = [f'test -e "{path}"'] - result, status = await connector.run( - location=location, command=command, capture_output=True + @abstractmethod + async def exists(self) -> bool: ... + + @abstractmethod + def glob( + self, pattern, *, case_sensitive=None + ) -> AsyncIterator[StreamFlowPath]: ... + + @abstractmethod + async def is_dir(self) -> bool: ... + + @abstractmethod + async def is_file(self) -> bool: ... + + @abstractmethod + async def is_symlink(self) -> bool: ... + + @abstractmethod + async def mkdir(self, mode=0o777, parents=False, exist_ok=False) -> None: ... 
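+    # Illustrative usage sketch (comment only; `ctx` and `loc` are hypothetical
+    # placeholders for a StreamFlowContext and an ExecutionLocation obtained
+    # elsewhere, e.g. from the scheduler or the data manager):
+    #
+    #     path = StreamFlowPath("/tmp/data.txt", context=ctx, location=loc)
+    #     if await path.exists():
+    #         content = await path.read_text()
+    #
+    # `__new__` dispatches on `location.local`, so the same call site
+    # transparently gets a LocalStreamFlowPath (os/pathlib calls) or a
+    # RemoteStreamFlowPath (shell commands run through the location's connector).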
+ + @abstractmethod + async def read_text(self, n=-1, encoding=None, errors=None) -> str: ... + + @abstractmethod + async def resolve(self, strict=False) -> StreamFlowPath | None: ... + + @abstractmethod + async def rmtree(self) -> None: ... + + @abstractmethod + async def size(self) -> int: ... + + @abstractmethod + async def symlink_to(self, target, target_is_directory=False) -> None: ... + + @abstractmethod + def walk( + self, top_down=True, on_error=None, follow_symlinks=False + ) -> AsyncIterator[ + tuple[ + StreamFlowPath, + MutableSequence[str], + MutableSequence[str], + ] + ]: ... + + @abstractmethod + async def write_text( + self, data: str, encoding=None, errors=None, newline=None + ) -> int: ... + + +class classinstancemethod(classmethod): + def __get__(self, *args, **kwargs): + return (super().__get__ if args[0] is None else self.__func__.__get__)( + args[0], args[1] ) - if status > 1: - raise WorkflowExecutionException( - "{} Command '{}' on location {}: {}".format( - status, command, location, result - ) - ) - else: - return not status -async def follow_symlink( - context: StreamFlowContext, - connector: Connector, - location: ExecutionLocation | None, - path: str, -) -> str | None: - """ - Get resolved symbolic links or canonical file names +class __LegacyStreamFlowPath(StreamFlowPath, ABC): - :param context: the `StreamFlowContext` object with global application status. - :param connector: the `Connector` object to communicate with the location - :param location: the `ExecutionLocation` object with the location information - :param path: the path to be resolved in the case of symbolic link - :return: the path of the resolved symlink or `None` if the link points to a location that does not exist - """ - if location.local: - return os.path.realpath(path) if os.path.exists(path) else None - else: - # If at least one primary location is present on the site, return its path - if locations := context.data_manager.get_data_locations( - path=path, - deployment=connector.deployment_name, - location_name=location.name, - data_type=DataType.PRIMARY, - ): - return next(iter(locations)).path - # Otherwise, analyse the remote path - command = [f'test -e "{path}" && readlink -f "{path}"'] - result, status = await connector.run( - location=location, command=command, capture_output=True + @classinstancemethod + def _from_parts( + self, args, init=sys.version_info.major == 3 and sys.version_info.minor == 9 + ) -> StreamFlowPath: + obj = ( + object.__new__(self) + if isinstance(self, type) + else type(self)( + context=getattr(self, "context", None), + location=getattr(self, "location", None), + ) ) - if status > 1: - raise WorkflowExecutionException( - "{} Command '{}' on location {}: {}".format( - status, command, location, result - ) + drv, root, parts = obj._parse_args(args) + obj._drv = drv + obj._root = root + obj._parts = parts + if init: + obj._init() + return obj + + @classinstancemethod + def _from_parsed_parts( + self, + drv, + root, + parts, + init=sys.version_info.major == 3 and sys.version_info.minor == 9, + ): + obj = ( + object.__new__(self) + if isinstance(self, type) + else type(self)( + context=getattr(self, "context", None), + location=getattr(self, "location", None), ) - return result.strip() if status == 0 else None - + ) + obj._drv = drv + obj._root = root + obj._parts = parts + if init: + obj._init() + return obj + + def _scandir(self): + return os.scandir(self) + + def walk(self, top_down=True, on_error=None, follow_symlinks=False): + paths = [self] + while paths: + 
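+            # Iterative depth-first traversal: `paths` is a stack of directories
+            # still to scan; when top_down is False, the (path, dirnames,
+            # filenames) triple is pushed back and re-emitted only after all of
+            # its subdirectories have been processed, yielding bottom-up order.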
path = paths.pop() + if isinstance(path, tuple): + yield path + continue + try: + scandir_it = path._scandir() + except OSError as error: + if on_error is not None: + on_error(error) + continue + + with scandir_it: + dirnames = [] + filenames = [] + for entry in scandir_it: + try: + is_dir = entry.is_dir(follow_symlinks=follow_symlinks) + except OSError: + is_dir = False + if is_dir: + dirnames.append(entry.name) + else: + filenames.append(entry.name) + if top_down: + yield path, dirnames, filenames + else: + paths.append((path, dirnames, filenames)) + paths += [path._make_child_relpath(d) for d in reversed(dirnames)] -async def get_storage_usages( - connector: Connector, location: ExecutionLocation, hardware: Hardware -) -> MutableMapping[str, int]: - """ - Get the actual size of the hardware storage paths - Warn. Storage keys are not mount points. - Since the meaning of storage dictionary keys depends on each `HardwareRequirement` implementation, - no assumption about the key meaning should be made. +class LocalStreamFlowPath( + WindowsPath if os.name == "nt" else PosixPath, + __LegacyStreamFlowPath if sys.version_info < (3, 12) else StreamFlowPath, +): + def __init__( + self, + *args, + context: StreamFlowContext, + location: ExecutionLocation | None = None, + ): + if sys.version_info < (3, 12): + super().__init__() + else: + super().__init__(*args) + self.context: StreamFlowContext = context - :param connector: the `Connector` object to communicate with the location - :param location: the `ExecutionLocation` object with the location information - :param hardware: the `Hardware` which contains the paths to discover size. - :return: a map with the `key` of the `hardware` storage and the size of the paths in the `hardware` storage - """ + async def checksum(self) -> str | None: + if await self.is_file(): + loop = asyncio.get_running_loop() + return await loop.run_in_executor( + self.context.process_executor, _file_checksum_local, self.__str__() + ) + else: + return None - # It is not an accurate snapshot of the resources used - # Eventual follow links inside the Storage paths should be resolved but checking their mount points. 
- return dict( zip( hardware.storage.keys(), await asyncio.gather( *( asyncio.create_task( size( connector=connector, location=location, path=list(storage.paths), ) for storage in hardware.storage.values() ) ), ) ) + async def exists(self) -> bool: + return cast(Path, super()).exists() + + async def glob( + self, pattern, *, case_sensitive=None + ) -> AsyncIterator[LocalStreamFlowPath]: + for path in glob.glob(str(self / pattern)): + yield LocalStreamFlowPath(path, context=self.context) + + async def is_dir(self) -> bool: + return cast(Path, super()).is_dir() + + async def is_file(self) -> bool: + return cast(Path, super()).is_file() + + async def is_symlink(self) -> bool: + return cast(Path, super()).is_symlink() + + async def mkdir(self, mode=0o777, parents=False, exist_ok=False) -> None: + try: + os.mkdir(self, mode) + except FileNotFoundError: + if not parents or self.parent == self: + raise + await self.parent.mkdir(parents=True, exist_ok=True) + await self.mkdir(mode, parents=False, exist_ok=exist_ok) + except OSError: + if not exist_ok or not await self.is_dir(): + raise + + @property + def parents(self): + if sys.version_info < (3, 12): + + class __PathParents(pathlib._PathParents): + def __init__(self, path): + super().__init__(path) + self._path = path + + def __getitem__(self, idx): + if idx < 0 or idx >= len(self): + raise IndexError(idx) + return self._path._from_parsed_parts( + self._drv, self._root, self._parts[: -idx - 1] ) + return __PathParents(self) + else: + return super().parents + + async def read_text(self, n=-1, encoding=None, errors=None) -> str: + if sys.version_info >= (3, 10): + encoding = io.text_encoding(encoding) + with self.open(mode="r", encoding=encoding, errors=errors) as f: + return f.read(n) + + async def resolve(self, strict=False) -> LocalStreamFlowPath | None: + if await self.exists(): + return LocalStreamFlowPath( + super().resolve(strict=strict), context=self.context ) + else: + return None -async def head( - connector: Connector, location: ExecutionLocation | None, path: str, num_bytes: int -) -> str: - if location.local: - with open(path, "rb") as f: - return f.read(num_bytes).decode("utf-8") - else: - command = ["head", "-c", str(num_bytes), path] - result, status = await connector.run( - location=location, command=command, capture_output=True + async def rmtree(self) -> None: + if await self.exists(): + if await self.is_symlink(): + self.unlink() + elif await self.is_dir(): + shutil.rmtree(self.__str__()) + else: + self.unlink(missing_ok=True) + + async def size(self) -> int: + if await self.is_file(): + return self.stat().st_size if not await self.is_symlink() else 0 + else: + total_size = 0 + async for dirpath, _, filenames in self.walk(): + for f in filenames: + fp = dirpath / f + if not await fp.is_symlink(): + total_size += fp.stat().st_size + return total_size + + async def symlink_to(self, target, target_is_directory=False) -> None: + return cast(Path, super()).symlink_to( + target=target, target_is_directory=target_is_directory ) - _check_status(command, location, result, status) - return result.strip() + async def walk( + self, top_down=True, on_error=None, follow_symlinks=False + ) -> AsyncIterator[ + tuple[ + LocalStreamFlowPath, + MutableSequence[str], + MutableSequence[str], + ] + ]: + for dirpath, dirnames, filenames in cast(Path, super()).walk( + top_down=top_down, on_error=on_error, follow_symlinks=follow_symlinks + ): + yield dirpath, dirnames, filenames -async def isdir( - connector: Connector,
location: ExecutionLocation | None, path: str -) -> bool: - if location.local: - return os.path.isdir(path) - else: - command = [f'test -d "{path}"'] - result, status = await connector.run( - location=location, command=command, capture_output=True + def with_segments(self, *pathsegments): + return type(self)(*pathsegments, context=self.context) + + async def write_text( + self, data: str, encoding=None, errors=None, newline=None + ) -> int: + return cast(Path, super()).write_text( + data=data, encoding=encoding, errors=errors, newline=newline ) - if status > 1: - raise WorkflowExecutionException( - "{} Command '{}' on location {}: {}".format( - status, command, location, result - ) - ) - else: - return not status -async def isfile( - connector: Connector, location: ExecutionLocation | None, path: str -) -> bool: - if location.local: - return os.path.isfile(path) - else: - command = [f'test -f "{path}"'] - result, status = await connector.run( - location=location, command=command, capture_output=True +class RemoteStreamFlowPath( + PurePosixPath, + __LegacyStreamFlowPath if sys.version_info < (3, 12) else StreamFlowPath, +): + __slots__ = ("context", "connector", "location") + + def __init__(self, *args, context: StreamFlowContext, location: ExecutionLocation): + if sys.version_info < (3, 12): + super().__init__() + else: + super().__init__(*args) + self.context: StreamFlowContext = context + self.connector: Connector = self.context.deployment_manager.get_connector( + location.deployment + ) + self.location: ExecutionLocation = location + + async def _test(self, command: list[str]) -> bool: + command = ["test"] + command + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True ) if status > 1: raise WorkflowExecutionException( "{} Command '{}' on location {}: {}".format( - status, command, location, result + status, command, self.location, result ) ) else: return not status - -async def islink( - connector: Connector, location: ExecutionLocation | None, path: str -) -> bool: - if location.local: - return os.path.islink(path) - else: - command = [f'test -L "{path}"'] - result, status = await connector.run( - location=location, command=command, capture_output=True + async def checksum(self): + command = [ + "test", + "-f", + f"'{self.__str__()}'", + "&&", + "sha1sum", + f"'{self.__str__()}'", + "|", + "awk", + "'{print $1}'", + ] + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True ) if status > 1: raise WorkflowExecutionException( "{} Command '{}' on location {}: {}".format( - status, command, location, result + status, command, self.location, result ) ) - else: - return not status - - -async def listdir( - connector: Connector, - location: ExecutionLocation | None, - path: str, - file_type: FileType | None = None, -) -> MutableSequence[str]: - if location.local: - return _listdir_local(path, file_type) - else: - command = 'find -L "{path}" -mindepth 1 -maxdepth 1 {type}'.format( - path=path, - type=( - "-type {type}".format( - type="d" if file_type == FileType.DIRECTORY else "f" - ) - if file_type is not None - else "" - ), - ).split() - content, status = await connector.run( - location=location, command=command, capture_output=True - ) - _check_status(command, location, content, status) - content = content.strip(" \n") - return content.splitlines() if content else [] - - -async def mkdir( - connector: Connector, - location: ExecutionLocation, - path: str | MutableSequence[str], -) -> 
None: - paths = path if isinstance(path, MutableSequence) else [path] - if location.local: - for path in paths: - os.makedirs(path, exist_ok=True) - else: - command = ["mkdir", "-p"] + list(paths) - result, status = await connector.run( - location=location, command=command, capture_output=True - ) - _check_status(command, location, result, status) - - -async def read( - connector: Connector, location: ExecutionLocation | None, path: str -) -> str: - if location.local: - with open(path, "rb") as f: - return f.read().decode("utf-8") - else: - command = ["cat", path] - result, status = await connector.run( - location=location, command=command, capture_output=True - ) - _check_status(command, location, result, status) return result.strip() + async def exists(self, *, follow_symlinks=True) -> bool: + return await self._test( + command=( + ["-e", f"'{self.__str__()}'"] + if follow_symlinks + else ["-e", f"'{self.__str__()}'", "-o", "-L", f"'{self.__str__()}'"] + ) + ) -async def resolve( - connector: Connector, location: ExecutionLocation | None, pattern: str -) -> MutableSequence[str] | None: - if location.local: - return sorted(glob.glob(pattern)) - else: + async def glob( + self, pattern, *, case_sensitive=None + ) -> AsyncIterator[RemoteStreamFlowPath]: + if not pattern: + raise ValueError(f"Unacceptable pattern: {pattern!r}") command = [ "printf", '"%s\\0"', - pattern, + str(self / pattern), "|", "xargs", "-0", @@ -387,105 +470,284 @@ async def resolve( "|", "sort", ] - result, status = await connector.run( - location=location, command=command, capture_output=True + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True ) - _check_status(command, location, result, status) - return result.split() + _check_status(command, self.location, result, status) + for path in result.split(): + yield RemoteStreamFlowPath( + path, context=self.context, location=self.location + ) + async def is_dir(self) -> bool: + return await self._test(command=["-d", f"'{self.__str__()}'"]) -async def rm( - connector: Connector, - location: ExecutionLocation | None, - path: str | MutableSequence[str], -) -> None: - if location.local: - if isinstance(path, MutableSequence): - for p in path: - if os.path.exists(p): - if os.path.islink(p): - os.remove(p) - elif os.path.isdir(p): - shutil.rmtree(p) - else: - os.remove(p) - else: - if os.path.exists(path): - if os.path.islink(path): - os.remove(path) - elif os.path.isdir(path): - shutil.rmtree(path) - else: - os.remove(path) - else: - if isinstance(path, MutableSequence): - path = " ".join([f'"{p}"' for p in path]) - else: - path = f'"{path}"' - await connector.run(location=location, command=["".join(["rm -rf ", path])]) + async def is_file(self) -> bool: + return await self._test(command=["-f", f"'{self.__str__()}'"]) + async def is_symlink(self) -> bool: + return await self._test(command=["-L", f"'{self.__str__()}'"]) -async def size( - connector: Connector, - location: ExecutionLocation | None, - path: str | MutableSequence[str], -) -> int: - """ - Get the data size. + async def mkdir(self, mode=0o777, parents=False, exist_ok=False) -> None: + command = ["mkdir", "-m", f"{mode:o}"] + if parents or exist_ok: + command.append("-p") + command.append(self.__str__()) + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True + ) + _check_status(command, self.location, result, status) - If data reside in the local location, Python functions are called to get the size. 
- Indeed, Python functions are much faster than new processes calling shell commands, - and the Python stack is more portable across different platforms. - Otherwise, a Linux shell command is executed to get the data size from remote locations. + async def read_text(self, n=-1, encoding=None, errors=None) -> str: + command = ["head", "-c", str(n)] if n >= 0 else ["cat"] + command.append(self.__str__()) + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True + ) + _check_status(command, self.location, result, status) + return result.strip() - :param connector: the `Connector` object to communicate with the location - :param location: the `ExecutionLocation` object with the location information - :return: the sum of all the input path sizes, expressed in bytes - """ - if not path: - return 0 - elif location.local: - if not isinstance(path, MutableSequence): - path = [path] - return sum(utils.get_size(p) for p in path) - else: + async def resolve(self, strict=False) -> RemoteStreamFlowPath | None: + # If at least one primary location is present on the site, return its path + if locations := self.context.data_manager.get_data_locations( + path=self.__str__(), + deployment=self.connector.deployment_name, + location_name=self.location.name, + data_type=DataType.PRIMARY, + ): + return RemoteStreamFlowPath( + next(iter(locations)).path, + context=self.context, + location=next(iter(locations)).location, + ) + # Otherwise, analyse the remote path + command = [ + "test", + "-e", + f"'{self.__str__()}'", + "&&", + "readlink", + "-f", + f"'{self.__str__()}'", + ] + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True + ) + if status > 1: + raise WorkflowExecutionException( + "{} Command '{}' on location {}: {}".format( + status, command, self.location, result + ) + ) + return ( + RemoteStreamFlowPath( + result.strip(), context=self.context, location=self.location + ) + if status == 0 + else None + ) + + async def rmtree(self) -> None: + command = ["rm", "-rf ", self.__str__()] + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True + ) + _check_status(command, self.location, result, status) + + async def size(self) -> int: command = [ "".join( [ "find -L ", - ( - " ".join([f'"{p}"' for p in path]) - if isinstance(path, MutableSequence) - else f'"{path}"' - ), + f'"{self.__str__()}"', " -type f -exec ls -ln {} \\+ | ", "awk 'BEGIN {sum=0} {sum+=$5} END {print sum}'; ", ] ) ] - result, status = await connector.run( - location=location, command=command, capture_output=True + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True ) - _check_status(command, location, result, status) + _check_status(command, self.location, result, status) result = result.strip().strip("'\"") return int(result) if result.isdigit() else 0 - -async def write( - connector: Connector, location: ExecutionLocation | None, path: str, content: str -) -> None: - if location.local: - with open(path, "w") as f: - f.write(content) - else: + async def symlink_to(self, target, target_is_directory=False) -> None: + command = ["ln", "-snf", str(target), self.__str__()] + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True + ) + _check_status(command, self.location, result, status) + + async def walk( + self, top_down=True, on_error=None, follow_symlinks=False + ) -> AsyncIterator[ + 
tuple[
+            RemoteStreamFlowPath,
+            MutableSequence[str],
+            MutableSequence[str],
+        ]
+    ]:
+        paths = [self]
+        while paths:
+            path = paths.pop()
+            if isinstance(path, tuple):
+                yield path
+                continue
+            command = ["find"]
+            if follow_symlinks:
+                # `find` requires -L to precede the starting-point paths
+                command.append("-L")
+            command += [f"'{str(path)}'", "-mindepth", "1", "-maxdepth", "1"]
+            try:
+                content, status = await self.connector.run(
+                    location=self.location,
+                    command=command + ["-type", "d"],
+                    capture_output=True,
+                )
+                _check_status(command, self.location, content, status)
+                content = content.strip(" \n")
+                dirnames = (
+                    [
+                        str(
+                            RemoteStreamFlowPath(
+                                p, context=self.context, location=self.location
+                            ).relative_to(path)
+                        )
+                        for p in content.splitlines()
+                    ]
+                    if content
+                    else []
+                )
+                content, status = await self.connector.run(
+                    location=self.location,
+                    command=command + ["-type", "f"],
+                    capture_output=True,
+                )
+                _check_status(command, self.location, content, status)
+                content = content.strip(" \n")
+                filenames = (
+                    [
+                        str(
+                            RemoteStreamFlowPath(
+                                p, context=self.context, location=self.location
+                            ).relative_to(path)
+                        )
+                        for p in content.splitlines()
+                    ]
+                    if content
+                    else []
+                )
+            except WorkflowExecutionException as error:
+                if on_error is not None:
+                    on_error(error)
+                continue
+
+            if top_down:
+                yield path, dirnames, filenames
+            else:
+                paths.append((path, dirnames, filenames))
+            paths += [path._make_child_relpath(d) for d in reversed(dirnames)]
+
+    def with_segments(self, *pathsegments):
+        return type(self)(*pathsegments, context=self.context, location=self.location)
+
+    async def write_text(
+        self, data: str, encoding=None, errors=None, newline=None
+    ) -> int:
+        if not isinstance(data, str):
+            raise TypeError("data must be str, not %s" % data.__class__.__name__)
+        command = [
+            "echo",
+            base64.b64encode(data.encode("utf-8")).decode("utf-8"),
+            "|",
+            "base64",
+            "-d",
+        ]
+        result, status = await self.connector.run(
+            location=self.location,
+            command=command,
+            stdout=self.__str__(),
+            capture_output=True,
+        )
+        _check_status(command, self.location, result, status)
+        return len(data)
+
+
+async def download(
+    context: StreamFlowContext,
+    location: ExecutionLocation | None,
+    url: str,
+    parent_dir: str,
+) -> StreamFlowPath:
+    await StreamFlowPath(parent_dir, context=context, location=location).mkdir(
+        mode=0o777, exist_ok=True
+    )
+    if location.local:
+        async with aiohttp.ClientSession() as session:
+            async with session.get(url) as response:
+                if response.status == 200:
+                    filepath = os.path.join(
+                        parent_dir, _get_filename_from_response(response, url)
+                    )
+                    with open(filepath, mode="wb") as f:
+                        f.write(await response.read())
+                else:
+                    raise Exception(
+                        f"Downloading {url} failed with status {response.status}:\n{response.content}"
+                    )
+    else:
+        async with aiohttp.ClientSession() as session:
+            async with session.head(url, allow_redirects=True) as response:
+                if response.status == 200:
+                    filepath = posixpath.join(
+                        parent_dir, _get_filename_from_response(response, url)
+                    )
+                else:
+                    raise Exception(
+                        f"Downloading {url} failed with status {response.status}:\n{response.content}"
+                    )
+        connector = context.deployment_manager.get_connector(location.deployment)
+        await connector.run(
+            location=location,
+            command=[
+                f'if command -v curl > /dev/null; then curl -L -o "{filepath}" "{url}"; '
+                f'else wget -O "{filepath}" "{url}"; fi'
+            ],
+        )
+    return StreamFlowPath(filepath, context=context, location=location)
+
+
+async def get_storage_usages(
+    context: StreamFlowContext, location: ExecutionLocation, hardware: Hardware
+) -> MutableMapping[str, int]:
+    """
+    Get the actual sizes of the hardware storage paths.
+
+    Warning: storage keys are not mount points. The meaning of the storage
+    dictionary keys depends on each `HardwareRequirement` implementation, so
+    no assumption should be made about it.
+
+    :param context: the `StreamFlowContext` object with the global application status
+    :param location: the `ExecutionLocation` object with the location information
+    :param hardware: the `Hardware` object containing the paths whose sizes must be computed
+    :return: a map associating each `key` of the `hardware` storage with the size of the paths in that storage
+    """
+    # This is not an accurate snapshot of the used resources: any symbolic
+    # links inside the storage paths should be resolved by checking their
+    # mount points.
+    return dict(
+        zip(
+            hardware.storage.keys(),
+            await asyncio.gather(
+                *(
+                    asyncio.create_task(
+                        _size(
+                            context=context,
+                            location=location,
+                            path=list(storage.paths),
+                        )
+                    )
+                    for storage in hardware.storage.values()
+                )
+            ),
+        )
+    )
diff --git a/streamflow/data/utils.py b/streamflow/data/utils.py
index 28c35fcf..ddc552d8 100644
--- a/streamflow/data/utils.py
+++ b/streamflow/data/utils.py
@@ -1,12 +1,11 @@
 from __future__ import annotations

-import os
 from typing import TYPE_CHECKING

 from streamflow.core.context import StreamFlowContext
 from streamflow.core.exception import WorkflowExecutionException
 from streamflow.core.scheduling import Hardware, Storage
-from streamflow.data.remotepath import follow_symlink
+from streamflow.data.remotepath import StreamFlowPath
 from streamflow.deployment.utils import get_path_processor

 if TYPE_CHECKING:
@@ -72,20 +71,20 @@ async def get_mount_point(
     try:
         return location.hardware.get_mount_point(path)
     except KeyError:
-        path_processor = get_path_processor(connector)
-        path_to_resolve = path
-        while (
-            mount_point := await follow_symlink(
-                context, connector, location.location, path_to_resolve
-            )
-        ) is None:
-            path_to_resolve = path_processor.dirname(path_to_resolve)
+        path_to_resolve = StreamFlowPath(
+            path, context=context, location=location.location
+        )
+        while (mount_point := await path_to_resolve.resolve()) is None:
+            path_to_resolve = path_to_resolve.parent
             if not path_to_resolve:
                 raise WorkflowExecutionException(
                     f"Impossible to find the mount point of {path} path on location {location}"
                 )
         location_mount_points = location.hardware.get_mount_points()
-        while mount_point != os.sep and mount_point not in location_mount_points:
-            mount_point = path_processor.dirname(mount_point)
-        location.hardware.get_storage(mount_point).add_path(path)
-        return mount_point
+        while (
+            mount_point.parent != mount_point
+            and str(mount_point) not in location_mount_points
+        ):
+            mount_point = mount_point.parent
+        location.hardware.get_storage(str(mount_point)).add_path(path)
+        return str(mount_point)
diff --git a/streamflow/deployment/connector/base.py b/streamflow/deployment/connector/base.py
index 2ea40765..c5c6e9e4 100644
--- a/streamflow/deployment/connector/base.py
+++ b/streamflow/deployment/connector/base.py
@@ -27,7 +27,6 @@
 )
 from streamflow.log_handler import logger

-
 FS_TYPES_TO_SKIP = {
     "-",
     "bpf",
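The `get_storage_usages` coroutine above fans out one sizing task per storage entry and zips the results back onto the original keys, relying on `dict` preserving insertion order and on `asyncio.gather` returning results in submission order. The same idiom in isolation, with a toy coroutine standing in for the `_size` helper used above (an illustrative sketch, not StreamFlow code):

    import asyncio

    async def sizes_by_key(paths_by_key):
        # One task per entry; gather() preserves submission order, so the
        # results can be zipped back onto the keys
        async def fake_size(paths):  # stand-in for the `_size` helper
            return sum(len(p) for p in paths)

        return dict(
            zip(
                paths_by_key.keys(),
                await asyncio.gather(
                    *(asyncio.create_task(fake_size(v)) for v in paths_by_key.values())
                ),
            )
        )

    print(asyncio.run(sizes_by_key({"tmp": ["/tmp/a", "/tmp/b"], "out": ["/out"]})))

diff --git a/streamflow/deployment/connector/occam.py b/streamflow/deployment/connector/occam.py
index 3f942f12..74be392f 100644
--- 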
a/streamflow/deployment/connector/occam.py +++ b/streamflow/deployment/connector/occam.py @@ -5,11 +5,11 @@ import os import posixpath import re -from collections.abc import MutableSequence, MutableMapping +from collections.abc import MutableMapping, MutableSequence +from importlib.resources import files from typing import Any import asyncssh -from importlib.resources import files from ruamel.yaml import YAML from streamflow.core import utils diff --git a/streamflow/deployment/connector/queue_manager.py b/streamflow/deployment/connector/queue_manager.py index 2acb9876..78efe837 100644 --- a/streamflow/deployment/connector/queue_manager.py +++ b/streamflow/deployment/connector/queue_manager.py @@ -9,10 +9,10 @@ from abc import ABC, abstractmethod from collections.abc import MutableMapping, MutableSequence from functools import partial +from importlib.resources import files from typing import Any, cast import cachetools -from importlib.resources import files from streamflow.core import utils from streamflow.core.asyncache import cachedmethod diff --git a/streamflow/deployment/filter/shuffle.py b/streamflow/deployment/filter/shuffle.py index afd19c61..5b42c0ea 100644 --- a/streamflow/deployment/filter/shuffle.py +++ b/streamflow/deployment/filter/shuffle.py @@ -1,6 +1,5 @@ import random from collections.abc import MutableSequence - from importlib.resources import files from streamflow.core.deployment import BindingFilter, Target diff --git a/streamflow/deployment/manager.py b/streamflow/deployment/manager.py index 7b2d7c34..f976113e 100644 --- a/streamflow/deployment/manager.py +++ b/streamflow/deployment/manager.py @@ -5,7 +5,7 @@ import os from collections.abc import MutableMapping from importlib.resources import files -from typing import TYPE_CHECKING, Any, cast +from typing import Any, TYPE_CHECKING, cast from streamflow.core.deployment import ( Connector, diff --git a/streamflow/deployment/wrapper.py b/streamflow/deployment/wrapper.py index 66e7c3ad..ab79d6b1 100644 --- a/streamflow/deployment/wrapper.py +++ b/streamflow/deployment/wrapper.py @@ -2,7 +2,7 @@ import asyncio from abc import ABC -from collections.abc import MutableSequence, MutableMapping +from collections.abc import MutableMapping, MutableSequence from typing import Any from streamflow.core.data import StreamWrapperContextManager diff --git a/streamflow/ext/plugin.py b/streamflow/ext/plugin.py index c694a7a5..91e4ab94 100644 --- a/streamflow/ext/plugin.py +++ b/streamflow/ext/plugin.py @@ -1,6 +1,6 @@ import logging from abc import ABC, abstractmethod -from collections.abc import MutableSequence, MutableMapping +from collections.abc import MutableMapping, MutableSequence from typing import Any from streamflow.config import ext_schemas diff --git a/streamflow/persistence/sqlite.py b/streamflow/persistence/sqlite.py index 9685a955..21ff0a9c 100644 --- a/streamflow/persistence/sqlite.py +++ b/streamflow/persistence/sqlite.py @@ -4,10 +4,10 @@ import json import os from collections.abc import MutableMapping, MutableSequence +from importlib.resources import files from typing import Any, cast import aiosqlite -from importlib.resources import files from streamflow.core import utils from streamflow.core.asyncache import cachedmethod diff --git a/streamflow/recovery/checkpoint_manager.py b/streamflow/recovery/checkpoint_manager.py index 1de483db..1c2cde43 100644 --- a/streamflow/recovery/checkpoint_manager.py +++ b/streamflow/recovery/checkpoint_manager.py @@ -4,9 +4,8 @@ import os import tempfile from collections.abc import 
MutableSequence -from typing import TYPE_CHECKING - from importlib.resources import files +from typing import TYPE_CHECKING from streamflow.core import utils from streamflow.core.data import DataLocation diff --git a/streamflow/recovery/failure_manager.py b/streamflow/recovery/failure_manager.py index 5f14e74f..1a0e26ad 100644 --- a/streamflow/recovery/failure_manager.py +++ b/streamflow/recovery/failure_manager.py @@ -3,13 +3,12 @@ import asyncio import logging from collections.abc import MutableMapping -from typing import cast - from importlib.resources import files +from typing import cast from streamflow.core.command import CommandOutput from streamflow.core.context import StreamFlowContext -from streamflow.core.deployment import Connector, ExecutionLocation +from streamflow.core.deployment import ExecutionLocation from streamflow.core.exception import ( FailureHandlingException, UnrecoverableTokenException, @@ -22,18 +21,27 @@ Token, TokenProcessor, ) -from streamflow.data import remotepath +from streamflow.data.remotepath import StreamFlowPath from streamflow.log_handler import logger from streamflow.recovery.recovery import JobVersion from streamflow.workflow.step import ExecuteStep async def _cleanup_dir( - connector: Connector, location: ExecutionLocation, directory: str + context: StreamFlowContext, location: ExecutionLocation, directory: str ) -> None: - await remotepath.rm( - connector, location, await remotepath.listdir(connector, location, directory) - ) + path = StreamFlowPath(directory, context=context, location=location) + async for dirpath, dirnames, filenames in path.walk(): + await asyncio.gather( + *( + asyncio.create_task((dirpath / filename).rmtree()) + for filename in filenames + ) + ) + await asyncio.gather( + *(asyncio.create_task((dirpath / dirname).rmtree()) for dirname in dirnames) + ) + break async def _replace_token( @@ -114,7 +122,7 @@ async def _replay_job(self, job_version: JobVersion) -> CommandOutput: for directory in [job.output_directory, job.tmp_directory]: cleanup_tasks.append( asyncio.create_task( - _cleanup_dir(connector, location, directory) + _cleanup_dir(self.context, location, directory) ) ) self.context.data_manager.invalidate_location( diff --git a/streamflow/scheduling/policy/data_locality.py b/streamflow/scheduling/policy/data_locality.py index fefcd874..0c3589fb 100644 --- a/streamflow/scheduling/policy/data_locality.py +++ b/streamflow/scheduling/policy/data_locality.py @@ -2,9 +2,8 @@ import asyncio from collections.abc import MutableMapping -from typing import TYPE_CHECKING - from importlib.resources import files +from typing import TYPE_CHECKING from streamflow.core.context import StreamFlowContext from streamflow.core.data import DataType diff --git a/streamflow/scheduling/scheduler.py b/streamflow/scheduling/scheduler.py index 36207d33..08665c71 100644 --- a/streamflow/scheduling/scheduler.py +++ b/streamflow/scheduling/scheduler.py @@ -6,9 +6,8 @@ import os import posixpath from collections.abc import MutableMapping, MutableSequence -from typing import TYPE_CHECKING, cast - from importlib.resources import files +from typing import TYPE_CHECKING, cast from streamflow.core.config import BindingConfig, Config from streamflow.core.deployment import BindingFilter, Connector, FilterConfig, Target @@ -461,7 +460,9 @@ async def notify_status(self, job_name: str, status: Status) -> None: ) for k, size in ( await remotepath.get_storage_usages( - conn, loc, job_hardware + self.context, + loc, + job_hardware, ) ).items() } diff --git 
a/streamflow/workflow/combinator.py b/streamflow/workflow/combinator.py index c9d646c8..07e73a26 100644 --- a/streamflow/workflow/combinator.py +++ b/streamflow/workflow/combinator.py @@ -1,7 +1,7 @@ from __future__ import annotations from collections import deque -from collections.abc import MutableMapping, MutableSequence, AsyncIterable +from collections.abc import AsyncIterable, MutableMapping, MutableSequence from typing import Any, cast from streamflow.core import utils diff --git a/streamflow/workflow/executor.py b/streamflow/workflow/executor.py index 8c090c9e..ea684125 100644 --- a/streamflow/workflow/executor.py +++ b/streamflow/workflow/executor.py @@ -2,7 +2,7 @@ import asyncio import time -from collections.abc import MutableSequence, MutableMapping +from collections.abc import MutableMapping, MutableSequence from typing import TYPE_CHECKING, cast from streamflow.core import utils diff --git a/streamflow/workflow/step.py b/streamflow/workflow/step.py index 903c939e..fac655a7 100644 --- a/streamflow/workflow/step.py +++ b/streamflow/workflow/step.py @@ -6,7 +6,7 @@ import posixpath from abc import ABC, abstractmethod from collections import deque -from collections.abc import MutableSequence, MutableMapping, AsyncIterable, MutableSet +from collections.abc import AsyncIterable, MutableMapping, MutableSequence, MutableSet from types import ModuleType from typing import Any, cast @@ -35,7 +35,7 @@ Token, Workflow, ) -from streamflow.data import remotepath +from streamflow.data.remotepath import StreamFlowPath from streamflow.deployment.utils import get_path_processor from streamflow.log_handler import logger from streamflow.workflow.port import ConnectorPort, JobPort @@ -1364,14 +1364,14 @@ async def _propagate_job( if not self.workflow.context.data_manager.get_data_locations( directory, location.deployment, location.name ): - realpath = await remotepath.follow_symlink( - self.workflow.context, connector, location, directory - ) - if realpath != directory: + realpath = await StreamFlowPath( + directory, context=self.workflow.context, location=location + ).resolve() + if str(realpath) != directory: self.workflow.context.data_manager.register_path( location=location, - path=realpath, - relpath=realpath, + path=str(realpath), + relpath=str(realpath), ) self.workflow.context.data_manager.register_path( location=location, @@ -1379,7 +1379,7 @@ async def _propagate_job( relpath=directory, data_type=( DataType.PRIMARY - if realpath == directory + if str(realpath) == directory else DataType.SYMBOLIC_LINK ), ) @@ -1443,31 +1443,40 @@ async def _set_job_directories( path_processor, job.tmp_directory, allocation.target ) # Create directories - await asyncio.gather( - *( - asyncio.create_task( - remotepath.mkdir( - connector=connector, - location=location, - path=[ - job.input_directory, - job.output_directory, - job.tmp_directory, - ], + create_tasks = [] + for location in locations: + for directory in [ + job.input_directory, + job.output_directory, + job.tmp_directory, + ]: + create_tasks.append( + asyncio.create_task( + StreamFlowPath( + directory, context=self.workflow.context, location=location + ).mkdir(mode=0o777, parents=True, exist_ok=True) + ) + ) + await asyncio.gather(*create_tasks) + job.input_directory, job.output_directory, job.tmp_directory = ( + str(p) + for p in await asyncio.gather( + *( + asyncio.create_task( + StreamFlowPath( + directory, + context=self.workflow.context, + location=next(iter(locations)), + ).resolve() + ) + for directory in ( + job.input_directory, + 
job.output_directory, + job.tmp_directory, ) ) - for location in locations ) ) - job.input_directory = await remotepath.follow_symlink( - self.workflow.context, connector, locations[0], job.input_directory - ) - job.output_directory = await remotepath.follow_symlink( - self.workflow.context, connector, locations[0], job.output_directory - ) - job.tmp_directory = await remotepath.follow_symlink( - self.workflow.context, connector, locations[0], job.tmp_directory - ) def get_output_port(self, name: str | None = None) -> JobPort: return cast(JobPort, super().get_output_port(name)) diff --git a/streamflow/workflow/token.py b/streamflow/workflow/token.py index c23db88f..320adc98 100644 --- a/streamflow/workflow/token.py +++ b/streamflow/workflow/token.py @@ -9,7 +9,7 @@ from streamflow.core.context import StreamFlowContext from streamflow.core.exception import WorkflowExecutionException from streamflow.core.persistence import DatabaseLoadingContext -from streamflow.core.workflow import Job, Token, Status +from streamflow.core.workflow import Job, Status, Token class IterationTerminationToken(Token): diff --git a/tests/test_build_wf.py b/tests/test_build_wf.py index ecfc2434..19603fa5 100644 --- a/tests/test_build_wf.py +++ b/tests/test_build_wf.py @@ -1,4 +1,4 @@ -from typing import Type, cast +from typing import cast import pytest @@ -74,7 +74,7 @@ async def _clone_step(step, workflow, context): return new_workflow, new_step -async def _general_test_port(context: StreamFlowContext, cls_port: Type[Port]): +async def _general_test_port(context: StreamFlowContext, cls_port: type[Port]): workflow, ports = await create_workflow(context) port = workflow.create_port(cls_port) await workflow.save(context) @@ -325,7 +325,7 @@ async def test_schedule_step(context: StreamFlowContext): @pytest.mark.asyncio @pytest.mark.parametrize("port_cls", [Port, JobPort, ConnectorPort]) -async def test_port(context: StreamFlowContext, port_cls: Type[Port]): +async def test_port(context: StreamFlowContext, port_cls: type[Port]): """Test saving Port on database and re-load it in a new Workflow""" await _general_test_port(context, port_cls) diff --git a/tests/test_cwl_build_wf.py b/tests/test_cwl_build_wf.py index 37eaf980..fa7e05ac 100644 --- a/tests/test_cwl_build_wf.py +++ b/tests/test_cwl_build_wf.py @@ -1,5 +1,5 @@ import posixpath -from typing import Type, cast +from typing import cast import pytest @@ -45,7 +45,7 @@ @pytest.mark.asyncio @pytest.mark.parametrize("step_cls", [CWLLoopOutputAllStep, CWLLoopOutputLastStep]) -async def test_cwl_loop_output(context: StreamFlowContext, step_cls: Type[Step]): +async def test_cwl_loop_output(context: StreamFlowContext, step_cls: type[Step]): """Test saving CWLLoopOutputAllStep on database and re-load it in a new Workflow""" workflow, _ = await create_workflow(context, num_port=0) await _base_step_test_process( @@ -181,7 +181,7 @@ async def test_loop_value_from_transformer(context: StreamFlowContext): [AllNonNullTransformer, FirstNonNullTransformer, OnlyNonNullTransformer], ) async def test_non_null_transformer( - context: StreamFlowContext, transformer_cls: Type[Step] + context: StreamFlowContext, transformer_cls: type[Step] ): """Test saving All/First/Only NonNullTransformer on database and re-load it in a new Workflow""" workflow, _ = await create_workflow(context, num_port=0) diff --git a/tests/test_data_manager.py b/tests/test_data_manager.py index 68390431..f65bb3b5 100644 --- a/tests/test_data_manager.py +++ b/tests/test_data_manager.py @@ -1,9 +1,6 @@ from __future__ 
import annotations

-import os
-import posixpath
 import tempfile
-from pathlib import PurePath

 import pytest
 import pytest_asyncio
@@ -11,8 +8,7 @@
 from streamflow.core import utils
 from streamflow.core.data import DataLocation, DataType
 from streamflow.core.deployment import Connector, ExecutionLocation
-from streamflow.data import remotepath
-from streamflow.deployment.utils import get_path_processor
+from streamflow.data.remotepath import StreamFlowPath
 from tests.utils.deployment import get_location


@@ -63,70 +59,63 @@ async def test_data_locations(
     context, src_connector, src_location, dst_connector, dst_location
 ):
     """Test the existence of data locations after the transfer data"""
-    src_path = (
-        os.path.join(tempfile.gettempdir(), utils.random_name(), utils.random_name())
-        if src_location.local
-        else posixpath.join("/tmp", utils.random_name(), utils.random_name())
+    src_path = StreamFlowPath(
+        tempfile.gettempdir() if src_location.local else "/tmp",
+        utils.random_name(),
+        context=context,
+        location=src_location,
     )
-    dst_path = (
-        os.path.join(tempfile.gettempdir(), utils.random_name())
-        if dst_location.local
-        else posixpath.join("/tmp", utils.random_name())
+    dst_path = StreamFlowPath(
+        tempfile.gettempdir() if dst_location.local else "/tmp",
+        utils.random_name(),
+        context=context,
+        location=dst_location,
     )
     # Create working directories in src and dst locations
-    await remotepath.mkdir(src_connector, src_location, str(PurePath(src_path).parent))
-    await remotepath.mkdir(dst_connector, dst_location, dst_path)
+    await src_path.parent.mkdir(mode=0o777, exist_ok=True)
+    await dst_path.mkdir(mode=0o777)
     try:
-        await remotepath.write(
-            src_connector,
-            src_location,
-            src_path,
-            "StreamFlow",
-        )
-        src_path = await remotepath.follow_symlink(
-            context, src_connector, src_location, src_path
-        )
+        await src_path.write_text("StreamFlow")
+        src_path = await src_path.resolve()
         context.data_manager.register_path(
             location=src_location,
-            path=await remotepath.follow_symlink(
-                context, src_connector, src_location, src_path
-            ),
-            relpath=src_path,
+            path=str(src_path),
+            relpath=str(src_path),
             data_type=DataType.PRIMARY,
         )
         # Check src data locations
-        path = get_path_processor(src_connector).sep
-        for basename in PurePath(src_path).parts:
-            path = os.path.join(path, basename)
+        path = StreamFlowPath(context=context, location=src_location)
+        for part in src_path.parts:
+            path /= part
         data_locs = context.data_manager.get_data_locations(
-            path, src_connector.deployment_name
+            str(path), src_connector.deployment_name
        )
         assert len(data_locs) == 1
-        assert data_locs[0].path == path
+        assert data_locs[0].path == str(path)
         assert data_locs[0].deployment == src_connector.deployment_name
         # Transfer data from src to dst
         await context.data_manager.transfer_data(
             src_location=src_location,
-            src_path=src_path,
+            src_path=str(src_path),
             dst_locations=[dst_location],
-            dst_path=dst_path,
+            dst_path=str(dst_path),
             writable=False,
         )
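Both prefix checks (the src one above and the dst one below) walk every ancestor of a registered path by accumulating `parts`. In isolation, with plain `pathlib` and a hypothetical path, the accumulation looks like this:

    from pathlib import PurePosixPath

    p = PurePosixPath("/tmp/wf/run1/file.txt")  # hypothetical registered path
    acc = PurePosixPath(p.parts[0])  # "/", the root
    prefixes = [str(acc)]
    for part in p.parts[1:]:
        acc /= part
        prefixes.append(str(acc))
    print(prefixes)  # ['/', '/tmp', '/tmp/wf', '/tmp/wf/run1', '/tmp/wf/run1/file.txt']

         # Check dst data locations
-        path = get_path_processor(dst_connector).sep
-        for basename in PurePath(dst_path).parts:
-            path = os.path.join(path, basename)
+        path = StreamFlowPath(context=context, location=dst_location)
+        for part in dst_path.parts:
+            path /= part
         data_locs = context.data_manager.get_data_locations(
-            path, dst_connector.deployment_name
+            str(path), dst_connector.deployment_name
         )
         assert len(data_locs) in [1, 2]
         if len(data_locs) == 1:
-            assert data_locs[0].path == path
+            assert data_locs[0].path == str(path)
             assert data_locs[0].deployment == 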
dst_connector.deployment_name elif len(data_locs) == 2: # src and dst are on the same location. So dst will be a symbolic link @@ -149,58 +138,60 @@ async def test_data_locations( for loc in data_locs if loc.data_type == DataType.SYMBOLIC_LINK and _contains_location(loc, dst_location) - and loc.path == path + and loc.path == str(path) ] ) == 1 ) finally: - await remotepath.rm(src_connector, src_location, src_path) - await remotepath.rm(dst_connector, dst_location, dst_path) + await src_path.rmtree() + await dst_path.rmtree() @pytest.mark.asyncio async def test_invalidate_location(context, src_connector, src_location): """Test the invalidation of a location""" - path_processor = get_path_processor(src_connector) - # Remote location are linux-like environments, so they have Posix paths - src_path = path_processor.join( - path_processor.sep, "tmp", utils.random_name(), utils.random_name() + src_path = StreamFlowPath( + tempfile.gettempdir() if src_location.local else "/tmp", + utils.random_name(), + utils.random_name(), + context=context, + location=src_location, ) - context.data_manager.register_path( location=src_location, - path=src_path, - relpath=src_path, + path=str(src_path), + relpath=str(src_path), data_type=DataType.PRIMARY, ) # Check initial data location - path = path_processor.sep - for basename in PurePath(src_path).parts: - path = path_processor.join(path, basename) + path = StreamFlowPath(context=context, location=src_location) + for part in src_path.parts: + path /= part data_locs = context.data_manager.get_data_locations( - path, src_connector.deployment_name + str(path), src_connector.deployment_name ) assert len(data_locs) == 1 - assert data_locs[0].path == path + assert data_locs[0].path == str(path) assert data_locs[0].deployment == src_connector.deployment_name # Invalidate location - root_data_loc = context.data_manager.get_data_locations( - path_processor.sep, src_connector.deployment_name - )[0] + root_data_loc = next( + iter( + context.data_manager.get_data_locations( + path.root, src_connector.deployment_name + ) + ) + ) context.data_manager.invalidate_location(root_data_loc.location, root_data_loc.path) # Check data manager has invalidated the location - path = path_processor.sep - for basename in PurePath(src_path).parts: - path = path_processor.join(path, basename) + path = StreamFlowPath(context=context, location=src_location) + for part in src_path.parts: + path /= part data_locs = context.data_manager.get_data_locations( - path, src_connector.deployment_name + str(path), src_connector.deployment_name ) # The data location of the root is not invalidated - if basename == path_processor.sep: - assert len(data_locs) == 1 - else: - assert len(data_locs) == 0 + assert len(data_locs) == (1 if path == path.parent else 0) diff --git a/tests/test_remotepath.py b/tests/test_remotepath.py index eb3b2de0..529d9e60 100644 --- a/tests/test_remotepath.py +++ b/tests/test_remotepath.py @@ -1,40 +1,18 @@ from __future__ import annotations -import errno -import os -import posixpath import tempfile import pytest import pytest_asyncio from streamflow.core import utils -from streamflow.core.data import FileType from streamflow.core.deployment import Connector, ExecutionLocation from streamflow.core.exception import WorkflowExecutionException from streamflow.data import remotepath -from streamflow.deployment.utils import get_path_processor +from streamflow.data.remotepath import StreamFlowPath from tests.utils.deployment import get_docker_deployment_config, get_location -async def 
_symlink( - connector: Connector, location: ExecutionLocation | None, src: str, path: str -) -> None: - if location.local: - src = os.path.abspath(src) - if os.path.isdir(path): - path = os.path.join(path, os.path.basename(src)) - try: - os.symlink( - os.path.abspath(src), path, target_is_directory=os.path.isdir(src) - ) - except OSError as e: - if not e.errno == errno.EEXIST: - raise - else: - await connector.run(location=location, command=["ln", "-snf", src, path]) - - @pytest_asyncio.fixture(scope="module") async def location(context, deployment_src) -> ExecutionLocation: return await get_location(context, deployment_src) @@ -48,40 +26,32 @@ def connector(context, location) -> Connector: @pytest.mark.asyncio async def test_directory(context, connector, location): """Test directory creation and deletion.""" - path = utils.random_name() + path = StreamFlowPath(utils.random_name(), context=context, location=location) try: - await remotepath.mkdir(connector, location, path) - assert await remotepath.exists(connector, location, path) - assert await remotepath.isdir(connector, location, path) + await path.mkdir(mode=0o777) + assert await path.exists() + assert await path.is_dir() # ./ # file1.txt # file2.csv # dir1/ # dir2/ - await remotepath.mkdir( - connector, - location, - [posixpath.join(path, "dir1"), posixpath.join(path, "dir2")], - ) - await remotepath.write( - connector, location, posixpath.join(path, "file1.txt"), "StreamFlow" - ) - await remotepath.write( - connector, location, posixpath.join(path, "file2.csv"), "StreamFlow" - ) - files = await remotepath.listdir(connector, location, path, FileType.FILE) - assert len(files) == 2 - assert posixpath.join(path, "file1.txt") in files - assert posixpath.join(path, "file2.csv") in files - dirs = await remotepath.listdir(connector, location, path, FileType.DIRECTORY) - assert len(dirs) == 2 - assert posixpath.join(path, "dir1") in dirs - assert posixpath.join(path, "dir2") in dirs - await remotepath.rm(connector, location, path) - assert not await remotepath.exists(connector, location, path) + await (path / "dir1").mkdir(mode=0o777) + await (path / "dir2").mkdir(mode=0o777) + await (path / "file1.txt").write_text("StreamFlow") + await (path / "file2.csv").write_text("StreamFlow") + async for dirpath, dirnames, filenames in path.walk(): + assert len(dirnames) == 2 + assert "dir1" in dirnames + assert "dir2" in dirnames + assert len(filenames) == 2 + assert "file1.txt" in filenames + assert "file2.csv" in filenames + break + await path.rmtree() + assert not await path.exists() finally: - if await remotepath.exists(connector, location, path): - await remotepath.rm(connector, location, path) + await path.rmtree() @pytest.mark.asyncio @@ -91,75 +61,53 @@ async def test_download(context, connector, location): "https://raw.githubusercontent.com/alpha-unito/streamflow/master/LICENSE", "https://github.com/alpha-unito/streamflow/archive/refs/tags/0.1.6.zip", ] - parent_dir = tempfile.gettempdir() if location.local else "/tmp" - path_processor = get_path_processor(connector) + parent_dir = StreamFlowPath( + tempfile.gettempdir() if location.local else "/tmp", + context=context, + location=location, + ) paths = [ - path_processor.join(parent_dir, "LICENSE"), - path_processor.join(parent_dir, "streamflow-0.1.6.zip"), + parent_dir / "LICENSE", + parent_dir / "streamflow-0.1.6.zip", ] - path = None for i, url in enumerate(urls): try: - path = await remotepath.download(connector, location, url, parent_dir) + path = await remotepath.download(context, 
location, url, str(parent_dir)) assert path == paths[i] - assert await remotepath.exists(connector, location, path) + assert await path.exists() finally: - if path and await remotepath.exists(connector, location, path): - await remotepath.rm(connector, location, path) + await path.rmtree() @pytest.mark.asyncio async def test_file(context, connector, location): """Test file creation, size, checksum and deletion.""" - path = utils.random_name() - path2 = utils.random_name() + path = StreamFlowPath(utils.random_name(), context=context, location=location) + path2 = StreamFlowPath(utils.random_name(), context=context, location=location) try: - await remotepath.write(connector, location, path, "StreamFlow") - assert await remotepath.exists(connector, location, path) - assert await remotepath.isfile(connector, location, path) - assert await remotepath.size(connector, location, path) == 10 - await remotepath.write(connector, location, path2, "CWL") - assert await remotepath.exists(connector, location, path2) - assert await remotepath.size(connector, location, [path, path2]) == 13 - digest = await remotepath.checksum(context, connector, location, path) - assert digest == "e8abb7445e1c4061c3ef39a0e1690159b094d3b5" - await remotepath.rm(connector, location, [path, path2]) - assert not await remotepath.exists(connector, location, path) - assert not await remotepath.exists(connector, location, path2) + await path.write_text("StreamFlow") + assert await path.exists() + assert await path.is_file() + assert await path.size() == 10 + await path2.write_text("CWL") + assert await path2.exists() + assert await path.size() + await path2.size() == 13 + assert await path.checksum() == "e8abb7445e1c4061c3ef39a0e1690159b094d3b5" + await path.rmtree() + await path2.rmtree() + assert not await path.exists() + assert not await path2.exists() finally: - if await remotepath.exists(connector, location, path): - await remotepath.rm(connector, location, path) - if await remotepath.exists(connector, location, path2): - await remotepath.rm(connector, location, path2) - - -@pytest.mark.asyncio -async def test_mkdir_failure(context): - """Test on `mkdirs` function failure""" - deployment_config = get_docker_deployment_config() - connector = context.deployment_manager.get_connector(deployment_config.name) - location = await get_location(context, deployment_config.type) - - # Create a file and try to create a directory with the same name - path = utils.random_name() - await remotepath.write(connector, location, path, "StreamFlow") - with pytest.raises(WorkflowExecutionException) as err: - await remotepath.mkdir( - connector, - location, - path, - ) - expected_msg_err = f"1 Command 'mkdir -p {path}' on location {location}: mkdir: can't create directory '{path}': File exists" - assert str(err.value) == expected_msg_err + await path.rmtree() + await path2.rmtree() @pytest.mark.asyncio -async def test_resolve(context, connector, location): +async def test_glob(context, connector, location): """Test glob resolution.""" - path_processor = get_path_processor(connector) - path = utils.random_name() - await remotepath.mkdir(connector, location, path) + path = StreamFlowPath(utils.random_name(), context=context, location=location) + await path.mkdir(mode=0o777) try: # ./ # file1.txt @@ -170,96 +118,68 @@ async def test_resolve(context, connector, location): # dir2/ # file1.txt # file2.csv - await remotepath.write( - connector, location, path_processor.join(path, "file1.txt"), "StreamFlow" - ) - await remotepath.write( - connector, 
location, path_processor.join(path, "file2.csv"), "StreamFlow"
-        )
-        await remotepath.mkdir(
-            connector, location, path_processor.join(path, "dir1", "dir2")
-        )
-        await remotepath.write(
-            connector,
-            location,
-            path_processor.join(path, "dir1", "file1.txt"),
-            "StreamFlow",
-        )
-        await remotepath.write(
-            connector,
-            location,
-            path_processor.join(path, "dir1", "file2.csv"),
-            "StreamFlow",
-        )
-        await remotepath.write(
-            connector,
-            location,
-            path_processor.join(path, "dir1", "dir2", "file1.txt"),
-            "StreamFlow",
-        )
-        await remotepath.write(
-            connector,
-            location,
-            path_processor.join(path, "dir1", "dir2", "file2.csv"),
-            "StreamFlow",
-        )
+        await (path / "file1.txt").write_text("StreamFlow")
+        await (path / "file2.csv").write_text("StreamFlow")
+        await (path / "dir1" / "dir2").mkdir(mode=0o777, parents=True)
+        await (path / "dir1" / "file1.txt").write_text("StreamFlow")
+        await (path / "dir1" / "file2.csv").write_text("StreamFlow")
+        await (path / "dir1" / "dir2" / "file1.txt").write_text("StreamFlow")
+        await (path / "dir1" / "dir2" / "file2.csv").write_text("StreamFlow")
         # Test *.txt
-        result = await remotepath.resolve(
-            connector, location, path_processor.join(path, "*.txt")
-        )
+        result = [p async for p in path.glob("*.txt")]
         assert len(result) == 1
-        assert path_processor.join(path, "file1.txt") in result
+        assert path / "file1.txt" in result
         # Test file*
-        result = await remotepath.resolve(
-            connector, location, path_processor.join(path, "file*")
-        )
+        result = [p async for p in path.glob("file*")]
         assert len(result) == 2
-        assert path_processor.join(path, "file1.txt") in result
-        assert path_processor.join(path, "file2.csv") in result
+        assert path / "file1.txt" in result
+        assert path / "file2.csv" in result
         # Test */*.txt
-        result = await remotepath.resolve(
-            connector, location, path_processor.join(path, "*/*.txt")
-        )
+        result = [p async for p in path.glob("*/*.txt")]
         assert len(result) == 1
-        assert path_processor.join(path, "dir1", "file1.txt") in result
+        assert path / "dir1" / "file1.txt" in result
     finally:
-        await remotepath.rm(connector, location, path)
+        await path.rmtree()
+
+
+@pytest.mark.asyncio
+async def test_mkdir_failure(context):
+    """Test `mkdir` function failure"""
+    deployment_config = get_docker_deployment_config()
+    location = await get_location(context, deployment_config.type)
+
+    # Create a file and try to create a directory with the same name
+    path = StreamFlowPath(utils.random_name(), context=context, location=location)
+    await path.write_text("StreamFlow")
+    with pytest.raises(WorkflowExecutionException) as err:
+        await path.mkdir(mode=0o777)
+    expected_msg_err = f"1 Command 'mkdir -m 777 {path}' on location {location}: mkdir: can't create directory '{path}': File exists"
+    assert str(err.value) == expected_msg_err
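The expected error message hinges on how the remote `mkdir` renders the mode: `f"{mode:o}"` prints the octal digits without the `0o` prefix, so `mode=0o777` produces `mkdir -m 777`. That formatting in isolation, runnable on its own:

    for mode in (0o700, 0o755, 0o777):
        assert f"{mode:o}" == oct(mode)[2:]
        print(f"mkdir -m {mode:o}")


 @pytest.mark.asyncio
 async def test_symlink(context, connector, location):
     """Test symlink creation, resolution and deletion."""
-    src = utils.random_name()
-    path = utils.random_name()
-    path_processor = get_path_processor(connector)
+    src = StreamFlowPath(utils.random_name(), context=context, location=location)
+    path = StreamFlowPath(utils.random_name(), context=context, location=location)
     try:
         # Test symlink to file
-        await remotepath.write(connector, location, src, "StreamFlow")
-        await _symlink(connector, location, src, path)
-        assert await remotepath.exists(connector, location, path)
-        assert await remotepath.islink(connector, location, path)
-        assert (
-            path_processor.basename(
-                await 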
remotepath.follow_symlink(context, connector, location, path) - ) - == src - ) - await remotepath.rm(connector, location, path) - assert not await remotepath.exists(connector, location, path) - await remotepath.rm(connector, location, src) + await src.write_text("StreamFlow") + await path.symlink_to(src) + assert await path.exists() + assert await path.is_symlink() + assert (await path.resolve()).name == str(src) + await path.rmtree() + assert not await path.exists() + await src.rmtree() # Test symlink to directory - await remotepath.mkdir(connector, location, src) - await _symlink(connector, location, src, path) - assert await remotepath.exists(connector, location, path) - assert await remotepath.islink(connector, location, path) - assert ( - path_processor.basename( - await remotepath.follow_symlink(context, connector, location, path) - ) - == src - ) - await remotepath.rm(connector, location, path) - assert not await remotepath.exists(connector, location, path) + await src.mkdir(mode=0o777) + await path.symlink_to(src, target_is_directory=True) + assert await path.exists() + assert await path.is_symlink() + assert (await path.resolve()).name == str(src) + await path.rmtree() + assert not await path.exists() finally: - await remotepath.rm(connector, location, path) - await remotepath.rm(connector, location, src) + await path.rmtree() + await src.rmtree() diff --git a/tests/test_transfer.py b/tests/test_transfer.py index 52ee0356..12c0fd66 100644 --- a/tests/test_transfer.py +++ b/tests/test_transfer.py @@ -1,16 +1,14 @@ import asyncio import os -import posixpath import tempfile import pytest from streamflow.core import utils from streamflow.core.context import StreamFlowContext -from streamflow.core.data import DataType, FileType +from streamflow.core.data import DataType from streamflow.core.deployment import Connector, ExecutionLocation -from streamflow.data import remotepath -from streamflow.deployment.utils import get_path_processor +from streamflow.data.remotepath import StreamFlowPath from tests.utils.deployment import get_location @@ -18,56 +16,22 @@ async def _compare_remote_dirs( context: StreamFlowContext, src_connector: Connector, src_location: ExecutionLocation, - src_path: str, + src_path: StreamFlowPath, dst_connector: Connector, dst_location: ExecutionLocation, - dst_path: str, + dst_path: StreamFlowPath, ): - assert await remotepath.exists(dst_connector, dst_location, dst_path) + assert await dst_path.exists() # the two dirs must have the same elements order - src_files, dst_files = await asyncio.gather( - asyncio.create_task( - remotepath.listdir(src_connector, src_location, src_path, FileType.FILE) - ), - asyncio.create_task( - remotepath.listdir(dst_connector, dst_location, dst_path, FileType.FILE) - ), - ) + src_path, src_dirs, src_files = await src_path.walk().__anext__() + dst_path, dst_dirs, dst_files = await dst_path.walk().__anext__() assert len(src_files) == len(dst_files) for src_file, dst_file in zip(sorted(src_files), sorted(dst_files)): - checksums = await asyncio.gather( - asyncio.create_task( - remotepath.checksum( - context, - src_connector, - src_location, - get_path_processor(src_connector).join(src_path, src_file), - ) - ), - asyncio.create_task( - remotepath.checksum( - context, - dst_connector, - dst_location, - get_path_processor(dst_connector).join(dst_path, dst_file), - ) - ), + assert ( + await (src_path / src_file).checksum() + == await (dst_path / dst_file).checksum() ) - assert checksums[0] == checksums[1] - - src_dirs, dst_dirs = await 
asyncio.gather( - asyncio.create_task( - remotepath.listdir( - src_connector, src_location, src_path, FileType.DIRECTORY - ) - ), - asyncio.create_task( - remotepath.listdir( - dst_connector, dst_location, dst_path, FileType.DIRECTORY - ) - ), - ) assert len(src_dirs) == len(dst_dirs) tasks = [] for src_dir, dst_dir in zip(sorted(src_dirs), sorted(dst_dirs)): @@ -78,41 +42,37 @@ async def _compare_remote_dirs( context, src_connector, src_location, - src_dir, + src_path / src_dir, dst_connector, dst_location, - dst_dir, + dst_path / dst_dir, ) ) ) await asyncio.gather(*tasks) -async def _create_tmp_dir(context, connector, location, root=None, lvl=None, n_files=0): - path_processor = get_path_processor(connector) +async def _create_tmp_dir( + context, connector, location, root=None, lvl=None, n_files=0 +) -> StreamFlowPath: dir_lvl = f"-{lvl}" if lvl else "" - dir_path = ( - os.path.join( - root if root else tempfile.gettempdir(), - f"dir{dir_lvl}-{utils.random_name()}", - ) - if location.local - else os.path.join( - root if root else "/tmp", f"dir{dir_lvl}-{utils.random_name()}" - ) + dir_path = StreamFlowPath( + ( + root + if root is not None + else tempfile.gettempdir() if location.local else "/tmp" + ), + f"dir{dir_lvl}-{utils.random_name()}", + context=context, + location=location, ) - await remotepath.mkdir(connector, location, dir_path) + await dir_path.mkdir(mode=0o777, parents=True) - dir_path = await remotepath.follow_symlink(context, connector, location, dir_path) + dir_path = await dir_path.resolve() file_lvl = f"-{lvl}" if lvl else "" for i in range(n_files): file_name = f"file{file_lvl}-{i}-{utils.random_name()}" - await remotepath.write( - connector, - location, - path_processor.join(dir_path, file_name), - f"Hello from {file_name}", - ) + await (dir_path / file_name).write_text(f"Hello from {file_name}") return dir_path @@ -170,36 +130,35 @@ async def test_directory_to_directory( n_files=2, lvl=f"{i}-0", ) - src_path = await remotepath.follow_symlink( - context, src_connector, src_location, src_path - ) + src_path = await src_path.resolve() # dst init - dst_path = ( - os.path.join(tempfile.gettempdir(), utils.random_name()) - if dst_location.local - else posixpath.join("/tmp", utils.random_name()) + dst_path = StreamFlowPath( + tempfile.gettempdir() if dst_location.local else "/tmp", + utils.random_name(), + context=context, + location=dst_location, ) # save src_path into StreamFlow context.data_manager.register_path( location=src_location, - path=src_path, - relpath=src_path, + path=str(src_path), + relpath=str(src_path), data_type=DataType.PRIMARY, ) # transfer src_path to dst_path await context.data_manager.transfer_data( src_location=src_location, - src_path=src_path, + src_path=str(src_path), dst_locations=[dst_location], - dst_path=dst_path, + dst_path=str(dst_path), writable=False, ) # check if dst exists - await remotepath.exists(dst_connector, dst_location, dst_path) + await dst_path.exists() # check that src and dst have the same sub dirs and files await _compare_remote_dirs( @@ -212,10 +171,8 @@ async def test_directory_to_directory( dst_path, ) finally: - if src_path: - await remotepath.rm(src_connector, src_location, src_path) - if dst_path: - await remotepath.rm(dst_connector, dst_location, dst_path) + await src_path.rmtree() + await dst_path.rmtree() @pytest.mark.asyncio @@ -224,59 +181,43 @@ async def test_file_to_directory( ) -> None: """Test transferring a file from one location to a directory into another location.""" src_location = await 
get_location(context, communication_pattern[0])
-    src_connector = context.deployment_manager.get_connector(src_location.deployment)
     dst_location = await get_location(context, communication_pattern[1])
-    dst_connector = context.deployment_manager.get_connector(dst_location.deployment)
-    src_path = (
-        os.path.join(tempfile.gettempdir(), utils.random_name())
-        if src_location.local
-        else posixpath.join("/tmp", utils.random_name())
+    src_path = StreamFlowPath(
+        tempfile.gettempdir() if src_location.local else "/tmp",
+        utils.random_name(),
+        context=context,
+        location=src_location,
     )
-    dst_path = (
-        os.path.join(tempfile.gettempdir(), utils.random_name())
-        if dst_location.local
-        else posixpath.join("/tmp", utils.random_name())
+    dst_path = StreamFlowPath(
+        tempfile.gettempdir() if dst_location.local else "/tmp",
+        utils.random_name(),
+        context=context,
+        location=dst_location,
     )
-    await remotepath.mkdir(dst_connector, dst_location, dst_path)
+    await dst_path.mkdir(mode=0o777, exist_ok=True)
     try:
-        await remotepath.write(src_connector, src_location, src_path, "StreamFlow")
-        src_path = await remotepath.follow_symlink(
-            context, src_connector, src_location, src_path
-        )
-        src_digest = await remotepath.checksum(
-            context, src_connector, src_location, src_path
-        )
-        src_name = get_path_processor(src_connector).basename(src_path)
+        await src_path.write_text("StreamFlow")
+        src_path = await src_path.resolve()
         context.data_manager.register_path(
             location=src_location,
-            path=await remotepath.follow_symlink(
-                context, src_connector, src_location, src_path
-            ),
-            relpath=src_path,
+            path=str(src_path),
+            relpath=str(src_path),
             data_type=DataType.PRIMARY,
         )
         await context.data_manager.transfer_data(
             src_location=src_location,
-            src_path=src_path,
+            src_path=str(src_path),
             dst_locations=[dst_location],
-            dst_path=dst_path,
+            dst_path=str(dst_path),
             writable=False,
         )
-        path_processor = get_path_processor(dst_connector)
-        assert await remotepath.exists(
-            dst_connector, dst_location, path_processor.join(dst_path, src_name)
-        )
-        dst_digest = await remotepath.checksum(
-            context,
-            dst_connector,
-            dst_location,
-            path_processor.join(dst_path, src_name),
-        )
-        assert src_digest == dst_digest
+        dst_file = dst_path / src_path.name
+        assert await dst_file.exists()
+        assert await src_path.checksum() == await dst_file.checksum()
     finally:
-        await remotepath.rm(src_connector, src_location, src_path)
-        await remotepath.rm(dst_connector, dst_location, dst_path)
+        await src_path.rmtree()
+        await dst_path.rmtree()
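Both transfer tests follow the same register-transfer-verify pattern. A compact helper capturing it could look as follows; this is an illustrative sketch rather than part of the patch, and it assumes remote-flavoured `StreamFlowPath` objects, which carry the `location` they were created with:

    from streamflow.core.data import DataType

    async def transfer_and_verify(context, src_path, dst_path, dst_location):
        # Resolve symlinks first: the data manager tracks real (PRIMARY) paths
        src_path = await src_path.resolve()
        context.data_manager.register_path(
            location=src_path.location,
            path=str(src_path),
            relpath=str(src_path),
            data_type=DataType.PRIMARY,
        )
        await context.data_manager.transfer_data(
            src_location=src_path.location,
            src_path=str(src_path),
            dst_locations=[dst_location],
            dst_path=str(dst_path),
            writable=False,
        )
        # Compare contents through each flavour's sha1-based checksum()
        assert await src_path.checksum() == await dst_path.checksum()


 @pytest.mark.asyncio
@@ -285,55 +226,41 @@ async def test_file_to_file(
 ):
     """Test transferring a file from one location to another."""
     src_location = await get_location(context, communication_pattern[0])
-    src_connector = context.deployment_manager.get_connector(src_location.deployment)
     dst_location = await get_location(context, communication_pattern[1])
-    dst_connector = context.deployment_manager.get_connector(dst_location.deployment)
-    src_path = (
-        os.path.join(tempfile.gettempdir(), utils.random_name())
-        if src_location.local
-        else posixpath.join("/tmp", utils.random_name())
+
+    src_path = StreamFlowPath(
+        tempfile.gettempdir() if src_location.local else "/tmp",
+        utils.random_name(),
+        context=context,
+        location=src_location,
     )
-    dst_path = (
-        os.path.join(tempfile.gettempdir(), utils.random_name())
-        if dst_location.local
-        else posixpath.join("/tmp", utils.random_name())
+    dst_path = StreamFlowPath(
+        tempfile.gettempdir() if dst_location.local else "/tmp",
+        utils.random_name(),
+ 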
context=context, + location=dst_location, ) try: - await remotepath.write( - src_connector, - src_location, - src_path, - "StreamFlow", - ) - src_path = await remotepath.follow_symlink( - context, src_connector, src_location, src_path - ) - src_digest = await remotepath.checksum( - context, src_connector, src_location, src_path - ) + await src_path.write_text("StreamFlow") + src_path = await src_path.resolve() context.data_manager.register_path( location=src_location, - path=await remotepath.follow_symlink( - context, src_connector, src_location, src_path - ), - relpath=src_path, + path=str(src_path), + relpath=str(src_path), data_type=DataType.PRIMARY, ) await context.data_manager.transfer_data( src_location=src_location, - src_path=src_path, + src_path=str(src_path), dst_locations=[dst_location], - dst_path=dst_path, + dst_path=str(dst_path), writable=False, ) - assert await remotepath.exists(dst_connector, dst_location, dst_path) - dst_digest = await remotepath.checksum( - context, dst_connector, dst_location, dst_path - ) - assert src_digest == dst_digest + assert await dst_path.exists() + assert await src_path.checksum() == await dst_path.checksum() finally: - await remotepath.rm(src_connector, src_location, src_path) - await remotepath.rm(dst_connector, dst_location, dst_path) + await src_path.rmtree() + await dst_path.rmtree() @pytest.mark.asyncio