Skip to content

Commit

Permalink
Merge pull request #179 from grafana/njpm/sync-upstream
Browse files Browse the repository at this point in the history
Merge upstream changes
  • Loading branch information
npazosmendez authored Jul 31, 2024
2 parents 3fe7a4c + 810ea92 commit 996f193
Show file tree
Hide file tree
Showing 1,127 changed files with 44,191 additions and 22,138 deletions.
15 changes: 13 additions & 2 deletions cmd/carbonapi/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"time"

"github.com/go-graphite/carbonapi/cache"
"github.com/go-graphite/carbonapi/cmd/carbonapi/interfaces"
"github.com/go-graphite/carbonapi/expr"
"github.com/go-graphite/carbonapi/expr/interfaces"
"github.com/go-graphite/carbonapi/limiter"
"github.com/go-graphite/carbonapi/pkg/tlsconfig"
zipperCfg "github.com/go-graphite/carbonapi/zipper/config"
zipper "github.com/go-graphite/carbonapi/zipper/interfaces"
zipperTypes "github.com/go-graphite/carbonapi/zipper/types"

"github.com/lomik/zapwriter"
Expand Down Expand Up @@ -85,6 +87,7 @@ type ConfigType struct {
SendGlobsAsIs *bool `mapstructure:"sendGlobsAsIs"`
AlwaysSendGlobsAsIs *bool `mapstructure:"alwaysSendGlobsAsIs"`
ExtractTagsFromArgs bool `mapstructure:"extractTagsFromArgs"`
PassFunctionsToBackend bool `mapstructure:"passFunctionsToBackend"`
MaxBatchSize int `mapstructure:"maxBatchSize"`
Zipper string `mapstructure:"zipper"`
Upstreams zipperCfg.Config `mapstructure:"upstreams"`
Expand Down Expand Up @@ -116,10 +119,12 @@ type ConfigType struct {
DefaultTimeZone *time.Location `mapstructure:"-" json:"-"`

// ZipperInstance is API entry to carbonzipper
ZipperInstance interfaces.CarbonZipper `mapstructure:"-" json:"-"`
ZipperInstance zipper.CarbonZipper `mapstructure:"-" json:"-"`

// Limiter limits concurrent zipper requests
Limiter limiter.SimpleLimiter `mapstructure:"-" json:"-"`

Evaluator interfaces.Evaluator `mapstructure:"-" json:"-"`
}

// skipcq: CRT-P0003
Expand All @@ -132,6 +137,12 @@ func (c ConfigType) String() string {
}
}

func (c *ConfigType) SetZipper(zipper zipper.CarbonZipper) (err error) {
c.ZipperInstance = zipper
c.Evaluator, err = expr.NewEvaluator(c.Limiter, c.ZipperInstance, c.PassFunctionsToBackend)
return
}

var Config = ConfigType{
ExtrapolateExperiment: false,
Buckets: 10,
Expand Down
27 changes: 9 additions & 18 deletions cmd/carbonapi/http/find_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/go-graphite/carbonapi/date"
"github.com/go-graphite/carbonapi/intervalset"
utilctx "github.com/go-graphite/carbonapi/util/ctx"
"github.com/go-graphite/carbonapi/zipper/helper"
)

// Find handler and it's helper functions
Expand Down Expand Up @@ -214,9 +215,7 @@ func findHandler(w http.ResponseWriter, r *http.Request) {
}()

