diff --git a/cmd/README.md b/cmd/README.md
index 6f495c28..d7a09a9e 100644
--- a/cmd/README.md
+++ b/cmd/README.md
@@ -51,13 +51,13 @@ optional arguments:
    Linux:

    ```bash
-   docker run --rm -v "$(pwd)":/data --network=host quay.io/sustainable_computing_io/kepler-model-server:v0.6 query
+   docker run --rm -v "$(pwd)":/data --network=host quay.io/sustainable_computing_io/kepler_model_server:v0.6 query
    ```

    macOS:

    ```bash
-   docker run --rm -v "$(pwd)":/data quay.io/sustainable_computing_io/kepler-model-server:v0.6 query -s http://host.docker.internal:9090
+   docker run --rm -v "$(pwd)":/data quay.io/sustainable_computing_io/kepler_model_server:v0.6 query -s http://host.docker.internal:9090
    ```

    The output of the query will be saved as `output.json` by default.

@@ -65,7 +65,7 @@ optional arguments:
 3. Run training pipeline

    ```bash
-   docker run --rm -v "$(pwd)":/data quay.io/sustainable_computing_io/kepler-model-server:v0.6 train -i output.json
+   docker run --rm -v "$(pwd)":/data quay.io/sustainable_computing_io/kepler_model_server:v0.6 train -i output.json
    ```

    The trained model will be saved under the pipeline folder `default`, or under a pipeline name specified with `-p`.

@@ -97,7 +97,7 @@ optional arguments:
 4. Test estimation

    ```bash
-   docker run --rm -v "$(pwd)":/data quay.io/sustainable_computing_io/kepler-model-server:v0.6 estimate -i output.json
+   docker run --rm -v "$(pwd)":/data quay.io/sustainable_computing_io/kepler_model_server:v0.6 estimate -i output.json
    ```

    The output will be saved under the folder `output`.

diff --git a/cmd/main.py b/cmd/main.py
index 2d13d394..ee48f092 100644
--- a/cmd/main.py
+++ b/cmd/main.py
@@ -326,7 +326,7 @@ def extract(args):
         node_level=False
         if ot == ModelOutputType.AbsPower:
             node_level=True
-        feature_power_data, _, _ = extractor.extract(query_results, energy_components, args.feature_group, args.energy_source, node_level=node_level)
+        feature_power_data, _, _, _ = extractor.extract(query_results, energy_components, args.feature_group, args.energy_source, node_level=node_level)
         print(feature_power_data)
         if args.output:
             feature_power_data.to_csv(args.output)
@@ -376,14 +376,16 @@ def train(args):
             assert success, "failed to process pipeline {}".format(pipeline.name)
             for trainer in pipeline.trainers:
                 if trainer.feature_group == feature_group and trainer.energy_source == energy_source:
-                    if trainer.node_level:
+                    if trainer.node_level and abs_data is not None:
                         assert_train(trainer, abs_data, energy_components)
-                    else:
+                    elif dyn_data is not None:
                         assert_train(trainer, dyn_data, energy_components)
             # save data
             data_saved_path = os.path.join(pipeline.path, preprocessed_data_folder)
-            save_csv(data_saved_path, get_preprocessed_data_filename(energy_source, feature_group, ModelOutputType.AbsPower), abs_data)
-            save_csv(data_saved_path, get_preprocessed_data_filename(energy_source, feature_group, ModelOutputType.DynPower), dyn_data)
+            if abs_data is not None:
+                save_csv(data_saved_path, get_preprocessed_data_filename(energy_source, feature_group, ModelOutputType.AbsPower), abs_data)
+            if dyn_data is not None:
+                save_csv(data_saved_path, get_preprocessed_data_filename(energy_source, feature_group, ModelOutputType.DynPower), dyn_data)

         print("=========== Train {} Summary ============".format(energy_source))
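The `abs_data`/`dyn_data` guards above follow from a contract change in `extractor.extract`: it now returns a 4-tuple whose elements are all `None` when extraction does not apply (for example, `AcceleratorOnly` features at container level), so one of the two datasets may legitimately be absent. A minimal sketch of the widened contract, using a toy stand-in rather than the real extractor:

```python
# Toy stand-in for the new extract() contract; the real method returns
# (feature_power_data, power_columns, corr, features) or four Nones.
def extract_stub(applicable=True):
    if not applicable:
        return None, None, None, None
    return "feature_power_data", ["package_power"], "corr", ["feature_a"]

# call sites that only need the dataframe discard the rest positionally,
# as cmd/main.py does above
feature_power_data, _, _, _ = extract_stub()
assert extract_stub(applicable=False) == (None, None, None, None)
```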
diff --git a/src/train/extractor/extractor.py b/src/train/extractor/extractor.py
index ce391d9b..cf018234 100644
--- a/src/train/extractor/extractor.py
+++ b/src/train/extractor/extractor.py
@@ -13,7 +13,7 @@ energy_component_to_query, feature_to_query, \
     pkg_id_column, container_id_cols, node_info_column
 from loader import default_node_type
-from extract_types import container_id_colname, ratio_to_col, component_to_col, get_unit_vals
+from extract_types import container_id_colname, ratio_to_col, component_to_col, get_unit_vals, accelerator_type_colname
 from preprocess import drop_zero_column, find_correlations

 # append ratio for each unit
@@ -55,6 +55,7 @@ class Extractor(metaclass=ABCMeta):
     # - feature_power_data: dataframe of feature columns concat with power columns
     # - power_columns: identify power columns (labels)
     # - corr: correlation matrix between features and powers
+    # - features: updated feature list (accelerator features are expanded per device type)
     @abstractmethod
     def extract(self, query_results, feature_group):
         return NotImplemented
@@ -70,17 +71,23 @@ def extract(self, query_results, energy_components, feature_group, energy_source
         # 1. compute energy different per timestamp and concat all energy component and unit
         power_data = self.get_power_data(query_results, energy_components, energy_source)
         if power_data is None:
-            return None, None, None
+            return None, None, None, None
         power_data = drop_zero_column(power_data, power_data.columns)
         power_columns = power_data.columns
-        features = FeatureGroups[FeatureGroup[feature_group]]
+        fg = FeatureGroup[feature_group]
+        features = FeatureGroups[fg]
         # 2. separate workload and system
         workload_features = [feature for feature in features if feature not in SYSTEM_FEATURES]
         system_features = [feature for feature in features if feature in SYSTEM_FEATURES]
         # 3. compute aggregated utilization different per timestamp and concat them
-        feature_data = self.get_workload_feature_data(query_results, workload_features)
+        if fg == FeatureGroup.AcceleratorOnly and node_level is not True:
+            return None, None, None, None
+        else:
+            feature_data, workload_features = self.get_workload_feature_data(query_results, workload_features)
+
         if feature_data is None:
-            return None, None, None
+            return None, None, None, None
+
         # join power
         feature_power_data = feature_data.set_index(TIMESTAMP_COL).join(power_data).sort_index().dropna()
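The early return added above encodes that accelerator utilization metrics carry no per-container attribution, so `AcceleratorOnly` supports only node-level (absolute power) extraction. A runnable toy version of that guard; the enum values mirror the `train_types.py` change later in this patch:

```python
from enum import Enum

class FeatureGroup(Enum):
    BPFOnly = 5
    AcceleratorOnly = 11

def can_extract(fg, node_level):
    # container-level (dynamic power) extraction is unsupported for accelerators
    return not (fg == FeatureGroup.AcceleratorOnly and not node_level)

assert can_extract(FeatureGroup.AcceleratorOnly, node_level=True)
assert not can_extract(FeatureGroup.AcceleratorOnly, node_level=False)
assert can_extract(FeatureGroup.BPFOnly, node_level=False)
```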
@@ -108,16 +115,18 @@
         if node_info_column not in feature_power_data.columns:
             feature_power_data[node_info_column] = int(default_node_type)
-
         # 6. validate input with correlation
         corr = find_correlations(energy_source, feature_power_data, power_columns, workload_features)
         # 7. apply utilization ratio to each power unit because the power unit is summation of all container utilization
         feature_power_data = append_ratio_for_pkg(feature_power_data, is_aggr, query_results, power_columns)
-        return feature_power_data, power_columns, corr
+        return feature_power_data, power_columns, corr, workload_features

     def get_workload_feature_data(self, query_results, features):
         feature_data = None
         container_df_map = dict()
+        accelerator_df_list = []
+        cur_accelerator_features = []
+        feature_to_remove = []
         for feature in features:
             query = feature_to_query(feature)
             if query not in query_results:
@@ -127,39 +136,61 @@
                 print("no data in ", query)
                 return None
             aggr_query_data = query_results[query].copy()
-            aggr_query_data.rename(columns={query: feature}, inplace=True)
-            aggr_query_data[container_id_colname] = aggr_query_data[container_id_cols].apply(lambda x: '/'.join(x), axis=1)
-            # separate for each container_id
-            container_id_list = pd.unique(aggr_query_data[container_id_colname])
-
-            for container_id in container_id_list:
-                container_df = aggr_query_data[aggr_query_data[container_id_colname]==container_id]
-                container_df = container_df.set_index([TIMESTAMP_COL])[[feature]]
-                container_df = container_df.sort_index()
-                time_diff_values = container_df.reset_index()[[TIMESTAMP_COL]].diff().values
-                feature_df = pd.DataFrame()
-                if len(container_df) > 1:
-                    # find current value from aggregated query, dropna remove the first value
-                    # divided by time difference
-                    feature_df = container_df[[feature]].astype(np.float64).diff()
-                    # if delta < 0, set to 0 (unexpected)
-                    feature_df = feature_df/time_diff_values
-                    feature_df = feature_df.mask(feature_df.lt(0)).ffill().fillna(0).convert_dtypes()
-                if container_id in container_df_map:
-                    # previously found container
-                    container_df_map[container_id] = pd.concat([container_df_map[container_id], feature_df], axis=1)
-                else:
-                    # newly found container
-                    container_df_map[container_id] = feature_df
+
+            if all(col in aggr_query_data.columns for col in container_id_cols):
+                aggr_query_data.rename(columns={query: feature}, inplace=True)
+                aggr_query_data[container_id_colname] = aggr_query_data[container_id_cols].apply(lambda x: '/'.join(x), axis=1)
+                # separate for each container_id
+                container_id_list = pd.unique(aggr_query_data[container_id_colname])
+
+                for container_id in container_id_list:
+                    container_df = aggr_query_data[aggr_query_data[container_id_colname]==container_id]
+                    container_df = container_df.set_index([TIMESTAMP_COL])[[feature]]
+                    container_df = container_df.sort_index()
+                    time_diff_values = container_df.reset_index()[[TIMESTAMP_COL]].diff().values
+                    feature_df = pd.DataFrame()
+                    if len(container_df) > 1:
+                        # find current value from aggregated query, dropna remove the first value
+                        # divided by time difference
+                        feature_df = container_df[[feature]].astype(np.float64).diff()
+                        # if delta < 0, set to 0 (unexpected)
+                        feature_df = feature_df/time_diff_values
+                        feature_df = feature_df.mask(feature_df.lt(0)).ffill().fillna(0).convert_dtypes()
+                    if container_id in container_df_map:
+                        # previously found container
+                        container_df_map[container_id] = pd.concat([container_df_map[container_id], feature_df], axis=1)
+                    else:
+                        # newly found container
+                        container_df_map[container_id] = feature_df
+            else:
+                # process AcceleratorOnly feature
+                tmp_data_list = []
+                feature_to_remove.append(feature)
+                # separate based on type label
+                grouped = aggr_query_data.groupby([accelerator_type_colname])
+                for group_name, group_data in grouped:
+                    new_colname = "{}_{}".format(feature, group_name)
+                    cur_accelerator_features.append(new_colname)
+                    group_data.rename(columns={query: new_colname}, inplace=True)
+                    group_data = group_data[[TIMESTAMP_COL, new_colname]]
+                    group_data = group_data.groupby([TIMESTAMP_COL]).sum().sort_index()
+                    tmp_data_list += [group_data]
+                accelerator_df_list += tmp_data_list
+
         container_df_list = []
         for container_id, container_df in container_df_map.items():
             container_df[container_id_colname] = container_id
             container_df_list += [container_df]
-        feature_data = pd.concat(container_df_list)
+
+        sum_df_list = container_df_list + accelerator_df_list
+        feature_data = pd.concat(sum_df_list)
         # fill empty timestamp
         feature_data.fillna(0, inplace=True)
+        # update features
+        if len(feature_to_remove) != 0:
+            features = self.process_feature(features, feature_to_remove, cur_accelerator_features)
         # return with reset index for later aggregation
-        return feature_data.reset_index()
+        return feature_data.reset_index(), features

     def get_system_feature_data(self, query_results, features):
         feature_data_list = []
@@ -239,3 +270,10 @@ def get_node_types(self, query_results):
             print("No Node Info")
             return None, None
         return pd.unique(node_info_data[node_info_column]), node_info_data
+
+    def process_feature(self, features, feature_to_remove, feature_to_add):
+        new_features = []
+        for feature in features:
+            if feature not in feature_to_remove:
+                new_features.append(feature)
+        return new_features + feature_to_add
\ No newline at end of file
diff --git a/src/train/extractor/smooth_extractor.py b/src/train/extractor/smooth_extractor.py
index 8bcfa4b2..1b9fb34a 100644
--- a/src/train/extractor/smooth_extractor.py
+++ b/src/train/extractor/smooth_extractor.py
@@ -14,7 +14,6 @@ def __init__(self, smooth_window=30):

     # implement extract function
     def extract(self, query_results, energy_components, feature_group, energy_source, node_level, aggr=True):
-        feature_power_data, power_columns, _ = super().extract(query_results, energy_components, feature_group, energy_source, node_level, aggr)
+        feature_power_data, power_columns, _, features = super().extract(query_results, energy_components, feature_group, energy_source, node_level, aggr)
-        features = FeatureGroups[FeatureGroup[feature_group]]

         smoothed_data = feature_power_data.copy()
@@ -26,5 +26,5 @@ def extract(self, query_results, energy_components, feature_group, energy_source

         corr = find_correlations(energy_source, feature_power_data, power_columns, workload_features)

-        return smoothed_data, power_columns, corr
+        return smoothed_data, power_columns, corr, features
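The accelerator branch in `get_workload_feature_data` above pivots rows labeled with a device `type` into one feature column per type, summing samples that share a timestamp. A standalone pandas sketch of that reshaping; the metric values and the `qat-dev-0` type label are made up for illustration:

```python
import pandas as pd

feature = "accelerator_intel_qat"
query = "kepler_node_accelerator_intel_qat"  # assumed exported query name
aggr_query_data = pd.DataFrame({
    "timestamp": [100, 100, 101, 101],
    "type": ["qat-dev-0"] * 4,
    query: [3, 4, 5, 6],
})

accelerator_df_list = []
for group_name, group_data in aggr_query_data.groupby("type"):
    new_colname = "{}_{}".format(feature, group_name)
    group_data = group_data.rename(columns={query: new_colname})
    # sum per timestamp; each accelerator type becomes its own column
    group_data = group_data[["timestamp", new_colname]].groupby("timestamp").sum().sort_index()
    accelerator_df_list.append(group_data)

print(pd.concat(accelerator_df_list, axis=1))
# accelerator_intel_qat_qat-dev-0: 7 at timestamp 100, 11 at timestamp 101
```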
diff --git a/src/train/pipeline.py b/src/train/pipeline.py
index 9f42f093..9c5b6762 100644
--- a/src/train/pipeline.py
+++ b/src/train/pipeline.py
@@ -51,13 +51,23 @@ def __init__(self, name, trainers, extractor, isolator):
         self.metadata["dyn_trainers"] = [trainer.__class__.__name__ for trainer in trainers if not trainer.node_level]

     def get_abs_data(self, query_results, energy_components, feature_group, energy_source, aggr):
-        extracted_data, power_labels, _ = self.extractor.extract(query_results, energy_components, feature_group, energy_source, node_level=True, aggr=aggr)
+        extracted_data, power_labels, _, features = self.extractor.extract(query_results, energy_components, feature_group, energy_source, node_level=True, aggr=aggr)
+        self.process_accelerator_feature(features)
         return extracted_data, power_labels

     def get_dyn_data(self, query_results, energy_components, feature_group, energy_source):
-        extracted_data, power_labels, _ = self.extractor.extract(query_results, energy_components, feature_group, energy_source, node_level=False)
+        extracted_data, power_labels, _, features = self.extractor.extract(query_results, energy_components, feature_group, energy_source, node_level=False)
+        if extracted_data is None or power_labels is None:
+            return None
+        self.process_accelerator_feature(features)
         isolated_data = self.isolator.isolate(extracted_data, label_cols=power_labels, energy_source=energy_source)
-        return isolated_data
+        return isolated_data
+
+    def process_accelerator_feature(self, features):
+        # propagate the updated (per-type expanded) accelerator feature list to the trainers
+        if features is not None and len(features) != 0:
+            for trainer in self.trainers:
+                trainer.features = features

     def prepare_data(self, input_query_results, energy_components, energy_source, feature_group, aggr=True):
         query_results = input_query_results.copy()
@@ -115,10 +124,10 @@ def _train(self, abs_data, dyn_data, power_labels, energy_source, feature_group)
                 continue
             if trainer.energy_source != energy_source:
                 continue
-            if trainer.node_level:
+            if trainer.node_level and abs_data is not None:
                 future = executor.submit(run_train, trainer, abs_data, power_labels, pipeline_lock=self.lock)
                 futures += [future]
-            else:
+            elif dyn_data is not None:
                 future = executor.submit(run_train, trainer, dyn_data, power_labels, pipeline_lock=self.lock)
                 futures += [future]
         self.print_log('Waiting for {} trainers to complete...'.format(len(futures)))
@@ -129,7 +138,7 @@ def process(self, input_query_results, energy_components, energy_source, feature_group, aggr=True):
         self.print_log("{} start processing.".format(feature_group))
         abs_data, dyn_data, power_labels = self.prepare_data(input_query_results, energy_components, energy_source, feature_group, aggr)
-        if abs_data is None or dyn_data is None:
+        if abs_data is None and dyn_data is None:
             return False, None, None
         self._train(abs_data, dyn_data, power_labels, energy_source, feature_group)
         self.print_pipeline_process_end(energy_source, feature_group, abs_data, dyn_data)
@@ -137,8 +146,8 @@ def process_multiple_query(self, input_query_results_list, energy_components, energy_source, feature_group, aggr=True):
         abs_data, dyn_data, power_labels = self.prepare_data_from_input_list(input_query_results_list, energy_components, energy_source, feature_group, aggr)
-        if abs_data is None or dyn_data is None or len(abs_data) == 0 or len(dyn_data) == 0:
-            return False, None, None
+        if (abs_data is None or len(abs_data) == 0) and (dyn_data is None or len(dyn_data) == 0):
+            return False, None, None
         self._train(abs_data, dyn_data, power_labels, energy_source, feature_group)
         self.print_pipeline_process_end(energy_source, feature_group, abs_data, dyn_data)
         return True, abs_data, dyn_data
@@ -154,31 +163,38 @@ def save_metadata(self):
             save_pipeline_metadata(self.path, self.metadata, energy_source, model_type, metadata_df)

     def print_pipeline_process_end(self, energy_source, feature_group, abs_data, dyn_data):
-        abs_trainer_names = set([trainer.__class__.__name__ for trainer in self.trainers if trainer.node_level])
-        dyn_trainer_names = set([trainer.__class__.__name__ for trainer in self.trainers if not trainer.node_level])
-
-        abs_metadata_df, abs_group_path = get_metadata_df(model_toppath, ModelOutputType.AbsPower.name, feature_group, energy_source, self.name)
-        dyn_metadata_df, dyn_group_path = get_metadata_df(model_toppath, ModelOutputType.DynPower.name, feature_group, energy_source, self.name)
-
-        abs_min_mae = -1 if len(abs_metadata_df) == 0 else abs_metadata_df.loc[abs_metadata_df[ERROR_KEY].idxmin()][ERROR_KEY]
-        dyn_min_mae = -1 if len(dyn_metadata_df) == 0 else dyn_metadata_df.loc[dyn_metadata_df[ERROR_KEY].idxmin()][ERROR_KEY]
-
-        messages = [
-            "Pipeline {} has finished for modeling {} power by {} feature".format(self.name, energy_source, feature_group),
-            "    Extractor: {}".format(self.metadata["extractor"]),
-            "    Isolator: {}".format(self.metadata["isolator"]),
-            "Absolute Power Modeling:",
-            "    Input data size: {}".format(len(abs_data)),
-            "    Model Trainers: {}".format(abs_trainer_names),
-            "    Output: {}".format(abs_group_path),
-            "    Min {}: {}".format(ERROR_KEY, abs_min_mae),
-            " ",
-            "Dynamic Power Modeling:",
-            "    Input data size: {}".format(len(dyn_data)),
-            "    Model Trainers: {}".format(dyn_trainer_names),
-            "    Output: {}".format(dyn_group_path),
-            "    Min {}: {}".format(ERROR_KEY, dyn_min_mae),
-        ]
+        abs_messages = []
+        dyn_messages = []
+        if abs_data is not None:
+            abs_trainer_names = set([trainer.__class__.__name__ for trainer in self.trainers if trainer.node_level])
+            abs_metadata_df, abs_group_path = get_metadata_df(model_toppath, ModelOutputType.AbsPower.name, feature_group, energy_source, self.name)
+            abs_min_mae = -1 if len(abs_metadata_df) == 0 else abs_metadata_df.loc[abs_metadata_df[ERROR_KEY].idxmin()][ERROR_KEY]
+            abs_messages = [
+                "Pipeline {} has finished for modeling {} power by {} feature".format(self.name, energy_source, feature_group),
+                "    Extractor: {}".format(self.metadata["extractor"]),
+                "    Isolator: {}".format(self.metadata["isolator"]),
+                "Absolute Power Modeling:",
+                "    Input data size: {}".format(len(abs_data)),
+                "    Model Trainers: {}".format(abs_trainer_names),
+                "    Output: {}".format(abs_group_path),
+                "    Min {}: {}".format(ERROR_KEY, abs_min_mae),
+                " "
+            ]
+
+        if dyn_data is not None:
+            dyn_trainer_names = set([trainer.__class__.__name__ for trainer in self.trainers if not trainer.node_level])
+            dyn_metadata_df, dyn_group_path = get_metadata_df(model_toppath, ModelOutputType.DynPower.name, feature_group, energy_source, self.name)
+            dyn_min_mae = -1 if len(dyn_metadata_df) == 0 else dyn_metadata_df.loc[dyn_metadata_df[ERROR_KEY].idxmin()][ERROR_KEY]
+
+            dyn_messages = [
+                "Dynamic Power Modeling:",
+                "    Input data size: {}".format(len(dyn_data)),
+                "    Model Trainers: {}".format(dyn_trainer_names),
+                "    Output: {}".format(dyn_group_path),
+                "    Min {}: {}".format(ERROR_KEY, dyn_min_mae),
+            ]
+
+        messages = abs_messages + dyn_messages
         print_bounded_multiline_message(messages)

     def archive_pipeline(self):
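The relaxed conditions in `process` and `process_multiple_query` above change the gating from "either dataset missing aborts" to "abort only when both are missing", which is what allows an abs-only feature group such as `AcceleratorOnly` to train. A toy version of the new predicate:

```python
def should_abort(abs_data, dyn_data):
    # abort only when *both* datasets are missing or empty
    return (abs_data is None or len(abs_data) == 0) and \
           (dyn_data is None or len(dyn_data) == 0)

assert should_abort(None, None)
assert should_abort([], None)
assert not should_abort([1, 2], None)  # abs-only (e.g. AcceleratorOnly) still trains
assert not should_abort(None, [3, 4])  # dyn-only still trains
```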
diff --git a/src/train/trainer/XGBoostTrainer/main.py b/src/train/trainer/XGBoostTrainer/main.py
index 5cb57438..f91d444f 100644
--- a/src/train/trainer/XGBoostTrainer/main.py
+++ b/src/train/trainer/XGBoostTrainer/main.py
@@ -80,7 +80,7 @@ def train(self, prom_client=None, refined_results=None) -> None:
         # results can be used directly by extractor.py
         extractor = DefaultExtractor()
         # Train all models with extractor
-        extracted_data, _ = extractor.extract(results, self.energy_components_labels, self.feature_group.name, self.energy_source, node_level=self.node_level)
+        extracted_data, _, _, _ = extractor.extract(results, self.energy_components_labels, self.feature_group.name, self.energy_source, node_level=self.node_level)

         if extracted_data is not None:
             clean_df = self._generate_clean_model_training_data(extracted_data)
diff --git a/src/util/extract_types.py b/src/util/extract_types.py
index 44db7060..d8ccc902 100644
--- a/src/util/extract_types.py
+++ b/src/util/extract_types.py
@@ -5,6 +5,7 @@
 container_id_colname = "id"
 all_container_key = "all containers"
+accelerator_type_colname = "type"

 node_level_index = [TIMESTAMP_COL]
 pkg_level_index = [TIMESTAMP_COL, pkg_id_column]
diff --git a/src/util/prom_types.py b/src/util/prom_types.py
index 65ad9ce1..f3254eaf 100644
--- a/src/util/prom_types.py
+++ b/src/util/prom_types.py
@@ -45,6 +45,8 @@ def get_energy_unit(component):
 def feature_to_query(feature):
     if feature in SYSTEM_FEATURES:
         return "{}_{}".format(node_query_prefix, feature)
+    if feature in FeatureGroups[FeatureGroup.AcceleratorOnly]:
+        return "{}_{}".format(node_query_prefix, feature)
     return "{}_{}_{}".format(container_query_prefix, feature, container_query_suffix)

 def energy_component_to_query(component):
diff --git a/src/util/train_types.py b/src/util/train_types.py
index ea22631f..7f8b3cf6 100644
--- a/src/util/train_types.py
+++ b/src/util/train_types.py
@@ -19,7 +19,8 @@
 BPF_FEATURES = ["bpf_cpu_time_us"]
 IRQ_FEATURES = ["bpf_block_irq", "bpf_net_rx_irq", "bpf_net_tx_irq"]
 KUBELET_FEATURES =['kubelet_memory_bytes', 'kubelet_cpu_usage']
-WORKLOAD_FEATURES = COUNTER_FEAUTRES + CGROUP_FEATURES + BPF_FEATURES + IRQ_FEATURES + KUBELET_FEATURES
+ACCELERATE_FEATURES = ['accelerator_intel_qat']
+WORKLOAD_FEATURES = COUNTER_FEAUTRES + CGROUP_FEATURES + BPF_FEATURES + IRQ_FEATURES + KUBELET_FEATURES + ACCELERATE_FEATURES
 BASIC_FEATURES = COUNTER_FEAUTRES + CGROUP_FEATURES + BPF_FEATURES + KUBELET_FEATURES

 PowerSourceMap = {
@@ -38,17 +39,18 @@
 }

 class FeatureGroup(enum.Enum):
-   Full = 1
-   WorkloadOnly = 2
-   CounterOnly = 3
-   CgroupOnly = 4
-   BPFOnly = 5
-   KubeletOnly = 6
-   IRQOnly = 7
-   CounterIRQCombined = 8
-   Basic = 9
-   BPFIRQ = 10
-   Unknown = 99
+    Full = 1
+    WorkloadOnly = 2
+    CounterOnly = 3
+    CgroupOnly = 4
+    BPFOnly = 5
+    KubeletOnly = 6
+    IRQOnly = 7
+    CounterIRQCombined = 8
+    Basic = 9
+    BPFIRQ = 10
+    AcceleratorOnly = 11
+    Unknown = 99

 class EnergyComponentLabelGroup(enum.Enum):
     PackageEnergyComponentOnly = 1
@@ -79,6 +81,7 @@ def deep_sort(elements):
     FeatureGroup.BPFIRQ: deep_sort(BPF_FEATURES + IRQ_FEATURES),
     FeatureGroup.CounterIRQCombined: deep_sort(COUNTER_FEAUTRES + IRQ_FEATURES),
     FeatureGroup.Basic: deep_sort(BASIC_FEATURES),
+    FeatureGroup.AcceleratorOnly: deep_sort(ACCELERATE_FEATURES),
 }

 # XGBoostRegressionTrainType
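With the `prom_types.py` and `train_types.py` changes above, an accelerator feature resolves to a node-scoped Prometheus query instead of a container-scoped one. A self-contained sketch of the routing; the prefix/suffix constants and the `SYSTEM_FEATURES` subset are assumed to match the repository defaults:

```python
node_query_prefix = "kepler_node"            # assumed default
container_query_prefix = "kepler_container"  # assumed default
container_query_suffix = "total"             # assumed default
SYSTEM_FEATURES = ["cpu_frequency"]          # illustrative subset
ACCELERATE_FEATURES = ["accelerator_intel_qat"]

def feature_to_query(feature):
    if feature in SYSTEM_FEATURES:
        return "{}_{}".format(node_query_prefix, feature)
    if feature in ACCELERATE_FEATURES:
        return "{}_{}".format(node_query_prefix, feature)
    return "{}_{}_{}".format(container_query_prefix, feature, container_query_suffix)

print(feature_to_query("accelerator_intel_qat"))  # kepler_node_accelerator_intel_qat
print(feature_to_query("bpf_cpu_time_us"))        # kepler_container_bpf_cpu_time_us_total
```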
diff --git a/tests/estimator_model_test.py b/tests/estimator_model_test.py
index d7fa10fc..271b3193 100644
--- a/tests/estimator_model_test.py
+++ b/tests/estimator_model_test.py
@@ -22,7 +22,7 @@
 from isolator_test import test_isolators, get_isolate_results, isolator_output_path
 from extractor_test import test_extractors, get_extract_results, test_energy_source, get_expected_power_columns, extractor_output_path

-# extract_result, power_columns, corr = extractor.extract(query_results, energy_components, feature_group, energy_source, node_level)
+# extract_result, power_columns, corr, features = extractor.extract(query_results, energy_components, feature_group, energy_source, node_level)
 # test_data_with_label = extract_result // abs
 # test_data_with_label = isolator.isolate(extract_result, _energy_source, label_cols=power_columns) // dyn
 def test_model(group_path, model_name, test_data_with_label, power_columns, power_range = None):
diff --git a/tests/extractor_test.py b/tests/extractor_test.py
index 39125d36..17c3f561 100644
--- a/tests/extractor_test.py
+++ b/tests/extractor_test.py
@@ -82,10 +82,10 @@ def process(query_results, feature_group, save_path=extractor_output_path, custo
     for extractor_name in customize_extractors:
         test_extractors += [load_class("extractor", extractor_name)]
     for test_instance in test_extractors:
-        extracted_data, power_columns, corr = test_instance.extract(query_results, energy_components, feature_group, energy_source, node_level=True)
+        extracted_data, power_columns, corr, _ = test_instance.extract(query_results, energy_components, feature_group, energy_source, node_level=True)
         assert_extract(extracted_data, power_columns, energy_components, num_of_unit, feature_group)
         save_extract_results(test_instance, feature_group, extracted_data, True, save_path=save_path)
-        extracted_data, power_columns, corr = test_instance.extract(query_results, energy_components, feature_group, energy_source, node_level=False)
+        extracted_data, power_columns, corr, _ = test_instance.extract(query_results, energy_components, feature_group, energy_source, node_level=False)
         assert_extract(extracted_data, power_columns, energy_components, num_of_unit, feature_group)
         save_extract_results(test_instance, feature_group, extracted_data, False, save_path=save_path)
     print("Correlations:\n")
diff --git a/tests/model_tester.py b/tests/model_tester.py
index 8dfa7a77..90a6d13c 100644
--- a/tests/model_tester.py
+++ b/tests/model_tester.py
@@ -79,7 +79,7 @@ def process(train_dataset_name, test_dataset_name, test_json_file_path, test_idl
     for model_path in model_paths:
         model = load_model(model_path)
         energy_components = PowerSourceMap[energy_source]
-        extracted_data, power_columns = extractor.extract(test_data, energy_components, feature_group, energy_source, node_level=False)
+        extracted_data, power_columns, _, _ = extractor.extract(test_data, energy_components, feature_group, energy_source, node_level=False)
         feature_columns = [col for col in extracted_data.columns if col not in power_columns]
         if not model.feature_check(feature_columns):
             print("model {} ({}/{}/{}) is not valid to test".format(model.name, energy_source, output_type.name, feature_group))
diff --git a/tests/xgboost_test.py b/tests/xgboost_test.py
index f6ecab99..c1842f7e 100644
--- a/tests/xgboost_test.py
+++ b/tests/xgboost_test.py
@@ -40,7 +40,7 @@ def read_sample_query_results():
     query_results = read_sample_query_results()
     assert len(query_results) > 0, "cannot read_sample_query_results"
     instance = DefaultExtractor()
-    extracted_data, power_columns = instance.extract(query_results, energy_components, feature_group, energy_source, node_level=True)
+    extracted_data, power_columns, _, _ = instance.extract(query_results, energy_components, feature_group, energy_source, node_level=True)
     xgb_container_level_pipeline_kfold = XGBoostRegressionStandalonePipeline(XGBoostRegressionTrainType.KFoldCrossValidation, "test_models/XGBoost/", node_level=True)
     xgb_node_pipeline_kfold = XGBoostRegressionStandalonePipeline(XGBoostRegressionTrainType.KFoldCrossValidation, "test_models/XGBoost/", node_level=False)
     xgb_container_level_pipeline_tts = XGBoostRegressionStandalonePipeline(XGBoostRegressionTrainType.TrainTestSplitFit, "test_models/XGBoost/", node_level=False)
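Taken together, the patch makes node-level extraction over accelerator metrics return a per-device-type feature list that the pipeline then pushes into its trainers. An end-to-end sketch under stated assumptions: the import path, energy components, and the content of `query_results` are illustrative, not part of this patch.

```python
from train.extractor.extractor import DefaultExtractor  # import path assumed

extractor = DefaultExtractor()
query_results = {}  # normally a dict of metric name -> DataFrame loaded from a saved query

feature_power_data, power_columns, corr, features = extractor.extract(
    query_results, ["package", "dram"], "AcceleratorOnly", "rapl", node_level=True)

if feature_power_data is None:
    # empty query results, or a container-level request for accelerator features
    print("nothing to extract")
else:
    # features now lists one column per accelerator type,
    # e.g. an expanded accelerator_intel_qat_<type>, not the raw feature name
    print(features)
```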