Skip to content

Commit

Permalink
recordtester: Fix session checks for catalyst recording tests (#293)
Browse files Browse the repository at this point in the history
* recordtester: Check the right number of sessions

* recordtester: Allow configuring recording wait time

* recordtester: Test both streamed sessions

* recordtester: Fix duration checks (no double anymore)

* recordtester: Stop doubling testDuration var

That was making the checks for the double size of the recording

* rt: Create helper var and comment about pauseDuration

* rt: Remove all commented out code
  • Loading branch information
victorges authored May 19, 2023
1 parent 1cfbbd7 commit 20a2379
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 82 deletions.
5 changes: 4 additions & 1 deletion cmd/recordtester/recordtester.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ func main() {
vodImportUrl := fs.String("vod-import-url", "https://storage.googleapis.com/lp_testharness_assets/bbb_sunflower_1080p_30fps_normal_2min.mp4", "URL for VOD import")
continuousTest := fs.Duration("continuous-test", 0, "Do continuous testing")
useHttp := fs.Bool("http", false, "Do HTTP tests instead of RTMP")
forceRecordingUrl := fs.Bool("force-recording-url", false, "Whether to force the API to return a recording URL (skip the user session timeout)")
recordingWaitTime := fs.Duration("recording-wait-time", 6*time.Minute+20*time.Second, "How long to wait after the stream ends before checking for recording")
testMP4 := fs.Bool("mp4", false, "Download MP4 of recording")
testStreamHealth := fs.Bool("stream-health", false, "Check stream health during test")
testLive := fs.Bool("live", false, "Check Live workflow")
Expand Down Expand Up @@ -330,7 +332,8 @@ func main() {
Analyzers: lanalyzers,
Ingest: ingest,
RecordObjectStoreId: *recordObjectStoreId,
UseForceURL: true,
UseForceURL: *forceRecordingUrl,
RecordingWaitTime: *recordingWaitTime,
UseHTTP: *useHttp,
TestMP4: *testMP4,
TestStreamHealth: *testStreamHealth,
Expand Down
137 changes: 56 additions & 81 deletions internal/app/recordtester/recordtester_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type (
Ingest *api.Ingest
RecordObjectStoreId string
UseForceURL bool
RecordingWaitTime time.Duration
UseHTTP bool
TestMP4 bool
TestStreamHealth bool
Expand All @@ -63,6 +64,7 @@ type (
ingest *api.Ingest
recordObjectStoreId string
useForceURL bool
recordingWaitTime time.Duration
useHTTP bool
mp4 bool
streamHealth bool
Expand All @@ -86,6 +88,7 @@ func NewRecordTester(gctx context.Context, opts RecordTesterOptions, serfOpts Se
cancel: cancel,
recordObjectStoreId: opts.RecordObjectStoreId,
useForceURL: opts.UseForceURL,
recordingWaitTime: opts.RecordingWaitTime,
useHTTP: opts.UseHTTP,
mp4: opts.TestMP4,
streamHealth: opts.TestStreamHealth,
Expand All @@ -110,28 +113,20 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
apiTry++
continue
}
// exit(255, fileName, *fileArg, err)
return 255, err
}
break
}
apiTry = 0
glog.Infof("Got broadcasters: %+v", broadcasters)
fmt.Printf("Streaming video file '%s'\n", fileName)
/*
httpIngestURLTemplates := make([]string, 0, len(broadcasters))
for _, b := range broadcasters {
httpIngestURLTemplates = append(httpIngestURLTemplates, fmt.Sprintf("%s/live/%%s", b))
}
*/

if rt.useHTTP && len(broadcasters) == 0 {
// exit(254, fileName, *fileArg, errors.New("Empty list of broadcasters"))
return 254, errors.New("empty list of broadcasters")
} else if (!rt.useHTTP && ingest.Ingest == "") || ingest.Playback == "" {
return 254, errors.New("empty ingest URLs")
// exit(254, fileName, *fileArg, errors.New("Empty list of ingests"))
}
// glog.Infof("All cool!")

hostName, _ := os.Hostname()
streamName := fmt.Sprintf("%s_%s", hostName, time.Now().Format("2006-01-02T15:04:05Z07:00"))
var stream *api.Stream
Expand All @@ -143,7 +138,6 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
continue
}
glog.Errorf("Error creating stream using Livepeer API: %v", err)
// exit(253, fileName, *fileArg, err)
return 253, err
}
break
Expand All @@ -152,12 +146,8 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
rt.streamID = stream.ID
rt.stream = stream
messenger.SendMessage(fmt.Sprintf(":information_source: Created stream id=%s", stream.ID))
// createdAPIStreams = append(createdAPIStreams, stream.ID)
glog.V(model.VERBOSE).Infof("Created Livepeer stream id=%s streamKey=%s playbackId=%s name=%s", stream.ID, stream.StreamKey, stream.PlaybackID, streamName)
// glog.Infof("Waiting 5 second for stream info to propagate to the Postgres replica")
// time.Sleep(5 * time.Second)
rtmpURL := fmt.Sprintf("%s/%s", ingest.Ingest, stream.StreamKey)
// rtmpURL = fmt.Sprintf("%s/%s", ingests[0].Ingest, stream.ID)

testerFuncs := []testers.StartTestFunc{}
if rt.streamHealth {
Expand All @@ -166,6 +156,10 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
})
}

