Skip to content

Commit

Permalink
Merge pull request #398 from sunya-ch/server-api-rebase-patch-1
Browse files Browse the repository at this point in the history
feat: add --machine-spec arg to estimator and get_machine_spec
  • Loading branch information
sunya-ch authored Aug 26, 2024
2 parents 003c6f9 + 76cb1bb commit d53a6ba
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 49 deletions.
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ run-estimator:
$(TEST_IMAGE) \
/bin/bash -c "$(PYTHON) tests/http_server.py & sleep 5 && estimator --log-level debug"

# Run the estimator container detached, feeding it the canned machine spec
# from tests/data/machine/spec.json instead of discovering the host's hardware.
run-estimator-with-test-spec:
	$(CTR_CMD) run --rm -d --platform linux/amd64 \
		--name estimator \
		$(TEST_IMAGE) \
		/bin/bash -c "estimator --machine-spec tests/data/machine/spec.json"

run-collector-client:
$(CTR_CMD) exec estimator /bin/bash -c \
"while [ ! -S "/tmp/estimator.sock" ]; do \
Expand Down
23 changes: 15 additions & 8 deletions src/kepler_model/estimate/estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from kepler_model.estimate.archived_model import get_achived_model
from kepler_model.estimate.model.model import load_downloaded_model
from kepler_model.estimate.model_server_connector import is_model_server_enabled, make_request
from kepler_model.train.profiler.node_type_index import get_machine_spec
from kepler_model.util.config import SERVE_SOCKET, download_path, set_env_from_model_config
from kepler_model.util.loader import get_download_output_path
from kepler_model.util.train_types import ModelOutputType, convert_enery_source, is_output_type_supported
Expand Down Expand Up @@ -39,11 +40,9 @@ def __init__(self, metrics, values, output_type, source, system_features, system
###############################################
# serve


loaded_model = dict()


def handle_request(data: str) -> dict:
def handle_request(data: str, machine_spec=None) -> dict:
try:
power_request = json.loads(data, object_hook=lambda d: PowerRequest(**d))
except Exception as e:
Expand Down Expand Up @@ -78,7 +77,7 @@ def handle_request(data: str) -> dict:
shutil.rmtree(output_path)
if not os.path.exists(output_path):
# try connecting to model server
output_path = make_request(power_request)
output_path = make_request(power_request, machine_spec)
if output_path is None:
# find from config
output_path = get_achived_model(power_request)
Expand Down Expand Up @@ -107,8 +106,10 @@ def handle_request(data: str) -> dict:


class EstimatorServer:
def __init__(self, socket_path):
def __init__(self, socket_path, machine_spec):
self.socket_path = socket_path
self.machine_spec = machine_spec
logger.info(f"initialize EstimatorServer with spec={machine_spec}")

def start(self):
s = self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
Expand All @@ -134,7 +135,7 @@ def accepted(self, connection):
if shunk is None or shunk.decode()[-1] == "}":
break
decoded_data = data.decode()
y = handle_request(decoded_data)
y = handle_request(decoded_data, self.machine_spec)
response = json.dumps(y)
connection.send(response.encode())

Expand All @@ -158,15 +159,21 @@ def sig_handler(signum, frame) -> None:
default="info",
required=False,
)
def run(log_level: str):
@click.option(
"--machine-spec",
type=click.Path(exists=True),
required=False,
)
def run(log_level: str, machine_spec: str):
level = getattr(logging, log_level.upper())
logging.basicConfig(level=level)

set_env_from_model_config()
clean_socket()
signal.signal(signal.SIGTERM, sig_handler)
try:
server = EstimatorServer(SERVE_SOCKET)
spec = get_machine_spec(machine_spec)
server = EstimatorServer(SERVE_SOCKET, spec)
server.start()
finally:
click.echo("estimator exit")
Expand Down
28 changes: 11 additions & 17 deletions src/kepler_model/estimate/model_server_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,15 @@
from kepler_model.util.train_types import ModelOutputType


# discover_spec: determine node spec in json format (refer to NodeTypeSpec)
def discover_spec():
import psutil

# TODO: reuse node_type_index/generate_spec with loosen selection
cores = psutil.cpu_count(logical=True)
spec = {"cores": cores}
return spec


node_spec = discover_spec()