if !ok || !format.ValidFindFormat() {
http.Error(w, "unsupported format: "+formatRaw, http.StatusBadRequest)
accessLogDetails.HTTPCode = http.StatusBadRequest
accessLogDetails.Reason = "unsupported format: " + formatRaw
setError(w, &accessLogDetails, "unsupported format: "+formatRaw, http.StatusBadRequest, uid.String())
logAsError = true
return
}
Expand Down Expand Up @@ -244,17 +243,15 @@ func findHandler(w http.ResponseWriter, r *http.Request) {
if format == protoV3Format {
body, err := io.ReadAll(r.Body)
if err != nil {
accessLogDetails.HTTPCode = http.StatusBadRequest
accessLogDetails.Reason = "failed to parse message body: " + err.Error()
http.Error(w, "bad request (failed to parse format): "+err.Error(), http.StatusBadRequest)
setError(w, &accessLogDetails, "failed to parse message body: "+err.Error(), http.StatusBadRequest, uid.String())
logAsError = true
return
}

err = pv3Request.Unmarshal(body)
if err != nil {
accessLogDetails.HTTPCode = http.StatusBadRequest
accessLogDetails.Reason = "failed to parse message body: " + err.Error()
http.Error(w, "bad request (failed to parse format): "+err.Error(), http.StatusBadRequest)
setError(w, &accessLogDetails, "failed to parse message body: "+err.Error(), http.StatusBadRequest, uid.String())
logAsError = true
return
}
} else {
Expand All @@ -264,9 +261,7 @@ func findHandler(w http.ResponseWriter, r *http.Request) {
}

if len(pv3Request.Metrics) == 0 {
http.Error(w, "missing parameter `query`", http.StatusBadRequest)
accessLogDetails.HTTPCode = http.StatusBadRequest
accessLogDetails.Reason = "missing parameter `query`"
setError(w, &accessLogDetails, "missing parameter `query`", http.StatusBadRequest, uid.String())
logAsError = true
return
}
Expand All @@ -289,9 +284,7 @@ func findHandler(w http.ResponseWriter, r *http.Request) {
if returnCode < 300 {
multiGlobs = &pbv3.MultiGlobResponse{Metrics: []pbv3.GlobResponse{}}
} else {
http.Error(w, http.StatusText(returnCode), returnCode)
accessLogDetails.HTTPCode = int32(returnCode)
accessLogDetails.Reason = err.Error()
setError(w, &accessLogDetails, helper.MerryRootError(err), returnCode, uid.String())
// We don't want to log this as an error if it's something normal
// Normal is everything that is >= 500. So if config.Config.NotFoundStatusCode is 500 - this will be
// logged as error
Expand Down Expand Up @@ -371,9 +364,7 @@ func findHandler(w http.ResponseWriter, r *http.Request) {
}

if err != nil {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
accessLogDetails.HTTPCode = http.StatusInternalServerError
accessLogDetails.Reason = err.Error()
setError(w, &accessLogDetails, err.Error(), http.StatusInternalServerError, uid.String())
logAsError = true
return
}
Expand Down
71 changes: 70 additions & 1 deletion cmd/carbonapi/http/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http

import (
"fmt"
"html"
"net/http"
"strings"
"time"
Expand Down Expand Up @@ -229,6 +230,27 @@ func splitRemoteAddr(addr string) (string, string) {
return tmp[0], tmp[1]
}

func stripKey(key string, n int) string {
if len(key) > n+3 {
key = key[:n/2] + "..." + key[n/2+1:]
}
return key
}

// stripError for strip network errors (ip and other private info)
func stripError(err string) string {
if strings.Contains(err, "connection refused") {
return "connection refused"
} else if strings.Contains(err, " lookup ") {
return "lookup error"
} else if strings.Contains(err, "broken pipe") {
return "broken pipe"
} else if strings.Contains(err, " connection reset ") {
return "connection reset"
}
return html.EscapeString(err)
}

func buildParseErrorString(target, e string, err error) string {
msg := fmt.Sprintf("%s\n\n%-20s: %s\n", http.StatusText(http.StatusBadRequest), "Target", target)
if err != nil {
Expand Down Expand Up @@ -285,9 +307,56 @@ func timestampTruncate(ts int64, duration time.Duration, durations []config.Dura

func setError(w http.ResponseWriter, accessLogDetails *carbonapipb.AccessLogDetails, msg string, status int, carbonapiUUID string) {
w.Header().Set(ctxHeaderUUID, carbonapiUUID)
http.Error(w, http.StatusText(status)+": "+msg, status)
if msg == "" {
msg = http.StatusText(status)
}
accessLogDetails.Reason = msg
accessLogDetails.HTTPCode = int32(status)
msg = html.EscapeString(stripError(msg))
http.Error(w, msg, status)
}

func joinErrors(errMap map[string]string, sep string, status int) (msg, reason string) {
if len(errMap) == 0 {
msg = http.StatusText(status)
} else {
var buf, rBuf strings.Builder
buf.Grow(512)
rBuf.Grow(512)

// map is unsorted, can produce flapping ordered output, not critical
for k, err := range errMap {
if buf.Len() > 0 {
buf.WriteString(sep)
rBuf.WriteString(sep)
}
buf.WriteString(html.EscapeString(stripKey(k, 128)))
rBuf.WriteString(k)
buf.WriteString(": ")
rBuf.WriteString(": ")
buf.WriteString(html.EscapeString(stripError(err)))
rBuf.WriteString(err)
}

msg = buf.String()
reason = rBuf.String()
}
return
}

func setErrors(w http.ResponseWriter, accessLogDetails *carbonapipb.AccessLogDetails, errMamp map[string]string, status int, carbonapiUUID string) {
w.Header().Set(ctxHeaderUUID, carbonapiUUID)
var msg string
if status != http.StatusOK {
if len(errMamp) == 0 {
msg = http.StatusText(status)
accessLogDetails.Reason = msg
} else {
msg, accessLogDetails.Reason = joinErrors(errMamp, "\n", status)
}
}
accessLogDetails.HTTPCode = int32(status)
http.Error(w, msg, status)
}

func queryLengthLimitExceeded(query []string, maxLength uint64) bool {
Expand Down
2 changes: 1 addition & 1 deletion cmd/carbonapi/http/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func init() {
config.Config.Upstreams.Backends = []string{"dummy"}
config.SetUpConfigUpstreams(logger)
config.SetUpConfig(logger, "(test)")
config.Config.ZipperInstance = newMockCarbonZipper()
config.Config.SetZipper(newMockCarbonZipper())
emptyStringList := make([]string, 0)
InitHandlers(emptyStringList, emptyStringList)
}
Expand Down
30 changes: 16 additions & 14 deletions cmd/carbonapi/http/render_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,19 +201,15 @@ func renderHandler(w http.ResponseWriter, r *http.Request) {
if format == protoV3Format {
body, err := io.ReadAll(r.Body)
if err != nil {
accessLogDetails.HTTPCode = http.StatusBadRequest
accessLogDetails.Reason = "failed to parse message body: " + err.Error()
http.Error(w, "bad request (failed to parse format): "+err.Error(), http.StatusBadRequest)
setError(w, accessLogDetails, "failed to parse message body: "+err.Error(), http.StatusBadRequest, uid.String())
return
}

var pv3Request pb.MultiFetchRequest
err = pv3Request.Unmarshal(body)

if err != nil {
accessLogDetails.HTTPCode = http.StatusBadRequest
accessLogDetails.Reason = "failed to parse message body: " + err.Error()
http.Error(w, "bad request (failed to parse format): "+err.Error(), http.StatusBadRequest)
setError(w, accessLogDetails, "failed to parse message body: "+err.Error(), http.StatusBadRequest, uid.String())
return
}

Expand Down Expand Up @@ -306,7 +302,7 @@ func renderHandler(w http.ResponseWriter, r *http.Request) {

ApiMetrics.RenderRequests.Add(1)

result, errs := expr.FetchAndEvalExprs(ctx, exprs, from32, until32, values)
result, errs := expr.FetchAndEvalExprs(ctx, config.Config.Evaluator, exprs, from32, until32, values)
if errs != nil {
errors = errs
}
Expand All @@ -324,9 +320,15 @@ func renderHandler(w http.ResponseWriter, r *http.Request) {

ApiMetrics.RenderRequests.Add(1)

result, err := expr.FetchAndEvalExp(ctx, exp, from32, until32, values)
result, err := expr.FetchAndEvalExp(ctx, config.Config.Evaluator, exp, from32, until32, values)
if err != nil {
errors[target] = merry.Wrap(err)
if config.Config.Upstreams.RequireSuccessAll {
code := merry.HTTPCode(err)
if code != http.StatusOK && code != http.StatusNotFound {
break
}
}
}

results = append(results, result...)
Expand All @@ -347,20 +349,20 @@ func renderHandler(w http.ResponseWriter, r *http.Request) {
var body []byte

returnCode := http.StatusOK
if len(results) == 0 {
if len(results) == 0 || (len(errors) > 0 && config.Config.Upstreams.RequireSuccessAll) {
// Obtain error code from the errors
// In case we have only "Not Found" errors, result should be 404
// Otherwise it should be 500
var errMsgs []string
var errMsgs map[string]string
returnCode, errMsgs = helper.MergeHttpErrorMap(errors)
logger.Debug("error response or no response", zap.Strings("error", errMsgs))
logger.Debug("error response or no response", zap.Any("error", errMsgs))
// Allow override status code for 404-not-found replies.
if returnCode == 404 {
if returnCode == http.StatusNotFound {
returnCode = config.Config.NotFoundStatusCode
}

if returnCode == 400 || returnCode == http.StatusForbidden || returnCode >= 500 {
setError(w, accessLogDetails, strings.Join(errMsgs, ","), returnCode, uid.String())
if returnCode == http.StatusBadRequest || returnCode == http.StatusNotFound || returnCode == http.StatusForbidden || returnCode >= 500 {
setErrors(w, accessLogDetails, errMsgs, returnCode, uid.String())
logAsError = true
return
}
Expand Down
24 changes: 11 additions & 13 deletions cmd/carbonapi/http/tags_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/go-graphite/carbonapi/carbonapipb"
"github.com/go-graphite/carbonapi/cmd/carbonapi/config"
utilctx "github.com/go-graphite/carbonapi/util/ctx"
"github.com/go-graphite/carbonapi/zipper/helper"
"github.com/go-graphite/carbonapi/zipper/types"
"github.com/lomik/zapwriter"
"go.uber.org/zap"
Expand All @@ -21,9 +22,10 @@ import (
func tagHandler(w http.ResponseWriter, r *http.Request) {
t0 := time.Now()
uuid := uuid.NewV4()
carbonapiUUID := uuid.String()

// TODO: Migrate to context.WithTimeout
ctx := utilctx.SetUUID(r.Context(), uuid.String())
ctx := utilctx.SetUUID(r.Context(), carbonapiUUID)
requestHeaders := utilctx.GetLogHeaders(ctx)
username, _, _ := r.BasicAuth()

Expand All @@ -39,7 +41,7 @@ func tagHandler(w http.ResponseWriter, r *http.Request) {
var accessLogDetails = &carbonapipb.AccessLogDetails{
Handler: "tags",
Username: username,
CarbonapiUUID: uuid.String(),
CarbonapiUUID: carbonapiUUID,
URL: r.URL.Path,
PeerIP: srcIP,
PeerPort: srcPort,
Expand All @@ -57,8 +59,7 @@ func tagHandler(w http.ResponseWriter, r *http.Request) {
err := r.ParseForm()
if err != nil {
logAsError = true
w.Header().Set("Content-Type", contentTypeJSON)
_, _ = w.Write([]byte{'[', ']'})
setError(w, accessLogDetails, err.Error(), http.StatusBadRequest, carbonapiUUID)
return
}

Expand All @@ -81,7 +82,7 @@ func tagHandler(w http.ResponseWriter, r *http.Request) {
rawQuery := q.Encode()

if queryLengthLimitExceeded(r.Form["query"], config.Config.MaxQueryLength) {
setError(w, accessLogDetails, "query length limit exceeded", http.StatusBadRequest, uuid.String())
setError(w, accessLogDetails, "query length limit exceeded", http.StatusBadRequest, carbonapiUUID)
logAsError = true
return
}
Expand All @@ -99,10 +100,9 @@ func tagHandler(w http.ResponseWriter, r *http.Request) {
}

// TODO(civil): Implement stats
if err != nil && !merry.Is(err, types.ErrNoMetricsFetched) && !merry.Is(err, types.ErrNonFatalErrors) {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
accessLogDetails.HTTPCode = http.StatusInternalServerError
accessLogDetails.Reason = err.Error()
if err != nil && !merry.Is(err, types.ErrNoMetricsFetched) && (!merry.Is(err, types.ErrNonFatalErrors) || config.Config.Upstreams.RequireSuccessAll) {
code := merry.HTTPCode(err)
setError(w, accessLogDetails, helper.MerryRootError(err), code, carbonapiUUID)
logAsError = true
return
}
Expand All @@ -115,15 +115,13 @@ func tagHandler(w http.ResponseWriter, r *http.Request) {
}

if err != nil {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
accessLogDetails.HTTPCode = http.StatusInternalServerError
accessLogDetails.Reason = err.Error()
setError(w, accessLogDetails, err.Error(), http.StatusInternalServerError, carbonapiUUID)
logAsError = true
return
}

w.Header().Set("Content-Type", contentTypeJSON)
w.Header().Set(ctxHeaderUUID, uuid.String())
w.Header().Set(ctxHeaderUUID, carbonapiUUID)
_, _ = w.Write(b)
accessLogDetails.Runtime = time.Since(t0).Seconds()
accessLogDetails.HTTPCode = http.StatusOK
Expand Down
6 changes: 5 additions & 1 deletion cmd/carbonapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ func main() {
dns.UseDNSCache(config.Config.CachingDNSRefreshTime)
}

config.Config.ZipperInstance = newZipper(carbonapiHttp.ZipperStats, &config.Config.Upstreams, config.Config.IgnoreClientTimeout, zapwriter.Logger("zipper"))
if err := config.Config.SetZipper(newZipper(carbonapiHttp.ZipperStats, &config.Config.Upstreams, config.Config.IgnoreClientTimeout, zapwriter.Logger("zipper"))); err != nil {
logger.Fatal("failed to setup zipper",
zap.Error(err),
)
}

wg := sync.WaitGroup{}
serve := func(listen config.Listener, handler http.Handler) {
Expand Down
Loading

0 comments on commit 996f193

Please sign in to comment.