Skip to content

Commit

Permalink
socket reuse URI argument (cloudflare#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
lspgn authored Oct 8, 2022
1 parent 57fad2e commit 05a03e2
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 28 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,16 @@ To partition the feed (any field of the protobuf is available), the following op
-format.hash=SamplerAddress,DstAS
```

By default, the collector will listen for IPFIX/NetFlow V9 on port 2055
and sFlow on port 6343.
To change the sockets binding, you can set the `-listen` argument and a URI
for each protocol (`netflow`, `sflow` and `nfl` as scheme) separated by a comma.
For instance, to create 4 parallel sockets of sFlow and one of NetFlow V5, you can use:

```bash
$ ./goflow2 -listen 'sflow://:6343?count=4,nfl://:2055'
```

### Docker

You can also run directly with a container:
Expand Down
70 changes: 42 additions & 28 deletions cmd/goflow2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,17 @@ func main() {
if err != nil {
log.Fatal(err)
}
numSockets := 1
if listenAddrUrl.Query().Has("count") {
if numSocketsTmp, err := strconv.ParseUint(listenAddrUrl.Query().Get("count"), 10, 64); err != nil {
log.Fatal(err)
} else {
numSockets = int(numSocketsTmp)
}
}
if numSockets == 0 {
numSockets = 1
}

hostname := listenAddrUrl.Hostname()
port, err := strconv.ParseUint(listenAddrUrl.Port(), 10, 64)
Expand All @@ -127,40 +138,43 @@ func main() {
"scheme": listenAddrUrl.Scheme,
"hostname": hostname,
"port": port,
"count": numSockets,
}

log.WithFields(logFields).Info("Starting collection")

if listenAddrUrl.Scheme == "sflow" {
sSFlow := &utils.StateSFlow{
Format: formatter,
Transport: transporter,
Logger: log.StandardLogger(),
Config: config,
}
err = sSFlow.FlowRoutine(*Workers, hostname, int(port), *ReusePort)
} else if listenAddrUrl.Scheme == "netflow" {
sNF := &utils.StateNetFlow{
Format: formatter,
Transport: transporter,
Logger: log.StandardLogger(),
Config: config,
}
err = sNF.FlowRoutine(*Workers, hostname, int(port), *ReusePort)
} else if listenAddrUrl.Scheme == "nfl" {
sNFL := &utils.StateNFLegacy{
Format: formatter,
Transport: transporter,
Logger: log.StandardLogger(),
for i := 0; i < numSockets; i++ {
if listenAddrUrl.Scheme == "sflow" {
sSFlow := &utils.StateSFlow{
Format: formatter,
Transport: transporter,
Logger: log.StandardLogger(),
Config: config,
}
err = sSFlow.FlowRoutine(*Workers, hostname, int(port), *ReusePort)
} else if listenAddrUrl.Scheme == "netflow" {
sNF := &utils.StateNetFlow{
Format: formatter,
Transport: transporter,
Logger: log.StandardLogger(),
Config: config,
}
err = sNF.FlowRoutine(*Workers, hostname, int(port), *ReusePort)
} else if listenAddrUrl.Scheme == "nfl" {
sNFL := &utils.StateNFLegacy{
Format: formatter,
Transport: transporter,
Logger: log.StandardLogger(),
}
err = sNFL.FlowRoutine(*Workers, hostname, int(port), *ReusePort)
} else {
log.Errorf("scheme %s does not exist", listenAddrUrl.Scheme)
return
}
err = sNFL.FlowRoutine(*Workers, hostname, int(port), *ReusePort)
} else {
log.Errorf("scheme %s does not exist", listenAddrUrl.Scheme)
return
}

if err != nil {
log.WithFields(logFields).Fatal(err)
if err != nil {
log.WithFields(logFields).Fatal(err)
}
}

}(listenAddress)
Expand Down

0 comments on commit 05a03e2

Please sign in to comment.