Skip to content

Commit

Permalink
Merge pull request #431 from go-faster/perf/optimize-prom-series
Browse files Browse the repository at this point in the history
feat(promhandler): use optimized API for `Series` queries
  • Loading branch information
tdakkota authored Jun 18, 2024
2 parents b50f638 + cba958c commit 6570691
Show file tree
Hide file tree
Showing 14 changed files with 322 additions and 35 deletions.
6 changes: 6 additions & 0 deletions dev/local/ch-bench-read/testdata/bench-series.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
start: "2024-01-06T13:10:00Z"
end: "2024-01-06T14:02:45Z"
step: 15
series:
- title: All series
matchers: ['{job=~".+"}']
2 changes: 1 addition & 1 deletion internal/chstorage/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (a *Attributes) Row(idx int) otelstorage.Attrs {

func attrsToLabels(m otelstorage.Attrs, to map[string]string) {
m.AsMap().Range(func(k string, v pcommon.Value) bool {
to[k] = v.Str()
to[otelstorage.KeyToLabel(k)] = v.Str()
return true
})
}
2 changes: 1 addition & 1 deletion internal/chstorage/chsql/_golden/Test1.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
SELECT DISTINCT (timestamp),(body),(attributes),(resource) FROM logs WHERE (toUnixTimestamp64Nano(timestamp) >= 17175110000000000 AND toUnixTimestamp64Nano(timestamp) <= 17176110000000000) AND (JSONExtract(attributes, 'label', 'String') = 'value' OR JSONExtract(resource, 'label', 'String') = 'value') AND (NOT (positionUTF8(body, 'line') > 0)) AND (hex(span_id) IN ('deaddead', 'aaaabbbb')) AND (hex(trace_id) = unhex('deaddead')) LIMIT 1000
SELECT DISTINCT timestamp,body,attributes,resource FROM logs WHERE (((toUnixTimestamp64Nano(timestamp) >= 17175110000000000) AND (toUnixTimestamp64Nano(timestamp) <= 17176110000000000))) AND (((JSONExtract(attributes, 'label', 'String') = 'value') OR (JSONExtract(resource, 'label', 'String') = 'value'))) AND (NOT ((positionUTF8(body, 'line') > 0))) AND ((hex(span_id) IN ('deaddead', 'aaaabbbb'))) AND ((hex(trace_id) = unhex('deaddead'))) LIMIT 1000
2 changes: 1 addition & 1 deletion internal/chstorage/chsql/_golden/Test2.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
SELECT (span_id) FROM (SELECT (span_id),(timestamp) FROM spans WHERE (true) AND (duration > 3.14) AND (duration < 3.14)) LIMIT 1
SELECT span_id FROM (SELECT span_id,timestamp FROM spans WHERE (true) AND ((duration > 3.14)) AND ((duration < 3.14))) LIMIT 1
2 changes: 1 addition & 1 deletion internal/chstorage/chsql/_golden/Test3.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
SELECT (timestamp) FROM spans ORDER BY timestamp DESC
SELECT timestamp FROM spans ORDER BY timestamp DESC
2 changes: 1 addition & 1 deletion internal/chstorage/chsql/_golden/Test4.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
SELECT (timestamp) FROM spans ORDER BY timestamp ASC,duration DESC
SELECT timestamp FROM spans ORDER BY timestamp ASC,duration DESC
2 changes: 1 addition & 1 deletion internal/chstorage/chsql/_golden/Test7.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
SELECT (column) FROM spans
SELECT column FROM spans
2 changes: 2 additions & 0 deletions internal/chstorage/chsql/chsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (p *Printer) WriteExpr(e Expr) error {
return errors.Errorf("binary expression must have at least two args, got %d", l)
}

p.OpenParen()
for i, arg := range e.args {
if i != 0 {
p.Ident(e.tok)
Expand All @@ -135,6 +136,7 @@ func (p *Printer) WriteExpr(e Expr) error {
return err
}
}
p.CloseParen()

return nil
case exprFunction:
Expand Down
2 changes: 0 additions & 2 deletions internal/chstorage/chsql/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,9 @@ func (q *SelectQuery) WriteSQL(p *Printer) error {
}
cexpr = aliasColumn(c.Name, cexpr)

p.OpenParen()
if err := p.WriteExpr(cexpr); err != nil {
return errors.Wrapf(err, "column %q", c.Name)
}
p.CloseParen()
}
p.From()
switch {
Expand Down
10 changes: 5 additions & 5 deletions internal/chstorage/chsql/sugar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ func TestInTimeRange(t *testing.T) {
want string
}{
{"timestamp", time.Time{}, time.Time{}, "true"},
{"timestamp", time.Unix(0, 1), time.Time{}, "toUnixTimestamp64Nano(timestamp) >= 1"},
{"timestamp", time.Time{}, time.Unix(0, 10), "toUnixTimestamp64Nano(timestamp) <= 10"},
{"timestamp", time.Unix(0, 1), time.Unix(0, 10), "toUnixTimestamp64Nano(timestamp) >= 1 AND toUnixTimestamp64Nano(timestamp) <= 10"},
{"timestamp", time.Unix(0, 1), time.Time{}, "(toUnixTimestamp64Nano(timestamp) >= 1)"},
{"timestamp", time.Time{}, time.Unix(0, 10), "(toUnixTimestamp64Nano(timestamp) <= 10)"},
{"timestamp", time.Unix(0, 1), time.Unix(0, 10), "((toUnixTimestamp64Nano(timestamp) >= 1) AND (toUnixTimestamp64Nano(timestamp) <= 10))"},
}
for i, tt := range tests {
tt := tt
Expand All @@ -44,15 +44,15 @@ func TestJoinAnd(t *testing.T) {
Ident("foo"),
Ident("bar"),
},
"foo AND bar",
"(foo AND bar)",
},
{
[]Expr{
Ident("foo"),
Ident("bar"),
Ident("baz"),
},
"foo AND bar AND baz",
"(foo AND bar AND baz)",
},
}
for i, tt := range tests {
Expand Down
21 changes: 14 additions & 7 deletions internal/chstorage/querier_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ type promQuerier struct {

var _ storage.Querier = (*promQuerier)(nil)

// Close releases the resources of the Querier.
func (p *promQuerier) Close() error {
return nil
}

func (p *promQuerier) getStart(t time.Time) time.Time {
switch {
case t.IsZero():
Expand Down Expand Up @@ -633,15 +638,18 @@ func (q *Querier) getMetricsLabelMapping(ctx context.Context, input []string) (_
return out, nil
}

// Close releases the resources of the Querier.
func (p *promQuerier) Close() error {
return nil
}

// Select returns a set of series that matches the given label matchers.
// Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance.
// It allows passing hints that can help in optimizing select, but it's up to implementation how this is used if used at all.
func (p *promQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
if hints != nil && hints.Func == "series" {
ss, err := p.selectOnlySeries(ctx, sortSeries, hints.Start, hints.End, matchers)
if err != nil {
return storage.ErrSeriesSet(err)
}
return ss
}

ss, err := p.selectSeries(ctx, sortSeries, hints, matchers...)
if err != nil {
return storage.ErrSeriesSet(err)
Expand All @@ -659,7 +667,7 @@ type seriesKey struct {
func (p *promQuerier) selectSeries(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) (_ storage.SeriesSet, rerr error) {
hints, start, end, queryLabels := p.extractHints(hints, matchers)

ctx, span := p.tracer.Start(ctx, "chstorage.metrics.SelectSeries",
ctx, span := p.tracer.Start(ctx, "chstorage.metrics.selectSeries",
trace.WithAttributes(
attribute.Bool("promql.sort_series", sortSeries),
attribute.Int64("promql.hints.start", hints.Start),
Expand Down Expand Up @@ -962,7 +970,6 @@ func (p *promQuerier) queryExpHistograms(ctx context.Context, table string, quer
func buildPromLabels(lb *labels.ScratchBuilder, set map[string]string) labels.Labels {
lb.Reset()
for key, value := range set {
key = otelstorage.KeyToLabel(key)
lb.Add(key, value)
}
lb.Sort()
Expand Down
241 changes: 241 additions & 0 deletions internal/chstorage/querier_metrics_series.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
package chstorage

import (
"context"
"slices"
"time"

"github.com/ClickHouse/ch-go/proto"
"github.com/go-faster/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"

"github.com/go-faster/oteldb/internal/chstorage/chsql"
"github.com/go-faster/oteldb/internal/metricstorage"
"github.com/go-faster/oteldb/internal/otelstorage"
"github.com/go-faster/oteldb/internal/promapi"
"github.com/go-faster/oteldb/internal/xattribute"
)

// OnlySeries returns the label sets of the series matching any of the given
// matcher sets within [startMs, endMs] (Unix milliseconds).
//
// It delegates to selectOnlySeries; a failure is reported through the
// returned set (storage.ErrSeriesSet) instead of a separate error value.
func (p *promQuerier) OnlySeries(ctx context.Context, sortSeries bool, startMs, endMs int64, matcherSets ...[]*labels.Matcher) storage.SeriesSet {
	set, err := p.selectOnlySeries(ctx, sortSeries, startMs, endMs, matcherSets...)
	if err == nil {
		return set
	}
	return storage.ErrSeriesSet(err)
}

var _ metricstorage.OptimizedSeriesQuerier = (*promQuerier)(nil)

// selectOnlySeries collects the distinct label sets of every series matching
// at least one of matcherSets. The returned series carry labels only.
//
// startMs and endMs are Unix-millisecond bounds; a bound equal to the
// promapi.MinTime / promapi.MaxTime sentinel is left as the zero time.Time.
// The points and exponential histogram tables are queried concurrently and
// the results are concatenated (points first). When sortSeries is set, the
// combined result is ordered with labels.Compare.
func (p *promQuerier) selectOnlySeries(
	ctx context.Context,
	sortSeries bool,
	startMs, endMs int64,
	matcherSets ...[]*labels.Matcher,
) (_ storage.SeriesSet, rerr error) {
	var start, end time.Time
	if startMs != promapi.MinTime.UnixMilli() {
		start = p.getStart(time.UnixMilli(startMs))
	}
	if endMs != promapi.MaxTime.UnixMilli() {
		end = p.getEnd(time.UnixMilli(endMs))
	}

	ctx, span := p.tracer.Start(ctx, "chstorage.metrics.selectOnlySeries",
		trace.WithAttributes(
			attribute.Bool("promql.sort_series", sortSeries),
			attribute.Int64("promql.hints.start", startMs),
			attribute.Int64("promql.hints.end", endMs),

			xattribute.UnixNano("chstorage.range.start", start),
			xattribute.UnixNano("chstorage.range.end", end),
		),
	)
	// Attach the final error (if any) to the span before closing it.
	defer func() {
		if rerr != nil {
			span.RecordError(rerr)
		}
		span.End()
	}()

	// Every label name referenced by a matcher is passed to getLabelMapping.
	var names []string
	for _, matchers := range matcherSets {
		for _, matcher := range matchers {
			names = append(names, matcher.Name)
		}
	}
	mapping, err := p.getLabelMapping(ctx, names)
	if err != nil {
		return nil, errors.Wrap(err, "get label mapping")
	}

	// fetch runs the series query against a single table. Each invocation
	// owns its column, scratch map and builder, so it can run concurrently
	// with another invocation.
	fetch := func(ctx context.Context, table string) ([]onlyLabelsSeries, error) {
		col := proto.ColMap[string, string]{
			Keys:   new(proto.ColStr),
			Values: new(proto.ColStr),
		}
		column := chsql.ResultColumn{
			Name: "series",
			Expr: chsql.MapConcat(
				chsql.Map(chsql.String("__name__"), chsql.Ident("name_normalized")),
				attrStringMap(colAttrs),
				attrStringMap(colResource),
				attrStringMap(colScope),
			),
			Data: &col,
		}
		q, err := p.buildSeriesQuery(table, column, start, end, matcherSets, mapping)
		if err != nil {
			return nil, err
		}

		var (
			found []onlyLabelsSeries
			uniq  = map[string]string{}
			sb    labels.ScratchBuilder
		)
		onBlock := func(ctx context.Context, block proto.Block) error {
			for row, total := 0, col.Rows(); row < total; row++ {
				// Keys are converted with KeyToLabel; if two keys convert to
				// the same label, the value visited last is kept.
				clear(uniq)
				forEachColMap(&col, row, func(k, v string) {
					uniq[otelstorage.KeyToLabel(k)] = v
				})

				sb.Reset()
				for name, value := range uniq {
					sb.Add(name, value)
				}
				sb.Sort()
				found = append(found, onlyLabelsSeries{
					labels: sb.Labels(),
				})
			}
			return nil
		}

		err = p.do(ctx, selectQuery{
			Query:    q,
			OnResult: onBlock,

			Type:   "QueryOnlySeries",
			Signal: "metrics",
			Table:  table,
		})
		if err != nil {
			return nil, err
		}
		span.AddEvent("series_fetched", trace.WithAttributes(
			attribute.String("chstorage.table", table),
			attribute.Int("chstorage.total_series", len(found)),
		))

		return found, nil
	}

	var (
		fromPoints  []onlyLabelsSeries
		fromExpHist []onlyLabelsSeries
	)
	g, gctx := errgroup.WithContext(ctx)
	g.Go(func() error {
		found, err := fetch(gctx, p.tables.Points)
		if err != nil {
			return errors.Wrap(err, "query points")
		}
		fromPoints = found
		return nil
	})
	g.Go(func() error {
		found, err := fetch(gctx, p.tables.ExpHistograms)
		if err != nil {
			return errors.Wrap(err, "query exponential histogram")
		}
		fromExpHist = found
		return nil
	})
	if err := g.Wait(); err != nil {
		return nil, err
	}

	all := append(fromPoints, fromExpHist...)
	if sortSeries {
		slices.SortFunc(all, func(a, b onlyLabelsSeries) int {
			return labels.Compare(a.labels, b.labels)
		})
	}
	return newSeriesSet(all), nil
}

// buildSeriesQuery builds a SELECT DISTINCT query over table that returns
// column for rows inside the [start, end] time range and matching at least
// one of matcherSets.
//
// Matchers within one set are combined with AND, and the sets are combined
// with OR. A matcher on the metric name (labels.MetricName) is applied to the
// name_normalized column. Any other matcher is applied to the attribute and
// resource columns, using the name from mapping when one is present.
//
// On failure it returns a nil query and an error naming the offending label.
func (p *promQuerier) buildSeriesQuery(
	table string, column chsql.ResultColumn,
	start, end time.Time,
	matcherSets [][]*labels.Matcher,
	mapping map[string]string,
) (*chsql.SelectQuery, error) {
	query := chsql.Select(table, column).
		Distinct(true).
		Where(chsql.InTimeRange("timestamp", start, end))

	sets := make([]chsql.Expr, 0, len(matcherSets))
	for _, set := range matcherSets {
		matchers := make([]chsql.Expr, 0, len(set))
		for _, m := range set {
			selectors := []chsql.Expr{
				chsql.Ident("name_normalized"),
			}
			if name := m.Name; name != labels.MetricName {
				if mapped, ok := mapping[name]; ok {
					name = mapped
				}
				selectors = []chsql.Expr{
					attrSelector(colAttrs, name),
					attrSelector(colResource, name),
				}
			}

			matcher, err := promQLLabelMatcher(selectors, m.Type, m.Value)
			if err != nil {
				// Do not hand back a half-built query together with an error.
				return nil, errors.Wrapf(err, "build matcher for label %q", m.Name)
			}
			matchers = append(matchers, matcher)
		}
		sets = append(sets, chsql.JoinAnd(matchers...))
	}

	return query.
		Where(chsql.JoinOr(sets...)).
		Order(chsql.Ident("timestamp"), chsql.Asc), nil
}

// onlyLabelsSeries is a storage.Series that carries only a label set.
// Its Iterator method always returns a no-op iterator.
type onlyLabelsSeries struct {
// labels is the label set returned by Labels.
labels labels.Labels
}

// Compile-time check that onlyLabelsSeries implements storage.Series.
var _ storage.Series = onlyLabelsSeries{}

// Labels returns the complete set of labels. For series it means all labels identifying the series.
//
// The stored label set is returned as-is.
func (s onlyLabelsSeries) Labels() labels.Labels {
return s.labels
}

// Iterator returns an iterator of the data of the series.
// The iterator passed as argument is for re-use, if not nil.
// Depending on implementation, the iterator can
// be re-used or a new iterator can be allocated.
//
// This implementation ignores the passed iterator and always returns
// chunkenc.NewNopIterator().
func (onlyLabelsSeries) Iterator(chunkenc.Iterator) chunkenc.Iterator {
return chunkenc.NewNopIterator()
}
Loading

0 comments on commit 6570691

Please sign in to comment.