// when pauseDuration is set, we will stream the same file twice sleeping for
// the specified duration in between each.
streamTwice := pauseDuration > 0

mediaURL := fmt.Sprintf("%s/%s/index.m3u8", ingest.Playback, stream.PlaybackID)
if rt.serfOpts.UseSerf {
index := 0
Expand All @@ -183,22 +177,21 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
glog.Warningf("Streaming returned error err=%v", sterr)
return 3, err
}
if pauseDuration > 0 {
if streamTwice {
glog.Infof("Pause specified, waiting %s before streaming second time", pauseDuration)
time.Sleep(pauseDuration)
sterr = rt.doOneHTTPStream(fileName, streamName, broadcasters[0], testDuration, stream)
if sterr != nil {
glog.Warningf("Second time streaming returned error err=%v", sterr)
return 3, err
}
testDuration *= 2
}
} else {

sr2 := testers.NewStreamer2(rt.ctx, testers.Streamer2Options{MistMode: true}, testerFuncs...)
sr2.StartStreaming(fileName, rtmpURL, mediaURL, 2*time.Minute, testDuration)
// <-sr2.Done()
srerr := sr2.Err()

glog.Infof("Streaming stream id=%s done err=%v", stream.ID, srerr)
var re *testers.RTMPError
if errors.As(srerr, &re) {
Expand All @@ -217,7 +210,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
if err = rt.isCancelled(); err != nil {
return 0, err
}
if pauseDuration > 0 {
if streamTwice {
glog.Infof("Pause specified, waiting %s before streaming second time", pauseDuration)
time.Sleep(pauseDuration)
sr2 := testers.NewStreamer2(rt.ctx, testers.Streamer2Options{MistMode: true}, testerFuncs...)
Expand All @@ -242,7 +235,6 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
if err = rt.isCancelled(); err != nil {
return 0, err
}
testDuration *= 2
}
}
if err := rt.isCancelled(); err != nil {
Expand All @@ -254,32 +246,31 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
sessions, err := rt.lapi.GetSessionsNew(stream.ID, false)
if err != nil {
glog.Errorf("Error getting sessions for stream id=%s err=%v", stream.ID, err)
// exit(252, fileName, *fileArg, err)
return 252, err
}
glog.Infof("Sessions: %+v", sessions)
sessionsLength := len(sessions)
if sessionsLength == 2 {
// We often see failures for 2 sessions, this should be fixed once we move to catalyst recording but for now
// we want to ignore these to reduce the alert noise
return 0, nil
} else if sessionsLength != 1 {
err := fmt.Errorf("invalid session count, got %d", sessionsLength)

expectedSessions := 1
if streamTwice {
expectedSessions = 2
}

if len(sessions) != expectedSessions {
err := fmt.Errorf("invalid session count, expected %d but got %d",
expectedSessions, len(sessions))
glog.Error(err)
// exit(251, fileName, *fileArg, err)
return 251, err
}

sess := sessions[0]
if len(sess.Profiles) != len(stream.Profiles) {
glog.Infof("session: %+v", sess)
err := fmt.Errorf("got %d profiles but should have %d", len(sess.Profiles), len(stream.Profiles))
return 251, err
// exit(251, fileName, *fileArg, err)
}
if sess.RecordingStatus != api.RecordingStatusWaiting {
err := fmt.Errorf("recording status is %s but should be %s", sess.RecordingStatus, api.RecordingStatusWaiting)
return 250, err
// exit(250, fileName, *fileArg, err)
}
if err = rt.isCancelled(); err != nil {
return 0, err
Expand All @@ -289,7 +280,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
if rt.useForceURL {
time.Sleep(5 * time.Second)
} else {
time.Sleep(6*time.Minute + 20*time.Second)
time.Sleep(rt.recordingWaitTime)
}
if err = rt.isCancelled(); err != nil {
return 0, err
Expand All @@ -299,58 +290,47 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
if err != nil {
err := fmt.Errorf("error getting sessions for stream id=%s err=%v", stream.ID, err)
return 252, err
// exit(252, fileName, *fileArg, err)
}
glog.Infof("Sessions: %+v", sessions)
if err = rt.isCancelled(); err != nil {
return 0, err
}

sess = sessions[0]
statusShould := api.RecordingStatusReady
if rt.useForceURL {
statusShould = api.RecordingStatusWaiting
}
if sess.RecordingStatus != statusShould {
err := fmt.Errorf("recording status is %s but should be %s", sess.RecordingStatus, statusShould)
return 240, err
// exit(250, fileName, *fileArg, err)
}
if sess.RecordingURL == "" {
err := fmt.Errorf("recording URL should appear by now")
return 249, err
// exit(249, fileName, *fileArg, err)
}
glog.Infof("recordingURL=%s downloading now", sess.RecordingURL)
for _, sess := range sessions {
sess = sessions[0]
statusShould := api.RecordingStatusReady
if rt.useForceURL {
statusShould = api.RecordingStatusWaiting
}
if sess.RecordingStatus != statusShould {
err := fmt.Errorf("recording status is %s but should be %s", sess.RecordingStatus, statusShould)
return 240, err
}
if sess.RecordingURL == "" {
err := fmt.Errorf("recording URL should appear by now")
return 249, err
}
glog.Infof("recordingURL=%s downloading now", sess.RecordingURL)

// started := time.Now()
// downloader := testers.NewM3utester2(gctx, sess.RecordingURL, false, false, false, false, 5*time.Second, nil)
// <-downloader.Done()
// glog.Infof(`Pulling stopped after %s`, time.Since(started))
// exit(55, fileName, *fileArg, err)
glog.Info("Done Record Test")
if err = rt.isCancelled(); err != nil {
return 0, err
}
if rt.mp4 {
es, err := rt.checkDownMp4(stream, sess.Mp4Url, testDuration)
if err != nil {
return es, err
}
}

// lapi.DeleteStream(stream.ID)
// exit(0, fileName, *fileArg, err)
if err = rt.isCancelled(); err != nil {
return 0, err
}
if rt.mp4 {
es, err := rt.checkDownMp4(stream, sess.Mp4Url, testDuration, pauseDuration > 0)
es, err := rt.checkDown(stream, sess.RecordingURL, testDuration)
if err != nil {
return es, err
}
}
glog.Info("Done Record Test")

es, err := rt.checkDown(stream, sess.RecordingURL, testDuration, pauseDuration > 0)
if es == 0 {
rt.lapi.DeleteStream(stream.ID)
// exit(0, fileName, *fileArg, err)
}

// uploader := testers.NewRtmpStreamer(gctx, rtmpURL)
// uploader.StartUpload(fileName, rtmpURL, -1, 30*time.Second)
return es, err
rt.lapi.DeleteStream(stream.ID)
return 0, nil
}

func (rt *recordTester) getIngestInfo() (*api.Ingest, error) {
Expand Down Expand Up @@ -390,7 +370,6 @@ func (rt *recordTester) doOneHTTPStream(fileName, streamName, broadcasterURL str
continue
}
glog.Errorf("Error creating stream session using Livepeer API: %v", err)
// exit(253, fileName, *fileArg, err)
return err
}
break
Expand All @@ -399,7 +378,7 @@ func (rt *recordTester) doOneHTTPStream(fileName, streamName, broadcasterURL str
httpIngestBaseURL := fmt.Sprintf("%s/live/%s", broadcasterURL, session.ID)
glog.Infof("httpIngestBaseURL=%s", httpIngestBaseURL)
hs.StartUpload(fileName, httpIngestBaseURL, stream.ID, -1, -1, testDuration, 0)
// <-hs.Done()

stats, err := hs.Stats()
glog.Infof("Streaming stream id=%s done err=%v", stream.ID, err)
glog.Infof("Stats: %+v", stats)
Expand All @@ -415,7 +394,7 @@ func (rt *recordTester) isCancelled() error {
return nil
}

func (rt *recordTester) checkDownMp4(stream *api.Stream, url string, streamDuration time.Duration, doubled bool) (int, error) {
func (rt *recordTester) checkDownMp4(stream *api.Stream, url string, streamDuration time.Duration) (int, error) {
es := 0
started := time.Now()
glog.V(model.VERBOSE).Infof("Downloading mp4 url=%s stream id=%s", url, stream.ID)
Expand Down Expand Up @@ -449,23 +428,19 @@ func (rt *recordTester) checkDownMp4(stream *api.Stream, url string, streamDurat
glog.Warningf("Error parsing mp4 for manifestID=%s url=%s err=%v", stream.ID, url, err)
return 203, err
}
durDiffShould := 2 * time.Second
if doubled {
durDiffShould *= durDiffShould
}
durDiff := streamDuration - dur
if durDiff < 0 {
durDiff = -durDiff
}
if durDiff > durDiffShould {
if durDiff > 2*time.Second {
ers := fmt.Errorf("duration of mp4 differ by %s (got %s, should %s)", durDiff, dur, streamDuration)
glog.Error(ers)
return 300, err
}
return es, nil
}

func (rt *recordTester) checkDown(stream *api.Stream, url string, streamDuration time.Duration, doubled bool) (int, error) {
func (rt *recordTester) checkDown(stream *api.Stream, url string, streamDuration time.Duration) (int, error) {
es := 0
started := time.Now()
downloader := testers.NewM3utester2(rt.ctx, url, false, false, false, false, 5*time.Second, nil, false)
Expand All @@ -482,7 +457,7 @@ func (rt *recordTester) checkDown(stream *api.Stream, url string, streamDuration
}
glog.Infof("Stats for %s: %s", stream.ID, vs.String())
glog.Infof("Stats for %s raw: %+v", stream.ID, vs)
if ok, ers := vs.IsOk(streamDuration, doubled); !ok {
if ok, ers := vs.IsOk(streamDuration, false); !ok {
glog.Warningf("NOT OK! (%s)", ers)
es = 36
return es, errors.New(ers)
Expand Down

0 comments on commit 20a2379

Please sign in to comment.