Skip to content

Commit

Permalink
Initial test successful w/ single plugin
Browse files Browse the repository at this point in the history
OnAIR successfully initializes a plugin from an external folder when it is specified in the .ini config file
  • Loading branch information
cfirth-nasa committed Aug 31, 2023
1 parent 7fceac2 commit 73da159
Show file tree
Hide file tree
Showing 30 changed files with 2,210 additions and 26 deletions.
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
![Build](https://github.com/nasa/OnAIR/actions/workflows/unit-test.yml/badge.svg)
[![CodeCov](https://codecov.io/gh/nasa/OnAIR/branch/main/graph/badge.svg?token=L0WVOTD5X9)](https://codecov.io/gh/nasa/OnAIR)

# The On-board Artificial Intelligence Research (OnAIR) Platform

The On-board Artificial Intelligence Research (OnAIR) Platform is a framework that enables AI algorithms written in Python to interact with NASA's [cFS](https://github.com/nasa/cFS).
Expand Down
Empty file added edp/__init__.py
Empty file.
181 changes: 181 additions & 0 deletions edp/edp_plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
# GSC-19165-1, "The On-Board Artificial Intelligence Research (OnAIR) Platform"
#
# Copyright © 2023 United States Government as represented by the Administrator of
# the National Aeronautics and Space Administration. No copyright is claimed in the
# United States under Title 17, U.S. Code. All Other Rights Reserved.
#
# Licensed under the NASA Open Source Agreement version 1.3
# See "NOSA GSC-19165-1 OnAIR.pdf"

import numpy as np
import redis
import json
import sys
from pathlib import Path
sys.path.append(Path('../'))
from onair.src.util.print_io import *

class EDP:
    """ Placeholder Element class.

    Event-Driven Planning (EDP): maintains a grid map ("knowledge") of
    methane concentration readings and greedily steers a drone toward the
    concentration peak, publishing [x, y, z] commands as JSON on the Redis
    'drone_cmds' channel.
    """
    def __init__(self) -> None:
        # Variables for pathing
        self.size = 100 # grid size, grab this from some configuration somewhere
        self.knowledge = np.full((self.size, self.size), np.nan)
        self.current_pos = (0, 0, 1) # arbitrary Z, where can we pull position from in OnAIR?
        self.flying_direction = np.array([0, 0]) # numpy array; (0,0) means "no committed direction"
        self.visit_queue = [] # FIFO of grid cells still to be visited
        self.last = (np.inf, None) # (concentration, position) from the previous step
        self.peak_found = False
        self.just_traversed_neighbors = False

        # True once the drone has reached the most recently commanded position
        self.arrived_at_last_cmd = False

        self.last_cmd_pos = (0, 0, 1)

        # assuming (0,0) is top left, neighbor visitation order:
        # right, bottom right, down, bottom left, left, upper left, up, upper right
        self.neighbor_directions = np.array([(0, 1), (1, 1), (1, 0), (1, -1),
                                             (0, -1), (-1, -1), (-1, 0), (-1, 1)])

        self.redis_pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
        self.redis = redis.Redis(connection_pool=self.redis_pool)

    def perform_reasoning(self, info: dict) -> list:
        """Plan the drone's next move from the latest sensor frame.

        info format: {"medos": {"sensor_value": float, "timestamp": timestamp,
                                "concentration": float, "pos": [float, float, float],
                                "event_in_progress": True}}
        'info' can contain more information relevant to planning.

        Returns a list holding the commanded [x, y, z] floats, or an empty
        list when planning is skipped (drone still in transit) or terminated
        (missing/invalid info, or the peak was found and a landing command
        was issued).
        """
        # TODO: a param 'task', which specifies what type of planning for EDP to do...
        # TODO: compare with greedier algo where we move in direction of first higher neighbor

        # Check if sufficient info for current planning task
        try:
            # verify keys in dict
            info['medos']['concentration']
            info['medos']['pos']
        except KeyError:
            print('TERMINATED PLANNING: "concentration" and "pos" are required information')
            return []

        try:
            # verify X/Y position values can be integers; Z is carried through unchanged
            self.current_pos = tuple(int(val) for val in info['medos']['pos'][:2]) + (info['medos']['pos'][2],)
        except ValueError:
            # BUGFIX: the original f-string referenced an undefined name `pos`,
            # which raised a NameError here instead of printing the message
            print(f'TERMINATED PLANNING: "pos" values ({info["medos"]["pos"]}) cannot be converted to integers')
            return []

        # dont plan next move until arrived at last location
        # careful about comparisons here with the floats
        if self.current_pos[:2] == self.last_cmd_pos[:2]:
            self.arrived_at_last_cmd = True
        if not self.arrived_at_last_cmd:
            return []

        current_concentration = info['medos']['concentration']
        xy_pos = self.current_pos[:2]
        self.knowledge[xy_pos] = current_concentration

        neighbors = self._get_neighbor_data(xy_pos, self.knowledge)
        # Land when every neighbor is known and none beats the current reading
        if all(not np.isnan(n[2]) for n in neighbors) and all(current_concentration >= n[2] for n in neighbors):
            print('== LANDING')
            # Land
            self.peak_found = True
            next_pos = self.current_pos[:2] + (0,)
            print('Final position:', next_pos)
            result = [list(map(float, next_pos))]
            self.redis.publish('drone_cmds', json.dumps(result))
            return []
        else:
            next_pos = self._get_next_best_pos(current_concentration, xy_pos) # only X, Y needed

            next_pos += (1,) # Add static Z coord

            print_generic(f'[EDP] Commanding drone to: {next_pos}', clrs=['EDP'])

            # next_pos is an immutable tuple, so plain assignment replaces the
            # original function-local `from copy import deepcopy` round-trip
            self.last_cmd_pos = next_pos
            self.arrived_at_last_cmd = False

            # Send to command channel
            result = [list(map(float, next_pos))]
            self.redis.publish('drone_cmds', json.dumps(result))

            return result

    # Params could change depending on what will stay as attributes of EDP class
    # pos can be accessed via self.current_pos, knowledge of grid via self.knowledge...
    def _get_neighbor_data(self, pos: tuple, knowledge) -> list:
        """Return the in-bounds neighbors of `pos` as (direction, position, concentration) tuples."""
        size = knowledge.shape[0]
        neighbors = []
        for direction in self.neighbor_directions:
            position = tuple(pos + direction)
            if 0 <= position[0] < size and 0 <= position[1] < size:
                concentration = knowledge[position]
                neighbors.append((direction, position, concentration))

        return neighbors

    def _traverse_neighbors(self, pos: tuple, neighbors: list) -> None:
        """Queue every unknown neighboring cell for a visit, then queue a return to `pos`.

        Each neighbor is a tuple (direction, position, concentration).
        """
        for n in neighbors:
            direction, position, concentration = n

            # Only add to the queue if it's an unknown grid cell
            if np.isnan(self.knowledge[position]):
                self.visit_queue.append(position)

        # Return to current position after visiting
        self.visit_queue.append(pos)
        self.just_traversed_neighbors = True

    def _get_next_best_pos(self, current_concentration: float, pos: tuple) -> tuple:
        """Pick the next (x, y) grid cell to fly to.

        Priority: (1) continue an in-progress neighbor traversal,
        (2) backtrack/scan neighbors when the concentration decreased,
        (3) keep flying in the committed direction while in bounds,
        (4) otherwise head for the highest-valued known neighbor.
        """
        # get neighbors to check for peak
        neighbors = self._get_neighbor_data(pos, self.knowledge)

        # if exploring neighbors, continue
        if self.visit_queue:
            return self.visit_queue.pop(0)

        # traverse neighbors if last concentration was higher (meaning we decreased)
        if not self.just_traversed_neighbors and current_concentration <= self.last[0]:
            if self.flying_direction.any(): # go back
                self.visit_queue.append(self.last[1])
                pos = self.last[1]
            else:
                self.last = (current_concentration, pos) # log current
                neighbors = self._get_neighbor_data(pos, self.knowledge)
                self._traverse_neighbors(pos, neighbors)
            self.flying_direction = np.array([0, 0])
            return self.visit_queue.pop(0)

        self.just_traversed_neighbors = False
        self.last = (current_concentration, pos) # log current

        if self.flying_direction.any():
            next_pos = tuple(pos + self.flying_direction)
            if 0 <= next_pos[0] < self.size and 0 <= next_pos[1] < self.size:
                return next_pos
            else: # invalid fly location
                self._traverse_neighbors(pos, neighbors)
                self.flying_direction = np.array([0, 0])
                return self.visit_queue.pop(0)

        # If none of the above cases, just grab highest valued neighbor
        best_neighbor = max(neighbors, key=lambda x: x[2])
        self.flying_direction = best_neighbor[0]
        next_pos = tuple(pos + self.flying_direction)
        return next_pos

28 changes: 28 additions & 0 deletions external_plugins_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import sys

sys.path.append('OnAIR')

import pytest
from onair.src.run_scripts.execution_engine import ExecutionEngine
from medos_plugin.medos_plugin import Plugin
from driver import init_global_paths


import numpy as np

def test_plugin_defined_in_config_is_imported_by_ddl_class(mocker):
    # Arrange
    # Need to add something to pass a dummy frame of data - currently hardcoded in medos_plugin.py
    config_file = 'onair/config/namaste_test.ini'
    fake_save_path = ''
    init_global_paths() # Fix error on execution engine init where internal required paths unavailable

    # Action
    test_engine = ExecutionEngine(config_file, fake_save_path, False)

    # Assert
    # every loaded AI construct must be an instance of the external Plugin class
    constructs = test_engine.sim.agent.learning_systems.ai_constructs
    assert all(type(construct) == Plugin for construct in constructs)
51 changes: 51 additions & 0 deletions generic_plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# GSC-19165-1, "The On-Board Artificial Intelligence Research (OnAIR) Platform"
#
# Copyright © 2023 United States Government as represented by the Administrator of
# the National Aeronautics and Space Administration. No copyright is claimed in the
# United States under Title 17, U.S. Code. All Other Rights Reserved.
#
# Licensed under the NASA Open Source Agreement version 1.3
# See "NOSA GSC-19165-1 OnAIR.pdf"

from abc import ABC, abstractmethod
"""This object serves as a proxy for all plug-ins.
Therefore, the AIPlugIn object is meant to induce
standards and structures of compliance for user-created
and/or imported plug-ins/libraries
"""
class AIPlugIn(ABC):
    """Compliance interface that every plug-in must implement.

    This object serves as a proxy for all plug-ins, inducing standards and
    structures of compliance for user-created and/or imported
    plug-ins/libraries.
    """

    def __init__(self, _name, _headers):
        """Superclass for data driven components: VAE, PPO, etc. Allows for easier modularity."""
        assert len(_headers) > 0
        self.component_name = _name
        self.headers = _headers

    @abstractmethod
    def apriori_training(self, batch_data=[]):
        """Given data, system should learn any priors necessary for realtime diagnosis.

        Notes:
            - Whether empty batch data should be allowed is still undecided.
            - The batch data format could change depending on how the tutorial
              fleshes out (too early to tell).
            - Nothing is returned; users can pull training data from the
              construct itself.
        """
        raise NotImplementedError

    @abstractmethod
    def update(self, frame=[]):
        """Given streamed data point, system should update internally."""
        raise NotImplementedError

    @abstractmethod
    def render_diagnosis(self):
        """System should return its diagnosis."""
        raise NotImplementedError

99 changes: 99 additions & 0 deletions integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import sys

sys.path.append('OnAIR')

import pytest
from mock import MagicMock
import onair.src.reasoning.agent as agent
from onair.src.reasoning.diagnosis import Diagnosis
from onair.src.systems.vehicle_rep import VehicleRepresentation
from onair.src.data_driven_components.data_driven_learning import DataDrivenLearning

import numpy as np

def test_medos_output_matches_expected_sets_input(mocker):
    # Arrange
    # Need to add something to pass a dummy frame of data - currently hardcoded in medos_plugin.py
    header, test = ['no_op'], [True]
    vehicle = VehicleRepresentation(header, test)
    test_agent = agent.Agent(vehicle)
    test_agent.learning_systems = DataDrivenLearning(header, ['medos'])

    mock_diagnosis = MagicMock()
    arg_time_step = MagicMock()
    mocker.patch('onair.src.reasoning.agent.Diagnosis', return_value=mock_diagnosis)
    mocker.patch.object(vehicle, 'get_current_faulting_mnemonics')
    fake_frame = {"sensor_value": 42, "concentration": 4242, "pos": [1.0, 1.0, 10.0], "timestamp": 1}
    test_agent.learning_systems.update(fake_frame, status=np.array([1]))

    expected = {'medos': {'sensor_value': 42,
                          'timestamp': 1,
                          'concentration': 4242,
                          'pos': [1.0, 1.0, 10.0],
                          'event_in_progress': False}}

    # Action
    test_agent.diagnose(arg_time_step)

    # Assert
    assert agent.Diagnosis.call_count == 1
    assert agent.Diagnosis.call_args_list[0].args[1] == expected

def test_if_edp_return_result_has_correct_dict_key(mocker):
    # Arrange
    header, test = ['no_op'], [True]
    vehicle = VehicleRepresentation(header, test)

    fake_agent = agent.Agent(vehicle)
    fake_time_step = MagicMock()
    fake_learning_system = MagicMock() # may need to make DataDrivenLearning class
    fake_agent.learning_systems = fake_learning_system

    info_dict = {'medos': {'sensor_value': 42,
                           'timestamp': 1,
                           'concentration': 8.743,
                           'pos': [0.0, 0.0, 10.0],
                           'event_in_progress': True}}
    mocker.patch.object(fake_learning_system, 'render_diagnosis', return_value=info_dict)
    mocker.patch.object(vehicle, 'get_current_faulting_mnemonics')

    # Action
    result = fake_agent.diagnose(fake_time_step)

    # Assert
    assert "next_pos" in result

def test_medos_handoff_to_edp(mocker):
    # Arrange
    header, test = ['no_op'], [True]
    vehicle = VehicleRepresentation(header, test)
    test_agent = agent.Agent(vehicle)
    test_agent.learning_systems = DataDrivenLearning(header, ['medos'])

    mock_diagnosis = MagicMock()
    arg_time_step = MagicMock()
    mocker.patch.object(vehicle, 'get_current_faulting_mnemonics')
    fake_frame = {"sensor_value": 42, "concentration": 4242, "pos": [1.0, 1.0, 10.0], "timestamp": 1}

    # this update method adds a medos result to fake_agent's learning_system_results
    test_agent.learning_systems.update(fake_frame, status=np.array([1]))

    # Action
    test_agent.diagnose(arg_time_step)
    result = test_agent.diagnose(arg_time_step)

    # Assert
    # next_pos should not be equal to the current position from the frame
    assert result["next_pos"] != fake_frame["pos"]
Loading

0 comments on commit 73da159

Please sign in to comment.