Skip to content

Commit

Permalink
Refactored remotepath module
Browse files Browse the repository at this point in the history
This commit modifies the `remotepath` module by encapsulating the path
management logic inside a `StreamFlowPath` abstract class, which in turn
translates to two alternative concrete implementations:
`LocalStreamFlowPath` for local files and `RemoteStreamFlowPath` for
remote ones. The implemented logic mimics the Python `pathlib` module,
with two important differences:
- All the potentially-remote functions are `async`;
- Some additional functions (e.g., `size` or `checksum`) have been added
  to simplify file management in StreamFlow.
  • Loading branch information
GlassOfWhiskey committed Dec 12, 2024
1 parent 9a3ab42 commit aa768dd
Show file tree
Hide file tree
Showing 44 changed files with 1,250 additions and 1,127 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ format-check:
black --diff --check streamflow tests

pyupgrade:
pyupgrade --py3-only --py38-plus $(shell git ls-files | grep .py | grep -v streamflow/cwl/antlr)
pyupgrade --py3-only --py39-plus $(shell git ls-files | grep .py | grep -v streamflow/cwl/antlr)

test:
python -m pytest -rs ${PYTEST_EXTRA}
Expand Down
2 changes: 1 addition & 1 deletion streamflow/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import asyncio
import json
import posixpath
from collections.abc import MutableSequence, MutableMapping
from collections.abc import MutableMapping, MutableSequence
from typing import Any, TYPE_CHECKING, cast

from referencing import Registry, Resource
Expand Down
5 changes: 0 additions & 5 deletions streamflow/core/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,6 @@ async def transfer_data(
) -> None: ...


class FileType(Enum):
    """Kind of filesystem entry distinguished by the data layer."""

    # Regular file
    FILE = 1
    # Directory
    DIRECTORY = 2


class StreamWrapper(ABC):
def __init__(self, stream: Any):
self.stream: Any = stream
Expand Down
2 changes: 1 addition & 1 deletion streamflow/core/provenance.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from abc import abstractmethod
from collections.abc import MutableSequence, MutableMapping
from collections.abc import MutableMapping, MutableSequence
from typing import TYPE_CHECKING

from streamflow.core.persistence import DatabaseLoadingContext
Expand Down
6 changes: 3 additions & 3 deletions streamflow/core/scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
import os
from abc import ABC, abstractmethod
from collections.abc import (
MutableSequence,
MutableMapping,
Iterable,
Callable,
Iterable,
MutableMapping,
MutableSequence,
MutableSet,
)
from typing import TYPE_CHECKING, cast
Expand Down
15 changes: 1 addition & 14 deletions streamflow/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import posixpath
import shlex
import uuid
from collections.abc import MutableSequence, MutableMapping, Iterable
from collections.abc import Iterable, MutableMapping, MutableSequence
from typing import Any, TYPE_CHECKING

from streamflow.core.exception import WorkflowExecutionException
Expand Down Expand Up @@ -242,19 +242,6 @@ async def get_remote_to_remote_write_command(
return ["tar", "xf", "-", "-C", posixpath.dirname(dst)]


def get_size(path: str) -> int:
    """Return the size in bytes of *path*.

    For a regular file, the file's own size is returned; for a directory,
    the sizes of all regular files in the tree rooted at *path* are summed.
    Symbolic links are never counted (they contribute 0 bytes), and a
    nonexistent path yields 0.
    """
    if os.path.isfile(path):
        # A symlink to a file counts as zero bytes
        return 0 if os.path.islink(path) else os.path.getsize(path)
    # Directory (or missing path): walk the tree and total up regular files
    return sum(
        os.path.getsize(file_path)
        for dir_path, _, file_names in os.walk(path)
        for file_path in (os.path.join(dir_path, name) for name in file_names)
        if not os.path.islink(file_path)
    )


def get_tag(tokens: Iterable[Token]) -> str:
output_tag = "0"
for tag in [t.tag for t in tokens]:
Expand Down
15 changes: 9 additions & 6 deletions streamflow/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,12 +435,15 @@ def retag(self, tag: str) -> Token:
async def save(self, context: StreamFlowContext, port_id: int | None = None):
async with self.persistence_lock:
if not self.persistent_id:
self.persistent_id = await context.database.add_token(
port=port_id,
tag=self.tag,
type=type(self),
value=json.dumps(await self._save_value(context)),
)
try:
self.persistent_id = await context.database.add_token(
port=port_id,
tag=self.tag,
type=type(self),
value=json.dumps(await self._save_value(context)),
)
except TypeError as e:
raise WorkflowExecutionException from e

def update(self, value: Any) -> Token:
return self.__class__(tag=self.tag, value=value)
Expand Down
2 changes: 1 addition & 1 deletion streamflow/cwl/combinator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from collections.abc import MutableSequence, MutableMapping, AsyncIterable
from collections.abc import AsyncIterable, MutableMapping, MutableSequence
from typing import Any, cast

from streamflow.core.context import StreamFlowContext
Expand Down
86 changes: 60 additions & 26 deletions streamflow/cwl/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import shlex
import time
from asyncio.subprocess import STDOUT
from collections.abc import MutableSequence, MutableMapping
from collections.abc import MutableMapping, MutableSequence
from decimal import Decimal
from types import ModuleType
from typing import Any, IO, cast
Expand Down Expand Up @@ -56,22 +56,24 @@
)
from streamflow.cwl.step import build_token
from streamflow.cwl.workflow import CWLWorkflow
from streamflow.data import remotepath
from streamflow.data.remotepath import StreamFlowPath
from streamflow.deployment.connector import LocalConnector
from streamflow.deployment.utils import get_path_processor
from streamflow.log_handler import logger
from streamflow.workflow.step import ExecuteStep
from streamflow.workflow.utils import get_token_value


