diff --git a/api/handler.go b/api/handler.go index d02bc383..d227438b 100644 --- a/api/handler.go +++ b/api/handler.go @@ -84,6 +84,7 @@ func addViewershipHandlers(router *httprouter.Router, handler *apiHandler) { router.Handler("GET", fullPath, fullHandler) } addApiHandler("/total", "get_total_views", handler.getTotalViews) + addApiHandler("/realtime", "get_realtime_views", handler.getRealTimeViews) } func (h *apiHandler) cors() middleware { @@ -118,6 +119,15 @@ func (h *apiHandler) getTotalViews(rw http.ResponseWriter, r *http.Request) { respondJson(rw, http.StatusOK, views) } +func (h *apiHandler) getRealTimeViews(rw http.ResponseWriter, r *http.Request) { + views, err := h.views.GetRealTimeViews(r.Context(), apiParam(r, assetIDParam)) + if err != nil { + respondError(rw, http.StatusInternalServerError, err) + return + } + respondJson(rw, http.StatusOK, views) +} + func (h *apiHandler) getStreamHealth(rw http.ResponseWriter, r *http.Request) { respondJson(rw, http.StatusOK, getStreamStatus(r)) } diff --git a/views/client.go b/views/client.go index 20dccb00..8df056cb 100644 --- a/views/client.go +++ b/views/client.go @@ -14,12 +14,18 @@ import ( ) var ErrAssetNotFound = errors.New("asset not found") +var ErrStreamNotFound = errors.New("stream not found") type TotalViews struct { ID string `json:"id"` StartViews int64 `json:"startViews"` } +type RealTimeViews struct { + ID string `json:"id"` + Views int64 `json:"views"` +} + type ClientOptions struct { Prometheus promClient.Config Livepeer livepeer.ClientOptions @@ -59,6 +65,25 @@ func (c *Client) GetTotalViews(ctx context.Context, id string) ([]TotalViews, er }}, nil } +func (c *Client) GetRealTimeViews(ctx context.Context, id string) ([]RealTimeViews, error) { + stream, err := c.lp.GetStream(id, false) + if errors.Is(err, livepeer.ErrNotExists) { + return nil, ErrStreamNotFound + } else if err != nil { + return nil, fmt.Errorf("error getting stream: %w", err) + } + + realTimeViews, err := c.doQueryRealTimeViews(ctx, stream) + if err != nil { + return nil, fmt.Errorf("error querying real time views: %w", err) + } + + return []RealTimeViews{{ + ID: stream.PlaybackID, + Views: realTimeViews, + }}, nil +} + func (c *Client) doQueryStartViews(ctx context.Context, asset *livepeer.Asset) (int64, error) { query := startViewsQuery(asset.PlaybackID, asset.PlaybackRecordingID) value, warn, err := c.prom.Query(ctx, query, time.Time{}) @@ -80,6 +105,27 @@ func (c *Client) doQueryStartViews(ctx context.Context, asset *livepeer.Asset) ( return int64(vec[0].Value), nil } +func (c *Client) doQueryRealTimeViews(ctx context.Context, stream *livepeer.Stream) (int64, error) { + query := realTimeViewsQuery(stream.PlaybackID) + value, warn, err := c.prom.Query(ctx, query, time.Time{}) + if len(warn) > 0 { + glog.Warningf("Prometheus query warnings: %q", warn) + } + if err != nil { + return -1, fmt.Errorf("query error: %w", err) + } + if value.Type() != model.ValVector { + return -1, fmt.Errorf("unexpected value type: %s", value.Type()) + } + vec := value.(model.Vector) + if len(vec) > 1 { + return -1, fmt.Errorf("unexpected result count: %d", len(vec)) + } else if len(vec) == 0 { + return 0, nil + } + return int64(vec[0].Value), nil +} + func startViewsQuery(playbackID, playbackRecordingID string) string { queryID := playbackID if playbackRecordingID != "" { @@ -90,3 +136,10 @@ func startViewsQuery(playbackID, playbackRecordingID string) string { queryID, ) } + +func realTimeViewsQuery(playbackID string) string { + return fmt.Sprintf( + `sum(mist_sessions{catalyst="true", sessType="viewers", stream=~"video+%s"}[1m] )`, + playbackID, + ) +}