Skip to content

Commit

Permalink
Remove unnecessary move operation when saving model (logicalclocks#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
robzor92 authored Dec 17, 2021
1 parent 08f3ade commit f534a29
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 43 deletions.
72 changes: 31 additions & 41 deletions python/hsml/engine/model_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,42 +99,27 @@ def _copy_hopsfs_model(self, model_path, dataset_model_version_path):
_, file_name = os.path.split(path)
self._dataset_api.copy(path, dataset_model_version_path + "/" + file_name)

def _upload_local_model_folder(self, model_path, dataset_model_version_path):
def _upload_local_model_folder(
self, local_model_path, model_version, dataset_model_name_path
):
archive_out_dir = None
uploaded_archive_path = None
try:
archive_out_dir = tempfile.TemporaryDirectory(dir=os.getcwd())
archive_path = util.compress(archive_out_dir.name, model_path)
self._dataset_api.upload(archive_path, dataset_model_version_path)
archive_path = util.compress(
archive_out_dir.name, str(model_version), local_model_path
)
uploaded_archive_path = (
dataset_model_name_path + "/" + os.path.basename(archive_path)
)
self._dataset_api.upload(archive_path, dataset_model_name_path)
self._dataset_api.unzip(uploaded_archive_path, block=True, timeout=600)
except RestAPIError:
raise
finally:
if archive_out_dir is not None:
archive_out_dir.cleanup()

extracted_archive_path = (
dataset_model_version_path + "/" + os.path.basename(archive_path)
)

self._dataset_api.unzip(extracted_archive_path, block=True, timeout=480)

self._dataset_api.rm(extracted_archive_path)

extracted_model_dir = (
dataset_model_version_path
+ "/"
+ os.path.basename(archive_path[: archive_path.index(".")])
)

# Observed that when decompressing a large folder and directly moving the files sometimes caused filesystem exceptions
time.sleep(5)

for artifact in os.listdir(model_path):
_, file_name = os.path.split(artifact)
self._dataset_api.move(
extracted_model_dir + "/" + file_name,
dataset_model_version_path + "/" + file_name,
)
self._dataset_api.rm(extracted_model_dir)
self._dataset_api.rm(uploaded_archive_path)

def _set_model_version(
self, model_instance, dataset_models_root_path, dataset_model_path
Expand All @@ -147,7 +132,10 @@ def _set_model_version(
]:
_, file_name = os.path.split(item["attributes"]["path"])
try:
current_version = int(file_name)
try:
current_version = int(file_name)
except ValueError:
continue
if current_version > current_highest_version:
current_highest_version = current_version
except RestAPIError:
Expand Down Expand Up @@ -199,20 +187,16 @@ def save(self, model_instance, model_path, await_registration=480):
)

# Create /Models/{model_instance._name} folder
dataset_model_path = dataset_models_root_path + "/" + model_instance._name
if not self._dataset_api.path_exists(dataset_model_path):
self._dataset_api.mkdir(dataset_model_path)
dataset_model_name_path = dataset_models_root_path + "/" + model_instance._name
if not self._dataset_api.path_exists(dataset_model_name_path):
self._dataset_api.mkdir(dataset_model_name_path)

model_instance = self._set_model_version(
model_instance, dataset_models_root_path, dataset_model_path
model_instance, dataset_models_root_path, dataset_model_name_path
)

dataset_model_version_path = (
dataset_models_root_path
+ "/"
+ model_instance._name
+ "/"
+ str(model_instance._version)
dataset_model_name_path + "/" + str(model_instance._version)
)

# Attach model summary xattr to /Models/{model_instance._name}/{model_instance._version}
Expand Down Expand Up @@ -264,17 +248,23 @@ def save(self, model_instance, model_path, await_registration=480):
)
if step["id"] == 2:
# Upload Model files from local path to /Models/{model_instance._name}/{model_instance._version}
if os.path.exists(model_path): # check local absolute
# check local absolute
if os.path.exists(model_path):
self._upload_local_model_folder(
model_path, dataset_model_version_path
model_path,
model_instance.version,
dataset_model_name_path,
)
# check local relative
elif os.path.exists(
os.path.join(os.getcwd(), model_path)
): # check local relative
self._upload_local_model_folder(
os.path.join(os.getcwd(), model_path),
dataset_model_version_path,
model_instance.version,
dataset_model_name_path,
)
# check project relative
elif self._dataset_api.path_exists(
model_path
): # check hdfs relative and absolute
Expand Down
5 changes: 3 additions & 2 deletions python/hsml/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from typing import Union
import numpy as np
import pandas as pd
import os

from json import JSONEncoder

Expand Down Expand Up @@ -141,9 +142,9 @@ def _handle_dataframe_input(input_ex):
)


def compress(archive_file_path, dir_to_archive_path):
def compress(archive_file_path, archive_name, dir_to_archive_path):
return shutil.make_archive(
archive_file_path + "/archive", "gztar", dir_to_archive_path
os.path.join(archive_file_path, archive_name), "gztar", dir_to_archive_path
)


Expand Down

0 comments on commit f534a29

Please sign in to comment.