diff --git a/streamflow/data/manager.py b/streamflow/data/manager.py index 7bf707f5..646068e9 100644 --- a/streamflow/data/manager.py +++ b/streamflow/data/manager.py @@ -137,7 +137,7 @@ def get( [ loc for loc in locations - if not (data_type and loc.data_type != data_type) + if not (data_type is not None and loc.data_type != data_type) ] ) return result @@ -337,17 +337,6 @@ async def transfer_data( ) ) # Follow symlink for source path - await asyncio.gather( - *( - asyncio.create_task(src_data_loc.available.wait()) - for src_data_loc in self.get_data_locations( - path=src_path, - deployment=src_connector.deployment_name, - location_name=src_location.name, - data_type=DataType.PRIMARY, - ) - ) - ) if ( src_realpath := await StreamFlowPath( src_path, context=self.context, location=src_location diff --git a/streamflow/data/remotepath.py b/streamflow/data/remotepath.py index c1ab9db0..7f80c3b4 100644 --- a/streamflow/data/remotepath.py +++ b/streamflow/data/remotepath.py @@ -56,6 +56,52 @@ def _get_filename_from_response(response: ClientResponse, url: str): return url.rsplit("/", 1)[-1] +def _get_inner_path( + context: StreamFlowContext, location: ExecutionLocation, path: StreamFlowPath +) -> StreamFlowPath: + for mount in sorted(location.mounts.keys(), reverse=True): + if path.is_relative_to(mount): + return _get_inner_path( + context=context, + location=location.wraps, + path=StreamFlowPath( + location.mounts[mount], context=context, location=location.wraps + ) + / path.relative_to(mount), + ) + return path + + +def _get_outer_path( + context: StreamFlowContext, + location: ExecutionLocation, + path: StreamFlowPath, +): + if ( + isinstance(path, LocalStreamFlowPath) + and location.local + or isinstance(path, RemoteStreamFlowPath) + and path.location == location + ): + return path + else: + path = _get_outer_path( + context=context, + location=location.wraps, + path=path, + ) + for mnt_point, mount in sorted( + location.mounts.items(), key=lambda item: item[1], reverse=True + ): + if path.is_relative_to(mount): + return StreamFlowPath( + mnt_point, context=context, location=location + ) / path.relative_to(mount) + raise WorkflowExecutionException( + f"Failed to find original location for path {str(path)}" + ) + + async def _size( context: StreamFlowContext, location: ExecutionLocation | None, @@ -278,7 +324,7 @@ async def glob( self, pattern, *, case_sensitive=None ) -> AsyncIterator[LocalStreamFlowPath]: for path in glob.glob(str(self / pattern)): - yield LocalStreamFlowPath(path, context=self.context) + yield self.with_segments(path) async def is_dir(self) -> bool: return cast(Path, super()).is_dir() @@ -329,9 +375,7 @@ async def read_text(self, n=-1, encoding=None, errors=None) -> str: async def resolve(self, strict=False) -> LocalStreamFlowPath | None: if await self.exists(): - return LocalStreamFlowPath( - super().resolve(strict=strict), context=self.context - ) + return self.with_segments(super().resolve(strict=strict)) else: return None @@ -398,6 +442,41 @@ def __init__(self, *args, context: StreamFlowContext, location: ExecutionLocatio location.deployment ) self.location: ExecutionLocation = location + self._inner_path: StreamFlowPath | None = None + + async def _get_inner_path(self) -> StreamFlowPath: + if self._inner_path is None: + # Recurse through mount points to find the innermost path (more efficient) + self._inner_path = _get_inner_path( + context=self.context, + location=self.location, + path=self, + ) + if isinstance(self._inner_path, LocalStreamFlowPath) or ( + isinstance(self._inner_path, RemoteStreamFlowPath) + and self._inner_path.location != self.location + ): + path = self._inner_path + while path != path.parent: + if locations := self.context.data_manager.get_data_locations( + path=path.__str__(), + deployment=self.connector.deployment_name, + location_name=self.location.name, + ): + for loc in locations: + await loc.available.wait() + if loc.data_type == DataType.PRIMARY: + break + else: + self._inner_path = _get_inner_path( + context=self.context, + location=self.location, + path=await self.resolve(), + ) + return self._inner_path + else: + path = path.parent + return self._inner_path async def _test(self, command: list[str]) -> bool: command = ["test"] + command @@ -413,93 +492,111 @@ async def _test(self, command: list[str]) -> bool: else: return not status - async def checksum(self): - command = [ - "test", - "-f", - f"'{self.__str__()}'", - "&&", - "sha1sum", - f"'{self.__str__()}'", - "|", - "awk", - "'{print $1}'", - ] - result, status = await self.connector.run( - location=self.location, command=command, capture_output=True - ) - if status > 1: - raise WorkflowExecutionException( - "{} Command '{}' on location {}: {}".format( - status, command, self.location, result - ) - ) - return result.strip() - - async def exists(self, *, follow_symlinks=True) -> bool: - return await self._test( - command=( - ["-e", f"'{self.__str__()}'"] - if follow_symlinks - else ["-e", f"'{self.__str__()}'", "-o", "-L", f"'{self.__str__()}'"] + async def checksum(self) -> str | None: + if (inner_path := await self._get_inner_path()) != self: + return await inner_path.checksum() + else: + command = [ + "test", + "-f", + f"'{self.__str__()}'", + "&&", + "sha1sum", + f"'{self.__str__()}'", + "|", + "awk", + "'{print $1}'", + ] + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True ) - ) + if status > 1: + raise WorkflowExecutionException( + "{} Command '{}' on location {}: {}".format( + status, command, self.location, result + ) + ) + return result.strip() + + async def exists(self) -> bool: + if (inner_path := await self._get_inner_path()) != self: + return await inner_path.exists() + else: + return await self._test(command=(["-e", f"'{self.__str__()}'"])) async def glob( self, pattern, *, case_sensitive=None ) -> AsyncIterator[RemoteStreamFlowPath]: - if not pattern: - raise ValueError(f"Unacceptable pattern: {pattern!r}") - command = [ - "printf", - '"%s\\0"', - str(self / pattern), - "|", - "xargs", - "-0", - "-I{}", - "sh", - "-c", - '"if [ -e \\"{}\\" ]; then echo \\"{}\\"; fi"', - "|", - "sort", - ] - result, status = await self.connector.run( - location=self.location, command=command, capture_output=True - ) - _check_status(command, self.location, result, status) - for path in result.split(): - yield RemoteStreamFlowPath( - path, context=self.context, location=self.location + if (inner_path := await self._get_inner_path()) != self: + async for path in inner_path.glob(pattern, case_sensitive=case_sensitive): + yield _get_outer_path( + context=self.context, + location=self.location, + path=path, + ) + else: + if not pattern: + raise ValueError(f"Unacceptable pattern: {pattern!r}") + command = [ + "printf", + '"%s\\0"', + str(self / pattern), + "|", + "xargs", + "-0", + "-I{}", + "sh", + "-c", + '"if [ -e \\"{}\\" ]; then echo \\"{}\\"; fi"', + "|", + "sort", + ] + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True ) + _check_status(command, self.location, result, status) + for path in result.split(): + yield self.with_segments(path) async def is_dir(self) -> bool: - return await self._test(command=["-d", f"'{self.__str__()}'"]) + if (inner_path := await self._get_inner_path()) != self: + return await inner_path.is_dir() + else: + return await self._test(command=["-d", f"'{self.__str__()}'"]) async def is_file(self) -> bool: - return await self._test(command=["-f", f"'{self.__str__()}'"]) + if (inner_path := await self._get_inner_path()) != self: + return await inner_path.is_file() + else: + return await self._test(command=["-f", f"'{self.__str__()}'"]) async def is_symlink(self) -> bool: return await self._test(command=["-L", f"'{self.__str__()}'"]) async def mkdir(self, mode=0o777, parents=False, exist_ok=False) -> None: - command = ["mkdir", "-m", f"{mode:o}"] - if parents or exist_ok: - command.append("-p") - command.append(self.__str__()) - result, status = await self.connector.run( - location=self.location, command=command, capture_output=True - ) - _check_status(command, self.location, result, status) + if (inner_path := await self._get_inner_path()) != self: + await inner_path.mkdir(mode=mode, parents=parents, exist_ok=exist_ok) + else: + command = ["mkdir", "-m", f"{mode:o}"] + if parents or exist_ok: + command.append("-p") + command.append(self.__str__()) + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True + ) + _check_status(command, self.location, result, status) async def read_text(self, n=-1, encoding=None, errors=None) -> str: - command = ["head", "-c", str(n)] if n >= 0 else ["cat"] - command.append(self.__str__()) - result, status = await self.connector.run( - location=self.location, command=command, capture_output=True - ) - _check_status(command, self.location, result, status) - return result.strip() + if (inner_path := await self._get_inner_path()) != self: + return await inner_path.read_text(n=n, encoding=encoding, errors=errors) + else: + command = ["head", "-c", str(n)] if n >= 0 else ["cat"] + command.append(self.__str__()) + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True + ) + _check_status(command, self.location, result, status) + return result.strip() async def resolve(self, strict=False) -> RemoteStreamFlowPath | None: # If at least one primary location is present on the site, return its path @@ -507,13 +604,11 @@ async def resolve(self, strict=False) -> RemoteStreamFlowPath | None: path=self.__str__(), deployment=self.connector.deployment_name, location_name=self.location.name, - data_type=DataType.PRIMARY, ): - return RemoteStreamFlowPath( - next(iter(locations)).path, - context=self.context, - location=next(iter(locations)).location, - ) + for loc in locations: + await loc.available.wait() + if loc.data_type == DataType.PRIMARY: + return self.with_segments(loc.path) # Otherwise, analyse the remote path command = [ "test", @@ -533,136 +628,142 @@ async def resolve(self, strict=False) -> RemoteStreamFlowPath | None: status, command, self.location, result ) ) - return ( - RemoteStreamFlowPath( - result.strip(), context=self.context, location=self.location - ) - if status == 0 - else None - ) + return self.with_segments(result.strip()) if status == 0 else None async def rmtree(self) -> None: - command = ["rm", "-rf ", self.__str__()] - result, status = await self.connector.run( - location=self.location, command=command, capture_output=True - ) - _check_status(command, self.location, result, status) + if (inner_path := await self._get_inner_path()) != self: + await inner_path.rmtree() + else: + command = ["rm", "-rf ", self.__str__()] + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True + ) + _check_status(command, self.location, result, status) async def size(self) -> int: - command = [ - "".join( - [ - "find -L ", - f'"{self.__str__()}"', - " -type f -exec ls -ln {} \\+ | ", - "awk 'BEGIN {sum=0} {sum+=$5} END {print sum}'; ", - ] + if (inner_path := await self._get_inner_path()) != self: + return await inner_path.size() + else: + command = [ + "".join( + [ + "find -L ", + f'"{self.__str__()}"', + " -type f -exec ls -ln {} \\+ | ", + "awk 'BEGIN {sum=0} {sum+=$5} END {print sum}'; ", + ] + ) + ] + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True ) - ] - result, status = await self.connector.run( - location=self.location, command=command, capture_output=True - ) - _check_status(command, self.location, result, status) - result = result.strip().strip("'\"") - return int(result) if result.isdigit() else 0 + _check_status(command, self.location, result, status) + result = result.strip().strip("'\"") + return int(result) if result.isdigit() else 0 async def symlink_to(self, target, target_is_directory=False) -> None: - command = ["ln", "-snf", str(target), self.__str__()] - result, status = await self.connector.run( - location=self.location, command=command, capture_output=True - ) - _check_status(command, self.location, result, status) + if (inner_path := await self._get_inner_path()) != self: + await inner_path.symlink_to(target, target_is_directory=target_is_directory) + else: + command = ["ln", "-snf", str(target), self.__str__()] + result, status = await self.connector.run( + location=self.location, command=command, capture_output=True + ) + _check_status(command, self.location, result, status) async def walk( self, top_down=True, on_error=None, follow_symlinks=False ) -> AsyncIterator[ tuple[ - LocalStreamFlowPath, + RemoteStreamFlowPath, MutableSequence[str], MutableSequence[str], ] ]: - paths = [self] - while paths: - path = paths.pop() - if isinstance(path, tuple): - yield path - continue - command = ["find"] - if follow_symlinks: - command.append("-L") - command.extend([f"'{str(path)}'", "-mindepth", "1", "-maxdepth", "1"]) - try: - content, status = await self.connector.run( - location=self.location, - command=command + ["-type", "d"], - capture_output=True, - ) - _check_status(command, self.location, content, status) - content = content.strip(" \n") - dirnames = ( - [ - str( - RemoteStreamFlowPath( - p, context=self.context, location=self.location - ).relative_to(path) - ) - for p in content.splitlines() - ] - if content - else [] - ) - content, status = await self.connector.run( - location=self.location, - command=command + ["-type", "f"], - capture_output=True, - ) - _check_status(command, self.location, content, status) - content = content.strip(" \n") - filenames = ( - [ - str( - RemoteStreamFlowPath( - p, context=self.context, location=self.location - ).relative_to(path) - ) - for p in content.splitlines() - ] - if content - else [] - ) - except WorkflowExecutionException as error: - if on_error is not None: - on_error(error) - continue + if (inner_path := await self._get_inner_path()) != self: + async for path, dirnames, filenames in inner_path.walk( + top_down=top_down, on_error=on_error, follow_symlinks=follow_symlinks + ): + yield _get_outer_path( + context=self.context, location=self.location, path=path + ), dirnames, filenames + else: + paths = [self] + while paths: + path = paths.pop() + if isinstance(path, tuple): + yield path + continue + command = ["find"] + if follow_symlinks: + command.append("-L") + command.extend([f"'{str(path)}'", "-mindepth", "1", "-maxdepth", "1"]) + try: + content, status = await self.connector.run( + location=self.location, + command=command + ["-type", "d"], + capture_output=True, + ) + _check_status(command, self.location, content, status) + content = content.strip(" \n") + dirnames = ( + [ + str(self.with_segments(p).relative_to(path)) + for p in content.splitlines() + ] + if content + else [] + ) + content, status = await self.connector.run( + location=self.location, + command=command + ["-type", "f"], + capture_output=True, + ) + _check_status(command, self.location, content, status) + content = content.strip(" \n") + filenames = ( + [ + str(self.with_segments(p).relative_to(path)) + for p in content.splitlines() + ] + if content + else [] + ) + except WorkflowExecutionException as error: + if on_error is not None: + on_error(error) + continue - if top_down: - yield path, dirnames, filenames - else: - paths.append((path, dirnames, filenames)) - paths += [path._make_child_relpath(d) for d in reversed(dirnames)] + if top_down: + yield path, dirnames, filenames + else: + paths.append((path, dirnames, filenames)) + paths += [path._make_child_relpath(d) for d in reversed(dirnames)] def with_segments(self, *pathsegments): return type(self)(*pathsegments, context=self.context, location=self.location) async def write_text(self, data: str, **kwargs) -> int: - if not isinstance(data, str): - raise TypeError("data must be str, not %s" % data.__class__.__name__) - command = [ - "echo", - base64.b64encode(data.encode("utf-8")).decode("utf-8"), - "|", - "base64", - "-d", - ] - result, status = await self.connector.run( - location=self.location, - command=command, - stdout=self.__str__(), - capture_output=True, - ) - _check_status(command, self.location, result, status) - return len(data) + if (inner_path := await self._get_inner_path()) != self: + return await inner_path.write_text(data, **kwargs) + else: + if not isinstance(data, str): + raise TypeError("data must be str, not %s" % data.__class__.__name__) + command = [ + "echo", + base64.b64encode(data.encode("utf-8")).decode("utf-8"), + "|", + "base64", + "-d", + ] + result, status = await self.connector.run( + location=self.location, + command=command, + stdout=self.__str__(), + capture_output=True, + ) + _check_status(command, self.location, result, status) + return len(data) async def download( diff --git a/streamflow/deployment/connector/container.py b/streamflow/deployment/connector/container.py index e8296550..602c0313 100644 --- a/streamflow/deployment/connector/container.py +++ b/streamflow/deployment/connector/container.py @@ -58,7 +58,7 @@ async def _get_storage_from_binds( "+2", "|", "awk", - "'{print $7, $2, $5}'", + "'NF == 1 {device = $1; getline; $0 = device $0} {print $7, $2, $5}'", ], capture_output=True, )