
Commit

Merge pull request #135 from Yanbo0101/accelerator
Add accelerator feature
sunya-ch authored Sep 1, 2023
2 parents 055eaf0 + 6552bb5 commit ab28e10
Showing 13 changed files with 157 additions and 95 deletions.
8 changes: 4 additions & 4 deletions cmd/README.md
@@ -51,21 +51,21 @@ optional arguments:
Linux:

```bash
-docker run --rm -v "$(pwd)":/data --network=host quay.io/sustainable_computing_io/kepler-model-server:v0.6 query
+docker run --rm -v "$(pwd)":/data --network=host quay.io/sustainable_computing_io/kepler_model_server:v0.6 query
```

mac OS:

```bash
-docker run --rm -v "$(pwd)":/data quay.io/sustainable_computing_io/kepler-model-server:v0.6 query -s http://host.docker.internal:9090
+docker run --rm -v "$(pwd)":/data quay.io/sustainable_computing_io/kepler_model_server:v0.6 query -s http://host.docker.internal:9090
```

output of query will be saved as `output.json` by default

3. Run training pipeline

```bash
-docker run --rm -v "$(pwd)":/data quay.io/sustainable_computing_io/kepler-model-server:v0.6 train -i output.json
+docker run --rm -v "$(pwd)":/data quay.io/sustainable_computing_io/kepler_model_server:v0.6 train -i output.json
```

output of trained model will be under pipeline folder `default` or can be specified by `-p`
@@ -97,7 +97,7 @@ optional arguments:
4. Test estimation

```bash
-docker run --rm -v "$(pwd)":/data quay.io/sustainable_computing_io/kepler-model-server:v0.6 estimate -i output.json
+docker run --rm -v "$(pwd)":/data quay.io/sustainable_computing_io/kepler_model_server:v0.6 estimate -i output.json
```

output will be under the folder `output`.
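For reference, a minimal sketch (not part of the repo) for peeking at the `output.json` written by the query step above; the exact schema is an assumption here (a JSON object keyed by query/metric name):

```python
# Hedged sketch: assumes output.json is a JSON object mapping each collected
# query/metric name to its list of samples, as consumed by train/estimate.
import json

with open("output.json") as f:
    query_results = json.load(f)

# list which metrics were collected and how many samples each has
for name, samples in query_results.items():
    print(name, len(samples))
```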
12 changes: 7 additions & 5 deletions cmd/main.py
@@ -326,7 +326,7 @@ def extract(args):
    node_level=False
    if ot == ModelOutputType.AbsPower:
        node_level=True
-    feature_power_data, _, _ = extractor.extract(query_results, energy_components, args.feature_group, args.energy_source, node_level=node_level)
+    feature_power_data, _, _, _ = extractor.extract(query_results, energy_components, args.feature_group, args.energy_source, node_level=node_level)
    print(feature_power_data)
    if args.output:
        feature_power_data.to_csv(args.output)
@@ -376,14 +376,16 @@ def train(args):
            assert success, "failed to process pipeline {}".format(pipeline.name)
            for trainer in pipeline.trainers:
                if trainer.feature_group == feature_group and trainer.energy_source == energy_source:
-                    if trainer.node_level:
+                    if trainer.node_level and abs_data is not None:
                        assert_train(trainer, abs_data, energy_components)
-                    else:
+                    elif dyn_data is not None:
                        assert_train(trainer, dyn_data, energy_components)
            # save data
            data_saved_path = os.path.join(pipeline.path, preprocessed_data_folder)
-            save_csv(data_saved_path, get_preprocessed_data_filename(energy_source, feature_group, ModelOutputType.AbsPower), abs_data)
-            save_csv(data_saved_path, get_preprocessed_data_filename(energy_source, feature_group, ModelOutputType.DynPower), dyn_data)
+            if abs_data is not None:
+                save_csv(data_saved_path, get_preprocessed_data_filename(energy_source, feature_group, ModelOutputType.AbsPower), abs_data)
+            if dyn_data is not None:
+                save_csv(data_saved_path, get_preprocessed_data_filename(energy_source, feature_group, ModelOutputType.DynPower), dyn_data)


        print("=========== Train {} Summary ============".format(energy_source))
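The guards added to `train()` above skip training and saving whenever the matching preprocessed DataFrame is missing (for example, no AbsPower data when a feature group cannot be extracted at node level). A self-contained sketch of that pattern; `StubTrainer` and `train_and_save` are stand-ins, not the repo's `assert_train`/`save_csv`:

```python
from dataclasses import dataclass
import os

@dataclass
class StubTrainer:
    # stand-in for a pipeline trainer; only what this sketch needs
    node_level: bool

    def train(self, data, energy_components):
        print("training on", len(data), "rows for", energy_components)

def train_and_save(trainer, abs_data, dyn_data, energy_components, out_dir):
    # train only when the matching DataFrame exists (mirrors the new None guards)
    if trainer.node_level and abs_data is not None:
        trainer.train(abs_data, energy_components)
    elif dyn_data is not None:
        trainer.train(dyn_data, energy_components)
    # save whichever DataFrames are present, skipping the missing ones
    os.makedirs(out_dir, exist_ok=True)
    if abs_data is not None:
        abs_data.to_csv(os.path.join(out_dir, "abs_power.csv"))
    if dyn_data is not None:
        dyn_data.to_csv(os.path.join(out_dir, "dyn_power.csv"))

# node-level trainer with no AbsPower data: nothing is trained or written,
# instead of failing on a None DataFrame
train_and_save(StubTrainer(node_level=True), None, None, ["package"], "out")
```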
104 changes: 71 additions & 33 deletions src/train/extractor/extractor.py
@@ -13,7 +13,7 @@
    energy_component_to_query, feature_to_query, \
    pkg_id_column, container_id_cols, node_info_column
from loader import default_node_type
-from extract_types import container_id_colname, ratio_to_col, component_to_col, get_unit_vals
+from extract_types import container_id_colname, ratio_to_col, component_to_col, get_unit_vals, accelerator_type_colname
from preprocess import drop_zero_column, find_correlations

# append ratio for each unit
@@ -55,6 +55,7 @@ class Extractor(metaclass=ABCMeta):
    # - feature_power_data: dataframe of feature columns concat with power columns
    # - power_columns: identify power columns (labels)
    # - corr: correlation matrix between features and powers
+    # - features: updated features
    @abstractmethod
    def extract(self, query_results, feature_group):
        return NotImplemented
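`extract()` now returns four values (data, power columns, correlations, and the updated feature list) instead of three, and returns all-`None` on failure. A runnable stand-in showing the new calling convention; the class, column names, and values below are hypothetical, not the repo's extractor:

```python
from typing import List, Optional, Tuple
import pandas as pd

def extract_stub(node_level: bool) -> Tuple[Optional[pd.DataFrame], Optional[List[str]],
                                            Optional[pd.DataFrame], Optional[List[str]]]:
    # mirror the new early return: accelerator-only data is extracted at node level only
    if not node_level:
        return None, None, None, None
    df = pd.DataFrame({"accelerator_gpu": [0.2, 0.5], "pkg_power": [12.0, 20.0]})
    power_columns = ["pkg_power"]
    corr = df.corr()
    features = ["accelerator_gpu"]  # feature list updated during extraction
    return df, power_columns, corr, features

feature_power_data, power_columns, corr, features = extract_stub(node_level=True)
print(features)
```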
@@ -70,17 +70,23 @@ def extract(self, query_results, energy_components, feature_group, energy_source
        # 1. compute energy different per timestamp and concat all energy component and unit
        power_data = self.get_power_data(query_results, energy_components, energy_source)
        if power_data is None:
-            return None, None, None
+            return None, None, None, None
        power_data = drop_zero_column(power_data, power_data.columns)
        power_columns = power_data.columns
-        features = FeatureGroups[FeatureGroup[feature_group]]
+        fg = FeatureGroup[feature_group]
+        features = FeatureGroups[fg]
        # 2. separate workload and system
        workload_features = [feature for feature in features if feature not in SYSTEM_FEATURES]
        system_features = [feature for feature in features if feature in SYSTEM_FEATURES]
        # 3. compute aggregated utilization different per timestamp and concat them
-        feature_data = self.get_workload_feature_data(query_results, workload_features)
+        if fg == FeatureGroup.AcceleratorOnly and node_level is not True:
+            return None, None, None, None
+        else:
+            feature_data, workload_features = self.get_workload_feature_data(query_results, workload_features)
+
        if feature_data is None:
-            return None, None, None
+            return None, None, None, None

        # join power
        feature_power_data = feature_data.set_index(TIMESTAMP_COL).join(power_data).sort_index().dropna()

@@ -108,16 +115,18 @@ def extract(self, query_results, energy_components, feature_group, energy_source
        if node_info_column not in feature_power_data.columns:
            feature_power_data[node_info_column] = int(default_node_type)

-
        # 6. validate input with correlation
        corr = find_correlations(energy_source, feature_power_data, power_columns, workload_features)
        # 7. apply utilization ratio to each power unit because the power unit is summation of all container utilization
        feature_power_data = append_ratio_for_pkg(feature_power_data, is_aggr, query_results, power_columns)
-        return feature_power_data, power_columns, corr
+        return feature_power_data, power_columns, corr, workload_features
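Step 7 above attributes each unit's measured power across containers in proportion to their utilization, since a package reading is the sum over every container running on it. A minimal, self-contained illustration of that proportional split on made-up data (this shows the idea only, not the repo's `append_ratio_for_pkg`):

```python
import pandas as pd

# hypothetical per-timestamp rows: two containers sharing one package reading
df = pd.DataFrame({
    "container_id":  ["a", "b", "a", "b"],
    "cpu_time":      [30.0, 10.0, 5.0, 15.0],   # per-container utilization
    "package_power": [40.0, 40.0, 20.0, 20.0],  # same unit reading repeated per container
}, index=pd.Index([0, 0, 1, 1], name="timestamp"))

# ratio = container utilization / total utilization at that timestamp
total = df.groupby(level=0)["cpu_time"].transform("sum")
df["ratio"] = (df["cpu_time"] / total).fillna(0)

# proportional share of the measured power per container
df["container_power"] = df["package_power"] * df["ratio"]
print(df)
```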

    def get_workload_feature_data(self, query_results, features):
        feature_data = None
        container_df_map = dict()
+        accelerator_df_list=[]
+        cur_accelerator_features = []
+        feature_to_remove =[]
        for feature in features:
            query = feature_to_query(feature)
            if query not in query_results:
@@ -127,39 +136,61 @@ def get_workload_feature_data(self, query_results, features):
print("no data in ", query)
return None
aggr_query_data = query_results[query].copy()
aggr_query_data.rename(columns={query: feature}, inplace=True)
aggr_query_data[container_id_colname] = aggr_query_data[container_id_cols].apply(lambda x: '/'.join(x), axis=1)
# separate for each container_id
container_id_list = pd.unique(aggr_query_data[container_id_colname])

for container_id in container_id_list:
container_df = aggr_query_data[aggr_query_data[container_id_colname]==container_id]
container_df = container_df.set_index([TIMESTAMP_COL])[[feature]]
container_df = container_df.sort_index()
time_diff_values = container_df.reset_index()[[TIMESTAMP_COL]].diff().values
feature_df = pd.DataFrame()
if len(container_df) > 1:
# find current value from aggregated query, dropna remove the first value
# divided by time difference
feature_df = container_df[[feature]].astype(np.float64).diff()
# if delta < 0, set to 0 (unexpected)
feature_df = feature_df/time_diff_values
feature_df = feature_df.mask(feature_df.lt(0)).ffill().fillna(0).convert_dtypes()
if container_id in container_df_map:
# previously found container
container_df_map[container_id] = pd.concat([container_df_map[container_id], feature_df], axis=1)
else:
# newly found container
container_df_map[container_id] = feature_df

+            if all(col in aggr_query_data.columns for col in container_id_cols):
+                aggr_query_data.rename(columns={query: feature}, inplace=True)
+                aggr_query_data[container_id_colname] = aggr_query_data[container_id_cols].apply(lambda x: '/'.join(x), axis=1)
+                # separate for each container_id
+                container_id_list = pd.unique(aggr_query_data[container_id_colname])
+
+                for container_id in container_id_list:
+                    container_df = aggr_query_data[aggr_query_data[container_id_colname]==container_id]
+                    container_df = container_df.set_index([TIMESTAMP_COL])[[feature]]
+                    container_df = container_df.sort_index()
+                    time_diff_values = container_df.reset_index()[[TIMESTAMP_COL]].diff().values
+                    feature_df = pd.DataFrame()
+                    if len(container_df) > 1:
+                        # find current value from aggregated query, dropna remove the first value
+                        # divided by time difference
+                        feature_df = container_df[[feature]].astype(np.float64).diff()
+                        # if delta < 0, set to 0 (unexpected)
+                        feature_df = feature_df/time_diff_values
+                        feature_df = feature_df.mask(feature_df.lt(0)).ffill().fillna(0).convert_dtypes()
+                    if container_id in container_df_map:
+                        # previously found container
+                        container_df_map[container_id] = pd.concat([container_df_map[container_id], feature_df], axis=1)
+                    else:
+                        # newly found container
+                        container_df_map[container_id] = feature_df
+            else:
+                # process AcceleratorOnly feature
+                tmp_data_list=[]
+                feature_to_remove.append(feature)
+                # separate based on type label
+                grouped = aggr_query_data.groupby([accelerator_type_colname])
+                for group_name, group_data in grouped:
+                    new_colname = "{}_{}".format(feature, group_name)
+                    cur_accelerator_features.append(new_colname)
+                    group_data.rename(columns={query: new_colname}, inplace=True)
+                    group_data = group_data[[TIMESTAMP_COL, new_colname]]
+                    group_data = group_data.groupby([TIMESTAMP_COL]).sum().sort_index()
+                    tmp_data_list += [group_data]
+                accelerator_df_list += tmp_data_list

        container_df_list = []
        for container_id, container_df in container_df_map.items():
            container_df[container_id_colname] = container_id
            container_df_list += [container_df]
-        feature_data = pd.concat(container_df_list)
+
+        sum_df_list = container_df_list + accelerator_df_list
+        feature_data = pd.concat(sum_df_list)
        # fill empty timestamp
        feature_data.fillna(0, inplace=True)
+        # update feature
+        if len(feature_to_remove) !=0:
+            features = self.process_feature(features, feature_to_remove, cur_accelerator_features)
        # return with reset index for later aggregation
-        return feature_data.reset_index()
+        return feature_data.reset_index(), features

    def get_system_feature_data(self, query_results, features):
        feature_data_list = []
@@ -239,3 +270,10 @@ def get_node_types(self, query_results):
print("No Node Info")
return None, None
return pd.unique(node_info_data[node_info_column]), node_info_data

+    def process_feature(self, features, feature_to_remove, feature_to_add):
+        new_features =[]
+        for feature in features:
+            if feature not in feature_to_remove:
+                new_features.append(feature)
+        return new_features + feature_to_add
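The new `else` branch in `get_workload_feature_data()` splits an accelerator metric into one column per accelerator type (summed per timestamp) and swaps those per-type columns into the feature list via `process_feature()`. A self-contained sketch of that per-type grouping on made-up data; the metric and type names are hypothetical:

```python
import pandas as pd

# hypothetical accelerator samples: one counter labeled with an accelerator type
aggr_query_data = pd.DataFrame({
    "timestamp": [0, 0, 1, 1, 1],
    "type":      ["qat", "qat", "qat", "dlb", "dlb"],
    "accelerator_util": [3.0, 1.0, 5.0, 2.0, 4.0],
})

feature = "accelerator_util"
accelerator_df_list, new_features = [], []
for group_name, group_data in aggr_query_data.groupby(["type"]):
    # newer pandas yields 1-tuples when grouping by a list of keys
    type_label = group_name[0] if isinstance(group_name, tuple) else group_name
    new_colname = "{}_{}".format(feature, type_label)  # e.g. accelerator_util_qat
    new_features.append(new_colname)
    group_data = group_data.rename(columns={feature: new_colname})
    group_data = group_data[["timestamp", new_colname]].groupby(["timestamp"]).sum().sort_index()
    accelerator_df_list.append(group_data)

# align the per-type columns on the shared timestamp index; missing timestamps become 0
feature_data = pd.concat(accelerator_df_list, axis=1).fillna(0)
print(new_features)
print(feature_data)
```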
4 changes: 2 additions & 2 deletions src/train/extractor/smooth_extractor.py
@@ -14,7 +14,7 @@ def __init__(self, smooth_window=30):

    # implement extract function
    def extract(self, query_results, energy_components, feature_group, energy_source, node_level, aggr=True):
-        feature_power_data, power_columns, _ = super().extract(query_results, energy_components, feature_group, energy_source, node_level, aggr)
+        feature_power_data, power_columns, _, features = super().extract(query_results, energy_components, feature_group, energy_source, node_level, aggr)

        features = FeatureGroups[FeatureGroup[feature_group]]
        smoothed_data = feature_power_data.copy()
@@ -26,5 +26,5 @@ def extract(self, query_results, energy_components, feature_group, energy_source

        corr = find_correlations(energy_source, feature_power_data, power_columns, workload_features)

-        return smoothed_data, power_columns, corr
+        return smoothed_data, power_columns, corr, features
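`SmoothExtractor` reuses the base extraction (now unpacking four values) and then smooths the extracted columns over `smooth_window` samples (30 by default). A hedged sketch of window smoothing on a DataFrame, assuming a simple rolling mean; the repo's exact smoothing step may differ:

```python
import numpy as np
import pandas as pd

smooth_window = 30  # default from SmoothExtractor.__init__

# hypothetical extracted data: one feature column and one power column
rng = np.random.default_rng(0)
feature_power_data = pd.DataFrame({
    "cpu_time":      rng.random(100),
    "package_power": rng.random(100) * 50,
})

# rolling-mean smoothing over the window; min_periods keeps the first rows defined
cols = ["cpu_time", "package_power"]
smoothed_data = feature_power_data.copy()
smoothed_data[cols] = smoothed_data[cols].rolling(window=smooth_window, min_periods=1).mean()
print(smoothed_data.head())
```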


