Skip to content

Commit

Permalink
Merge pull request #8 from Cambio-Project/add_remote_predicate_refine…
Browse files Browse the repository at this point in the history
…ment

Add remote predicate refinement
  • Loading branch information
julianbrott authored Mar 21, 2024
2 parents a2a5ba2 + 858639d commit 3272c37
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 132 deletions.
36 changes: 22 additions & 14 deletions src/handlers/psp_mapper.py → src/handlers/formula.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
import re
from numpy import mat
from soupsieve import match
from handlers.psp_patterns import *


class FormulaMapper:
class Formula:
"""A class responsible for the mapping of PSPs to past-MTL formulas"""

def __init__(self, formula=None, formula_params=None, source_pattern=None):
self._formula = formula
self._formula_params = formula_params or {}
self.source_pattern = source_pattern

def get_formula(self):
def to_string(self):
return self._formula.format(**self._formula_params)

def get_lower_bound(self):
Expand All @@ -38,11 +35,11 @@ def set_upper_bound(self, upper_bound):
self._formula_params["upper_bound"] = upper_bound

@staticmethod
def map_pattern(input_pattern):
return FormulaMapper.from_psp(input_pattern).get_formula()
def map_pattern(input_pattern) -> str:
return Formula.from_psp(input_pattern).to_string()

