diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml index 76fe0ef5..cffde569 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -38,9 +38,3 @@ jobs: timeout-minutes: 5 - name: Test offline trainer run: make test-offline-trainer - - name: Test model server select - run: make test-model-server-select - timeout-minutes: 5 - - name: Test model server select via estimator - run: make test-model-server-estimator-select - timeout-minutes: 5 diff --git a/.gitignore b/.gitignore index 6e94f560..15b72fad 100644 --- a/.gitignore +++ b/.gitignore @@ -146,7 +146,6 @@ tests/data/extractor_output tests/data/isolator_output tests/data/offline_trainer_output tests/data/plot_output -tests/db-models model_training/*data* model_training/tekton/secret local-dev-cluster diff --git a/Makefile b/Makefile index 782605f4..f4646a2a 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,6 @@ PYTHON = python3.10 DOCKERFILES_PATH := ./dockerfiles MODEL_PATH := ${PWD}/tests/models -MACHINE_SPEC_PATH := ${PWD}/tests/data/machine_spec build: $(CTR_CMD) build -t $(IMAGE) -f $(DOCKERFILES_PATH)/Dockerfile . @@ -56,15 +55,10 @@ run-estimator: run-collector-client: $(CTR_CMD) exec estimator /bin/bash -c \ - "while [ ! -S "/tmp/estimator.sock" ]; do \ - sleep 1; \ - done; \ - hatch run test -vvv -s ./tests/estimator_power_request_test.py" + "while [ ! 
-S "/tmp/estimator.sock" ]; do sleep 1; done; hatch test -vvv -s ./tests/estimator_power_request_test.py" clean-estimator: - @$(CTR_CMD) logs estimator - @$(CTR_CMD) stop estimator - @$(CTR_CMD) rm estimator || true + $(CTR_CMD) stop estimator test-estimator: run-estimator run-collector-client clean-estimator @@ -84,9 +78,7 @@ run-estimator-client: hatch run test -vvv -s ./tests/estimator_model_request_test.py clean-model-server: - @$(CTR_CMD) logs model-server @$(CTR_CMD) stop model-server - @$(CTR_CMD) rm model-server || true test-model-server: \ run-model-server \ @@ -114,42 +106,6 @@ test-offline-trainer: \ run-offline-trainer-client \ clean-offline-trainer -# test model server select -create-container-net: - @$(CTR_CMD) network create kepler-model-server-test - -run-model-server-with-db: - $(CTR_CMD) run -d --platform linux/amd64 \ - --network kepler-model-server-test \ - -p 8100:8100 \ - --name model-server $(TEST_IMAGE) \ - model-server - while ! docker logs model-server 2>&1 | grep -q 'Running on all'; do \ - echo "... 
waiting for model-server to serve"; sleep 5; \ - done - -run-estimator-with-model-server: - $(CTR_CMD) run -d --platform linux/amd64 \ - --network kepler-model-server-test \ - -e "PYTHONUNBUFFERED=1" \ - -e "MACHINE_ID=test" \ - -v ${MACHINE_SPEC_PATH}:/etc/kepler/models/machine_spec \ - -e "MODEL_SERVER_ENABLE=true" \ - -e "MODEL_SERVER_URL=http://model-server:8100" \ - --name estimator $(TEST_IMAGE) \ - estimator - -clean-container-net: - @$(CTR_CMD) network rm kepler-model-server-test - -run-select-client: - $(CTR_CMD) exec model-server \ - hatch run test -vvv -s ./tests/model_select_test.py - -test-model-server-select: create-container-net run-model-server-with-db run-select-client clean-model-server clean-container-net - -test-model-server-estimator-select: create-container-net run-model-server-with-db run-estimator-with-model-server run-collector-client clean-estimator clean-model-server clean-container-net - test: \ build-test \ test-pipeline \ diff --git a/dockerfiles/Dockerfile.dockerignore b/dockerfiles/Dockerfile.dockerignore index 44351442..15b9d786 100644 --- a/dockerfiles/Dockerfile.dockerignore +++ b/dockerfiles/Dockerfile.dockerignore @@ -1,5 +1,4 @@ __pycache__ src/resource/ src/kepler_model/models/ -src/models/ tests/models/ diff --git a/dockerfiles/Dockerfile.test-nobase.dockerignore b/dockerfiles/Dockerfile.test-nobase.dockerignore index 44351442..15b9d786 100644 --- a/dockerfiles/Dockerfile.test-nobase.dockerignore +++ b/dockerfiles/Dockerfile.test-nobase.dockerignore @@ -1,5 +1,4 @@ __pycache__ src/resource/ src/kepler_model/models/ -src/models/ tests/models/ diff --git a/dockerfiles/Dockerfile.test.dockerignore b/dockerfiles/Dockerfile.test.dockerignore index 44351442..15b9d786 100644 --- a/dockerfiles/Dockerfile.test.dockerignore +++ b/dockerfiles/Dockerfile.test.dockerignore @@ -1,5 +1,4 @@ __pycache__ src/resource/ src/kepler_model/models/ -src/models/ tests/models/ diff --git a/src/kepler_model/estimate/estimator.py 
b/src/kepler_model/estimate/estimator.py index a15445ab..ae67ce02 100644 --- a/src/kepler_model/estimate/estimator.py +++ b/src/kepler_model/estimate/estimator.py @@ -70,7 +70,7 @@ def handle_request(data): current_trainer = loaded_model[output_type.name][power_request.energy_source].trainer_name request_trainer = current_trainer != power_request.trainer_name if request_trainer: - logger.info(f"try obtaining the requesting trainer {power_request.trainer_name} (current: {current_trainer})") + logger.info("try obtaining the requesting trainer {} (current: {})".format(power_request.trainer_name, current_trainer)) if power_request.energy_source not in loaded_model[output_type.name] or request_trainer: output_path = get_download_output_path(download_path, power_request.energy_source, output_type) if not os.path.exists(output_path): @@ -84,20 +84,20 @@ def handle_request(data): logger.error(msg) return {"powers": dict(), "msg": msg} else: - logger.info(f"load model from config: {output_path}") + logger.info("load model from config: %s", output_path) else: - logger.info(f"load model from model server: {output_path}") + logger.info("load model from model server: %s", output_path) loaded_item = load_downloaded_model(power_request.energy_source, output_type) if loaded_item is not None and loaded_item.estimator is not None: loaded_model[output_type.name][power_request.energy_source] = loaded_item - logger.info(f"set model {loaded_item.model_name} for {power_request.energy_source} ({output_type.name})") + logger.info("set model {0} for {2} ({1})".format(loaded_item.model_name, output_type.name, power_request.energy_source)) # remove loaded model shutil.rmtree(output_path) model = loaded_model[output_type.name][power_request.energy_source] powers, msg = model.get_power(power_request.datapoint) if msg != "": - logger.info(f"{model.model_name} fail to predict, removed: {msg}") + logger.info("{} fail to predict, removed: {}".format(model.model_name, msg)) if output_path != "" and 
os.path.exists(output_path): shutil.rmtree(output_path) return {"powers": powers, "msg": msg} @@ -111,7 +111,7 @@ def start(self): s = self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) s.bind(self.socket_path) s.listen(1) - logger.info(f"started serving on {self.socket_path}") + logger.info("started serving on {}".format(self.socket_path)) try: while True: connection, _ = s.accept() diff --git a/src/kepler_model/estimate/model_server_connector.py b/src/kepler_model/estimate/model_server_connector.py index 8fc71b0f..b884decb 100644 --- a/src/kepler_model/estimate/model_server_connector.py +++ b/src/kepler_model/estimate/model_server_connector.py @@ -8,31 +8,22 @@ from kepler_model.util.config import is_model_server_enabled, get_model_server_req_endpoint, get_model_server_list_endpoint, download_path from kepler_model.util.loader import get_download_output_path from kepler_model.util.train_types import ModelOutputType -from kepler_model.train.profiler.node_type_index import discover_spec_values -machine_spec_mount_path = "/etc/kepler/models/machine_spec" -machine_id = os.getenv('MACHINE_ID', None) -# get_spec_values: determine node spec in json format (refer to NodeTypeSpec) -def get_spec_values(machine_id : str|None): - if machine_id is not None: - spec_file = os.path.join(machine_spec_mount_path, machine_id) - try: - with open(spec_file) as f: - res = json.load(f) - return res - except: - pass - return discover_spec_values() +# discover_spec: determine node spec in json format (refer to NodeTypeSpec) +def discover_spec(): + import psutil -node_spec = None + # TODO: reuse node_type_index/generate_spec with loosen selection + cores = psutil.cpu_count(logical=True) + spec = {"cores": cores} + return spec + + +node_spec = discover_spec() def make_model_request(power_request): - global node_spec - if node_spec is None: - node_spec = get_spec_values(machine_id) - print(f"Node spec: {node_spec}") return {"metrics": power_request.metrics + 
power_request.system_features, "output_type": power_request.output_type, "source": power_request.energy_source, "filter": power_request.filter, "trainer_name": power_request.trainer_name, "spec": node_spec} @@ -71,17 +62,11 @@ def make_request(power_request): return unpack(power_request.energy_source, output_type, response) -def list_all_models(energy_source=None, node_type=None): +def list_all_models(): if not is_model_server_enabled(): return dict() try: - endpoint = get_model_server_list_endpoint() - params= {} - if energy_source: - params["source"] = energy_source - if node_type: - params["type"] = node_type - response = requests.get(endpoint, params=params) + response = requests.get(get_model_server_list_endpoint()) except Exception as err: print("cannot list model: {}".format(err)) return dict() @@ -89,3 +74,4 @@ def list_all_models(energy_source=None, node_type=None): return dict() model_names = json.loads(response.content.decode("utf-8")) return model_names + diff --git a/src/kepler_model/server/model_server.py b/src/kepler_model/server/model_server.py index cb0d28aa..4f149339 100644 --- a/src/kepler_model/server/model_server.py +++ b/src/kepler_model/server/model_server.py @@ -10,7 +10,7 @@ from kepler_model.util.train_types import get_valid_feature_groups, ModelOutputType, FeatureGroups, FeatureGroup, PowerSourceMap, weight_support_trainers from kepler_model.util.config import getConfig, model_toppath, ERROR_KEY, MODEL_SERVER_MODEL_REQ_PATH, MODEL_SERVER_MODEL_LIST_PATH, initial_pipeline_urls, download_path -from kepler_model.util.loader import parse_filters, is_valid_model, load_json, load_weight, get_model_group_path, get_archived_file, METADATA_FILENAME, CHECKPOINT_FOLDERNAME, get_pipeline_path, any_node_type, is_matched_type, get_largest_candidates, default_pipelines, get_node_type_from_name +from kepler_model.util.loader import parse_filters, is_valid_model, load_json, load_weight, get_model_group_path, get_archived_file, METADATA_FILENAME, 
CHECKPOINT_FOLDERNAME, get_pipeline_path, any_node_type, is_matched_type, get_largest_candidates from kepler_model.util.saver import WEIGHT_FILENAME from kepler_model.train import NodeTypeSpec, NodeTypeIndexCollection @@ -65,26 +65,10 @@ def __init__(self, metrics, output_type, source="rapl-sysfs", node_type=-1, weig """ -def select_best_model(spec, valid_grouppath, filters, energy_source, pipeline_name="", trainer_name="", node_type=any_node_type, weight=False, loose_node_type=True): - # Find initial model list filtered by trainer - initial_model_names = [f for f in os.listdir(valid_grouppath) if f != CHECKPOINT_FOLDERNAME and not os.path.isfile(os.path.join(valid_grouppath, f)) and (trainer_name == "" or trainer_name in f)] - if pipeline_name == "" and energy_source in default_pipelines: - pipeline_name = default_pipelines[energy_source] - - if node_type != any_node_type: - model_names = [name for name in initial_model_names if "_{}".format(node_type) in name] - if len(model_names) == 0: - if not loose_node_type: - return None, None - logger.warning(f"{valid_grouppath} has no matched model for node type={node_type}, try all available models") - model_names = initial_model_names - else: - model_names = initial_model_names - - # Filter weight models +def select_best_model(spec, valid_groupath, filters, energy_source, pipeline_name="", trainer_name="", node_type=any_node_type, weight=False): + model_names = [f for f in os.listdir(valid_groupath) if f != CHECKPOINT_FOLDERNAME and not os.path.isfile(os.path.join(valid_groupath, f)) and (trainer_name == "" or trainer_name in f)] if weight: - candidates = [name for name in model_names if name.split("_")[0] in weight_support_trainers] - + model_names = [name for name in model_names if name.split("_")[0] in weight_support_trainers] # Load metadata of trainers best_cadidate = None best_response = None @@ -101,7 +85,7 @@ def select_best_model(spec, valid_grouppath, filters, energy_source, pipeline_na logger.warn("no large 
candidates, select from all availables") candidates = model_names for model_name in candidates: - model_savepath = os.path.join(valid_grouppath, model_name) + model_savepath = os.path.join(valid_groupath, model_name) metadata = load_json(model_savepath, METADATA_FILENAME) if metadata is None or not is_valid_model(metadata, filters) or ERROR_KEY not in metadata: # invalid metadata @@ -114,7 +98,7 @@ def select_best_model(spec, valid_grouppath, filters, energy_source, pipeline_na logger.warn("weight failed: %s", model_savepath) continue else: - response = get_archived_file(valid_grouppath, model_name) + response = get_archived_file(valid_groupath, model_name) if not os.path.exists(response): # archived model file does not exists logger.warn("archive failed: %s", response) @@ -146,36 +130,20 @@ def get_model(): output_type = ModelOutputType[req.output_type] best_model = None best_response = None - best_uncertainty = None - best_looseness = None # find best model comparing best candidate from each valid feature group complied with filtering conditions for fg in valid_fgs: - pipeline_name = pipelineName[energy_source] - valid_groupath = get_model_group_path(model_toppath, output_type, fg, energy_source, pipeline_name=pipeline_name) - node_type = req.node_type - if req.node_type == any_node_type and req.spec is not None and not req.spec.is_none() and pipeline_name in nodeCollection: - node_type, uncertainty, looseness = nodeCollection[pipeline_name].get_node_type(req.spec, loose_search=True) - else: - uncertainty = 0 - looseness = 0 + valid_groupath = get_model_group_path(model_toppath, output_type, fg, energy_source, pipeline_name=pipelineName[energy_source]) if os.path.exists(valid_groupath): - best_candidate, response = select_best_model(req.spec, valid_groupath, filters, energy_source, req.pipeline_name, req.trainer_name, node_type, req.weight) + best_candidate, response = select_best_model(req.spec, valid_groupath, filters, energy_source, req.pipeline_name, 
req.trainer_name, req.node_type, req.weight) if best_candidate is None: continue - if node_type != any_node_type and best_model is not None and get_node_type_from_name(best_model['model_name']) == node_type: - if get_node_type_from_name(best_candidate['model_name']) != node_type: - continue if best_model is None or best_model[ERROR_KEY] > best_candidate[ERROR_KEY]: best_model = best_candidate best_response = response - best_uncertainty = uncertainty - best_looseness = looseness - logger.info(f"response: model {best_model['model_name']} by {best_model['features']} with {ERROR_KEY}={best_model[ERROR_KEY]} selected with uncertainty={best_uncertainty}, looseness={best_looseness}") if best_model is None: return make_response("cannot find model for {} at the moment".format(model_request), 400) if req.weight: try: - best_response["model_name"] = best_model['model_name'] response = app.response_class(response=json.dumps(best_response), status=200, mimetype="application/json") return response except ValueError as err: @@ -186,13 +154,13 @@ def get_model(): except ValueError as err: return make_response("send archived model error: {}".format(err), 400) + # get_available_models: return name list of best-candidate pipelines @app.route(MODEL_SERVER_MODEL_LIST_PATH, methods=["GET"]) def get_available_models(): fg = request.args.get("fg") ot = request.args.get("ot") energy_source = request.args.get("source") - node_type = request.args.get("type") filter = request.args.get("filter") try: @@ -213,27 +181,21 @@ def get_available_models(): filters = dict() else: filters = parse_filters(filter) - if node_type is None: - node_type = -1 - else: - node_type = int(node_type) model_names = dict() for output_type in output_types: - logger.debug(f"Searching output type {output_type}") model_names[output_type.name] = dict() for fg in valid_fgs: - logger.debug(f"Searching feature group {fg}") valid_groupath = get_model_group_path(model_toppath, output_type, fg, energy_source, 
pipeline_name=pipelineName[energy_source]) if os.path.exists(valid_groupath): - best_candidate, _ = select_best_model(None, valid_groupath, filters, energy_source, node_type=node_type, loose_node_type=False) + best_candidate, _ = select_best_model(None, valid_groupath, filters, energy_source) if best_candidate is None: continue model_names[output_type.name][fg.name] = best_candidate["model_name"] response = app.response_class(response=json.dumps(model_names), status=200, mimetype="application/json") return response except (ValueError, Exception) as err: - return make_response(f"failed to get best model list: {err}", 400) + return make_response("failed to get best model list: {}".format(err), 400) # upack_zip_files: unpack all model.zip files to model folder and copy model.json to model/weight.zip diff --git a/src/kepler_model/train/profiler/node_type_index.py b/src/kepler_model/train/profiler/node_type_index.py index 7fb3d95f..bd4b90da 100644 --- a/src/kepler_model/train/profiler/node_type_index.py +++ b/src/kepler_model/train/profiler/node_type_index.py @@ -17,7 +17,6 @@ from kepler_model.util.saver import save_node_type_index, save_machine_spec from kepler_model.util.loader import load_node_type_index -from kepler_model.util.similarity import compute_jaccard_similarity, get_similarity_weight, compute_similarity, get_num_of_none, get_candidate_score, find_best_candidate, compute_uncertainty, compute_looseness def rename(name: str) -> str: @@ -46,7 +45,8 @@ def format_vendor(vendor): GB = 1024 * 1024 * 1024 -def discover_spec_values(): + +def generate_spec(data_path, machine_id): processor = "" vendor = "" cpu_info = cpuinfo.get_cpu_info() @@ -54,38 +54,23 @@ def discover_spec_values(): processor = format_processor(cpu_info["brand_raw"]) context = pyudev.Context() for device in context.list_devices(subsystem="dmi"): - if device.get('ID_VENDOR') is not None: - vendor = format_vendor(device.get('ID_VENDOR')) + if device.get("ID_VENDOR") is not None: + vendor = 
format_vendor(device.get("ID_VENDOR")) break - if vendor == "" and "vendor_id_raw" in cpu_info: - vendor = format_vendor(cpu_info["vendor_id_raw"]) - cores = psutil.cpu_count(logical=True) chips = max(1, int(subprocess.check_output('cat /proc/cpuinfo | grep "physical id" | sort -u | wc -l', shell=True))) - threads_per_core = max(1, cores//psutil.cpu_count(logical=False)) + threads_per_core = max(1, cores // psutil.cpu_count(logical=False)) memory = psutil.virtual_memory().total - memory_gb = int(memory/GB) + memory_gb = int(memory / GB) freq = psutil.cpu_freq(percpu=False) - spec_values = { - "vendor": vendor, - "processor": processor, - "cores": cores, - "chips": chips, - "memory": memory_gb, - "threads_per_core": threads_per_core - } - if freq is not None: - cpu_freq_mhz = round(max(freq.max, freq.current)/100)*100 # round to one decimal of GHz - spec_values["frequency"] = cpu_freq_mhz - return spec_values - -def generate_spec(data_path, machine_id): - spec_values = discover_spec_values() + cpu_freq_mhz = round(max(freq.max, freq.current) / 100) * 100 if freq is not None else no_data # round to one decimal of GHz; psutil.cpu_freq() may return None + spec_values = {"vendor": vendor, "processor": processor, "cores": cores, "chips": chips, "memory": memory_gb, "frequency": cpu_freq_mhz, "threads_per_core": threads_per_core} spec = NodeTypeSpec(**spec_values) print("Save machine spec ({}): ".format(data_path)) print(str(spec)) save_machine_spec(data_path, machine_id, spec) + class NodeAttribute(str, enum.Enum): PROCESSOR = "processor" CORES = "cores" @@ -118,12 +103,6 @@ def __init__(self, **kwargs): self.attrs[NodeAttribute.FREQ] = kwargs.get("frequency", no_data) self.members = [] - def is_none(self): - for value in self.attrs.values(): - if value != no_data: - return False - return True - def load(self, json_obj): for attr, attr_values in json_obj["attrs"].items(): self.attrs[attr] = attr_values @@ -158,35 +137,6 @@ def cover(self, compare_spec): return False return True - def get_uncertain_attribute_freq(self, compare_spec): - 
uncertain_attribute_freq = dict() - if not self.cover(compare_spec): - # not covered - return None - size = self.get_size() - for attr in NodeAttribute: - if compare_spec.attrs[attr] is None: - uncertain_attribute_freq[attr] = size - return uncertain_attribute_freq - - def get_similarity(self, compare_spec, debug=False): - total_similarity = 0 - for attr in NodeAttribute: - similarity = 0 - # compare similar string - if compare_spec.attrs[attr] is not None and attr in [NodeAttribute.PROCESSOR]: - similarity = compute_jaccard_similarity(self.attrs[attr], compare_spec.attrs[attr]) - # compare number - else: - if compare_spec.attrs[attr] is not None: - similarity = compute_similarity(self.attrs[attr], compare_spec.attrs[attr]) - if debug: - print(attr, self.attrs[attr], compare_spec.attrs[attr], similarity, get_similarity_weight(attr)) - total_similarity += (similarity*get_similarity_weight(attr)) - if total_similarity > 1: - total_similarity = 1 - return total_similarity - def __str__(self): out_str = "" for attr in NodeAttribute: @@ -224,7 +174,7 @@ def index_train_machine(self, machine_id, new_spec): if not new_spec.complete_info(): print("Machine info not completed: ", str(new_spec)) return -1 - covered_index, _, _ = self.get_node_type(new_spec) + covered_index = self.get_node_type(new_spec) if covered_index == -1: covered_index = 0 if len(self.node_type_index.keys()) > 0: @@ -233,34 +183,13 @@ def index_train_machine(self, machine_id, new_spec): self.node_type_index[covered_index].add_member(machine_id) return covered_index - def get_node_type(self, in_spec, loose_search=False, debug=False): + def get_node_type(self, compare_spec): if len(self.node_type_index) == 0: - return -1, -1, -1 - compare_spec = in_spec.copy() - num_of_none = get_num_of_none(compare_spec) - similarity_map, max_similarity, most_similar_index, has_candidate, candidate_uncertain_attribute_freq, candidate_uncertain_attribute_total = self._find_candidates(in_spec, loose_search, debug) - if 
max_similarity == 1: - return most_similar_index, 0, 0 - if has_candidate: - # covered - candidate_score = get_candidate_score(candidate_uncertain_attribute_freq, candidate_uncertain_attribute_total) - best_candidate_index, max_score = find_best_candidate(candidate_score) - uncertainty = compute_uncertainty(max_score, num_of_none) - return best_candidate_index, uncertainty, 0 - else: - # not covered - if loose_search: - if most_similar_index != -1: - candidate_uncertain_attribute_freq, candidate_uncertain_attribute_total, num_of_none = self._loose_search(compare_spec, debug, similarity_map, max_similarity, most_similar_index, candidate_uncertain_attribute_freq, candidate_uncertain_attribute_total) - candidate_score = get_candidate_score(candidate_uncertain_attribute_freq, candidate_uncertain_attribute_total) - if debug: - print("Candidate score: ", candidate_score) - most_similar_score = candidate_score[most_similar_index] - uncertainty = compute_uncertainty(most_similar_score, num_of_none) - if max_similarity != -1: - looseness = compute_looseness(max_similarity) - return most_similar_index, uncertainty, looseness - return -1, -1, -1 + return -1 + for index, node_type_spec in self.node_type_index.items(): + if node_type_spec.cover(compare_spec): + return index + return -1 def get_json(self): json_obj = dict() @@ -279,67 +208,3 @@ def copy(self): del node_collection.node_type_index[node_type] return node_collection - def _find_candidates(self, compare_spec, loose_search=False, debug=False): - """ - Returns: - - similarity_map: map from node type index to similarity value - """ - candidate_uncertain_attribute_freq = dict() - candidate_uncertain_attribute_total = dict() - most_similar_index = -1 - max_similarity = -1 - most_similar_freq = -1 - completed_info = compare_spec.complete_info() - has_candidate = False - similarity_map = dict() - for attr in NodeAttribute: - candidate_uncertain_attribute_freq[attr] = [] - candidate_uncertain_attribute_total[attr] = 0 - for 
index, node_type_spec in self.node_type_index.items(): - freq = node_type_spec.get_size() - if loose_search: - similarity = node_type_spec.get_similarity(compare_spec) - similarity_map[index] = similarity - if similarity > max_similarity or (similarity == max_similarity and most_similar_freq < freq): - most_similar_index = index - max_similarity = similarity - most_similar_freq = freq - if debug: - print(index, str(node_type_spec), similarity) - if node_type_spec.cover(compare_spec): - if completed_info: - return similarity_map, 1, index, has_candidate, candidate_uncertain_attribute_freq, candidate_uncertain_attribute_total - else: - for attr in NodeAttribute: - if compare_spec.attrs[attr] is None: - candidate_uncertain_attribute_freq[attr] += [(index, freq)] - candidate_uncertain_attribute_total[attr] += freq - has_candidate = True - return similarity_map, max_similarity, most_similar_index, has_candidate, candidate_uncertain_attribute_freq, candidate_uncertain_attribute_total - - def _loose_search(self, compare_spec, debug, similarity_map, max_similarity, most_similar_index, candidate_uncertain_attribute_freq, candidate_uncertain_attribute_total): - num_of_none = get_num_of_none(compare_spec) - most_similar_spec = self.node_type_index[most_similar_index] - # remove uncovered spec - for attr in NodeAttribute: - if compare_spec.attrs[attr] != most_similar_spec.attrs[attr]: - if debug: - print("Loosen {} ({}-->{})".format(attr, compare_spec.attrs[attr], most_similar_spec.attrs[attr])) - compare_spec.attrs[attr] = None - num_of_none += 1 - # find uncertainty - for index, node_type_spec in self.node_type_index.items(): - if node_type_spec.cover(compare_spec): - similarity = similarity_map[index] - freq = node_type_spec.get_size() - if similarity == max_similarity and freq > self.node_type_index[most_similar_index].get_size(): - if debug: - print("change most similar index from {} to {}".format(most_similar_index, index)) - most_similar_index = index - for attr in 
NodeAttribute: - if compare_spec.attrs[attr] is None: - candidate_uncertain_attribute_freq[attr] += [(index, freq)] - candidate_uncertain_attribute_total[attr] += freq - if most_similar_index == index: - node_type_spec.cover(compare_spec) - return candidate_uncertain_attribute_freq, candidate_uncertain_attribute_total, num_of_none \ No newline at end of file diff --git a/src/kepler_model/util/loader.py b/src/kepler_model/util/loader.py index 88a6c93d..60133e8e 100644 --- a/src/kepler_model/util/loader.py +++ b/src/kepler_model/util/loader.py @@ -60,7 +60,7 @@ def load_json(path, name): with open(filepath) as f: res = json.load(f) return res - except Exception: + except Exception as err: return None @@ -88,16 +88,6 @@ def load_remote_pkl(url_path): except: return None -def load_remote_json(url_path): - if ".json" not in url_path: - url_path = url_path + ".json" - try: - response = urlopen(url_path) - response_data = response.read().decode('utf-8') - json_data = json.loads(response_data) - return json_data - except: - return None def load_machine_spec(data_path, machine_id): machine_spec_path = os.path.join(data_path, MACHINE_SPEC_PATH) diff --git a/src/kepler_model/util/similarity.py b/src/kepler_model/util/similarity.py deleted file mode 100644 index 96899fa0..00000000 --- a/src/kepler_model/util/similarity.py +++ /dev/null @@ -1,89 +0,0 @@ -from .train_types import NodeAttribute - -# from similarity result on SPEC data -similarity_reference = { - NodeAttribute.PROCESSOR: 0.56, - NodeAttribute.CORES: 0.76, - NodeAttribute.CHIPS: 0.52, - NodeAttribute.MEMORY: 0.8, - NodeAttribute.FREQ: 0.01, -} - -thread_related_weight = 0.76 + 0.52 + 0.12 -similarity_reference = { - NodeAttribute.PROCESSOR: 3, - NodeAttribute.CORES: 0.76/thread_related_weight, - NodeAttribute.CHIPS: 0.52/thread_related_weight, - NodeAttribute.MEMORY: 0.8, - NodeAttribute.FREQ: 0.01, -} - -similarity_total_weight = sum(similarity_reference.values()) - -def get_similarity_weight(attr): - return 
similarity_reference[attr]/similarity_total_weight - -def compute_jaccard_similarity(str1, str2): - if str1.lower() == str2.lower(): - return 1 - set1 = set(str1.lower()) # Convert to lowercase for case-insensitive comparison - set2 = set(str2.lower()) - - intersection = len(set1.intersection(set2)) - union = len(set1.union(set2)) - - similarity = intersection / union if union != 0 else 0 - return similarity/2 - -def compute_similarity(base, cmp): - base = float(base) - cmp = float(cmp) - if base == 0 and cmp == 0: - diff_ratio = abs(cmp-base) - else: - diff_ratio = abs(cmp-base)/((base+cmp)/2) - if diff_ratio >= 1: - return 0 - else: - return 1-diff_ratio - -def compute_looseness(similarity): - return 1-similarity - -# get_candidate_score returns certainty -def get_candidate_score(candidate_uncertain_attribute_freq, candidate_uncertain_attribute_total): - candidate_score = dict() - for attr, candidates in candidate_uncertain_attribute_freq.items(): - total = candidate_uncertain_attribute_total[attr] - if total == 0: - # no uncertainty - continue - for candidate in candidates: - candidate_index = candidate[0] - candidate_freq = candidate[1] - if candidate_index not in candidate_score: - candidate_score[candidate_index] = 0 - candidate_score[candidate_index] += float(candidate_freq)/total - return candidate_score - -def find_best_candidate(candidate_score): - max_score = 0 - best_candidate_index = -1 - for index, score in candidate_score.items(): - if score > max_score: - best_candidate_index = index - max_score = score - return best_candidate_index, max_score - -def compute_uncertainty(max_score, num_of_none): - if num_of_none == 0: - return 0 # covered - uncertainty = 1 - max_score/num_of_none - return uncertainty - -def get_num_of_none(in_spec): - num_of_none = 0 - for attr in NodeAttribute: - if in_spec.attrs[attr] is None: - num_of_none += 1 - return num_of_none diff --git a/src/kepler_model/util/train_types.py b/src/kepler_model/util/train_types.py index 
ac67f2a6..64c05a6d 100644 --- a/src/kepler_model/util/train_types.py +++ b/src/kepler_model/util/train_types.py @@ -69,13 +69,6 @@ class ModelOutputType(enum.Enum): AbsPower = 1 DynPower = 2 -class NodeAttribute(str, enum.Enum): - PROCESSOR = "processor" - CORES = "cores" - CHIPS = "chips" - MEMORY = "memory" - FREQ = "frequency" - def is_support_output_type(output_type_name): return any(output_type_name == item.name for item in ModelOutputType) diff --git a/tests/README.md b/tests/README.md index 762d19aa..b27bf820 100644 --- a/tests/README.md +++ b/tests/README.md @@ -250,23 +250,6 @@ Optional arguments: - isolators: dict map of isolator class name to argument dict map (default: {"MinIdleIsolator": {}, "NoneIsolator": {}, "ProfileBackgroundIsolator": {}, "TrainIsolator": {"abs_pipeline_name": default_train_output_pipeline}}) - target_path: path to save trained ouput (default: data/offline_trainer_output) -## Server API -The test is for testing server model selection API. - -Requirements: -- server running - - ```bash - # create new model folder - mkdir -p $(pwd)/db-models - MODEL_PATH=$(pwd)/db-models model-server - ``` - -Run: -```bash -hatch run test -vvv -s ./tests/model_select_test.py -``` - # Integration Test ```bash diff --git a/tests/data/machine_spec/test b/tests/data/machine_spec/test deleted file mode 100644 index d95b7919..00000000 --- a/tests/data/machine_spec/test +++ /dev/null @@ -1 +0,0 @@ -{"processor": "intel_xeon_platinum_8259cl", "cores": 96, "chips": 2, "memory": 377, "frequency": 3500} \ No newline at end of file diff --git a/tests/model_select_test.py b/tests/model_select_test.py deleted file mode 100644 index f3dbe98c..00000000 --- a/tests/model_select_test.py +++ /dev/null @@ -1,120 +0,0 @@ -## Test Server API (model selection) - -import os -import shutil -import requests -import codecs -import json - -from kepler_model.util.train_types import FeatureGroups, FeatureGroup, ModelOutputType -from kepler_model.util.loader import 
base_model_url, load_remote_json, load_metadata -from kepler_model.util.saver import NODE_TYPE_INDEX_FILENAME -from kepler_model.util.config import download_path, default_pipelines -from tests.model_server_test import get_model_request_json -from kepler_model.estimate.model_server_connector import list_all_models -from kepler_model.server.model_server import MODEL_SERVER_PORT -from kepler_model.train.profiler.node_type_index import NodeTypeSpec - -TMP_FILE = 'download.zip' - -# set environment -os.environ['MODEL_SERVER_URL'] = 'http://localhost:8100' -test_energy_sources = ["rapl-sysfs"] - -def get_node_types(energy_source): - pipeline_name = default_pipelines[energy_source] - url_path = os.path.join(base_model_url, pipeline_name, NODE_TYPE_INDEX_FILENAME) - return load_remote_json(url_path) - -def make_request_with_spec(metrics, output_type, node_type=-1, weight=False, trainer_name="", energy_source='rapl-sysfs', spec=None): - model_request = get_model_request_json(metrics, output_type, node_type, weight, trainer_name, energy_source) - model_request["spec"] = spec - response = requests.post('http://localhost:{}/model'.format(MODEL_SERVER_PORT), json=model_request) - assert response.status_code == 200, response.text - if weight: - weight_dict = json.loads(response.text) - assert len(weight_dict) > 0, "weight dict must contain one or more than one component" - for weight_values in weight_dict.values(): - weight_length = len(weight_values['All_Weights']['Numerical_Variables']) - expected_length = len(metrics) - assert weight_length <= expected_length, "weight metrics should covered by the requested {} > {}".format(weight_length, expected_length) - return weight_dict["model_name"], weight_length.keys() - else: - output_path = os.path.join(download_path, output_type.name) - if os.path.exists(output_path): - shutil.rmtree(output_path) - with codecs.open(TMP_FILE, 'wb') as f: - f.write(response.content) - shutil.unpack_archive(TMP_FILE, output_path) - metadata = 
load_metadata(output_path) - os.remove(TMP_FILE) - return metadata["model_name"], metadata["features"] - -def check_select_model(model_name, features, best_model_map): - assert model_name != "", "model name should not be empty." - found = False - for cmp_fg_name, expected_best_model_without_node_type in best_model_map.items(): - cmp_metrics = FeatureGroups[FeatureGroup[cmp_fg_name]] - if cmp_metrics == features: - found = True - assert model_name == expected_best_model_without_node_type, f"should select best model {expected_best_model_without_node_type} (select {model_name}) - {output_type}/{fg_name}" - break - assert found, f"must found matched best model without node_type for {features}: {best_model_map}" - -def process(node_type, info, output_type, energy_source, valid_fgs, best_model_by_source): - expected_suffix = f"_{node_type}" - for fg_name in valid_fgs.keys(): - metrics = FeatureGroups[FeatureGroup[fg_name]] - model_name, features = make_request_with_spec(metrics, output_type, energy_source=energy_source) - check_select_model(model_name, features, best_model_by_source) - model_name, features = make_request_with_spec(metrics, output_type, node_type=node_type, energy_source=energy_source) - assert expected_suffix in model_name, "model must be a matched type" - check_select_model(model_name, features, valid_fgs) - model_name, features = make_request_with_spec(metrics, output_type, spec=info['attrs'], energy_source=energy_source) - assert expected_suffix in model_name, "model must be a matched type" - check_select_model(model_name, features, valid_fgs) - fixed_some_spec = {'processor': info['attrs']['processor'], 'memory': info['attrs']['memory']} - model_name, features = make_request_with_spec(metrics, output_type, spec=fixed_some_spec, energy_source=energy_source) - assert expected_suffix in model_name, "model must be a matched type" - check_select_model(model_name, features, valid_fgs) - uncovered_spec = info['attrs'].copy() - uncovered_spec['processor'] = 
"_".join(uncovered_spec['processor'].split("_")[:-1]) - model_name, features = make_request_with_spec(metrics, output_type, spec=uncovered_spec, energy_source=energy_source) - assert expected_suffix in model_name, "model must be a matched type" - check_select_model(model_name, features, valid_fgs) - -def test_process(): - # test getting model from server - os.environ['MODEL_SERVER_ENABLE'] = "true" - available_models = list_all_models() - assert len(available_models) > 0, "must have more than one available models" - print("Available Models:", available_models) - for energy_source in test_energy_sources: - node_types = get_node_types(energy_source) - best_model_by_source_map = list_all_models(energy_source=energy_source) - for node_type, info in node_types.items(): - available_models = list_all_models(node_type=node_type, energy_source=energy_source) - if len(available_models) > 0: - for output_type_name, valid_fgs in available_models.items(): - output_type = ModelOutputType[output_type_name] - process(node_type, info, output_type, energy_source, valid_fgs, best_model_by_source=best_model_by_source_map[output_type_name]) - else: - print(f"skip {energy_source}/{node_type} because on available models") - -def test_similarity_computation(): - for energy_source in test_energy_sources: - node_types = get_node_types(energy_source) - for node_type, info in node_types.items(): - spec = NodeTypeSpec(**info['attrs']) - for cmp_node_type, cmp_info in node_types.items(): - cmp_spec = NodeTypeSpec(**cmp_info['attrs']) - similarity = spec.get_similarity(cmp_spec, debug=True) - if node_type == cmp_node_type: - assert similarity == 1, "similarity must be one for the same type" - else: - assert similarity >= 0, "similarity must be >= 0, {}-{} ({})".format(node_type, cmp_node_type, similarity) - assert similarity <= 1, "similarity must be <= 1, {}-{} ({})".format(node_type, cmp_node_type, similarity) - -if __name__ == '__main__': - test_process() - test_similarity_computation()