Refactored remotepath module (#618)
This commit modifies the `remotepath` module by encapsulating the path
management logic inside a `StreamFlowPath` abstract class, backed by two
concrete implementations: `LocalStreamFlowPath` for local paths and
`RemoteStreamFlowPath` for remote ones. The implemented logic mimics the
Python `pathlib` module, with two important differences:
- All potentially remote functions are `async`;
- Some additional functions (e.g., `size` or `checksum`) have been added
  to simplify file management in StreamFlow.
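
A minimal sketch of how the resulting interface is used, based on the call sites visible in this diff. The `context` and `location` arguments are placeholders for objects that StreamFlow steps already carry, and the `size()` signature is an assumption drawn from the commit message:

```python
from streamflow.data.remotepath import StreamFlowPath

async def inspect_output(context, location, output_directory: str):
    # The abstract base presumably dispatches to LocalStreamFlowPath or
    # RemoteStreamFlowPath depending on the given location.
    path = StreamFlowPath(
        output_directory, "cwl.output.json", context=context, location=location
    )
    if await path.exists():  # pathlib-style, but awaitable
        return await path.read_text(), await path.size()
    return None, 0
```

The constructor keywords (`context=`, `location=`), `exists()`, `read_text()`, and `mkdir(mode=0o777, exist_ok=True)` all appear verbatim in the diffs below.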
GlassOfWhiskey authored Dec 13, 2024
1 parent 2c8fc7b commit f83c0cb
Showing 17 changed files with 1,211 additions and 1,081 deletions.
5 changes: 0 additions & 5 deletions streamflow/core/data.py
@@ -112,11 +112,6 @@ async def transfer_data(
     ) -> None: ...
 
 
-class FileType(Enum):
-    FILE = 1
-    DIRECTORY = 2
-
-
 class StreamWrapper(ABC):
     def __init__(self, stream: Any):
         self.stream: Any = stream
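With `FileType` removed, file-type checks presumably move to `pathlib`-style predicates on `StreamFlowPath`. A hedged sketch; the `is_file`/`is_dir` names are assumptions from the `pathlib` analogy, not confirmed by this diff:

```python
from streamflow.data.remotepath import StreamFlowPath

async def classify(path: StreamFlowPath) -> str:
    # Replaces FileType.FILE / FileType.DIRECTORY comparisons; the predicates
    # are awaitable because the path may live on a remote location.
    if await path.is_file():
        return "file"
    elif await path.is_dir():
        return "directory"
    return "other"
```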
13 changes: 0 additions & 13 deletions streamflow/core/utils.py
@@ -242,19 +242,6 @@ async def get_remote_to_remote_write_command(
     return ["tar", "xf", "-", "-C", posixpath.dirname(dst)]
 
 
-def get_size(path: str) -> int:
-    if os.path.isfile(path):
-        return os.path.getsize(path) if not os.path.islink(path) else 0
-    else:
-        total_size = 0
-        for dirpath, _, filenames in os.walk(path):
-            for f in filenames:
-                fp = os.path.join(dirpath, f)
-                if not os.path.islink(fp):
-                    total_size += os.path.getsize(fp)
-        return total_size
-
-
 def get_tag(tokens: Iterable[Token]) -> str:
     output_tag = "0"
     for tag in [t.tag for t in tokens]:
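The deleted helper is superseded by the `size` method named in the commit message. A sketch of the old behaviour expressed against the new API, assuming `size()` aggregates directory contents (and skips symlinks) the way the `os.walk` loop did:

```python
from streamflow.data.remotepath import StreamFlowPath

async def get_size(context, location, path: str) -> int:
    # One call replaces the os.path/os.walk logic above and also works on
    # remote locations; the exact signature is an assumption.
    return await StreamFlowPath(path, context=context, location=location).size()
```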
15 changes: 9 additions & 6 deletions streamflow/core/workflow.py
@@ -436,12 +436,15 @@ def retag(self, tag: str) -> Token:
     async def save(self, context: StreamFlowContext, port_id: int | None = None):
         async with self.persistence_lock:
             if not self.persistent_id:
-                self.persistent_id = await context.database.add_token(
-                    port=port_id,
-                    tag=self.tag,
-                    type=type(self),
-                    value=json.dumps(await self._save_value(context)),
-                )
+                try:
+                    self.persistent_id = await context.database.add_token(
+                        port=port_id,
+                        tag=self.tag,
+                        type=type(self),
+                        value=json.dumps(await self._save_value(context)),
+                    )
+                except TypeError as e:
+                    raise WorkflowExecutionException from e
 
     def update(self, value: Any) -> Token:
         return self.__class__(tag=self.tag, value=value)
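The new `try` block guards `json.dumps`, which raises `TypeError` when a token value is not JSON-serializable. A minimal standalone illustration of the failure mode being wrapped:

```python
import json

class Unserializable:
    pass

try:
    json.dumps({"value": Unserializable()})
except TypeError as e:
    # StreamFlow re-raises this as WorkflowExecutionException
    print(f"non-serializable token value: {e}")
```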
84 changes: 59 additions & 25 deletions streamflow/cwl/command.py
@@ -47,22 +47,24 @@
 )
 from streamflow.cwl.step import build_token
 from streamflow.cwl.workflow import CWLWorkflow
-from streamflow.data import remotepath
+from streamflow.data.remotepath import StreamFlowPath
 from streamflow.deployment.connector import LocalConnector
 from streamflow.deployment.utils import get_path_processor
 from streamflow.log_handler import logger
 from streamflow.workflow.step import ExecuteStep
 from streamflow.workflow.utils import get_token_value
 
 
-def _adjust_cwl_output(base_path: str, path_processor: ModuleType, value: Any) -> Any:
+def _adjust_cwl_output(
+    base_path: StreamFlowPath, path_processor: ModuleType, value: Any
+) -> Any:
     if isinstance(value, MutableSequence):
         return [_adjust_cwl_output(base_path, path_processor, v) for v in value]
     elif isinstance(value, MutableMapping):
         if utils.get_token_class(value) in ["File", "Directory"]:
             path = utils.get_path_from_token(value)
             if not path_processor.isabs(path):
-                path = path_processor.join(base_path, path)
+                path = base_path / path
             value["path"] = path
             value["location"] = f"file://{path}"
     return value
@@ -145,18 +147,25 @@ async def _check_cwl_output(job: Job, step: Step, result: Any) -> Any:
     connector = step.workflow.context.scheduler.get_connector(job.name)
     locations = step.workflow.context.scheduler.get_locations(job.name)
     path_processor = get_path_processor(connector)
-    cwl_output_path = path_processor.join(job.output_directory, "cwl.output.json")
     for location in locations:
-        if await remotepath.exists(connector, location, cwl_output_path):
+        cwl_output_path = StreamFlowPath(
+            job.output_directory,
+            "cwl.output.json",
+            context=step.workflow.context,
+            location=location,
+        )
+        if await cwl_output_path.exists():
             # If file exists, use its contents as token value
-            result = json.loads(
-                await remotepath.read(connector, location, cwl_output_path)
-            )
+            result = json.loads(await cwl_output_path.read_text())
             # Update step output ports at runtime if needed
             if isinstance(result, MutableMapping):
                 for out_name, out in result.items():
                     out = _adjust_cwl_output(
-                        base_path=job.output_directory,
+                        base_path=StreamFlowPath(
+                            job.output_directory,
+                            context=step.workflow.context,
+                            location=location,
+                        ),
                         path_processor=path_processor,
                         value=out,
                     )
@@ -281,9 +290,9 @@ async def _prepare_work_dir(
     dest_path: str | None = None,
     writable: bool = False,
 ) -> None:
-    streamflow_context = options.step.workflow.context
-    connector = streamflow_context.scheduler.get_connector(options.job.name)
-    locations = streamflow_context.scheduler.get_locations(options.job.name)
+    context = options.step.workflow.context
+    connector = context.scheduler.get_connector(options.job.name)
+    locations = context.scheduler.get_locations(options.job.name)
     path_processor = get_path_processor(connector)
     # Initialize base path to job output directory if present
     base_path = base_path or options.job.output_directory
@@ -332,7 +341,7 @@ async def _prepare_work_dir(
                 dest_path = dest_path or path_processor.join(
                     base_path, path_processor.basename(src_path)
                 )
-                await streamflow_context.data_manager.transfer_data(
+                await context.data_manager.transfer_data(
                     src_location=selected_location.location,
                     src_path=selected_location.path,
                     dst_locations=locations,
@@ -357,17 +366,20 @@ async def _prepare_work_dir(
             await asyncio.gather(
                 *(
                     asyncio.create_task(
-                        remotepath.mkdir(connector, location, dest_path)
+                        StreamFlowPath(
+                            dest_path, context=context, location=location
+                        ).mkdir(mode=0o777, exist_ok=True)
                     )
                     for location in locations
                 )
             )
         else:
             await utils.write_remote_file(
-                context=streamflow_context,
+                job=options.job,
+                context=context,
                 locations=locations,
                 content=(listing["contents"] if "contents" in listing else ""),
-                path=dest_path,
+                relpath=path_processor.relpath(dest_path, base_path),
             )
         # If `listing` is present, recursively process folder contents
         if "listing" in listing:
@@ -376,7 +388,9 @@ async def _prepare_work_dir(
             await asyncio.gather(
                 *(
                     asyncio.create_task(
-                        remotepath.mkdir(connector, location, folder_path)
+                        StreamFlowPath(
+                            folder_path, context=context, location=location
+                        ).mkdir(mode=0o777, exist_ok=True)
                     )
                     for location in locations
                 )
@@ -458,10 +472,15 @@ async def _prepare_work_dir(
         # If entry is a string, a new text file must be created with the string as the file contents
         if isinstance(entry, str):
             await utils.write_remote_file(
-                context=streamflow_context,
+                job=options.job,
+                context=context,
                 locations=locations,
                 content=entry,
-                path=dest_path or base_path,
+                relpath=(
+                    path_processor.relpath(dest_path, base_path)
+                    if dest_path
+                    else base_path
+                ),
             )
         # If entry is a list
         elif isinstance(entry, MutableSequence):
@@ -479,10 +498,15 @@ async def _prepare_work_dir(
                 # Otherwise, the content should be serialised to JSON
                 else:
                     await utils.write_remote_file(
-                        context=streamflow_context,
+                        job=options.job,
+                        context=context,
                         locations=locations,
                         content=json.dumps(entry),
-                        path=dest_path or base_path,
+                        relpath=(
+                            path_processor.relpath(dest_path, base_path)
+                            if dest_path
+                            else base_path
+                        ),
                     )
         # If entry is a dict
         elif isinstance(entry, MutableMapping):
@@ -498,18 +522,28 @@ async def _prepare_work_dir(
             # Otherwise, the content should be serialised to JSON
             else:
                 await utils.write_remote_file(
-                    context=streamflow_context,
+                    job=options.job,
+                    context=context,
                     locations=locations,
                     content=json.dumps(entry),
-                    path=dest_path or base_path,
+                    relpath=(
+                        path_processor.relpath(dest_path, base_path)
+                        if dest_path
+                        else base_path
+                    ),
                 )
         # Every object different from a string should be serialised to JSON
         elif entry is not None:
             await utils.write_remote_file(
-                context=streamflow_context,
+                job=options.job,
+                context=context,
                 locations=locations,
                 content=json.dumps(entry),
-                path=dest_path or base_path,
+                relpath=(
+                    path_processor.relpath(dest_path, base_path)
+                    if dest_path
+                    else base_path
+                ),
             )
13 changes: 5 additions & 8 deletions streamflow/cwl/processor.py
@@ -485,7 +485,6 @@ async def _process_command_output(
         command_output: CWLCommandOutput,
         connector: Connector | None,
         context: MutableMapping[str, Any],
-        output_directory: str,
     ):
         connector = self._get_connector(connector, job)
         locations = await self._get_locations(connector, job)
@@ -528,7 +527,6 @@ async def _process_command_output(
                 globpath if isinstance(globpath, MutableSequence) else [globpath]
             )
             # Resolve glob
-            path_processor = get_path_processor(connector)
             resolve_tasks = []
             for location in locations:
                 for path in globpaths:
@@ -553,11 +551,7 @@
                                     if self.target
                                     else job.tmp_directory
                                 ),
-                                path=(
-                                    path_processor.join(output_directory, path)
-                                    if not path_processor.isabs(path)
-                                    else path
-                                ),
+                                path=cast(str, path),
                             )
                         )
                     )
@@ -690,7 +684,10 @@ async def process(
             hardware=self.workflow.context.scheduler.get_hardware(job.name),
         )
         token_value = await self._process_command_output(
-            job, command_output, connector, context, output_directory
+            job,
+            command_output,
+            connector,
+            context,
         )
         if isinstance(token_value, MutableSequence):
             for value in token_value: