From 1be64a23f6db8b8f77c6a9a2b88e45838865b850 Mon Sep 17 00:00:00 2001 From: a-baur Date: Tue, 19 Mar 2024 14:14:07 +0100 Subject: [PATCH 1/3] added remote predicate refinement --- src/handlers/{psp_mapper.py => formula.py} | 36 ++++---- src/handlers/formula_handler.py | 51 ------------ src/handlers/specification.py | 76 +++++++++++++++++ src/logic_endpoints.py | 37 ++++++--- src/main.py | 2 +- src/mtl_evaluation/mtl_evaluator.py | 43 ++-------- src/mtl_evaluation/mtl_refinement.py | 95 +++++++++++++++++----- 7 files changed, 207 insertions(+), 133 deletions(-) rename src/handlers/{psp_mapper.py => formula.py} (89%) delete mode 100644 src/handlers/formula_handler.py create mode 100644 src/handlers/specification.py diff --git a/src/handlers/psp_mapper.py b/src/handlers/formula.py similarity index 89% rename from src/handlers/psp_mapper.py rename to src/handlers/formula.py index ffa5f02..3c25f33 100644 --- a/src/handlers/psp_mapper.py +++ b/src/handlers/formula.py @@ -1,10 +1,7 @@ -import re -from numpy import mat -from soupsieve import match from handlers.psp_patterns import * -class FormulaMapper: +class Formula: """A class responsible for the mapping of PSPs to past-MTL formulas""" def __init__(self, formula=None, formula_params=None, source_pattern=None): @@ -12,7 +9,7 @@ def __init__(self, formula=None, formula_params=None, source_pattern=None): self._formula_params = formula_params or {} self.source_pattern = source_pattern - def get_formula(self): + def to_string(self): return self._formula.format(**self._formula_params) def get_lower_bound(self): @@ -38,11 +35,11 @@ def set_upper_bound(self, upper_bound): self._formula_params["upper_bound"] = upper_bound @staticmethod - def map_pattern(input_pattern): - return FormulaMapper.from_psp(input_pattern).get_formula() + def map_pattern(input_pattern) -> str: + return Formula.from_psp(input_pattern).to_string() @staticmethod - def extract_list_predicates(input_pattern): + def extract_list_predicates(input_pattern) -> list: """ Extracts the list of predicates from the input pattern @@ -58,7 +55,19 @@ def extract_list_predicates(input_pattern): return list_predicates @staticmethod - def from_tbv(input_pattern) -> "FormulaMapper": + def from_specification(specification) -> "Formula": + """ + Create specification from specification dictionary. + """ + if specification["specification_type"] == "psp": + return Formula.from_psp(specification["specification"]) + elif specification["specification_type"] == "tbv": + return Formula.from_tbv(specification["specification"]) + else: + return Formula(specification["specification"]) + + @staticmethod + def from_tbv(input_pattern) -> "Formula": """ maps a pattern regex to the past-MTL formula """ @@ -76,17 +85,16 @@ def from_tbv(input_pattern) -> "FormulaMapper": formula = input_pattern formula_params = {} - return FormulaMapper(formula, formula_params, input_pattern) - + return Formula(formula, formula_params, input_pattern) @staticmethod - def from_psp(input_pattern) -> "FormulaMapper": + def from_psp(input_pattern) -> "Formula": """ maps a pattern regex to the past-MTL formula :return: the past-MTL formula """ - list_predicates = FormulaMapper.extract_list_predicates(input_pattern) + list_predicates = Formula.extract_list_predicates(input_pattern) if re.match(absence_after_q, input_pattern): formula = "always((once({pred1})) -> ((not {pred2}) since {pred1}))" @@ -230,4 +238,4 @@ def from_psp(input_pattern) -> "FormulaMapper": else: raise Exception("PSP pattern could not be matched!") - return FormulaMapper(formula, formula_params, input_pattern) + return Formula(formula, formula_params, input_pattern) diff --git a/src/handlers/formula_handler.py b/src/handlers/formula_handler.py deleted file mode 100644 index a560b55..0000000 --- a/src/handlers/formula_handler.py +++ /dev/null @@ -1,51 +0,0 @@ - -from handlers.psp_mapper import FormulaMapper -from handlers.predicate_functions import Predicates - - -class FormulaHandler(): - - def get_params_string(self, formula_info): - param_counter = 0 - params_string = "" - for predicate in formula_info["predicates_info"]: - if (param_counter == 0): - - if ('boolean' in predicate['predicate_logic'] or 'trend' in predicate['predicate_logic']): - params_string = params_string + \ - predicate['predicate_name'] + '=' + \ - "predicate_functions.Predicates()." + \ - predicate['predicate_logic'] + "" - - else: - params_string = params_string + \ - predicate['predicate_name'] + '=' + \ - "predicate_functions.Predicates(" + predicate['predicate_comparison_value'] + \ - ")." + predicate['predicate_logic'] + "" - else: - - if ('boolean' in predicate['predicate_logic'] or "trend" in predicate['predicate_logic']): - params_string = params_string + "," + \ - predicate['predicate_name'] + '=' + \ - "predicate_functions.Predicates()." + \ - predicate['predicate_logic'] + "" - - else: - params_string = params_string + "," + \ - predicate['predicate_name'] + '=' + \ - "predicate_functions.Predicates(" + \ - predicate['predicate_comparison_value'] + \ - ")." + predicate['predicate_logic'] + "" - - param_counter = param_counter + 1 - - return params_string - - def handle_formula(self, formula_info): - formula = formula_info['specification'] - if formula_info['specification_type'] == 'psp': - formula = FormulaMapper.from_psp(formula).get_formula() - if formula_info['specification_type'] == 'tbv': - formula = FormulaMapper.from_tbv(formula).get_formula() - params_string = self.get_params_string(formula_info) - return formula, params_string diff --git a/src/handlers/specification.py b/src/handlers/specification.py new file mode 100644 index 0000000..0a635a4 --- /dev/null +++ b/src/handlers/specification.py @@ -0,0 +1,76 @@ +from typing import Dict, Callable, Union + +from handlers.formula import Formula +from handlers.predicate_functions import Predicates + + +class Specification: + """ + A class containing all the necessary information for evaluating an MTL formula. + """ + + def __init__(self, formula_info): + """ + Create a specification from a formula_info dictionary. + + :param formula_info: The formula_info dictionary. + """ + self._formula_info = formula_info + self.formula = Formula.from_specification(formula_info) + + self.predicates = {} + for pred in self._formula_info["predicates_info"]: + comparison_value = pred.get("predicate_comparison_value", None) + if str.isnumeric(comparison_value): + comparison_value = float(comparison_value) + if comparison_value in ["True", "False"]: + comparison_value = comparison_value == "True" + + self.predicates[pred["predicate_name"]] = { + "predicate_logic": pred["predicate_logic"], + "predicate_comparison_value": comparison_value, + } + self.params = self.get_params() + + def get_predicate_logic(self, predicate_name) -> str: + return self.predicates[predicate_name]["predicate_logic"] + + def set_predicate_logic(self, predicate_name, predicate_logic) -> None: + self.predicates[predicate_name]["predicate_logic"] = predicate_logic + + def get_predicate_comparison_value(self, predicate_name) -> Union[float, bool, str]: + return self.predicates[predicate_name]["predicate_comparison_value"] + + def set_predicate_comparison_value(self, predicate_name, predicate_comparison_value) -> None: + self.predicates[predicate_name]["predicate_comparison_value"] = predicate_comparison_value + + def get_predicate_function(self, predicate_name) -> Callable: + """ + Get the predicate function for a predicate. + """ + return Predicates(self.predicates[predicate_name]["predicate_comparison_value"]).__getattribute__( + self.predicates[predicate_name]["predicate_logic"]) + + def get_params_string(self) -> str: + """ + Create parameter string for the formula. + + Format: "Predicate '' is set to ; ..." + + :return: The parameter string. + """ + params = [] + for pred_name, pred_info in self.predicates.items(): + params.append(f"Predicate '{pred_name}' is set to {pred_info['predicate_comparison_value']}") + return "; ".join(params) + + def get_params(self) -> Dict[str, Callable]: + """ + Create parameter string for the formula. + + :return: Mapping from predicate names to predicate functions. + """ + params = {} + for pred_name, pred_info in self.predicates.items(): + params[pred_name] = self.get_predicate_function(pred_name) + return params diff --git a/src/logic_endpoints.py b/src/logic_endpoints.py index 5957fe6..38b5b4a 100644 --- a/src/logic_endpoints.py +++ b/src/logic_endpoints.py @@ -12,9 +12,9 @@ from data_retrieval.influx_data_retriever import InfluxDataRetriever from data_retrieval.prometheus_data_retriever import PrometheusDataRetriever from data_retrieval.misim_retriever import MisimDataRetriever -from handlers.formula_handler import FormulaHandler +from handlers.specification import Specification from mtl_evaluation.mtl_evaluator import MTLEvaluator -from mtl_evaluation.mtl_refinement import MTLRefiner +from mtl_evaluation.mtl_refinement import MTLTimeRefiner, MTLPredicateRefiner import handlers.predicate_functions as predicate_functions import pandas as pd import configparser @@ -32,7 +32,7 @@ def after_request(response): @logic_api.route('/refine_timebound', methods=['POST']) -def refine(): +def refine_timebound(): """ Refines the timebound of the transient behavior specification. @@ -40,20 +40,32 @@ def refine(): """ formula_info = _formula_info_from_request() points_names, multi_dim_array = _data_from_formula_info(formula_info) - refiner = MTLRefiner(formula_info, points_names, multi_dim_array) + refiner = MTLTimeRefiner(formula_info, points_names, multi_dim_array) result = refiner.refine_timebound() return json.dumps(result) +@logic_api.route('/refine_predicate', methods=['POST']) +def refine_predicate(): + """ + Refines the predicate specification. + + :return: + """ + predicate = request.args['predicate'] + metric = request.args['metric'] + use_formula = request.args.get('use_formula', "true", type=json.loads) + formula_info = _formula_info_from_request() + points_names, multi_dim_array = _data_from_formula_info(formula_info) + refiner = MTLPredicateRefiner(formula_info, points_names, multi_dim_array) + result = refiner.refine_predicate(predicate, metric, use_formula) + return json.dumps({"result": result}) + + @logic_api.route('/monitor', methods=['POST']) def monitor(): formula_info = _formula_info_from_request() - formula, params_string = FormulaHandler().handle_formula(formula_info) - mtl_result = start_evaluation( - formula=formula, - params_string=params_string, - formula_info=formula_info, - ) + mtl_result = start_evaluation(formula_info=formula_info) return mtl_result @@ -115,7 +127,7 @@ def _data_from_formula_info(formula_info: dict): return points_names, multi_dim_array -def start_evaluation(formula, params_string, formula_info): +def start_evaluation(formula_info): points_names, multi_dim_array = _data_from_formula_info(formula_info) # if the formula is in future-MTL the trace is reversed and the results also @@ -126,7 +138,8 @@ def start_evaluation(formula, params_string, formula_info): reverse = True options: dict = formula_info.get('options', {}) - mtl_eval_output, intervals = MTLEvaluator(formula, params_string).evaluate( + formula_handler = Specification(formula_info) + mtl_eval_output, intervals = MTLEvaluator(formula_handler).evaluate( points_names, multi_dim_array, reverse=reverse, diff --git a/src/main.py b/src/main.py index 4c8bbd2..e85dc2f 100644 --- a/src/main.py +++ b/src/main.py @@ -7,7 +7,7 @@ from jsonschema import validate from data_retrieval.csv_data_retriever import CSVDataRetriever from data_retrieval.influx_data_retriever import InfluxDataRetriever -from handlers.formula_handler import FormulaHandler +from handlers.specification import Specification from mtl_evaluation.mtl_evaluator import MTLEvaluator import handlers.predicate_functions as predicate_functions from pprint import pprint diff --git a/src/mtl_evaluation/mtl_evaluator.py b/src/mtl_evaluation/mtl_evaluator.py index cbee786..f87c343 100644 --- a/src/mtl_evaluation/mtl_evaluator.py +++ b/src/mtl_evaluation/mtl_evaluator.py @@ -1,45 +1,16 @@ from monitors import mtl -from handlers import predicate_functions +from handlers.specification import Specification from mtl_evaluation.mtl_plotter import MTLPlotter import numpy as np -import re class MTLEvaluator: """A class responsible for the execution of the formula verification""" - def __init__(self, formula, params_string): - self.formula = formula - self.params_string = params_string - - def _create_monitor(self): - """ - Create a monitor object with the formula and the parameters. - - :return: MTL monitor object. - """ - def save_eval(x): - """ Evaluate only in predefined context """ - return eval(x, {}, {"predicate_functions": predicate_functions}) - - params_dict = dict(param.split('=') for param in self.params_string.split(',')) - params_dict = {key: save_eval(value) for key, value in params_dict.items()} - return mtl.monitor(self.formula, **params_dict) - - def _create_predicate_string(self) -> str: - """ - Read the params_string and create a string with the predicate names and their values. - - :return: The predicate values string. - """ - predicate_values_string = '' - predicate_comparison_values = self.params_string.split(',') - for predicate in predicate_comparison_values: - pred_name, pred_func = predicate.split('=') - if not ('boolean' in pred_func or 'trend' in pred_func): - predicate_values = re.findall(r'\(\S+\)', pred_func) - predicate_values_string += f"Predicate '{pred_name}' is set to {''.join(predicate_values)}; " - return predicate_values_string + def __init__(self, specification: Specification): + self.specification = specification + self.formula = self.specification.formula.to_string() + self.params = self.specification.get_params() def _post_process_intervals(self, intervals, value_assignments): """ @@ -49,7 +20,7 @@ def _post_process_intervals(self, intervals, value_assignments): :param value_assignments: The value assignments for each metric over time. :return: The post processed intervals. """ - predicate_values_string = self._create_predicate_string() + predicate_values_string = self.specification.get_params_string() for i, row in enumerate(intervals): mtl_result = bool(row[2]) if not mtl_result: @@ -61,7 +32,7 @@ def _post_process_intervals(self, intervals, value_assignments): return intervals def evaluate(self, points_names, data_array, reverse=False, create_plots: bool = True): - my_mtl_monitor = self._create_monitor() + my_mtl_monitor = mtl.monitor(self.formula, **self.params) data = np.array(data_array, dtype=float) data[data == np.nan] = 0 diff --git a/src/mtl_evaluation/mtl_refinement.py b/src/mtl_evaluation/mtl_refinement.py index a596b36..ce7b398 100644 --- a/src/mtl_evaluation/mtl_refinement.py +++ b/src/mtl_evaluation/mtl_refinement.py @@ -1,8 +1,10 @@ from dataclasses import dataclass -from typing import Tuple, List, Dict, Optional +from typing import Tuple, List, Dict, Optional, Any + +import numpy as np + from mtl_evaluation.mtl_evaluator import MTLEvaluator -from handlers.formula_handler import FormulaHandler -from handlers.psp_mapper import FormulaMapper +from handlers.specification import Specification @dataclass @@ -18,9 +20,9 @@ def base1_index(self): ) -class MTLRefiner: +class MTLPredicateRefiner: """ - A class responsible for refining the timebound of the transient behavior specification. + A class responsible for refining the predicate specification. :param formula_info: The formula information. :param points_names: The predicate names. @@ -28,15 +30,70 @@ class MTLRefiner: """ def __init__(self, formula_info: Dict, points_names: List[str], data: List): - if formula_info['specification_type'] == 'psp': - self._mapper: FormulaMapper = FormulaMapper.from_psp(formula_info['specification']) - elif formula_info['specification_type'] == 'tbv': - self._mapper: FormulaMapper = FormulaMapper.from_tbv(formula_info['specification']) - else: - raise Exception("Invalid specification type") + self._specification: Specification = Specification(formula_info) + self._data: List = data + self._points_names: List[str] = points_names + + def refine_predicate(self, predicate_name, metric_name, use_formula=True) -> Dict[Any, bool]: + """ + Refines the predicate specification for a given metric. + + If the use_formula parameter is set to False, the predicate is refined + as if the mtl formula was set to ( Dict[str, str]: :return: Lower and upper bounds of when the transient behavior specification is satisfied. """ - if not (self._mapper.get_lower_bound() or self._mapper.get_upper_bound()): + if not (self._specification.formula.has_lower_bound() or self._specification.formula.has_upper_bound()): return {'result': 'false', 'intervals': [], 'lower_bound': None, 'upper_bound': None} current_bounds = Interval( @@ -137,10 +194,10 @@ def _update_bounds(self, bounds: Interval): :param bounds: New bounds of specification. :return: None """ - if self._mapper.has_lower_bound(): - self._mapper.set_lower_bound(bounds.lower) - if self._mapper.has_upper_bound(): - self._mapper.set_upper_bound(bounds.upper) + if self._specification.formula.has_lower_bound(): + self._specification.formula.set_lower_bound(bounds.lower) + if self._specification.formula.has_upper_bound(): + self._specification.formula.set_upper_bound(bounds.upper) def _evaluate_formula(self) -> Tuple[bool, List]: """ @@ -148,6 +205,6 @@ def _evaluate_formula(self) -> Tuple[bool, List]: :return: Whether the formula is satisfied and the interval. """ - evaluator = MTLEvaluator(self._mapper.get_formula(), self._params_string) + evaluator = MTLEvaluator(self._specification) mtl_result, interval = evaluator.evaluate(self._points_names, self._data, create_plots=False) return mtl_result[-1], interval From d41d9e3fb68a7a0419ae18f071dc0fa02134170d Mon Sep 17 00:00:00 2001 From: a-baur Date: Tue, 19 Mar 2024 16:46:18 +0100 Subject: [PATCH 2/3] various fixes --- src/handlers/specification.py | 12 ++++--- src/logic_endpoints.py | 7 ++-- src/mtl_evaluation/mtl_refinement.py | 51 ++++++++++++++++------------ 3 files changed, 39 insertions(+), 31 deletions(-) diff --git a/src/handlers/specification.py b/src/handlers/specification.py index 0a635a4..93373c1 100644 --- a/src/handlers/specification.py +++ b/src/handlers/specification.py @@ -20,11 +20,13 @@ def __init__(self, formula_info): self.predicates = {} for pred in self._formula_info["predicates_info"]: - comparison_value = pred.get("predicate_comparison_value", None) - if str.isnumeric(comparison_value): - comparison_value = float(comparison_value) - if comparison_value in ["True", "False"]: - comparison_value = comparison_value == "True" + if comparison_value := pred.get("predicate_comparison_value", None): + try: + comparison_value = float(comparison_value) + except ValueError: + pass + if comparison_value in ["True", "False"]: + comparison_value = comparison_value == "True" self.predicates[pred["predicate_name"]] = { "predicate_logic": pred["predicate_logic"], diff --git a/src/logic_endpoints.py b/src/logic_endpoints.py index 38b5b4a..024e3a8 100644 --- a/src/logic_endpoints.py +++ b/src/logic_endpoints.py @@ -50,16 +50,15 @@ def refine_predicate(): """ Refines the predicate specification. - :return: + :return: The interval (min, max, step) and the result of the refined predicate. """ predicate = request.args['predicate'] metric = request.args['metric'] - use_formula = request.args.get('use_formula', "true", type=json.loads) formula_info = _formula_info_from_request() points_names, multi_dim_array = _data_from_formula_info(formula_info) refiner = MTLPredicateRefiner(formula_info, points_names, multi_dim_array) - result = refiner.refine_predicate(predicate, metric, use_formula) - return json.dumps({"result": result}) + interval, result = refiner.refine_predicate(predicate, metric) + return json.dumps({"interval": interval, "result": result}) @logic_api.route('/monitor', methods=['POST']) diff --git a/src/mtl_evaluation/mtl_refinement.py b/src/mtl_evaluation/mtl_refinement.py index ce7b398..0d53a37 100644 --- a/src/mtl_evaluation/mtl_refinement.py +++ b/src/mtl_evaluation/mtl_refinement.py @@ -34,7 +34,7 @@ def __init__(self, formula_info: Dict, points_names: List[str], data: List): self._data: List = data self._points_names: List[str] = points_names - def refine_predicate(self, predicate_name, metric_name, use_formula=True) -> Dict[Any, bool]: + def refine_predicate(self, predicate_name, metric_name,) -> Tuple[Tuple[int, int, int], List[bool]]: """ Refines the predicate specification for a given metric. @@ -45,9 +45,7 @@ def refine_predicate(self, predicate_name, metric_name, use_formula=True) -> Dic :param predicate_name: The predicate name. :param metric_name: The metric name. - :param use_formula: If True, the predicate is refined using the specification formula. - If False, the predicate is only refined in the context of the given metric. - :return: Mapping from values to whether the predicate is satisfied given the value. + :return: Evaluated interval (min, max, step) and the result of the refinement. """ if predicate_name not in self._specification.predicates: raise ValueError(f"Predicate '{predicate_name}' not found in the specification.") @@ -56,27 +54,36 @@ def refine_predicate(self, predicate_name, metric_name, use_formula=True) -> Dic values = np.array(values, dtype=float) values[values == np.nan] = 0 - max_value = int(np.max(values)) + # min and max values for the metric, with a 10% margin + min_value = max(np.min(values) - np.min(values) / 10, 0) + max_value = np.max(values) + np.max(values) / 10 - result = {} - for i in range(max_value + 2): - self._specification.set_predicate_comparison_value(predicate_name, i) - if use_formula: - result[i] = self._refine_with_formula() - else: - result[i] = self._refine_no_formula(predicate_name, values) - - return result + # choose step value so that the number of iterations is limited to 100 + step = (max_value - min_value) / 100 - def _refine_no_formula(self, predicate_name, values): - pred_func = self._specification.get_predicate_function(predicate_name) - v_pred_func = np.vectorize(pred_func) - return bool(v_pred_func(values).any()) + if max_value > 1: + # round values to the nearest multiple of 5 + min_value = min_value - (min_value % 5) + max_value = max_value + (5 - (max_value % 5)) + step = max(1, step) # step should be at least 1 + _range = np.arange(int(min_value), int(max_value), int(step)) + else: + # round min, max and step to 5 decimal places for return + min_value = np.round(min_value, 5) + max_value = np.round(max_value, 5) + step = np.round(step, 5) + # create range and round to 5 decimal places for iteration + _range = np.arange(min_value, max_value, step) + _range = _range.round(5) + + result = [] + for i in _range: + self._specification.set_predicate_comparison_value(predicate_name, i) + evaluator = MTLEvaluator(self._specification) + mtl_result, _ = evaluator.evaluate(self._points_names, self._data, create_plots=False) + result.append(bool(mtl_result[-1])) - def _refine_with_formula(self): - evaluator = MTLEvaluator(self._specification) - mtl_result, _ = evaluator.evaluate(self._points_names, self._data, create_plots=False) - return bool(mtl_result[-1]) + return (min_value, max_value, step), result class MTLTimeRefiner: From 858639d61202379efd915f9e9ba01bf6c83abb9c Mon Sep 17 00:00:00 2001 From: a-baur Date: Tue, 19 Mar 2024 16:56:36 +0100 Subject: [PATCH 3/3] fixed range to be integers above 1 if max value is above 1 --- src/mtl_evaluation/mtl_refinement.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/mtl_evaluation/mtl_refinement.py b/src/mtl_evaluation/mtl_refinement.py index 0d53a37..b9c0b0c 100644 --- a/src/mtl_evaluation/mtl_refinement.py +++ b/src/mtl_evaluation/mtl_refinement.py @@ -63,10 +63,10 @@ def refine_predicate(self, predicate_name, metric_name,) -> Tuple[Tuple[int, int if max_value > 1: # round values to the nearest multiple of 5 - min_value = min_value - (min_value % 5) - max_value = max_value + (5 - (max_value % 5)) - step = max(1, step) # step should be at least 1 - _range = np.arange(int(min_value), int(max_value), int(step)) + min_value = int(min_value - (min_value % 5)) + max_value = int(max_value + (5 - (max_value % 5))) + step = int(max(1, step)) # step should be at least 1 + _range = np.arange(min_value, max_value, step) else: # round min, max and step to 5 decimal places for return min_value = np.round(min_value, 5)