def make_model_request(power_request):
return {"metrics": power_request.metrics + power_request.system_features, "output_type": power_request.output_type, "source": power_request.energy_source, "filter": power_request.filter, "trainer_name": power_request.trainer_name, "spec": node_spec}
def make_model_request(power_request, machine_spec=None):
    """Build the JSON payload sent to the model server for *power_request*.

    The requested metrics and system features are merged into a single
    "metrics" list. The optional *machine_spec* dict (host hardware
    description) is attached under "spec" only when provided, so older
    servers that ignore the key keep working.
    """
    payload = dict()
    payload["metrics"] = power_request.metrics + power_request.system_features
    payload["output_type"] = power_request.output_type
    payload["source"] = power_request.energy_source
    payload["filter"] = power_request.filter
    payload["trainer_name"] = power_request.trainer_name
    if machine_spec is not None:
        payload["spec"] = machine_spec
    return payload


TMP_FILE = "tmp.zip"
Expand All @@ -54,10 +48,10 @@ def unpack(energy_source, output_type, response, replace=True):
return output_path


def make_request(power_request):
def make_request(power_request, machine_spec):
if not is_model_server_enabled():
return None
model_request = make_model_request(power_request)
model_request = make_model_request(power_request, machine_spec)
output_type = ModelOutputType[power_request.output_type]
try:
response = requests.post(get_model_server_req_endpoint(), json=model_request)
Expand Down
67 changes: 56 additions & 11 deletions src/kepler_model/train/profiler/node_type_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,21 @@
# index_collection.save()

import enum
import logging
import os
import re
import subprocess

import cpuinfo
import psutil
import pyudev

from kepler_model.util.loader import load_node_type_index
from kepler_model.util.loader import load_json, load_node_type_index
from kepler_model.util.saver import save_machine_spec, save_node_type_index

logger = logging.getLogger(__name__)

default_machine_spec_file = "/etc/kepler/models/machine/spec.json"

def rename(name: str) -> str:
name = name.replace("(R)", "")
Expand All @@ -34,6 +39,8 @@ def rename(name: str) -> str:


def format_processor(processor):
if len(processor) < 2: # brand_raw is set to "-" on some machine
return ""
return "_".join(re.sub(r"\(.*\)", "", rename(processor)).split()).replace("-", "_").lower().replace("_v", "v")


Expand All @@ -43,8 +50,7 @@ def format_vendor(vendor):

GB = 1024 * 1024 * 1024


def generate_spec(data_path, machine_id):
def discover_spec_values():
processor = ""
vendor = ""
cpu_info = cpuinfo.get_cpu_info()
Expand All @@ -55,19 +61,44 @@ def generate_spec(data_path, machine_id):
if device.get("ID_VENDOR") is not None:
vendor = format_vendor(device.get("ID_VENDOR"))
break
if vendor == "" and "vendor_id_raw" in cpu_info:
vendor = format_vendor(cpu_info["vendor_id_raw"])

