From 73da1599010b95d46d61926b77c4ff06d2368cb5 Mon Sep 17 00:00:00 2001 From: cfirth-nasa Date: Thu, 31 Aug 2023 13:11:14 -0400 Subject: [PATCH] Initial test successful w/ single plugin OnAIR successfully initializes a plugin from an external folder when it is specified in the .ini config file --- README.md | 3 - edp/__init__.py | 0 edp/edp_plugin.py | 181 +++++++++++++ external_plugins_test.py | 28 ++ generic_plugin.py | 51 ++++ integration.py | 99 +++++++ medos_plugin/.gitignore | 131 ++++++++++ medos_plugin/README.md | 22 ++ medos_plugin/__init__.py | 1 + medos_plugin/actuator.py | 11 + medos_plugin/custom_functions.py | 181 +++++++++++++ medos_plugin/environment.yml | 11 + medos_plugin/event.py | 94 +++++++ medos_plugin/event_config.yml | 15 ++ medos_plugin/medos.py | 185 +++++++++++++ medos_plugin/medos_plugin.py | 151 +++++++++++ medos_plugin/runtime.py | 182 +++++++++++++ medos_plugin/telemetry.py | 170 ++++++++++++ medos_plugin/telemetry_config.yml | 36 +++ medos_plugin/telemetry_manager.py | 243 ++++++++++++++++++ onair/config/namaste_test.ini | 17 ++ onair/data/raw_telemetry_data/namaste_TLM.txt | 7 + .../telemetry_configs/namaste_TLM_CONFIG.json | 33 +++ onair/src/reasoning/agent.py | 10 +- onair/src/run_scripts/execution_engine.py | 7 +- onair/src/run_scripts/redis_adapter.py | 90 +++++++ onair/src/run_scripts/sim.py | 38 ++- onair/src/util/print_io.py | 24 +- test/src/run_scripts/test_redis_adapter.py | 214 +++++++++++++ test/test_driver.py | 1 + 30 files changed, 2210 insertions(+), 26 deletions(-) create mode 100644 edp/__init__.py create mode 100644 edp/edp_plugin.py create mode 100644 external_plugins_test.py create mode 100644 generic_plugin.py create mode 100644 integration.py create mode 100644 medos_plugin/.gitignore create mode 100644 medos_plugin/README.md create mode 100644 medos_plugin/__init__.py create mode 100644 medos_plugin/actuator.py create mode 100644 medos_plugin/custom_functions.py create mode 100644 medos_plugin/environment.yml create mode 100644 medos_plugin/event.py create mode 100644 medos_plugin/event_config.yml create mode 100644 medos_plugin/medos.py create mode 100644 medos_plugin/medos_plugin.py create mode 100644 medos_plugin/runtime.py create mode 100644 medos_plugin/telemetry.py create mode 100644 medos_plugin/telemetry_config.yml create mode 100644 medos_plugin/telemetry_manager.py create mode 100644 onair/config/namaste_test.ini create mode 100644 onair/data/raw_telemetry_data/namaste_TLM.txt create mode 100644 onair/data/telemetry_configs/namaste_TLM_CONFIG.json create mode 100644 onair/src/run_scripts/redis_adapter.py create mode 100644 test/src/run_scripts/test_redis_adapter.py diff --git a/README.md b/README.md index 854acee4..87733b71 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,3 @@ -![Build](https://github.com/nasa/OnAIR/actions/workflows/unit-test.yml/badge.svg) -[![CodeCov](https://codecov.io/gh/nasa/OnAIR/branch/main/graph/badge.svg?token=L0WVOTD5X9)](https://codecov.io/gh/nasa/OnAIR) - # The On-board Artificial Intelligence Research (OnAIR) Platform The On-board Artificial Intelligence Research (OnAIR) Platform is a framework that enables AI algorithms written in Python to interact with NASA's [cFS](https://github.com/nasa/cFS).
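The integration mechanism this patch exercises is the `PluginList` entry in the OnAIR `.ini` config. As a minimal sketch of the shape involved (mirroring the `onair/config/namaste_test.ini` added below; that the name `medos` resolves to the external `medos_plugin` package is inferred from `external_plugins_test.py`, not stated explicitly in this patch):

```ini
; Sketch only: PluginList names the plugin(s) the execution engine should
; construct; 'medos' is assumed to map to the medos_plugin package, whose
; __init__.py exports a Plugin subclass of AIPlugIn (generic_plugin.py).
[DEFAULT]
PluginList = ['medos']

[RUN_FLAGS]
Redis_Flag = true
```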
diff --git a/edp/__init__.py b/edp/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/edp/edp_plugin.py b/edp/edp_plugin.py new file mode 100644 index 00000000..3ce98256 --- /dev/null +++ b/edp/edp_plugin.py @@ -0,0 +1,181 @@ +# GSC-19165-1, "The On-Board Artificial Intelligence Research (OnAIR) Platform" +# +# Copyright © 2023 United States Government as represented by the Administrator of +# the National Aeronautics and Space Administration. No copyright is claimed in the +# United States under Title 17, U.S. Code. All Other Rights Reserved. +# +# Licensed under the NASA Open Source Agreement version 1.3 +# See "NOSA GSC-19165-1 OnAIR.pdf" + +import numpy as np +import redis +import json +import sys +from pathlib import Path +sys.path.append(Path('../')) +from onair.src.util.print_io import * + +class EDP: + """ Placeholder Element class """ + def __init__(self) -> None: + # Variables for pathing + self.size = 100 # grid size, grab this from some configuration somewhere + self.knowledge = np.full((self.size, self.size), np.nan) + self.current_pos = (0,0,1) # arbitrary Z, where can we pull position from in OnAIR? + self.flying_direction = np.array([0,0]) # numpy array + self.visit_queue = [] + self.last = (np.inf, None) + self.peak_found = False + self.just_traversed_neighbors = False + + self.arrived_at_last_cmd = False + + self.last_cmd_pos = (0,0,1) + + # assuming (0,0) is top left, neighbor visitation order: + # right, bottom right, down, bottom left, left, upper left, up, upper right + self.neighbor_directions = np.array([(0,1), (1,1), (1,0), (1,-1), + (0,-1), (-1, -1), (-1,0), (-1,1)]) + + self.redis_pool = redis.ConnectionPool(host='localhost', port=6379, db=0) + self.redis = redis.Redis(connection_pool=self.redis_pool) + + def perform_reasoning(self, info: dict) -> list: + # info format: {"medos": {"sensor_value": float, "timestamp": timestamp, "concentration": float, + # "pos": [float, float, float], "event_in_progress": True} } + # info format: {'sensor_value': float, 'methane event': bool, 'position': tuple + # 'info' can contain more information relevant to planning + # TODO: a param 'task', which specifies what type of planning for EDP to do... 
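+        # Returns a list of [x, y, z] float waypoints (possibly empty); each command is also published as JSON on the 'drone_cmds' Redis channel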
+        # TODO: compare with greedier algo where we move in direction of first higher neighbor + + # Check if sufficient info for current planning task + try: + # verify keys in dict + info['medos']['concentration'] + info['medos']['pos'] + except KeyError: + print('TERMINATED PLANNING: "concentration" and "pos" are required information') + return [] + + try: + # verify position values can be integers + # self.current_pos = tuple(int(val) for val in info['medos']['pos']) # [float, float, float] + self.current_pos = tuple(int(val) for val in info['medos']['pos'][:2]) + (info['medos']['pos'][2],) + except ValueError: + print(f'TERMINATED PLANNING: "pos" values ({info["medos"]["pos"]}) cannot be converted to integers') + return [] + + + + # don't plan next move until arrived at last location + # careful about comparisons here with the floats + if self.current_pos[:2] == self.last_cmd_pos[:2]: + self.arrived_at_last_cmd = True + if not self.arrived_at_last_cmd: + #print("didn't move yet") + return [] + + #print('current_pos:',self.current_pos, + # 'last_cmd_pos:', self.last_cmd_pos) + + current_concentration = info['medos']['concentration'] + xy_pos = self.current_pos[:2] + self.knowledge[xy_pos] = current_concentration + + neighbors = self._get_neighbor_data(xy_pos, self.knowledge) + if all( not np.isnan(n[2]) for n in neighbors ) and all( current_concentration >= n[2] for n in neighbors ): + print('== LANDING') + # Land + self.peak_found = True + next_pos = self.current_pos[:2] + (0,) + print('Final position:', next_pos) + result = [list(map(float, next_pos))] + self.redis.publish('drone_cmds', json.dumps(result)) + return [] + else: + next_pos = self._get_next_best_pos(current_concentration, xy_pos) # only X, Y needed + + #next_pos += (self.current_pos[2],) # Add static Z coord + next_pos += (1,) # Add static Z coord + + print_generic(f'[EDP] Commanding drone to: {next_pos}', clrs=['EDP']) + + from copy import deepcopy + self.last_cmd_pos = deepcopy(next_pos) + self.arrived_at_last_cmd = False + + # Send to command channel + result = [list(map(float, next_pos))] + self.redis.publish('drone_cmds', json.dumps(result)) + + return result + + # Params could change depending on what will stay as attributes of EDP class + # pos can be accessed via self.current_pos, knowledge of grid via self.knowledge...
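+    # Helper convention: each neighbor is a (direction, position, concentration) tuple; a NaN concentration marks a grid cell that has not been visited yet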
+    def _get_neighbor_data(self, pos: tuple, knowledge) -> list: + size = knowledge.shape[0] + neighbors = [] + for direction in self.neighbor_directions: + #direction = np.array(direction) + position = tuple(pos + direction) + if 0 <= position[0] < size and 0 <= position[1] < size: + concentration = knowledge[position] + neighbors.append((direction, position, concentration)) + + return neighbors + + def _traverse_neighbors(self, pos: tuple, neighbors: list) -> None: + # Each neighbor is a tuple (direction, position, concentration) + # Add neighbors to visit_queue + for n in neighbors: + direction, position, concentration = n + + # Only add to the queue if it's an unknown grid cell + if np.isnan(self.knowledge[position]): + self.visit_queue.append(position) + + # Return to current position after visiting + self.visit_queue.append(pos) + self.just_traversed_neighbors = True + + def _get_next_best_pos(self, current_concentration: float, pos: tuple) -> tuple: + # get neighbors to check for peak + neighbors = self._get_neighbor_data(pos, self.knowledge) + #if all( not np.isnan(n[2]) for n in neighbors ) and all( current_concentration >= n[2] for n in neighbors ): + # self.peak_found = True + # return pos + + # if exploring neighbors, continue + if self.visit_queue: + return self.visit_queue.pop(0) + + # traverse neighbors if last concentration was higher (meaning we decreased) + if not self.just_traversed_neighbors and current_concentration <= self.last[0]: + if self.flying_direction.any(): # go back + self.visit_queue.append(self.last[1]) + pos = self.last[1] + else: + self.last = (current_concentration, pos) # log current + neighbors = self._get_neighbor_data(pos, self.knowledge) + self._traverse_neighbors(pos, neighbors) + self.flying_direction = np.array([0,0]) + return self.visit_queue.pop(0) + + self.just_traversed_neighbors = False + self.last = (current_concentration, pos) # log current + + if self.flying_direction.any(): + next_pos = tuple(pos + self.flying_direction) + if 0 <= next_pos[0] < self.size and 0 <= next_pos[1] < self.size: + return next_pos + else: # invalid fly location + self._traverse_neighbors(pos, neighbors) + self.flying_direction = np.array([0,0]) + return self.visit_queue.pop(0) + + # If none of the above cases, just grab highest valued neighbor + best_neighbor = max(neighbors, key = lambda x: x[2]) + self.flying_direction = best_neighbor[0] + next_pos = tuple(pos + self.flying_direction) + return next_pos + diff --git a/external_plugins_test.py b/external_plugins_test.py new file mode 100644 index 00000000..89889384 --- /dev/null +++ b/external_plugins_test.py @@ -0,0 +1,28 @@ +import sys + +sys.path.append('OnAIR') + +import pytest +from onair.src.run_scripts.execution_engine import ExecutionEngine +from medos_plugin.medos_plugin import Plugin +from driver import init_global_paths + + +import numpy as np + +def test_plugin_defined_in_config_is_imported_by_ddl_class(mocker): + # Arrange + + # Need to add something to pass a dummy frame of data - currently hardcoded in medos_plugin.py + config_file = 'onair/config/namaste_test.ini' + fake_save_path = '' + init_global_paths() # Fix error on execution engine init where internal required paths unavailable + + + # Action + test_engine = ExecutionEngine(config_file,fake_save_path,False) + + # Assert + assert sum([type(construct) == Plugin for construct in test_engine.sim.agent.learning_systems.ai_constructs]) == len(test_engine.sim.agent.learning_systems.ai_constructs) + + return \ No newline at end of file diff --git
a/generic_plugin.py b/generic_plugin.py new file mode 100644 index 00000000..939300ab --- /dev/null +++ b/generic_plugin.py @@ -0,0 +1,51 @@ +# GSC-19165-1, "The On-Board Artificial Intelligence Research (OnAIR) Platform" +# +# Copyright © 2023 United States Government as represented by the Administrator of +# the National Aeronautics and Space Administration. No copyright is claimed in the +# United States under Title 17, U.S. Code. All Other Rights Reserved. +# +# Licensed under the NASA Open Source Agreement version 1.3 +# See "NOSA GSC-19165-1 OnAIR.pdf" + +from abc import ABC, abstractmethod +"""This object serves as a proxy for all plug-ins. + Therefore, the AIPlugIn object is meant to induce + standards and structures of compliance for user-created + and/or imported plug-ins/libraries +""" +class AIPlugIn(ABC): + def __init__(self, _name, _headers): + """ + Superclass for data driven components: VAE, PPO, etc. Allows for easier modularity. + """ + assert(len(_headers)>0) + self.component_name = _name + self.headers = _headers + + @abstractmethod + def apriori_training(self, batch_data=[]): + """ + Given data, system should learn any priors necessary for realtime diagnosis. + """ + # I don't know yet whether we should allow empty batch data + # The batch data format could change + # depending on how the tutorial fleshes out (too early to tell) + # There will be no return from this function (the user can pull training + # data from the construct itself) + raise NotImplementedError + + @abstractmethod + def update(self, frame=[]): + """ + Given streamed data point, system should update internally + """ + raise NotImplementedError + + @abstractmethod + def render_diagnosis(self): + """ + System should return its diagnosis + """ + raise NotImplementedError + diff --git a/integration.py b/integration.py new file mode 100644 index 00000000..ca444ec8 --- /dev/null +++ b/integration.py @@ -0,0 +1,99 @@ +import sys + +sys.path.append('OnAIR') + +import pytest +from mock import MagicMock +import onair.src.reasoning.agent as agent +from onair.src.reasoning.diagnosis import Diagnosis +from onair.src.systems.vehicle_rep import VehicleRepresentation +from onair.src.data_driven_components.data_driven_learning import DataDrivenLearning + +import numpy as np + +def test_medos_output_matches_expected_sets_input(mocker): + # Arrange + + # Need to add something to pass a dummy frame of data - currently hardcoded in medos_plugin.py + header = ['no_op'] + test = [True] + vehicle = VehicleRepresentation(header,test) + test_agent = agent.Agent(['medos'], vehicle) # Agent now takes the plugin list first (see agent.py below) + test_agent.learning_systems = DataDrivenLearning(header,['medos']) + + fakeDiagnosis = MagicMock() + argTimeStep = MagicMock() + mocker.patch('onair.src.reasoning.agent.Diagnosis',return_value=fakeDiagnosis) + mocker.patch.object(vehicle, 'get_current_faulting_mnemonics') + fake_frame = {"sensor_value": 42, "concentration": 4242, "pos": [1.0, 1.0, 10.0], "timestamp": 1} + test_agent.learning_systems.update(fake_frame,status=np.array([1])) + + expected = {'medos': { + 'sensor_value': 42, + 'timestamp': 1, + 'concentration': 4242, + 'pos': [1.0, 1.0, 10.0], + 'event_in_progress': False + }} + + + # Action + test_agent.diagnose(argTimeStep) + + # Assert + assert agent.Diagnosis.call_count == 1 + assert agent.Diagnosis.call_args_list[0].args[1] == expected + +def test_if_edp_return_result_has_correct_dict_key(mocker): + # Arrange + header = ['no_op'] + test = [True] + vehicle =
VehicleRepresentation(header,test) + + fakeAgent = agent.Agent(['medos'], vehicle) # Agent now takes the plugin list first (see agent.py below) + fakeTimeStep = MagicMock() + fakeLearningSystem = MagicMock() # stands in for a DataDrivenLearning instance + fakeAgent.learning_systems = fakeLearningSystem + + info_dict = {'medos': { + 'sensor_value': 42, + 'timestamp': 1, + 'concentration': 8.743, + 'pos': [0.0, 0.0, 10.0], + 'event_in_progress': True + } + } + mocker.patch.object(fakeLearningSystem, + 'render_diagnosis', + return_value=info_dict) + mocker.patch.object(vehicle, 'get_current_faulting_mnemonics') + + # Action + result = fakeAgent.diagnose(fakeTimeStep) + + # Assert + assert "next_pos" in result + +def test_medos_handoff_to_edp(mocker): + # Arrange + header = ['no_op'] + test = [True] + vehicle = VehicleRepresentation(header,test) + test_agent = agent.Agent(['medos'], vehicle) + test_agent.learning_systems = DataDrivenLearning(header,['medos']) + + fakeDiagnosis = MagicMock() + argTimeStep = MagicMock() + mocker.patch.object(vehicle, 'get_current_faulting_mnemonics') + fake_frame = {"sensor_value": 42, "concentration": 4242, "pos": [1.0, 1.0, 10.0], "timestamp": 1} + + # this update method adds a medos result to test_agent's learning_system_results + test_agent.learning_systems.update(fake_frame,status=np.array([1])) + + # Action + test_agent.diagnose(argTimeStep) + result = test_agent.diagnose(argTimeStep) + + # Assert + # next_pos should not be equal to the current position from the frame + assert result["next_pos"] != fake_frame["pos"] diff --git a/medos_plugin/.gitignore b/medos_plugin/.gitignore new file mode 100644 index 00000000..f8148b89 --- /dev/null +++ b/medos_plugin/.gitignore @@ -0,0 +1,131 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g.
github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +logs \ No newline at end of file diff --git a/medos_plugin/README.md b/medos_plugin/README.md new file mode 100644 index 00000000..d1f37bc1 --- /dev/null +++ b/medos_plugin/README.md @@ -0,0 +1,22 @@ +# Module for Event Driven Operations for Spacecraft (MEDOS) +MEDOS is a spacecraft decision engine that can be used to replace routine, well-defined operations. +## Getting Started +Create an anaconda environment to run MEDOS. The `environment.yml` file includes all packages used by the MEDOS-provided modules. + +To create the environment: + + $ conda env create -f environment.yml + +## Config +There are two configuration files for MEDOS. The first is the telemetry configuration in `telemetry_config.yml`, which contains the raw and derived telemetry definitions. + +The other configuration file is `event_config.yml`, which contains all the event trigger definitions. + +If you require custom functions for use in the derived telemetry definitions, add them to `custom_functions.py`. + +## Running MEDOS +To run MEDOS, run + + $ python medos.py + +This script has a simple example that steps through each telemetry point. It can be extended using the Redis or CSV runtimes provided in `runtime.py`; some additional examples are shown in `medos.py`. \ No newline at end of file diff --git a/medos_plugin/__init__.py b/medos_plugin/__init__.py new file mode 100644 index 00000000..537e14e8 --- /dev/null +++ b/medos_plugin/__init__.py @@ -0,0 +1 @@ +from .medos_plugin import Plugin \ No newline at end of file diff --git a/medos_plugin/actuator.py b/medos_plugin/actuator.py new file mode 100644 index 00000000..7fb6ba43 --- /dev/null +++ b/medos_plugin/actuator.py @@ -0,0 +1,11 @@ +from typing import Callable, Dict + + +class Actuator(): + def __init__(self, actions: Dict[str, Callable]) -> None: + self.actions = actions + +class Action(): + def __init__(self, event, function) -> None: + self.event = event + self.function = function \ No newline at end of file diff --git a/medos_plugin/custom_functions.py b/medos_plugin/custom_functions.py new file mode 100644 index 00000000..d723dc8b --- /dev/null +++ b/medos_plugin/custom_functions.py @@ -0,0 +1,181 @@ +from math import exp, sqrt +import numpy as np +import time + +def get_value(telem): + return telem.value + +def find_nearest(array, value): + array = np.asarray(array) + idx = (np.abs(array - value)).argmin() + return idx + +def get_width(telem: np.ndarray, axis): + + # Select axes to sum over + axes = tuple([ax for ax in (0,1,2) if ax != axis]) + collapsed = np.sum(telem, axis=axes) + np.seterr(divide='ignore') + collapsed = np.where(collapsed > 0, np.log(collapsed), 0) + np.seterr(divide='warn') + + maximum = max(collapsed) + idx = collapsed.argmax() + half_width = int(collapsed.size/2) + shifted = np.roll(collapsed, half_width - idx) + width = find_nearest(shifted[half_width::-1], maximum/2) + find_nearest(shifted[half_width:], maximum/2) + return width + +def memory(memarray,latest): + if latest != memarray[-1]: + mem = np.roll(memarray,-1) + mem[-1] = latest + else: + mem = memarray + return mem + +def running_avg(runavg,latest,numavg): +
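# Incremental running average: avg_new = ((numavg - 1) * avg_old + latest) / numavg +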
avg = ((numavg-1)/numavg)*runavg + (1/numavg)*latest + return avg + +def integrate(telem): + energies = np.array([2.16,3.91,7.07,10.93,14.24,18.54,24.14,31.44,40.94,53.32,69.44,90.43,117.77, + 153.36,199.72,260.10,338.72,441.11,574.45,748.10,974.23,1268.72,1652.24, + 2151.68,2802.10,3649.12,4752.19,6188.69,8059.43,10495.65,13668.31,17800.00]) + + idx = ~np.isnan(telem) + area = np.trapz(np.log10(telem[idx])+4,np.log10(energies[idx]),axis=0) + return area + +def peak(telem): + return np.nanargmax(telem) + +def radbelt_probability(telem): + a = 4.0 + b = 6.5 + n = 2.0 + p = ((telem - b)/a)**n + p = np.clip(p,a_min=0,a_max=10) + return p + +def clip(telem,clip_min,clip_max): + return np.clip(telem,a_min=clip_min,a_max=clip_max) + +def inclination_angle(Bz,Bx,By): + theta = np.degrees(np.arctan(Bz/np.sqrt(Bx**2 + By**2))) + return theta + +def sw_counts_score(telem): + + sw_trig = np.array([1.26868246e-03, 1.55803109e-04, 2.22575871e-05, 8.12401928e-04, + 8.34659515e-05, 1.11287935e-05, 2.83784235e-04, 2.89348632e-04, + 3.44992599e-04, 1.39109919e-04, 0.00000000e+00, 6.67727612e-04, + 6.34341231e-04, 1.66931903e-04, 3.83943377e-04, 5.11924502e-04, + 1.11287935e-04, 5.11924502e-04, 8.17966324e-04, 2.53736492e-03, + 4.37250298e-02, 7.32691944e-01, 1.00000000e+00, 1.11644057e-01, + 4.17051537e-02, 3.87337659e-02, 6.64388974e-03, 2.30922466e-03, + 4.84102518e-04, 4.61844931e-04, 2.33704664e-04, 1.55803109e-04]) + + idx = ~np.isnan(telem) + + return np.abs(np.sum(telem[idx] - sw_trig[idx])) + +def msh_counts_score(telem): + + msh_trig = np.array([0.00310392, 0.00485223, 0.00490602, 0.00550852, 0.00550852, + 0.00639612, 0.00903741, 0.01482027, 0.02501963, 0.04165277, + 0.06035698, 0.09509183, 0.14339893, 0.21081907, 0.30757851, + 0.43883611, 0.53940418, 0.75943817, 0.91935189, 1. , + 0.96844438, 0.88144319, 0.77345154, 0.68022637, 0.5936609 , + 0.47513637, 0.31755194, 0.15126362, 0.04666638, 0.01029081, + 0.00562686, 0.00542782]) + + idx = ~np.isnan(telem) + return np.abs(np.sum(telem[idx] - msh_trig[idx])) + +def msp_counts_score(telem): + + msp_trig = np.array([0.06159759, 0.07295117, 0.08463347, 0.09184009, 0.09287683, + 0.10013402, 0.10640504, 0.09482388, 0.07396263, 0.03904215, + 0.03006549, 0.03373202, 0.03221483, 0.03213897, 0.04220295, + 0.04341669, 0.04045819, 0.04518674, 0.03848585, 0.03347915, + 0.027461 , 0.02695527, 0.02700584, 0.03861228, 0.05573116, + 0.08948846, 0.16828078, 0.28252459, 0.49981035, 0.78643133, + 1. 
, 0.48463853]) + + idx = ~np.isnan(telem) + return np.abs(np.sum(telem[idx] - msp_trig[idx])) + + +def mean_if_no_zeros(data): + if 0 not in data: + return(np.mean(data)) + else: + return(0) + +def std_if_no_zeros(data): + if 0 not in data: + return(np.std(data)) + else: + return(0) + +def peak_flag_calc(y,mean,std): + threshold = 1.78696 + if mean != 0: + return(y[-1] - mean > (threshold * std)) + else: + return(0) + +def y_filtered_calc(y_filtered,y,flag): + influence = 0.031791 + length = len(y_filtered) + if 0 not in y: + if flag: + y_filtered = np.append(y_filtered,(influence * y[-1] + (1 - influence) * y_filtered[-1])) + return(y_filtered[-length:]) + else: + y_filtered = np.append(y_filtered,y[-1]) + return(y_filtered[-length:]) + else: + return(y) + +def ingest(arr, val): + length = len(arr) + arr = np.append(arr, val) + arr = arr[-length:] + return(arr) + + + + + +custom_functions = { + 'previous': lambda telem: telem.previous_telem.value, + 'value': get_value, + 'exp': lambda telem: exp(telem), + 'sqrt': lambda telem: sqrt(telem), + 'abs': lambda telem: abs(telem), + 'array': lambda telem: np.asarray(telem), + 'width': get_width, + 'sum': lambda telem: np.nansum(telem,axis=0), + 'max': lambda telem: np.nanmax(telem), + 'var': lambda telem: np.var(telem), + 'integrate': integrate, + 'memory': memory, + 'running_avg': running_avg, + 'rb_prob': radbelt_probability, + 'inclination_angle': inclination_angle, + 'clip': clip, + 'peak': peak, + 'sw_counts_score': sw_counts_score, + 'msh_counts_score': msh_counts_score, + 'msp_counts_score': msp_counts_score, + 'diff': np.diff, + 'mean': np.mean, + 'std': np.std, + 'mean_if_no_zeros': mean_if_no_zeros, + 'std_if_no_zeros': std_if_no_zeros, + 'peak_flag_calc': peak_flag_calc, + 'y_filtered_calc': y_filtered_calc, + 'ingest': ingest +} diff --git a/medos_plugin/environment.yml b/medos_plugin/environment.yml new file mode 100644 index 00000000..2b2e1c88 --- /dev/null +++ b/medos_plugin/environment.yml @@ -0,0 +1,11 @@ +name: medos +channels: + - conda-forge +dependencies: + - python=3.10.* + - autopep8 + - numpy + - pyyaml + - simpleeval + - redis-py + - pandas diff --git a/medos_plugin/event.py b/medos_plugin/event.py new file mode 100644 index 00000000..24c395ce --- /dev/null +++ b/medos_plugin/event.py @@ -0,0 +1,94 @@ +from simpleeval import simple_eval +from typing import Dict, List, Tuple +from collections import deque +import numpy as np + +from .telemetry import Telemetry + + +class Event(): + """Event to be detected by MEDOS + + Attributes + ---------- + name: str + Name of the event + telem: Dict[str, Telemetry] + Dictionary of the names and Telemetry object that describes the event + threshold_expression: str + Expression to determine event existence from the score + matrix: np.ndarray + The matrix representation of the event telemetry + mask: np.ndarray + A mask of the required telemetry for the event + + Parameters + ---------- + event_config: Dict[str, Dict] + Dictionary containing the YAML event configuration + telem_order: List[str] + The order to expect telemetry for the matrix calculations + """ + + def __init__(self, event_config: Dict[str, Dict], telem_order: List[str]) -> None: + self.name = event_config['name'] + self.telem: Dict[str, Tuple[float, Telemetry]] = {} + for name, contents in event_config['telemetry'].items(): + self.telem.update({name: (contents['weight'], Telemetry(name, contents['value']))}) + + self.threshold_expression = event_config['threshold'] + self.persistence = deque([False] * 
event_config['persistence'], maxlen=event_config['persistence']) + self.matrix = self._calculate_matrix(telem_order) + self.mask = np.where(self.matrix != 0, 1, 0) + + def calculate_distance(self, telem: Dict[str, Telemetry]) -> float: + """Calculate how much the current telemetry and the expected event + telemetry differ. The event decision itself is made by `evaluate`. + + Parameters + ---------- + telem : Dict[str, Telemetry] + The current telemetry + + Returns + ------- + float + The weighted squared distance (score) between the current and the + expected event telemetry + """ + score = 0 + for key, event_telem in self.telem.items(): + score += (event_telem[0]*(event_telem[1] - telem[key]))**2 + return score + + def evaluate(self, score: float) -> bool: + """Evaluate the threshold expression + + Parameters + ---------- + score : float + Score for evaluating the threshold boolean expression + + Returns + ------- + bool + Result of threshold evaluation + """ + return np.any(simple_eval(self.threshold_expression, names={'score': score})) + + def _calculate_matrix(self, telem_order: List[str]) -> np.ndarray: + """Calculate the matrix form of the event + + Parameters + ---------- + telem_order : List[str] + The expected telemetry order for matrix calculation + + Returns + ------- + np.ndarray + The matrix of telemetry that defines the event. All undefined telemetry is 0. + """ + return np.asarray([self.telem[key][1].value if key in self.telem.keys() else 0 for key in telem_order]) + + def __repr__(self) -> str: + return str(self.telem) diff --git a/medos_plugin/event_config.yml b/medos_plugin/event_config.yml new file mode 100644 index 00000000..88f896f1 --- /dev/null +++ b/medos_plugin/event_config.yml @@ -0,0 +1,15 @@ +Events: + - name: Increased Methane Concentration + threshold: score == 0 + persistence: 20 + telemetry: + peak_flag: + value: 1 + weight: 1 + - name: Decreased Methane Concentration + threshold: score == 1 + persistence: 20 + telemetry: + peak_flag: + value: 1 + weight: 0.1 \ No newline at end of file diff --git a/medos_plugin/medos.py b/medos_plugin/medos.py new file mode 100644 index 00000000..951b3501 --- /dev/null +++ b/medos_plugin/medos.py @@ -0,0 +1,185 @@ +import argparse +from typing import Dict +from runtime import CSVRunTime +from redis_runtime.runtime import StaticFeederRuntime +from redis_runtime.dynamic_feeder import DynamicFeeder +from telemetry_manager import TelemetryManager +import yaml +import os +from datetime import datetime +import numpy as np +from collections import deque +import time + + +def main(): + with open('telemetry_config.yml', 'r') as f: + telemetry_defs = yaml.safe_load(f) + with open('event_config.yml', 'r') as f: + event_defs = yaml.safe_load(f) + telem_manager = TelemetryManager(telemetry_defs, event_defs) + + telem_manager.run_step({ + "B_Field": 500e-9, + "Voltage": 300, + "Current": 1e-3, + "Mode": 2, + "Sensor_1_Counts": 345, + "Sensor_2_Counts": 234, + "Sensor_3_Counts": 456 + }) + print(telem_manager) + print('----------------------------------------------') + telem_manager.run_step({ + "B_Field": -5e-9, + "Voltage": 300, + "Current": 2e-3, + "Mode": 1, + "Sensor_1_Counts": 345, + "Sensor_2_Counts": 234, + "Sensor_3_Counts": 456 + }) + print(telem_manager) + print('----------------------------------------------') + telem_manager.run_step({ + "B_Field": 5e-9, + "Voltage": 300, + "Current": 1.5e-3, + "Mode": 2, + "Sensor_1_Counts": 345, + "Sensor_2_Counts": 234, + "Sensor_3_Counts": 12 + }) + print(telem_manager) + print('==============================================') + + +def csv(filename): + with
open('telemetry_config.yml', 'r') as f: + telemetry_defs = yaml.safe_load(f) + with open('event_config.yml', 'r') as f: + event_defs = yaml.safe_load(f) + telem_manager = TelemetryManager(telemetry_defs, event_defs) + runtime = CSVRunTime(0.01, telem_manager, filename) + event_list = [0] + + + def callback(*args): # Callback modified to debug methane event detection. + # print(telem_manager, '\n') + # print([event_name for event_name, result in events.items() if result]) + # print(telem_manager.matrix) + for arg in args: + print(arg) + print('--------------------------------------------------------------------------------') + + runtime.start(callback) + + +def redis(): + with open('telemetry_config.yml', 'r') as f: + telemetry_defs = yaml.safe_load(f) + with open('event_config.yml', 'r') as f: + event_defs = yaml.safe_load(f) + telem_manager = TelemetryManager(telemetry_defs, event_defs) + runtime = StaticFeederRuntime( + 1.0, '34.232.49.68', 6379, 'default', 'yH4sULJc9k*^', telem_manager, "static_20220302") + + def callback(index, telem, scores, events): + print(telem_manager, '\n') + # breakpoint() + # print([event_name for event_name, result in events.items() if result]) + # print(telem_manager.matrix) + print(events) + print(scores) + print('--------------------------------------------------------------------------------') + + runtime.start(callback) + + +def dynamic_redis(start_time=None, startup_delay=0, profile=False): + + fast_rate = 400 + slow_rate = 15 + transition_RE = 22 + perigees = deque() + inbound_22RE = deque() + outbound_22RE = deque() + time_skip = False + + with open('telemetry_config.yml', 'r') as f: + telemetry_defs = yaml.safe_load(f) + with open('event_config.yml', 'r') as f: + event_defs = yaml.safe_load(f) + telem_manager = TelemetryManager(telemetry_defs, event_defs) + runtime = DynamicFeeder( + delay=0, + host='34.232.49.68', + port=6379, + username='default', + password='yH4sULJc9k*^', + telem_manager=telem_manager, + stream_prefix="dynamic_alpha", + rate=fast_rate, + startup_delay=startup_delay, + start_time=start_time, + log_change_only=True + ) + runtime.set_rate(slow_rate) + + # Parse perigee and 22RE times + with open('MMS3_22RE_2017-2023.txt', 'r') as f: + for line in f: + linesplit = line.strip().split() + time = datetime.strptime(linesplit[0], "%Y/%j-%H:%M:%S") + if linesplit[2] == "Perigee": + perigees.append(time) + elif linesplit[2] == "EarthRadiiCross" and linesplit[4] == "Out": + outbound_22RE.append(time) + elif linesplit[2] == "EarthRadiiCross" and linesplit[4] == "In": + inbound_22RE.append(time) + + def callback(index, telem, scores, events): + # Throttle if we're in the 22RE+ region + if fast_rate != slow_rate and not time_skip: + if telem['radial_position'].value < transition_RE and runtime.rate == fast_rate: + runtime.set_rate(slow_rate) + print(f'New rate: {slow_rate}') + elif telem['radial_position'].value >= transition_RE and runtime.rate == slow_rate: + runtime.set_rate(fast_rate) + print(f'New rate: {fast_rate}') + # Skip if we aren't throttling + elif time_skip: + while index > outbound_22RE[0]: + outbound_22RE.popleft() + while index > inbound_22RE[0]: + inbound_22RE.popleft() + if outbound_22RE[0] > inbound_22RE[0]: + skip_time = inbound_22RE.popleft() + runtime.set_start_time(skip_time) + print(f"Skipping to {skip_time}") + + if profile: + def callback_with_profile(index, telem, scores, events): + sim_time = runtime.get_time() + with open(os.path.join(runtime.log_dir, 'profile.csv'), 'a') as f: + 
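# One profile row per step: wall-clock timestamp, telemetry index, simulated time +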
f.write(f'{datetime.now().isoformat()},{index},{sim_time}\n') + callback(index, telem, scores, events) + runtime.start(callback_with_profile) + else: + runtime.start(callback) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description='Run the MEDOS telemetry system.') + + parser.add_argument('-r', '--resume', action='store_true', + help='Resume MEDOS stream', required=False) + + args = parser.parse_args() + if args.resume: + print('Resuming MEDOS stream...') + dynamic_redis(startup_delay=0, profile=True) + else: + dynamic_redis("2020-01-01T22:00:00.000", + startup_delay=60*4, profile=True) diff --git a/medos_plugin/medos_plugin.py b/medos_plugin/medos_plugin.py new file mode 100644 index 00000000..559c97fe --- /dev/null +++ b/medos_plugin/medos_plugin.py @@ -0,0 +1,151 @@ +# GSC-19165-1, "The On-Board Artificial Intelligence Research (OnAIR) Platform" +# +# Copyright © 2023 United States Government as represented by the Administrator of +# the National Aeronautics and Space Administration. No copyright is claimed in the +# United States under Title 17, U.S. Code. All Other Rights Reserved. +# +# Licensed under the NASA Open Source Agreement version 1.3 +# See "NOSA GSC-19165-1 OnAIR.pdf" +from typing import Dict +import yaml +import numpy as np +import json +import redis + +from generic_plugin import AIPlugIn +from .runtime import StepperRunTime +from .telemetry_manager import TelemetryManager +import sys +from pathlib import Path +sys.path.append(Path('../')) +from onair.src.util.print_io import * + + + +"""This object serves as a proxy for all plug-ins. + Therefore, the AIPlugIn object is meant to induce + standards and structures of compliance for user-created + and/or imported plug-ins/libraries +""" +class Plugin(AIPlugIn): + def __init__(self, name, headers, telemetry_file='telemetry_config.yml', event_file='event_config.yml'): + """ + Superclass for data driven components: VAE, PPO, etc. Allows for easier modularity. 
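+        `name` is the component name reported to OnAIR; `headers[1:]` are expected to match the keys of each incoming telemetry frame (see `update`).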
+        """ + # OnAIR required # + self.component_name = name + self.headers = headers + + + # Initialize algorithm params # + self.w_len = 25 # number of points in buffer + self.threshold = 0.1 # number of std deviations a new point must be away from the mean to trigger a flag + self.influence = 0.9 # influence of new points on mean/std deviation + # (higher influence means the system reacts more quickly to structural breaks) + self.ctrMax = 15 # number of points required for an event to be confirmed + + + # Initialize detection vars # + self.avg_filter = 0 + self.std_filter = 0 + self.ctr = 0 + self.peak_flag = False + + + + # Buffer preloading to mimic old MEDOS operations # + self.y = np.zeros(self.w_len) + self.y_filtered = np.zeros(self.w_len) + + self.counter = 0 + + + + #### START: Methods mandated by plugin architecture + + def apriori_training(self, batch_data=[]): + # Not applicable...MEDOS is homeschooled + pass + + def update(self, frame=[]): # send MEDOS new datapoint + """ + Given streamed data point, system should update internally + """ + # SETS passes "drone_0_state" as + # {"sensor_value": float, "concentration": float, "pos": [float, float, float], "timestamp": datetime (str?)} + reconstruct = dict(zip(self.headers[1:], frame)) + self.drone_state_dict = reconstruct + + def render_diagnosis(self): # return event detections + """ + System should return its diagnosis + """ + detect_flag = False + + sensor_value = self.drone_state_dict['sensor_value'] + # timestamp = self.drone_state_dict['timestamp'] + concentration = self.drone_state_dict['concentration'] + pos = self.drone_state_dict['pos'] + + self.y = self.ingest(self.y, sensor_value) # Append sensor value to the buffer + + + self.peak_flag = self.y[-1] - \ + self.avg_filter > (self.threshold*self.std_filter) if self.avg_filter != 0 else 0 # calculate algorithm result + + self.y_filtered = self.y_filtered_calc(self.y_filtered, self.y, self.peak_flag,self.influence) # calculate filtered values + + self.avg_filter = np.mean(self.y_filtered) if 0 not in self.y_filtered else 0 # calculate mean if y_filtered is full of nonzero values + self.std_filter = np.std(self.y_filtered) if 0 not in self.y_filtered else 0 # calculate std likewise + + + # if self.peak_flag: # is a peak detected? + # if self.ctr == 0: # is this a "new" event?
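+            # NOTE: the confirmation-counter block below is commented out for this initial test, so detect_flag always remains False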
+            # self.peak_start_time = timestamp # if new event, record the earliest instance of the event to pass back once event is confirmed + # self.ctr += 1 + + # elif self.ctr == self.ctrMax: # event is confirmed + # print(f"Event Detected at {self.peak_start_time}") + # detect_flag = True + + # else: # event detected, but not a new event nor is the counter at the max + # self.ctr += 1 + + # elif not self.peak_flag: # event is no longer detected; reset counter + # self.ctr = 0 + ret = { + 'sensor_value': sensor_value, + # 'timestamp': timestamp, + 'concentration': concentration, + 'pos': pos, + 'event_in_progress': detect_flag + } + if self.counter % 100 == 0: + print_generic(f'[MEDOS] Status: {ret}', clrs=['MEDOS']) + self.counter += 1 + return(ret) + + + + #### END: Methods mandated by plugin architecture + + def ingest(self,arr,val): + length = len(arr) + arr = np.append(arr, val) + arr = arr[-length:] + return (arr) + + + def y_filtered_calc(self,y_filtered, y, flag, influence): + length = len(y_filtered) + if 0 not in y: + if flag: + y_filtered = np.append( + y_filtered, (influence * y[-1] + (1 - influence) * y_filtered[-1])) + return (y_filtered[-length:]) + else: + y_filtered = np.append(y_filtered, y[-1]) + return (y_filtered[-length:]) + else: + return (y) + diff --git a/medos_plugin/runtime.py b/medos_plugin/runtime.py new file mode 100644 index 00000000..fcc74be4 --- /dev/null +++ b/medos_plugin/runtime.py @@ -0,0 +1,182 @@ +from time import sleep +import traceback +import pandas as pd +import os +from .telemetry_manager import TelemetryManager +import numpy as np +import sys +np.set_printoptions(linewidth=np.inf) + + +class RunTime(): + """Class to handle the global loop and time steps. Must be subclassed. + + Attributes + ---------- + delay : float + Seconds to sleep between steps + telem_manager : TelemetryManager + The TelemetryManager to associate with this runtime. + log_change_only : bool + When True, only log decisions that changed since the previous step + + Parameters + ---------- + delay : float + Seconds to sleep between steps + telem_manager : TelemetryManager + The TelemetryManager to associate with this runtime. + log_change_only : bool + When True, only log decisions that changed since the previous step + """ + def __init__(self, delay: float, telem_manager: TelemetryManager, log_change_only=False) -> None: + self.delay = delay + self.telem_manager = telem_manager + self.log_change_only = log_change_only + timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S') + self.log_dir = os.path.join(os.getcwd(), 'logs', timestamp) + os.makedirs(self.log_dir, exist_ok=True) + self.telem_log = os.path.join(self.log_dir, 'telem_log.csv') + self.score_log = os.path.join(self.log_dir, 'score_log.csv') + self.decision_log = os.path.join(self.log_dir, 'decision_log.csv') + self.previous_decision = None + + def get_next_telem(self): + """Get the next telemetry point. Must be overridden by subclasses. + + Returns + ------- + tuple + An (index, telem) pair; telem is None when the source is exhausted + """ + pass + + def next_step(self,callback,telem): + """ + Call the runtime a single time with a defined telemetry stream + + Parameters + ---------- + callback : function + Function to return on each step. + telem : array-like + Incoming telemetry. + """ + assert telem != [], "Telem frame is empty" + + scores, decisions = self.telem_manager.run_step(telem) + # self.log(index, self.telem_manager.telem, scores, decisions) + callback(self.telem_manager.telem, scores, decisions) + + + + def start(self, callback, steps=None): + """Start the runtime + + Parameters + ---------- + callback : function + Function to run after each step. It is passed the results of the + event check.
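+        steps : int, optional + Maximum number of steps to run; when None, the loop runs until the telemetry source is exhausted. +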
+ """ + step_cnt = 0 + try: + while steps is None or step_cnt < steps: + index, telem = self.get_next_telem() + if index is None: + index = step_cnt + if telem is not None: + scores, decisions = self.telem_manager.run_step(telem) # Use this function as update() in core + step_cnt += 1 + self.log(index, self.telem_manager.telem, scores, decisions) + callback(index, self.telem_manager.telem, scores, decisions) + else: + break + sleep(self.delay) + except Exception as e: + self.close() + print(traceback.format_exc()) + sys.exit() + except KeyboardInterrupt: + self.close() + sys.exit() + self.close() + + def close(self): + pass + + def log(self, index, telem, scores, decisions): + telem_log_dict = {key: telem[key] for key in self.telem_manager.log_telem} + + # If the log files don't exist, create them and write the headers + if not os.path.exists(self.telem_log): + with open(self.telem_log, 'w') as f: + f.write('step,' + ','.join(telem_log_dict.keys()) + '\n') + if not os.path.exists(self.score_log): + with open(self.score_log, 'w') as f: + f.write('step,' + ','.join(scores.keys()) + '\n') + if not os.path.exists(self.decision_log): + with open(self.decision_log, 'w') as f: + f.write('step,' + ','.join(decisions.keys()) + '\n') + + # Write the log files + with open(self.telem_log, 'a') as f: + f.write(f'{index},' + ','.join([str(v.value) for v in telem_log_dict.values()]) + '\n') + with open(self.score_log, 'a') as f: + f.write(f'{index},' + ','.join([str(v) for v in scores.values()]) + '\n') + if not self.log_change_only or self.previous_decision != decisions: + self.previous_decision = decisions.copy() + with open(self.decision_log, 'a') as f: + f.write(f'{index},' + ','.join([str(v) for v in decisions.values()]) + '\n') + + + + +class CSVRunTime(RunTime): + """Class to handle the global loop and time steps for a CSV telemetry source. + + Attributes + ---------- + steps : int + The number of steps to run through + telem_manager : TelemetryManager + The TelemetryManager to associate with this runtime. + telem_lines + A generator containing the lines of the CSV file + + + Parameters + ---------- + telem_manager : TelemetryManager + The TelemetryManager to associate with this runtime. + filename : str + The file containing the telemetry to run + """ + def __init__(self, delay: float, telem_manager: TelemetryManager, filename: str) -> None: + + df = pd.read_csv(filename) + super().__init__(delay, telem_manager) + + self.telem_lines = iter(df.to_dict(orient='index').values()) + + def get_next_telem(self): + """Get the next telemetry point. + + Returns + ------- + Dict + Dictionary of the telemetry points + """ + try: + return None, next(self.telem_lines) + except StopIteration: + return None, None + + +class StepperRunTime(RunTime): + """ + Subclass that runs a callback only when called. + + Parameters + ---------- + telem_manager : TelemetryManager + The TelemetryManager to associate with this runtime. 
+ """ + def __init__(self, delay: float, telem_manager: TelemetryManager) -> None: + super().__init__(delay,telem_manager) diff --git a/medos_plugin/telemetry.py b/medos_plugin/telemetry.py new file mode 100644 index 00000000..20727b75 --- /dev/null +++ b/medos_plugin/telemetry.py @@ -0,0 +1,170 @@ +from __future__ import annotations +import ast +from typing import Dict + +from simpleeval import simple_eval +from .custom_functions import custom_functions +import numpy as np + + +class Telemetry(): + """Telemetry object + + Attributes + ---------- + name: str + Name of the telemetry + value: + Current value of the telemetry + previous_telem: Telemetry + Telemetry object at the previous time step + + Parameters + ---------- + name: + Name of the telemetry + default_value: + Starting value for the telemetry + """ + + def __init__(self, name, default_value) -> None: + self.name = name + if type(default_value) == list: + self.value = np.asarray(default_value) + else: + self.value = default_value + self.previous_telem = None + + def link_previous_telem(self, previous_telem: Telemetry): + """Create object containing the previous telemetry. + + Parameters + ---------- + previous_telem : Telemetry + Telemetry object at the previous time step + """ + + self.previous_telem = previous_telem + + def save_previous(self): + """Save the current telemetry as the previous telemetry.""" + + try: + self.previous_telem.value = self.value + except TypeError: + print('Must link the previous telemetry before saving') + + def update(self): + raise NotImplemented + + def __eq__(self, other) -> bool: + if isinstance(other, Telemetry): + return self.value == other.value + return self.value == other + + def __repr__(self) -> str: + return f'{self.value}' # {self.name}({self.value})' + + def __add__(self, other): + return self.value + other + + def __radd__(self, other): + return other + self.value + + def __sub__(self, other): + return self.value - other + + def __rsub__(self, other): + return other - self.value + + def __mul__(self, other): + return self.value * other + + def __rmul__(self, other): + return other * self.value + + def __truediv__(self, other): + return self.value / other + + def __rtruediv__(self, other): + return other / self.value + + def __pow__(self, other): + return other**self.value + + def __abs__(self): + return abs(self.value) + + def __float__(self): + return float(self.value) + + def __int__(self): + return int(self.value) + + def __getitem__(self, key): + return self.value[key] + + +class RawTelemetry(Telemetry): + """Telemetry that is not calculated""" + + def __init__(self, name, default_value) -> None: + super().__init__(name, default_value) + + def update(self, raw_telem): + """Updates the current value + + Parameters + ---------- + raw_telem + New telemetry value + """ + self.value = raw_telem + + +class DerivedTelemetry(Telemetry): + """Telemetry that is calculated from an expression + + Attributes + ---------- + expression: str + String containing the math equation to evaluate the derived telemetry + dependency_keys: List[str] + List of the telemetry that this telemetry depends on + + Parameters + ---------- + expression: str + String containing the math equation to evaluate the derived telemetry + """ + + def __init__(self, name, default_value, expression: str) -> None: + super().__init__(name, default_value) + self.expression = expression.strip() + + # Search the telemetry's expression for any variable names that are not its own name + self.nodes = [node.id for node in 
ast.walk(ast.parse(expression.strip())) if isinstance(node, ast.Name)] + self.dependency_keys = set(self.nodes) - {name} - set(custom_functions.keys()) + + # Drop the argument of each previous(...) call so it is not treated as a + # same-step dependency (assumes ast.walk yields the argument Name right after 'previous') + for idx, node in enumerate(self.nodes): + if node == "previous": + self.nodes.remove(self.nodes[idx + 1]) + self.nodes = set(self.nodes) - {name} - set(custom_functions.keys()) + + def update(self, telem: Dict): + """Calculates and updates the current value + + Parameters + ---------- + telem + Dictionary of dependency telemetry required for expression + evaluation. + """ + try: + # Evaluate the expression and update the current value. + # The expression is wrapped in ()*1 to account for passthrough + # expressions that only contain the telemetry name + self.value = simple_eval( + f'({self.expression})*1', names=telem, functions=custom_functions) + except ZeroDivisionError: + self.value = None diff --git a/medos_plugin/telemetry_config.yml b/medos_plugin/telemetry_config.yml new file mode 100644 index 00000000..b8bf9f87 --- /dev/null +++ b/medos_plugin/telemetry_config.yml @@ -0,0 +1,36 @@ +Raw_Telemetry: + deltaR: + initial_value: 0 + # newest value from data stream + time_m: + initial_value: 0 + +Derived_Telemetry: + delta_r_buffer: + initial_value: [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + expression: ingest(value(delta_r_buffer),value(deltaR)) + log: True + + delta_r_smoothed: + initial_value: [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + expression: ingest(value(delta_r_smoothed),mean_if_no_zeros(value(delta_r_buffer))) + + base_minus_smoothed: + initial_value: [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + expression: value(delta_r_buffer)-value(delta_r_smoothed) if (0 not in value(delta_r_buffer) and 0 not in value(delta_r_smoothed)) else previous(base_minus_smoothed) + + peak_y_filtered: + initial_value: [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + expression: y_filtered_calc(value(peak_y_filtered),value(base_minus_smoothed),previous(peak_flag)) + + peak_mean: + initial_value: 0 + expression: mean(value(peak_y_filtered)) if 0 not in peak_y_filtered else 0 + + peak_std: + initial_value: 0 + expression: std(value(peak_y_filtered)) if 0 not in peak_y_filtered else 0 + + peak_flag: + initial_value: 0 + expression: peak_flag_calc(value(base_minus_smoothed),value(peak_mean),value(peak_std)) \ No newline at end of file diff --git a/medos_plugin/telemetry_manager.py b/medos_plugin/telemetry_manager.py new file mode 100644 index 00000000..1729e5de --- /dev/null +++ b/medos_plugin/telemetry_manager.py @@ -0,0 +1,243 @@ +from typing import Dict, List +from .event import Event +from .telemetry import RawTelemetry, DerivedTelemetry, Telemetry + + +class TelemetryManager(): + """Object to manage all raw and derived telemetry updates + + Attributes + ---------- + raw_telem: Dict[str, RawTelemetry] + Dictionary of all RawTelemetry objects + derived_telem: Dict[str, DerivedTelemetry] + Dictionary of all DerivedTelemetry objects + log_telem: List[str] + List of all telemetry to log + telem: Dict[str, Telemetry] + Dictionary of all RawTelemetry and DerivedTelemetry objects + derived_process_order: List[str] + List of derived telemetry in the calculation order for updating + + Parameters + ---------- + telem_definitions : Dict[str, Dict] + Dictionary containing raw and derived
telemetry definition formatted + according to the YAML config definition + event_definitions : Dict[str, Dict] + Dictionary containing the event definitions formatted according to + the YAML config definition + """ + + def __init__(self, telem_definitions: Dict[str, Dict], event_definitions: Dict[str, Dict]) -> None: + self.raw_telem: Dict[str, RawTelemetry] = {} + self.derived_telem: Dict[str, DerivedTelemetry] = {} + self.log_telem: List[str] = [] + self.events: Dict[str, Event] = {} + + self.derived_process_order = [] + try: + for name, init_info in telem_definitions['Raw_Telemetry'].items(): + self.__add_raw_telem(name, init_info) + if 'log' in init_info.keys() and init_info['log']: + self.log_telem.append(name) + if telem_definitions['Derived_Telemetry'] is not None: + for name, init_info in telem_definitions['Derived_Telemetry'].items(): + self.__add_derived_telem(name, init_info) + if 'log' in init_info.keys() and init_info['log']: + self.log_telem.append(name) + except KeyError as e: + print('Invalid telemetry yaml:', e) + quit() + + self.telem: Dict[str, Telemetry] = self.raw_telem | self.derived_telem + self.__calc_derived_telem_process_order() + + try: + for event in event_definitions['Events']: + self.events.update( + {event['name']: Event(event, list(self.telem.keys()))}) + except KeyError as e: + print('Invalid event yaml:', e) + quit() + self.decisions = {event: False for event in self.events.keys()} + self.scores = {event: None for event in self.events.keys()} + + @property + def previous(self): + """Dict[str, Dict[str, Telemetry]]: A dictionary of the telemetry at the previous time step""" + + previous_raw_telem = { + key: val.previous_telem for key, val in self.raw_telem.items()} + previous_derived_telem = { + key: val.previous_telem for key, val in self.derived_telem.items()} + return {"Raw_Telemetry": previous_raw_telem, "Derived_Telemetry": previous_derived_telem} + + def check_events(self): + """Check whether each event has occurred and update `self.scores` and + `self.decisions` in place. + """ + + for _, event in self.events.items(): + score = event.calculate_distance(self.telem) + self.scores[event.name] = score + decision = event.evaluate(score) + event.persistence.pop() + event.persistence.appendleft(decision) + if all(i != self.decisions[event.name] for i in event.persistence): + self.decisions[event.name] = decision + + def run_step(self, telem_list: Dict): + """Run a full step of the MEDOS algorithm. + + Parameters + ---------- + telem_list : Dict + Dictionary of new telemetry + + Returns + ------- + tuple + The event scores (Dict[str, float]) and the event decisions + (Dict[str, bool]) + """ + self.update_telem(telem_list) + self.make_decision() + return self.scores, self.decisions + + def update_telem(self, telem_list: Dict): + """Update raw telemetry from a dictionary and run an update on the + derived telemetry. + + Parameters + ---------- + telem_list : Dict + Dictionary of new telemetry + """ + + self.__save_previous_step() + for name, telem in telem_list.items(): + if name in self.raw_telem.keys(): + self.raw_telem[name].update(telem) + self.__update_derived_telem() + + def make_decision(self): + """Run the event checks against the current telemetry; results are + stored in `self.scores` and `self.decisions`. + """ + self.check_events() + + def __calc_derived_telem_process_order(self): + """Calculate the order to process derived telemetry from the expressions. + + Calculates the process order of derived telemetry using the keys found + in their expressions.
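+ + This amounts to a depth-first topological sort of the dependency graph, so every derived value is computed after the telemetry it depends on. +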
+ """ + + self.derived_process_order = [] + for name in self.derived_telem.keys(): + self.__process_dependencies(name, self.derived_process_order) + + def __process_dependencies(self, name: str, process_order: list): + """Process all identified dependencies of a DerivedTelemetry object + + Processes the derived telemetry dependency chain in order to calculate + the order to process the telemetry. Telemetry keys are appended to + `process_order`. + + Parameters + ---------- + name : str + Name of the derived telemetry to be processed + process_order : list + The derived telemetry process order list + """ + # Check if we've processed this telemetry + if name not in process_order: + # Check if we've processed all required telemetry for this one + if not self.derived_telem[name].nodes.issubset(set(process_order)): + # Check and process any child dependencies we're missing + for key in self.derived_telem[name].nodes - set(process_order): + if key not in self.raw_telem.keys(): + self.__process_dependencies(key, process_order) + + # Check if we've processed this telemetry + # (it could be processed from an above chain) + if name not in process_order: + # Process the telemetry + process_order.append(name) + + def __add_raw_telem(self, name: str, init_info: Dict): + """Create a RawTelemetry object and add to telemetry list + + Parameters + ---------- + name : str + Name of the telemetry. Used as the dictionary key. + init_info : Dict + Init info for the telemetry, defined by the YAML config. + """ + + raw_telem = RawTelemetry(name, init_info['initial_value']) + raw_telem.link_previous_telem(RawTelemetry( + name + '_previous', init_info['initial_value'])) + self.raw_telem.update( + {name: raw_telem}) + + def __add_derived_telem(self, name: str, init_info: Dict): + """Create a DerivedTelemetry object and add to telemetry list + + Parameters + ---------- + name : str + Name of the telemetry. Used as the dictionary key. + init_info : Dict + Init info for the telemetry, defined by the YAML config. + """ + + derived_telem = DerivedTelemetry( + name, init_info['initial_value'], init_info['expression']) + derived_telem.link_previous_telem(DerivedTelemetry( + name + '_previous', init_info['initial_value'], init_info['expression'])) + self.derived_telem.update({name: derived_telem}) + + def __save_previous_step(self): + """Save all current telemetry as the previous telemetry to prepare for + the next time step. + + """ + for _, telem in self.raw_telem.items(): + telem.save_previous() + for _, telem in self.derived_telem.items(): + telem.save_previous() + + def __update_derived_telem(self): + """ + Update all derived telemetry. 
+ """
+
+ for name in self.derived_process_order:
+ self.__process_derived_telem(name)
+
+ def __process_derived_telem(self, name: str):
+ """Collect all required telemetry and update a derived telemetry object
+
+ Parameters
+ ----------
+ name : str
+ Name of the DerivedTelemetry object to be processed
+ """
+
+ keys = self.derived_telem[name].dependency_keys.intersection(
+ set(self.telem.keys())).union({name})
+ input_dict = {key: self.telem[key] for key in keys}
+ self.derived_telem[name].update(input_dict)
+
+ def __repr__(self) -> str:
+ return f'Telemetry Manager:\n\tRaw Telemetry: {self.raw_telem}\n\tDerived Telemetry: {self.derived_telem}'
\ No newline at end of file
diff --git a/onair/config/namaste_test.ini b/onair/config/namaste_test.ini
new file mode 100644
index 00000000..eb10ef92
--- /dev/null
+++ b/onair/config/namaste_test.ini
@@ -0,0 +1,17 @@
+[DEFAULT]
+TelemetryDataFilePath = onair/data/raw_telemetry_data/
+TelemetryMetadataFilePath = onair/data/telemetry_configs/
+MetaFiles = ['namaste_TLM_CONFIG.json']
+TelemetryFiles = ['namaste_TLM.txt']
+ParserFileName = forty_two_parser
+ParserName = FortyTwo
+SimName = CSV
+PluginList = ['medos']
+
+[RUN_FLAGS]
+IO_Flag = true
+Dev_Flag = false
+SBN_Flag = false
+Viz_Flag = false
+Redis_Flag = true
+
diff --git a/onair/data/raw_telemetry_data/namaste_TLM.txt b/onair/data/raw_telemetry_data/namaste_TLM.txt
new file mode 100644
index 00000000..ee86dc9b
--- /dev/null
+++ b/onair/data/raw_telemetry_data/namaste_TLM.txt
@@ -0,0 +1,7 @@
+
+
+SC[0].sensor_value = 0.000000000000e+00
+SC[0].concentration = 0.000000000000e+00
+SC[0].pos = [0.000000000000e+00, 0.000000000000e+00, 0.000000000000e+00]
+[EOF]
+
diff --git a/onair/data/telemetry_configs/namaste_TLM_CONFIG.json b/onair/data/telemetry_configs/namaste_TLM_CONFIG.json
new file mode 100644
index 00000000..ba8e6ece
--- /dev/null
+++ b/onair/data/telemetry_configs/namaste_TLM_CONFIG.json
@@ -0,0 +1,33 @@
+{
+ "subsystems": {
+ "DRONE_0": {
+ "sensor_value": {
+ "conversion": "",
+ "tests": {
+ "NOOP": "[]"
+ },
+ "description": "No description"
+ },
+ "concentration": {
+ "conversion": "",
+ "tests": {
+ "NOOP": "[]"
+ },
+ "description": "No description"
+ },
+ "pos": {
+ "conversion": "",
+ "tests": {
+ "NOOP": "[]"
+ },
+ "description": "No description"
+ }
+ }
+ },
+ "order": [
+ "sensor_value",
+ "concentration",
+ "pos",
+ "timestamp"
+ ]
+}
\ No newline at end of file
diff --git a/onair/src/reasoning/agent.py b/onair/src/reasoning/agent.py
index c16da21b..539a1532 100644
--- a/onair/src/reasoning/agent.py
+++ b/onair/src/reasoning/agent.py
@@ -13,13 +13,15 @@
"""
from ..data_driven_components.data_driven_learning import DataDrivenLearning
from ..reasoning.diagnosis import Diagnosis
+from edp.edp_plugin import EDP
class Agent:
- def __init__(self, vehicle):
+ def __init__(self, plugin_list, vehicle):
self.vehicle_rep = vehicle
- self.learning_systems = DataDrivenLearning(self.vehicle_rep.get_headers())
+ self.learning_systems = DataDrivenLearning(self.vehicle_rep.get_headers(), plugin_list)
self.mission_status = self.vehicle_rep.get_status()
self.bayesian_status = self.vehicle_rep.get_bayesian_status()
+ self.planner = EDP()
# Markov Assumption holds
def reason(self, frame):
@@ -29,7 +31,9 @@ def reason(self, frame):
def diagnose(self, time_step):
""" Grab the mnemonics from the """
- learning_system_results = self.learning_systems.render_reasoning()
+ learning_system_results = self.learning_systems.render_diagnosis()
+ if
not self.planner.peak_found: + self.planner.perform_reasoning(learning_system_results) diagnosis = Diagnosis(time_step, learning_system_results, self.bayesian_status, diff --git a/onair/src/run_scripts/execution_engine.py b/onair/src/run_scripts/execution_engine.py index d7c19043..b46c0490 100644 --- a/onair/src/run_scripts/execution_engine.py +++ b/onair/src/run_scripts/execution_engine.py @@ -18,6 +18,7 @@ import shutil from distutils.dir_util import copy_tree from time import gmtime, strftime +import ast from ...data_handling.time_synchronizer import TimeSynchronizer from ..run_scripts.sim import Simulator @@ -33,6 +34,7 @@ def __init__(self, config_file='', run_name='', save_flag=False): self.Dev_Flag = False self.SBN_Flag = False self.Viz_Flag = False + self.Redis_Flag = False # Init Paths self.dataFilePath = '' @@ -79,12 +81,15 @@ def parse_configs(self, config_filepath): self.parser_file_name = config['DEFAULT']['ParserFileName'] self.parser_name = config['DEFAULT']['ParserName'] self.sim_name = config['DEFAULT']['SimName'] + self.plugin_list = ast.literal_eval(config['DEFAULT']['PluginList']) ## Sort Data: Flags self.IO_Flag = config['RUN_FLAGS'].getboolean('IO_Flag') self.Dev_Flag = config['RUN_FLAGS'].getboolean('Dev_Flag') self.SBN_Flag = config['RUN_FLAGS'].getboolean('SBN_Flag') self.Viz_Flag = config['RUN_FLAGS'].getboolean('Viz_Flag') + self.Redis_Flag = config['RUN_FLAGS'].getboolean('Redis_Flag') + def parse_data(self, parser_name, parser_file_name, dataFilePath, metadataFilePath, subsystems_breakdown=False): parser = importlib.import_module('onair.data_handling.parsers.' + parser_file_name) @@ -95,7 +100,7 @@ def parse_data(self, parser_name, parser_file_name, dataFilePath, metadataFilePa self.processedSimData = TimeSynchronizer(*parsed_data.get_sim_data()) def setup_sim(self): - self.sim = Simulator(self.sim_name, self.processedSimData, self.SBN_Flag) + self.sim = Simulator(self.sim_name, self.processedSimData, self.plugin_list, self.SBN_Flag, self.Redis_Flag) try: fls = ast.literal_eval(self.benchmarkFiles) fp = os.path.dirname(os.path.realpath(__file__)) + '/../..' + self.benchmarkFilePath diff --git a/onair/src/run_scripts/redis_adapter.py b/onair/src/run_scripts/redis_adapter.py new file mode 100644 index 00000000..6cd82686 --- /dev/null +++ b/onair/src/run_scripts/redis_adapter.py @@ -0,0 +1,90 @@ +# GSC-19165-1, "The On-Board Artificial Intelligence Research (OnAIR) Platform" +# +# Copyright © 2023 United States Government as represented by the Administrator of +# the National Aeronautics and Space Administration. No copyright is claimed in the +# United States under Title 17, U.S. Code. All Other Rights Reserved. 
+# +# Licensed under the NASA Open Source Agreement version 1.3 +# See "NOSA GSC-19165-1 OnAIR.pdf" + +""" +redis_adapter AdapterDataSource class + +Receives messages from REDIS server, serves as a data source for sim.py +""" + +import threading +import time +import redis +import json + +from ...data_handling.data_source import DataSource + +class AdapterDataSource(DataSource): + + def __init__(self, data=[]): + super().__init__(data) + self.address = 'localhost' + self.port = 6379 + self.db = 0 + self.server = None + self.new_data_lock = threading.Lock() + self.new_data = False + self.currentData = [] + self.currentData.append({'headers':None, 'data':None}) + self.currentData.append({'headers':None, 'data':None}) + self.double_buffer_read_index = 0 + + def connect(self): + """Establish connection to REDIS server.""" + self.server = redis.Redis(self.address, self.port, self.db) + + + def subscribe_message(self, channel): + """Subscribe to REDIS message channel and launch listener thread.""" + if self.server.ping(): + self.pubsub = self.server.pubsub() + self.pubsub.subscribe(channel) + + listen_thread = threading.Thread(target=self.message_listener) + listen_thread.start() + + + def get_next(self): + """Provides the latest data from REDIS channel""" + data_available = False + + while not data_available: + with self.new_data_lock: + data_available = self.new_data + + if not data_available: + time.sleep(0.01) + + read_index = 0 + with self.new_data_lock: + self.new_data = False + self.double_buffer_read_index = (self.double_buffer_read_index + 1) % 2 + read_index = self.double_buffer_read_index + + # print("Reading buffer: {}".format(read_index)) + return self.currentData[read_index]['data'] + + def has_more(self): + """Live connection should always return True""" + return True + + def message_listener(self): + """Loop for listening for messages on channel""" + for message in self.pubsub.listen(): + # print("Received from REDIS: ", message) + if message['type'] == 'message': + data = json.loads(message['data']) + + currentData = self.currentData[(self.double_buffer_read_index + 1) %2] + currentData['headers'] = list(data.keys()) + currentData['data'] = list(data.values()) + + with self.new_data_lock: + self.new_data = True + diff --git a/onair/src/run_scripts/sim.py b/onair/src/run_scripts/sim.py index 0aa15708..d8832c89 100644 --- a/onair/src/run_scripts/sim.py +++ b/onair/src/run_scripts/sim.py @@ -13,6 +13,7 @@ """ import importlib +import time from ..reasoning.agent import Agent from ..systems.vehicle_rep import VehicleRepresentation @@ -25,7 +26,7 @@ DIAGNOSIS_INTERVAL = 100 class Simulator: - def __init__(self, simType, parsedData, SBN_Flag): + def __init__(self, simType, parsedData, plugin_list, SBN_Flag, Redis_Flag=False): self.simulator = simType vehicle = VehicleRepresentation(*parsedData.get_vehicle_metadata()) @@ -36,10 +37,15 @@ def __init__(self, simType, parsedData, SBN_Flag): AdapterDataSource = getattr(sbn_adapter, 'AdapterDataSource') self.simData = AdapterDataSource(parsedData.get_sim_data()) self.simData.connect() # this also subscribes to the msgIDs - + elif Redis_Flag: + redis_adapter = importlib.import_module('onair.src.run_scripts.redis_adapter') + AdapterDataSource = getattr(redis_adapter, 'AdapterDataSource') + self.simData = AdapterDataSource(parsedData.get_sim_data()) + self.simData.connect() # this also subscribes to the msgIDs + self.simData.subscribe_message('drone_0_state') else: self.simData = DataSource(parsedData.get_sim_data()) - self.agent = 
Agent(vehicle) + self.agent = Agent(plugin_list,vehicle) def run_sim(self, IO_Flag=False, dev_flag=False, viz_flag = True): if IO_Flag == True: print_sim_header() @@ -49,22 +55,28 @@ def run_sim(self, IO_Flag=False, dev_flag=False, viz_flag = True): last_diagnosis = time_step last_fault = time_step - while self.simData.has_more() and time_step < MAX_STEPS: + # while self.simData.has_more() and time_step < MAX_STEPS: + while self.simData.has_more(): next = self.simData.get_next() self.agent.reason(next) self.IO_check(time_step, IO_Flag) ### Stop when a fault is reached - if self.agent.mission_status == 'RED': - if last_fault == time_step - 1: #if they are consecutive - if (time_step - last_diagnosis) % DIAGNOSIS_INTERVAL == 0: - diagnosis_list.append(self.agent.diagnose(time_step)) - last_diagnosis = time_step - else: - diagnosis_list.append(self.agent.diagnose(time_step)) - last_diagnosis = time_step - last_fault = time_step + # if self.agent.mission_status == 'RED': + # if last_fault == time_step - 1: #if they are consecutive + # if (time_step - last_diagnosis) % DIAGNOSIS_INTERVAL == 0: + # diagnosis_list.append(self.agent.diagnose(time_step)) + # last_diagnosis = time_step + # else: + # diagnosis_list.append(self.agent.diagnose(time_step)) + # last_diagnosis = time_step + # last_fault = time_step + # time_step += 1 + + diagnosis_list.append(self.agent.diagnose(time_step)) + last_diagnosis = time_step + last_fault = time_step time_step += 1 # Final diagnosis processing diff --git a/onair/src/util/print_io.py b/onair/src/util/print_io.py index 6abef33b..facc0d36 100644 --- a/onair/src/util/print_io.py +++ b/onair/src/util/print_io.py @@ -23,6 +23,8 @@ class bcolors: ENDC = '\033[0m' BOLD = '\033[1m' UNDERLINE = '\033[4m' + MEDOS = '\033[34m' + EDP = '\033[92m' # Global colors dictionary scolors = {'HEADER' : bcolors.HEADER, @@ -32,7 +34,9 @@ class bcolors: 'FAIL' : bcolors.FAIL, 'ENDC' : bcolors.ENDC, 'BOLD' : bcolors.BOLD, - 'UNDERLINE' : bcolors.UNDERLINE} + 'UNDERLINE' : bcolors.UNDERLINE, + 'MEDOS': bcolors.MEDOS, + 'EDP': bcolors.EDP} # Global dictionary for STATUS -> COLOR status_colors = {'GREEN' : bcolors.OKGREEN, @@ -49,7 +53,8 @@ def print_sim_header(): # Print when a new step is starting def print_sim_step(step_num): - print(bcolors.HEADER + bcolors.BOLD + "\n--------------------- STEP " + str(step_num) + " ---------------------\n" + bcolors.ENDC) + # print(bcolors.HEADER + bcolors.BOLD + "\n--------------------- STEP " + str(step_num) + " ---------------------\n" + bcolors.ENDC) + pass # Print a line to separate things def print_separator(color=bcolors.HEADER): @@ -68,10 +73,11 @@ def print_msg(msg, clrs=['HEADER']): # Print interpreted system status def print_system_status(agent, data = None): # print_separator(bcolors.OKBLUE) - if data != None: - print("CURRENT DATA: " + str(data)) - print("INTERPRETED SYSTEM STATUS: " + str(format_status(agent.mission_status))) + # if data != None: + # print("CURRENT DATA: " + str(data)) + # print("INTERPRETED SYSTEM STATUS: " + str(format_status(agent.mission_status))) # print_separator(bcolors.OKBLUE) + pass # Print diagnosis info def print_diagnosis(diagnosis): @@ -127,5 +133,11 @@ def format_status(stat): for status in stat: s = s + format_status(status) + ', ' s = s[:-2] + ')' - return s + return s + +# Print generic, colored message +def print_generic(msg, clrs=['HEADER']): + for clr in clrs: + print(scolors[clr]) + print(msg + bcolors.ENDC) diff --git a/test/src/run_scripts/test_redis_adapter.py 
b/test/src/run_scripts/test_redis_adapter.py
new file mode 100644
index 00000000..f16cc28a
--- /dev/null
+++ b/test/src/run_scripts/test_redis_adapter.py
@@ -0,0 +1,214 @@
+# GSC-19165-1, "The On-Board Artificial Intelligence Research (OnAIR) Platform"
+#
+# Copyright © 2023 United States Government as represented by the Administrator of
+# the National Aeronautics and Space Administration. No copyright is claimed in the
+# United States under Title 17, U.S. Code. All Other Rights Reserved.
+#
+# Licensed under the NASA Open Source Agreement version 1.3
+# See "NOSA GSC-19165-1 OnAIR.pdf"
+
+import pytest
+from mock import MagicMock
+import onair.src.run_scripts.redis_adapter as redis_adapter
+from onair.src.run_scripts.redis_adapter import AdapterDataSource
+import redis
+import threading
+
+# __init__ tests
+def test_redis_adapter_AdapterDataSource__init__sets_all_3_redis_arguments_for_later_use():
+ # Arrange
+ expected_address = 'localhost'
+ expected_port = 6379
+ expected_db = 0
+ expected_server = None
+
+ cut = AdapterDataSource.__new__(AdapterDataSource)
+
+ # Act
+ cut.__init__()
+
+ # Assert
+ assert cut.address == expected_address
+ assert cut.port == expected_port
+ assert cut.db == expected_db
+ assert cut.server == expected_server
+
+# connect tests
+def test_redis_adapter_AdapterDataSource_connect_establishes_server_with_initialized_attributes(mocker):
+ # Arrange
+ expected_address = MagicMock()
+ expected_port = MagicMock()
+ expected_db = MagicMock()
+ fake_server = MagicMock()
+
+ cut = AdapterDataSource.__new__(AdapterDataSource)
+ cut.address = expected_address
+ cut.port = expected_port
+ cut.db = expected_db
+
+ mocker.patch('redis.Redis', return_value=fake_server)
+
+ # Act
+ cut.connect()
+
+ # Assert
+ assert redis.Redis.call_count == 1
+ assert redis.Redis.call_args_list[0].args == (expected_address, expected_port, expected_db)
+ assert cut.server == fake_server
+
+# subscribe_message tests
+def test_redis_adapter_AdapterDataSource_subscribe_message_and_thread_start_success_when_server_available(mocker):
+ # Arrange
+ arg_channel = str(MagicMock())
+ fake_server = MagicMock()
+ fake_pubsub = MagicMock()
+ fake_subscription = MagicMock()
+ fake_thread = MagicMock()
+ cut = AdapterDataSource.__new__(AdapterDataSource)
+ cut.server = fake_server
+
+ mocker.patch.object(fake_server, 'ping', return_value=True)
+ mocker.patch.object(fake_server, 'pubsub', return_value=fake_pubsub)
+ mocker.patch.object(fake_pubsub, 'subscribe', return_value=fake_subscription)
+ mocker.patch('threading.Thread', return_value=fake_thread)
+ mocker.patch.object(fake_thread, 'start')
+
+ # Act
+ cut.subscribe_message(arg_channel)
+
+ # Assert
+ assert fake_server.ping.call_count == 1
+ assert fake_server.pubsub.call_count == 1
+ assert fake_pubsub.subscribe.call_count == 1
+ assert fake_pubsub.subscribe.call_args_list[0].args == (arg_channel,)
+ assert threading.Thread.call_count == 1
+ assert threading.Thread.call_args_list[0].kwargs == ({'target': cut.message_listener})
+ assert fake_thread.start.call_count == 1
+ assert cut.pubsub == fake_pubsub
+
+def test_redis_adapter_AdapterDataSource_subscribe_message_does_nothing_on_False(mocker):
+ # Arrange
+ arg_channel = str(MagicMock())
+ fake_server = MagicMock()
+ initial_pubsub = MagicMock()
+ fake_subscription = MagicMock()
+ fake_thread = MagicMock()
+ cut = AdapterDataSource.__new__(AdapterDataSource)
+ cut.server = fake_server
+ cut.pubsub = initial_pubsub
+
+ mocker.patch.object(fake_server, 'ping', return_value=False)
+ mocker.patch.object(fake_server, 'pubsub')
+ mocker.patch('threading.Thread')
+ mocker.patch.object(fake_thread, 'start')
+
+ # Act
+ cut.subscribe_message(arg_channel)
+
+ # Assert
+ assert fake_server.ping.call_count == 1
+ assert fake_server.pubsub.call_count == 0
+ assert threading.Thread.call_count == 0
+ assert fake_thread.start.call_count == 0
+ assert cut.pubsub == initial_pubsub
+
+# get_next tests
+
+def test_redis_adapter_AdapterDataSource_get_next_when_new_data_is_true():
+ # Arrange
+ # Renew AdapterDataSource to ensure test independence
+ cut = AdapterDataSource.__new__(AdapterDataSource)
+ cut.new_data = True
+ cut.new_data_lock = MagicMock()
+ cut.double_buffer_read_index = pytest.gen.randint(0,1)
+ pre_call_index = cut.double_buffer_read_index
+ expected_result = MagicMock()
+ cut.currentData = []
+ if pre_call_index == 0:
+ # get_next will advance the read index to 1, so expected data goes there
+ cut.currentData.append({'data': MagicMock()})
+ cut.currentData.append({'data': expected_result})
+ else:
+ # get_next will wrap the read index back to 0, so expected data goes there
+ cut.currentData.append({'data': expected_result})
+ cut.currentData.append({'data': MagicMock()})
+
+ # Act
+ result = cut.get_next()
+
+ # Assert
+ assert cut.new_data == False
+ if pre_call_index == 0:
+ assert cut.double_buffer_read_index == 1
+ elif pre_call_index == 1:
+ assert cut.double_buffer_read_index == 0
+ else:
+ assert False
+
+ assert result == expected_result
+
+def test_redis_adapter_AdapterDataSource_get_next_when_called_multiple_times_when_new_data_is_true():
+ # Arrange
+ # Renew AdapterDataSource to ensure test independence
+ cut = AdapterDataSource.__new__(AdapterDataSource)
+ cut.new_data_lock = MagicMock()
+ cut.currentData = [{'data': MagicMock()}, {'data': MagicMock()}]
+ cut.double_buffer_read_index = pytest.gen.randint(0,1)
+ pre_call_index = cut.double_buffer_read_index
+
+ # Act
+ results = []
+ num_calls = pytest.gen.randint(2,10) # arbitrary, 2 to 10
+ for i in range(num_calls):
+ cut.new_data = True
+ results.append(cut.get_next())
+
+ # Assert
+ assert cut.new_data == False
+ for i in range(num_calls):
+ pre_call_index = (pre_call_index + 1) % 2
+ assert results[i] == cut.currentData[pre_call_index]['data']
+ assert cut.double_buffer_read_index == pre_call_index
+
+def test_redis_adapter_AdapterDataSource_get_next_behavior_when_new_data_is_false_then_true(mocker):
+ # Arrange
+ # Renew AdapterDataSource to ensure test independence
+ cut = AdapterDataSource.__new__(AdapterDataSource)
+ cut.new_data_lock = MagicMock()
+ cut.double_buffer_read_index = pytest.gen.randint(0,1)
+ pre_call_index = cut.double_buffer_read_index
+ expected_result = MagicMock()
+ cut.currentData = []
+ if pre_call_index == 0:
+ cut.currentData.append({'data': MagicMock()})
+ cut.currentData.append({'data': expected_result})
+ else:
+ cut.currentData.append({'data': expected_result})
+ cut.currentData.append({'data': MagicMock()})
+
+ num_falses = pytest.gen.randint(1, 10) # arbitrary, 1 to 10 polls before data arrives
+ cut.new_data = False
+ # Mocked sleep flips the flag once it has been called num_falses times,
+ # releasing get_next's wait loop
+ def fake_sleep(_seconds):
+ if redis_adapter.time.sleep.call_count >= num_falses:
+ cut.new_data = True
+ mocker.patch('onair.src.run_scripts.redis_adapter.time.sleep', side_effect=fake_sleep)
+
+ # Act
+ result = cut.get_next()
+
+ # Assert
+ assert redis_adapter.time.sleep.call_count == num_falses
+ assert cut.new_data == False
+ if pre_call_index == 0:
+ assert cut.double_buffer_read_index == 1
+ elif pre_call_index == 1:
+ assert cut.double_buffer_read_index == 0
+ else:
+ assert False
+
+ assert result == expected_result
+
+
+# has_more tests
+def test_redis_adapter_AdapterDataSource_has_more_returns_True():
+ cut = AdapterDataSource.__new__(AdapterDataSource)
+ assert cut.has_more() == True
diff --git
a/test/test_driver.py b/test/test_driver.py index adc9a653..08bb5a42 100644 --- a/test/test_driver.py +++ b/test/test_driver.py @@ -20,5 +20,6 @@ def test_driver(self): # os.system('python3 ' + self.test_path + 'driver.py -t' ) return + if __name__ == '__main__': unittest.main()
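
Reviewer note: a minimal publisher sketch for exercising the new Redis path end to end, under these assumptions: a Redis server is running at the adapter defaults (localhost:6379, db 0), and messages go out on the 'drone_0_state' channel that sim.py subscribes to. The field names mirror namaste_TLM.txt, but any flat JSON object works, since message_listener takes the message's keys as headers and its values as data. This is not part of the patch itself.

import json
import time

import redis

r = redis.Redis(host='localhost', port=6379, db=0)
for step in range(10):
    # One flat JSON frame per step; keys become headers, values become data
    frame = {
        'sensor_value': 0.0,
        'concentration': 0.0,
        'pos': [0.0, 0.0, 1.0],
    }
    r.publish('drone_0_state', json.dumps(frame))
    time.sleep(1.0)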