Skip to content

Commit

Permalink
api/views: Create new viewership BigQuery API (#121)
Browse files Browse the repository at this point in the history
* api/views: Add bigquery viewership support

* views: Create separate prometheus+bigquery components

* views: Make bigquery component more testable

* views: Add tests to bigquery client

* api/views: Output time as unix millis timestamp

* api/views: Create right 404 response

* api/views: Remove old staging hack

It's unusable anyway

* api/views: Use nullable for all breakdown fields

* views/bigquery: Update query for error_rate metric

Also fix exits_before_start tag

* Address PR comments

Re-ordered the shit out of bigquery handler
  • Loading branch information
victorges authored May 9, 2023
1 parent dd30988 commit 90c45b1
Show file tree
Hide file tree
Showing 13 changed files with 1,081 additions and 485 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ all: $(allCmds)
$(allCmds):
$(MAKE) -C ./cmd/$@

run: check_local_rabbit deps_start
run:
$(MAKE) -C ./cmd/$(cmd) run

docker:
Expand Down
28 changes: 23 additions & 5 deletions api/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ var (
httpClient = &http.Client{
Transport: promhttp.InstrumentRoundTripperDuration(authRequestDuration, http.DefaultTransport),
}

userIdContextKey = &struct{}{}
)

func authorization(authUrl string) middleware {
Expand All @@ -48,12 +50,12 @@ func authorization(authUrl string) middleware {
respondError(rw, http.StatusInternalServerError, err)
return
}

authReq.Header.Set("X-Original-Uri", originalReqUri(r))
if streamID := apiParam(r, streamIDParam); streamID != "" {
authReq.Header.Set("X-Livepeer-Stream-Id", streamID)
} else if assetID := apiParam(r, assetIDParam); assetID != "" {
authReq.Header.Set("X-Livepeer-Asset-Id", assetID)
}
setAuthHeaderFromAPIParam(r, authReq.Header, streamIDParam, "X-Livepeer-Stream-Id")
setAuthHeaderFromAPIParam(r, authReq.Header, assetIDParam, "X-Livepeer-Asset-Id")
setAuthHeaderFromAPIParam(r, authReq.Header, playbackIDParam, "X-Livepeer-Playback-Id")

copyHeaders(authorizationHeaders, r.Header, authReq.Header)
authRes, err := httpClient.Do(authReq)
if err != nil {
Expand All @@ -72,6 +74,12 @@ func authorization(authUrl string) middleware {
}
return
}

if userID := authRes.Header.Get("X-Livepeer-User-Id"); userID != "" {
ctx := context.WithValue(r.Context(), userIdContextKey, userID)
r = r.WithContext(ctx)
}

next.ServeHTTP(rw, r)
})
}
Expand All @@ -87,6 +95,16 @@ func originalReqUri(r *http.Request) string {
return fmt.Sprintf("%s://%s%s", proto, r.Host, r.URL.RequestURI())
}

// setAuthHeaderFromAPIParam sets the given header on headers to the value of
// the request parameter named param. The value is looked up through apiParam
// first and, when that yields an empty string, in the request query-string
// under the same name. If both lookups are empty, headers is left untouched.
func setAuthHeaderFromAPIParam(r *http.Request, headers http.Header, param string, header string) {
	value := apiParam(r, param)
	if value == "" {
		// fall back to the query-string for endpoints that take the ID there
		value = r.URL.Query().Get(param)
	}
	if value == "" {
		return
	}
	headers.Set(header, value)
}

