From 5be4ab6a4c3550f4948cdd68b7dd6746a5ff67b2 Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Mon, 20 Jan 2025 16:16:06 -0800 Subject: [PATCH 01/10] Update `emit_mq_message` to support SelectConnections Related to neon-iris improvements --- neon_mq_connector/connector.py | 39 +++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/neon_mq_connector/connector.py b/neon_mq_connector/connector.py index 321ad3e..d66467f 100644 --- a/neon_mq_connector/connector.py +++ b/neon_mq_connector/connector.py @@ -271,7 +271,8 @@ def create_unique_id(): @classmethod def emit_mq_message(cls, - connection: pika.BlockingConnection, + connection: Union[pika.BlockingConnection, + pika.SelectConnection], request_data: dict, exchange: Optional[str] = '', queue: Optional[str] = '', @@ -302,22 +303,26 @@ def emit_mq_message(cls, .get("mq", {}).get("message_id") or cls.create_unique_id()) - with connection.channel() as channel: - if exchange: - channel.exchange_declare(exchange=exchange, - exchange_type=exchange_type, - auto_delete=False) - if queue: - declared_queue = channel.queue_declare(queue=queue, - auto_delete=False) - if exchange_type == ExchangeType.fanout.value: - channel.queue_bind(queue=declared_queue.method.queue, - exchange=exchange) - channel.basic_publish(exchange=exchange or '', - routing_key=queue, - body=dict_to_b64(request_data), - properties=pika.BasicProperties( - expiration=str(expiration))) + channel = connection.channel() + + if exchange: + channel.exchange_declare(exchange=exchange, + exchange_type=exchange_type, + auto_delete=False) + if queue: + declared_queue = channel.queue_declare(queue=queue, + auto_delete=False) + if exchange_type == ExchangeType.fanout.value: + channel.queue_bind(queue=declared_queue.method.queue, + exchange=exchange) + channel.basic_publish(exchange=exchange or '', + routing_key=queue, + body=dict_to_b64(request_data), + properties=pika.BasicProperties( + expiration=str(expiration))) + + channel.close() + LOG.debug(f"sent message: {request_data['message_id']}") return request_data['message_id'] From 531d7835f823553795888684577f95be085e4464 Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Mon, 20 Jan 2025 16:31:59 -0800 Subject: [PATCH 02/10] Update `emit_mq_message` to support SelectConnection channel creation callback --- neon_mq_connector/connector.py | 75 ++++++++++++++++++---------------- 1 file changed, 40 insertions(+), 35 deletions(-) diff --git a/neon_mq_connector/connector.py b/neon_mq_connector/connector.py index d66467f..f370816 100644 --- a/neon_mq_connector/connector.py +++ b/neon_mq_connector/connector.py @@ -30,6 +30,8 @@ import copy import time import uuid +from asyncio import Event + import pika import pika.exceptions @@ -46,7 +48,6 @@ from neon_mq_connector.utils.network_utils import dict_to_b64 from neon_mq_connector.utils.thread_utils import RepeatingTimer - # DO NOT REMOVE ME: Defined for backward compatibility ConsumerThread = BlockingConsumerThread @@ -272,7 +273,7 @@ def create_unique_id(): @classmethod def emit_mq_message(cls, connection: Union[pika.BlockingConnection, - pika.SelectConnection], + pika.SelectConnection], request_data: dict, exchange: Optional[str] = '', queue: Optional[str] = '', @@ -303,26 +304,30 @@ def emit_mq_message(cls, .get("mq", {}).get("message_id") or cls.create_unique_id()) - channel = connection.channel() - - if exchange: - channel.exchange_declare(exchange=exchange, - exchange_type=exchange_type, - auto_delete=False) - if queue: - declared_queue = channel.queue_declare(queue=queue, - auto_delete=False) - if exchange_type == ExchangeType.fanout.value: - channel.queue_bind(queue=declared_queue.method.queue, - exchange=exchange) - channel.basic_publish(exchange=exchange or '', - routing_key=queue, - body=dict_to_b64(request_data), - properties=pika.BasicProperties( - expiration=str(expiration))) - - channel.close() - + def _on_channel_open(new_channel): + if exchange: + new_channel.exchange_declare(exchange=exchange, + exchange_type=exchange_type, + auto_delete=False) + if queue: + declared_queue = new_channel.queue_declare(queue=queue, + auto_delete=False) + if exchange_type == ExchangeType.fanout.value: + new_channel.queue_bind(queue=declared_queue.method.queue, + exchange=exchange) + new_channel.basic_publish(exchange=exchange or '', + routing_key=queue, + body=dict_to_b64(request_data), + properties=pika.BasicProperties( + expiration=str(expiration))) + + new_channel.close() + + if isinstance(connection, pika.BlockingConnection): + _on_channel_open(connection.channel()) + else: + connection.channel(on_open_callback=_on_channel_open) + LOG.debug(f"sent message: {request_data['message_id']}") return request_data['message_id'] @@ -453,17 +458,17 @@ def register_consumer(self, name: str, vhost: str, queue: str, self.consumer_properties.setdefault(name, {}) self.consumer_properties[name]['properties'] = \ dict( - name=name, - connection_params=self.get_connection_params(vhost), - queue=queue, - queue_reset=queue_reset, - callback_func=callback, - exchange=exchange, - exchange_reset=exchange_reset, - exchange_type=exchange_type, - error_func=error_handler, - auto_ack=auto_ack, - queue_exclusive=queue_exclusive, + name=name, + connection_params=self.get_connection_params(vhost), + queue=queue, + queue_reset=queue_reset, + callback_func=callback, + exchange=exchange, + exchange_reset=exchange_reset, + exchange_type=exchange_type, + error_func=error_handler, + auto_ack=auto_ack, + queue_exclusive=queue_exclusive, ) self.consumer_properties[name]['restart_attempts'] = int(restart_attempts) self.consumer_properties[name]['started'] = False @@ -561,8 +566,8 @@ def run_consumers(self, names: Optional[tuple] = None, daemon=True): names = list(self.consumers) for name in names: if (isinstance(self.consumers.get(name), SUPPORTED_THREADED_CONSUMERS) - and self.consumers[name].is_consumer_alive - and not self.consumers[name].is_consuming): + and self.consumers[name].is_consumer_alive + and not self.consumers[name].is_consuming): self.consumers[name].daemon = daemon self.consumers[name].start() self.consumer_properties[name]['started'] = True From 4e744773f16045e4535dc523361f899254ad6dd7 Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Mon, 20 Jan 2025 16:53:31 -0800 Subject: [PATCH 03/10] Loosen dependency to allow newer `pika` --- requirements/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 5ff8a05..205866d 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -1,3 +1,3 @@ -pika==1.2.0 +pika~=1.2 ovos-config~=0.0,>=0.0.8 ovos-utils~=0.0,>=0.0.32 From 49142bf6a3df9cb1568e5ce584e628902356aaf6 Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Mon, 20 Jan 2025 17:14:54 -0800 Subject: [PATCH 04/10] Troubleshoot channel opening --- neon_mq_connector/connector.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/neon_mq_connector/connector.py b/neon_mq_connector/connector.py index f370816..f01dac0 100644 --- a/neon_mq_connector/connector.py +++ b/neon_mq_connector/connector.py @@ -305,6 +305,8 @@ def emit_mq_message(cls, cls.create_unique_id()) def _on_channel_open(new_channel): + while not new_channel.is_open: + LOG.warning(f"Waiting for channel to open ({new_channel})") if exchange: new_channel.exchange_declare(exchange=exchange, exchange_type=exchange_type, From 2c1806d92e16e694775b072867ad220ec532200e Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Mon, 20 Jan 2025 17:25:54 -0800 Subject: [PATCH 05/10] Add more logging --- neon_mq_connector/connector.py | 1 + 1 file changed, 1 insertion(+) diff --git a/neon_mq_connector/connector.py b/neon_mq_connector/connector.py index f01dac0..a99128a 100644 --- a/neon_mq_connector/connector.py +++ b/neon_mq_connector/connector.py @@ -326,6 +326,7 @@ def _on_channel_open(new_channel): new_channel.close() if isinstance(connection, pika.BlockingConnection): + LOG.info("Using blocking connection") _on_channel_open(connection.channel()) else: connection.channel(on_open_callback=_on_channel_open) From b13d6e94cd1f7e413311d5ef31fd052e7a144464 Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Mon, 20 Jan 2025 17:41:25 -0800 Subject: [PATCH 06/10] Add more logging to troubleshoot blocking connection usage --- neon_mq_connector/connector.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/neon_mq_connector/connector.py b/neon_mq_connector/connector.py index a99128a..6eddf42 100644 --- a/neon_mq_connector/connector.py +++ b/neon_mq_connector/connector.py @@ -326,9 +326,10 @@ def _on_channel_open(new_channel): new_channel.close() if isinstance(connection, pika.BlockingConnection): - LOG.info("Using blocking connection") + LOG.info(f"Using blocking connection for queue: {queue}") _on_channel_open(connection.channel()) else: + LOG.info(f"Using select connection for queue: {queue}") connection.channel(on_open_callback=_on_channel_open) LOG.debug(f"sent message: {request_data['message_id']}") From 9f75081215b8922fd545401729bbe78ff98cc550 Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Mon, 20 Jan 2025 17:53:23 -0800 Subject: [PATCH 07/10] Cleanup logging Troubleshoot blocking connection usage --- neon_mq_connector/connector.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/neon_mq_connector/connector.py b/neon_mq_connector/connector.py index 6eddf42..03db8c6 100644 --- a/neon_mq_connector/connector.py +++ b/neon_mq_connector/connector.py @@ -305,8 +305,6 @@ def emit_mq_message(cls, cls.create_unique_id()) def _on_channel_open(new_channel): - while not new_channel.is_open: - LOG.warning(f"Waiting for channel to open ({new_channel})") if exchange: new_channel.exchange_declare(exchange=exchange, exchange_type=exchange_type, @@ -326,10 +324,10 @@ def _on_channel_open(new_channel): new_channel.close() if isinstance(connection, pika.BlockingConnection): - LOG.info(f"Using blocking connection for queue: {queue}") + LOG.info(f"Using blocking connection for request: {request_data}") _on_channel_open(connection.channel()) else: - LOG.info(f"Using select connection for queue: {queue}") + LOG.debug(f"Using select connection for queue: {queue}") connection.channel(on_open_callback=_on_channel_open) LOG.debug(f"sent message: {request_data['message_id']}") From 154c59511971435c83616b47a7c855428c3d213d Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Tue, 21 Jan 2025 11:29:45 -0800 Subject: [PATCH 08/10] Update `emit_mq_message` to prevent mutating input data Add test coverage for `emit_mq_message` for Blocking and Select connections --- neon_mq_connector/connector.py | 3 ++ tests/test_connector.py | 79 +++++++++++++++++++++++++++++++++- 2 files changed, 81 insertions(+), 1 deletion(-) diff --git a/neon_mq_connector/connector.py b/neon_mq_connector/connector.py index 03db8c6..c4e2f2f 100644 --- a/neon_mq_connector/connector.py +++ b/neon_mq_connector/connector.py @@ -294,6 +294,9 @@ def emit_mq_message(cls, :raises ValueError: invalid request data provided :returns message_id: id of the sent message """ + # Make a copy of request_data to prevent modifying the input object + request_data = dict(request_data) + if not isinstance(request_data, dict): raise TypeError(f"Expected dict and got {type(request_data)}") if not request_data: diff --git a/tests/test_connector.py b/tests/test_connector.py index 0915db9..9239c52 100644 --- a/tests/test_connector.py +++ b/tests/test_connector.py @@ -34,6 +34,8 @@ from unittest.mock import Mock, patch from ovos_utils.log import LOG +from pika.adapters.blocking_connection import BlockingConnection +from pika.adapters.select_connection import SelectConnection from pika.exchange_type import ExchangeType from neon_mq_connector.connector import MQConnector, ConsumerThreadInstance @@ -384,4 +386,79 @@ def test_init_rmq_down(self, get_timeout): callback.assert_called_once() connector.stop() - # TODO: test other methods + def test_emit_mq_message(self): + from neon_mq_connector.utils.network_utils import b64_to_dict + + test_config = {"server": "127.0.0.1", + "port": self.rmq_instance.port, + "users": { + "test": { + "user": "test_user", + "password": "test_password" + }}} + test_vhost = "/neon_testing" + test_queue = "test_queue" + connector = MQConnector(test_config, "test") + connector.vhost = test_vhost + + request_data = {"test": True, + "data": ["test"]} + + callback_event = threading.Event() + callback = Mock(side_effect=lambda *args: callback_event.set()) + connector.register_consumer("test_consumer", vhost=test_vhost, + queue=test_queue, callback=callback) + connector.run() + + close_event = threading.Event() + on_open = Mock() + on_error = Mock() + on_close = Mock(side_effect=lambda *args: close_event.set()) + + blocking_connection = BlockingConnection( + parameters=connector.get_connection_params(test_vhost)) + + async_connection = SelectConnection( + parameters=connector.get_connection_params(test_vhost), + on_open_callback=on_open, on_open_error_callback=on_error, + on_close_callback=on_close) + async_thread = threading.Thread(target=async_connection.ioloop.start, + daemon=True) + async_thread.start() + + # Blocking connection emit + message_id = connector.emit_mq_message(blocking_connection, + request_data, queue=test_queue) + self.assertIsInstance(message_id, str) + callback_event.wait(timeout=5) + self.assertTrue(callback_event.is_set()) + callback.assert_called_once() + self.assertEqual(b64_to_dict(callback.call_args.args[3]), + {**request_data, "message_id": message_id}) + callback.reset_mock() + callback_event.clear() + + # Async connection emit + on_open.assert_called_once() + message_id_2 = connector.emit_mq_message(async_connection, + request_data, queue=test_queue) + self.assertIsInstance(message_id, str) + self.assertNotEqual(message_id, message_id_2) + callback_event.wait(timeout=5) + self.assertTrue(callback_event.is_set()) + callback.assert_called_once() + self.assertEqual(b64_to_dict(callback.call_args.args[3]), + {**request_data, "message_id": message_id_2}) + + on_close.assert_not_called() + connector.stop() + + async_connection.close() + close_event.wait(timeout=5) + self.assertTrue(close_event.is_set()) + on_close.assert_called_once() + + async_thread.join(3) + on_error.assert_not_called() + +# TODO: test other methods From 5be88763518eb9ea30d80474b3e4afe7792223d7 Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Tue, 21 Jan 2025 12:18:30 -0800 Subject: [PATCH 09/10] Update log to DEBUG Disable sync and observer threads in test Connector instance --- neon_mq_connector/connector.py | 2 +- tests/test_connector.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/neon_mq_connector/connector.py b/neon_mq_connector/connector.py index c4e2f2f..a78a4ac 100644 --- a/neon_mq_connector/connector.py +++ b/neon_mq_connector/connector.py @@ -327,7 +327,7 @@ def _on_channel_open(new_channel): new_channel.close() if isinstance(connection, pika.BlockingConnection): - LOG.info(f"Using blocking connection for request: {request_data}") + LOG.debug(f"Using blocking connection for request: {request_data}") _on_channel_open(connection.channel()) else: LOG.debug(f"Using select connection for queue: {queue}") diff --git a/tests/test_connector.py b/tests/test_connector.py index 9239c52..1779481 100644 --- a/tests/test_connector.py +++ b/tests/test_connector.py @@ -408,7 +408,7 @@ def test_emit_mq_message(self): callback = Mock(side_effect=lambda *args: callback_event.set()) connector.register_consumer("test_consumer", vhost=test_vhost, queue=test_queue, callback=callback) - connector.run() + connector.run(run_sync=False, run_observer=False) close_event = threading.Event() on_open = Mock() From 8abfc90419f3afa6f10c33957f80e0ededd306c5 Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Tue, 21 Jan 2025 12:40:43 -0800 Subject: [PATCH 10/10] Update test to wait for open event --- tests/test_connector.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/test_connector.py b/tests/test_connector.py index 1779481..2ac7ce5 100644 --- a/tests/test_connector.py +++ b/tests/test_connector.py @@ -410,8 +410,9 @@ def test_emit_mq_message(self): queue=test_queue, callback=callback) connector.run(run_sync=False, run_observer=False) + open_event = threading.Event() close_event = threading.Event() - on_open = Mock() + on_open = Mock(side_effect=lambda *args: open_event.set()) on_error = Mock() on_close = Mock(side_effect=lambda *args: close_event.set()) @@ -439,6 +440,8 @@ def test_emit_mq_message(self): callback_event.clear() # Async connection emit + open_event.wait(timeout=5) + self.assertTrue(open_event.is_set()) on_open.assert_called_once() message_id_2 = connector.emit_mq_message(async_connection, request_data, queue=test_queue)