From 36dfa327b93e0b0adbfd90bb4eb9295c9a958e0e Mon Sep 17 00:00:00 2001 From: OrlandoCo Date: Tue, 15 Sep 2020 17:10:48 -0500 Subject: [PATCH] Add more control over senders (#172) * feat(senders): Add nack rate limiter * feat(senders): Add rate limit test * feat(senders): Add router config * feat(senders): Fix logging and naming consistency --- config.toml | 15 +++---- pkg/buffer.go | 2 +- pkg/buffer_test.go | 2 +- pkg/receiver.go | 15 ++++--- pkg/router.go | 13 ++++++ pkg/router_test.go | 101 +++++++++++++++++++++++++++++++++++++++++++++ pkg/sender.go | 34 +++++++-------- pkg/sender_test.go | 1 + pkg/sfu.go | 10 +++++ 9 files changed, 158 insertions(+), 35 deletions(-) diff --git a/config.toml b/config.toml index 737f96fc9..34f45df63 100644 --- a/config.toml +++ b/config.toml @@ -1,9 +1,12 @@ [router] -# pass bandwidth feeback to pub -rembfeedback = false -# Cap bandwidth feedback -minbandwidth = 100000 -maxbandwidth = 5000000 +# pass bandwidth feeback from subs to pubs +subrembfeedback = false +# Limit the remb bandwidth in kbps +# zero means no limits +maxbandwidth = 1000 +# Rate limit nack packets from senders to 1 nack per maxNackTime in seconds +# zero means no rate limit +maxnacktime = 1 [receiver.video] # the remb cycle sending to pub, this told the pub it's bandwidth @@ -12,8 +15,6 @@ rembcycle = 2 plicycle = 1 # transport-cc cycle in (ms) (experiment feature) tcccycle = 0 -# this limit the remb bandwidth -maxbandwidth = 1000 # max buffer time by ms maxbuffertime = 1000 diff --git a/pkg/buffer.go b/pkg/buffer.go index ee6c972e1..65eea8040 100644 --- a/pkg/buffer.go +++ b/pkg/buffer.go @@ -262,7 +262,7 @@ func (b *Buffer) GetLostRateBandwidth(cycle uint64) (float64, uint64) { byteRate := b.totalByte / cycle log.Tracef("Buffer.CalcLostRateByteRate b.receivedPkt=%d b.lostPkt=%d lostRate=%v byteRate=%v", b.receivedPkt, b.lostPkt, lostRate, byteRate) b.receivedPkt, b.lostPkt, b.totalByte = 0, 0, 0 - return lostRate, byteRate * 8 / 1000 + return lostRate, byteRate * 8 } // GetPacket get packet by sequence number diff --git a/pkg/buffer_test.go b/pkg/buffer_test.go index 782003079..fc5dd73d1 100644 --- a/pkg/buffer_test.go +++ b/pkg/buffer_test.go @@ -117,7 +117,7 @@ func TestBufferWithBufferTimeAndZeroSSRC(t *testing.T) { assert.Equal(t, uint16(10), nackPair.PacketID) lostRate, byteRate := buffer.GetLostRateBandwidth(uint64(12)) - + byteRate = uint64(byteRate / 1000) assert.Equal(t, lostRate, float64(0)) assert.Equal(t, byteRate, uint64(0)) diff --git a/pkg/receiver.go b/pkg/receiver.go index c1568d65f..25dd7f4ee 100644 --- a/pkg/receiver.go +++ b/pkg/receiver.go @@ -105,7 +105,7 @@ type WebRTCVideoReceiver struct { rtpExtInfoChan chan rtpExtInfo pliCycle int - maxBandwidth int + maxBandwidth uint64 feedback string } @@ -114,7 +114,6 @@ type WebRTCVideoReceiverConfig struct { REMBCycle int `mapstructure:"rembcycle"` PLICycle int `mapstructure:"plicycle"` TCCCycle int `mapstructure:"tcccycle"` - MaxBandwidth int `mapstructure:"maxbandwidth"` MaxBufferTime int `mapstructure:"maxbuffertime"` ReceiveRTPCycle int `mapstructure:"rtpcycle"` } @@ -139,7 +138,7 @@ func NewWebRTCVideoReceiver(ctx context.Context, config WebRTCVideoReceiverConfi rtcpCh: make(chan rtcp.Packet, maxSize), rtpExtInfoChan: make(chan rtpExtInfo, maxSize), pliCycle: pliCycle, - maxBandwidth: config.MaxBandwidth, + maxBandwidth: routerConfig.MaxBandwidth * 1000, } for _, feedback := range track.Codec().RTCPFeedback { @@ -294,20 +293,20 @@ func (v *WebRTCVideoReceiver) rembLoop(cycle int) { var bw uint64 switch { case v.lostRate == 0 && v.bandwidth == 0: - bw = uint64(v.maxBandwidth) + bw = v.maxBandwidth case v.lostRate >= 0 && v.lostRate < 0.1: - bw = uint64(v.bandwidth * 2) + bw = v.bandwidth * 2 default: bw = uint64(float64(v.bandwidth) * (1 - v.lostRate)) } - if bw > uint64(v.maxBandwidth) { - bw = uint64(v.maxBandwidth) + if bw > v.maxBandwidth && v.maxBandwidth > 0 { + bw = v.maxBandwidth } remb := &rtcp.ReceiverEstimatedMaximumBitrate{ SenderSSRC: v.buffer.GetSSRC(), - Bitrate: bw * 1000, + Bitrate: bw, SSRCs: []uint32{v.buffer.GetSSRC()}, } diff --git a/pkg/router.go b/pkg/router.go index 616b00814..eb96343e5 100644 --- a/pkg/router.go +++ b/pkg/router.go @@ -4,12 +4,16 @@ import ( "fmt" "io" "sync" + "sync/atomic" + "time" "github.com/pion/ion-sfu/pkg/log" "github.com/pion/rtcp" "github.com/pion/webrtc/v3" ) +var routerConfig RouterConfig + // Router defines a track rtp/rtcp router type Router struct { tid string @@ -17,6 +21,7 @@ type Router struct { onCloseHandler func() receiver Receiver senders map[string]Sender + lastNack int64 } // NewRouter for routing rtp/rtcp packets @@ -25,6 +30,7 @@ func NewRouter(tid string, recv Receiver) *Router { tid: tid, receiver: recv, senders: make(map[string]Sender), + lastNack: time.Now().Unix(), } go r.start() @@ -125,6 +131,13 @@ func (r *Router) subFeedbackLoop(pid string, sub Sender) { continue } + if routerConfig.MaxNackTime > 0 { + ln := atomic.LoadInt64(&r.lastNack) + if (time.Now().Unix() - ln) < routerConfig.MaxNackTime { + continue + } + atomic.StoreInt64(&r.lastNack, time.Now().Unix()) + } // Packet not found, request from receiver nack := &rtcp.TransportLayerNack{ // origin ssrc diff --git a/pkg/router_test.go b/pkg/router_test.go index 8be3dbaae..9ad17dcf9 100644 --- a/pkg/router_test.go +++ b/pkg/router_test.go @@ -4,6 +4,9 @@ import ( "context" "math/rand" "testing" + "time" + + "github.com/pion/rtcp" "github.com/pion/transport/test" "github.com/pion/webrtc/v3" @@ -149,3 +152,101 @@ func TestRouterPartialReadCanClose(t *testing.T) { pubsfu.Close() pub.Close() } + +func TestSendersNackRateLimit(t *testing.T) { + report := test.CheckRoutines(t) + defer report() + + me := webrtc.MediaEngine{} + me.RegisterDefaultCodecs() + api := webrtc.NewAPI(webrtc.WithMediaEngine(me)) + pubsfu, pub, err := newPair(webrtc.Configuration{}, api) + assert.NoError(t, err) + + onTimeout, onTimeoutFunc := context.WithCancel(context.Background()) + track, err := pub.NewTrack(webrtc.DefaultPayloadTypeVP8, rand.Uint32(), "video", "pion") + assert.NoError(t, err) + _, err = pub.AddTrack(track) + assert.NoError(t, err) + + ctx := context.Background() + done := make(chan bool) + pubsfu.OnTrack(func(track *webrtc.Track, _ *webrtc.RTPReceiver) { + receiver := NewWebRTCVideoReceiver(ctx, WebRTCVideoReceiverConfig{}, track) + router := NewRouter("id", receiver) + assert.Equal(t, router.receiver, receiver) + + subsfu, sub, err := newPair(webrtc.Configuration{}, api) + assert.NoError(t, err) + + ontrackFired := make(chan bool) + sub.OnTrack(func(track *webrtc.Track, receiver *webrtc.RTPReceiver) { + out, err := track.ReadRTP() + assert.NoError(t, err) + + assert.Equal(t, []byte{0x10, 0x01, 0x02, 0x03, 0x04}, out.Payload) + close(ontrackFired) + }) + + subtrack, err := subsfu.NewTrack(webrtc.DefaultPayloadTypeVP8, track.SSRC(), "video", "pion") + assert.NoError(t, err) + + s, err := subsfu.AddTrack(subtrack) + assert.NoError(t, err) + + err = signalPair(subsfu, sub) + assert.NoError(t, err) + + subPid := "subpid" + sender := NewWebRTCSender(ctx, subtrack, s) + router.AddSender(subPid, sender) + assert.Len(t, router.senders, 1) + assert.Equal(t, sender, router.senders[subPid]) + assert.Contains(t, router.stats(), "track id:") + <-ontrackFired + routerConfig.MaxNackTime = 1 + timer := time.After(1100 * time.Millisecond) + nackCounter := 0 + router.lastNack = time.Now().Unix() + nckloop: + for { + select { + case <-timer: + onTimeoutFunc() + break nckloop + case pkt := <-receiver.rtcpCh: + if nck, ok := pkt.(*rtcp.TransportLayerNack); ok && nck.MediaSSRC == 456 { + nackCounter++ + } + default: + nack := &rtcp.TransportLayerNack{ + SenderSSRC: 123, + MediaSSRC: 456, + Nacks: []rtcp.NackPair{{PacketID: 789}}, + } + sender.rtcpCh <- nack + time.Sleep(25 * time.Millisecond) + } + } + assert.Equal(t, 1, nackCounter) + sub.Close() + router.close() + router.mu.RLock() + assert.Len(t, router.senders, 0) + router.mu.RUnlock() + <-sender.ctx.Done() + <-receiver.ctx.Done() + close(done) + + subsfu.Close() + }) + + err = signalPair(pub, pubsfu) + assert.NoError(t, err) + + sendRTPUntilDone(onTimeout.Done(), t, []*webrtc.Track{track}) + <-done + + pubsfu.Close() + pub.Close() +} diff --git a/pkg/sender.go b/pkg/sender.go index a93ccc21a..9feaae312 100644 --- a/pkg/sender.go +++ b/pkg/sender.go @@ -31,8 +31,8 @@ type WebRTCSender struct { sender *webrtc.RTPSender track *webrtc.Track rtcpCh chan rtcp.Packet - useRemb bool rembCh chan *rtcp.ReceiverEstimatedMaximumBitrate + maxBitrate uint64 target uint64 sendChan chan *rtp.Packet } @@ -41,21 +41,23 @@ type WebRTCSender struct { func NewWebRTCSender(ctx context.Context, track *webrtc.Track, sender *webrtc.RTPSender) *WebRTCSender { ctx, cancel := context.WithCancel(ctx) s := &WebRTCSender{ - ctx: ctx, - cancel: cancel, - sender: sender, - track: track, - rtcpCh: make(chan rtcp.Packet, maxSize), - rembCh: make(chan *rtcp.ReceiverEstimatedMaximumBitrate, maxSize), - sendChan: make(chan *rtp.Packet, maxSize), + ctx: ctx, + cancel: cancel, + sender: sender, + track: track, + maxBitrate: routerConfig.MaxBandwidth * 1000, + rtcpCh: make(chan rtcp.Packet, maxSize), + sendChan: make(chan *rtp.Packet, maxSize), } for _, feedback := range track.Codec().RTCPFeedback { switch feedback.Type { case webrtc.TypeRTCPFBGoogREMB: - log.Debugf("Using sender feedback %s", webrtc.TypeRTCPFBGoogREMB) - s.useRemb = true - go s.rembLoop() + if routerConfig.REMBFeedback { + log.Debugf("Using sender feedback %s", webrtc.TypeRTCPFBGoogREMB) + s.rembCh = make(chan *rtcp.ReceiverEstimatedMaximumBitrate, maxSize) + go s.rembLoop() + } case webrtc.TypeRTCPFBTransportCC: log.Debugf("Using sender feedback %s", webrtc.TypeRTCPFBTransportCC) // TODO @@ -149,7 +151,7 @@ func (s *WebRTCSender) receiveRTCP() { case *rtcp.PictureLossIndication, *rtcp.FullIntraRequest, *rtcp.TransportLayerNack: s.rtcpCh <- pkt case *rtcp.ReceiverEstimatedMaximumBitrate: - if s.useRemb { + if s.rembCh != nil { s.rembCh <- pkt } } @@ -161,13 +163,9 @@ func (s *WebRTCSender) rembLoop() { lastRembTime := time.Now() maxRembTime := 200 * time.Millisecond rembMin := uint64(100000) - rembMax := uint64(5000000) if rembMin == 0 { rembMin = 10000 // 10 KBit } - if rembMax == 0 { - rembMax = 100000000 // 100 MBit - } var lowest uint64 = math.MaxUint64 var rembCount, rembTotalRate uint64 @@ -191,8 +189,8 @@ func (s *WebRTCSender) rembLoop() { if s.target < rembMin { s.target = rembMin - } else if s.target > rembMax { - s.target = rembMax + } else if s.target > s.maxBitrate && s.maxBitrate > 0 { + s.target = s.maxBitrate } newPkt := &rtcp.ReceiverEstimatedMaximumBitrate{ diff --git a/pkg/sender_test.go b/pkg/sender_test.go index 9cee97c46..4a5ebd17b 100644 --- a/pkg/sender_test.go +++ b/pkg/sender_test.go @@ -179,6 +179,7 @@ func TestSenderRTCPREMBForwarding(t *testing.T) { report := test.CheckRoutines(t) defer report() + routerConfig.REMBFeedback = true rtcpfb = []webrtc.RTCPFeedback{ {Type: webrtc.TypeRTCPFBGoogREMB}, } diff --git a/pkg/sfu.go b/pkg/sfu.go index 5597bd7e3..9567a2c26 100644 --- a/pkg/sfu.go +++ b/pkg/sfu.go @@ -35,6 +35,14 @@ type Config struct { WebRTC WebRTCConfig `mapstructure:"webrtc"` Log log.Config `mapstructure:"log"` Receiver ReceiverConfig `mapstructure:"receiver"` + Router RouterConfig `mapstructure:"router"` +} + +// RouterConfig defines router configurations +type RouterConfig struct { + REMBFeedback bool `mapstructure:"rembfeedback"` + MaxBandwidth uint64 `mapstructure:"maxbandwidth"` + MaxNackTime int64 `mapstructure:"maxnacktime"` } // SFU represents an sfu instance @@ -56,6 +64,8 @@ func NewSFU(c Config) *SFU { setting: webrtc.SettingEngine{}, receiver: c.Receiver, } + // Init router config + routerConfig = c.Router log.Init(c.Log.Level, c.Log.Fix)