diff --git a/.github/workflows/unit-tests.yaml b/.github/workflows/unit-tests.yaml new file mode 100644 index 00000000..cc7b66d1 --- /dev/null +++ b/.github/workflows/unit-tests.yaml @@ -0,0 +1,40 @@ +name: Run unit tests + +on: + push: + # branches: [main] + paths-ignore: + - 'docs/**' + - 'Images/**' + pull_request: + +jobs: + unit-test: + name: Run unit tests over pyRDDLGym + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup Python version ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies and pyRDDLGym locally + run: | + python -m pip install --upgrade pip + pip install pytest scipy + pip install -r requirements.txt + + cd .. + pip install ./pyRDDLGym + + - name: Test with pytest + run: | + pytest \ No newline at end of file diff --git a/.gitignore b/.gitignore index f773fd9f..518df17d 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,9 @@ *.pyc +build/ +*.egg-info/ + .project .pydevproject .settings/ diff --git a/pyRDDLGym/core/intervals.py b/pyRDDLGym/core/intervals.py index 16901cc8..07c845e9 100644 --- a/pyRDDLGym/core/intervals.py +++ b/pyRDDLGym/core/intervals.py @@ -1,5 +1,19 @@ +import traceback import numpy as np -from typing import Dict, List, Optional, Set, Tuple, Union + +# try to load scipy +try: + from scipy.special import gamma + import scipy.stats as stats +except Exception: + raise_warning('failed to import scipy: ' + 'some interval arithmetic operations will fail.', 'red') + traceback.print_exc() + gamma = None + stats = None + +from typing import Dict, Optional, Tuple +from enum import Enum Bounds = Dict[str, Tuple[np.ndarray, np.ndarray]] @@ -17,16 +31,38 @@ from pyRDDLGym.core.simulator import lngamma +class IntervalAnalysisStrategy(Enum): + '''Specifies how bounds on random variables should be propagated.''' + SUPPORT = 1 + PERCENTILE = 2 + MEAN = 3 + + class RDDLIntervalAnalysis: - def __init__(self, rddl: RDDLPlanningModel, logger: Optional[Logger]=None) -> None: + def __init__( + self, + rddl: RDDLPlanningModel, + logger: Optional[Logger]=None, + strategy: Optional[IntervalAnalysisStrategy]=IntervalAnalysisStrategy.SUPPORT, + percentiles: Optional[Tuple[float, float]]=None + ) -> None: '''Creates a new interval analysis object for the given RDDL domain. :param rddl: the RDDL domain to analyze :param logger: to log compilation information during tracing to file + :param strategy: strategy used to compute bounds on fluents that has stochastic components + :param percentiles: percentiles used to compute bounds when strategy is set to PERCENTILE ''' self.rddl = rddl - self.logger = logger + self.logger = logger + self.strategy = strategy + self.percentiles = percentiles + + if self.strategy == IntervalAnalysisStrategy.PERCENTILE: + lower, upper = self.percentiles + if lower < 0 or lower > 1 or upper < 0 or upper > 1 or lower > upper: + raise ValueError('Percentiles must be in the range [0, 1] and lower <= upper.') sorter = RDDLLevelAnalysis(rddl, allow_synchronous_state=True, logger=self.logger) self.cpf_levels = sorter.compute_levels() @@ -38,8 +74,10 @@ def __init__(self, rddl: RDDLPlanningModel, logger: Optional[Logger]=None) -> No self.NUMPY_OR_FUNC = np.frompyfunc(self._bound_or_scalar, nin=2, nout=1) self.NUMPY_LITERAL_TO_INT = np.vectorize(self.rddl.object_to_index.__getitem__) - def bound(self, action_bounds: Optional[Bounds]=None, - per_epoch: bool=False) -> Bounds: + def bound(self, + action_bounds: Optional[Bounds]=None, + per_epoch: bool=False, + state_bounds: Optional[Bounds]=None) -> Bounds: '''Computes intervals on all fluents and reward for the planning problem. :param action_bounds: optional bounds on action fluents (defaults to @@ -47,10 +85,12 @@ def bound(self, action_bounds: Optional[Bounds]=None, :param per_epoch: if True, the returned bounds are tensors with leading dimension indicating the decision epoch; if False, the returned bounds are valid across all decision epochs. + :param state_bounds: optional bounds on state fluents (defaults to + the initial state values otherwise) ''' # get initial values as bounds - intervals = self._bound_initial_values() + intervals = self._bound_initial_values(state_bounds) if per_epoch: result = {} @@ -72,7 +112,7 @@ def bound(self, action_bounds: Optional[Bounds]=None, else: return intervals - def _bound_initial_values(self): + def _bound_initial_values(self, state_bounds=None): rddl = self.rddl # initially all bounds are calculated based on the initial values @@ -90,7 +130,11 @@ def _bound_initial_values(self): params = rddl.variable_params[name] shape = rddl.object_counts(params) values = np.reshape(values, newshape=shape) - intervals[name] = (values, values) + if state_bounds is not None and name in state_bounds: + intervals[name] = state_bounds[name] + else: + intervals[name] = (values, values) + return intervals def _bound_next_epoch(self, intervals, action_bounds=None, per_epoch=False): @@ -158,10 +202,11 @@ def _bound(self, expr, intervals): result = self._bound_control(expr, intervals) elif etype == 'randomvar': result = self._bound_random(expr, intervals) - elif etype == 'randomvector': - result = self._bound_random_vector(expr, intervals) - elif etype == 'matrix': - result = self._bound_matrix(expr, intervals) + # TODO: methods not implemented yet + # elif etype == 'randomvector': + # result = self._bound_random_vector(expr, intervals) + # elif etype == 'matrix': + # result = self._bound_matrix(expr, intervals) else: raise RDDLNotImplementedError( f'Internal error: expression type {etype} is not supported.\n' + @@ -240,6 +285,13 @@ def _bound_pvar(self, expr, intervals): @staticmethod def _mask_assign(dest, mask, value, mask_value=False): + '''Assings a value to a destination array based on a mask. + + :param dest: the destination array to assign to + :param mask: the mask array to determine where to assign + :param value: the value to assign + :param mask_value: if True, the value is also masked + ''' assert (np.shape(dest) == np.shape(mask)) if np.shape(dest): if mask_value: @@ -898,11 +950,15 @@ def _bound_random(self, expr, intervals): def _bound_random_kron(self, expr, intervals): args = expr.args arg, = args + + # SUPPORT, PERCENTILE or MEAN strategy return self._bound(arg, intervals) def _bound_random_dirac(self, expr, intervals): args = expr.args arg, = args + + # SUPPORT, PERCENTILE or MEAN strategy return self._bound(arg, intervals) def _bound_uniform(self, expr, intervals): @@ -910,6 +966,15 @@ def _bound_uniform(self, expr, intervals): a, b = args (la, ua) = self._bound(a, intervals) (lb, ub) = self._bound(b, intervals) + + if self.strategy == IntervalAnalysisStrategy.PERCENTILE: + raise NotImplementedError("Percentile strategy is not implemented for Uniform distribution yet.") + + if self.strategy == IntervalAnalysisStrategy.MEAN: + lower = (la + lb) / 2 + upper = (ua + ub) / 2 + return (lower, upper) + lower = la upper = ub return (lower, upper) @@ -919,6 +984,18 @@ def _bound_bernoulli(self, expr, intervals): p, = args (lp, up) = self._bound(p, intervals) + if self.strategy == IntervalAnalysisStrategy.PERCENTILE: + lower_percentile, upper_percentile = self.percentiles + lower = np.zeros(shape=np.shape(lp), dtype=np.int64) + upper = np.ones(shape=np.shape(up), dtype=np.int64) + lower = self._mask_assign(lower, lower_percentile > (1 - lp), 1) + upper = self._mask_assign(upper, upper_percentile <= (1 - up), 0) + return (lower, upper) + + if self.strategy == IntervalAnalysisStrategy.MEAN: + return (lp, up) + + # SUPPORT strategy lower = np.zeros(shape=np.shape(lp), dtype=np.int64) upper = np.ones(shape=np.shape(up), dtype=np.int64) lower = self._mask_assign(lower, lp >= 1, 1) @@ -931,6 +1008,17 @@ def _bound_normal(self, expr, intervals): (lm, um) = self._bound(mean, intervals) (lv, uv) = self._bound(var, intervals) + if self.strategy == IntervalAnalysisStrategy.PERCENTILE: + # mean + std * normal_inverted_cdf(p) + lower_percentile, upper_percentile = self.percentiles + lower = lm + np.sqrt(lv) * stats.norm.ppf(lower_percentile) + upper = um + np.sqrt(uv) * stats.norm.ppf(upper_percentile) + return (lower, upper) + + if self.strategy == IntervalAnalysisStrategy.MEAN: + return (lm, um) + + # SUPPORT strategy lower = np.full(shape=np.shape(lm), fill_value=-np.inf, dtype=np.float64) upper = np.full(shape=np.shape(um), fill_value=+np.inf, dtype=np.float64) lower = self._mask_assign(lower, (lv == 0) & (uv == 0), lm, True) @@ -942,6 +1030,12 @@ def _bound_poisson(self, expr, intervals): p, = args (lp, up) = self._bound(p, intervals) + if self.strategy == IntervalAnalysisStrategy.PERCENTILE: + raise NotImplementedError("Percentile strategy is not implemented for Poisson distribution yet.") + + if self.strategy == IntervalAnalysisStrategy.MEAN: + return (lp, up) + lower = np.zeros(shape=np.shape(lp), dtype=np.int64) upper = np.full(shape=np.shape(up), fill_value=np.inf, dtype=np.float64) return (lower, upper) @@ -951,6 +1045,12 @@ def _bound_exponential(self, expr, intervals): scale, = args (ls, us) = self._bound(scale, intervals) + if self.strategy == IntervalAnalysisStrategy.PERCENTILE: + raise NotImplementedError("Percentile strategy is not implemented for Exponential distribution yet.") + + if self.strategy == IntervalAnalysisStrategy.MEAN: + return (ls, us) + lower = np.zeros(shape=np.shape(ls), dtype=np.float64) upper = np.full(shape=np.shape(us), fill_value=np.inf, dtype=np.float64) return (lower, upper) @@ -961,6 +1061,20 @@ def _bound_weibull(self, expr, intervals): (lsh, ush) = self._bound(shape, intervals) (lsc, usc) = self._bound(scale, intervals) + if self.strategy == IntervalAnalysisStrategy.PERCENTILE: + # scale * (-ln(1 - p))^(1 / shape) + lower_percentile, upper_percentile = self.percentiles + lower = lsc * (-np.log(1 - lower_percentile) ) ** (1 / lsh) + upper = usc * (-np.log(1 - upper_percentile) ) ** (1 / ush) + return (lower, upper) + + if self.strategy == IntervalAnalysisStrategy.MEAN: + # scale * gamma(1 + 1 / shape) + lower = lsc * gamma(1 + 1 / lsh) + upper = usc * gamma(1 + 1 / ush) + return (lower, upper) + + # SUPPORT strategy lower = np.zeros(shape=np.shape(lsh), dtype=np.float64) upper = np.full(shape=np.shape(ush), fill_value=np.inf, dtype=np.float64) return (lower, upper) @@ -971,8 +1085,15 @@ def _bound_gamma(self, expr, intervals): (lsh, ush) = self._bound(shape, intervals) (lsc, usc) = self._bound(scale, intervals) - lower = np.zeros(shape=np.shape(ls), dtype=np.float64) - upper = np.full(shape=np.shape(us), fill_value=np.inf, dtype=np.float64) + if self.strategy == IntervalAnalysisStrategy.PERCENTILE: + raise NotImplementedError("Percentile strategy is not implemented for Gamma distribution yet.") + + if self.strategy == IntervalAnalysisStrategy.MEAN: + # shape * scale + return RDDLIntervalAnalysis._bound_arithmetic_expr((lsh, ush), (lsc, usc), '*') + + lower = np.zeros(shape=np.shape(lsh), dtype=np.float64) + upper = np.full(shape=np.shape(usc), fill_value=np.inf, dtype=np.float64) return (lower, upper) def _bound_binomial(self, expr, intervals): @@ -981,6 +1102,13 @@ def _bound_binomial(self, expr, intervals): (ln, un) = self._bound(n, intervals) (lp, up) = self._bound(p, intervals) + if self.strategy == IntervalAnalysisStrategy.PERCENTILE: + raise NotImplementedError("Percentile strategy is not implemented for Binomial distribution yet.") + + if self.strategy == IntervalAnalysisStrategy.MEAN: + # n * p + return RDDLIntervalAnalysis._bound_arithmetic_expr((ln, un), (lp, up), '*') + lower = np.zeros(shape=np.shape(ln), dtype=np.int64) upper = np.copy(un) lower = self._mask_assign(lower, (lp >= 1) & (ln > 0), ln, True) @@ -993,15 +1121,29 @@ def _bound_beta(self, expr, intervals): (ls, us) = self._bound(shape, intervals) (lr, ur) = self._bound(rate, intervals) + if self.strategy == IntervalAnalysisStrategy.PERCENTILE: + raise NotImplementedError("Percentile strategy is not implemented for Beta distribution yet.") + + if self.strategy == IntervalAnalysisStrategy.MEAN: + raise NotImplementedError("Mean strategy is not implemented for Beta distribution yet.") + lower = np.zeros(shape=np.shape(ls), dtype=np.float64) upper = np.ones(shape=np.shape(us), dtype=np.float64) return (lower, upper) def _bound_geometric(self, expr, intervals): args = expr.args - p, = args - + p, = args (lp, up) = self._bound(p, intervals) + + if self.strategy == IntervalAnalysisStrategy.PERCENTILE: + raise NotImplementedError("Percentile strategy is not implemented for Geometric distribution yet.") + + if self.strategy == IntervalAnalysisStrategy.MEAN: + # 1 / p + one = np.ones_like(up) + return RDDLIntervalAnalysis._bound_arithmetic_expr((one, one), (lp, up), '/') + lower = np.ones(shape=np.shape(lp), dtype=np.int64) upper = np.full(shape=np.shape(up), fill_value=np.inf, dtype=np.float64) return (lower, upper) @@ -1012,6 +1154,12 @@ def _bound_pareto(self, expr, intervals): (lsh, ush) = self._bound(shape, intervals) (lsc, usc) = self._bound(scale, intervals) + if self.strategy == IntervalAnalysisStrategy.PERCENTILE: + raise NotImplementedError("Percentile strategy is not implemented for Pareto distribution yet.") + + if self.strategy == IntervalAnalysisStrategy.MEAN: + raise NotImplementedError("Mean strategy is not implemented for Pareto distribution yet.") + lower = lsc upper = np.full(shape=np.shape(usc), fill_value=np.inf, dtype=np.float64) return (lower, upper) @@ -1021,6 +1169,14 @@ def _bound_student(self, expr, intervals): df, = args (ld, ud) = self._bound(df, intervals) + if self.strategy == IntervalAnalysisStrategy.PERCENTILE: + raise NotImplementedError("Percentile strategy is not implemented for Student distribution yet.") + + if self.strategy == IntervalAnalysisStrategy.MEAN: + lower = np.zeros_like(ld) + upper = np.zeros_like(ud) + return (lower, upper) + lower = np.full(shape=np.shape(ld), fill_value=-np.inf, dtype=np.float64) upper = np.full(shape=np.shape(ud), fill_value=+np.inf, dtype=np.float64) return (lower, upper) @@ -1031,6 +1187,30 @@ def _bound_gumbel(self, expr, intervals): (lm, um) = self._bound(mean, intervals) (ls, us) = self._bound(scale, intervals) + if self.strategy == IntervalAnalysisStrategy.PERCENTILE: + raise NotImplementedError("Percentile strategy is not implemented for Gumbel distribution yet.") + + if self.strategy == IntervalAnalysisStrategy.MEAN: + lower = lm + 0.577215664901532 * ls + upper = um + 0.577215664901532 * us + return (lower, upper) + + lower = np.full(shape=np.shape(lm), fill_value=-np.inf, dtype=np.float64) + upper = np.full(shape=np.shape(um), fill_value=+np.inf, dtype=np.float64) + return (lower, upper) + + def _bound_laplace(self, expr, intervals): + args = expr.args + mean, scale = args + (lm, um) = self._bound(mean, intervals) + (ls, us) = self._bound(scale, intervals) + + if self.strategy == IntervalAnalysisStrategy.PERCENTILE: + raise NotImplementedError("Percentile strategy is not implemented for Laplace distribution yet.") + + if self.strategy == IntervalAnalysisStrategy.MEAN: + return (lm, um) + lower = np.full(shape=np.shape(lm), fill_value=-np.inf, dtype=np.float64) upper = np.full(shape=np.shape(um), fill_value=+np.inf, dtype=np.float64) return (lower, upper) @@ -1041,6 +1221,12 @@ def _bound_cauchy(self, expr, intervals): (lm, um) = self._bound(mean, intervals) (ls, us) = self._bound(scale, intervals) + if self.strategy == IntervalAnalysisStrategy.PERCENTILE: + raise NotImplementedError("Percentile strategy is not implemented for Cauchy distribution yet.") + + if self.strategy == IntervalAnalysisStrategy.MEAN: + raise ValueError("The mean of a Cauchy distribution is not defined.") + lower = np.full(shape=np.shape(lm), fill_value=-np.inf, dtype=np.float64) upper = np.full(shape=np.shape(um), fill_value=+np.inf, dtype=np.float64) return (lower, upper) @@ -1051,6 +1237,12 @@ def _bound_gompertz(self, expr, intervals): (lsh, ush) = self._bound(shape, intervals) (lsc, usc) = self._bound(scale, intervals) + if self.strategy == IntervalAnalysisStrategy.PERCENTILE: + raise NotImplementedError("Percentile strategy is not implemented for Gompertz distribution yet.") + + if self.strategy == IntervalAnalysisStrategy.MEAN: + raise NotImplementedError("Mean strategy is not implemented for Gompertz distribution yet.") + lower = np.zeros(shape=np.shape(lsh), dtype=np.float64) upper = np.full(shape=np.shape(ush), fill_value=np.inf, dtype=np.float64) return (lower, upper) @@ -1060,6 +1252,12 @@ def _bound_chisquare(self, expr, intervals): df, = args (ld, ud) = self._bound(df, intervals) + if self.strategy == IntervalAnalysisStrategy.PERCENTILE: + raise NotImplementedError("Percentile strategy is not implemented for Chi-square distribution yet.") + + if self.strategy == IntervalAnalysisStrategy.MEAN: + return (ld, ud) + lower = np.zeros(shape=np.shape(ld), dtype=np.float64) upper = np.full(shape=np.shape(ud), fill_value=np.inf, dtype=np.float64) return (lower, upper) @@ -1070,6 +1268,12 @@ def _bound_kumaraswamy(self, expr, intervals): (la, ua) = self._bound(a, intervals) (lb, ub) = self._bound(b, intervals) + if self.strategy == IntervalAnalysisStrategy.PERCENTILE: + raise NotImplementedError("Percentile strategy is not implemented for Kumaraswamy distribution yet.") + + if self.strategy == IntervalAnalysisStrategy.MEAN: + raise NotImplementedError("Mean strategy is not implemented for Kumaraswamy distribution yet.") + lower = np.zeros(shape=np.shape(la), dtype=np.float64) upper = np.ones(shape=np.shape(ua), dtype=np.float64) return (lower, upper) @@ -1099,6 +1303,4 @@ def _bound_discrete_pvar(self, expr, intervals, unnorm): bounds = [(lower_prob[..., i], upper_prob[..., i]) for i in range(lower_prob.shape[-1])] return self._bound_discrete_helper(bounds) - - - \ No newline at end of file + \ No newline at end of file diff --git a/requirements.test.txt b/requirements.test.txt new file mode 100644 index 00000000..feb62d07 --- /dev/null +++ b/requirements.test.txt @@ -0,0 +1,2 @@ +pytest +scipy \ No newline at end of file diff --git a/setup.py b/setup.py index 21153f80..c77fe318 100644 --- a/setup.py +++ b/setup.py @@ -24,8 +24,8 @@ license="MIT License", url="https://github.com/pyrddlgym-project/pyRDDLGym", packages=find_packages(), - install_requires=['ply', 'pillow>=9.2.0', 'matplotlib>=3.5.0', 'numpy>=1.22,<2', 'gymnasium', 'pygame', 'termcolor'], - python_requires=">=3.8,<3.12", + install_requires=['ply', 'pillow>=9.2.0', 'matplotlib>=3.5.0', 'numpy>=1.22', 'gymnasium', 'pygame', 'termcolor'], + python_requires=">=3.8,<3.13", package_data={'': ['*.cfg']}, include_package_data=True, classifiers=[ diff --git a/tests/data/intervalanalysis/domain.rddl b/tests/data/intervalanalysis/domain.rddl new file mode 100644 index 00000000..e899132d --- /dev/null +++ b/tests/data/intervalanalysis/domain.rddl @@ -0,0 +1,55 @@ +domain test_domain { + + requirements = { + concurrent, // different reservoirs are controlled independently + reward-deterministic, // this domain does not use a stochastic reward + intermediate-nodes, // this domain uses intermediate pvariable nodes + constrained-state // this domain uses state constraints + }; + + types { + someobject: object; + }; + + pvariables { + // Constants + LIMIT(someobject): { non-fluent, real, default = 1.0 }; + + // Intermediate fluents + intermfluent(someobject): { interm-fluent, real }; + + // State fluents + realstatefluent(someobject): { state-fluent, real, default = 0.0 }; + diracdeltastatefluent(someobject): { state-fluent, real, default = 0.0 }; + bernoullistatefluent(someobject): { state-fluent, bool, default = false }; + normalstatefluent(someobject): { state-fluent, real, default = 0.0 }; + weibullstatefluent(someobject): { state-fluent, real, default = 0.0 }; + + // Action fluents + realactionfluent(someobject): { action-fluent, real, default = 0.0 }; + }; + + cpfs { + intermfluent(?o) = realstatefluent(?o) + realactionfluent(?o); + + diracdeltastatefluent'(?o) = DiracDelta(1.0); + bernoullistatefluent'(?o) = Bernoulli(0.5); + normalstatefluent'(?o) = Normal(0.0, 1.0); + weibullstatefluent'(?o) = Weibull(1.0, 5.0); + + realstatefluent'(?o) = intermfluent(?o); + }; + + reward = (sum_{?o : someobject} [ realstatefluent(?o) ]); + + action-preconditions { + forall_{?o : someobject} realactionfluent(?o) <= LIMIT(?o); + forall_{?o : someobject} realactionfluent(?o) >= -LIMIT(?o); + }; + + state-invariants { + forall_{?o : someobject} realstatefluent(?o) <= LIMIT(?o); + forall_{?o : someobject} realstatefluent(?o) >= -LIMIT(?o); + }; + +} \ No newline at end of file diff --git a/tests/data/intervalanalysis/instance.rddl b/tests/data/intervalanalysis/instance.rddl new file mode 100644 index 00000000..035e7fd0 --- /dev/null +++ b/tests/data/intervalanalysis/instance.rddl @@ -0,0 +1,21 @@ +non-fluents nf_test_domain { + domain = test_domain; + objects { + someobject : {o1}; + }; + non-fluents { + LIMIT(o1) = 1.0; + }; +} +instance inst_test_domain { + domain = test_domain; + non-fluents = nf_test_domain; + + init-state { + realstatefluent(o1) = 0.0; + }; + + max-nondef-actions = pos-inf; + horizon = 2; + discount = 1.0; +} \ No newline at end of file diff --git a/tests/test_intervals.py b/tests/test_intervals.py new file mode 100644 index 00000000..7f8377df --- /dev/null +++ b/tests/test_intervals.py @@ -0,0 +1,214 @@ +import os +import numpy as np + +import pyRDDLGym +from pyRDDLGym.core.intervals import RDDLIntervalAnalysis, IntervalAnalysisStrategy + +ROOT_DIR = os.path.dirname(__file__) + +TEST_DOMAIN = f'{ROOT_DIR}/data/intervalanalysis/domain.rddl' +TEST_INSTANCE = f'{ROOT_DIR}/data/intervalanalysis/instance.rddl' + +################################################################################## +# Helper functions +################################################################################## + +def get_action_bounds_from_env(env): + action_bounds = {} + for action, prange in env.model.action_ranges.items(): + lower, upper = env._bounds[action] + if prange == 'bool': + lower = np.full(np.shape(lower), fill_value=0, dtype=int) + upper = np.full(np.shape(upper), fill_value=1, dtype=int) + action_bounds[action] = (lower, upper) + + return action_bounds + +def perform_interval_analysis(domain, instance, action_bounds = None, state_bounds = None, strategy = IntervalAnalysisStrategy.SUPPORT, percentiles = None): + env = pyRDDLGym.make(domain, instance, vectorized=True) + + if action_bounds is None: + action_bounds = get_action_bounds_from_env(env) + + analysis = RDDLIntervalAnalysis(env.model, strategy=strategy, percentiles=percentiles) + bounds = analysis.bound(action_bounds=action_bounds, state_bounds=state_bounds, per_epoch=True) + + env.close() + + return bounds + + +################################################################################## +# Test definitions +################################################################################## + +def test_simple_case(): + ''' Evaluate if the interval propagation works well to a simple use case, + with a real-valued state fluent and the reward function, + without setting up the action and state bounds. + ''' + bounds = perform_interval_analysis(TEST_DOMAIN, TEST_INSTANCE) + + reward_lower, reward_upper = bounds['reward'] + np.testing.assert_array_equal(reward_lower, [0.0, -1.0]) + np.testing.assert_array_equal(reward_upper, [0.0, 1.0]) + + realstatefluent_lower, realstatefluent_upper = bounds['realstatefluent'] + + np.testing.assert_array_equal(realstatefluent_lower.flatten(), [-1.0, -2.0]) + np.testing.assert_array_equal(realstatefluent_upper.flatten(), [1.0, 2.0]) + +def test_action_bounds(): + ''' Evaluate if the interval propagation works well to a simple use case, + with a real-valued state fluent and the reward function, + setting up the action bounds. + ''' + action_bounds = { + 'realactionfluent': ( np.array([ -0.5 ]), np.array([ 0.5 ]) ) + } + + bounds = perform_interval_analysis(TEST_DOMAIN, TEST_INSTANCE, action_bounds) + + reward_lower, reward_upper = bounds['reward'] + np.testing.assert_array_equal(reward_lower, [0.0, -0.5]) + np.testing.assert_array_equal(reward_upper, [0.0, 0.5]) + + realstatefluent_lower, realstatefluent_upper = bounds['realstatefluent'] + + np.testing.assert_array_equal(realstatefluent_lower.flatten(), [-0.5, -1.0]) + np.testing.assert_array_equal(realstatefluent_upper.flatten(), [0.5, 1.0]) + +def test_state_bounds(): + ''' Evaluate if the interval propagation works well to a simple use case, + with a real-valued state fluent and the reward function, + setting up the state bounds. + ''' + state_bounds = { + 'realstatefluent': ( np.array([ -0.5 ]), np.array([ 0.5 ]) ) + } + + action_bounds = { + 'realactionfluent': ( np.array([ -0.1 ]), np.array([ 0.1 ]) ) + } + + bounds = perform_interval_analysis(TEST_DOMAIN, TEST_INSTANCE, action_bounds, state_bounds) + + reward_lower, reward_upper = bounds['reward'] + np.testing.assert_array_equal(reward_lower, [-0.5, -0.6]) + np.testing.assert_array_equal(reward_upper, [0.5, 0.6]) + + realstatefluent_lower, realstatefluent_upper = bounds['realstatefluent'] + + np.testing.assert_array_equal(realstatefluent_lower.flatten(), [-0.6, -0.7]) + np.testing.assert_array_equal(realstatefluent_upper.flatten(), [0.6, 0.7]) + +def test_diracdelta_propagation(): + ''' Evaluate if the interval propagation works well to a state fluent + that has its next value sampled by a Dirac Delta distribution. + ''' + fluent_name = 'diracdeltastatefluent' + + ## Support strategy + bounds = perform_interval_analysis(TEST_DOMAIN, TEST_INSTANCE, strategy = IntervalAnalysisStrategy.SUPPORT) + + fluent_lower, fluent_upper = bounds[fluent_name] + np.testing.assert_array_equal(fluent_lower.flatten(), [1.0, 1.0]) + np.testing.assert_array_equal(fluent_upper.flatten(), [1.0, 1.0]) + + ## Mean strategy + bounds = perform_interval_analysis(TEST_DOMAIN, TEST_INSTANCE, strategy = IntervalAnalysisStrategy.MEAN) + + fluent_lower, fluent_upper = bounds[fluent_name] + np.testing.assert_array_equal(fluent_lower.flatten(), [1.0, 1.0]) + np.testing.assert_array_equal(fluent_upper.flatten(), [1.0, 1.0]) + + ## Percentiles strategy + bounds = perform_interval_analysis(TEST_DOMAIN, TEST_INSTANCE, strategy = IntervalAnalysisStrategy.PERCENTILE, percentiles=[0.1, 0.9]) + + fluent_lower, fluent_upper = bounds[fluent_name] + np.testing.assert_array_equal(fluent_lower.flatten(), [1.0, 1.0]) + np.testing.assert_array_equal(fluent_upper.flatten(), [1.0, 1.0]) + +def test_bernoulli_propagation(): + ''' Evaluate if the interval propagation works well to a state fluent + that has its next value sampled by a Bernoulli distribution. + ''' + + fluent_name = 'bernoullistatefluent' + + ## Support strategy + bounds = perform_interval_analysis(TEST_DOMAIN, TEST_INSTANCE, strategy = IntervalAnalysisStrategy.SUPPORT) + + fluent_lower, fluent_upper = bounds[fluent_name] + np.testing.assert_array_equal(fluent_lower.flatten(), [0.0, 0.0]) + np.testing.assert_array_equal(fluent_upper.flatten(), [1.0, 1.0]) + + ## Mean strategy + bounds = perform_interval_analysis(TEST_DOMAIN, TEST_INSTANCE, strategy = IntervalAnalysisStrategy.MEAN) + + fluent_lower, fluent_upper = bounds[fluent_name] + np.testing.assert_array_equal(fluent_lower.flatten(), [0.5, 0.5]) + np.testing.assert_array_equal(fluent_upper.flatten(), [0.5, 0.5]) + + ## Percentiles strategy + bounds = perform_interval_analysis(TEST_DOMAIN, TEST_INSTANCE, strategy = IntervalAnalysisStrategy.PERCENTILE, percentiles=[0.1, 0.9]) + + fluent_lower, fluent_upper = bounds[fluent_name] + np.testing.assert_array_equal(fluent_lower.flatten(), [0.0, 0.0]) + np.testing.assert_array_equal(fluent_upper.flatten(), [1.0, 1.0]) + +def test_normal_propagation(): + ''' Evaluate if the interval propagation works well to a state fluent + that has its next value sampled by a Normal distribution. + ''' + + fluent_name = 'normalstatefluent' + + ## Support strategy + bounds = perform_interval_analysis(TEST_DOMAIN, TEST_INSTANCE, strategy = IntervalAnalysisStrategy.SUPPORT) + + fluent_lower, fluent_upper = bounds[fluent_name] + np.testing.assert_array_equal(fluent_lower.flatten(), [-np.inf, -np.inf]) + np.testing.assert_array_equal(fluent_upper.flatten(), [np.inf, np.inf]) + + ## Mean strategy + bounds = perform_interval_analysis(TEST_DOMAIN, TEST_INSTANCE, strategy = IntervalAnalysisStrategy.MEAN) + + fluent_lower, fluent_upper = bounds[fluent_name] + np.testing.assert_array_equal(fluent_lower.flatten(), [0.0, 0.0]) + np.testing.assert_array_equal(fluent_upper.flatten(), [0.0, 0.0]) + + ## Percentiles strategy + bounds = perform_interval_analysis(TEST_DOMAIN, TEST_INSTANCE, strategy = IntervalAnalysisStrategy.PERCENTILE, percentiles=[0.1, 0.9]) + + fluent_lower, fluent_upper = bounds[fluent_name] + np.testing.assert_array_almost_equal(fluent_lower.flatten(), [-1.281552, -1.281552], decimal=5) + np.testing.assert_array_almost_equal(fluent_upper.flatten(), [1.281552, 1.281552], decimal=5) + +def test_weibull_propagation(): + ''' Evaluate if the interval propagation works well to a state fluent + that has its next value sampled by a Weibull distribution. + ''' + + fluent_name = 'weibullstatefluent' + + ## Support strategy + bounds = perform_interval_analysis(TEST_DOMAIN, TEST_INSTANCE, strategy = IntervalAnalysisStrategy.SUPPORT) + + fluent_lower, fluent_upper = bounds[fluent_name] + np.testing.assert_array_equal(fluent_lower.flatten(), [0.0, 0.0]) + np.testing.assert_array_equal(fluent_upper.flatten(), [np.inf, np.inf]) + + ## Mean strategy + bounds = perform_interval_analysis(TEST_DOMAIN, TEST_INSTANCE, strategy = IntervalAnalysisStrategy.MEAN) + + fluent_lower, fluent_upper = bounds[fluent_name] + np.testing.assert_array_equal(fluent_lower.flatten(), [5.0, 5.0]) + np.testing.assert_array_equal(fluent_upper.flatten(), [5.0, 5.0]) + + ## Percentiles strategy + bounds = perform_interval_analysis(TEST_DOMAIN, TEST_INSTANCE, strategy = IntervalAnalysisStrategy.PERCENTILE, percentiles=[0.1, 0.9]) + + fluent_lower, fluent_upper = bounds[fluent_name] # TODO: instead of using precalculated numbers, we could use other libs to evaluate this + np.testing.assert_array_almost_equal(fluent_lower.flatten(), [0.5268, 0.5268], decimal=5) + np.testing.assert_array_almost_equal(fluent_upper.flatten(), [11.51293, 11.51293], decimal=5) \ No newline at end of file