Merge pull request #381 from sustainable-computing-io/revert-370-server-api-rebase

Revert "feat: update select logic with spec similarity computation"
sunya-ch authored Aug 20, 2024
2 parents 6cb28d7 + 072583d commit cb563bf
Showing 16 changed files with 49 additions and 534 deletions.
6 changes: 0 additions & 6 deletions .github/workflows/unit-test.yml
@@ -38,9 +38,3 @@ jobs:
timeout-minutes: 5
- name: Test offline trainer
run: make test-offline-trainer
- name: Test model server select
run: make test-model-server-select
timeout-minutes: 5
- name: Test model server select via estimator
run: make test-model-server-estimator-select
timeout-minutes: 5
1 change: 0 additions & 1 deletion .gitignore
@@ -146,7 +146,6 @@ tests/data/extractor_output
tests/data/isolator_output
tests/data/offline_trainer_output
tests/data/plot_output
tests/db-models
model_training/*data*
model_training/tekton/secret
local-dev-cluster
48 changes: 2 additions & 46 deletions Makefile
@@ -14,7 +14,6 @@ PYTHON = python3.10

DOCKERFILES_PATH := ./dockerfiles
MODEL_PATH := ${PWD}/tests/models
MACHINE_SPEC_PATH := ${PWD}/tests/data/machine_spec

build:
$(CTR_CMD) build -t $(IMAGE) -f $(DOCKERFILES_PATH)/Dockerfile .
@@ -56,15 +55,10 @@ run-estimator:

run-collector-client:
$(CTR_CMD) exec estimator /bin/bash -c \
"while [ ! -S "/tmp/estimator.sock" ]; do \
sleep 1; \
done; \
hatch run test -vvv -s ./tests/estimator_power_request_test.py"
"while [ ! -S "/tmp/estimator.sock" ]; do sleep 1; done; hatch test -vvv -s ./tests/estimator_power_request_test.py"

clean-estimator:
@$(CTR_CMD) logs estimator
@$(CTR_CMD) stop estimator
@$(CTR_CMD) rm estimator || true
$(CTR_CMD) stop estimator

test-estimator: run-estimator run-collector-client clean-estimator

@@ -84,9 +78,7 @@ run-estimator-client:
hatch run test -vvv -s ./tests/estimator_model_request_test.py

clean-model-server:
@$(CTR_CMD) logs model-server
@$(CTR_CMD) stop model-server
@$(CTR_CMD) rm model-server || true

test-model-server: \
run-model-server \
@@ -114,42 +106,6 @@ test-offline-trainer: \
run-offline-trainer-client \
clean-offline-trainer

# test model server select
create-container-net:
@$(CTR_CMD) network create kepler-model-server-test

run-model-server-with-db:
$(CTR_CMD) run -d --platform linux/amd64 \
--network kepler-model-server-test \
-p 8100:8100 \
--name model-server $(TEST_IMAGE) \
model-server
while ! docker logs model-server 2>&1 | grep -q 'Running on all'; do \
echo "... waiting for model-server to serve"; sleep 5; \
done

run-estimator-with-model-server:
$(CTR_CMD) run -d --platform linux/amd64 \
--network kepler-model-server-test \
-e "PYTHONUNBUFFERED=1" \
-e "MACHINE_ID=test" \
-v ${MACHINE_SPEC_PATH}:/etc/kepler/models/machine_spec \
-e "MODEL_SERVER_ENABLE=true" \
-e "MODEL_SERVER_URL=http://model-server:8100" \
--name estimator $(TEST_IMAGE) \
estimator

clean-container-net:
@$(CTR_CMD) network rm kepler-model-server-test

run-select-client:
$(CTR_CMD) exec model-server \
hatch run test -vvv -s ./tests/model_select_test.py

test-model-server-select: create-container-net run-model-server-with-db run-select-client clean-model-server clean-container-net

test-model-server-estimator-select: create-container-net run-model-server-with-db run-estimator-with-model-server run-collector-client clean-estimator clean-model-server clean-container-net

test: \
build-test \
test-pipeline \
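The removed run-model-server-with-db target above polls container logs until the server reports readiness. A minimal sketch of the same readiness poll in Python, assuming only that the model server answers HTTP on the published port 8100; the URL and timeout here are illustrative, not project defaults:

import time

import requests


def wait_for_model_server(url: str = "http://localhost:8100", timeout: float = 60.0) -> None:
    # Poll until the server accepts connections, mirroring the Makefile loop.
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            requests.get(url, timeout=2)
            return  # any HTTP response means the server is up
        except requests.ConnectionError:
            print("... waiting for model-server to serve")
            time.sleep(5)
    raise TimeoutError("model-server did not become ready in time")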
1 change: 0 additions & 1 deletion dockerfiles/Dockerfile.dockerignore
@@ -1,5 +1,4 @@
__pycache__
src/resource/
src/kepler_model/models/
src/models/
tests/models/
1 change: 0 additions & 1 deletion dockerfiles/Dockerfile.test-nobase.dockerignore
@@ -1,5 +1,4 @@
__pycache__
src/resource/
src/kepler_model/models/
src/models/
tests/models/
1 change: 0 additions & 1 deletion dockerfiles/Dockerfile.test.dockerignore
@@ -1,5 +1,4 @@
__pycache__
src/resource/
src/kepler_model/models/
src/models/
tests/models/
12 changes: 6 additions & 6 deletions src/kepler_model/estimate/estimator.py
@@ -70,7 +70,7 @@ def handle_request(data):
current_trainer = loaded_model[output_type.name][power_request.energy_source].trainer_name
request_trainer = current_trainer != power_request.trainer_name
if request_trainer:
logger.info(f"try obtaining the requesting trainer {power_request.trainer_name} (current: {current_trainer})")
logger.info("try obtaining the requesting trainer {} (current: {})".format(power_request.trainer_name, current_trainer))
if power_request.energy_source not in loaded_model[output_type.name] or request_trainer:
output_path = get_download_output_path(download_path, power_request.energy_source, output_type)
if not os.path.exists(output_path):
@@ -84,20 +84,20 @@ def handle_request(data):
logger.error(msg)
return {"powers": dict(), "msg": msg}
else:
logger.info(f"load model from config: {output_path}")
logger.info("load model from config: ", output_path)
else:
logger.info(f"load model from model server: {output_path}")
logger.info("load model from model server: %s", output_path)
loaded_item = load_downloaded_model(power_request.energy_source, output_type)
if loaded_item is not None and loaded_item.estimator is not None:
loaded_model[output_type.name][power_request.energy_source] = loaded_item
logger.info(f"set model {loaded_item.model_name} for {power_request.energy_source} ({output_type.name})")
logger.info("set model {0} for {2} ({1})".format(loaded_item.model_name, output_type.name, power_request.energy_source))
# remove loaded model
shutil.rmtree(output_path)

model = loaded_model[output_type.name][power_request.energy_source]
powers, msg = model.get_power(power_request.datapoint)
if msg != "":
logger.info(f"{model.model_name} fail to predict, removed: {msg}")
logger.info("{} fail to predict, removed: {}".format(model.model_name, msg))
if output_path != "" and os.path.exists(output_path):
shutil.rmtree(output_path)
return {"powers": powers, "msg": msg}
@@ -111,7 +111,7 @@ def start(self):
s = self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.bind(self.socket_path)
s.listen(1)
logger.info(f"started serving on {self.socket_path}")
logger.info("started serving on {}".format(self.socket_path))
try:
while True:
connection, _ = s.accept()
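For context on the socket the estimator tests exercise: EstimatorServer binds a Unix domain socket at socket_path and serves one JSON request per connection. A client sketch, assuming the /tmp/estimator.sock path used elsewhere in this diff and a response under 4 KB; the payload schema mirrors the PowerRequest fields read by handle_request() but is an assumption here:

import json
import socket


def send_power_request(payload: dict, socket_path: str = "/tmp/estimator.sock") -> dict:
    # One request/response exchange over the estimator's Unix socket.
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
        s.connect(socket_path)
        s.send(json.dumps(payload).encode())
        return json.loads(s.recv(4096).decode())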
40 changes: 13 additions & 27 deletions src/kepler_model/estimate/model_server_connector.py
@@ -8,31 +8,22 @@
from kepler_model.util.config import is_model_server_enabled, get_model_server_req_endpoint, get_model_server_list_endpoint, download_path
from kepler_model.util.loader import get_download_output_path
from kepler_model.util.train_types import ModelOutputType
from kepler_model.train.profiler.node_type_index import discover_spec_values

machine_spec_mount_path = "/etc/kepler/models/machine_spec"
machine_id = os.getenv('MACHINE_ID', None)

# get_spec_values: determine node spec in json format (refer to NodeTypeSpec)
def get_spec_values(machine_id : str|None):
if machine_id is not None:
spec_file = os.path.join(machine_spec_mount_path, machine_id)
try:
with open(spec_file) as f:
res = json.load(f)
return res
except:
pass
return discover_spec_values()
# discover_spec: determine node spec in json format (refer to NodeTypeSpec)
def discover_spec():
import psutil

node_spec = None
# TODO: reuse node_type_index/generate_spec with loosen selection
cores = psutil.cpu_count(logical=True)
spec = {"cores": cores}
return spec


node_spec = discover_spec()


def make_model_request(power_request):
global node_spec
if node_spec is None:
node_spec = get_spec_values(machine_id)
print(f"Node spec: {node_spec}")
return {"metrics": power_request.metrics + power_request.system_features, "output_type": power_request.output_type, "source": power_request.energy_source, "filter": power_request.filter, "trainer_name": power_request.trainer_name, "spec": node_spec}


@@ -71,21 +62,16 @@ def make_request(power_request):
return unpack(power_request.energy_source, output_type, response)


def list_all_models(energy_source=None, node_type=None):
def list_all_models():
if not is_model_server_enabled():
return dict()
try:
endpoint = get_model_server_list_endpoint()
params= {}
if energy_source:
params["source"] = energy_source
if node_type:
params["type"] = node_type
response = requests.get(endpoint, params=params)
response = requests.get(get_model_server_list_endpoint())
except Exception as err:
print("cannot list model: {}".format(err))
return dict()
if response.status_code != 200:
return dict()
model_names = json.loads(response.content.decode("utf-8"))
return model_names
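
To make the reverted request shape concrete: make_model_request() now attaches the module-level node_spec (from discover_spec()) to every request instead of lazily loading a per-machine spec. A sketch of the resulting payload; the metric names, spec values, and endpoint URL are placeholders, and the real URL comes from get_model_server_req_endpoint():

import requests

payload = {
    "metrics": ["cpu_cycles", "cpu_instructions"],  # power_request.metrics + system_features
    "output_type": "AbsPower",
    "source": "rapl-sysfs",
    "filter": "",
    "trainer_name": "",
    "spec": {"cores": 8},  # node_spec as returned by discover_spec()
}
response = requests.post("http://model-server:8100/model", json=payload)  # placeholder endpoint

list_all_models() likewise reverts to calling the list endpoint with no query parameters.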

60 changes: 11 additions & 49 deletions src/kepler_model/server/model_server.py
@@ -10,7 +10,7 @@

from kepler_model.util.train_types import get_valid_feature_groups, ModelOutputType, FeatureGroups, FeatureGroup, PowerSourceMap, weight_support_trainers
from kepler_model.util.config import getConfig, model_toppath, ERROR_KEY, MODEL_SERVER_MODEL_REQ_PATH, MODEL_SERVER_MODEL_LIST_PATH, initial_pipeline_urls, download_path
from kepler_model.util.loader import parse_filters, is_valid_model, load_json, load_weight, get_model_group_path, get_archived_file, METADATA_FILENAME, CHECKPOINT_FOLDERNAME, get_pipeline_path, any_node_type, is_matched_type, get_largest_candidates, default_pipelines, get_node_type_from_name
from kepler_model.util.loader import parse_filters, is_valid_model, load_json, load_weight, get_model_group_path, get_archived_file, METADATA_FILENAME, CHECKPOINT_FOLDERNAME, get_pipeline_path, any_node_type, is_matched_type, get_largest_candidates
from kepler_model.util.saver import WEIGHT_FILENAME
from kepler_model.train import NodeTypeSpec, NodeTypeIndexCollection

@@ -65,26 +65,10 @@ def __init__(self, metrics, output_type, source="rapl-sysfs", node_type=-1, weig
"""


def select_best_model(spec, valid_grouppath, filters, energy_source, pipeline_name="", trainer_name="", node_type=any_node_type, weight=False, loose_node_type=True):
# Find initial model list filtered by trainer
initial_model_names = [f for f in os.listdir(valid_grouppath) if f != CHECKPOINT_FOLDERNAME and not os.path.isfile(os.path.join(valid_grouppath, f)) and (trainer_name == "" or trainer_name in f)]
if pipeline_name == "" and energy_source in default_pipelines:
pipeline_name = default_pipelines[energy_source]

if node_type != any_node_type:
model_names = [name for name in initial_model_names if "_{}".format(node_type) in name]
if len(model_names) == 0:
if not loose_node_type:
return None, None
logger.warning(f"{valid_grouppath} has no matched model for node type={node_type}, try all available models")
model_names = initial_model_names
else:
model_names = initial_model_names

# Filter weight models
def select_best_model(spec, valid_groupath, filters, energy_source, pipeline_name="", trainer_name="", node_type=any_node_type, weight=False):
model_names = [f for f in os.listdir(valid_groupath) if f != CHECKPOINT_FOLDERNAME and not os.path.isfile(os.path.join(valid_groupath, f)) and (trainer_name == "" or trainer_name in f)]
if weight:
candidates = [name for name in model_names if name.split("_")[0] in weight_support_trainers]

model_names = [name for name in model_names if name.split("_")[0] in weight_support_trainers]
# Load metadata of trainers
best_cadidate = None
best_response = None
@@ -101,7 +85,7 @@ def select_best_model(spec, valid_grouppath, filters, energy_source, pipeline_na
logger.warn("no large candidates, select from all availables")
candidates = model_names
for model_name in candidates:
model_savepath = os.path.join(valid_grouppath, model_name)
model_savepath = os.path.join(valid_groupath, model_name)
metadata = load_json(model_savepath, METADATA_FILENAME)
if metadata is None or not is_valid_model(metadata, filters) or ERROR_KEY not in metadata:
# invalid metadata
@@ -114,7 +98,7 @@ def select_best_model(spec, valid_grouppath, filters, energy_source, pipeline_na
logger.warn("weight failed: %s", model_savepath)
continue
else:
response = get_archived_file(valid_grouppath, model_name)
response = get_archived_file(valid_groupath, model_name)
if not os.path.exists(response):
# archived model file does not exists
logger.warn("archive failed: %s", response)
@@ -146,36 +130,20 @@ def get_model():
output_type = ModelOutputType[req.output_type]
best_model = None
best_response = None
best_uncertainty = None
best_looseness = None
# find best model comparing best candidate from each valid feature group complied with filtering conditions
for fg in valid_fgs:
pipeline_name = pipelineName[energy_source]
valid_groupath = get_model_group_path(model_toppath, output_type, fg, energy_source, pipeline_name=pipeline_name)
node_type = req.node_type
if req.node_type == any_node_type and req.spec is not None and not req.spec.is_none() and pipeline_name in nodeCollection:
node_type, uncertainty, looseness = nodeCollection[pipeline_name].get_node_type(req.spec, loose_search=True)
else:
uncertainty = 0
looseness = 0
valid_groupath = get_model_group_path(model_toppath, output_type, fg, energy_source, pipeline_name=pipelineName[energy_source])
if os.path.exists(valid_groupath):
best_candidate, response = select_best_model(req.spec, valid_groupath, filters, energy_source, req.pipeline_name, req.trainer_name, node_type, req.weight)
best_candidate, response = select_best_model(req.spec, valid_groupath, filters, energy_source, req.pipeline_name, req.trainer_name, req.node_type, req.weight)
if best_candidate is None:
continue
if node_type != any_node_type and best_model is not None and get_node_type_from_name(best_model['model_name']) == node_type:
if get_node_type_from_name(best_candidate['model_name']) != node_type:
continue
if best_model is None or best_model[ERROR_KEY] > best_candidate[ERROR_KEY]:
best_model = best_candidate
best_response = response
best_uncertainty = uncertainty
best_looseness = looseness
logger.info(f"response: model {best_model['model_name']} by {best_model['features']} with {ERROR_KEY}={best_model[ERROR_KEY]} selected with uncertainty={best_uncertainty}, looseness={best_looseness}")
if best_model is None:
return make_response("cannot find model for {} at the moment".format(model_request), 400)
if req.weight:
try:
best_response["model_name"] = best_model['model_name']
response = app.response_class(response=json.dumps(best_response), status=200, mimetype="application/json")
return response
except ValueError as err:
@@ -186,13 +154,13 @@ def get_available_models():
except ValueError as err:
return make_response("send archived model error: {}".format(err), 400)


# get_available_models: return name list of best-candidate pipelines
@app.route(MODEL_SERVER_MODEL_LIST_PATH, methods=["GET"])
def get_available_models():
fg = request.args.get("fg")
ot = request.args.get("ot")
energy_source = request.args.get("source")
node_type = request.args.get("type")
filter = request.args.get("filter")

try:
Expand All @@ -213,27 +181,21 @@ def get_available_models():
filters = dict()
else:
filters = parse_filters(filter)
if node_type is None:
node_type = -1
else:
node_type = int(node_type)

model_names = dict()
for output_type in output_types:
logger.debug(f"Searching output type {output_type}")
model_names[output_type.name] = dict()
for fg in valid_fgs:
logger.debug(f"Searching feature group {fg}")
valid_groupath = get_model_group_path(model_toppath, output_type, fg, energy_source, pipeline_name=pipelineName[energy_source])
if os.path.exists(valid_groupath):
best_candidate, _ = select_best_model(None, valid_groupath, filters, energy_source, node_type=node_type, loose_node_type=False)
best_candidate, _ = select_best_model(None, valid_groupath, filters, energy_source)
if best_candidate is None:
continue
model_names[output_type.name][fg.name] = best_candidate["model_name"]
response = app.response_class(response=json.dumps(model_names), status=200, mimetype="application/json")
return response
except (ValueError, Exception) as err:
return make_response(f"failed to get best model list: {err}", 400)
return make_response("failed to get best model list: {}".format(err), 400)


# upack_zip_files: unpack all model.zip files to model folder and copy model.json to model/weight.zip
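The reverted select_best_model() reduces selection to a minimum-error scan over candidate metadata. A simplified sketch of that comparison; the ERROR_KEY value is an assumption (the real key is imported from kepler_model.util.config), and the candidate dicts are fabricated for illustration:

ERROR_KEY = "mae"  # assumption; supplied by kepler_model.util.config in the real code

candidates = [
    {"model_name": "GradientBoostingRegressorTrainer_1", ERROR_KEY: 2.31},
    {"model_name": "SGDRegressorTrainer_1", ERROR_KEY: 1.74},
]

best_candidate = None
for metadata in candidates:
    # keep the candidate whose metadata reports the lowest error
    if best_candidate is None or best_candidate[ERROR_KEY] > metadata[ERROR_KEY]:
        best_candidate = metadata

print(best_candidate["model_name"])  # -> SGDRegressorTrainer_1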