cores = psutil.cpu_count(logical=True)
chips = max(1, int(subprocess.check_output('cat /proc/cpuinfo | grep "physical id" | sort -u | wc -l', shell=True)))
threads_per_core = max(1, cores // psutil.cpu_count(logical=False))
memory = psutil.virtual_memory().total
memory_gb = int(memory / GB)
freq = psutil.cpu_freq(percpu=False)
cpu_freq_mhz = round(max(freq.max, freq.current) / 100) * 100 # round to one decimal of GHz
spec_values = {"vendor": vendor, "processor": processor, "cores": cores, "chips": chips, "memory": memory_gb, "frequency": cpu_freq_mhz, "threads_per_core": threads_per_core}
spec_values = {
"vendor": vendor,
"processor": processor,
"cores": cores,
"chips": chips,
"memory": memory_gb,
"threads_per_core": threads_per_core
}
if freq is not None:
cpu_freq_mhz = round(max(freq.max, freq.current) / 100) * 100 # round to one decimal of GHz
spec_values["frequency"] = cpu_freq_mhz
return spec_values

def generate_spec(data_path: str, machine_id: str):
    """Discover this machine's hardware values and persist them as a spec.

    The discovered values are wrapped in a NodeTypeSpec and written via
    save_machine_spec under *data_path* keyed by *machine_id*.
    """
    spec_values = discover_spec_values()
    spec = NodeTypeSpec(**spec_values)
    # NOTE(review): both print and logger announce the save — presumably the
    # prints serve CLI users running without logging configured; confirm.
    print(f"Save machine spec ({data_path}): ")
    print(str(spec))
    logger.info(f"Save machine spec to {data_path}/{machine_id}")
    save_machine_spec(data_path, machine_id, spec)

def get_machine_spec(cmd_machine_spec_file: str):
    """Resolve the machine spec dict to attach to model requests.

    Resolution order:
      1. the spec file passed on the command line, if any (assumed to exist);
      2. the default spec file path, if it exists on disk;
      3. live discovery of the current machine's values.
    """
    candidate_files = []
    if cmd_machine_spec_file:
        candidate_files.append(cmd_machine_spec_file)
    if os.path.exists(default_machine_spec_file):
        candidate_files.append(default_machine_spec_file)
    for spec_file in candidate_files:
        spec = load_json(spec_file)
        if spec is not None:
            return spec
    # No readable spec file found: fall back to discovering the host's values.
    return discover_spec_values()

class NodeAttribute(str, enum.Enum):
PROCESSOR = "processor"
Expand All @@ -76,7 +107,6 @@ class NodeAttribute(str, enum.Enum):
MEMORY = "memory"
FREQ = "frequency"


def load_node_type_spec(node_type_index_json):
node_type_spec_index = dict()
if node_type_index_json is not None:
Expand All @@ -86,9 +116,18 @@ def load_node_type_spec(node_type_index_json):
node_type_spec_index[int(index)] = spec
return node_type_spec_index


# Sentinel meaning "attribute not recorded" in a NodeTypeSpec attrs dict.
no_data = None


def attr_has_value(attrs: dict, key: NodeAttribute) -> bool:
    """Return True when *attrs* carries a usable value for *key*.

    A value is usable when it is present and truthy; for the numeric
    attributes (everything except PROCESSOR) it must additionally parse as a
    number strictly greater than zero. A non-numeric value under a numeric
    key propagates the ValueError from float().
    """
    value = attrs.get(key, no_data)
    if value == no_data or not value:
        return False
    if key != NodeAttribute.PROCESSOR and float(value) <= 0:
        return False
    return True

# NodeTypeSpec defines spec of each node_type index
class NodeTypeSpec:
Expand All @@ -101,6 +140,13 @@ def __init__(self, **kwargs):
self.attrs[NodeAttribute.FREQ] = kwargs.get("frequency", no_data)
self.members = []

# check if all attribute is none
def is_none(self):
for key in self.attrs.keys():
if attr_has_value(self.attrs, key):
return False
return True

def load(self, json_obj):
for attr, attr_values in json_obj["attrs"].items():
self.attrs[attr] = attr_values
Expand All @@ -124,7 +170,7 @@ def cover(self, compare_spec):
if not isinstance(compare_spec, NodeTypeSpec):
return False
for attr in NodeAttribute:
if compare_spec.attrs[attr] is not None:
if attr_has_value(compare_spec.attrs, attr):
try:
# Attempt to convert values to floats
if float(self.attrs[attr]) != float(compare_spec.attrs[attr]):
Expand Down Expand Up @@ -205,4 +251,3 @@ def copy(self):
for node_type in removed_items:
del node_collection.node_type_index[node_type]
return node_collection

17 changes: 12 additions & 5 deletions src/kepler_model/util/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
#################################################

import os
import requests

from .loader import base_model_url, default_pipelines, default_train_output_pipeline, get_pipeline_url, get_url
from .loader import base_model_url, default_pipelines, default_train_output_pipeline, get_pipeline_url, get_url, default_init_model_name
from .train_types import FeatureGroup, ModelOutputType, is_output_type_supported

# must be writable (for shared volume mount)
Expand Down Expand Up @@ -151,10 +152,16 @@ def get_init_model_url(energy_source, output_type, model_topurl=model_topurl):
if get_energy_source(prefix) == energy_source:
modelURL = get_init_url(prefix)
print("get init url", modelURL)
url = get_url(feature_group=FeatureGroup.BPFOnly, output_type=ModelOutputType[output_type], energy_source=energy_source, model_topurl=model_topurl, pipeline_name=pipeline_name)
if modelURL == "" and is_output_type_supported(output_type):
print("init URL is not set, try using default URL".format())
return get_url(feature_group=FeatureGroup.BPFOnly, output_type=ModelOutputType[output_type], energy_source=energy_source, model_topurl=model_topurl, pipeline_name=pipeline_name)
else:
return modelURL
if energy_source in default_init_model_name:
model_name = default_init_model_name[energy_source]
modelURL = get_url(feature_group=FeatureGroup.BPFOnly, output_type=ModelOutputType[output_type], energy_source=energy_source, model_topurl=model_topurl, pipeline_name=pipeline_name, model_name=model_name)
if url:
response = requests.get(url)
if response.status_code== 200:
modelURL = url
print(f"init URL is not set, use {modelURL}")
return modelURL
print(f"no match config for {output_type}, {energy_source}")
return ""
18 changes: 11 additions & 7 deletions src/kepler_model/util/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,18 @@ def assure_pipeline_name(pipeline_name, energy_source, nodeCollection):
default_node_type = 0
any_node_type = -1
default_feature_group = FeatureGroup.BPFOnly
# need to set as default node_type is not available in latest release DB
# NOTE(review): maps energy source -> fallback model name used when the init
# model URL is not configured; presumably removable once the release DB
# publishes node_type-specific models — confirm.
default_init_model_name = {
    "rapl-sysfs": "GradientBoostingRegressorTrainer_1",
    "acpi": "XgboostFitTrainer_109"
}


def load_json(path: str, name: str):
if name.endswith(".json") is False:
name = name + ".json"

filepath = os.path.join(path, name)
def load_json(path: str, name: str=""):
filepath = path
if name:
if name.endswith(".json") is False:
name = name + ".json"
filepath = os.path.join(path, name)
try:
with open(filepath) as f:
res = json.load(f)
Expand Down Expand Up @@ -107,7 +112,6 @@ def load_remote_pkl(url_path):
logger.error(f"failed to load pkl url {url_path}: {e}")
return None


def load_machine_spec(data_path: str, machine_id: str):
    """Load the saved machine spec JSON for *machine_id*.

    Reads from the MACHINE_SPEC_PATH subdirectory of *data_path*; returns the
    parsed dict, or whatever load_json returns on failure.
    """
    machine_spec_path = os.path.join(data_path, MACHINE_SPEC_PATH)
    return load_json(machine_spec_path, machine_id)
Expand Down
1 change: 1 addition & 0 deletions tests/data/machine/spec.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"processor": "intel_xeon_platinum_8259cl", "cores": 96, "chips": 2, "memory": 377, "frequency": 3500}
31 changes: 30 additions & 1 deletion tests/model_server_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# model_server_test.py requires model-server to run
import codecs
import json
import os
Expand All @@ -6,6 +7,7 @@
import requests

