Removed a lingering merge req comment?
Evana Gizzi committed Oct 9, 2023
2 parents 7fc6c23 + a708744 commit 0208034
Showing 10 changed files with 740 additions and 107 deletions.
12 changes: 12 additions & 0 deletions onair/config/redis_example.ini
@@ -0,0 +1,12 @@
[DEFAULT]
TelemetryDataFilePath = onair/data/raw_telemetry_data/data_physics_generation/Errors
TelemetryFile = 700_crash_to_earth_1.csv
TelemetryMetadataFilePath = onair/data/telemetry_configs/
MetaFile = data_physics_generation_CONFIG.json
ParserFileName = onair/data_handling/redis_adapter.py
PluginList = {'generic_plugin':'plugins/generic/generic_plugin.py'}

[RUN_FLAGS]
IO_Flag = true
Dev_Flag = false
Viz_Flag = false
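
For reference, a minimal sketch of reading this example config with Python's standard configparser; OnAIR's own loader may handle these options differently, and the printed values are simply the two shown above.

import configparser

# Sketch only: load the example config and read back a couple of values.
config = configparser.ConfigParser()
config.read('onair/config/redis_example.ini')

print(config['DEFAULT']['ParserFileName'])        # onair/data_handling/redis_adapter.py
print(config['RUN_FLAGS'].getboolean('IO_Flag'))  # True
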
11 changes: 1 addition & 10 deletions onair/data_handling/csv_parser.py
@@ -39,18 +39,9 @@ def parse_csv_data(self, data_file):
return all_data

def parse_meta_data_file(self, meta_data_file, ss_breakdown):
parsed_meta_data = extract_meta_data(meta_data_file)
if ss_breakdown == False:
num_elements = len(parsed_meta_data['subsystem_assignments'])
parsed_meta_data['subsystem_assignments'] = [['MISSION'] for elem in range(num_elements)]
return parsed_meta_data
return extract_meta_data_handle_ss_breakdown(meta_data_file, ss_breakdown)

##### GETTERS ##################################
def get_sim_data(self):
return self.all_headers, self.sim_data, self.binning_configs

def get_just_data(self):
return self.sim_data

def get_vehicle_metadata(self):
return self.all_headers, self.binning_configs['test_assignments']
7 changes: 7 additions & 0 deletions onair/data_handling/parser_util.py
@@ -12,6 +12,13 @@
from pandas import to_datetime
import datetime

def extract_meta_data_handle_ss_breakdown(meta_data_file, ss_breakdown):
parsed_meta_data = extract_meta_data(meta_data_file)
if ss_breakdown == False:
num_elements = len(parsed_meta_data['subsystem_assignments'])
parsed_meta_data['subsystem_assignments'] = [['MISSION'] for elem in range(num_elements)]
return parsed_meta_data

## Method to extract configuration data and return 3 dictionaries
def extract_meta_data(meta_data_file):
assert meta_data_file != ''
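
To illustrate the new helper, a small self-contained sketch that mirrors the ss_breakdown == False branch above; the parsed_meta_data dict here is hypothetical, standing in for what extract_meta_data() would return.

# Hypothetical parsed metadata; in OnAIR this comes from extract_meta_data().
parsed_meta_data = {'subsystem_assignments': [['POWER'], ['THERMAL'], ['GNC']]}

ss_breakdown = False
if ss_breakdown == False:
    num_elements = len(parsed_meta_data['subsystem_assignments'])
    parsed_meta_data['subsystem_assignments'] = [['MISSION'] for elem in range(num_elements)]

print(parsed_meta_data['subsystem_assignments'])  # [['MISSION'], ['MISSION'], ['MISSION']]
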
119 changes: 119 additions & 0 deletions onair/data_handling/redis_adapter.py
@@ -0,0 +1,119 @@
# GSC-19165-1, "The On-Board Artificial Intelligence Research (OnAIR) Platform"
#
# Copyright © 2023 United States Government as represented by the Administrator of
# the National Aeronautics and Space Administration. No copyright is claimed in the
# United States under Title 17, U.S. Code. All Other Rights Reserved.
#
# Licensed under the NASA Open Source Agreement version 1.3
# See "NOSA GSC-19165-1 OnAIR.pdf"

"""
redis_adapter AdapterDataSource class
Receives messages from REDIS server, serves as a data source for sim.py
"""

import threading
import time
import redis
import json

from onair.data_handling.on_air_data_source import OnAirDataSource
from onair.data_handling.tlm_json_parser import parseJson
from onair.src.util.print_io import *
from onair.data_handling.parser_util import *

class DataSource(OnAirDataSource):

def __init__(self, data_file, meta_file, ss_breakdown = False):
super().__init__(data_file, meta_file, ss_breakdown)
self.address = 'localhost'
self.port = 6379
self.db = 0
self.server = None
self.new_data_lock = threading.Lock()
self.new_data = False
self.currentData = []
self.currentData.append({'headers':None, 'data':None})
self.currentData.append({'headers':None, 'data':None})
self.double_buffer_read_index = 0
self.connect()
self.subscribe(self.subscriptions)

def connect(self):
"""Establish connection to REDIS server."""
print_msg('Redis adapter connecting to server...')
self.server = redis.Redis(self.address, self.port, self.db)

if self.server.ping():
print_msg('... connected!')

def subscribe(self, subscriptions):
"""Subscribe to REDIS message channel(s) and launch listener thread."""
if len(subscriptions) != 0 and self.server.ping():
self.pubsub = self.server.pubsub()

for s in subscriptions:
self.pubsub.subscribe(s)
print_msg(f"Subscribing to channel: {s}")

listen_thread = threading.Thread(target=self.message_listener)
listen_thread.start()
else:
print_msg(f"No subscriptions given!")

def parse_meta_data_file(self, meta_data_file, ss_breakdown):
configs = extract_meta_data_handle_ss_breakdown(meta_data_file, ss_breakdown)
meta = parseJson(meta_data_file)
if 'redis_subscriptions' in meta.keys():
self.subscriptions = meta['redis_subscriptions']
else:
self.subscriptions = []

return configs

def process_data_file(self, data_file):
print("Redis Adapter ignoring file")

def get_vehicle_metadata(self):
return self.all_headers, self.binning_configs['test_assignments']

def get_next(self):
"""Provides the latest data from REDIS channel"""
data_available = False

while not data_available:
with self.new_data_lock:
data_available = self.has_data()

if not data_available:
time.sleep(0.01)

read_index = 0
with self.new_data_lock:
self.new_data = False
self.double_buffer_read_index = (self.double_buffer_read_index + 1) % 2
read_index = self.double_buffer_read_index

return self.currentData[read_index]['data']

def has_more(self):
"""Live connection should always return True"""
return True

def message_listener(self):
"""Loop for listening for messages on channel"""
for message in self.pubsub.listen():
if message['type'] == 'message':
data = json.loads(message['data'])

currentData = self.currentData[(self.double_buffer_read_index + 1) %2]
currentData['headers'] = list(data.keys())
currentData['data'] = list(data.values())

with self.new_data_lock:
self.new_data = True

def has_data(self):
return self.new_data
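
As a usage sketch: message_listener() above json-loads each message and splits it into headers (the dict keys) and data (the dict values), so a publisher only needs to send a flat JSON object on a subscribed channel. The channel name 'state_info' and the field names below are hypothetical; the real channel list comes from the 'redis_subscriptions' entry in the telemetry metadata JSON.

import json
import redis

# Hypothetical channel; the adapter subscribes to whatever 'redis_subscriptions' lists.
channel = 'state_info'

r = redis.Redis(host='localhost', port=6379, db=0)

# The adapter takes dict keys as headers and dict values as the data frame.
frame = {'time': 1.0, 'voltage': 28.1, 'temperature': 45.2}
r.publish(channel, json.dumps(frame))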

3 changes: 2 additions & 1 deletion requirements_pip.txt
@@ -5,4 +5,5 @@ orjson==3.8.8
pandas==1.5.1
pytest==7.2.0
pytest-mock==3.10.0
pytest-randomly==3.12.0
pytest-randomly==3.12.0
redis==4.6.0
102 changes: 6 additions & 96 deletions test/onair/data_handling/test_csv_parser.py
@@ -161,116 +161,26 @@ def test_CSV_parse_csv_data_returns_list_of_data_frames_call_to_iterrows_returns
assert fake_columns_str.contains.call_args_list[0].args == ('^Unnamed', )
assert result == expected_result

# CSV parse_config_data tests # TODO: delete me?
def test_CSV_parse_meta_data_file_returns_call_to_extract_meta_data_file_given_metadata_file_and_csv_set_to_True_when_given_ss_breakdown_does_not_resolve_to_False(mocker, setup_teardown):
# CSV parse_meta_data tests
def test_CSV_parse_meta_data_file_returns_call_to_extract_meta_data_handle_ss_breakdown(mocker, setup_teardown):
# Arrange
arg_configFile = MagicMock()
arg_ss_breakdown = True if pytest.gen.randint(0, 1) else MagicMock()
arg_ss_breakdown = MagicMock()

expected_result = MagicMock()

mocker.patch(csv_parser.__name__ + '.extract_meta_data', return_value=expected_result)
mocker.patch(csv_parser.__name__ + '.extract_meta_data_handle_ss_breakdown', return_value=expected_result)
mocker.patch(csv_parser.__name__ + '.len')

# Act
result = pytest.cut.parse_meta_data_file(arg_configFile, arg_ss_breakdown)

# Assert
assert csv_parser.extract_meta_data.call_count == 1
assert csv_parser.extract_meta_data.call_args_list[0].args == (arg_configFile, )
assert csv_parser.extract_meta_data_handle_ss_breakdown.call_count == 1
assert csv_parser.extract_meta_data_handle_ss_breakdown.call_args_list[0].args == (arg_configFile, arg_ss_breakdown, )
assert csv_parser.len.call_count == 0
assert result == expected_result

def test_CSV_parse_meta_data_file_returns_call_to_extract_meta_data_file_given_metadata_file_and_csv_set_to_True_with_dict_def_of_subsystem_assigments_def_of_call_to_process_filepath_given_configFile_and_kwarg_csv_set_to_True_set_to_empty_list_when_len_of_call_value_dict_def_of_subsystem_assigments_def_of_call_to_process_filepath_given_configFile_and_kwarg_csv_set_to_True_is_0_when_given_ss_breakdown_evaluates_to_False(mocker, setup_teardown):
# Arrange
arg_configFile = MagicMock()
arg_ss_breakdown = False if pytest.gen.randint(0, 1) else 0

forced_return_extract_meta_data = {}
forced_return_len = 0
fake_empty_processed_filepath = MagicMock()
forced_return_extract_meta_data['subsystem_assignments'] = fake_empty_processed_filepath

expected_result = []

mocker.patch(csv_parser.__name__ + '.extract_meta_data', return_value=forced_return_extract_meta_data)
mocker.patch(csv_parser.__name__ + '.len', return_value=forced_return_len)

# Act
result = pytest.cut.parse_meta_data_file(arg_configFile, arg_ss_breakdown)

# Assert
assert csv_parser.extract_meta_data.call_count == 1
assert csv_parser.extract_meta_data.call_args_list[0].args == (arg_configFile, )
assert csv_parser.len.call_count == 1
assert csv_parser.len.call_args_list[0].args == (fake_empty_processed_filepath, )
assert result['subsystem_assignments'] == expected_result

def test_CSV_parse_meta_data_file_returns_call_to_extract_meta_data_given_metadata_file_and_csv_set_to_True_with_dict_def_subsystem_assignments_def_of_call_to_process_filepath_given_configFile_and_kwarg_csv_set_to_True_set_to_single_item_list_str_MISSION_for_each_item_when_given_ss_breakdown_evaluates_to_False(mocker, setup_teardown):
# Arrange
arg_configFile = MagicMock()
arg_ss_breakdown = False if pytest.gen.randint(0, 1) else 0

forced_return_extract_meta_data = {}
forced_return_process_filepath = MagicMock()
fake_processed_filepath = []
num_fake_processed_filepaths = pytest.gen.randint(1,10) # arbitrary, from 1 to 10 (0 has own test)
for i in range(num_fake_processed_filepaths):
fake_processed_filepath.append(i)
forced_return_extract_meta_data['subsystem_assignments'] = fake_processed_filepath
forced_return_len = num_fake_processed_filepaths

expected_result = []
for i in range(num_fake_processed_filepaths):
expected_result.append(['MISSION'])

mocker.patch(csv_parser.__name__ + '.extract_meta_data', return_value=forced_return_extract_meta_data)
mocker.patch(csv_parser.__name__ + '.len', return_value=forced_return_len)

# Act
result = pytest.cut.parse_meta_data_file(arg_configFile, arg_ss_breakdown)

# Assert
assert csv_parser.extract_meta_data.call_count == 1
assert csv_parser.extract_meta_data.call_args_list[0].args == (arg_configFile, )
assert csv_parser.len.call_count == 1
assert csv_parser.len.call_args_list[0].args == (fake_processed_filepath, )
assert result['subsystem_assignments'] == expected_result

# CSV get_sim_data tests
def test_CSV_get_sim_data_returns_tuple_of_all_headers_and_sim_data_and_binning_configs(setup_teardown):
# Arrange
fake_all_headers = MagicMock()
fake_sim_data = MagicMock
fake_binning_configs = MagicMock()

expected_result = (fake_all_headers, fake_sim_data, fake_binning_configs)

pytest.cut.all_headers = fake_all_headers
pytest.cut.sim_data = fake_sim_data
pytest.cut.binning_configs = fake_binning_configs

# Act
result = pytest.cut.get_sim_data()

# Assert
assert result == expected_result

# CSV get_just_data tests
def test_CSV_get_just_data_returns_list_of_data_frames(setup_teardown):
# Arrange
fake_sim_data = MagicMock()

expected_result = (fake_sim_data)

pytest.cut.sim_data = fake_sim_data

# Act
result = pytest.cut.get_just_data()

# Assert
assert result == expected_result

# CSV get_vehicle_metadata tests
def test_CSV_get_vehicle_metadata_returns_list_of_headers_and_list_of_test_assignments(setup_teardown):
# Arrange
76 changes: 76 additions & 0 deletions test/onair/data_handling/test_parser_util.py
@@ -13,6 +13,82 @@

import onair.data_handling.parser_util as parser_util

# extract_meta_data_handle_ss_breakdown
def test_parser_util_extract_meta_data_handle_ss_breakdown_returns_call_to_extract_meta_data_file_given_metadata_file_and_csv_set_to_True_when_given_ss_breakdown_does_not_resolve_to_False(mocker):
# Arrange
arg_configFile = MagicMock()
arg_ss_breakdown = True if pytest.gen.randint(0, 1) else MagicMock()

expected_result = MagicMock()

mocker.patch(parser_util.__name__ + '.extract_meta_data', return_value=expected_result)
mocker.patch(parser_util.__name__ + '.len')

# Act
result = parser_util.extract_meta_data_handle_ss_breakdown(arg_configFile, arg_ss_breakdown)

# Assert
assert parser_util.extract_meta_data.call_count == 1
assert parser_util.extract_meta_data.call_args_list[0].args == (arg_configFile, )
assert parser_util.len.call_count == 0
assert result == expected_result

def test_parser_util_extract_meta_data_handle_ss_breakdown_returns_call_to_extract_meta_data_file_given_metadata_file_and_csv_set_to_True_with_dict_def_of_subsystem_assigments_def_of_call_to_process_filepath_given_configFile_and_kwarg_csv_set_to_True_set_to_empty_list_when_len_of_call_value_dict_def_of_subsystem_assigments_def_of_call_to_process_filepath_given_configFile_and_kwarg_csv_set_to_True_is_0_when_given_ss_breakdown_evaluates_to_False(mocker):
# Arrange
arg_configFile = MagicMock()
arg_ss_breakdown = False if pytest.gen.randint(0, 1) else 0

forced_return_extract_meta_data = {}
forced_return_len = 0
fake_empty_processed_filepath = MagicMock()
forced_return_extract_meta_data['subsystem_assignments'] = fake_empty_processed_filepath

expected_result = []

mocker.patch(parser_util.__name__ + '.extract_meta_data', return_value=forced_return_extract_meta_data)
mocker.patch(parser_util.__name__ + '.len', return_value=forced_return_len)

# Act
result = parser_util.extract_meta_data_handle_ss_breakdown(arg_configFile, arg_ss_breakdown)

# Assert
assert parser_util.extract_meta_data.call_count == 1
assert parser_util.extract_meta_data.call_args_list[0].args == (arg_configFile, )
assert parser_util.len.call_count == 1
assert parser_util.len.call_args_list[0].args == (fake_empty_processed_filepath, )
assert result['subsystem_assignments'] == expected_result

def test_parser_util_extract_meta_data_handle_ss_breakdown_returns_call_to_extract_meta_data_given_metadata_file_and_csv_set_to_True_with_dict_def_subsystem_assignments_def_of_call_to_process_filepath_given_configFile_and_kwarg_csv_set_to_True_set_to_single_item_list_str_MISSION_for_each_item_when_given_ss_breakdown_evaluates_to_False(mocker):
# Arrange
arg_configFile = MagicMock()
arg_ss_breakdown = False if pytest.gen.randint(0, 1) else 0

forced_return_extract_meta_data = {}
forced_return_process_filepath = MagicMock()
fake_processed_filepath = []
num_fake_processed_filepaths = pytest.gen.randint(1,10) # arbitrary, from 1 to 10 (0 has own test)
for i in range(num_fake_processed_filepaths):
fake_processed_filepath.append(i)
forced_return_extract_meta_data['subsystem_assignments'] = fake_processed_filepath
forced_return_len = num_fake_processed_filepaths

expected_result = []
for i in range(num_fake_processed_filepaths):
expected_result.append(['MISSION'])

mocker.patch(parser_util.__name__ + '.extract_meta_data', return_value=forced_return_extract_meta_data)
mocker.patch(parser_util.__name__ + '.len', return_value=forced_return_len)

# Act
result = parser_util.extract_meta_data_handle_ss_breakdown(arg_configFile, arg_ss_breakdown)

# Assert
assert parser_util.extract_meta_data.call_count == 1
assert parser_util.extract_meta_data.call_args_list[0].args == (arg_configFile, )
assert parser_util.len.call_count == 1
assert parser_util.len.call_args_list[0].args == (fake_processed_filepath, )
assert result['subsystem_assignments'] == expected_result

# extract_meta_data tests
def test_parser_util_extract_meta_data_raises_error_when_given_blank_meta_data_file():
# Arrange