func copyHeaders(headers []string, src, dest http.Header) {
for _, header := range headers {
if vals := src[header]; len(vals) > 0 {
Expand Down
248 changes: 213 additions & 35 deletions api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,54 @@ package api
import (
"context"
"errors"
"fmt"
"net/http"
"path"
"strconv"
"time"

"github.com/go-chi/chi/v5"
chimiddleware "github.com/go-chi/chi/v5/middleware"
"github.com/golang/glog"
"github.com/julienschmidt/httprouter"
"github.com/livepeer/livepeer-data/health"
"github.com/livepeer/livepeer-data/metrics"
"github.com/livepeer/livepeer-data/pkg/data"
"github.com/livepeer/livepeer-data/pkg/jsse"
"github.com/livepeer/livepeer-data/views"
"github.com/prometheus/client_golang/prometheus/promhttp"
cache "github.com/victorspringer/http-cache"
"github.com/victorspringer/http-cache/adapter/memory"
)

const (
sseRetryBackoff = 10 * time.Second
ssePingDelay = 20 * time.Second
sseBufferSize = 128

// Names of the request parameters that identify the resource a request
// refers to. They are used both as route placeholders and as query-string
// keys.
streamIDParam = "streamId"
assetIDParam = "assetId"
streamIDParam = "streamId"
assetIDParam = "assetId"
playbackIDParam = "playbackId"
)

var httpCache *cache.Client

// init builds the package-level httpCache client. Responses are stored in an
// in-memory adapter using the LRU algorithm with a capacity of 2000 and are
// kept with a TTL of 5 minutes. Any construction error panics.
func init() {
	store, err := memory.NewAdapter(
		memory.AdapterWithAlgorithm(memory.LRU),
		memory.AdapterWithCapacity(2000),
	)
	if err != nil {
		panic(err)
	}

	client, err := cache.NewClient(
		cache.ClientWithAdapter(store),
		cache.ClientWithTTL(5*time.Minute),
	)
	if err != nil {
		panic(err)
	}
	httpCache = client
}

type APIHandlerOptions struct {
ServerName, APIRoot, AuthURL string
RegionalHostFormat, OwnRegion string
Expand All @@ -42,48 +67,104 @@ type apiHandler struct {
// NewHandler builds the root HTTP handler of the API server. System routes
// (/_healthz and, when opts.Prometheus is set, /metrics) are registered
// directly on the root router, while the stream health and viewership
// sub-routers are mounted under opts.APIRoot.
func NewHandler(serverCtx context.Context, opts APIHandlerOptions, healthcore *health.Core, views *views.Client) http.Handler {
handler := &apiHandler{opts, serverCtx, healthcore, views}

router := httprouter.New()
router.HandlerFunc("GET", "/_healthz", handler.healthcheck)
router := chi.NewRouter()

// don't use middlewares for the system routes
router.Get("/_healthz", handler.healthcheck)
if opts.Prometheus {
router.Handler("GET", "/metrics", promhttp.Handler())
router.Method("GET", "/metrics", promhttp.Handler())
}
addStreamHealthHandlers(router, handler)
addViewershipHandlers(router, handler)

globalMiddlewares := []middleware{handler.cors()}
return prepareHandler("", false, router, globalMiddlewares...)
// API routes share request logging, compression of application/json
// responses and the CORS middleware.
router.Route(opts.APIRoot, func(router chi.Router) {
router.Use(chimiddleware.Logger)
router.Use(chimiddleware.NewCompressor(5, "application/json").Handler)
router.Use(handler.cors())

// the stream health sub-router reads the stream ID from the mount path
router.Mount(`/stream/{`+streamIDParam+`}`, handler.streamHealthHandler())
router.Mount("/views", handler.viewershipHandler())
})

return router
}

func addStreamHealthHandlers(router *httprouter.Router, handler *apiHandler) {
healthcore, opts := handler.core, handler.opts
middlewares := []middleware{
// streamHealthHandler returns the sub-router serving the stream health
// endpoints: GET /health and GET /events. The {streamId} variable must be set
// in the request context, so the router has to be mounted on a path that
// contains that placeholder.
func (h *apiHandler) streamHealthHandler() chi.Router {
healthcore, opts := h.core, h.opts

router := chi.NewRouter()
router.Use(
streamStatus(healthcore),
regionProxy(opts.RegionalHostFormat, opts.OwnRegion),
}
regionProxy(opts.RegionalHostFormat, opts.OwnRegion))
// authorization is only enforced when an auth URL is configured
if opts.AuthURL != "" {
middlewares = append(middlewares, authorization(opts.AuthURL))
}
addApiHandler := func(apiPath, name string, handler http.HandlerFunc) {
fullPath := path.Join(opts.APIRoot, "/stream/:"+streamIDParam, apiPath)
fullHandler := prepareHandlerFunc(name, opts.Prometheus, handler, middlewares...)
router.Handler("GET", fullPath, fullHandler)
router.Use(authorization(opts.AuthURL))
}
addApiHandler("/health", "get_stream_health", handler.getStreamHealth)
addApiHandler("/events", "stream_health_events", handler.subscribeEvents)

// each endpoint is wrapped with request metrics under its own name
h.withMetrics(router, "get_stream_health").
MethodFunc("GET", "/health", h.getStreamHealth)
h.withMetrics(router, "stream_health_events").
MethodFunc("GET", "/events", h.subscribeEvents)

return router
}

func addViewershipHandlers(router *httprouter.Router, handler *apiHandler) {
opts := handler.opts
middlewares := []middleware{}
// viewershipHandler returns the sub-router serving the viewership endpoints.
// Authorization is applied to every route when opts.AuthURL is configured, and
// every route is wrapped with request metrics and the response cache.
func (h *apiHandler) viewershipHandler() chi.Router {
opts := h.opts

router := chi.NewRouter()
if opts.AuthURL != "" {
middlewares = append(middlewares, authorization(opts.AuthURL))
router.Use(authorization(opts.AuthURL))
}
addApiHandler := func(apiPath, name string, handler http.HandlerFunc) {
fullPath := path.Join(opts.APIRoot, "/views/:"+assetIDParam, apiPath)
fullHandler := prepareHandlerFunc(name, opts.Prometheus, handler, middlewares...)
router.Handler("GET", fullPath, fullHandler)

// TODO: Remove this deprecated endpoint once we know no one is using it
h.withMetrics(router, "get_total_views").
With(h.cache(false)).
MethodFunc("GET", fmt.Sprintf(`/{%s}/total`, assetIDParam), h.getTotalViews)

// total views public API; cached without varying by Authorization header
h.withMetrics(router, "query_total_viewership").
With(h.cache(false)).
MethodFunc("GET", fmt.Sprintf(`/query/total/{%s}`, playbackIDParam), h.queryTotalViewership)
// creator views API, requires assetId or streamId on the query-string
h.withMetrics(router, "query_creator_viewership").
With(h.cache(true)).
With(ensureIsCreatorQuery).
MethodFunc("GET", `/query/creator`, h.queryViewership(false))
// full application views API, gets access to all metrics and filters
h.withMetrics(router, "query_application_viewership").
With(h.cache(true)).
MethodFunc("GET", `/query`, h.queryViewership(true))

return router
}

// withMetrics returns a router whose handlers are wrapped by
// metrics.ObservedHandler under the given name. When Prometheus is disabled in
// the handler options, the router is returned unchanged.
func (h *apiHandler) withMetrics(router chi.Router, name string) chi.Router {
	if h.opts.Prometheus {
		observe := func(next http.Handler) http.Handler {
			return metrics.ObservedHandler(name, next)
		}
		return router.With(observe)
	}
	return router
}

// cache returns a middleware that serves responses through the package-level
// httpCache and sets a Cache-Control header on every response.
//
// When varyAuth is true, a "Vary: Authorization" header is added and the value
// of the Authorization request header is appended to the request query-string
// under the "auth-header" key, so that cache entries are separated per
// credential.
func (h *apiHandler) cache(varyAuth bool) middleware {
return func(next http.Handler) http.Handler {
// wrap the downstream handler so the steps below run before the cache lookup
next = httpCache.Middleware(next)

return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
rw.Header().Set("Cache-Control", "public, max-age=60, s-maxage=300, stale-while-revalidate=3600, stale-if-error=86400")

if varyAuth {
rw.Header().Add("Vary", "Authorization")

// cache lib uses only URL as key so add auth header to the query-string
// NOTE(review): this rewrites r.URL.RawQuery in place, so handlers
// further down the chain also see the extra "auth-header" parameter.
query := r.URL.Query()
query.Add("auth-header", r.Header.Get("Authorization"))
r.URL.RawQuery = query.Encode()
}

next.ServeHTTP(rw, r)
})
}
addApiHandler("/total", "get_total_views", handler.getTotalViews)
}

func (h *apiHandler) cors() middleware {
Expand All @@ -109,13 +190,110 @@ func (h *apiHandler) healthcheck(rw http.ResponseWriter, r *http.Request) {
rw.WriteHeader(status)
}

// queryTotalViewership responds with the first metric returned by a views
// query filtered only by the playback ID taken from the route parameter.
//
// When the query yields no rows, an otherwise empty metric is returned instead.
// It is identified by the dStorage URL derived from the playback ID when
// views.ToDStorageURL returns one, and by the playback ID itself otherwise.
// Query failures are reported as 500.
func (h *apiHandler) queryTotalViewership(rw http.ResponseWriter, r *http.Request) {
	playbackID := apiParam(r, playbackIDParam)
	spec := views.QuerySpec{Filter: views.QueryFilter{PlaybackID: playbackID}}

	rows, err := h.views.Query(r.Context(), spec, "", "")
	if err != nil {
		respondError(rw, http.StatusInternalServerError, err)
		return
	}
	if len(rows) > 0 {
		respondJson(rw, http.StatusOK, rows[0])
		return
	}

	// no data for this playback ID: answer with a placeholder metric
	placeholder := views.Metric{PlaybackID: playbackID}
	if dStorageURL := views.ToDStorageURL(playbackID); dStorageURL != "" {
		placeholder = views.Metric{DStorageURL: dStorageURL}
	}
	respondJson(rw, http.StatusOK, placeholder)
}

// ensureIsCreatorQuery is a middleware guarding the creator viewership query.
// It strips any playbackId from the request query-string and rejects, with a
// 400 response, requests that do not carry exactly one non-empty value among
// the assetId and streamId query parameters.
func ensureIsCreatorQuery(next http.Handler) http.Handler {
	guard := func(rw http.ResponseWriter, r *http.Request) {
		// disallow querying creator metrics with a playback ID
		params := r.URL.Query()
		params.Del("playbackId")
		r.URL.RawQuery = params.Encode()

		assetGiven := params.Get("assetId") != ""
		streamGiven := params.Get("streamId") != ""
		if assetGiven != streamGiven {
			next.ServeHTTP(rw, r)
			return
		}
		respondError(rw, http.StatusBadRequest, errors.New("must provide exactly 1 of assetId or streamId for creator query"))
	}
	return http.HandlerFunc(guard)
}

// queryViewership returns the handler for the viewership query endpoints. The
// detailed flag is forwarded as-is in the query spec.
//
// The handler reads from, to, timeStep, playbackId, creatorId, breakdownBy[],
// assetId and streamId from the query-string. Invalid from/to values produce a
// 400 response. A request without a user ID in its context, or a failing
// query, produces a 500 response.
func (h *apiHandler) queryViewership(detailed bool) http.HandlerFunc {
	return func(rw http.ResponseWriter, r *http.Request) {
		params := r.URL.Query()

		from, fromErr := parseInputTimestamp(params.Get("from"))
		to, toErr := parseInputTimestamp(params.Get("to"))
		if errs := nonNilErrs(fromErr, toErr); len(errs) > 0 {
			respondError(rw, http.StatusBadRequest, errs...)
			return
		}

		// the user ID is expected to be stored in the request context
		userID, authenticated := r.Context().Value(userIdContextKey).(string)
		if !authenticated {
			respondError(rw, http.StatusInternalServerError, errors.New("request not authenticated"))
			return
		}

		spec := views.QuerySpec{
			From:     from,
			To:       to,
			TimeStep: params.Get("timeStep"),
			Filter: views.QueryFilter{
				UserID:     userID,
				PlaybackID: params.Get("playbackId"),
				CreatorID:  params.Get("creatorId"),
			},
			BreakdownBy: params["breakdownBy[]"],
			Detailed:    detailed,
		}

		result, err := h.views.Query(r.Context(), spec, params.Get("assetId"), params.Get("streamId"))
		if err != nil {
			respondError(rw, http.StatusInternalServerError, err)
			return
		}
		respondJson(rw, http.StatusOK, result)
	}
}

// getTotalViews serves the deprecated total views endpoint for the asset ID
// taken from the route parameter. It loads the legacy totals through
// Deprecated_GetTotalViews and, when the newer Query call returns any metric,
// replaces StartViews with the first metric's ViewCount. Each call is logged
// with both the old and the new count. A failure in either lookup produces a
// 500 response.
func (h *apiHandler) getTotalViews(rw http.ResponseWriter, r *http.Request) {
views, err := h.views.GetTotalViews(r.Context(), apiParam(r, assetIDParam))
assetID := apiParam(r, assetIDParam)

totalViews, err := h.views.Deprecated_GetTotalViews(r.Context(), assetID)
if err != nil {
respondError(rw, http.StatusInternalServerError, err)
return
}
respondJson(rw, http.StatusOK, views)
// NOTE(review): totalViews[0] is indexed without a length check — confirm
// Deprecated_GetTotalViews never returns an empty slice on success.
oldStartViews := totalViews[0].StartViews

// empty query spec: only the asset ID identifies what is being counted
metrics, err := h.views.Query(r.Context(), views.QuerySpec{}, assetID, "")
if err != nil {
respondError(rw, http.StatusInternalServerError, err)
return
}

// prefer the new view count when available, keeping the legacy ID
if len(metrics) > 0 {
totalViews = []views.TotalViews{{
ID: totalViews[0].ID,
StartViews: metrics[0].ViewCount,
}}
}

// userId is logged as stored in the context; it is nil when absent
userId := r.Context().Value(userIdContextKey)
glog.Infof("Used deprecated get total views endpoint userId=%v assetId=%v playbackId=%v oldStartViews=%v newViewCount=%v",
userId, assetID, totalViews[0].ID, oldStartViews, totalViews[0].StartViews)

respondJson(rw, http.StatusOK, totalViews)
}

func (h *apiHandler) getStreamHealth(rw http.ResponseWriter, r *http.Request) {
Expand Down
Loading

0 comments on commit 90c45b1

Please sign in to comment.