From 4bf0d100795726556ac5d3f098182fcb1ed957c9 Mon Sep 17 00:00:00 2001 From: "Dr. Ralf S. Engelschall" Date: Wed, 8 Nov 2023 11:20:16 +0100 Subject: [PATCH] metrics: add paths_bytes_sent, srt_conns, srt_conns_bytes_received, srt_conns_bytes_sent (#2620) (#2619) (#2629) * add missing Prometheus exports (#2620, #2619): paths_bytes_sent, srt_conns, srt_conns_bytes_received, srt_conns_bytes_sent * protect Stream.BytesSent() * add tests --------- Co-authored-by: aler9 <46489434+aler9@users.noreply.github.com> --- README.md | 6 + apidocs/openapi.yaml | 3 + internal/core/api_test.go | 2 + internal/core/core.go | 1 + internal/core/metrics.go | 25 ++++ internal/core/metrics_test.go | 197 ++++++++++++++++++++++++------- internal/core/path.go | 6 + internal/core/srt_server.go | 7 ++ internal/defs/api.go | 1 + internal/stream/stream.go | 17 +++ internal/stream/stream_format.go | 5 +- 11 files changed, 226 insertions(+), 44 deletions(-) diff --git a/README.md b/README.md index f0192c2ffef..7db9f2d821e 100644 --- a/README.md +++ b/README.md @@ -1450,6 +1450,7 @@ Obtaining: # metrics of every path paths{name="[path_name]",state="[state]"} 1 paths_bytes_received{name="[path_name]",state="[state]"} 1234 +paths_bytes_sent{name="[path_name]",state="[state]"} 1234 # metrics of every HLS muxer hls_muxers{name="[name]"} 1 @@ -1480,6 +1481,11 @@ rtmp_conns{id="[id]",state="[state]"} 1 rtmp_conns_bytes_received{id="[id]",state="[state]"} 1234 rtmp_conns_bytes_sent{id="[id]",state="[state]"} 187 +# metrics of every SRT connection +srt_conns{id="[id]",state="[state]"} 1 +srt_conns_bytes_received{id="[id]",state="[state]"} 1234 +srt_conns_bytes_sent{id="[id]",state="[state]"} 187 + # metrics of every WebRTC session webrtc_sessions{id="[id]"} 1 webrtc_sessions_bytes_received{id="[id]",state="[state]"} 1234 diff --git a/apidocs/openapi.yaml b/apidocs/openapi.yaml index 4db07544f23..cfb760db479 100644 --- a/apidocs/openapi.yaml +++ b/apidocs/openapi.yaml @@ -395,6 +395,9 @@ components: bytesReceived: type: integer format: int64 + bytesSent: + type: integer + format: int64 readers: type: array items: diff --git a/internal/core/api_test.go b/internal/core/api_test.go index 05278bce0ae..7964c8cac40 100644 --- a/internal/core/api_test.go +++ b/internal/core/api_test.go @@ -422,6 +422,7 @@ func TestAPIPathsList(t *testing.T) { Ready bool `json:"ready"` Tracks []string `json:"tracks"` BytesReceived uint64 `json:"bytesReceived"` + BytesSent uint64 `json:"bytesSent"` } type pathList struct { @@ -625,6 +626,7 @@ func TestAPIPathsGet(t *testing.T) { Ready bool `json:"Ready"` Tracks []string `json:"tracks"` BytesReceived uint64 `json:"bytesReceived"` + BytesSent uint64 `json:"bytesSent"` } var pathName string diff --git a/internal/core/core.go b/internal/core/core.go index 793351acf63..c9d21fee6e6 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -509,6 +509,7 @@ func (p *Core) createResources(initial bool) error { p.conf.RunOnConnectRestart, p.conf.RunOnDisconnect, p.externalCmdPool, + p.metrics, p.pathManager, p, ) diff --git a/internal/core/metrics.go b/internal/core/metrics.go index 1b3460f8533..8ff2132a825 100644 --- a/internal/core/metrics.go +++ b/internal/core/metrics.go @@ -32,6 +32,7 @@ type metrics struct { rtspServer apiRTSPServer rtspsServer apiRTSPServer rtmpServer apiRTMPServer + srtServer apiSRTServer hlsManager apiHLSManager webRTCManager apiWebRTCManager } @@ -96,6 +97,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) { tags := "{name=\"" + i.Name + "\",state=\"" + state + "\"}" out += metric("paths", tags, 1) out += metric("paths_bytes_received", tags, int64(i.BytesReceived)) + out += metric("paths_bytes_sent", tags, int64(i.BytesSent)) } } else { out += metric("paths", "", 0) @@ -199,6 +201,22 @@ func (m *metrics) onMetrics(ctx *gin.Context) { } } + if !interfaceIsEmpty(m.srtServer) { + data, err := m.srtServer.apiConnsList() + if err == nil && len(data.Items) != 0 { + for _, i := range data.Items { + tags := "{id=\"" + i.ID.String() + "\",state=\"" + string(i.State) + "\"}" + out += metric("srt_conns", tags, 1) + out += metric("srt_conns_bytes_received", tags, int64(i.BytesReceived)) + out += metric("srt_conns_bytes_sent", tags, int64(i.BytesSent)) + } + } else { + out += metric("srt_conns", "", 0) + out += metric("srt_conns_bytes_received", "", 0) + out += metric("srt_conns_bytes_sent", "", 0) + } + } + if !interfaceIsEmpty(m.webRTCManager) { data, err := m.webRTCManager.apiSessionsList() if err == nil && len(data.Items) != 0 { @@ -254,6 +272,13 @@ func (m *metrics) rtmpServerSet(s apiRTMPServer) { m.rtmpServer = s } +// srtServerSet is called by srtServer. +func (m *metrics) srtServerSet(s apiSRTServer) { + m.mutex.Lock() + defer m.mutex.Unlock() + m.srtServer = s +} + // webRTCManagerSet is called by webRTCManager. func (m *metrics) webRTCManagerSet(s apiWebRTCManager) { m.mutex.Lock() diff --git a/internal/core/metrics_test.go b/internal/core/metrics_test.go index 0b7a123f79b..824234e9fc8 100644 --- a/internal/core/metrics_test.go +++ b/internal/core/metrics_test.go @@ -1,20 +1,27 @@ package core import ( + "bufio" + "context" "crypto/tls" "net" "net/http" "net/url" "os" + "sync" "testing" "time" "github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediacommon/pkg/formats/mpegts" + "github.com/datarhei/gosrt" + "github.com/pion/rtp" "github.com/stretchr/testify/require" "github.com/bluenviron/mediamtx/internal/protocols/rtmp" + "github.com/bluenviron/mediamtx/internal/protocols/webrtc" ) func TestMetrics(t *testing.T) { @@ -60,48 +67,133 @@ rtsps_sessions_bytes_sent 0 rtmp_conns 0 rtmp_conns_bytes_received 0 rtmp_conns_bytes_sent 0 +srt_conns 0 +srt_conns_bytes_received 0 +srt_conns_bytes_sent 0 webrtc_sessions 0 webrtc_sessions_bytes_received 0 webrtc_sessions_bytes_sent 0 `, string(bo)) - medi := testMediaH264 - - source := gortsplib.Client{} - err = source.StartRecording("rtsp://localhost:8554/rtsp_path", - &description.Session{Medias: []*description.Media{medi}}) - require.NoError(t, err) - defer source.Close() - - source2 := gortsplib.Client{TLSConfig: &tls.Config{InsecureSkipVerify: true}} - err = source2.StartRecording("rtsps://localhost:8322/rtsps_path", - &description.Session{Medias: []*description.Media{medi}}) - require.NoError(t, err) - defer source2.Close() - - u, err := url.Parse("rtmp://localhost:1935/rtmp_path") - require.NoError(t, err) - - nconn, err := net.Dial("tcp", u.Host) - require.NoError(t, err) - defer nconn.Close() - - conn, err := rtmp.NewClientConn(nconn, u, true) - require.NoError(t, err) - - videoTrack := &format.H264{ - PayloadTyp: 96, - SPS: []byte{ // 1920x1080 baseline - 0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02, - 0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, - 0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20, - }, - PPS: []byte{0x08, 0x06, 0x07, 0x08}, - PacketizationMode: 1, - } - - _, err = rtmp.NewWriter(conn, videoTrack, nil) - require.NoError(t, err) + terminate := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(5) + + go func() { + defer wg.Done() + source := gortsplib.Client{} + err := source.StartRecording("rtsp://localhost:8554/rtsp_path", + &description.Session{Medias: []*description.Media{{ + Type: description.MediaTypeVideo, + Formats: []format.Format{testFormatH264}, + }}}) + require.NoError(t, err) + defer source.Close() + <-terminate + }() + + go func() { + defer wg.Done() + source2 := gortsplib.Client{TLSConfig: &tls.Config{InsecureSkipVerify: true}} + err := source2.StartRecording("rtsps://localhost:8322/rtsps_path", + &description.Session{Medias: []*description.Media{{ + Type: description.MediaTypeVideo, + Formats: []format.Format{testFormatH264}, + }}}) + require.NoError(t, err) + defer source2.Close() + <-terminate + }() + + go func() { + defer wg.Done() + u, err := url.Parse("rtmp://localhost:1935/rtmp_path") + require.NoError(t, err) + + nconn, err := net.Dial("tcp", u.Host) + require.NoError(t, err) + defer nconn.Close() + + conn, err := rtmp.NewClientConn(nconn, u, true) + require.NoError(t, err) + + _, err = rtmp.NewWriter(conn, testFormatH264, nil) + require.NoError(t, err) + <-terminate + }() + + go func() { + defer wg.Done() + + su, err := url.Parse("http://localhost:8889/webrtc_path/whip") + require.NoError(t, err) + + s := &webrtc.WHIPClient{ + HTTPClient: &http.Client{Transport: &http.Transport{}}, + URL: su, + } + + tracks, err := s.Publish(context.Background(), testMediaH264.Formats[0], nil) + require.NoError(t, err) + defer checkClose(t, s.Close) + + err = tracks[0].WriteRTP(&rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 123, + Timestamp: 45343, + SSRC: 563423, + }, + Payload: []byte{1}, + }) + require.NoError(t, err) + <-terminate + }() + + go func() { + defer wg.Done() + + srtConf := srt.DefaultConfig() + address, err := srtConf.UnmarshalURL("srt://localhost:8890?streamid=publish:srt_path") + require.NoError(t, err) + + err = srtConf.Validate() + require.NoError(t, err) + + publisher, err := srt.Dial("srt", address, srtConf) + require.NoError(t, err) + defer publisher.Close() + + track := &mpegts.Track{ + Codec: &mpegts.CodecH264{}, + } + + bw := bufio.NewWriter(publisher) + w := mpegts.NewWriter(bw, []*mpegts.Track{track}) + require.NoError(t, err) + + err = w.WriteH26x(track, 0, 0, true, [][]byte{ + { // SPS + 0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02, + 0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, + 0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, + 0x20, + }, + { // PPS + 0x08, 0x06, 0x07, 0x08, + }, + { // IDR + 0x05, 1, + }, + }) + require.NoError(t, err) + + err = bw.Flush() + require.NoError(t, err) + <-terminate + }() time.Sleep(500 * time.Millisecond) @@ -109,11 +201,24 @@ webrtc_sessions_bytes_sent 0 require.Regexp(t, `^paths\{name=".*?",state="ready"\} 1`+"\n"+ - `paths_bytes_received\{name=".*?",state="ready"\} 0`+"\n"+ + `paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+ + `paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+ + `paths\{name=".*?",state="ready"\} 1`+"\n"+ + `paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+ + `paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+ `paths\{name=".*?",state="ready"\} 1`+"\n"+ - `paths_bytes_received\{name=".*?",state="ready"\} 0`+"\n"+ + `paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+ + `paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+ `paths\{name=".*?",state="ready"\} 1`+"\n"+ - `paths_bytes_received\{name=".*?",state="ready"\} 0`+"\n"+ + `paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+ + `paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+ + `paths\{name=".*?",state="ready"\} 1`+"\n"+ + `paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+ + `paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+ + `hls_muxers\{name=".*?"\} 1`+"\n"+ + `hls_muxers_bytes_sent\{name=".*?"\} [0-9]+`+"\n"+ + `hls_muxers\{name=".*?"\} 1`+"\n"+ + `hls_muxers_bytes_sent\{name=".*?"\} [0-9]+`+"\n"+ `hls_muxers\{name=".*?"\} 1`+"\n"+ `hls_muxers_bytes_sent\{name=".*?"\} [0-9]+`+"\n"+ `hls_muxers\{name=".*?"\} 1`+"\n"+ @@ -135,9 +240,15 @@ webrtc_sessions_bytes_sent 0 `rtmp_conns\{id=".*?",state="publish"\} 1`+"\n"+ `rtmp_conns_bytes_received\{id=".*?",state="publish"\} [0-9]+`+"\n"+ `rtmp_conns_bytes_sent\{id=".*?",state="publish"\} [0-9]+`+"\n"+ - `webrtc_sessions 0`+"\n"+ - `webrtc_sessions_bytes_received 0`+"\n"+ - `webrtc_sessions_bytes_sent 0`+"\n"+ + `srt_conns\{id=".*?",state="publish"\} 1`+"\n"+ + `srt_conns_bytes_received\{id=".*?",state="publish"\} [0-9]+`+"\n"+ + `srt_conns_bytes_sent\{id=".*?",state="publish"\} 0`+"\n"+ + `webrtc_sessions\{id=".*?"\} 1`+"\n"+ + `webrtc_sessions_bytes_received\{id=".*?"\} [0-9]+`+"\n"+ + `webrtc_sessions_bytes_sent\{id=".*?"\} [0-9]+`+"\n"+ "$", string(bo)) + + close(terminate) + wg.Wait() } diff --git a/internal/core/path.go b/internal/core/path.go index 27a419d9d00..1679690660a 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -718,6 +718,12 @@ func (pa *path) doAPIPathsGet(req pathAPIPathsGetReq) { } return pa.stream.BytesReceived() }(), + BytesSent: func() uint64 { + if pa.stream == nil { + return 0 + } + return pa.stream.BytesSent() + }(), Readers: func() []defs.APIPathSourceOrReader { ret := []defs.APIPathSourceOrReader{} for r := range pa.readers { diff --git a/internal/core/srt_server.go b/internal/core/srt_server.go index 2c482d13c02..b8876323563 100644 --- a/internal/core/srt_server.go +++ b/internal/core/srt_server.go @@ -67,6 +67,7 @@ type srtServer struct { runOnConnectRestart bool runOnDisconnect string externalCmdPool *externalcmd.Pool + metrics *metrics pathManager *pathManager parent srtServerParent @@ -96,6 +97,7 @@ func newSRTServer( runOnConnectRestart bool, runOnDisconnect string, externalCmdPool *externalcmd.Pool, + metrics *metrics, pathManager *pathManager, parent srtServerParent, ) (*srtServer, error) { @@ -120,6 +122,7 @@ func newSRTServer( runOnConnectRestart: runOnConnectRestart, runOnDisconnect: runOnDisconnect, externalCmdPool: externalCmdPool, + metrics: metrics, pathManager: pathManager, parent: parent, ctx: ctx, @@ -136,6 +139,10 @@ func newSRTServer( s.Log(logger.Info, "listener opened on "+address+" (UDP)") + if s.metrics != nil { + s.metrics.srtServerSet(s) + } + newSRTListener( s.ln, &s.wg, diff --git a/internal/defs/api.go b/internal/defs/api.go index bee536ce324..f0f9150bc80 100644 --- a/internal/defs/api.go +++ b/internal/defs/api.go @@ -35,6 +35,7 @@ type APIPath struct { ReadyTime *time.Time `json:"readyTime"` Tracks []string `json:"tracks"` BytesReceived uint64 `json:"bytesReceived"` + BytesSent uint64 `json:"bytesSent"` Readers []APIPathSourceOrReader `json:"readers"` } diff --git a/internal/stream/stream.go b/internal/stream/stream.go index 31aab9867f5..274290394bd 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -24,6 +24,7 @@ type Stream struct { desc *description.Session bytesReceived *uint64 + bytesSent *uint64 smedias map[*description.Media]*streamMedia mutex sync.RWMutex rtspStream *gortsplib.ServerStream @@ -40,6 +41,7 @@ func New( s := &Stream{ desc: desc, bytesReceived: new(uint64), + bytesSent: new(uint64), } s.smedias = make(map[*description.Media]*streamMedia) @@ -75,6 +77,21 @@ func (s *Stream) BytesReceived() uint64 { return atomic.LoadUint64(s.bytesReceived) } +// BytesSent returns sent bytes. +func (s *Stream) BytesSent() uint64 { + s.mutex.RLock() + defer s.mutex.RUnlock() + + bytesSent := atomic.LoadUint64(s.bytesSent) + if s.rtspStream != nil { + bytesSent += s.rtspStream.BytesSent() + } + if s.rtspsStream != nil { + bytesSent += s.rtspsStream.BytesSent() + } + return bytesSent +} + // RTSPStream returns the RTSP stream. func (s *Stream) RTSPStream(server *gortsplib.Server) *gortsplib.ServerStream { s.mutex.Lock() diff --git a/internal/stream/stream_format.go b/internal/stream/stream_format.go index 0899a9e784b..8b8832852bb 100644 --- a/internal/stream/stream_format.go +++ b/internal/stream/stream_format.go @@ -85,7 +85,9 @@ func (sf *streamFormat) writeRTPPacket( } func (sf *streamFormat) writeUnitInner(s *Stream, medi *description.Media, u unit.Unit) { - atomic.AddUint64(s.bytesReceived, unitSize(u)) + size := unitSize(u) + + atomic.AddUint64(s.bytesReceived, size) if s.rtspStream != nil { for _, pkt := range u.GetRTPPackets() { @@ -102,6 +104,7 @@ func (sf *streamFormat) writeUnitInner(s *Stream, medi *description.Media, u uni for writer, cb := range sf.readers { ccb := cb writer.Push(func() error { + atomic.AddUint64(s.bytesSent, size) return ccb(u) }) }