Skip to content

Commit

Permalink
Merge pull request #748 from deniszh/DZ-expand
Browse files Browse the repository at this point in the history
Implementing /metrics/expand API
  • Loading branch information
Civil authored Dec 29, 2022
2 parents 117040a + c408e89 commit 743814d
Show file tree
Hide file tree
Showing 4 changed files with 286 additions and 0 deletions.
171 changes: 171 additions & 0 deletions cmd/carbonapi/http/expand_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package http

import (
"encoding/json"
"html"
"net/http"
"sort"
"time"

"github.com/ansel1/merry"
pbv3 "github.com/go-graphite/protocol/carbonapi_v3_pb"
"github.com/lomik/zapwriter"
uuid "github.com/satori/go.uuid"

"github.com/go-graphite/carbonapi/carbonapipb"
"github.com/go-graphite/carbonapi/cmd/carbonapi/config"
"github.com/go-graphite/carbonapi/date"
utilctx "github.com/go-graphite/carbonapi/util/ctx"
)

func expandHandler(w http.ResponseWriter, r *http.Request) {
t0 := time.Now()
uid := uuid.NewV4()
// TODO: Migrate to context.WithTimeout
// ctx, _ := context.WithTimeout(context.TODO(), config.Config.ZipperTimeout)
ctx := utilctx.SetUUID(r.Context(), uid.String())
username, _, _ := r.BasicAuth()
requestHeaders := utilctx.GetLogHeaders(ctx)

format, ok, formatRaw := getFormat(r, treejsonFormat)
jsonp := r.FormValue("jsonp")
groupByExpr := r.FormValue("groupByExpr")
leavesOnly := r.FormValue("leavesOnly")

qtz := r.FormValue("tz")
from := r.FormValue("from")
until := r.FormValue("until")
from64 := date.DateParamToEpoch(from, qtz, timeNow().Add(-time.Hour).Unix(), config.Config.DefaultTimeZone)
until64 := date.DateParamToEpoch(until, qtz, timeNow().Unix(), config.Config.DefaultTimeZone)

srcIP, srcPort := splitRemoteAddr(r.RemoteAddr)

accessLogger := zapwriter.Logger("access")
var accessLogDetails = carbonapipb.AccessLogDetails{
Handler: "expand",
Username: username,
CarbonapiUUID: uid.String(),
URL: r.URL.RequestURI(),
PeerIP: srcIP,
PeerPort: srcPort,
Host: r.Host,
Referer: r.Referer(),
URI: r.RequestURI,
Format: formatRaw,
RequestHeaders: requestHeaders,
}

logAsError := false
defer func() {
deferredAccessLogging(accessLogger, &accessLogDetails, t0, logAsError)
}()

err := r.ParseForm()
if err != nil {
setError(w, &accessLogDetails, err.Error(), http.StatusBadRequest, uid.String())
logAsError = true
return
}
query := r.Form["query"]

if !ok || !format.ValidExpandFormat() {
http.Error(w, "unsupported format: "+html.EscapeString(formatRaw), http.StatusBadRequest)
accessLogDetails.HTTPCode = http.StatusBadRequest
accessLogDetails.Reason = "unsupported format: " + formatRaw
logAsError = true
return
}

var pv3Request pbv3.MultiGlobRequest
pv3Request.Metrics = query
pv3Request.StartTime = from64
pv3Request.StopTime = until64

multiGlobs, stats, err := config.Config.ZipperInstance.Find(ctx, pv3Request)
if stats != nil {
accessLogDetails.ZipperRequests = stats.ZipperRequests
accessLogDetails.TotalMetricsCount += stats.TotalMetricsCount
}
if err != nil {
returnCode := merry.HTTPCode(err)
if returnCode != http.StatusOK || multiGlobs == nil {
// Allow override status code for 404-not-found replies.
if returnCode == http.StatusNotFound {
returnCode = config.Config.NotFoundStatusCode
}

if returnCode < 300 {
multiGlobs = &pbv3.MultiGlobResponse{Metrics: []pbv3.GlobResponse{}}
} else {
http.Error(w, http.StatusText(returnCode), returnCode)
accessLogDetails.HTTPCode = int32(returnCode)
accessLogDetails.Reason = err.Error()
// We don't want to log this as an error if it's something normal
// Normal is everything that is >= 500. So if config.Config.NotFoundStatusCode is 500 - this will be
// logged as error

if returnCode >= 500 {
logAsError = true
}
return
}
}
}

var b []byte
var err2 error
b, err2 = expandEncoder(multiGlobs, leavesOnly, groupByExpr)
err = merry.Wrap(err2)
if err != nil {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
accessLogDetails.HTTPCode = http.StatusInternalServerError
accessLogDetails.Reason = err.Error()
logAsError = true
return
}

writeResponse(w, http.StatusOK, b, jsonFormat, jsonp, uid.String())
}

func expandEncoder(multiGlobs *pbv3.MultiGlobResponse, leavesOnly string, groupByExpr string) ([]byte, error) {
var b []byte
var err error
groups := make(map[string][]string)
seen := make(map[string]bool)
for _, globs := range multiGlobs.Metrics {
paths := make([]string, 0, len(globs.Matches))
for _, g := range globs.Matches {
if leavesOnly == "1" && !g.IsLeaf {
continue
}
if _, ok := seen[g.Path]; ok {
continue
}
seen[g.Path] = true
paths = append(paths, g.Path)
}
sort.Strings(paths)
groups[globs.Name] = paths
}
if groupByExpr != "1" {
// results are just []string otherwise
// so, flatting map
flatData := make([]string, 0)
for _, group := range groups {
flatData = append(flatData, group...)
}
// sorting flat list one more to mimic graphite-web
sort.Strings(flatData)
data := map[string][]string{
"results": flatData,
}
b, err = json.Marshal(data)
} else {
// results are map[string][]string
data := map[string]map[string][]string{
"results": groups,
}
b, err = json.Marshal(data)
}
return b, err
}
103 changes: 103 additions & 0 deletions cmd/carbonapi/http/expand_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package http

