From 05a03e2446bf7509767f45361e2851fd93afa978 Mon Sep 17 00:00:00 2001 From: Louis Date: Sat, 8 Oct 2022 08:50:23 -0700 Subject: [PATCH] socket reuse URI argument (#107) --- README.md | 10 +++++++ cmd/goflow2/main.go | 70 +++++++++++++++++++++++++++------------------ 2 files changed, 52 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index e25c894..807807d 100644 --- a/README.md +++ b/README.md @@ -150,6 +150,16 @@ To partition the feed (any field of the protobuf is available), the following op -format.hash=SamplerAddress,DstAS ``` +By default, the collector will listen for IPFIX/NetFlow V9 on port 2055 +and sFlow on port 6343. +To change the sockets binding, you can set the `-listen` argument and a URI +for each protocol (`netflow`, `sflow` and `nfl` as scheme) separated by a comma. +For instance, to create 4 parallel sockets of sFlow and one of NetFlow V5, you can use: + +```bash +$ ./goflow2 -listen 'sflow://:6343?count=4,nfl://:2055' +``` + ### Docker You can also run directly with a container: diff --git a/cmd/goflow2/main.go b/cmd/goflow2/main.go index d241ad9..63345d4 100644 --- a/cmd/goflow2/main.go +++ b/cmd/goflow2/main.go @@ -115,6 +115,17 @@ func main() { if err != nil { log.Fatal(err) } + numSockets := 1 + if listenAddrUrl.Query().Has("count") { + if numSocketsTmp, err := strconv.ParseUint(listenAddrUrl.Query().Get("count"), 10, 64); err != nil { + log.Fatal(err) + } else { + numSockets = int(numSocketsTmp) + } + } + if numSockets == 0 { + numSockets = 1 + } hostname := listenAddrUrl.Hostname() port, err := strconv.ParseUint(listenAddrUrl.Port(), 10, 64) @@ -127,40 +138,43 @@ func main() { "scheme": listenAddrUrl.Scheme, "hostname": hostname, "port": port, + "count": numSockets, } log.WithFields(logFields).Info("Starting collection") - if listenAddrUrl.Scheme == "sflow" { - sSFlow := &utils.StateSFlow{ - Format: formatter, - Transport: transporter, - Logger: log.StandardLogger(), - Config: config, - } - err = sSFlow.FlowRoutine(*Workers, hostname, int(port), *ReusePort) - } else if listenAddrUrl.Scheme == "netflow" { - sNF := &utils.StateNetFlow{ - Format: formatter, - Transport: transporter, - Logger: log.StandardLogger(), - Config: config, - } - err = sNF.FlowRoutine(*Workers, hostname, int(port), *ReusePort) - } else if listenAddrUrl.Scheme == "nfl" { - sNFL := &utils.StateNFLegacy{ - Format: formatter, - Transport: transporter, - Logger: log.StandardLogger(), + for i := 0; i < numSockets; i++ { + if listenAddrUrl.Scheme == "sflow" { + sSFlow := &utils.StateSFlow{ + Format: formatter, + Transport: transporter, + Logger: log.StandardLogger(), + Config: config, + } + err = sSFlow.FlowRoutine(*Workers, hostname, int(port), *ReusePort) + } else if listenAddrUrl.Scheme == "netflow" { + sNF := &utils.StateNetFlow{ + Format: formatter, + Transport: transporter, + Logger: log.StandardLogger(), + Config: config, + } + err = sNF.FlowRoutine(*Workers, hostname, int(port), *ReusePort) + } else if listenAddrUrl.Scheme == "nfl" { + sNFL := &utils.StateNFLegacy{ + Format: formatter, + Transport: transporter, + Logger: log.StandardLogger(), + } + err = sNFL.FlowRoutine(*Workers, hostname, int(port), *ReusePort) + } else { + log.Errorf("scheme %s does not exist", listenAddrUrl.Scheme) + return } - err = sNFL.FlowRoutine(*Workers, hostname, int(port), *ReusePort) - } else { - log.Errorf("scheme %s does not exist", listenAddrUrl.Scheme) - return - } - if err != nil { - log.WithFields(logFields).Fatal(err) + if err != nil { + log.WithFields(logFields).Fatal(err) + } } }(listenAddress)