Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QFE: new middleware to force query statistics collection #7854

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7560](https://github.com/thanos-io/thanos/pull/7560) Query: Added the possibility of filtering rules by rule_name, rule_group or file to HTTP api.
- [#7652](https://github.com/thanos-io/thanos/pull/7652) Store: Implement metadata API limit in stores.
- [#7659](https://github.com/thanos-io/thanos/pull/7659) Receive: Add support for replication using [Cap'n Proto](https://capnproto.org/). This protocol has a lower CPU and memory footprint, which leads to a reduction in resource usage in Receivers. Before enabling it, make sure that all receivers are updated to a version which supports this replication method.
- [#7854](https://github.com/thanos-io/thanos/pull/7854) Query Frontend: Add `--query-frontend.force-query-stats` flag to force collection of query statistics from upstream queriers.

### Changed

Expand Down
2 changes: 2 additions & 0 deletions cmd/thanos/query_frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ func registerQueryFrontend(app *extkingpin.App) {
cmd.Flag("query-frontend.log-queries-longer-than", "Log queries that are slower than the specified duration. "+
"Set to 0 to disable. Set to < 0 to enable on all queries.").Default("0").DurationVar(&cfg.CortexHandlerConfig.LogQueriesLongerThan)

cmd.Flag("query-frontend.force-query-stats", "Will always pass \"stats\" param to upstream queriers and collect query statistics reporting them as logs.").Default("false").BoolVar(&cfg.ForceQueryStats)

cmd.Flag("query-frontend.org-id-header", "Deprecation Warning - This flag will be soon deprecated in favor of query-frontend.tenant-header"+
" and both flags cannot be used at the same time. "+
"Request header names used to identify the source of slow queries (repeated flag). "+
Expand Down
4 changes: 4 additions & 0 deletions docs/components/query-frontend.md
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ Flags:
functions in query-frontend.
--no-query-frontend.enable-x-functions for
disabling.
--query-frontend.force-query-stats
Will always pass "stats" param to upstream
queriers and collect query statistics reporting
them as logs.
--query-frontend.forward-header=<http-header-name> ...
List of headers forwarded by the query-frontend
to downstream queriers, default is empty
Expand Down
55 changes: 48 additions & 7 deletions internal/cortex/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,9 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

// Initialise the stats in the context and make sure it's propagated
// down the request chain.
if f.cfg.QueryStatsEnabled {
var ctx context.Context
stats, ctx = querier_stats.ContextWithEmptyStats(r.Context())
r = r.WithContext(ctx)
}
var ctx context.Context
stats, ctx = querier_stats.ContextWithEmptyStats(r.Context())
r = r.WithContext(ctx)

defer func() {
_ = r.Body.Close()
Expand Down Expand Up @@ -156,7 +154,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

if shouldReportSlowQuery {
f.reportSlowQuery(r, hs, queryString, queryResponseTime)
f.reportSlowQuery(r, hs, queryString, queryResponseTime, stats)
}
if f.cfg.QueryStatsEnabled {
f.reportQueryStats(r, queryString, queryResponseTime, stats)
Expand All @@ -171,7 +169,13 @@ func isQueryEndpoint(path string) bool {
}

// reportSlowQuery reports slow queries.
func (f *Handler) reportSlowQuery(r *http.Request, responseHeaders http.Header, queryString url.Values, queryResponseTime time.Duration) {
func (f *Handler) reportSlowQuery(
r *http.Request,
responseHeaders http.Header,
queryString url.Values,
queryResponseTime time.Duration,
stats *querier_stats.Stats,
) {
// NOTE(GiedriusS): see https://github.com/grafana/grafana/pull/60301 for more info.
grafanaDashboardUID := "-"
if dashboardUID := r.Header.Get("X-Dashboard-Uid"); dashboardUID != "" {
Expand Down Expand Up @@ -207,6 +211,9 @@ func (f *Handler) reportSlowQuery(r *http.Request, responseHeaders http.Header,
"trace_id", thanosTraceID,
}, formatQueryString(queryString)...)

logMessage = addQueryRangeToLogMessage(queryString, logMessage)
logMessage = f.addStatsToLogMessage(logMessage, stats)

level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}

Expand Down Expand Up @@ -265,6 +272,40 @@ func formatQueryString(queryString url.Values) (fields []interface{}) {
return fields
}

// addStatsToLogMessage appends per-query sample statistics (peak and total
// loaded samples) to the structured log message. A nil stats pointer leaves
// the message untouched.
func (f *Handler) addStatsToLogMessage(message []interface{}, stats *querier_stats.Stats) []interface{} {
	if stats == nil {
		return message
	}
	return append(message,
		"peak_samples", stats.LoadPeakSamples(),
		"total_samples_loaded", stats.LoadTotalSamples(),
	)
}

// addQueryRangeToLogMessage appends the query's time range (in whole hours and
// as a human-readable duration) to the log message. Nothing is appended when
// the range cannot be determined (extractQueryRange returned zero).
func addQueryRangeToLogMessage(queryString url.Values, logMessage []interface{}) []interface{} {
	// Note: a zero check (not "> 0") keeps behavior identical for a
	// potentially negative range (end before start).
	if queryRange := extractQueryRange(queryString); queryRange != 0 {
		logMessage = append(logMessage,
			"query_range_hours", int(queryRange.Hours()),
			"query_range_human", queryRange.String(),
		)
	}
	return logMessage
}

// extractQueryRange extracts query range from query string.
// If start and end are not provided or are invalid, it returns a duration with zero-value.
func extractQueryRange(queryString url.Values) time.Duration {
	startStr := queryString.Get("start")
	endStr := queryString.Get("end")
	if startStr == "" || endStr == "" {
		return 0
	}
	start, serr := util.ParseTime(startStr)
	end, eerr := util.ParseTime(endStr)
	if serr != nil || eerr != nil {
		return 0
	}
	// ParseTime yields millisecond timestamps, so the difference is in ms.
	return time.Duration(end-start) * time.Millisecond
}

func writeError(w http.ResponseWriter, err error) {
switch err {
case context.Canceled:
Expand Down
17 changes: 14 additions & 3 deletions internal/cortex/querier/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,16 +454,27 @@ func (prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*http.
sp.LogFields(otlog.Int("bytes", len(b)))

resp := http.Response{
Header: http.Header{
"Content-Type": []string{"application/json"},
},
Header: mergeHeaders(a.Headers),
Body: io.NopCloser(bytes.NewBuffer(b)),
StatusCode: http.StatusOK,
ContentLength: int64(len(b)),
}
return &resp, nil
}

// mergeHeaders builds the response headers for the encoded HTTP response,
// preserving headers from the original Prometheus response (coming from the
// Tripperware) while always forcing Content-Type to application/json.
func mergeHeaders(headers []*PrometheusResponseHeader) http.Header {
	h := make(http.Header, len(headers)+1)
	for _, header := range headers {
		// Content-Type is overwritten below; skip any upstream value.
		if strings.EqualFold("Content-Type", header.Name) {
			continue
		}
		// Canonicalize the key: http.Header.Get/Set expect canonical MIME
		// keys, and a raw map assignment with a non-canonical name would be
		// invisible to Header.Get on the merged result.
		h[http.CanonicalHeaderKey(header.Name)] = header.Values
	}
	h["Content-Type"] = []string{"application/json"}
	return h
}

// UnmarshalJSON implements json.Unmarshaler and is used for unmarshalling
// a Prometheus range query response (matrix).
func (s *SampleStream) UnmarshalJSON(data []byte) error {
Expand Down
46 changes: 46 additions & 0 deletions internal/cortex/querier/queryrange/stats_middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) The Cortex Authors.
// Licensed under the Apache License 2.0.

package queryrange

import (
"context"

"github.com/thanos-io/thanos/internal/cortex/querier/stats"
)

// statsMiddleware is a query-range middleware that can force upstream
// queriers to return query statistics and folds any returned statistics into
// the stats object carried by the request context.
type statsMiddleware struct {
	// next is the wrapped downstream handler.
	next Handler
	// forceStats, when true, sets the "stats" request parameter to "all".
	forceStats bool
}

// NewStatsMiddleware returns a Middleware that, when forceStats is true,
// forces the "stats=all" parameter onto every request so that upstream
// queriers return query statistics; in either case, statistics present in the
// response are recorded into the stats object in the request context.
func NewStatsMiddleware(forceStats bool) Middleware {
	return MiddlewareFunc(func(next Handler) Handler {
		return statsMiddleware{
			next: next,
			forceStats: forceStats,
		}
	})
}

// Do implements Handler. If forceStats is set it rewrites the request to ask
// upstream queriers for statistics ("stats=all"), then merges any statistics
// found in the response into the stats object stored in ctx (when present):
// the peak-samples value keeps the maximum seen, total samples accumulate.
func (s statsMiddleware) Do(ctx context.Context, r Request) (Response, error) {
	if s.forceStats {
		r = r.WithStats("all")
	}
	resp, err := s.next.Do(ctx, r)
	if err != nil {
		return resp, err
	}

	// Hoist the getter calls; also guard Samples, which may be nil when the
	// upstream response carries a stats envelope without sample counts.
	if respStats := resp.GetStats(); respStats != nil && respStats.Samples != nil {
		if sts := stats.FromContext(ctx); sts != nil {
			// NOTE(review): this load-compare-store is not atomic as a unit;
			// concurrent sub-request updates could race — confirm callers
			// serialize updates to the shared stats object.
			if sts.LoadPeakSamples() < respStats.Samples.PeakSamples {
				sts.SetPeakSamples(respStats.Samples.PeakSamples)
			}
			sts.AddTotalSamples(respStats.Samples.TotalQueryableSamples)
		}
	}

	return resp, nil
}
101 changes: 101 additions & 0 deletions internal/cortex/querier/queryrange/stats_middleware_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (c) The Cortex Authors.
// Licensed under the Apache License 2.0.

package queryrange

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/thanos-io/thanos/internal/cortex/querier/stats"
)

// Test_statsMiddleware_AddsHeaderWithStats verifies that statsMiddleware
// copies samples statistics from the downstream response into the stats
// object stored in the request context, and that forceStats makes it pass
// "stats=all" to the downstream handler.
func Test_statsMiddleware_AddsHeaderWithStats(t *testing.T) {
	t.Parallel()
	tests := []struct {
		name         string
		forceStats   bool
		peakSamples  int32
		totalSamples int64
	}{
		{
			name:         "With forceStats true",
			forceStats:   true,
			peakSamples:  100,
			totalSamples: 1000,
		},
		{
			name:         "With forceStats false",
			forceStats:   false,
			peakSamples:  200,
			totalSamples: 2000,
		},
	}

	for _, tt := range tests {
		tt := tt // Capture the range variable for the parallel subtest (pre-Go 1.22 semantics).
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()
			fakeHandler := &fakeHandler{
				response: &PrometheusResponse{
					Status: "success",
					Data: PrometheusData{
						ResultType: "vector",
						Result:     []SampleStream{},
						Stats: &PrometheusResponseStats{
							Samples: &PrometheusResponseSamplesStats{
								TotalQueryableSamples: tt.totalSamples,
								PeakSamples:           tt.peakSamples,
							},
						},
					},
				},
			}

			middleware := NewStatsMiddleware(tt.forceStats)
			wrappedHandler := middleware.Wrap(fakeHandler)

			origCtx := context.Background()
			qryStats, ctx := stats.ContextWithEmptyStats(origCtx)

			resp, err := wrappedHandler.Do(ctx, &PrometheusRequest{
				Path:  "/api/v1/query_range",
				Start: 1536673680 * 1e3,
				End:   1536716898 * 1e3,
				Step:  120 * 1e3,
				Query: "sum(container_memory_rss) by (namespace)",
				Headers: []*PrometheusRequestHeader{
					{
						Name:   "Accept",
						Values: []string{"application/json"},
					},
				},
			})
			require.NoError(t, err)

			if tt.forceStats {
				// testify's Equal signature is (t, expected, actual); keep the
				// expectation first so failure messages read correctly.
				require.Equal(t, "all", fakeHandler.request.GetStats())
			}

			promResp, ok := resp.(*PrometheusResponse)
			require.True(t, ok)

			assert.Equal(t, tt.peakSamples, qryStats.LoadPeakSamples())
			assert.Equal(t, tt.totalSamples, qryStats.LoadTotalSamples())
			assert.Equal(t, tt.peakSamples, promResp.Data.Stats.Samples.PeakSamples)
			assert.Equal(t, tt.totalSamples, promResp.Data.Stats.Samples.TotalQueryableSamples)
		})
	}
}

// fakeHandler is a test double for Handler: it records the last request it
// received and returns a canned response.
type fakeHandler struct {
	// request is the most recent request passed to Do.
	request Request
	// response is the canned response returned by Do.
	response Response
}

// Do records the request for later inspection and returns the canned
// response with a nil error.
func (f *fakeHandler) Do(ctx context.Context, r Request) (Response, error) {
	f.request = r
	return f.response, nil
}
32 changes: 32 additions & 0 deletions internal/cortex/querier/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,38 @@ func (s *Stats) LoadFetchedChunkBytes() uint64 {
return atomic.LoadUint64(&s.FetchedChunkBytes)
}

// SetPeakSamples atomically overwrites the recorded peak number of loaded
// samples. Calling it on a nil receiver is a safe no-op.
func (s *Stats) SetPeakSamples(peakSamples int32) {
	if s != nil {
		atomic.StoreInt32(&s.PeakLoadedSamples, peakSamples)
	}
}

// LoadPeakSamples atomically reads the recorded peak number of loaded
// samples; it returns 0 on a nil receiver.
func (s *Stats) LoadPeakSamples() int32 {
	var peak int32
	if s != nil {
		peak = atomic.LoadInt32(&s.PeakLoadedSamples)
	}
	return peak
}

// AddTotalSamples atomically adds totalSamples to the running total of
// loaded samples. Calling it on a nil receiver is a safe no-op.
func (s *Stats) AddTotalSamples(totalSamples int64) {
	if s != nil {
		atomic.AddInt64(&s.TotalLoadedSamples, totalSamples)
	}
}

// LoadTotalSamples atomically reads the running total of loaded samples;
// it returns 0 on a nil receiver.
func (s *Stats) LoadTotalSamples() int64 {
	var total int64
	if s != nil {
		total = atomic.LoadInt64(&s.TotalLoadedSamples)
	}
	return total
}

// Merge the provide Stats into this one.
func (s *Stats) Merge(other *Stats) {
if s == nil || other == nil {
Expand Down
Loading
Loading