From 1f554e739c243426a59c8090b3d4d5bb3692059e Mon Sep 17 00:00:00 2001 From: Alexander Lopintsev Date: Fri, 2 Feb 2024 21:15:49 +0300 Subject: [PATCH] Add prometheus metrics, improve errors handling --- stress/hammer/hammer.go | 39 ++++++++++++++++++++++++-------- stress/hammer/metrics.go | 49 ++++++++++++++++++++++++++++++++++++++++ stress/stress.go | 25 +++++++++++++++++--- 3 files changed, 100 insertions(+), 13 deletions(-) create mode 100644 stress/hammer/metrics.go diff --git a/stress/hammer/hammer.go b/stress/hammer/hammer.go index 71a104ae..b7cd4da1 100644 --- a/stress/hammer/hammer.go +++ b/stress/hammer/hammer.go @@ -46,10 +46,11 @@ type Packet struct { // IPFIX represents IPFIX packet generator type IPFIX struct { - conn mirror.Conn - ch chan Packet - srcs []net.IP - vflow net.IP + conn mirror.Conn + ch chan Packet + srcs []net.IP + vflow net.IP + metrics *MetricsIPFIX MaxRouter int Tick time.Duration @@ -59,10 +60,11 @@ type IPFIX struct { // SFlow represents SFlow packet generator type SFlow struct { - conn mirror.Conn - ch chan Packet - srcs []net.IP - vflow net.IP + conn mirror.Conn + ch chan Packet + srcs []net.IP + vflow net.IP + metrics *MetricsSFlow MaxRouter int Port int @@ -81,6 +83,7 @@ func NewIPFIX(raddr net.IP) (*IPFIX, error) { conn: conn, ch: make(chan Packet, 10000), vflow: raddr, + metrics: NewMetricsIPFIX(), MaxRouter: 10, RateLimit: 25 * 10e3, }, nil @@ -100,7 +103,12 @@ func (i *IPFIX) Run() { defer wg.Done() for { p = <-i.ch - i.conn.Send(p.payload[:p.length]) + if err := i.conn.Send(p.payload[:p.length]); err != nil { + i.metrics.sendErr.Inc() + } else { + i.metrics.sendOK.Inc() + } + } }() @@ -141,6 +149,7 @@ func (i *IPFIX) sendData() { for j := range packets { <-throttle i.ch <- packets[j] + i.metrics.sendData.Inc() } } } @@ -151,11 +160,13 @@ func (i *IPFIX) sendTemplate() { for j := range packets { i.ch <- packets[j] + i.metrics.sendTemplate.Inc() } for range c { for j := range packets { i.ch <- packets[j] + i.metrics.sendTemplate.Inc() } } } @@ -166,11 +177,13 @@ func (i *IPFIX) sendTemplateOpt() { for j := range packets { i.ch <- packets[j] + i.metrics.sendTemplateOpt.Inc() } for range c { for j := range packets { i.ch <- packets[j] + i.metrics.sendTemplateOpt.Inc() } } } @@ -235,6 +248,7 @@ func NewSFlow(raddr net.IP) (*SFlow, error) { conn: conn, ch: make(chan Packet, 10000), vflow: raddr, + metrics: NewMetricsSFlow(), MaxRouter: 10, RateLimit: 25 * 10e3, }, nil @@ -254,7 +268,11 @@ func (s *SFlow) Run() { defer wg.Done() for { p = <-s.ch - s.conn.Send(p.payload[:p.length]) + if err := s.conn.Send(p.payload[:p.length]); err != nil { + s.metrics.sendErr.Inc() + } else { + s.metrics.sendOK.Inc() + } } }() @@ -314,6 +332,7 @@ func (s *SFlow) sendData() { for j := range packets { <-throttle s.ch <- packets[j] + s.metrics.sendData.Inc() } } } diff --git a/stress/hammer/metrics.go b/stress/hammer/metrics.go new file mode 100644 index 00000000..df9a9cbc --- /dev/null +++ b/stress/hammer/metrics.go @@ -0,0 +1,49 @@ +package hammer + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +type MetricsIPFIX struct { + sendOK prometheus.Counter + sendErr prometheus.Counter + sendTemplate prometheus.Counter + sendTemplateOpt prometheus.Counter + sendData prometheus.Counter +} + +func NewMetricsIPFIX() *MetricsIPFIX { + m := MetricsIPFIX{ + sendOK: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "ipfix_send_ok"}), + sendErr: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "ipfix_send_err"}), + sendTemplate: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "ipfix_send_template"}), + sendTemplateOpt: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "ipfix_send_template_opt"}), + sendData: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "ipfix_send_data"}), + } + prometheus.MustRegister(m.sendOK, m.sendErr, m.sendTemplate, m.sendTemplateOpt, m.sendData) + return &m +} + +type MetricsSFlow struct { + sendOK prometheus.Counter + sendErr prometheus.Counter + sendData prometheus.Counter +} + +func NewMetricsSFlow() *MetricsSFlow { + m := MetricsSFlow{ + sendOK: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "sflow_send_ok"}), + sendErr: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "sflow_send_err"}), + sendData: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "sflow_send_data"}), + } + prometheus.MustRegister(m.sendOK, m.sendErr, m.sendData) + return &m +} diff --git a/stress/stress.go b/stress/stress.go index 21f1d34a..6fe638ff 100644 --- a/stress/stress.go +++ b/stress/stress.go @@ -24,18 +24,24 @@ package main import ( "flag" + "fmt" "log" "net" + "net/http" "sync" "time" "github.com/EdgeCast/vflow/stress/hammer" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" ) var opts = struct { vflowAddr string ipfixPort int sflowPort int + prometheusPort int ipfixTick string ipfixRateLimit int sflowRateLimit int @@ -43,6 +49,7 @@ var opts = struct { "127.0.0.1", 4739, 6343, + 8090, "10s", 25000, 25000, @@ -51,6 +58,7 @@ var opts = struct { func init() { flag.IntVar(&opts.ipfixPort, "ipfix-port", opts.ipfixPort, "ipfix port number") flag.IntVar(&opts.sflowPort, "sflow-port", opts.sflowPort, "sflow port number") + flag.IntVar(&opts.prometheusPort, "prometheus-port", opts.prometheusPort, "prometheus port number") flag.StringVar(&opts.ipfixTick, "ipfix-interval", opts.ipfixTick, "ipfix template interval in seconds") flag.IntVar(&opts.ipfixRateLimit, "ipfix-rate-limit", opts.ipfixRateLimit, "ipfix rate limit packets per second") flag.IntVar(&opts.sflowRateLimit, "sflow-rate-limit", opts.sflowRateLimit, "sflow rate limit packets per second") @@ -65,11 +73,19 @@ func main() { vflow = net.ParseIP(opts.vflowAddr) ) + prometheus.Unregister(collectors.NewGoCollector()) + http.Handle("/metrics", promhttp.Handler()) + go func() { + log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", opts.prometheusPort), nil)) + }() + wg.Add(1) go func() { - var err error defer wg.Done() - ipfix, _ := hammer.NewIPFIX(vflow) + ipfix, err := hammer.NewIPFIX(vflow) + if err != nil { + log.Fatalf("got error while NewIPFIX, %v", err) + } ipfix.Port = opts.ipfixPort ipfix.Tick, err = time.ParseDuration(opts.ipfixTick) ipfix.RateLimit = opts.ipfixRateLimit @@ -82,7 +98,10 @@ func main() { wg.Add(1) go func() { defer wg.Done() - sflow, _ := hammer.NewSFlow(vflow) + sflow, err := hammer.NewSFlow(vflow) + if err != nil { + log.Fatalf("got error while NewSFlow, %v", err) + } sflow.Port = opts.sflowPort sflow.RateLimit = opts.sflowRateLimit sflow.Run()