def _adjust_cwl_output(base_path: str, path_processor: ModuleType, value: Any) -> Any:
def _adjust_cwl_output(
base_path: StreamFlowPath, path_processor: ModuleType, value: Any
) -> Any:
if isinstance(value, MutableSequence):
return [_adjust_cwl_output(base_path, path_processor, v) for v in value]
elif isinstance(value, MutableMapping):
if utils.get_token_class(value) in ["File", "Directory"]:
path = utils.get_path_from_token(value)
if not path_processor.isabs(path):
path = path_processor.join(base_path, path)
path = base_path / path
value["path"] = path
value["location"] = f"file://{path}"
return value
Expand Down Expand Up @@ -154,18 +156,25 @@ async def _check_cwl_output(job: Job, step: Step, result: Any) -> Any:
connector = step.workflow.context.scheduler.get_connector(job.name)
locations = step.workflow.context.scheduler.get_locations(job.name)
path_processor = get_path_processor(connector)
cwl_output_path = path_processor.join(job.output_directory, "cwl.output.json")
for location in locations:
if await remotepath.exists(connector, location, cwl_output_path):
cwl_output_path = StreamFlowPath(
job.output_directory,
"cwl.output.json",
context=step.workflow.context,
location=location,
)
if await cwl_output_path.exists():
# If file exists, use its contents as token value
result = json.loads(
await remotepath.read(connector, location, cwl_output_path)
)
result = json.loads(await cwl_output_path.read_text())
# Update step output ports at runtime if needed
if isinstance(result, MutableMapping):
for out_name, out in result.items():
out = _adjust_cwl_output(
base_path=job.output_directory,
base_path=StreamFlowPath(
job.output_directory,
context=step.workflow.context,
location=location,
),
path_processor=path_processor,
value=out,
)
Expand Down Expand Up @@ -290,9 +299,9 @@ async def _prepare_work_dir(
dest_path: str | None = None,
writable: bool = False,
) -> None:
streamflow_context = options.step.workflow.context
connector = streamflow_context.scheduler.get_connector(options.job.name)
locations = streamflow_context.scheduler.get_locations(options.job.name)
context = options.step.workflow.context
connector = context.scheduler.get_connector(options.job.name)
locations = context.scheduler.get_locations(options.job.name)
path_processor = get_path_processor(connector)
# Initialize base path to job output directory if present
base_path = base_path or options.job.output_directory
Expand Down Expand Up @@ -341,7 +350,7 @@ async def _prepare_work_dir(
dest_path = dest_path or path_processor.join(
base_path, path_processor.basename(src_path)
)
await streamflow_context.data_manager.transfer_data(
await context.data_manager.transfer_data(
src_location=selected_location.location,
src_path=selected_location.path,
dst_locations=locations,
Expand All @@ -366,17 +375,20 @@ async def _prepare_work_dir(
await asyncio.gather(
*(
asyncio.create_task(
remotepath.mkdir(connector, location, dest_path)
StreamFlowPath(
dest_path, context=context, location=location
).mkdir(mode=0o777, exist_ok=True)
)
for location in locations
)
)
else:
await utils.write_remote_file(
context=streamflow_context,
job=options.job,
context=context,
locations=locations,
content=(listing["contents"] if "contents" in listing else ""),
path=dest_path,
relpath=path_processor.relpath(dest_path, base_path),
)
# If `listing` is present, recursively process folder contents
if "listing" in listing:
Expand All @@ -385,7 +397,9 @@ async def _prepare_work_dir(
await asyncio.gather(
*(
asyncio.create_task(
remotepath.mkdir(connector, location, folder_path)
StreamFlowPath(
folder_path, context=context, location=location
).mkdir(mode=0o777, exist_ok=True)
)
for location in locations
)
Expand Down Expand Up @@ -467,10 +481,15 @@ async def _prepare_work_dir(
# If entry is a string, a new text file must be created with the string as the file contents
if isinstance(entry, str):
await utils.write_remote_file(
context=streamflow_context,
job=options.job,
context=context,
locations=locations,
content=entry,
path=dest_path or base_path,
relpath=(
path_processor.relpath(dest_path, base_path)
if dest_path
else base_path
),
)
# If entry is a list
elif isinstance(entry, MutableSequence):
Expand All @@ -488,10 +507,15 @@ async def _prepare_work_dir(
# Otherwise, the content should be serialised to JSON
else:
await utils.write_remote_file(
context=streamflow_context,
job=options.job,
context=context,
locations=locations,
content=json.dumps(entry),
path=dest_path or base_path,
relpath=(
path_processor.relpath(dest_path, base_path)
if dest_path
else base_path
),
)
# If entry is a dict
elif isinstance(entry, MutableMapping):
Expand All @@ -507,18 +531,28 @@ async def _prepare_work_dir(
# Otherwise, the content should be serialised to JSON
else:
await utils.write_remote_file(
context=streamflow_context,
job=options.job,
context=context,
locations=locations,
content=json.dumps(entry),
path=dest_path or base_path,
relpath=(
path_processor.relpath(dest_path, base_path)
if dest_path
else base_path
),
)
# Every object different from a string should be serialised to JSON
elif entry is not None:
await utils.write_remote_file(
context=streamflow_context,
job=options.job,
context=context,
locations=locations,
content=json.dumps(entry),
path=dest_path or base_path,
relpath=(
path_processor.relpath(dest_path, base_path)
if dest_path
else base_path
),
)


Expand Down
2 changes: 1 addition & 1 deletion streamflow/cwl/hardware.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import math
import os
from collections.abc import MutableSequence, MutableMapping
from collections.abc import MutableMapping, MutableSequence
from typing import Any, TYPE_CHECKING

from streamflow.core.context import StreamFlowContext
Expand Down
15 changes: 6 additions & 9 deletions streamflow/cwl/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import asyncio
import json
import logging
from collections.abc import MutableSequence, MutableMapping, Callable
from collections.abc import Callable, MutableMapping, MutableSequence
from typing import Any, cast

import cwl_utils.file_formats
Expand Down Expand Up @@ -489,7 +489,6 @@ async def _process_command_output(
command_output: CWLCommandOutput,
connector: Connector | None,
context: MutableMapping[str, Any],
output_directory: str,
):
connector = self._get_connector(connector, job)
locations = await self._get_locations(connector, job)
Expand Down Expand Up @@ -532,7 +531,6 @@ async def _process_command_output(
globpath if isinstance(globpath, MutableSequence) else [globpath]
)
# Resolve glob
path_processor = get_path_processor(connector)
resolve_tasks = []
for location in locations:
for path in globpaths:
Expand All @@ -557,11 +555,7 @@ async def _process_command_output(
if self.target
else job.tmp_directory
),
path=(
path_processor.join(output_directory, path)
if not path_processor.isabs(path)
else path
),
path=cast(str, path),
)
)
)
Expand Down Expand Up @@ -694,7 +688,10 @@ async def process(
hardware=self.workflow.context.scheduler.get_hardware(job.name),
)
token_value = await self._process_command_output(
job, command_output, connector, context, output_directory
job,
command_output,
connector,
context,
)
if isinstance(token_value, MutableSequence):
for value in token_value:
Expand Down
1 change: 0 additions & 1 deletion streamflow/cwl/requirement/docker/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os
import posixpath
from collections.abc import MutableSequence

from importlib.resources import files

from streamflow.core import utils
Expand Down
2 changes: 1 addition & 1 deletion streamflow/cwl/requirement/docker/kubernetes.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from __future__ import annotations

import tempfile

from importlib.resources import files

from jinja2 import Template

from streamflow.core import utils
Expand Down
1 change: 0 additions & 1 deletion streamflow/cwl/requirement/docker/singularity.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os
import posixpath
from collections.abc import MutableSequence

from importlib.resources import files

from streamflow.core import utils
Expand Down
Loading

0 comments on commit aa768dd

Please sign in to comment.