Skip to content

Commit

Permalink
views: realtime
Browse files Browse the repository at this point in the history
  • Loading branch information
gioelecerati committed Nov 15, 2022
1 parent c6aa7ae commit 45f3e4c
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 0 deletions.
10 changes: 10 additions & 0 deletions api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func addViewershipHandlers(router *httprouter.Router, handler *apiHandler) {
router.Handler("GET", fullPath, fullHandler)
}
addApiHandler("/total", "get_total_views", handler.getTotalViews)
addApiHandler("/realtime", "get_realtime_views", handler.getRealTimeViews)
}

func (h *apiHandler) cors() middleware {
Expand Down Expand Up @@ -118,6 +119,15 @@ func (h *apiHandler) getTotalViews(rw http.ResponseWriter, r *http.Request) {
respondJson(rw, http.StatusOK, views)
}

func (h *apiHandler) getRealTimeViews(rw http.ResponseWriter, r *http.Request) {
views, err := h.views.GetRealTimeViews(r.Context(), apiParam(r, assetIDParam))
if err != nil {
respondError(rw, http.StatusInternalServerError, err)
return
}
respondJson(rw, http.StatusOK, views)
}

func (h *apiHandler) getStreamHealth(rw http.ResponseWriter, r *http.Request) {
respondJson(rw, http.StatusOK, getStreamStatus(r))
}
Expand Down
53 changes: 53 additions & 0 deletions views/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@ import (
)

var ErrAssetNotFound = errors.New("asset not found")
var ErrStreamNotFound = errors.New("stream not found")

type TotalViews struct {
ID string `json:"id"`
StartViews int64 `json:"startViews"`
}

type RealTimeViews struct {
ID string `json:"id"`
Views int64 `json:"views"`
}

type ClientOptions struct {
Prometheus promClient.Config
Livepeer livepeer.ClientOptions
Expand Down Expand Up @@ -59,6 +65,25 @@ func (c *Client) GetTotalViews(ctx context.Context, id string) ([]TotalViews, er
}}, nil
}

func (c *Client) GetRealTimeViews(ctx context.Context, id string) ([]RealTimeViews, error) {
stream, err := c.lp.GetStream(id, false)
if errors.Is(err, livepeer.ErrNotExists) {
return nil, ErrStreamNotFound
} else if err != nil {
return nil, fmt.Errorf("error getting stream: %w", err)
}

realTimeViews, err := c.doQueryRealTimeViews(ctx, stream)
if err != nil {
return nil, fmt.Errorf("error querying real time views: %w", err)
}

return []RealTimeViews{{
ID: stream.PlaybackID,
Views: realTimeViews,
}}, nil
}

func (c *Client) doQueryStartViews(ctx context.Context, asset *livepeer.Asset) (int64, error) {
query := startViewsQuery(asset.PlaybackID, asset.PlaybackRecordingID)
value, warn, err := c.prom.Query(ctx, query, time.Time{})
Expand All @@ -80,6 +105,27 @@ func (c *Client) doQueryStartViews(ctx context.Context, asset *livepeer.Asset) (
return int64(vec[0].Value), nil
}

func (c *Client) doQueryRealTimeViews(ctx context.Context, stream *livepeer.Stream) (int64, error) {
query := realTimeViewsQuery(stream.PlaybackID)
value, warn, err := c.prom.Query(ctx, query, time.Time{})
if len(warn) > 0 {
glog.Warningf("Prometheus query warnings: %q", warn)
}
if err != nil {
return -1, fmt.Errorf("query error: %w", err)
}
if value.Type() != model.ValVector {
return -1, fmt.Errorf("unexpected value type: %s", value.Type())
}
vec := value.(model.Vector)
if len(vec) > 1 {

This comment has been minimized.

Copy link
@gioelecerati

gioelecerati Nov 15, 2022

Author Member

This need to be changed, the query currently returns multiple values. This check should be different or the query should return an average single value

return -1, fmt.Errorf("unexpected result count: %d", len(vec))
} else if len(vec) == 0 {
return 0, nil
}
return int64(vec[0].Value), nil
}

func startViewsQuery(playbackID, playbackRecordingID string) string {
queryID := playbackID
if playbackRecordingID != "" {
Expand All @@ -90,3 +136,10 @@ func startViewsQuery(playbackID, playbackRecordingID string) string {
queryID,
)
}

func realTimeViewsQuery(playbackID string) string {
return fmt.Sprintf(
`sum(mist_sessions{catalyst="true", sessType="viewers", stream=~"video+%s"}[1m] )`,
playbackID,
)
}

0 comments on commit 45f3e4c

Please sign in to comment.