import (
"testing"

pbv3 "github.com/go-graphite/protocol/carbonapi_v3_pb"
)

func TestExpandEncoder(t *testing.T) {
var tests = []struct {
name string
metricIn pbv3.MultiGlobResponse
metricOut string
leavesOnly string
groupByExpr string
}{
{
name: "test1",
metricIn: pbv3.MultiGlobResponse{
Metrics: []pbv3.GlobResponse{
{
Name: "foo.ba*",
Matches: []pbv3.GlobMatch{
{Path: "foo.bar", IsLeaf: false},
{Path: "foo.bat", IsLeaf: true},
},
},
},
},
metricOut: "{\"results\":[\"foo.bar\",\"foo.bat\"]}",
leavesOnly: "0",
groupByExpr: "0",
},
{
name: "test2",
metricIn: pbv3.MultiGlobResponse{
Metrics: []pbv3.GlobResponse{
{
Name: "foo.ba*",
Matches: []pbv3.GlobMatch{
{Path: "foo.bar", IsLeaf: false},
{Path: "foo.bat", IsLeaf: true},
},
},
},
},
metricOut: "{\"results\":[\"foo.bat\"]}",
leavesOnly: "1",
groupByExpr: "0",
},
{
name: "test3",
metricIn: pbv3.MultiGlobResponse{
Metrics: []pbv3.GlobResponse{
{
Name: "foo.ba*",
Matches: []pbv3.GlobMatch{
{Path: "foo.bar", IsLeaf: false},
{Path: "foo.bat", IsLeaf: true},
},
},
},
},
metricOut: "{\"results\":{\"foo.ba*\":[\"foo.bar\",\"foo.bat\"]}}",
leavesOnly: "0",
groupByExpr: "1",
},
{
name: "test4",
metricIn: pbv3.MultiGlobResponse{
Metrics: []pbv3.GlobResponse{
{
Name: "foo.ba*",
Matches: []pbv3.GlobMatch{
{Path: "foo.bar", IsLeaf: false},
{Path: "foo.bat", IsLeaf: true},
},
},
{
Name: "foo.ba*.*",
Matches: []pbv3.GlobMatch{
{Path: "foo.bar", IsLeaf: false},
{Path: "foo.bat", IsLeaf: true},
{Path: "foo.bar.baz", IsLeaf: true},
},
},
},
},
metricOut: "{\"results\":{\"foo.ba*\":[\"foo.bar\",\"foo.bat\"],\"foo.ba*.*\":[\"foo.bar.baz\"]}}",
leavesOnly: "0",
groupByExpr: "1",
},
}
for _, tst := range tests {
tst := tst
t.Run(tst.name, func(t *testing.T) {
response, _ := expandEncoder(&tst.metricIn, tst.leavesOnly, tst.groupByExpr)
if tst.metricOut != string(response) {
t.Errorf("%v should be same as %v", tst.metricOut, string(response))
}
})
}
}
9 changes: 9 additions & 0 deletions cmd/carbonapi/http/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ func (r responseFormat) String() string {
}
}

func (r responseFormat) ValidExpandFormat() bool {
switch r {
case jsonFormat:
return true
default:
return false
}
}

func (r responseFormat) ValidFindFormat() bool {
switch r {
case jsonFormat:
Expand Down
3 changes: 3 additions & 0 deletions cmd/carbonapi/http/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ func InitHandlers(headersToPass, headersToLog []string) *http.ServeMux {
r.HandleFunc(config.Config.Prefix+"/metrics/find/", httputil.TrackConnections(httputil.TimeHandler(enrichContextWithHeaders(headersToPass, headersToLog, ctx.ParseCtx(findHandler, ctx.HeaderUUIDAPI)), bucketRequestTimes)))
r.HandleFunc(config.Config.Prefix+"/metrics/find", httputil.TrackConnections(httputil.TimeHandler(enrichContextWithHeaders(headersToPass, headersToLog, ctx.ParseCtx(findHandler, ctx.HeaderUUIDAPI)), bucketRequestTimes)))

r.HandleFunc(config.Config.Prefix+"/metrics/expand/", httputil.TrackConnections(httputil.TimeHandler(enrichContextWithHeaders(headersToPass, headersToLog, ctx.ParseCtx(expandHandler, ctx.HeaderUUIDAPI)), bucketRequestTimes)))
r.HandleFunc(config.Config.Prefix+"/metrics/expand", httputil.TrackConnections(httputil.TimeHandler(enrichContextWithHeaders(headersToPass, headersToLog, ctx.ParseCtx(expandHandler, ctx.HeaderUUIDAPI)), bucketRequestTimes)))

r.HandleFunc(config.Config.Prefix+"/info/", httputil.TrackConnections(httputil.TimeHandler(enrichContextWithHeaders(headersToPass, headersToLog, ctx.ParseCtx(infoHandler, ctx.HeaderUUIDAPI)), bucketRequestTimes)))
r.HandleFunc(config.Config.Prefix+"/info", httputil.TrackConnections(httputil.TimeHandler(enrichContextWithHeaders(headersToPass, headersToLog, ctx.ParseCtx(infoHandler, ctx.HeaderUUIDAPI)), bucketRequestTimes)))

Expand Down

0 comments on commit 743814d

Please sign in to comment.