Following the definition of EIP (Enterprise Integration Pattern) inspired by the Spring Integration.
This Project does not claim complete implementation but best effort - Feel free to help expand the project
The Framework is working with Attributes to build the Integration Flow:
Example of a flow. A flow could be split over several classes, the different steps are connected by the name of the channels:
using EnterpriseIntegration.ChannelAttributes;
public class ExampleFlow001
{
[ServiceActivator(inChannelId: "hello", outChannelId: "world")]
public string Hello(string prefix)
{
return $"{prefix} hello";
}
[ServiceActivator(inChannelId: "world", outChannelId: "random")]
public string World(string data)
{
return $"{data} world";
}
[Router(inChannelId: "random")]
public string Randomizer(string data)
{
return Random.Shared.NextInt64() % 2 == 0 ? "hello" : "end";
}
[Endpoint(inChannelId: "end")]
public void End(string data)
{
...
}
}
Starting the flow by sending a message:
using EnterpriseIntegration.Flow;
public class ProductController : Controller
{
private readonly IMessageGateway _messageGateway;
// Inject Gateway with Dependency Injection
public ProductController(IMessageGateway messageGateway)
{
_messageGateway = messageGateway;
}
public async Task<ActionResult> OrderProduct(Product product)
{
// send message to flow for processing
await _messageGateway.Send("order-product", product);
return View();
}
}
Register Enterprise Integration to the ServiceCollection
using EnterpriseIntegration;
public void ConfigureServices(IServiceCollection services)
{
services
// Register Flows used (so to leverage Dependency Injection)
.AddSingleton<ExampleFlow001>()
// Register Enterprise Integration
.UseEnterpriseIntegration();
}
Feature | Status | Description |
---|---|---|
ServiceActivator | DONE | Allows to define a method which receives and sends a Message. |
Router | DONE | Allows to define the next channel based on Conditions. |
Endpoint | DONE | Allows to define a method which only receives a Message. |
Splitter | DONE | Allows to split a single Message to several Messages (,to be aggregated again). |
Aggregator | DONE | Allows to aggregate several Messages back into one (after being split). |
Filter | DONE | Allows to only continue with a subset of Messages |
WireTap | DONE | (PRE/POSTAction) Allows to consume Messages without being part of the flow |
History | DONE | (POSTAction) Allows to Track the History of an Message |
ErrorHandling | DONE | Exceptions are forwarded to an ErrorChannel |
InMemoryChannel | DONE | Channel for passing messages in the same application |
RabbitMQChannel | DONE | Channel for passing messages via RabbitMQ queues |
KafkaChannel | TODO | Channel for passing messages via Kafka topics |
The ServiceActivator allows to activate/execute service from within the flow. This is the basic use case to execute code or call other services as part of a flow.
[ServiceActivator(inChannelId: "register-user", outChannelId: "handle-register-user-result")]
public UserRegistrationResult Register(User user)
{
return UserService.Register(user);
}
The FlowEngine tries to map the provided payload to the parameters of the attributed method. In addition to just expecting the payload, it's also possible to expect the message headers and/or the message itself. Modifications to the message headers will be forwarded to further flow nodes.
[ServiceActivator(inChannelId: "register-user", outChannelId: "handle-register-user-result")]
public UserRegistrationResult Register(IMessage<User> userMessage)
{
return UserService.Register(userMessage.Payload);
}
[ServiceActivator(inChannelId: "register-user", outChannelId: "handle-register-user-result")]
public user Authenticate(User user, IMessageHeaders headers)
{
headers.Add("token", UserService.Authenticate(user));
return user;
}
A Router allows to route messages to different channels by evaluating any condition. The as router defined method, can execute any code and must at the end return a ChannelId, to where the originial message/payload is to be sent to.
[Router(inChannelId: "route-user-exists")]
public ChannelId RouteUserExists(User? user)
{
return new ChannelId(user != null ? "set-user" : "load-user");
}
The Framework comes with a predefined Router, which routes to a Channel provided in the headers.
using EnterpriseIntegration.Channels;
[Router(inChannelId: "load-users", outChannelId: EngineChannels.RouteByHeaderChannel)]
public async Task<User> LoadUser(string userId, IMessageHeaders headers)
{
headers.RouteToChannel = new ChannelId(user != null ? "set-user" : "load-user");
return await UserService.LoadUser(userId);
}
Endpoints are similar to ServiceActivator but are intended to be placed at the end of the flow, and therefor do not provide a outChannelId.
[Endpoint(inChannelId: "complete-user-creation")]
public async Task CompleteUserCreation(string userId)
{
await UserService.SetUserActive(userId);
}
Splitter allow to generate multiple follow up messages from a single message, these can later be aggregated with an Aggregator to complete the complete flow with a single message.
A Splitter adds meta information to the headers, to be used by an Aggregator to wait for completion of all sent messages.
[Splitter(inChannelId: "process-complete-order", outChannelId: "process-single-item")]
public IEnumerable<OrderItem> ProcessOrder(Oder order)
{
return order.Items;
}
Filter allows to stop some messages from the flow. This might be helpful as a part of a Splitter.
[Filter("filter", "next-channel")]
public FilterResult OnlyForwardWhenEnoughCash(Message message)
{
return message.Cash >= Cost ? FilterResult.Forward : FilterResult.Discard;
}
Aggregator wait for all/enough messages to be arrived, before processing them all. Waiting for all messages requires a MessageStore, where the messages can be stored, while waiting for others to arrive. In a setup with multiple apps, it is important to have a MessageStore which is shared over all instances.
[Aggregator("aggregate-completed-items", "complete-order")]
public CompleteOrder AggregateOrder(IEnumerable<CompleteOrderItem> completeOrderItem)
{
return OrderService.CompleteOrder(completeOrderItem);
}
WireTaps are used to listen to Messages without interrupting the Flow (for testing/debugging).
// to be injected by dependency injection
IWireTapService wireTapService
IMessage result = null;
// first parameter: name of the channel to be tapped
// second parameter: method to be executed, when a message arrives (in this example it stores the message in a variable)
WireTapId id = _wireTapService.CreateWireTap("name_of_channel", async msg => result = msg);
...
// remove the wiretap, when you are finished, to reduce overhead.
_wireTapService.RemoveWireTap(id);
EIP: Messaging Channels are responsible to transport messages between the different components of the Enterprise Integration Pattern. The default channel used, is an InMemoryChannel invoking other components in the same application. By replacing such a channel with another implementations (e.g. RabbitMQ), distribution of different applications can be achieved.
see: EIP: Point to Point Channel
Is an InMemoryChannel allowing to connect two endpoints with eachother. The channel is One-to-One connection, directly moving the return value of one endpoint to the next endpoint.
This is the default channel type and also the fallback, if a channel is requested (for sending or receiving) and no channel has been previously registered, a new InMemoryChannel will be created.
Provides a simple channel implementation using RabbitMQ.
Registration of RabbitMQ
using EnterpriseIntegation.RabbitMQ;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.IO;
namespace EnterpriseIntegration.RabbitMQ.Tests;
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
IConfigurationBuilder configBuilder = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json");
IConfiguration config = configBuilder.Build();
services
.AddSingleton<ServiceActivatorFlow001>()
// Enable RabbitMQ Messaging based on json config
.WithRabbitMQMessaging(config)
// Provide the channel "001_world" via RabbitMQ
.WithRabbitMQChannel("001_world")
.UseEnterpriseIntegration();
}
}
The Registration of a RabbitMQChannel can be configured with additional parameters of the registration method.
Provides a simple channel implementation using Kafka.
Registration of Kafka
using EnterpriseIntegation.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Xunit.DependencyInjection.Logging;
namespace EnterpriseIntegration.Kafka.Tests;
public class Startup
{
public static void ConfigureServices(IServiceCollection services)
{
IConfigurationBuilder configBuilder = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json");
IConfiguration config = configBuilder.Build();
services
.AddSingleton<ServiceActivatorFlow001>()
// Enable Kafka Messaging based on json config
.WithKafkaMessaging(config)
// Provide the channel "001_world" via Kafka and will try to create the topic if it doesn't exist already
.WithKafkaChannel("001_world", c => c.EnsureCreated = true)
.UseWireTap()
.UseEnterpriseIntegration();
}
}
The Registration of a KafkaChannel can be configured with additional parameters of the registration method.
Example of the configuration for Kafka (it leverages the base kafka config, so all kafka configs are possible).
{
"EnterpriseIntegration": {
"Kafka": {
"ConsumerConfig": {
"BootstrapServers": "127.0.0.1:29092",
"GroupId": "integration-test"
},
"ProducerConfig": {
"BootstrapServers": "127.0.0.1:29092"
}
}
}
}
If an Error/Exception happens immediately after handing a Message is pushed through the MessageGateway the Exception will be thrown to the calling Thread/Method. As soon as the first receiver handled the message, or the message has been pushed to an external channel (Kafka, RabbitMQ...) the flow handles the exception by forwarding it to the error channel.
If an Error/Exception happens during the flow, the FlowEngine catches the exception and forwards it to an error channel. The error channel can be defined via the message headers - if no error channel is defined, the default error channel is used; with the behaviour to log the exception.
IMessageHeaders headers = new MessageHeaders();
headers.WithErrorChannel("custom-error-channel");
IMessage message = new GenericMessage<ExamplePayload>(headers, ExamplePayload.CreateRandom());
await _messageGateway.SendMessage("flow-entry", message);
Common Errors and how to fix them:
Exception | How to solve |
---|---|
TooManyPayloadParameters | The method used as a flow node receiver has too many "payload" parameters. A method should have only one value parameter, which could be either any type or a parameter of type IMessage<> . In addition it is possible to have an IMessageHeaders injected. |
PayloadTransformation | The payload of a message did not match the parameter defined in the receiving method. Change the return type of the sending message, or change the parameter of the receiving message |
Integration Tests are using xUnit Dependencies to setup real applications with proper ServiceCollections to test the flows. Tests for Channels (e.g. RabbitMQ) are using Docker Images (Setup with FluentDocker) for testing.
Load Tests are setup with NBomber to run scenarios and measure their execution time. To make the starting and usage of the console app, the Framework Cocona has been used, to give a nice CLI feeling.
To run the Load tests the must be published
# generating a OS agnostic output
mkdir loadtests
cd loadtests
dotnet publish ..\tests\EnterpriseIntegration.LoadTests --output .
To execute the Load tests the published artifact can be started with a scenario parameter.
# running load test for simple scenario
EnterpriseIntegration.LoadTests.exe --scenario Simple
the report is by default generated into the folder reports