Skip to content

Commit

Permalink
ai/live: Take RTMP output URL from stream query parameters (#3252)
Browse files Browse the repository at this point in the history
* Take RTMP output URL from stream query params

* Add stream type and query params to auth request

* Logging updates for MediaMTX errors
  • Loading branch information
j0sh authored Nov 15, 2024
1 parent 5738a52 commit abdd534
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 8 deletions.
15 changes: 13 additions & 2 deletions server/ai_live_video.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package server

import (
"errors"
"fmt"
"io"
"log/slog"
Expand Down Expand Up @@ -64,11 +65,21 @@ func startTrickleSubscribe(url *url.URL, params aiRequestParams) {
ffmpeg.Transcode3(&ffmpeg.TranscodeOptionsIn{
Fname: fmt.Sprintf("pipe:%d", r.Fd()),
}, []ffmpeg.TranscodeOptions{{
// TODO take from params
Oname: "rtmp://localhost/out-stream",
Oname: params.outputRTMPURL,
AudioEncoder: ffmpeg.ComponentOptions{Name: "copy"},
VideoEncoder: ffmpeg.ComponentOptions{Name: "copy"},
Muxer: ffmpeg.ComponentOptions{Name: "flv"},
}})
}()
}

func mediamtxSourceTypeToString(s string) (string, error) {
switch s {
case "webrtcSession":
return "whip", nil
case "rtmpConn":
return "rtmp", nil
default:
return "", errors.New("unknown media source")
}
}
45 changes: 39 additions & 6 deletions server/ai_mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"

"github.com/getkin/kin-openapi/openapi3filter"
Expand Down Expand Up @@ -356,32 +358,62 @@ func (ls *LivepeerServer) ImageToVideoResult() http.Handler {

func (ls *LivepeerServer) StartLiveVideo() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
streamName := r.FormValue("stream")
if streamName == "" {
clog.Errorf(ctx, "Missing stream name")
http.Error(w, "Missing stream name", http.StatusBadRequest)
return
}
ctx = clog.AddVal(ctx, "stream", streamName)
sourceID := r.FormValue("source_id")
if sourceID == "" {
clog.Errorf(ctx, "Missing source_id")
http.Error(w, "Missing source_id", http.StatusBadRequest)
return
}
ctx = clog.AddVal(ctx, "source_id", sourceID)
sourceType := r.FormValue("source_type")
if sourceType == "" {
clog.Errorf(ctx, "Missing source_type")
http.Error(w, "Missing source_type", http.StatusBadRequest)
return
}
sourceTypeStr, err := mediamtxSourceTypeToString(sourceType)
if err != nil {
clog.Errorf(ctx, "Invalid source type %s", sourceType)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
ctx = clog.AddVal(ctx, "source_type", sourceType)

queryParams := r.FormValue("query")
qp, err := url.ParseQuery(queryParams)
if err != nil {
clog.Errorf(ctx, "invalid query params, err=%w", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// If auth webhook is set and returns an output URL, this will be replaced
outputURL := qp.Get("rtmpOutput")
if outputURL == "" {
// re-publish to ourselves for now
// Not sure if we want this to be permanent
outputURL = "rtmp://localhost/" + streamName + "-out"
}

if streamName == "out-stream" {
// convention to avoid re-subscribing to our own streams
// in case we want to push outputs back into mediamtx -
// use an `-out` suffix for the stream name.
if strings.HasSuffix(streamName, "-out") {
// skip for now; we don't want to re-publish our own outputs
return
}
ctx := clog.AddVal(r.Context(), "stream", streamName)
ctx = clog.AddVal(ctx, "source_id", sourceID)
ctx = clog.AddVal(ctx, "source_type", sourceType)

err := authenticateAIStream(AuthWebhookURL, AIAuthRequest{
Stream: streamName,
err = authenticateAIStream(AuthWebhookURL, AIAuthRequest{
Stream: streamName,
Type: sourceTypeStr,
QueryParams: queryParams,
})
if err != nil {
kickErr := kickInputConnection(sourceID, sourceType)
Expand Down Expand Up @@ -410,6 +442,7 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler {
os: drivers.NodeStorage.NewSession(requestID),
sessManager: ls.AISessionManager,
segmentReader: ssr,
outputRTMPURL: outputURL,
}

req := worker.GenLiveVideoToVideoJSONRequestBody{
Expand Down
1 change: 1 addition & 0 deletions server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type aiRequestParams struct {

// For live video pipelines
segmentReader *media.SwitchableSegmentReader
outputRTMPURL string
}

// CalculateTextToImageLatencyScore computes the time taken per pixel for an text-to-image request.
Expand Down
8 changes: 8 additions & 0 deletions server/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,15 @@ func (a authWebhookResponse) areProfilesEqual(b authWebhookResponse) bool {
}

type AIAuthRequest struct {
// Stream name or stream key
Stream string `json:"stream"`

// Stream type, eg RTMP or WHIP
Type string `json:"type"`

// Query parameters that came with the stream, if any
QueryParams string `json:"query_params,omitempty"`

// TODO not sure what params we need yet
}

Expand Down

0 comments on commit abdd534

Please sign in to comment.