Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(lokihandler): parse query parameter #429

Merged
merged 4 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 140 additions & 21 deletions integration/lokie2e/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,34 +117,153 @@ func runTest(ctx context.Context, t *testing.T, provider *integration.Provider,
}
})
t.Run("LabelValues", func(t *testing.T) {
for labelName, labels := range set.Labels {
unique := map[string]struct{}{}
for _, t := range labels {
unique[t.Value] = struct{}{}
}
values := make([]string, 0, len(unique))
for v := range unique {
values = append(values, v)
t.Run("All", func(t *testing.T) {
for labelName, labels := range set.Labels {
unique := map[string]struct{}{}
for _, t := range labels {
unique[t.Value] = struct{}{}
}
values := maps.Keys(unique)
slices.Sort(values)

t.Run(labelName, func(t *testing.T) {
a := require.New(t)

r, err := c.LabelValues(ctx, lokiapi.LabelValuesParams{
Name: labelName,
// Always sending time range because default is current time.
Start: lokiapi.NewOptLokiTime(asLokiTime(set.Start)),
End: lokiapi.NewOptLokiTime(asLokiTime(set.End)),
})
a.NoError(err)

a.Len(r.Data, len(values))
requirex.Unique(t, r.Data)
requirex.Sorted(t, r.Data)
for _, val := range r.Data {
a.Containsf(values, val, "check label %q", labelName)
}
})
}
slices.Sort(values)

t.Run(labelName, func(t *testing.T) {
a := require.New(t)

r, err := c.LabelValues(ctx, lokiapi.LabelValuesParams{
Name: labelName,
// Always sending time range because default is current time.
})
for _, tt := range []struct {
name string
params lokiapi.LabelValuesParams
want []string
wantErr bool
}{
{
"OneMatcher",
lokiapi.LabelValuesParams{
Name: "http_method",
Query: lokiapi.NewOptString(`{http_method="GET"}`),
Start: lokiapi.NewOptLokiTime(asLokiTime(set.Start)),
End: lokiapi.NewOptLokiTime(asLokiTime(set.End)),
})
},
[]string{"GET"},
false,
},
{
"AnotherLabel",
lokiapi.LabelValuesParams{
Name: "http_method",
Query: lokiapi.NewOptString(`{http_method="HEAD",http_status_code="500"}`),
Start: lokiapi.NewOptLokiTime(asLokiTime(set.Start)),
End: lokiapi.NewOptLokiTime(asLokiTime(set.End)),
},
[]string{"HEAD"},
false,
},
{
"UnknownLabel",
lokiapi.LabelValuesParams{
Name: "label_clearly_not_exist",
Start: lokiapi.NewOptLokiTime(asLokiTime(set.Start)),
End: lokiapi.NewOptLokiTime(asLokiTime(set.End)),
},
nil,
false,
},
{
"UnknownValue",
lokiapi.LabelValuesParams{
Name: "http_method",
Query: lokiapi.NewOptString(`{http_method="clearly_not_exist"}`),
Start: lokiapi.NewOptLokiTime(asLokiTime(set.Start)),
End: lokiapi.NewOptLokiTime(asLokiTime(set.End)),
},
nil,
false,
},
{
"NoMatch",
lokiapi.LabelValuesParams{
Name: "http_method",
Query: lokiapi.NewOptString(`{handler=~".+",clearly="not_exist"}`),
Start: lokiapi.NewOptLokiTime(asLokiTime(set.Start)),
End: lokiapi.NewOptLokiTime(asLokiTime(set.End)),
},
nil,
false,
},
{
"OutOfRange",
lokiapi.LabelValuesParams{
Name: "http_method",
Start: lokiapi.NewOptLokiTime(asLokiTime(10)),
End: lokiapi.NewOptLokiTime(asLokiTime(20)),
},
nil,
false,
},
{
"OutOfRangeWithQuery",
lokiapi.LabelValuesParams{
Name: "http_method",
Start: lokiapi.NewOptLokiTime(asLokiTime(10)),
End: lokiapi.NewOptLokiTime(asLokiTime(20)),
},
nil,
false,
},
{
"InvalidSelector",
lokiapi.LabelValuesParams{
Name: "http_method",
Query: lokiapi.NewOptString(`\{\}`),
Start: lokiapi.NewOptLokiTime(asLokiTime(set.Start)),
End: lokiapi.NewOptLokiTime(asLokiTime(set.End)),
},
nil,
true,
},
{
"InvalidRange",
lokiapi.LabelValuesParams{
Name: "http_method",
Query: lokiapi.NewOptString(`\{\}`),
Start: lokiapi.NewOptLokiTime(asLokiTime(20)),
End: lokiapi.NewOptLokiTime(asLokiTime(10)),
},
nil,
true,
},
} {
tt := tt
t.Run(tt.name, func(t *testing.T) {
a := require.New(t)

r, err := c.LabelValues(ctx, tt.params)
if tt.wantErr {
var gotErr *lokiapi.ErrorStatusCode
a.ErrorAs(err, &gotErr)
return
}
a.NoError(err)

a.Len(r.Data, len(values))
requirex.Unique(t, r.Data)
requirex.Sorted(t, r.Data)
for _, val := range r.Data {
a.Containsf(values, val, "check label %q", labelName)
}
a.ElementsMatch(tt.want, r.Data)
})
}
})
Expand Down
62 changes: 45 additions & 17 deletions internal/chstorage/querier_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ func (q *Querier) LabelValues(ctx context.Context, labelName string, opts logsto
attribute.String("chstorage.label", labelName),
xattribute.UnixNano("chstorage.range.start", opts.Start),
xattribute.UnixNano("chstorage.range.end", opts.End),
attribute.Stringer("chstorage.matchers", opts.Query),

attribute.String("chstorage.table", table),
),
)
Expand All @@ -140,6 +142,7 @@ func (q *Querier) LabelValues(ctx context.Context, labelName string, opts logsto
switch labelName {
case logstorage.LabelBody, logstorage.LabelSpanID, logstorage.LabelTraceID:
case logstorage.LabelSeverity:
// FIXME(tdakkota): do a proper query with filtering
values = []string{
plog.SeverityNumberUnspecified.String(),
plog.SeverityNumberTrace.String(),
Expand All @@ -151,19 +154,23 @@ func (q *Querier) LabelValues(ctx context.Context, labelName string, opts logsto
}
slices.Sort(values)
default:
{
mapping, err := q.getLabelMapping(ctx, []string{labelName})
if err != nil {
return nil, errors.Wrap(err, "get label mapping")
}
if key, ok := mapping[labelName]; ok {
labelName = key
}
queryLabels := make([]string, 1+len(opts.Query.Matchers))
queryLabels = append(queryLabels, labelName)
for _, m := range opts.Query.Matchers {
queryLabels = append(queryLabels, string(m.Label))
}

var value proto.ColStr
if err := q.do(ctx, selectQuery{
Query: chsql.Select(table, chsql.ResultColumn{
mapping, err := q.getLabelMapping(ctx, queryLabels)
if err != nil {
return nil, errors.Wrap(err, "get label mapping")
}
if key, ok := mapping[labelName]; ok {
labelName = key
}

var (
value proto.ColStr
query = chsql.Select(table, chsql.ResultColumn{
Name: "value",
Expr: chsql.ArrayJoin(chsql.Array(
attrSelector(colAttrs, labelName),
Expand All @@ -173,9 +180,20 @@ func (q *Querier) LabelValues(ctx context.Context, labelName string, opts logsto
Data: &value,
}).
Distinct(true).
Where(chsql.InTimeRange("timestamp", opts.Start, opts.End)).
Order(chsql.Ident("value"), chsql.Asc).
Limit(1000),
Where(chsql.InTimeRange("timestamp", opts.Start, opts.End))
)
for _, m := range opts.Query.Matchers {
expr, err := q.logQLLabelMatcher(m, mapping)
if err != nil {
return nil, err
}
query.Where(expr)
}
query.Order(chsql.Ident("value"), chsql.Asc).
Limit(1000)

if err := q.do(ctx, selectQuery{
Query: query,
OnResult: func(ctx context.Context, block proto.Block) error {
for i := 0; i < value.Rows(); i++ {
if v := value.Row(i); v != "" {
Expand Down Expand Up @@ -250,6 +268,9 @@ func (q *Querier) getLabelMapping(ctx context.Context, labels []string) (_ map[s
}); err != nil {
return nil, err
}
span.AddEvent("mapping_fetched", trace.WithAttributes(
xattribute.StringMap("chstorage.mapping", out),
))

return out, nil
}
Expand Down Expand Up @@ -287,6 +308,10 @@ func (q *Querier) Series(ctx context.Context, opts logstorage.SeriesOptions) (re
defer func() {
if rerr != nil {
span.RecordError(rerr)
} else {
span.AddEvent("series_fetched", trace.WithAttributes(
attribute.Int("chstorage.total_series", len(result)),
))
}
span.End()
}()
Expand Down Expand Up @@ -328,9 +353,9 @@ func (q *Querier) Series(ctx context.Context, opts logstorage.SeriesOptions) (re
attrStringMap(colScope),
),
Data: series,
}).Where(
chsql.InTimeRange("timestamp", opts.Start, opts.End),
)
}).
Distinct(true).
Where(chsql.InTimeRange("timestamp", opts.Start, opts.End))
)
if sels := opts.Selectors; len(sels) > 0 {
// Gather all labels for mapping fetch.
Expand Down Expand Up @@ -368,6 +393,9 @@ func (q *Querier) Series(ctx context.Context, opts logstorage.SeriesOptions) (re
for i := 0; i < series.Rows(); i++ {
s := make(map[string]string)
forEachColMap(series, i, func(k, v string) {
if k == "" {
return
}
s[otelstorage.KeyToLabel(k)] = v
})
result = append(result, s)
Expand Down
12 changes: 6 additions & 6 deletions internal/chstorage/querier_logs_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func (q *Querier) lineFilter(m logql.LineFilter) (e chsql.Expr, rerr error) {
case logql.OpRe, logql.OpNotRe:
return chsql.Match(chsql.Ident("body"), chsql.String(m.By.Value)), nil
default:
return e, errors.Errorf("unexpected op %q", m.Op)
return e, errors.Errorf("unexpected line matcher op %v", m.Op)
}
}

Expand Down Expand Up @@ -398,7 +398,7 @@ func (q *Querier) logQLLabelMatcher(
chsql.String(m.Value),
), nil
default:
return e, errors.Errorf("unexpected op %q", m.Op)
return e, errors.Errorf("unexpected label matcher op %v", m.Op)
}
}

Expand Down Expand Up @@ -436,7 +436,7 @@ func (q *Querier) logQLLabelMatcher(
}
return chsql.In(chsql.Ident("severity_number"), chsql.TupleValues(matches...)), nil
default:
return e, errors.Errorf("unexpected op %q", m.Op)
return e, errors.Errorf("unexpected label matcher op %v", m.Op)
}
case logstorage.LabelBody:
switch m.Op {
Expand All @@ -445,7 +445,7 @@ func (q *Querier) logQLLabelMatcher(
case logql.OpRe, logql.OpNotRe:
return chsql.Match(chsql.Ident("body"), chsql.String(m.Value)), nil
default:
return e, errors.Errorf("unexpected op %q", m.Op)
return e, errors.Errorf("unexpected label matcher op %v", m.Op)
}
case logstorage.LabelSpanID:
return matchHex(chsql.Ident("span_id"), m)
Expand All @@ -460,7 +460,7 @@ func (q *Querier) logQLLabelMatcher(
case logql.OpRe, logql.OpNotRe:
return chsql.Match(expr, chsql.String(m.Value)), nil
default:
return e, errors.Errorf("unexpected op %q", m.Op)
return e, errors.Errorf("unexpected label matcher op %v", m.Op)
}
}

Expand All @@ -482,7 +482,7 @@ func (q *Querier) logQLLabelMatcher(
case logql.OpRe, logql.OpNotRe:
sub = chsql.Match(selector, chsql.String(m.Value))
default:
return e, errors.Errorf("unexpected op %q", m.Op)
return e, errors.Errorf("unexpected label matcher op %v", m.Op)
}
exprs = append(exprs, sub)
}
Expand Down
2 changes: 2 additions & 0 deletions internal/logstorage/logstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type LabelsOptions struct {
//
// Querier ignores parameter, if it is zero.
End time.Time
// Selector that selects the streams to match.
Query logql.Selector
}

// SeriesOptions defines options for [Querier.Series] method.
Expand Down
Loading
Loading