diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml index b98b23ce..276d00ec 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -7,20 +7,29 @@ on: jobs: build: runs-on: ubuntu-latest + strategy: + matrix: + python-version: ['3.8', '3.9', '3.10', '3.11', '3.12'] steps: - - uses: actions/checkout@v3 - - name: Set up Python - uses: actions/setup-python@v4 + - uses: actions/checkout@v4 + - name: Set up OnAIR test environment + uses: conda-incubator/setup-miniconda@v3 with: - python-version: 3.11.2 + activate-environment: onair + environment-file: environment.yml + python-version: ${{ matrix.python-version }} + auto-activate-base: false - name: Install dependencies + shell: bash -l {0} run: | - python -m pip install --upgrade pip - if [ -f requirements_pip.txt ]; then pip install -r requirements_pip.txt; fi + conda info + conda list - name: Test with pytest + shell: bash -l {0} run: python -m coverage run --branch --source=onair,plugins -m pytest ./test/ - name: Coverage report + shell: bash -l {0} run: coverage report --skip-empty - name: Upload coverage reports to Codecov - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v4 diff --git a/Dockerfile b/Dockerfile index b852b779..be69a0ca 100644 --- a/Dockerfile +++ b/Dockerfile @@ -39,12 +39,7 @@ RUN \ # OnAIR Dependencies RUN \ - apt-get install -y python3.9 && \ - apt-get install -y python3.9-dev && \ - apt-get install -y python3-pip - -# Add new packages to install here to prevent re-running previous instructions - + apt-get install -y wget # Ensure that all packages are up to date after new packages have been added above RUN \ @@ -56,16 +51,26 @@ RUN \ RUN adduser onair_dev sudo RUN echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers -# Make OnAir requirements file accessible by onair_dev user -COPY requirements_pip.txt /home/onair_dev/requirements_onair.txt -RUN chown onair_dev /home/onair_dev/requirements_onair.txt - USER onair_dev -# Python stuff is being installed for the local user -ENV PATH="${PATH}:/home/onair_dev/.local/bin" +# Install miniconda +ENV CONDA_DIR /home/onair_dev/conda +RUN \ + mkdir -p $CONDA_DIR && \ + wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O ~/miniconda.sh && \ + bash ~/miniconda.sh -b -u -p $CONDA_DIR && \ + rm -rf ~/miniconda.sh +ENV PATH=$CONDA_DIR/bin:$PATH -# Install OnAIR deps -RUN python3.9 -m pip install --upgrade pip setuptools wheel -RUN python3.9 -m pip install --user -r /home/onair_dev/requirements_onair.txt +# Make OnAir requirements file accessible by onair_dev user +COPY environment.yml /home/onair_dev/environment.yml +RUN \ + . $CONDA_DIR/etc/profile.d/conda.sh && \ + conda init bash && \ + . ~/.bashrc && \ + conda env create -f /home/onair_dev/environment.yml && \ + conda activate onair +# Make sure that the onair conda environment is loaded +RUN \ + echo "conda activate onair" >> ~/.bashrc diff --git a/README.md b/README.md index e8c13dbe..6703437f 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,111 @@ It is intended to explore research concepts in autonomous operations in a simula Create a conda environment with the necessary packages - conda create --name onair --file requirements_pip.txt + conda env create -f environment.yml + +## Redis example + +Using the redis_adapter.py as the DataSource, telemetry can be received through multiple Redis channels and inserted into the full data frame. + +### OnAIR config file (.ini) + +The redis_example.ini uses a very basic setup: + - meta : redis_example_CONFIG.json + - parser : onair/data_handling/redis_adapter.py + - plugins : one of each type, all of them 'generic' that do nothing + +### Telemetry config file (.json) + +The telemetry file defines the subscribed channels, data frame and subsystems. + - subscriptions : defines channel names where telemetry will be received + - order : designation of where each 'channel.telemetry_item' is to be put in full data frame (the data header) + - subsystems : data for each specific telemetry item (descriptions only for redis example) + + +### Receipt of telemetry + +The Redis adapter expects any published telemetry on a channel to include: + - time + - every telemetry_item as described under "order" as 'channel.telemetry_item' + +All messages sent must be json format (key to value) and will warn when it is not then discard the message (outputting what was received first). Keys should match the required telemetry_item names with the addition of "time." Values should be floats. + +### Running the example + +If not already running, start a Redis server on 'localhost', port:6379 (typical defaults) +``` +redis-server +``` + +Start up OnAIR with the redis_example.ini file: +``` +python driver.py onair/config/example.ini +``` +You should see: +``` +Redis Adapter ignoring file + +---- Redis adapter connecting to server... + +---- ... connected! + +---- Subscribing to channel: state_0 + +---- Subscribing to channel: state_1 + +---- Subscribing to channel: state_2 + +---- Redis adapter: channel 'state_0' received message type: subscribe. + +---- Redis adapter: channel 'state_1' received message type: subscribe. + +---- Redis adapter: channel 'state_2' received message type: subscribe. + +*************************************************** +************ SIMULATION STARTED ************ +*************************************************** +``` + +In another process run the experimental publisher: +``` +python redis-experiment-publisher.py +``` +This will send telemetry every 2 seconds, one channel at random until all 3 channels have recieved data then repeat for a total of 9 times (all of which can be changed in the file). Its output should be similar to this: +``` +Published data to state_0, [0, 0.1, 0.2] +Published data to state_1, [1, 1.1, 1.2] +Published data to state_2, [2, 2.1, 2.2] +Completed 1 loops +Published data to state_2, [3, 3.1, 3.2] +Published data to state_1, [4, 4.1, 4.2] +``` +And OnAir should begin receiving data similarly to this: +``` +--------------------- STEP 1 --------------------- + +CURRENT DATA: [0, 0.1, 0.2, '-', '-', '-', '-'] +INTERPRETED SYSTEM STATUS: --- + +--------------------- STEP 2 --------------------- + +CURRENT DATA: [1, 0.1, 0.2, 1.1, 1.2, '-', '-'] +INTERPRETED SYSTEM STATUS: --- + +--------------------- STEP 3 --------------------- + +CURRENT DATA: [2, 0.1, 0.2, 1.1, 1.2, 2.1, 2.2] +INTERPRETED SYSTEM STATUS: --- + +--------------------- STEP 4 --------------------- + +CURRENT DATA: [3, 0.1, 0.2, 1.1, 1.2, 3.1, 3.2] +INTERPRETED SYSTEM STATUS: --- + +--------------------- STEP 5 --------------------- + +CURRENT DATA: [4, 0.1, 0.2, 4.1, 4.2, 3.1, 3.2] +INTERPRETED SYSTEM STATUS: --- +``` ## Running unit tests @@ -36,12 +140,12 @@ python driver.py -t #### A few optional settings for the driver.py file Options that may be added to the driver.py test run. Use these at your own discretion. -`--conftest-seed=###` - set the random values seed for this run -`--randomly-seed=###` - set the random order seed for this run -`--verbose` or `-v` - set verbosity level, also -vv, -vvv, etc. -`-k KEYWORD` - only run tests that match the KEYWORD (see `pytest --help`) +`--conftest-seed=###` - set the random values seed for this run +`--randomly-seed=###` - set the random order seed for this run +`--verbose` or `-v` - set verbosity level, also -vv, -vvv, etc. +`-k KEYWORD` - only run tests that match the KEYWORD (see `pytest --help`) -NOTE: Running tests will output results using provided seeds, but each seed is random when not set directly. +NOTE: Running tests will output results using provided seeds, but each seed is random when not set directly. Example start of test output: ``` Using --conftest-seed=1691289424 @@ -60,33 +164,33 @@ python -m coverage run --branch --source=onair,plugins -m pytest ./test/ #### Command breakdown: -`python -m` - invokes the python runtime on the library following the -m -`coverage run` - runs coverage data collection during testing, wrapping itself on the test runner used -`--branch` - includes code branching information in the coverage report -`--source=onair,plugins` - tells coverage where the code under test exists for reporting line hits -`-m pytest` - tells coverage what test runner (framework) to wrap -`./test` - run all tests found in this directory and subdirectories +`python -m` - invokes the python runtime on the library following the -m +`coverage run` - runs coverage data collection during testing, wrapping itself on the test runner used +`--branch` - includes code branching information in the coverage report +`--source=onair,plugins` - tells coverage where the code under test exists for reporting line hits +`-m pytest` - tells coverage what test runner (framework) to wrap +`./test` - run all tests found in this directory and subdirectories #### A few optional settings for the command line Options that may be added to the command line test run. Use these at your own discretion. -`--disable-warnings` - removes the warning reports, but displays count (i.e., 124 passed, 1 warning in 0.65s) +`--disable-warnings` - removes the warning reports, but displays count (i.e., 124 passed, 1 warning in 0.65s) `-p no:randomly` - ONLY required to stop random order testing IFF pytest-randomly installed -`--conftest-seed=###` - set the random values seed for this run -`--randomly-seed=###` - set the random order seed for this run -`--verbose` or `-v` - set verbosity level, also -vv, -vvv, etc. -`-k KEYWORD` - only run tests that match the KEYWORD (see `pytest --help`) +`--conftest-seed=###` - set the random values seed for this run +`--randomly-seed=###` - set the random order seed for this run +`--verbose` or `-v` - set verbosity level, also -vv, -vvv, etc. +`-k KEYWORD` - only run tests that match the KEYWORD (see `pytest --help`) NOTE: see note about seeds in driver.py section above ### To view testing line coverage after run: NOTE: you may or may not need the `python -m` to run coverage report or html -`coverage report` - prints basic results in terminal +`coverage report` - prints basic results in terminal or `coverage html` - creates htmlcov/index.html, automatic when using driver.py for testing -then +then ` htmlcov/index.html` - browsable coverage (i.e., `firefox htmlcov/index.html`) ## License and Copyright diff --git a/environment.yml b/environment.yml new file mode 100644 index 00000000..cc70880e --- /dev/null +++ b/environment.yml @@ -0,0 +1,14 @@ +name: onair +channels: + - default + - conda-forge +dependencies: + - python>=3.8,<3.13 + - numpy + - coverage + - pytest + - pytest-mock + - pytest-randomly + - pip + - pip: + - redis diff --git a/onair/config/redis_example.ini b/onair/config/redis_example.ini index d7e95f3c..b0a599b0 100644 --- a/onair/config/redis_example.ini +++ b/onair/config/redis_example.ini @@ -2,16 +2,14 @@ TelemetryFilePath = onair/data/raw_telemetry_data/data_physics_generation/Errors TelemetryFile = 700_crash_to_earth_1.csv MetaFilePath = onair/data/telemetry_configs/ -MetaFile = data_physics_generation_CONFIG.json - -[DATA_HANDLING] -DataSourceFile = onair/data_handling/redis_adapter.py +MetaFile = redis_example_CONFIG.json +ParserFileName = onair/data_handling/redis_adapter.py [PLUGINS] -KnowledgeRepPluginDict = {'generic':'plugins/generic/__init__.py'} -LearnersPluginDict = {'generic':'plugins/generic/__init__.py'} -PlannersPluginDict = {'generic':'plugins/generic/__init__.py'} -ComplexPluginDict = {'generic':'plugins/generic/__init__.py'} +KnowledgeRepPluginDict = {'knowledge':'plugins/generic/__init__.py'} +LearnersPluginDict = {'learner':'plugins/generic/__init__.py'} +PlannersPluginDict = {'planner':'plugins/generic/__init__.py'} +ComplexPluginDict = {'complex':'plugins/generic/__init__.py'} [OPTIONS] IO_Enabled = true diff --git a/onair/data/telemetry_configs/redis_example_CONFIG.json b/onair/data/telemetry_configs/redis_example_CONFIG.json new file mode 100644 index 00000000..1813570c --- /dev/null +++ b/onair/data/telemetry_configs/redis_example_CONFIG.json @@ -0,0 +1,41 @@ +{ + "subsystems": { + "STATES": { + "time": { + "description": "Time of latest receipt of values" + }, + "state_0.x": { + "description": "Vehicle 0's current state of x" + }, + "state_0.y": { + "description": "Vehicle 0's current state of y" + }, + "state_1.x": { + "description": "Vehicle 1's current state of x" + }, + "state_1.y": { + "description": "Vehicle 1's current state of y" + }, + "state_2.x": { + "description": "Vehicle 2's current state of x" + }, + "state_2.y": { + "description": "Vehicle 2's current state of y" + } + } + }, + "redis_subscriptions": [ + "state_0", + "state_1", + "state_2" + ], + "order": [ + "time", + "state_0.x", + "state_0.y", + "state_1.x", + "state_1.y", + "state_2.x", + "state_2.y" + ] +} diff --git a/onair/data_handling/on_air_data_source.py b/onair/data_handling/on_air_data_source.py index 80bdc4b5..edd60066 100644 --- a/onair/data_handling/on_air_data_source.py +++ b/onair/data_handling/on_air_data_source.py @@ -8,7 +8,10 @@ # See "NOSA GSC-19165-1 OnAIR.pdf" from abc import ABC, abstractmethod -from .parser_util import * +from .parser_util import * + +class ConfigKeyError(KeyError): + pass class OnAirDataSource(ABC): def __init__(self, data_file, meta_file, ss_breakdown = False): diff --git a/onair/data_handling/redis_adapter.py b/onair/data_handling/redis_adapter.py index d469f71e..ba346004 100644 --- a/onair/data_handling/redis_adapter.py +++ b/onair/data_handling/redis_adapter.py @@ -1,8 +1,9 @@ # GSC-19165-1, "The On-Board Artificial Intelligence Research (OnAIR) Platform" # -# Copyright © 2023 United States Government as represented by the Administrator of -# the National Aeronautics and Space Administration. No copyright is claimed in the -# United States under Title 17, U.S. Code. All Other Rights Reserved. +# Copyright © 2023 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# No copyright is claimed in the United States under Title 17, U.S. Code. +# All Other Rights Reserved. # # Licensed under the NASA Open Source Agreement version 1.3 # See "NOSA GSC-19165-1 OnAIR.pdf" @@ -19,6 +20,7 @@ import json from onair.data_handling.on_air_data_source import OnAirDataSource +from onair.data_handling.on_air_data_source import ConfigKeyError from onair.data_handling.tlm_json_parser import parseJson from onair.src.util.print_io import * from onair.data_handling.parser_util import * @@ -30,12 +32,14 @@ def __init__(self, data_file, meta_file, ss_breakdown = False): self.address = 'localhost' self.port = 6379 self.db = 0 - self.server = None + self.server = None self.new_data_lock = threading.Lock() self.new_data = False self.currentData = [] - self.currentData.append({'headers':None, 'data':None}) - self.currentData.append({'headers':None, 'data':None}) + self.currentData.append({'headers':self.order, + 'data':list('-' * len(self.order))}) + self.currentData.append({'headers':self.order, + 'data':list('-' * len(self.order))}) self.double_buffer_read_index = 0 self.connect() self.subscribe(self.subscriptions) @@ -63,8 +67,17 @@ def subscribe(self, subscriptions): print_msg(f"No subscriptions given!") def parse_meta_data_file(self, meta_data_file, ss_breakdown): - configs = extract_meta_data_handle_ss_breakdown(meta_data_file, ss_breakdown) + configs = extract_meta_data_handle_ss_breakdown( + meta_data_file, ss_breakdown) meta = parseJson(meta_data_file) + keys = meta.keys() + + if 'order' in keys: + self.order = meta['order'] + else: + raise ConfigKeyError(f'Config file: \'{meta_data_file}\' ' \ + 'missing required key \'order\'') + if 'redis_subscriptions' in meta.keys(): self.subscriptions = meta['redis_subscriptions'] else: @@ -85,14 +98,15 @@ def get_next(self): while not data_available: with self.new_data_lock: data_available = self.has_data() - + if not data_available: time.sleep(0.01) read_index = 0 with self.new_data_lock: self.new_data = False - self.double_buffer_read_index = (self.double_buffer_read_index + 1) % 2 + self.double_buffer_read_index = ( + self.double_buffer_read_index + 1) % 2 read_index = self.double_buffer_read_index return self.currentData[read_index]['data'] @@ -102,18 +116,61 @@ def has_more(self): return True def message_listener(self): - """Loop for listening for messages on channel""" + """Loop for listening for messages on channels""" for message in self.pubsub.listen(): if message['type'] == 'message': - data = json.loads(message['data']) - - currentData = self.currentData[(self.double_buffer_read_index + 1) %2] - currentData['headers'] = list(data.keys()) - currentData['data'] = list(data.values()) - + channel_name = f"{message['channel'].decode()}" + # Attempt to load message as json + try: + data = json.loads(message['data']) + except ValueError: + # Warn of non-json conforming channel data received + non_json_msg = f'Subscribed channel `{channel_name}\' ' \ + 'message received but is not in json ' \ + f'format.\nMessage:\n{message["data"]}' + print_msg(non_json_msg, ['WARNING']) + continue + # Select the current data + currentData = self.currentData[ + (self.double_buffer_read_index + 1) % 2] + # turn all data points to unknown + currentData['data'] = ['-' for _ in currentData['data']] + # Find expected keys for received channel + expected_message_keys = \ + [k for k in currentData['headers'] if channel_name in k] + # Time is an expected key for all channels + expected_message_keys.append("time") + # Parse through the message keys for data points + for key in list(data.keys()): + if key.lower() == 'time': + header_string = key.lower() + else: + header_string = f"{channel_name}.{key}" + # Look for channel specific values + try: + index = currentData['headers'].index(header_string) + currentData['data'][index] = data[key] + expected_message_keys.remove(header_string) + # Unexpected key in data + except ValueError: + # warn user about key in data that is not in header + print_msg(f"Unused key `{key}' in message " \ + f'from channel `{channel_name}.\'', + ['WARNING']) with self.new_data_lock: self.new_data = True + # Warn user about expected keys missing from received data + for k in expected_message_keys: + print_msg(f'Message from channel `{channel_name}\' ' \ + f'did not contain `{k}\' key\nMessage:\n' \ + f'{data}', ['WARNING']) + else: + # Warn user about non message receipts + print_msg(f"Redis adapter: channel " \ + f"'{message['channel'].decode()}' received " \ + f"message type: {message['type']}.", ['WARNING']) + # When listener loop exits warn user + print_msg("Redis subscription listener exited.", ['WARNING']) def has_data(self): return self.new_data - \ No newline at end of file diff --git a/redis-experiment-publisher.py b/redis-experiment-publisher.py new file mode 100644 index 00000000..b5b6b53e --- /dev/null +++ b/redis-experiment-publisher.py @@ -0,0 +1,38 @@ +import redis +import time +import random + +# Initialize the Redis connection +redis_host = "localhost" +redis_port = 6379 +# When your Redis server requires a password, fill it in here +redis_password = "" +# Connect to Redis +r = redis.Redis(host=redis_host, + port=redis_port, + password=redis_password, + decode_responses=True) +# List of channel names +channels = ['state_0', 'state_1', 'state_2'] +# Publish messages on each channel in random order +def publish_messages(): + loop_count = 0 + inner_loop_count = 0 + max_loops = 9 + while loop_count < max_loops: + random.shuffle(channels) + for channel in channels: + r.publish(channel, f'{{"time":{inner_loop_count}, ' \ + f'"x":{inner_loop_count+0.1}, ' \ + f'"y":{inner_loop_count+0.2}}}') + print(f"Published data to {channel}, " \ + f"[{inner_loop_count}, " \ + f"{inner_loop_count+0.1}, " \ + f"{inner_loop_count+0.2}]") + inner_loop_count += 1 + time.sleep(2) + loop_count += 1 + print(f"Completed {loop_count} loops") + +if __name__ == "__main__": + publish_messages() diff --git a/requirements_pip.txt b/requirements_pip.txt deleted file mode 100644 index cdfa7fd0..00000000 --- a/requirements_pip.txt +++ /dev/null @@ -1,6 +0,0 @@ -coverage==6.5.0 -numpy==1.23.4 -pytest==7.2.0 -pytest-mock==3.10.0 -pytest-randomly==3.12.0 -redis==4.6.0 diff --git a/test/onair/data_handling/test_redis_adapter.py b/test/onair/data_handling/test_redis_adapter.py index 38bc6928..777dfbb3 100644 --- a/test/onair/data_handling/test_redis_adapter.py +++ b/test/onair/data_handling/test_redis_adapter.py @@ -12,6 +12,7 @@ import onair.data_handling.redis_adapter as redis_adapter from onair.data_handling.redis_adapter import DataSource from onair.data_handling.on_air_data_source import OnAirDataSource +from onair.data_handling.on_air_data_source import ConfigKeyError import redis import threading @@ -33,6 +34,10 @@ def test_redis_adapter_DataSource__init__sets_redis_values_then_connects_and_sub cut = DataSource.__new__(DataSource) cut.subscriptions = expected_subscriptions + fake_order = MagicMock() + fake_order.__len__.return_value = \ + pytest.gen.randint(1, 10) # from 1 to 10 arbitrary + cut.order = fake_order mocker.patch.object(OnAirDataSource, '__init__', new=MagicMock()) mocker.patch('threading.Lock', return_value=fake_new_data_lock) @@ -51,7 +56,10 @@ def test_redis_adapter_DataSource__init__sets_redis_values_then_connects_and_sub assert cut.server == expected_server assert cut.new_data_lock == fake_new_data_lock assert cut.new_data == False - assert cut.currentData == [{'headers':None, 'data':None}, {'headers':None, 'data':None}] + assert cut.currentData == [{'headers':fake_order, + 'data':list('-' * len(fake_order))}, + {'headers':fake_order, + 'data':list('-' * len(fake_order))}] assert cut.double_buffer_read_index == 0 assert cut.connect.call_count == 1 assert cut.connect.call_args_list[0].args == () @@ -206,7 +214,6 @@ def test_redis_adapter_DataSource_subscribe_states_no_subscriptions_given_when_s assert redis_adapter.print_msg.call_args_list[0].args == ("No subscriptions given!",) # get_next tests - def test_redis_adapter_DataSource_get_next_returns_expected_data_when_new_data_is_true_and_double_buffer_read_index_is_0(): # Arrange # Renew DataSource to ensure test independence @@ -324,61 +331,237 @@ def test_redis_adapter_DataSource_has_more_always_returns_True(): assert cut.has_more() == True # message_listener tests -def test_redis_adapter_DataSource_message_listener_does_not_load_json_when_receive_type_is_not_message(mocker): +def test_redis_adapter_DataSource_message_listener_warns_of_exit_and_does_not_run_for_loop_when_listen_returns_StopIteration(mocker): + # Arrange + cut = DataSource.__new__(DataSource) + + cut.pubsub = MagicMock(name="cut.pubsub") + fake_listener = MagicMock(name='fake_listener') + fake_listener.__next__.side_effect = StopIteration + mocker.patch.object(cut.pubsub, 'listen', side_effect=[fake_listener]) + mocker.patch(redis_adapter.__name__ + '.json.loads') + mocker.patch(redis_adapter.__name__ + '.print_msg') + + # Act + cut.message_listener() + + # Assert + assert redis_adapter.json.loads.call_count == 0 + assert redis_adapter.print_msg.call_count == 1 + assert redis_adapter.print_msg.call_args_list[0].args == ("Redis subscription listener exited.", ['WARNING']) + +def test_redis_adapter_DataSource_message_listener_prints_warning_when_receiving_non_message_type(mocker): # Arrange cut = DataSource.__new__(DataSource) + + cut.pubsub = MagicMock() ignored_message_types = ['subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe', 'pmessage'] fake_message = {} fake_message['type'] = pytest.gen.choice(ignored_message_types) - - cut.pubsub = MagicMock() + fake_message['channel'] = str(MagicMock(name='fake_message')).encode('utf-8') mocker.patch.object(cut.pubsub, 'listen', return_value=[fake_message]) mocker.patch(redis_adapter.__name__ + '.json.loads') + mocker.patch(redis_adapter.__name__ + '.print_msg') # Act cut.message_listener() # Assert assert redis_adapter.json.loads.call_count == 0 + assert redis_adapter.print_msg.call_count == 2 + assert redis_adapter.print_msg.call_args_list[0].args == ( + f"Redis adapter: channel '{fake_message['channel'].decode()}' received " \ + f"message type: {fake_message['type']}.", ['WARNING']) + assert redis_adapter.print_msg.call_args_list[1].args == ( + "Redis subscription listener exited.", ['WARNING']) + +def test_redis_adapter_DataSource_message_listener_prints_warning_when_data_not_json_format_and_does_not_update_frame(mocker): + # Arrange + cut = DataSource.__new__(DataSource) + + cut.pubsub = MagicMock() + fake_message = {} + fake_message['type'] = 'message' + fake_message['channel'] = str( + MagicMock(name='fake_message_channel')).encode('utf-8') + fake_message['data'] = str(MagicMock(name='fake_message_data')) + mocker.patch.object(cut.pubsub, 'listen', return_value=[fake_message]) + mocker.patch(redis_adapter.__name__ + '.json.loads', side_effect=ValueError) + mocker.patch(redis_adapter.__name__ + '.print_msg') -def test_redis_adapter_DataSource_message_listener_loads_message_info_when_receive_type_is_message(mocker): + # Act + cut.message_listener() + + # Assert + assert redis_adapter.json.loads.call_count == 1 + assert redis_adapter.json.loads.call_args_list[0].args == ( + fake_message['data'], ) + assert redis_adapter.print_msg.call_count == 2 + assert redis_adapter.print_msg.call_args_list[0].args == ( + f'Subscribed channel `{fake_message["channel"].decode()}\' message ' \ + 'received but is not in json format.\nMessage:\n' \ + f'{fake_message["data"]}', ['WARNING']) + assert redis_adapter.print_msg.call_args_list[1].args == ( + "Redis subscription listener exited.", ['WARNING']) + +def test_redis_adapter_DataSource_message_listener_warns_user_when_processed_data_did_not_contain_time(mocker): # Arrange cut = DataSource.__new__(DataSource) + cut.double_buffer_read_index = pytest.gen.choice([0 , 1]) + cut.currentData = {0: {'headers': [], 'data': []}, + 1: {'headers': [], 'data': []}} + cut.pubsub = MagicMock() cut.new_data_lock = MagicMock() - cut.new_data = None - cut.double_buffer_read_index = pytest.gen.randint(0,1) - cut.currentData = [{}, {}] + cut.new_data = False + + fake_message = {} + fake_message['type'] = 'message' + fake_message['channel'] = str( + MagicMock(name='fake_message_channel')).encode('utf-8') + fake_message['data'] = '{}' # empty_message + mocker.patch.object(cut.pubsub, 'listen', return_value=[fake_message]) + mocker.patch(redis_adapter.__name__ + '.json.loads', return_value={}) + mocker.patch(redis_adapter.__name__ + '.print_msg') + + # Act + cut.message_listener() + + # Assert + assert redis_adapter.json.loads.call_count == 1 + assert redis_adapter.json.loads.call_args_list[0].args == ( + fake_message['data'], ) + assert redis_adapter.print_msg.call_count == 2 + assert redis_adapter.print_msg.call_args_list[0].args == ( + f'Message from channel `{fake_message["channel"].decode()}\' ' \ + f'did not contain `time\' key\nMessage:\n{fake_message["data"]}', \ + ['WARNING']) + assert redis_adapter.print_msg.call_args_list[1].args == ( + "Redis subscription listener exited.", ['WARNING']) + +def test_redis_adapter_DataSource_message_listener_warns_of_received_key_that_does_not_exist_in_header(mocker): + # Arrange + cut = DataSource.__new__(DataSource) + cut.double_buffer_read_index = pytest.gen.choice([0 , 1]) + cut.currentData = {0: {'headers': ['time'], + 'data': ['-']}, + 1: {'headers': ['time'], + 'data': ['-']}} + cut.pubsub = MagicMock() + cut.new_data_lock = MagicMock() + cut.new_data = False + + fake_message = {} + fake_message['type'] = 'message' + fake_message['channel'] = str( + MagicMock(name='fake_message_channel')).encode('utf-8') + fake_message['data'] = '{"time":0, "unknown_key":0}' + mocker.patch.object(cut.pubsub, 'listen', return_value=[fake_message]) + mocker.patch(redis_adapter.__name__ + '.json.loads', return_value={"time":0, "unknown_key":0}) + mocker.patch(redis_adapter.__name__ + '.print_msg') + + # Act + cut.message_listener() + + # Assert + assert redis_adapter.json.loads.call_count == 1 + assert redis_adapter.json.loads.call_args_list[0].args == ( + fake_message['data'], ) + assert redis_adapter.print_msg.call_count == 2 + assert redis_adapter.print_msg.call_args_list[0].args == ( + f"Unused key `unknown_key' in message " \ + f'from channel `{fake_message["channel"].decode()}.\'', ['WARNING']) + assert redis_adapter.print_msg.call_args_list[1].args == ( + "Redis subscription listener exited.", ['WARNING']) + +def test_redis_adapter_DataSource_message_listener_warns_of_expected_keys_that_do_not_appear_in_message(mocker): + # Arrange + cut = DataSource.__new__(DataSource) + cut.double_buffer_read_index = pytest.gen.choice([0 , 1]) + cut.pubsub = MagicMock() + cut.new_data_lock = MagicMock() + cut.new_data = False + + fake_message = {} + fake_message['type'] = 'message' + fake_message['channel'] = str( + MagicMock(name='fake_message_channel')).encode('utf-8') + cut.currentData = {0: {'headers': ['time', + f'{fake_message["channel"].decode()}' \ + '.missing_key'], + 'data': ['-', '-']}, + 1: {'headers': ['time', + f'{fake_message["channel"].decode()}' \ + '.missing_key'], + 'data': ['-', '-']}} + fake_message['data'] = '{}' + mocker.patch.object(cut.pubsub, 'listen', return_value=[fake_message]) + mocker.patch(redis_adapter.__name__ + '.json.loads', return_value={}) + mocker.patch(redis_adapter.__name__ + '.print_msg') + + # Act + cut.message_listener() + + # Assert + assert redis_adapter.json.loads.call_count == 1 + assert redis_adapter.json.loads.call_args_list[0].args == ( + fake_message['data'], ) + assert redis_adapter.print_msg.call_count == 3 + assert redis_adapter.print_msg.call_args_list[0].args == ( + f'Message from channel `{fake_message["channel"].decode()}\' ' \ + f'did not contain `{fake_message["channel"].decode()}.missing_key\'' \ + f' key\nMessage:\n{fake_message["data"]}', \ + ['WARNING']) + assert redis_adapter.print_msg.call_args_list[1].args == ( + f'Message from channel `{fake_message["channel"].decode()}\' ' \ + f'did not contain `time\' key\nMessage:\n{fake_message["data"]}', \ + ['WARNING']) + assert redis_adapter.print_msg.call_args_list[2].args == ( + "Redis subscription listener exited.", ['WARNING']) + +def test_redis_adapter_DataSource_message_listener_updates_new_data_with_received_data_by_channel_and_key_matched_to_frame_header(mocker): + # Arrange + cut = DataSource.__new__(DataSource) + cut.double_buffer_read_index = pytest.gen.choice([0 , 1]) cut.pubsub = MagicMock() + cut.new_data_lock = MagicMock() + cut.new_data = False fake_message = {} - fake_message_data = {} fake_message['type'] = 'message' - fake_message['data'] = fake_message_data - fake_data = {} - - expected_index = (cut.double_buffer_read_index + 1) % 2 - expected_data_headers = [] - expected_data_values = [] - - num_fake_data = pytest.gen.randint(1,10) - for i in range(num_fake_data): - fake_data_header = str(i) - fake_data_value = MagicMock() - fake_data[fake_data_header] = fake_data_value - expected_data_headers.append(fake_data_header) - expected_data_values.append(fake_data_value) + fake_message['channel'] = str( + MagicMock(name='fake_message_channel')).encode('utf-8') + cut.currentData = {0: {'headers': ['time', + f'{fake_message["channel"].decode()}' \ + '.correct_key', 'fakeotherchannel.x'], + 'data': ['-', '-', '0']}, + 1: {'headers': ['time', + f'{fake_message["channel"].decode()}' \ + '.correct_key', 'fakeotherchannel.x'], + 'data': ['-', '-', '0']}} + fake_message['data'] = '{}' mocker.patch.object(cut.pubsub, 'listen', return_value=[fake_message]) - mocker.patch(redis_adapter.__name__ + '.json.loads', return_value=fake_data) + fake_data = { + 'time': pytest.gen.randint(1, 100), # from 1 to 100 arbitrary + 'correct_key': pytest.gen.randint(1, 100), # from 1 to 100 arbitrary + } + mocker.patch(redis_adapter.__name__ + '.json.loads', + return_value=fake_data) + mocker.patch(redis_adapter.__name__ + '.print_msg') # Act cut.message_listener() # Assert assert redis_adapter.json.loads.call_count == 1 - assert redis_adapter.json.loads.call_args_list[0].args == (fake_message_data,) - assert cut.currentData[expected_index]['headers'] == expected_data_headers - assert cut.currentData[expected_index]['data'] == expected_data_values + assert redis_adapter.json.loads.call_args_list[0].args == ( + fake_message['data'], ) assert cut.new_data == True + print(cut.currentData[cut.double_buffer_read_index]) + assert cut.currentData[(cut.double_buffer_read_index + 1) % 2]['data'] == \ + [fake_data['time'], fake_data['correct_key'], '-'] + assert redis_adapter.print_msg.call_count == 1 + assert redis_adapter.print_msg.call_args_list[0].args == ( + "Redis subscription listener exited.", ['WARNING']) # has_data tests def test_redis_adapter_DataSource_has_data_returns_instance_new_data(): @@ -391,6 +574,34 @@ def test_redis_adapter_DataSource_has_data_returns_instance_new_data(): assert result == expected_result # redis_adapter parse_meta_data tests +def test_redis_adapter_DataSource_parse_meta_data_file_raises_ConfigKeyError_when_order_is_not_in_config_file(mocker): + # Arrange + cut = DataSource.__new__(DataSource) + arg_configFile = MagicMock() + arg_ss_breakdown = MagicMock() + + expected_extracted_configs = MagicMock() + expected_subscriptions = [MagicMock()] * pytest.gen.randint(0, 10) # 0 to 10 arbitrary + fake_meta = {'fake_other_stuff': MagicMock(), + 'redis_subscriptions':expected_subscriptions} + + mocker.patch(redis_adapter.__name__ + '.extract_meta_data_handle_ss_breakdown', return_value=expected_extracted_configs) + mocker.patch(redis_adapter.__name__ + '.parseJson', return_value=fake_meta) + + exception_message = (f'Config file: \'{arg_configFile}\' ' \ + 'missing required key \'order\'') + + # Act + with pytest.raises(ConfigKeyError) as e_info: + cut.parse_meta_data_file(arg_configFile, arg_ss_breakdown, ) + + # Assert + assert redis_adapter.extract_meta_data_handle_ss_breakdown.call_count == 1 + assert redis_adapter.extract_meta_data_handle_ss_breakdown.call_args_list[0].args == (arg_configFile, arg_ss_breakdown) + assert redis_adapter.parseJson.call_count == 1 + assert redis_adapter.parseJson.call_args_list[0].args == (arg_configFile, ) + assert e_info.match(exception_message) + def test_redis_adapter_DataSource_parse_meta_data_file_returns_call_to_extract_meta_data_handle_ss_breakdown_and_sets_subscriptions_when_redis_subscriptions_occupied(mocker): # Arrange cut = DataSource.__new__(DataSource) @@ -399,8 +610,9 @@ def test_redis_adapter_DataSource_parse_meta_data_file_returns_call_to_extract_m expected_extracted_configs = MagicMock() expected_subscriptions = [MagicMock()] * pytest.gen.randint(0, 10) # 0 to 10 arbitrary - fake_meta = {'fake_other_stuff': MagicMock(), 'redis_subscriptions':expected_subscriptions} - expected_result_configs = {'redis_subscriptions':expected_subscriptions} + fake_meta = {'fake_other_stuff': MagicMock(), + 'order': MagicMock(), + 'redis_subscriptions':expected_subscriptions} mocker.patch(redis_adapter.__name__ + '.extract_meta_data_handle_ss_breakdown', return_value=expected_extracted_configs) mocker.patch(redis_adapter.__name__ + '.parseJson', return_value=fake_meta) @@ -423,8 +635,7 @@ def test_redis_adapter_DataSource_parse_meta_data_file_returns_call_to_extract_m arg_ss_breakdown = MagicMock() fake_configs = {'fake_other_stuff': MagicMock()} - expected_subscriptions = [MagicMock()] * pytest.gen.randint(0, 10) # 0 to 10 arbitrary - fake_meta = {} + fake_meta = {'order': MagicMock()} mocker.patch(redis_adapter.__name__ + '.extract_meta_data_handle_ss_breakdown', return_value=fake_configs) mocker.patch(redis_adapter.__name__ + '.parseJson', return_value=fake_meta)