Skip to content

boomerang-io/lib.eventing

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Boomerang Eventing Library

Table of Contents

Description

Boomerang Eventing Library (lib.eventing) is a Maven plugin that integrates event streaming technologies, currently limited to NATS Jetstream only.

This plugin has been built on top of jnats client library, targeting an easier integration with NATS Jetstream for the purposes of Boomerang platform, as well as for external services that want to publish and subscribe to Flow-related events to/from Boomerang through NATS.

Dependencies

Compile Maven Dependencies

  1. Jnats (jnats)
  2. Apache Log4j Core (log4j-core)
  3. JaCoCo Maven Plugin (jacoco-maven-plugin) for generating code coverage reports

Test Maven Dependencies

  1. JUnit (junit)
  2. NATS server (nats-streaming-server-embedded)
  3. Awaitility (awaitility)

Installation

Using Maven

Boomerang Eventing plugin is published to GitHub Packages. To install the package from GitHub's Apache Maven registry, edit the pom.xml file to include the package as a dependency:

<dependency>
  <groupId>io.boomerang</groupId>
  <artifactId>lib-jetstream</artifactId>
  <version>0.0.2</version>
</dependency>

Keep in mind that public GitHub packages require authentication, thus you need to authenticate to GitHub Packages with an access token. For more information, see "Authenticating to GitHub Packages".

Once you have set up the authentication to GitHub Packages and the plugin package has been added to your project's pom.xml file, you can build your project:

# Install
mvn install

# ...or compile
mvn compile

# ...or package
mvn package

Basic usage

Sending and receiving messages through NATS is as simple as connecting to the NATS server and publishing or subscribing for messages.

Connecting

The object responsible for handling the connection to the NATS server is named ConnectionPrimer, it is build on top of jnats Connection, thus sharing a lot of properties and options with jnats.

There are multiple ways to connect to the NATS server with lib.eventing:

  1. Connect to a one or more NATS servers by specifying one or more servers:
// Connect to one server
ConnectionPrimer connectionPrimer = new ConnectionPrimer("nats://localhost:4222");

// Connect to multiple servers
ConnectionPrimer connectionPrimer = new ConnectionPrimer(List.of(
    "nats://mynats1:4222",
    "nats://mynats2:4222",
    "nats://mynats3:4222"));
  1. Connect to NATS by providing a connection configuration Options.Builder object (from jnats, see class source code):
ConnectionPrimer connectionPrimer = new ConnectionPrimer(new Options.Builder()
    .server("nats://localhost:4222")
    .reconnectWait(Duration.ofSeconds(10)));

Take note that using this approach will overwrite the connectionListener and errorListener handlers (if set), since ConnectionPrimer needs these handlers to manage automatically the connection to the NATS server.

Publishing messages to Jetstream

Once the connection to the server is established, there are two ways of publishing messages - through PubTransmitter and PubSubTransceiver, the difference between both is that the latter also supports subscribing for receiving messages, while PubTransmitter is just for publishing messages.

PubTransmitter objects implements the interface PubOnlyTunnel and there are three components that must to be provided when creating a PubTransmitter (for PubSubTransceiver see "Subscribing to receive messages from Jetstream"):

  1. ConnectionPrimer - see connection to NATS server.
  2. StreamConfiguration - this object is from jnats and is used to specify the configuration for retrieving or creating a Jetstream Stream on the server.
  3. PubOnlyConfiguration - this class specifies the configuration for managing a NATS Jetstream Stream on the server, for example automaticallyCreateStream, a property that tells the library if it needs to automatically create a new Stream if one has not been found on the server. See class' javadoc for more information.
ConnectionPrimer connectionPrimer = new ConnectionPrimer("nats://localhost:4222");
StreamConfiguration streamConfiguration = StreamConfiguration.builder()
    .name("test-stream")
    .storageType(StorageType.File)
    .subjects("test.one.*", "test.two.*", "test.six.>")
    .build();
PubOnlyConfiguration pubOnlyConfiguration = new PubOnlyConfiguration.Builder()
    .automaticallyCreateStream(true)
    .build();
PubOnlyTunnel pubOnlyTunnel =
    new PubTransmitter(connectionPrimer, streamConfiguration, pubOnlyConfiguration);

Once the PubTransmitter object has been created, you can easily publish messages to NATS Jetstream:

String subject = "test.one.hello";
String message = "Hello world!";
pubOnlyTunnel.publish(subject, message);

Keep in mind that when publishing a message to NATS Jetstream, lib.eventing checks the subject of the message to be published to have an exact match with the Stream's subject, be it a wildcard or a fixed subject string. If it matches - the message is sent to the server, if not - an exception is raised. See NATS subject wildcard format for more information.

Subscribing to receive messages from Jetstream

To start receiving messages from NATS Jetstream, the procedure is similar to configuring a PubTransmitter object. PubSubTransceiver implements the interface PubSubTunnel and there are four required components when creating the object:

  1. ConnectionPrimer - see connection to NATS server.
  2. StreamConfiguration - same as for PubTransmitter, this object is from jnats and is used to specify the configuration for retrieving or creating a Jetstream Stream on the server.
  3. ConsumerConfiguration - this object is also from jnats and is used to specify the configuration for retrieving or creating a Jetstream Consumer on the server.
  4. PubSubConfiguration - this class specifies the configuration for managing both a NATS Jetstream Stream and a Consumer on the server, for example automaticallyCreateConsumer, a property that tells the library if it needs to automatically create a new Consumer if one has not been found on the server. See class' javadoc for more information.
ConnectionPrimer connectionPrimer = new ConnectionPrimer("nats://localhost:4222");
StreamConfiguration streamConfiguration = StreamConfiguration.builder()
    .name("test-stream")
    .storageType(StorageType.File)
    .subjects("test.one.*", "test.two.*", "test.six.>")
    .build();
ConsumerConfiguration consumerConfiguration = ConsumerConfiguration.builder()
    .durable("test-consumer-pull")
    .build();
PubSubConfiguration pubSubConfiguration = new PubSubConfiguration.Builder()
    .automaticallyCreateStream(true)
    .automaticallyCreateConsumer(true)
    .build();
// @formatter:on
PubSubTunnel pubSubTunnel = new PubSubTransceiver(connectionPrimer, streamConfiguration,
    consumerConfiguration, pubSubConfiguration);

Once the PubSubTransceiver has been created, subscribing to receive messages from the NATS Jetstream is as simple as:

pubSubTunnel.subscribe(new SubHandler() {
  @Override
  public void newMessageReceived(PubSubTunnel pubSubTunnel, String subject, String message) {
    System.out.println("Received message with subject: " + subject + "\nMessage:\n" + message);
  }
});

When configuring a NATS Jetstream Consumer, you can use both push-based and pull-based Consumer types, lib.eventing will automatically detect the difference between these and will subscribe to the server accordingly.

Note

ConnectionPrimer object besides handling the connection and re-connection (if this has been lost) to the NATS server, it will also resubscribe any active listening PubSubTransceiver objects when a connection to the NATS server has been re-established.

Known Issues

  • If the connection to the NATS server is down, any published messaged through PubTransmitter or PubSubTransceiver will fail immediately. Client side fault tolerance needs to be implemented.

Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.

Please make sure to update tests as appropriate.

License

All of our work is licenses under the Apache License Version 2.0 unless specified otherwise due to constraints by dependencies.