from kepler_model.server.model_server import MODEL_SERVER_PORT
from kepler_model.train.profiler.node_type_index import NodeAttribute, attr_has_value
from kepler_model.util.config import download_path
from kepler_model.util.train_types import FeatureGroup, FeatureGroups, ModelOutputType

Expand Down Expand Up @@ -43,8 +45,31 @@ def get_models():
response = json.loads(response.text)
return response

def test_attr_has_value():
    """attr_has_value accepts non-empty processor strings and positive numbers."""
    attrs = dict()
    # PROCESSOR is the only non-numeric attribute: any non-empty string counts.
    for candidate, expected in (("", False), (None, False), ("some", True)):
        attrs[NodeAttribute.PROCESSOR] = candidate
        assert attr_has_value(attrs, NodeAttribute.PROCESSOR) == expected
    # Numeric attributes must parse to a value strictly greater than zero,
    # whether given as a string or as a number.
    numeric_cases = (
        ("", False),
        (None, False),
        ("0", False),
        (0, False),
        ("-1", False),
        (-1, False),
        ("1", True),
        (1, True),
    )
    for candidate, expected in numeric_cases:
        attrs[NodeAttribute.CORES] = candidate
        assert attr_has_value(attrs, NodeAttribute.CORES) == expected

if __name__ == "__main__":
def test_model_request():
models = get_models()
assert len(models) > 0, "more than one type of output"
for output_models in models.values():
Expand Down Expand Up @@ -75,3 +100,7 @@ def get_models():
make_request(metrics, output_type, trainer_name=trainer_name, node_type=1, weight=True)
# with acpi source
make_request(metrics, output_type, energy_source="acpi", trainer_name=trainer_name, node_type=1, weight=True)

if __name__ == "__main__":
    # Allow running this test module directly (without pytest); note the
    # model-request test requires a running model-server (see file header).
    test_attr_has_value()
    test_model_request()

0 comments on commit d53a6ba

Please sign in to comment.