diff --git a/nslsii/__init__.py b/nslsii/__init__.py
index fc4f100e..1b1b79ac 100644
--- a/nslsii/__init__.py
+++ b/nslsii/__init__.py
@@ -1,4 +1,3 @@
-from collections import namedtuple
 from distutils.version import LooseVersion
 import logging
 from logging.handlers import SysLogHandler, TimedRotatingFileHandler
@@ -464,6 +463,12 @@ def configure_kafka_publisher(RE, beamline_name, override_config_path=None):

     See `tests/test_kafka_configuration.py` for an example configuration file.
     """
+    from nslsii.kafka_utils import (
+        _read_bluesky_kafka_config_file,
+        _subscribe_kafka_publisher,
+        _subscribe_kafka_queue_thread_publisher
+    )
+
     bluesky_kafka_config_path = None
     kafka_publisher_details = None

@@ -481,6 +486,11 @@ def configure_kafka_publisher(RE, beamline_name, override_config_path=None):
     # which is the format required by the confluent python api
     bootstrap_servers = ",".join(bluesky_kafka_configuration["bootstrap_servers"])

+    runengine_producer_config = {}
+    if "producer_consumer_security_config" in bluesky_kafka_configuration:
+        runengine_producer_config.update(bluesky_kafka_configuration["producer_consumer_security_config"])
+    runengine_producer_config.update(bluesky_kafka_configuration["runengine_producer_config"])
+
     if bluesky_kafka_configuration["abort_run_on_kafka_exception"]:
         kafka_publisher_details = _subscribe_kafka_publisher(
             RE,
@@ -615,272 +625,3 @@ def migrate_metadata():
     os.makedirs(directory, exist_ok=True)
     new_md = PersistentDict(directory)
     new_md.update(old_md)
-
-
-def _read_bluesky_kafka_config_file(config_file_path):
-    """Read a YAML file of Kafka producer configuration details.
-
-    The file must have three top-level entries as shown:
-    ---
-        abort_run_on_kafka_exception: true
-        bootstrap_servers:
-            - kafka1:9092
-            - kafka2:9092
-        runengine_producer_config:
-            acks: 0
-            message.timeout.ms: 3000
-            compression.codec: snappy
-
-    Parameters
-    ----------
-    config_file_path: str
-        path to the YAML file of Kafka producer configuration details
-
-    Returns
-    -------
-    dict of configuration details
-    """
-    import yaml
-
-    # read the Kafka Producer configuration details
-    if Path(config_file_path).exists():
-        with open(config_file_path) as f:
-            bluesky_kafka_config = yaml.safe_load(f)
-    else:
-        raise FileNotFoundError(config_file_path)
-
-    required_sections = ("abort_run_on_kafka_exception", "bootstrap_servers", "runengine_producer_config")
-    missing_required_sections = [
-        required_section
-        for required_section
-        in required_sections
-        if required_section not in bluesky_kafka_config
-    ]
-
-    if missing_required_sections:
-        raise Exception(
-            f"Bluesky Kafka configuration file '{config_file_path}' is missing required section(s) `{missing_required_sections}`"
-        )
-
-    return bluesky_kafka_config
-
-
-"""
-A namedtuple for holding details of the publisher created by
-_subscribe_kafka_publisher.
-"""
-_SubscribeKafkaPublisherDetails = namedtuple(
-    "SubscribeKafkaPublisherDetails",
-    {
-        "beamline_topic",
-        "bootstrap_servers",
-        "producer_config",
-        "re_subscribe_token"
-    }
-)
-
-
-def _subscribe_kafka_publisher(RE, beamline_name, bootstrap_servers, producer_config, _publisher_factory=None):
-    """
-    Subscribe a RunRouter to the specified RE to create Kafka Publishers.
-    Each Publisher will publish documents from a single run to the
-    Kafka topic "<beamline_name>.bluesky.runengine.documents".
-
-    Parameters
-    ----------
-    RE: RunEngine
-        the RunEngine to which the RunRouter will be subscribed
-    beamline_name: str
-        beamline start_name, for example "csx", to be used in building the
-        Kafka topic to which messages will be published
-    bootstrap_servers: str
-        Comma-delimited list of Kafka server addresses as a string such as ``'10.0.137.8:9092'``
-    producer_config: dict
-        dictionary of Kafka Producer configuration settings
-    _publisher_factory: callable, optional
-        intended only for testing, default is bluesky_kafka.Publisher, optionally specify a callable
-        that constructs a Publisher-like object
-
-    Returns
-    -------
-    topic: str
-        the Kafka topic on which bluesky documents will be published
-    runrouter_token: int
-        subscription token corresponding to the RunRouter subscribed to the RunEngine
-        by this function
-    """
-    from bluesky_kafka import Publisher
-    from bluesky_kafka.utils import list_topics
-    from event_model import RunRouter
-
-    topic = f"{beamline_name.lower()}.bluesky.runengine.documents"
-
-    if _publisher_factory is None:
-        _publisher_factory = Publisher
-
-    def kafka_publisher_factory(start_name, start_doc):
-        # create a Kafka Publisher for a single run
-        # in response to a start document
-        kafka_publisher = _publisher_factory(
-            topic=topic,
-            bootstrap_servers=bootstrap_servers,
-            key=start_doc["uid"],
-            producer_config=producer_config,
-            flush_on_stop_doc=True,
-        )
-
-        def publish_or_abort_run(name_, doc_):
-            """
-            Exceptions _should_ interrupt the current run.
-            """
-            try:
-                kafka_publisher(name_, doc_)
-            except (BaseException, Exception) as exc_:
-                # log the exception and re-raise it to abort the current run
-                logger = logging.getLogger("nslsii")
-                logger.exception(
-                    "an error occurred when %s published\n  start_name: %s\n  doc %s",
-                    kafka_publisher,
-                    name_,
-                    doc_,
-                )
-                raise exc_
-
-        try:
-            # on each start document call list_topics to test if we can connect to a Kafka broker
-            cluster_metadata = list_topics(
-                bootstrap_servers=bootstrap_servers, producer_config=producer_config, timeout=5.0
-            )
-            logging.getLogger("nslsii").info(
-                "connected to Kafka broker(s): %s", cluster_metadata
-            )
-            return [publish_or_abort_run], []
-        except (BaseException, Exception) as exc:
-            # log the exception and re-raise it to indicate no connection could be made to a Kafka broker
-            nslsii_logger = logging.getLogger("nslsii")
-            nslsii_logger.exception("'%s' failed to connect to Kafka", kafka_publisher)
-            raise exc
-
-    rr = RunRouter(factories=[kafka_publisher_factory])
-    runrouter_token = RE.subscribe(rr)
-
-    # log this only once
-    logging.getLogger("nslsii").info(
-        "RE will publish documents to Kafka topic '%s'", topic
-    )
-
-    subscribe_kafka_publisher_details = _SubscribeKafkaPublisherDetails(
-        beamline_topic=topic,
-        bootstrap_servers=bootstrap_servers,
-        producer_config=producer_config,
-        re_subscribe_token=runrouter_token
-    )
-
-    return subscribe_kafka_publisher_details
-
-
-"""
-A namedtuple for holding details of the publisher created by
-_subscribe_kafka_queue_thread_publisher.
-"""
-_SubscribeKafkaQueueThreadPublisherDetails = namedtuple(
-    "SubscribeKafkaQueueThreadPublisherDetails",
-    {
-        "beamline_topic",
-        "bootstrap_servers",
-        "producer_config",
-        "publisher_queue_thread_details",
-        "re_subscribe_token"
-    }
-)
-
-
-def _subscribe_kafka_queue_thread_publisher(
-    RE, beamline_name, bootstrap_servers, producer_config, publisher_queue_timeout=1
-):
-    """
-    Create and start a separate thread to publish bluesky documents as Kafka
-    messages on a beamline-specific topic.
-
-    This function performs three tasks:
-      1) verify a Kafka broker with the expected beamline-specific topic is available
-      2) instantiate a bluesky_kafka.Publisher with the expected beamline-specific topic
-      3) delegate connecting the RunEngine and Publisher to _subscribe_kafka_publisher
-
-    Parameters
-    ----------
-    RE: RunEngine
-        the RunEngine to which the RunRouter will be subscribed
-    beamline_name: str
-        beamline name, for example "csx", to be used in building the
-        Kafka topic to which messages will be published
-    bootstrap_servers: str
-        Comma-delimited list of Kafka server addresses or hostnames and ports as a string
-        such as ``'kafka1:9092,kafka2:9092``
-    producer_config: dict
-        dictionary of Kafka Producer configuration settings
-
-    Returns
-    -------
-    topic: str
-        the Kafka topic on which bluesky documents will be published, for example
-        "csx.bluesky.runengine.documents"
-    publisher_thread_re_token: int
-        RunEngine subscription token corresponding to the function subscribed to the RunEngine
-        that places (name, document) tuples on the publisher queue. This token is needed to
-        un-subscribe the function from the RunEngine, in case someone ever wants to do that.
-
-    """
-    from bluesky_kafka import BlueskyKafkaException
-    from bluesky_kafka.tools.queue_thread import build_kafka_publisher_queue_and_thread
-
-    nslsii_logger = logging.getLogger("nslsii")
-    beamline_runengine_topic = None
-    kafka_publisher_token = None
-    publisher_thread_stop_event = None
-    kafka_publisher_re_token = None
-    publisher_queue_thread_details = None
-
-    try:
-        nslsii_logger.info("connecting to Kafka broker(s): '%s'", bootstrap_servers)
-        beamline_runengine_topic = (
-            f"{beamline_name.lower()}.bluesky.runengine.documents"
-        )
-
-        publisher_queue_thread_details = build_kafka_publisher_queue_and_thread(
-            topic=beamline_runengine_topic,
-            bootstrap_servers=bootstrap_servers,
-            producer_config=producer_config,
-            publisher_queue_timeout=publisher_queue_timeout,
-        )
-
-        publisher_thread_stop_event = publisher_queue_thread_details.publisher_thread_stop_event
-
-        kafka_publisher_re_token = RE.subscribe(
-            publisher_queue_thread_details.put_on_publisher_queue
-        )
-
-        nslsii_logger.info(
-            "RunEngine will publish bluesky documents on Kafka topic '%s'",
-            beamline_runengine_topic,
-        )
-
-    except BaseException:
-        """
-        An exception at this point means bluesky documents
-        will not be published as Kafka messages.
-        The exception will stop here so the run will not be aborted.
-        """
-        nslsii_logger.exception(
-            "RunEngine is not able to publish bluesky documents as Kafka messages on topic '%s'",
-            beamline_runengine_topic,
-        )
-
-    subscribe_kafka_queue_thread_publisher_details = _SubscribeKafkaQueueThreadPublisherDetails(
-        beamline_topic=beamline_runengine_topic,
-        bootstrap_servers=bootstrap_servers,
-        producer_config=producer_config,
-        publisher_queue_thread_details=publisher_queue_thread_details,
-        re_subscribe_token=kafka_publisher_re_token
-    )
-
-    return subscribe_kafka_queue_thread_publisher_details
diff --git a/nslsii/kafka_utils.py b/nslsii/kafka_utils.py
new file mode 100644
index 00000000..01bac5fc
--- /dev/null
+++ b/nslsii/kafka_utils.py
@@ -0,0 +1,282 @@
+import logging
+
+from collections import namedtuple
+from pathlib import Path
+
+
+def _read_bluesky_kafka_config_file(config_file_path):
+    """Read a YAML file of Kafka producer configuration details.
+
+    The file must have three top-level entries as shown:
+    ---
+        abort_run_on_kafka_exception: true
+        bootstrap_servers:
+            - kafka1:9092
+            - kafka2:9092
+        runengine_producer_config:
+            acks: 0
+            message.timeout.ms: 3000
+            compression.codec: snappy
+
+    Parameters
+    ----------
+    config_file_path: str
+        path to the YAML file of Kafka producer configuration details
+
+    Returns
+    -------
+    dict of configuration details
+    """
+    import yaml
+
+    # read the Kafka Producer configuration details
+    if Path(config_file_path).exists():
+        with open(config_file_path) as f:
+            bluesky_kafka_config = yaml.safe_load(f)
+    else:
+        raise FileNotFoundError(config_file_path)
+
+    required_sections = (
+        "abort_run_on_kafka_exception",
+        "bootstrap_servers",
+        # "producer_consumer_security_config", not required yet
+        "runengine_producer_config",
+    )
+    missing_required_sections = [
+        required_section
+        for required_section in required_sections
+        if required_section not in bluesky_kafka_config
+    ]
+
+    if missing_required_sections:
+        raise Exception(
+            f"Bluesky Kafka configuration file '{config_file_path}' is missing required section(s) `{missing_required_sections}`"
+        )
+
+    return bluesky_kafka_config
+
+
+"""
+A namedtuple for holding details of the publisher created by
+_subscribe_kafka_publisher.
+"""
+_SubscribeKafkaPublisherDetails = namedtuple(
+    "SubscribeKafkaPublisherDetails",
+    ("beamline_topic", "bootstrap_servers", "producer_config", "re_subscribe_token"),
+)
+
+
+def _subscribe_kafka_publisher(
+    RE, beamline_name, bootstrap_servers, producer_config, _publisher_factory=None
+):
+    """
+    Subscribe a RunRouter to the specified RE to create Kafka Publishers.
+    Each Publisher will publish documents from a single run to the
+    Kafka topic "<beamline_name>.bluesky.runengine.documents".
+
+    Parameters
+    ----------
+    RE: RunEngine
+        the RunEngine to which the RunRouter will be subscribed
+    beamline_name: str
+        beamline name, for example "csx", to be used in building the
+        Kafka topic to which messages will be published
+    bootstrap_servers: str
+        Comma-delimited list of Kafka server addresses as a string such as ``'10.0.137.8:9092'``
+    producer_config: dict
+        dictionary of Kafka Producer configuration settings
+    _publisher_factory: callable, optional
+        intended only for testing, default is bluesky_kafka.Publisher, optionally specify a callable
+        that constructs a Publisher-like object
+
+    Returns
+    -------
+    subscribe_kafka_publisher_details: _SubscribeKafkaPublisherDetails
+        namedtuple with fields beamline_topic, bootstrap_servers, producer_config,
+        and re_subscribe_token, the RunRouter subscription token needed to
+        unsubscribe the RunRouter from the RunEngine
+    """
+    from bluesky_kafka import Publisher
+    from bluesky_kafka.utils import list_topics
+    from event_model import RunRouter
+
+    topic = f"{beamline_name.lower()}.bluesky.runengine.documents"
+
+    if _publisher_factory is None:
+        _publisher_factory = Publisher
+
+    def kafka_publisher_factory(start_name, start_doc):
+        # create a Kafka Publisher for a single run
+        # in response to a start document
+
+        kafka_publisher = _publisher_factory(
+            topic=topic,
+            bootstrap_servers=bootstrap_servers,
+            key=start_doc["uid"],
+            producer_config=producer_config,
+            flush_on_stop_doc=True,
+        )
+
+        def publish_or_abort_run(name_, doc_):
+            """
+            Exceptions _should_ interrupt the current run.
+            """
+            try:
+                kafka_publisher(name_, doc_)
+            except (BaseException, Exception) as exc_:
+                # log the exception and re-raise it to abort the current run
+                logger = logging.getLogger("nslsii")
+                logger.exception(
+                    "an error occurred when %s published\n  start_name: %s\n  doc %s",
+                    kafka_publisher,
+                    name_,
+                    doc_,
+                )
+                raise exc_
+
+        try:
+            # on each start document call list_topics to test if we can connect to a Kafka broker
+            cluster_metadata = list_topics(
+                bootstrap_servers=bootstrap_servers,
+                producer_config=producer_config,
+                timeout=5.0,
+            )
+            logging.getLogger("nslsii").info(
+                "connected to Kafka broker(s): %s", cluster_metadata
+            )
+            return [publish_or_abort_run], []
+        except (BaseException, Exception) as exc:
+            # log the exception and re-raise it to indicate no connection could be made to a Kafka broker
+            nslsii_logger = logging.getLogger("nslsii")
+            nslsii_logger.exception("'%s' failed to connect to Kafka", kafka_publisher)
+            raise exc
+
+    rr = RunRouter(factories=[kafka_publisher_factory])
+    runrouter_token = RE.subscribe(rr)
+
+    # log this only once
+    logging.getLogger("nslsii").info(
+        "RE will publish documents to Kafka topic '%s'", topic
+    )
+
+    subscribe_kafka_publisher_details = _SubscribeKafkaPublisherDetails(
+        beamline_topic=topic,
+        bootstrap_servers=bootstrap_servers,
+        producer_config=producer_config,
+        re_subscribe_token=runrouter_token,
+    )
+
+    return subscribe_kafka_publisher_details
+
+
+"""
+A namedtuple for holding details of the publisher created by
+_subscribe_kafka_queue_thread_publisher.
+"""
+_SubscribeKafkaQueueThreadPublisherDetails = namedtuple(
+    "SubscribeKafkaQueueThreadPublisherDetails",
+    (
+        "beamline_topic",
+        "bootstrap_servers",
+        "producer_config",
+        "publisher_queue_thread_details",
+        "re_subscribe_token",
+    ),
+)
+
+
+def _subscribe_kafka_queue_thread_publisher(
+    RE, beamline_name, bootstrap_servers, producer_config, publisher_queue_timeout=1
+):
+    """
+    Create and start a separate thread to publish bluesky documents as Kafka
+    messages on a beamline-specific topic.
+
+    This function performs three tasks:
+      1) verify a Kafka broker with the expected beamline-specific topic is available
+      2) instantiate a bluesky_kafka.Publisher with the expected beamline-specific topic
+      3) delegate connecting the RunEngine and Publisher to _subscribe_kafka_publisher
+
+    Parameters
+    ----------
+    RE: RunEngine
+        the RunEngine to which the RunRouter will be subscribed
+    beamline_name: str
+        beamline name, for example "csx", to be used in building the
+        Kafka topic to which messages will be published
+    bootstrap_servers: str
+        Comma-delimited list of Kafka server addresses or hostnames and ports as a string
+        such as ``'kafka1:9092,kafka2:9092'``
+    producer_config: dict
+        dictionary of Kafka Producer configuration settings
+    publisher_queue_timeout: float, optional
+        seconds the publisher thread will wait for a document on its queue, default is 1
+
+    Returns
+    -------
+    subscribe_kafka_queue_thread_publisher_details: _SubscribeKafkaQueueThreadPublisherDetails
+        namedtuple with fields beamline_topic (for example "csx.bluesky.runengine.documents"),
+        bootstrap_servers, producer_config, publisher_queue_thread_details, and
+        re_subscribe_token, the RunEngine subscription token for the function that places
+        (name, document) tuples on the publisher queue; the token is needed to un-subscribe
+        that function from the RunEngine, and is None if no broker connection could be made
+    """
+    from bluesky_kafka import BlueskyKafkaException
+    from bluesky_kafka.tools.queue_thread import build_kafka_publisher_queue_and_thread
+
+    nslsii_logger = logging.getLogger("nslsii")
+    beamline_runengine_topic = None
+    kafka_publisher_token = None
+    publisher_thread_stop_event = None
+    kafka_publisher_re_token = None
+    publisher_queue_thread_details = None
+
+    try:
+        nslsii_logger.info("connecting to Kafka broker(s): '%s'", bootstrap_servers)
+        beamline_runengine_topic = (
+            f"{beamline_name.lower()}.bluesky.runengine.documents"
+        )
+
+        publisher_queue_thread_details = build_kafka_publisher_queue_and_thread(
+            topic=beamline_runengine_topic,
+            bootstrap_servers=bootstrap_servers,
+            producer_config=producer_config,
+            publisher_queue_timeout=publisher_queue_timeout,
+        )
+
+        publisher_thread_stop_event = (
+            publisher_queue_thread_details.publisher_thread_stop_event
+        )
+
+        kafka_publisher_re_token = RE.subscribe(
+            publisher_queue_thread_details.put_on_publisher_queue
+        )
+
+        nslsii_logger.info(
+            "RunEngine will publish bluesky documents on Kafka topic '%s'",
+            beamline_runengine_topic,
+        )
+
+    except BaseException:
+        """
+        An exception at this point means bluesky documents
+        will not be published as Kafka messages.
+        The exception will stop here so the run will not be aborted.
+        """
+        nslsii_logger.exception(
+            "RunEngine is not able to publish bluesky documents as Kafka messages on topic '%s'",
+            beamline_runengine_topic,
+        )
+
+    subscribe_kafka_queue_thread_publisher_details = (
+        _SubscribeKafkaQueueThreadPublisherDetails(
+            beamline_topic=beamline_runengine_topic,
+            bootstrap_servers=bootstrap_servers,
+            producer_config=producer_config,
+            publisher_queue_thread_details=publisher_queue_thread_details,
+            re_subscribe_token=kafka_publisher_re_token,
+        )
+    )
+
+    return subscribe_kafka_queue_thread_publisher_details
"abc" + ) - assert bluesky_kafka_configuration["config_file_path"] == str(test_config_file_path) + assert bluesky_kafka_configuration["config_file_path"] == str( + test_config_file_path + ) def test_bluesky_kafka_config_path_env_var_negative(tmp_path, RE): @@ -85,9 +116,9 @@ def test__read_bluesky_kafka_config_file(tmp_path): bluesky_kafka_config = _read_bluesky_kafka_config_file(str(test_config_file_path)) assert bluesky_kafka_config["bootstrap_servers"] == [ + "localhost:9092", "kafka1:9092", "kafka2:9092", - "kafka3:9092", ] runengine_producer_config = bluesky_kafka_config["runengine_producer_config"] @@ -97,6 +128,59 @@ def test__read_bluesky_kafka_config_file(tmp_path): assert runengine_producer_config["compression.codec"] == "snappy" +def test__read_bluesky_kafka_config_file_producer_consumer_security(tmp_path): + # write a temporary file for this test + test_config_file_path = tmp_path / "bluesky_kafka_config_content.yml" + with open(test_config_file_path, "wt") as f: + f.write(test_bluesky_kafka_config_security_section) + + bluesky_kafka_config = _read_bluesky_kafka_config_file(str(test_config_file_path)) + + assert bluesky_kafka_config["bootstrap_servers"] == [ + "localhost:9092", + "kafka2:9092", + "kafka3:9092", + ] + + producer_consumer_security_config = bluesky_kafka_config[ + "producer_consumer_security_config" + ] + assert len(producer_consumer_security_config) == 3 + assert producer_consumer_security_config["security.protocol"] == "SASL_SSL" + assert producer_consumer_security_config["sasl.mechanisms"] == "PLAIN" + assert ( + producer_consumer_security_config["ssl.ca.location"] + == "/etc/ssl/certs/ca-bundle.crt" + ) + + runengine_producer_config = bluesky_kafka_config["runengine_producer_config"] + assert len(runengine_producer_config) == 4 + assert runengine_producer_config["compression.codec"] == "snappy" + assert producer_consumer_security_config["security.protocol"] == "SASL_SSL" + assert producer_consumer_security_config["sasl.mechanisms"] == "PLAIN" + assert ( + producer_consumer_security_config["ssl.ca.location"] + == "/etc/ssl/certs/ca-bundle.crt" + ) + + +def test__read_bluesky_kafka_config_file_runengine_topics(tmp_path): + # write a temporary file for this test + test_config_file_path = tmp_path / "bluesky_kafka_config_content.yml" + with open(test_config_file_path, "wt") as f: + f.write(test_bluesky_kafka_config_security_section) + + bluesky_kafka_config = _read_bluesky_kafka_config_file(str(test_config_file_path)) + + runengine_topics = bluesky_kafka_config["runengine_topics"] + assert len(runengine_topics) == 2 + assert runengine_topics[0] == "{endstation}.bluesky.runengine.documents" + assert ( + runengine_topics[1] + == "{endstation}.bluesky.runengine.{document_name}.documents" + ) + + def test__read_bluesky_kafka_config_file_failure(tmp_path): """Raise FileNotFoundError if the configuration file does not exist. 
@@ -136,14 +220,14 @@ def test_configure_kafka_publisher_abort_run_true(tmp_path, RE): f.write(test_bluesky_kafka_config_true) bluesky_kafka_configuration, publisher_details = configure_kafka_publisher( - RE, - "abc", - override_config_path=test_config_file_path + RE, "abc", override_config_path=test_config_file_path ) assert publisher_details.__class__.__name__ == "SubscribeKafkaPublisherDetails" assert publisher_details.beamline_topic == "abc.bluesky.runengine.documents" - assert publisher_details.bootstrap_servers == "kafka1:9092,kafka2:9092,kafka3:9092" + assert ( + publisher_details.bootstrap_servers == "localhost:9092,kafka1:9092,kafka2:9092" + ) assert publisher_details.re_subscribe_token == 0 @@ -157,12 +241,15 @@ def test_configure_kafka_publisher_abort_run_false(tmp_path, RE): f.write(test_bluesky_kafka_config_false) bluesky_kafka_configuration, publisher_details = configure_kafka_publisher( - RE, - "abc", - override_config_path=test_config_file_path + RE, "abc", override_config_path=test_config_file_path ) - assert publisher_details.__class__.__name__ == "SubscribeKafkaQueueThreadPublisherDetails" + assert ( + publisher_details.__class__.__name__ + == "SubscribeKafkaQueueThreadPublisherDetails" + ) assert publisher_details.beamline_topic == "abc.bluesky.runengine.documents" - assert publisher_details.bootstrap_servers == "kafka1:9092,kafka2:9092,kafka3:9092" + assert ( + publisher_details.bootstrap_servers == "localhost:9092,kafka1:9092,kafka2:9092" + ) assert publisher_details.re_subscribe_token is None diff --git a/nslsii/tests/test_kafka_publisher.py b/nslsii/tests/test_kafka_publisher.py index ecac0571..7659696b 100644 --- a/nslsii/tests/test_kafka_publisher.py +++ b/nslsii/tests/test_kafka_publisher.py @@ -5,6 +5,7 @@ import pytest import nslsii +import nslsii.kafka_utils from bluesky.plans import count from bluesky_kafka import BlueskyKafkaException @@ -64,15 +65,17 @@ def test__subscribe_kafka_publisher( beamline_topic, ): - subscribe_kafka_publisher_details = nslsii._subscribe_kafka_publisher( - RE=RE, - beamline_name=beamline_name, - bootstrap_servers=kafka_bootstrap_servers, - producer_config={ - "acks": "all", - "enable.idempotence": False, - "request.timeout.ms": 1000, - }, + subscribe_kafka_publisher_details = ( + nslsii.kafka_utils._subscribe_kafka_publisher( + RE=RE, + beamline_name=beamline_name, + bootstrap_servers=kafka_bootstrap_servers, + producer_config={ + "acks": "all", + "enable.idempotence": False, + "request.timeout.ms": 1000, + }, + ) ) assert subscribe_kafka_publisher_details.beamline_topic == beamline_topic @@ -148,15 +151,17 @@ def test_no_broker( beamline_topic, ): - subscribe_kafka_publisher_details = nslsii._subscribe_kafka_publisher( - RE=RE, - beamline_name=beamline_name, - bootstrap_servers="100.100.100.100:9092", - producer_config={ - "acks": "all", - "enable.idempotence": False, - "request.timeout.ms": 1000, - }, + subscribe_kafka_publisher_details = ( + nslsii.kafka_utils._subscribe_kafka_publisher( + RE=RE, + beamline_name=beamline_name, + bootstrap_servers="100.100.100.100:9092", + producer_config={ + "acks": "all", + "enable.idempotence": False, + "request.timeout.ms": 1000, + }, + ) ) assert subscribe_kafka_publisher_details.beamline_topic == beamline_topic @@ -213,7 +218,7 @@ def mock_publisher_factory(*args, **kwargs): # but only __call__ will be invoked return Mock(side_effect=BlueskyKafkaException) - subscribe_kafka_publisher_details = nslsii._subscribe_kafka_publisher( + subscribe_kafka_publisher_details = 
nslsii.kafka_utils._subscribe_kafka_publisher( RE=RE, beamline_name=beamline_name, bootstrap_servers=kafka_bootstrap_servers, diff --git a/nslsii/tests/test_kafka_queue_thread_publisher.py b/nslsii/tests/test_kafka_queue_thread_publisher.py index 8caa2bff..e40bccbb 100644 --- a/nslsii/tests/test_kafka_queue_thread_publisher.py +++ b/nslsii/tests/test_kafka_queue_thread_publisher.py @@ -4,7 +4,9 @@ from bluesky.plans import count from event_model import sanitize_doc + import nslsii +import nslsii.kafka_utils def test_build_and_subscribe_kafka_queue_thread_publisher( @@ -60,19 +62,26 @@ def test_build_and_subscribe_kafka_queue_thread_publisher( beamline_topic, ): - subscribe_kafka_queue_thread_publisher_details = nslsii._subscribe_kafka_queue_thread_publisher( - RE=RE, - beamline_name=beamline_name, - bootstrap_servers=kafka_bootstrap_servers, - producer_config={ - "acks": "all", - "enable.idempotence": False, - "request.timeout.ms": 1000, - }, + subscribe_kafka_queue_thread_publisher_details = ( + nslsii.kafka_utils._subscribe_kafka_queue_thread_publisher( + RE=RE, + beamline_name=beamline_name, + bootstrap_servers=kafka_bootstrap_servers, + producer_config={ + "acks": "all", + "enable.idempotence": False, + "request.timeout.ms": 1000, + }, + ) ) - assert subscribe_kafka_queue_thread_publisher_details.beamline_topic == beamline_topic - assert isinstance(subscribe_kafka_queue_thread_publisher_details.re_subscribe_token, int) + assert ( + subscribe_kafka_queue_thread_publisher_details.beamline_topic + == beamline_topic + ) + assert isinstance( + subscribe_kafka_queue_thread_publisher_details.re_subscribe_token, int + ) published_bluesky_documents = [] @@ -89,10 +98,8 @@ def store_published_document(name, document): # documents: start, descriptor, event, stop assert len(published_bluesky_documents) == 4 - consumed_bluesky_documents = ( - consume_documents_from_kafka_until_first_stop_document( - kafka_topic=subscribe_kafka_queue_thread_publisher_details.beamline_topic - ) + consumed_bluesky_documents = consume_documents_from_kafka_until_first_stop_document( + kafka_topic=subscribe_kafka_queue_thread_publisher_details.beamline_topic ) assert len(published_bluesky_documents) == len(consumed_bluesky_documents) @@ -137,7 +144,7 @@ def test_no_beamline_topic(kafka_bootstrap_servers, RE): # use a random string as the beamline name so topics will not be duplicated across tests beamline_name = str(uuid.uuid4())[:8] - nslsii._subscribe_kafka_queue_thread_publisher( + nslsii.kafka_utils._subscribe_kafka_queue_thread_publisher( RE=RE, beamline_name=beamline_name, bootstrap_servers=kafka_bootstrap_servers, @@ -162,7 +169,7 @@ def test_publisher_with_no_broker(RE, hw): Test the case of no Kafka broker. 
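The queue-thread variant exercised in the next file differs from the abort-on-exception publisher above in one important way: on failure it logs and returns instead of raising, leaving re_subscribe_token as None. A short sketch of checking for that, under the same assumptions as the previous sketch (an existing RE and an illustrative broker address):

    import nslsii.kafka_utils

    details = nslsii.kafka_utils._subscribe_kafka_queue_thread_publisher(
        RE=RE,
        beamline_name="abc",
        bootstrap_servers="localhost:9092",
        producer_config={"acks": "all", "request.timeout.ms": 1000},
    )
    if details.re_subscribe_token is None:
        # no broker connection; documents will not be published to Kafka
        print("Kafka publishing is disabled for this session")
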
""" beamline_name = str(uuid.uuid4())[:8] - subscribe_kafka_queue_thread_publisher_details = nslsii._subscribe_kafka_queue_thread_publisher( + subscribe_kafka_queue_thread_publisher_details = nslsii.kafka_utils._subscribe_kafka_queue_thread_publisher( RE=RE, beamline_name=beamline_name, # specify a bootstrap server that does not exist diff --git a/scripts/bitnami-kafka-docker-compose.yml b/scripts/bitnami-kafka-docker-compose.yml index 7132d320..03334810 100644 --- a/scripts/bitnami-kafka-docker-compose.yml +++ b/scripts/bitnami-kafka-docker-compose.yml @@ -1,33 +1,38 @@ -version: '2' +version: '3' services: zookeeper: image: 'docker.io/bitnami/zookeeper:latest' ports: - '2181:2181' - volumes: - - 'zookeeper_data:/bitnami' + #volumes: + # - 'zookeeper_data:/bitnami' environment: - ALLOW_ANONYMOUS_LOGIN=yes kafka: image: 'docker.io/bitnami/kafka:latest' ports: - '9092:9092' - - '29092:29092' - volumes: - - 'kafka_data:/bitnami' + #- '29092:29092' + #volumes: + # - 'kafka_data:/bitnami' environment: + - KAFKA_BROKER_ID=1 + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false - ALLOW_PLAINTEXT_LISTENER=yes - - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - - KAFKA_CFG_LISTENERS=PLAINTEXT://:29092,PLAINTEXT_HOST://:9092 - - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + #- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false + #- ALLOW_PLAINTEXT_LISTENER=yes + #- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + #- KAFKA_CFG_LISTENERS=PLAINTEXT://:29092,PLAINTEXT_HOST://:9092 + #- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 depends_on: - zookeeper -volumes: - zookeeper_data: - driver: local - kafka_data: - driver: local +#volumes: +# zookeeper_data: +# driver: local +# kafka_data: +# driver: local