Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: copy workspace folder to correct path #200

Merged
merged 8 commits into from
Aug 28, 2024
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ Great! As you can see in the console output, the OpenHEXA CLI has created a new
structure required for an OpenHEXA pipeline. You can now `cd` into the new pipeline directory and run the pipeline:

```shell
cd my_awesome_pipeline
python pipeline.py
openhexa pipelines run ./my_awesome_pipeline
nazarfil marked this conversation as resolved.
Show resolved Hide resolved
```
nazarfil marked this conversation as resolved.
Show resolved Hide resolved

Congratulations! You have successfully run your first pipeline locally.
Expand Down
64 changes: 37 additions & 27 deletions openhexa/cli/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import json
import logging
import os
import shutil
import tempfile
import typing
from dataclasses import asdict
Expand Down Expand Up @@ -330,7 +329,7 @@ def run_pipeline(path: Path, config: dict, image: str = None, debug: bool = Fals
ensure_pipeline_config_exists(path)
env_vars = get_local_workspace_config(path)
# Prepare the mount for the workspace's files
mount_files_path = Path(env_vars["WORKSPACE_FILES_PATH"]).absolute()
mount_files_path = Path(env_vars.pop("WORKSPACE_FILES_PATH")).absolute()
try:
docker_client = docker.from_env()
docker_client.ping()
Expand All @@ -342,11 +341,10 @@ def run_pipeline(path: Path, config: dict, image: str = None, debug: bool = Fals
if image is None:
image = env_vars.get("WORKSPACE_DOCKER_IMAGE", "blsq/openhexa-blsq-environment:latest")

# Create temporary directory with the files to mount
zip_file = generate_zip_file(path)
tmp_dir = tempfile.mkdtemp()
for file_path in path.glob("**/*"):
if file_path.suffix in (".py", ".ipynb", ".txt", ".md", ".yaml"):
shutil.copy(file_path, tmp_dir)
with ZipFile(zip_file) as zf:
zf.extractall(tmp_dir)

volumes = {
tmp_dir: {"bind": "/home/hexa/pipeline", "mode": "rw"},
Expand All @@ -363,6 +361,8 @@ def run_pipeline(path: Path, config: dict, image: str = None, debug: bool = Fals
**env_vars,
}

print(environment)

command = f"pipeline run --config {base64.b64encode(json.dumps(config).encode('utf-8')).decode('utf-8')}"
try:
docker_client.images.get(image)
Expand Down Expand Up @@ -477,41 +477,33 @@ def create_pipeline_structure(pipeline_name: str, base_path: Path, workspace: st
return output_directory


def upload_pipeline(
pipeline_directory_path: typing.Union[str, Path],
name: str,
description: str = None,
link: str = None,
):
"""Upload the pipeline contained in the provided directory using the GraphQL API.
def generate_zip_file(pipeline_directory_path: typing.Union[str, Path]) -> io.BytesIO:
"""Generate a ZIP file containing the pipeline code.

The pipeline code will be zipped and base64-encoded before being sent to the backend.
"""
if settings.current_workspace is None:
raise NoActiveWorkspaceError

directory = pipeline_directory_path.absolute()
pipeline = get_pipeline_metadata(directory)

zip_file = io.BytesIO(b"")
Args:
pipeline_directory_path (typing.Union[str, Path]): The path to the pipeline directory.

Returns
-------
io.BytesIO: A BytesIO object containing the ZIP file.
"""
if settings.debug:
click.echo("Generating ZIP file:")
zip_file = io.BytesIO(b"")
files = []

# We exclude the workspace directory since it can break the mount of the bucket on /home/hexa/workspace
# This is also the default value of the WORKSPACE_FILES_PATH env var
excluded_paths = [directory / "workspace"]
excluded_paths = [pipeline_directory_path / "workspace"]
try:
env_vars = get_local_workspace_config(pipeline_directory_path)
if env_vars.get("WORKSPACE_FILES_PATH") and Path(env_vars["WORKSPACE_FILES_PATH"]) not in excluded_paths:
excluded_paths.append(Path(env_vars["WORKSPACE_FILES_PATH"]))
except FileNotFoundError:
# No workspace.yaml file found, we can ignore this error and assume the default value of WORKSPACE_FILES_PATH
pass

with ZipFile(zip_file, "w") as zipObj:
for path in directory.glob("**/*"):
for path in pipeline_directory_path.glob("**/*"):
if path.name == "python":
# We are in a virtual environment
excluded_paths.append(path.parent.parent) # ./<venv>/bin/python -> ./<venv>
Expand All @@ -532,9 +524,27 @@ def upload_pipeline(
continue
if settings.debug:
click.echo(f"\t{file_path.name}")
zipObj.write(file_path, file_path.relative_to(directory))

zipObj.write(file_path, file_path.relative_to(pipeline_directory_path))
zip_file.seek(0)
return zip_file


def upload_pipeline(
pipeline_directory_path: typing.Union[str, Path],
name: str,
description: str = None,
link: str = None,
):
"""Upload the pipeline contained in the provided directory using the GraphQL API.

The pipeline code will be zipped and base64-encoded before being sent to the backend.
"""
if settings.current_workspace is None:
raise NoActiveWorkspaceError

directory = pipeline_directory_path.absolute()
pipeline = get_pipeline_metadata(directory)
zip_file = generate_zip_file(directory)

if settings.debug:
# Write zip_file to disk for debugging
Expand Down
6 changes: 1 addition & 5 deletions openhexa/sdk/pipelines/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,14 +202,10 @@ def __call__(self, config: typing.Optional[dict[str, typing.Any]] = None):
This method can be called with an explicit configuration. If no configuration is provided, it will parse the
command-line arguments to build it.
"""
# Handle local workspace config for dev / testing, if appropriate
if get_environment() == Environment.LOCAL_PIPELINE:
os.environ.update(get_local_workspace_config(Path("/home/hexa/pipeline")))

# User can run their pipeline using `python pipeline.py`. It's considered as a standalone usage of the library.
# Since we still support this use case for the moment, we'll try to load the workspace.yaml
nazarfil marked this conversation as resolved.
Show resolved Hide resolved
# at the path of the file
elif get_environment() == Environment.STANDALONE:
if get_environment() == Environment.STANDALONE:
os.environ.update(get_local_workspace_config(Path(sys.argv[0]).parent))

if config is None: # Called without arguments, in the pipeline file itself
Expand Down
4 changes: 2 additions & 2 deletions openhexa/sdk/pipelines/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ def get_local_workspace_config(path: Path):
if "files" in local_workspace_config:
try:
files_path = path / Path(local_workspace_config["files"]["path"])
if not files_path.exists():
# Let's create the folder if it doesn't exist
if files_path.exists() is False:
# Let's create the folder if it doesn't exist (only needed when running the pipeline using `python pipeline.py`)
files_path.mkdir(parents=True)
env_vars["WORKSPACE_FILES_PATH"] = str(files_path.resolve())
except KeyError:
Expand Down
4 changes: 2 additions & 2 deletions openhexa/sdk/workspaces/current_workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ def files_path(self) -> str:
/home/hexa/workspace/some/path
"""
# FIXME: This is a hack to make the SDK work in the context of the `python pipeline.py` command.
# We can remove this once we deprecate this way of running pipelines
return os.environ["WORKSPACE_FILES_PATH"] if "WORKSPACE_FILES_PATH" in os.environ else "/home/hexa/workspace"
# We can remove this once we deprecate this way of running pipelines and only use /home/hexa/workspace
return os.environ.get("WORKSPACE_FILES_PATH", "/home/hexa/workspace")

@property
def tmp_path(self) -> str:
Expand Down