From 5c298f79d9f2017d8089e4b14d9a6089ba36e5e3 Mon Sep 17 00:00:00 2001 From: Madhukar Reddy Kadireddy Date: Thu, 13 Jun 2024 18:54:31 +0530 Subject: [PATCH] [ROS][Test Infra] Support testing ROS2 service call device_info --- realsense2_camera/test/README.md | 17 +++-- .../live_camera/test_camera_aligned_tests.py | 4 +- .../test_camera_all_profile_tests.py | 2 +- .../test/live_camera/test_camera_fps_tests.py | 5 +- .../test/live_camera/test_camera_imu_tests.py | 2 +- .../test_camera_point_cloud_tests.py | 2 +- .../live_camera/test_camera_service_call.py | 73 +++++++++++++++++++ .../test/live_camera/test_camera_tf_tests.py | 4 +- .../test/live_camera/test_d415_basic_tests.py | 2 +- .../test/live_camera/test_d455_basic_tests.py | 4 +- .../test/utils/pytest_rs_utils.py | 41 +++++++++-- 11 files changed, 129 insertions(+), 27 deletions(-) create mode 100644 realsense2_camera/test/live_camera/test_camera_service_call.py diff --git a/realsense2_camera/test/README.md b/realsense2_camera/test/README.md index 15b882a2f8..4e59fcb620 100644 --- a/realsense2_camera/test/README.md +++ b/realsense2_camera/test/README.md @@ -20,7 +20,7 @@ It is recommended to use the test folder itself for storing all the cpp tests. H ## Test using pytest The default folder for the test py files is realsense2_camera/test. Two test template files test_launch_template.py and test_integration_template.py are available in the same folder for reference. ### Add a new test -To add a new test, the user can create a copy of the test_launch_template.py or test_integration_template.py and start from there. Please name the file in the format test_`testname`.py so that the CMake detects the file and add as a new test. Please be aware that, multiple tests can be added into one file, the test_integration_template.py itself has more than one test.i The marker `@pytest.mark.launch` is used to specify the test entry point. +To add a new test, the user can create a copy of the test_launch_template.py or test_integration_template.py and start from there. Please name the file in the format test_`testname`.py so that the CMake detects the file and add as a new test. Please be aware that, multiple tests can be added into one file, the test_integration_template.py itself has more than one test. The marker `@pytest.mark.launch` is used to specify the test entry point. The test_launch_template.py uses the rs_launch.py to start the camera node, so this template can be used for testing the rs_launch.py together with the rs node. @@ -28,7 +28,7 @@ The test_integration_template.py gives a better control for testing, it uses few The test_integration_template.py has two types of tests, one has a function "test_using_function". If the user wants to have a better control over the launch context for any specific test scenarios, this can be used. Both the function based test and class based tests use a default launch configuration from the utils. It's recommended to modify the camera name to a unique one in the parameters itself so that there are not clashes between tests. -It is expected that the class based test is used as the test format for most of the usecases. The class based test inherits from pytest_rs_utils.RsTestBaseClass and it has three steps, namely: init, run_test and process_data. Unless for the basic tests, the user will have to override the process_data function and check if the data received from the topics are as expected. Also, if the user doesn't want the base class to modify the data, use 'store_raw_data':True in the theme definition. Please see the test_integration_template.py for reference. +It is expected that the class based test is used as the test format for most of the usecases. The class based test inherits from `pytest_rs_utils.RsTestBaseClass` and it has three steps, namely: init, run_test and process_data. Unless for the basic tests, the user will have to override the process_data function and check if the data received from the topics are as expected. Also, if the user doesn't want the base class to modify the data, use 'store_raw_data':True in the theme definition. Please see the test_integration_template.py for reference. An assert command can be used to indicate if the test failed or passed. Please see the template for more info. @@ -47,11 +47,11 @@ new_folder_for_pytest #<-- new folder #but please be aware that the utils functi ``` ### Grouping of tests -The pytests can be grouped using markers. These markers can be used to run a group of tests. However, "colcon test" command doesn't pass a custom marker using (--pytest-args -m `marker_name`) to the pytest internally. This is because, the ament_cmake that works as a bridge between colcon and pytest doesn't pass the pytest arguments to pytest. So till this is fixed, pytest command has to be used directly for running a group of tests. Please see the next session for the commands to run a group py tests. +The pytests can be grouped using markers. These markers can be used to run a group of tests. However, "colcon test" command doesn't pass a custom marker using (--pytest-args -m `marker_name`) to the pytest internally. This is because, the ament_cmake that works as a bridge between colcon and pytest doesn't pass the pytest arguments to pytest. So until this is fixed, pytest command has to be used directly for running a group of tests. Please see the next session for the commands to run a group of py tests. The grouping is specified by adding a marker just before the test declaration. In the test_integration_template.py `rosbag` is specified as a marker specify tests that use rosbag file. This is achieved by adding "@pytest.mark.rosbag" to the begining of the test. So when the pytest parses for test, it detects the marker for the test. If this marker is selected or none of the markers are specified, the test will be added to the list, else will be listed as a deselected test. -It is recommended to use markers such as ds457, rosbag, ds415 etc to differentiate the tests so that it's easier to run a group of tests in a machine that has the required hardware. +It is recommended to use markers such as d457, rosbag, or d435i etc to differentiate the tests so that it's easier to run a group of tests in a machine that has the required hardware. ## Building and running tests @@ -134,7 +134,7 @@ Then, the path to execute the tests would be ~/ros2_ws/src/realsense-ros. 2. Please setup below required environment variables. If not, Please prefix them for every test execution. - PYTHONPATH=$PYTHONPATH:$PWD/realsense2_camera/test/utils:$PWD/realsense2_camera//launch:$PWD/realsense2_camera//scripts + export PYTHONPATH=$PYTHONPATH:$PWD/realsense2_camera/test/utils:$PWD/realsense2_camera//launch:$PWD/realsense2_camera//scripts User can run all the tests in a pytest file directly using the below command: @@ -169,7 +169,7 @@ If a user wants to add a test to this conditional skip, user can add by adding a ## Points to be noted while writing pytests The tests that are in one file are normally run in parallel, there could also be changes in the pytest plugin. So if there are multiple tests in one file, the system capacity can influence the test execution. It's recomended to have 3-4 tests in file, more than that can affect the test results due to delays. ### Passing/changing parameters -The parameters passed while creating the node can be initialized individually for each test, please see the test_parameterized_template example for reference. The default values are taken from rs_launch.py and the passed parameters are used for overriding the default values. The parameters that can be dynamically modified can be changed using the param interface provided. However, the function create_param_ifs has to be called to create this interface. Please see the test_d455_basic_tests.py for reference. There are specific functions to change the string, integer and bool parameters, the utils can be extended if any more types are needed. +The parameters passed while creating the node can be initialized individually for each test, please see the test_parameterized_template example for reference. The default values are taken from rs_launch.py and the passed parameters are used for overriding the default values. The parameters that can be dynamically modified can be changed using the param interface provided. However, the function `create_service_client_ifs()` has to be called to create this interface. Please see the test_d455_basic_tests.py for reference. There are specific functions to change the string, integer and bool parameters, the utils can be extended if any more types are needed. ### Difference in setting the bool parameters There is a difference between setting the default values of bool parameters and setting them dynamically. The bool test params are assinged withn quotes as below. @@ -183,4 +183,7 @@ The bool test params are assinged withn quotes as below. However the function that implements the setting of bool parameter dynamically takes the python bool datatype. For example: self.set_bool_param('enable_accel', False) - +### Adding 'service call' client interface +1. Create a client for respective service call in method `create_service_client_ifs()`, refer client creation for service call `device_info` with service type `DeviceInfo`. +2. Create a `get_` or `set_` method for each service call, refer `get_deviceinfo` in pytest_rs_utils.py +3. Use the `get_`, `set_` service calls as used in test_camera_service_call.py diff --git a/realsense2_camera/test/live_camera/test_camera_aligned_tests.py b/realsense2_camera/test/live_camera/test_camera_aligned_tests.py index 2d6a9839db..27fc224aa4 100644 --- a/realsense2_camera/test/live_camera/test_camera_aligned_tests.py +++ b/realsense2_camera/test/live_camera/test_camera_aligned_tests.py @@ -108,7 +108,7 @@ def test_camera_align_depth_color(self,launch_descr_with_parameters): print("Starting camera test...") self.init_test("RsTest"+params['camera_name']) self.wait_for_node(params['camera_name']) - self.create_param_ifs(get_node_heirarchy(params)) + self.create_service_client_ifs(get_node_heirarchy(params)) ret = self.run_test(themes) assert ret[0], ret[1] assert self.process_data(themes) @@ -211,7 +211,7 @@ def test_camera_all_align_depth_color(self,launch_descr_with_parameters): if cap == None: debug_print("Device not found? : " + params['device_type']) return - self.create_param_ifs(get_node_heirarchy(params)) + self.create_service_client_ifs(get_node_heirarchy(params)) color_profiles = set([i[1] for i in cap["color_profile"] if i[2] == "RGB8"]) depth_profiles = set([i[1] for i in cap["depth_profile"] if i[0] == "Depth"]) for color_profile in color_profiles: diff --git a/realsense2_camera/test/live_camera/test_camera_all_profile_tests.py b/realsense2_camera/test/live_camera/test_camera_all_profile_tests.py index ab66cf26be..27cc4a6d32 100644 --- a/realsense2_camera/test/live_camera/test_camera_all_profile_tests.py +++ b/realsense2_camera/test/live_camera/test_camera_all_profile_tests.py @@ -170,7 +170,7 @@ def test_LiveCamera_Change_Resolution(self,launch_descr_with_parameters): if cap == None: debug_print("Device not found? : " + params['device_type']) return - self.create_param_ifs(get_node_heirarchy(params)) + self.create_service_client_ifs(get_node_heirarchy(params)) config["Color"]["default_profile1"],config["Color"]["default_profile2"] = pytest_live_camera_utils.get_default_profiles(cap["color_profile"], "Color") config["Depth"]["default_profile1"],config["Depth"]["default_profile2"] = pytest_live_camera_utils.get_default_profiles(cap["depth_profile"], "Depth") config["Infrared"]["default_profile1"],config["Infrared"]["default_profile2"] = pytest_live_camera_utils.get_default_profiles(cap["depth_profile"], "Infrared") diff --git a/realsense2_camera/test/live_camera/test_camera_fps_tests.py b/realsense2_camera/test/live_camera/test_camera_fps_tests.py index eb66b71a51..467a98e0f1 100644 --- a/realsense2_camera/test/live_camera/test_camera_fps_tests.py +++ b/realsense2_camera/test/live_camera/test_camera_fps_tests.py @@ -83,9 +83,8 @@ def test_camera_test_fps(self,launch_descr_with_parameters): print("Starting camera test...") self.init_test("RsTest"+params['camera_name']) self.wait_for_node(params['camera_name']) - self.create_param_ifs(get_node_heirarchy(params)) - #assert self.set_bool_param('enable_color', False) - + self.create_service_client_ifs(get_node_heirarchy(params)) + themes = [ {'topic':get_node_heirarchy(params)+'/depth/image_rect_raw', 'msg_type':msg_Image, diff --git a/realsense2_camera/test/live_camera/test_camera_imu_tests.py b/realsense2_camera/test/live_camera/test_camera_imu_tests.py index cad8e8033e..0dc16f0cf7 100644 --- a/realsense2_camera/test/live_camera/test_camera_imu_tests.py +++ b/realsense2_camera/test/live_camera/test_camera_imu_tests.py @@ -70,7 +70,7 @@ def test_LiveCamera_check_motion_sensor(self,launch_descr_with_parameters): try: #initialize self.init_test("RsTest"+params['camera_name']) - self.create_param_ifs(get_node_heirarchy(params)) + self.create_service_client_ifs(get_node_heirarchy(params)) #run with default params and check the data diff --git a/realsense2_camera/test/live_camera/test_camera_point_cloud_tests.py b/realsense2_camera/test/live_camera/test_camera_point_cloud_tests.py index 3f012f133b..85fc84121c 100644 --- a/realsense2_camera/test/live_camera/test_camera_point_cloud_tests.py +++ b/realsense2_camera/test/live_camera/test_camera_point_cloud_tests.py @@ -100,7 +100,7 @@ def test_camera_test_point_cloud(self,launch_descr_with_parameters): ''' self.init_test("RsTest"+self.params['camera_name']) self.wait_for_node(self.params['camera_name']) - self.create_param_ifs(get_node_heirarchy(self.params)) + self.create_service_client_ifs(get_node_heirarchy(self.params)) ret = self.run_test(themes, timeout=10) assert ret[0], ret[1] ret = self.process_data(themes, False) diff --git a/realsense2_camera/test/live_camera/test_camera_service_call.py b/realsense2_camera/test/live_camera/test_camera_service_call.py new file mode 100644 index 0000000000..9b195064f1 --- /dev/null +++ b/realsense2_camera/test/live_camera/test_camera_service_call.py @@ -0,0 +1,73 @@ +# Copyright 2024 Intel Corporation. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os, sys +import pytest + +sys.path.append(os.path.abspath(os.path.dirname(__file__)+"/../utils")) +import pytest_rs_utils +import pytest_live_camera_utils +from pytest_rs_utils import launch_descr_with_parameters +from pytest_rs_utils import get_node_heirarchy + +test_params_test_srv_d455 = { + 'camera_name': 'D455', + 'device_type': 'D455', + } +test_params_test_srv_d435i = { + 'camera_name': 'D435i', + 'device_type': 'D435i', + } +test_params_test_srv_d415 = { + 'camera_name': 'D415', + 'device_type': 'D415', + } + +''' +The test checks service call device_info with type DeviceInfo +device info includes - device name, FW version, serial number, etc +''' +@pytest.mark.parametrize("launch_descr_with_parameters", [ + pytest.param(test_params_test_srv_d455, marks=pytest.mark.d455), + pytest.param(test_params_test_srv_d435i, marks=pytest.mark.d435i), + pytest.param(test_params_test_srv_d415, marks=pytest.mark.d415), + ],indirect=True) +@pytest.mark.launch(fixture=launch_descr_with_parameters) +class TestCamera_ServiceCall_DeviceInfo(pytest_rs_utils.RsTestBaseClass): + def test_camera_service_call_device_info(self,launch_descr_with_parameters): + params = launch_descr_with_parameters[1] + + if pytest_live_camera_utils.check_if_camera_connected(params['device_type']) == False: + print("Device not found? : " + params['device_type']) + assert False + + try: + print("Starting camera test...") + self.init_test("RsTest"+params['camera_name']) + self.wait_for_node(params['camera_name']) + self.create_service_client_ifs(get_node_heirarchy(params)) + #No need to call run_test() as no frame integritiy check required + response = self.get_deviceinfo() + if response is not None: + print(f"device_info service response:\n{response}\n") + assert params['device_type'].casefold() in response.device_name.casefold().split('_') + else: + assert False, "No device_info response received" + except Exception as e: + print(e) + finally: + #this step is important because the test will fail next time + pytest_rs_utils.kill_realsense2_camera_node() + self.shutdown() + diff --git a/realsense2_camera/test/live_camera/test_camera_tf_tests.py b/realsense2_camera/test/live_camera/test_camera_tf_tests.py index 15edc4d6f9..9b76aae0ad 100644 --- a/realsense2_camera/test/live_camera/test_camera_tf_tests.py +++ b/realsense2_camera/test/live_camera/test_camera_tf_tests.py @@ -101,7 +101,7 @@ def test_camera_test_tf_static_change(self,launch_descr_with_parameters): ''' self.init_test("RsTest"+self.params['camera_name']) self.wait_for_node(self.params['camera_name']) - self.create_param_ifs(get_node_heirarchy(self.params)) + self.create_service_client_ifs(get_node_heirarchy(self.params)) ret = self.run_test(themes, timeout=10) assert ret[0], ret[1] @@ -192,7 +192,7 @@ def test_camera_test_tf_dyn(self,launch_descr_with_parameters): ''' self.init_test("RsTest"+self.params['camera_name']) self.wait_for_node(self.params['camera_name']) - self.create_param_ifs(get_node_heirarchy(self.params)) + self.create_service_client_ifs(get_node_heirarchy(self.params)) ret = self.run_test(themes, timeout=10) assert ret[0], ret[1] ret = self.process_data(themes, False) diff --git a/realsense2_camera/test/live_camera/test_d415_basic_tests.py b/realsense2_camera/test/live_camera/test_d415_basic_tests.py index f397b70f9a..1b8ca54510 100644 --- a/realsense2_camera/test/live_camera/test_d415_basic_tests.py +++ b/realsense2_camera/test/live_camera/test_d415_basic_tests.py @@ -101,7 +101,7 @@ def test_D415_Change_Resolution(self,launch_descr_with_parameters): ''' self.init_test("RsTest"+params['camera_name']) self.spin_for_time(wait_time=1.0) - self.create_param_ifs(get_node_heirarchy(params)) + self.create_service_client_ifs(get_node_heirarchy(params)) self.spin_for_time(wait_time=1.0) for key in cap: diff --git a/realsense2_camera/test/live_camera/test_d455_basic_tests.py b/realsense2_camera/test/live_camera/test_d455_basic_tests.py index d7e8e2d29d..687b4949ec 100644 --- a/realsense2_camera/test/live_camera/test_d455_basic_tests.py +++ b/realsense2_camera/test/live_camera/test_d455_basic_tests.py @@ -77,7 +77,7 @@ def test_D455_Change_Resolution(self,launch_descr_with_parameters): print("Starting camera test...") self.init_test("RsTest"+params['camera_name']) self.wait_for_node(params['camera_name']) - self.create_param_ifs(get_node_heirarchy(params)) + self.create_service_client_ifs(get_node_heirarchy(params)) self.spin_for_time(0.5) assert self.set_bool_param('enable_color', False) self.spin_for_time(0.5) @@ -131,7 +131,7 @@ def test_D455_Seq_ID_update(self,launch_descr_with_parameters): print("Starting camera test...") self.init_test("RsTest"+params['camera_name']) self.wait_for_node(params['camera_name']) - self.create_param_ifs(get_node_heirarchy(params)) + self.create_service_client_ifs(get_node_heirarchy(params)) assert self.set_bool_param('depth_module.hdr_enabled', False) diff --git a/realsense2_camera/test/utils/pytest_rs_utils.py b/realsense2_camera/test/utils/pytest_rs_utils.py index 0a0069115f..75cfb78ecd 100644 --- a/realsense2_camera/test/utils/pytest_rs_utils.py +++ b/realsense2_camera/test/utils/pytest_rs_utils.py @@ -66,6 +66,7 @@ from sensor_msgs.msg import CameraInfo as msg_CameraInfo from realsense2_camera_msgs.msg import Extrinsics as msg_Extrinsics from realsense2_camera_msgs.msg import Metadata as msg_Metadata +from realsense2_camera_msgs.srv import DeviceInfo from sensor_msgs_py import point_cloud2 as pc2 import tf2_ros @@ -785,14 +786,17 @@ def create_subscription(self, msg_type, topic, data_type, store_raw_data=False, - def create_param_ifs(self, camera_name): + def create_service_client_ifs(self, camera_name): self.set_param_if = self.node.create_client(SetParameters, camera_name + '/set_parameters') self.get_param_if = self.node.create_client(GetParameters, camera_name + '/get_parameters') + self.get_device_info = self.node.create_client(DeviceInfo, camera_name + '/device_info') while not self.get_param_if.wait_for_service(timeout_sec=1.0): print('service not available, waiting again...') while not self.set_param_if.wait_for_service(timeout_sec=1.0): print('service not available, waiting again...') - + while not self.get_device_info.wait_for_service(timeout_sec=1.0): + print('service not available, waiting again...') + def send_param(self, req): future = self.set_param_if.call_async(req) while rclpy.ok(): @@ -806,7 +810,7 @@ def send_param(self, req): print("exception raised:") print(e) pass - return False + return False def get_param(self, req): future = self.get_param_if.call_async(req) @@ -820,7 +824,7 @@ def get_param(self, req): print("exception raised:") print(e) pass - return None + return None def set_string_param(self, param_name, param_value): req = SetParameters.Request() @@ -833,7 +837,16 @@ def set_bool_param(self, param_name, param_value): new_param_value = ParameterValue(type=ParameterType.PARAMETER_BOOL, bool_value=param_value) req.parameters = [Parameter(name=param_name, value=new_param_value)] return self.send_param(req) - + + def get_bool_param(self, param_name): + req = GetParameters.Request() + req.names = [param_name] + value = self.get_param(req) + if (value == None) or (value.type != ParameterType.PARAMETER_BOOL): + return None + else: + return value.bool_value + def set_integer_param(self, param_name, param_value): req = SetParameters.Request() new_param_value = ParameterValue(type=ParameterType.PARAMETER_INTEGER, integer_value=param_value) @@ -844,11 +857,25 @@ def get_integer_param(self, param_name): req = GetParameters.Request() req.names = [param_name] value = self.get_param(req) - if (value == None) or (value.type == ParameterType.PARAMETER_NOT_SET): + if (value == None) or (value.type != ParameterType.PARAMETER_INTEGER): return None else: return value.integer_value - + + def get_deviceinfo(self): + self.req = DeviceInfo.Request() + self.future = self.get_device_info.call_async(self.req) + while rclpy.ok(): + rclpy.spin_once(self.node) + if self.future.done(): + try: + response = self.future.result() + return response + except Exception as e: + print("exception raised:") + print(e) + return None + def spin_for_data(self,themes, timeout=5.0): ''' timeout value varies depending upon the system, it needs to be more if