The HAProxy Stream Processing Offload Protocol (SPOP) allows traffic data to be streamed to an external agent. This library allows you to write that agent using .NET Core.
For more information, read the blog post Extending HAProxy with the Stream Processing Offload Engine.
Capability | Supported |
---|---|
fragmentation | yes |
async | no |
pipelining | no |
Add a reference to the NuGet package using the .NET CLI:
$ dotnet add package HAProxy.StreamProcessingOffload.Agent --version 1.0.5
Or use the NuGet Package Manager:
PS> Install-Package HAProxy.StreamProcessingOffload.Agent -Version 1.0.5
- Download and install the .NET Core SDK.
- Create a new folder and
cd
into it. - Create a new .NET Core project:
$ dotnet new console
- Add a reference to the package:
$ dotnet add package HAProxy.StreamProcessingOffload.Agent --version 1.0.5
- Edit Program.cs to start a service listening on an IP and port. Create an instance of
FrameProcessor
and callHandleStreamAsync
to process SPOP messages received on new network connections. This method accepts aNetworkStream
and a callback function that takes aNotifyFrame
and returnsList<SpoeAction>
.
Program.cs:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using HAProxy.StreamProcessingOffload.Agent;
using HAProxy.StreamProcessingOffload.Agent.Actions;
using HAProxy.StreamProcessingOffload.Agent.Payloads;
namespace Agent
{
class Program
{
static void Main(string[] args)
{
IPAddress address = IPAddress.Parse("0.0.0.0");
int port = 12345;
var listener = new TcpListener(address, port);
var frameProcessor = new FrameProcessor()
{
EnableLogging = true,
LogFunc = (msg) => Console.WriteLine(msg)
};
listener.Start();
Console.WriteLine("Listening on {0}:{1}", address, port);
while (true)
{
TcpClient client = listener.AcceptTcpClient();
Task.Run(async () =>
{
NetworkStream stream = client.GetStream();
// Cancel stream when process terminates
System.AppDomain.CurrentDomain.ProcessExit += async (sender, e) =>
{
await frameProcessor.CancelStreamAsync(stream);
};
await frameProcessor.HandleStreamAsync(stream, async (notifyFrame) =>
{
// NOTIFY frames contain HAProxy messages to the agent.
// The agent can send back "actions" to HAProxy via ACK frames.
var messages = ((ListOfMessagesPayload)notifyFrame.Payload).Messages;
var responseActions = new List<SpoeAction>();
if (messages.Any(msg => msg.Name == "my-message-name"))
{
var myMessage = messages.First(msg => msg.Name == "my-message-name");
// Each message may contain a collection of arguments, which hold the data.
TypedData myArg = myMessage.Args.First(arg => arg.Key == "ip").Value;
// simulate a non-blocking API call that gets the IP score
// and takes 1 second
await Task.Delay(1000);
int ip_score = 10;
if ((string)myArg.Value == "192.168.50.1")
{
ip_score = 20;
}
// You can send actions back to HAProxy, such as setting a variable.
SpoeAction setVar =
new SetVariableAction(
VariableScope.Session,
"ip_score",
new TypedData(DataType.Int32, ip_score));
responseActions.Add(setVar);
}
return responseActions;
});
});
}
}
}
}
All communication is handled by the FrameProcessor
class. Its HandleStreamAsync
method takes a NetworkStream
object and a lambda function as parameters. The lambda function is called when a NOTIFY frame is received. From there, you can return a list of SpoeAction
objects, which tell HAProxy to perform action(s).
Actions:
SetVariableAction
- set a variable in HAProxyUnsetVariableAction
- unset a variable
When setting a variable, you must pass in the variable scope (Process
, Session
, Transaction
, Request
, or Response
). Then, set a name for the variable and its value as a TypedData
instance.
SpoeAction setVar =
new SetVariableAction(
VariableScope.Session, // scope
"ip_score", // name
new TypedData(DataType.Int32, ip_score)); // value
Both actions and arguments received in messages from HAProxy are represented as TypedData
. This is simply a container for different types of data. DataType
can be:
- Null
- Boolean
- Int32
- Uint32
- Int64
- Uint64
- Ipv4
- Ipv6
- String
- Binary
For message arguments, the type is set for you by HAProxy depending on the fetch method used. For example, the src
fetch method is Ipv4
or Ipv6
.
There can be multiple messages within a NOTIFY frame, so you should check for a message name to find the right one:
var messages = ((ListOfMessagesPayload)notifyFrame.Payload).Messages;
if (messages.Any(msg => msg.Name == "my-message-name"))
{
// perform logic
}
Here is an example spoe.conf file that defines the messages and arguments HAProxy will send to the agent:
[my-spoe]
spoe-agent my-agent
messages my-message-name
option var-prefix myspoe
log global
use-backend be_agents
timeout hello 10s
timeout idle 10s
timeout processing 10s
spoe-message my-message-name
args ip=src anotherarg=str(abc)
event on-frontend-http-request if { path / }
In your haproxy.cfg, the SPOE can be initialized by adding a filter spoe
line:
frontend web
bind :80
filter spoe engine my-spoe config /etc/haproxy/spoe.conf
http-request set-header "ip_score" %[var(sess.myspoe.ip_score)]
default_backend servers
backend servers
balance roundrobin
server s1 127.0.0.1:8080 check maxconn 30
backend be_agents
mode tcp
balance roundrobin
option spop-check
server agent1 127.0.0.1:12345 check inter 10s maxconn 30
To build and run the unit tests, use the following commands
$ dotnet clean
$ dotnet build
$ dotnet test