@staticmethod
def extract_list_predicates(input_pattern):
def extract_list_predicates(input_pattern) -> list:
"""
Extracts the list of predicates from the input pattern
Expand All @@ -58,7 +55,19 @@ def extract_list_predicates(input_pattern):
return list_predicates

@staticmethod
def from_tbv(input_pattern) -> "FormulaMapper":
def from_specification(specification) -> "Formula":
"""
Create specification from specification dictionary.
"""
if specification["specification_type"] == "psp":
return Formula.from_psp(specification["specification"])
elif specification["specification_type"] == "tbv":
return Formula.from_tbv(specification["specification"])
else:
return Formula(specification["specification"])

@staticmethod
def from_tbv(input_pattern) -> "Formula":
"""
maps a pattern regex to the past-MTL formula
"""
Expand All @@ -76,17 +85,16 @@ def from_tbv(input_pattern) -> "FormulaMapper":
formula = input_pattern
formula_params = {}

return FormulaMapper(formula, formula_params, input_pattern)

return Formula(formula, formula_params, input_pattern)

@staticmethod
def from_psp(input_pattern) -> "FormulaMapper":
def from_psp(input_pattern) -> "Formula":
"""
maps a pattern regex to the past-MTL formula
:return: the past-MTL formula
"""
list_predicates = FormulaMapper.extract_list_predicates(input_pattern)
list_predicates = Formula.extract_list_predicates(input_pattern)

if re.match(absence_after_q, input_pattern):
formula = "always((once({pred1})) -> ((not {pred2}) since {pred1}))"
Expand Down Expand Up @@ -230,4 +238,4 @@ def from_psp(input_pattern) -> "FormulaMapper":
else:
raise Exception("PSP pattern could not be matched!")

return FormulaMapper(formula, formula_params, input_pattern)
return Formula(formula, formula_params, input_pattern)
51 changes: 0 additions & 51 deletions src/handlers/formula_handler.py

This file was deleted.

78 changes: 78 additions & 0 deletions src/handlers/specification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from typing import Dict, Callable, Union

from handlers.formula import Formula
from handlers.predicate_functions import Predicates


class Specification:
"""
A class containing all the necessary information for evaluating an MTL formula.
"""

def __init__(self, formula_info):
"""
Create a specification from a formula_info dictionary.
:param formula_info: The formula_info dictionary.
"""
self._formula_info = formula_info
self.formula = Formula.from_specification(formula_info)

self.predicates = {}
for pred in self._formula_info["predicates_info"]:
if comparison_value := pred.get("predicate_comparison_value", None):
try:
comparison_value = float(comparison_value)
except ValueError:
pass
if comparison_value in ["True", "False"]:
comparison_value = comparison_value == "True"

self.predicates[pred["predicate_name"]] = {
"predicate_logic": pred["predicate_logic"],
"predicate_comparison_value": comparison_value,
}
self.params = self.get_params()

def get_predicate_logic(self, predicate_name) -> str:
return self.predicates[predicate_name]["predicate_logic"]

def set_predicate_logic(self, predicate_name, predicate_logic) -> None:
self.predicates[predicate_name]["predicate_logic"] = predicate_logic

def get_predicate_comparison_value(self, predicate_name) -> Union[float, bool, str]:
return self.predicates[predicate_name]["predicate_comparison_value"]

def set_predicate_comparison_value(self, predicate_name, predicate_comparison_value) -> None:
self.predicates[predicate_name]["predicate_comparison_value"] = predicate_comparison_value

def get_predicate_function(self, predicate_name) -> Callable:
"""
Get the predicate function for a predicate.
"""
return Predicates(self.predicates[predicate_name]["predicate_comparison_value"]).__getattribute__(
self.predicates[predicate_name]["predicate_logic"])

def get_params_string(self) -> str:
"""
Create parameter string for the formula.
Format: "Predicate '<predicate_name>' is set to <predicate_comparison_value>; ..."
:return: The parameter string.
"""
params = []
for pred_name, pred_info in self.predicates.items():
params.append(f"Predicate '{pred_name}' is set to {pred_info['predicate_comparison_value']}")
return "; ".join(params)

def get_params(self) -> Dict[str, Callable]:
"""
Create parameter string for the formula.
:return: Mapping from predicate names to predicate functions.
"""
params = {}
for pred_name, pred_info in self.predicates.items():
params[pred_name] = self.get_predicate_function(pred_name)
return params
36 changes: 24 additions & 12 deletions src/logic_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
from data_retrieval.influx_data_retriever import InfluxDataRetriever
from data_retrieval.prometheus_data_retriever import PrometheusDataRetriever
from data_retrieval.misim_retriever import MisimDataRetriever
from handlers.formula_handler import FormulaHandler
from handlers.specification import Specification
from mtl_evaluation.mtl_evaluator import MTLEvaluator
from mtl_evaluation.mtl_refinement import MTLRefiner
from mtl_evaluation.mtl_refinement import MTLTimeRefiner, MTLPredicateRefiner
import handlers.predicate_functions as predicate_functions
import pandas as pd
import configparser
Expand All @@ -32,28 +32,39 @@ def after_request(response):


@logic_api.route('/refine_timebound', methods=['POST'])
def refine():
def refine_timebound():
"""
Refines the timebound of the transient behavior specification.
:return: Lower and upper bounds of when the transient behavior specification is satisfied.
"""
formula_info = _formula_info_from_request()
points_names, multi_dim_array = _data_from_formula_info(formula_info)
refiner = MTLRefiner(formula_info, points_names, multi_dim_array)
refiner = MTLTimeRefiner(formula_info, points_names, multi_dim_array)
result = refiner.refine_timebound()
return json.dumps(result)


@logic_api.route('/refine_predicate', methods=['POST'])
def refine_predicate():
"""
Refines the predicate specification.
:return: The interval (min, max, step) and the result of the refined predicate.
"""
predicate = request.args['predicate']
metric = request.args['metric']
formula_info = _formula_info_from_request()
points_names, multi_dim_array = _data_from_formula_info(formula_info)
refiner = MTLPredicateRefiner(formula_info, points_names, multi_dim_array)
interval, result = refiner.refine_predicate(predicate, metric)
return json.dumps({"interval": interval, "result": result})


@logic_api.route('/monitor', methods=['POST'])
def monitor():
formula_info = _formula_info_from_request()
formula, params_string = FormulaHandler().handle_formula(formula_info)
mtl_result = start_evaluation(
formula=formula,
params_string=params_string,
formula_info=formula_info,
)
mtl_result = start_evaluation(formula_info=formula_info)
return mtl_result


Expand Down Expand Up @@ -115,7 +126,7 @@ def _data_from_formula_info(formula_info: dict):
return points_names, multi_dim_array


def start_evaluation(formula, params_string, formula_info):
def start_evaluation(formula_info):
points_names, multi_dim_array = _data_from_formula_info(formula_info)

# if the formula is in future-MTL the trace is reversed and the results also
Expand All @@ -126,7 +137,8 @@ def start_evaluation(formula, params_string, formula_info):
reverse = True

options: dict = formula_info.get('options', {})
mtl_eval_output, intervals = MTLEvaluator(formula, params_string).evaluate(
formula_handler = Specification(formula_info)
mtl_eval_output, intervals = MTLEvaluator(formula_handler).evaluate(
points_names,
multi_dim_array,
reverse=reverse,
Expand Down
2 changes: 1 addition & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from jsonschema import validate
from data_retrieval.csv_data_retriever import CSVDataRetriever
from data_retrieval.influx_data_retriever import InfluxDataRetriever
from handlers.formula_handler import FormulaHandler
from handlers.specification import Specification
from mtl_evaluation.mtl_evaluator import MTLEvaluator
import handlers.predicate_functions as predicate_functions
from pprint import pprint
Expand Down
43 changes: 7 additions & 36 deletions src/mtl_evaluation/mtl_evaluator.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,16 @@
from monitors import mtl
from handlers import predicate_functions
from handlers.specification import Specification
from mtl_evaluation.mtl_plotter import MTLPlotter
import numpy as np
import re


class MTLEvaluator:
"""A class responsible for the execution of the formula verification"""

def __init__(self, formula, params_string):
self.formula = formula
self.params_string = params_string

def _create_monitor(self):
"""
Create a monitor object with the formula and the parameters.
:return: MTL monitor object.
"""
def save_eval(x):
""" Evaluate only in predefined context """
return eval(x, {}, {"predicate_functions": predicate_functions})

params_dict = dict(param.split('=') for param in self.params_string.split(','))
params_dict = {key: save_eval(value) for key, value in params_dict.items()}
return mtl.monitor(self.formula, **params_dict)

def _create_predicate_string(self) -> str:
"""
Read the params_string and create a string with the predicate names and their values.
:return: The predicate values string.
"""
predicate_values_string = ''
predicate_comparison_values = self.params_string.split(',')
for predicate in predicate_comparison_values:
pred_name, pred_func = predicate.split('=')
if not ('boolean' in pred_func or 'trend' in pred_func):
predicate_values = re.findall(r'\(\S+\)', pred_func)
predicate_values_string += f"Predicate '{pred_name}' is set to {''.join(predicate_values)}; "
return predicate_values_string
def __init__(self, specification: Specification):
self.specification = specification
self.formula = self.specification.formula.to_string()
self.params = self.specification.get_params()

def _post_process_intervals(self, intervals, value_assignments):
"""
Expand All @@ -49,7 +20,7 @@ def _post_process_intervals(self, intervals, value_assignments):
:param value_assignments: The value assignments for each metric over time.
:return: The post processed intervals.
"""
predicate_values_string = self._create_predicate_string()
predicate_values_string = self.specification.get_params_string()
for i, row in enumerate(intervals):
mtl_result = bool(row[2])
if not mtl_result:
Expand All @@ -61,7 +32,7 @@ def _post_process_intervals(self, intervals, value_assignments):
return intervals

def evaluate(self, points_names, data_array, reverse=False, create_plots: bool = True):
my_mtl_monitor = self._create_monitor()
my_mtl_monitor = mtl.monitor(self.formula, **self.params)

data = np.array(data_array, dtype=float)
data[data == np.nan] = 0
Expand Down
Loading

0 comments on commit 3272c37

Please sign in to comment.