Skip to content

Commit

Permalink
fix(chstorage): properly handle matchers in LabelValues
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Jun 10, 2024
1 parent 28b3a72 commit a130d78
Showing 1 changed file with 185 additions and 9 deletions.
194 changes: 185 additions & 9 deletions internal/chstorage/querier_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/go-faster/oteldb/internal/chstorage/chsql"
"github.com/go-faster/oteldb/internal/otelstorage"
"github.com/go-faster/oteldb/internal/promapi"
"github.com/go-faster/oteldb/internal/xattribute"
)

Expand All @@ -27,10 +28,10 @@ var _ storage.Queryable = (*Querier)(nil)
// Querier returns a new Querier on the storage.
func (q *Querier) Querier(mint, maxt int64) (storage.Querier, error) {
var minTime, maxTime time.Time
if mint > 0 {
if mint != promapi.MinTime.UnixMilli() {
minTime = time.UnixMilli(mint)
}
if maxt > 0 {
if maxt != promapi.MaxTime.UnixMilli() {
maxTime = time.UnixMilli(maxt)
}
return &promQuerier{
Expand Down Expand Up @@ -66,13 +67,46 @@ var _ storage.Querier = (*promQuerier)(nil)
// It is not safe to use the strings beyond the lifetime of the querier.
// If matchers are specified the returned result set is reduced
// to label values of metrics matching the matchers.
func (p *promQuerier) LabelValues(ctx context.Context, labelName string, matchers ...*labels.Matcher) (result []string, _ annotations.Annotations, rerr error) {
func (p *promQuerier) LabelValues(ctx context.Context, labelName string, matchers ...*labels.Matcher) (_ []string, _ annotations.Annotations, rerr error) {
ctx, span := p.tracer.Start(ctx, "chstorage.metrics.LabelValues",
trace.WithAttributes(
attribute.String("chstorage.label", labelName),
xattribute.StringerSlice("chstorage.matchers", matchers),
),
)
defer func() {
if rerr != nil {
span.RecordError(rerr)
}
span.End()
}()

matchesOtherLabels := slices.ContainsFunc(matchers, func(m *labels.Matcher) bool {
return labelName != m.Name
})
if matchesOtherLabels {
r, err := p.getMatchingLabelValues(ctx, labelName, matchers...)
if err != nil {
return nil, nil, err
}
return r, nil, nil
}

r, err := p.getLabelValues(ctx, labelName, matchers...)
if err != nil {
return nil, nil, err
}
return r, nil, nil
}

func (p *promQuerier) getLabelValues(ctx context.Context, labelName string, matchers ...*labels.Matcher) (result []string, rerr error) {
table := p.tables.Labels

ctx, span := p.tracer.Start(ctx, "chstorage.metrics.LabelValues",
ctx, span := p.tracer.Start(ctx, "chstorage.metrics.getLabelValues",
trace.WithAttributes(
attribute.String("chstorage.label", labelName),
xattribute.StringerSlice("chstorage.matchers", matchers),

attribute.String("chstorage.table", table),
),
)
Expand All @@ -95,13 +129,17 @@ func (p *promQuerier) LabelValues(ctx context.Context, labelName string, matcher
Distinct(true).
Where(chsql.ColumnEq("name_normalized", labelName))
for _, m := range matchers {
if m.Name != labelName {
return nil, errors.Errorf("unexpected label matcher %s (label must be %q)", m, labelName)
}

expr, err := promQLLabelMatcher(
[]chsql.Expr{chsql.Ident("name_normalized")},
[]chsql.Expr{chsql.Ident(valueColumn)},
m.Type,
m.Value,
)
if err != nil {
return nil, nil, err
return nil, err
}
query.Where(expr)
}
Expand All @@ -115,14 +153,152 @@ func (p *promQuerier) LabelValues(ctx context.Context, labelName string, matcher
return nil
},

Type: "LabelValues",
Type: "getLabelValues",
Signal: "metrics",
Table: table,
}); err != nil {
return nil, nil, err
return nil, err
}

return result, nil, nil
return result, nil
}

func (p *promQuerier) getMatchingLabelValues(ctx context.Context, labelName string, matchers ...*labels.Matcher) (_ []string, rerr error) {
ctx, span := p.tracer.Start(ctx, "chstorage.metrics.getMatchingLabelValues",
trace.WithAttributes(
attribute.String("chstorage.label", labelName),
xattribute.StringerSlice("chstorage.matchers", matchers),
),
)
defer func() {
if rerr != nil {
span.RecordError(rerr)
}
span.End()
}()

mlabels := make([]string, 0, len(matchers))
for _, m := range matchers {
mlabels = append(mlabels, m.Name)
}
mapping, err := p.getLabelMapping(ctx, mlabels)
if err != nil {
return nil, errors.Wrap(err, "get label mapping")
}

query := func(ctx context.Context, table string) (result []string, rerr error) {
var columnExpr chsql.Expr
if labelName == labels.MetricName {
columnExpr = chsql.Ident("name_normalized")
} else {
columnExpr = firstAttrSelector(labelName)
}

var (
value proto.ColStr

query = chsql.Select(table, chsql.ResultColumn{
Name: "value",
Expr: columnExpr,
Data: &value,
}).
Distinct(true)
)
for _, m := range matchers {
selectors := []chsql.Expr{
chsql.Ident("name_normalized"),
}
if name := m.Name; name != labels.MetricName {
if mapped, ok := mapping[name]; ok {
name = mapped
}
selectors = []chsql.Expr{
attrSelector(colAttrs, name),
attrSelector(colScope, name),
attrSelector(colResource, name),
}
}

expr, err := promQLLabelMatcher(selectors, m.Type, m.Value)
if err != nil {
return nil, err
}
query.Where(expr)
}
{
columnExpr := chsql.ToUnixTimestamp64Nano(chsql.Ident("timestamp"))
if t := p.mint; !(t.IsZero() || t == promapi.MinTime) {
query.Where(
chsql.Gte(columnExpr, chsql.UnixNano(t)),
)
}
if t := p.maxt; !(t.IsZero() || t == promapi.MaxTime) {
query.Where(
chsql.Lte(columnExpr, chsql.UnixNano(t)),
)
}
}

if err := p.do(ctx, selectQuery{
Query: query,
OnResult: func(ctx context.Context, block proto.Block) error {
for i := 0; i < value.Rows(); i++ {
if v := value.Row(i); v != "" {
result = append(result, v)
}
}
return nil
},

Type: "getMatchingLabelValues",
Signal: "metrics",
Table: table,
}); err != nil {
return nil, err
}
query.Limit(1000)

return result, nil
}

var (
points []string
expHists []string
)
grp, grpCtx := errgroup.WithContext(ctx)
grp.Go(func() error {
ctx := grpCtx
table := p.tables.Points

result, err := query(ctx, table)
if err != nil {
return errors.Wrap(err, "query points series")
}
points = result

return nil
})
grp.Go(func() error {
ctx := grpCtx
table := p.tables.ExpHistograms

result, err := query(ctx, table)
if err != nil {
return errors.Wrap(err, "query exponential histogram series")
}
expHists = result

return nil
})
if err := grp.Wait(); err != nil {
return nil, err
}

points = append(points, expHists...)
slices.Sort(points)
points = slices.Clip(points)

return points, nil
}

// LabelNames returns all the unique label names present in the block in sorted order.
Expand Down

0 comments on commit a130d78

Please sign in to comment.