diff --git a/integration/lokie2e/common_test.go b/integration/lokie2e/common_test.go index ee60d3fb..893ccc0b 100644 --- a/integration/lokie2e/common_test.go +++ b/integration/lokie2e/common_test.go @@ -117,34 +117,153 @@ func runTest(ctx context.Context, t *testing.T, provider *integration.Provider, } }) t.Run("LabelValues", func(t *testing.T) { - for labelName, labels := range set.Labels { - unique := map[string]struct{}{} - for _, t := range labels { - unique[t.Value] = struct{}{} - } - values := make([]string, 0, len(unique)) - for v := range unique { - values = append(values, v) + t.Run("All", func(t *testing.T) { + for labelName, labels := range set.Labels { + unique := map[string]struct{}{} + for _, t := range labels { + unique[t.Value] = struct{}{} + } + values := maps.Keys(unique) + slices.Sort(values) + + t.Run(labelName, func(t *testing.T) { + a := require.New(t) + + r, err := c.LabelValues(ctx, lokiapi.LabelValuesParams{ + Name: labelName, + // Always sending time range because default is current time. + Start: lokiapi.NewOptLokiTime(asLokiTime(set.Start)), + End: lokiapi.NewOptLokiTime(asLokiTime(set.End)), + }) + a.NoError(err) + + a.Len(r.Data, len(values)) + requirex.Unique(t, r.Data) + requirex.Sorted(t, r.Data) + for _, val := range r.Data { + a.Containsf(values, val, "check label %q", labelName) + } + }) } - slices.Sort(values) - - t.Run(labelName, func(t *testing.T) { - a := require.New(t) - - r, err := c.LabelValues(ctx, lokiapi.LabelValuesParams{ - Name: labelName, - // Always sending time range because default is current time. + }) + for _, tt := range []struct { + name string + params lokiapi.LabelValuesParams + want []string + wantErr bool + }{ + { + "OneMatcher", + lokiapi.LabelValuesParams{ + Name: "http_method", + Query: lokiapi.NewOptString(`{http_method="GET"}`), Start: lokiapi.NewOptLokiTime(asLokiTime(set.Start)), End: lokiapi.NewOptLokiTime(asLokiTime(set.End)), - }) + }, + []string{"GET"}, + false, + }, + { + "AnotherLabel", + lokiapi.LabelValuesParams{ + Name: "http_method", + Query: lokiapi.NewOptString(`{http_method="HEAD",http_status_code="500"}`), + Start: lokiapi.NewOptLokiTime(asLokiTime(set.Start)), + End: lokiapi.NewOptLokiTime(asLokiTime(set.End)), + }, + []string{"HEAD"}, + false, + }, + { + "UnknownLabel", + lokiapi.LabelValuesParams{ + Name: "label_clearly_not_exist", + Start: lokiapi.NewOptLokiTime(asLokiTime(set.Start)), + End: lokiapi.NewOptLokiTime(asLokiTime(set.End)), + }, + nil, + false, + }, + { + "UnknownValue", + lokiapi.LabelValuesParams{ + Name: "http_method", + Query: lokiapi.NewOptString(`{http_method="clearly_not_exist"}`), + Start: lokiapi.NewOptLokiTime(asLokiTime(set.Start)), + End: lokiapi.NewOptLokiTime(asLokiTime(set.End)), + }, + nil, + false, + }, + { + "NoMatch", + lokiapi.LabelValuesParams{ + Name: "http_method", + Query: lokiapi.NewOptString(`{handler=~".+",clearly="not_exist"}`), + Start: lokiapi.NewOptLokiTime(asLokiTime(set.Start)), + End: lokiapi.NewOptLokiTime(asLokiTime(set.End)), + }, + nil, + false, + }, + { + "OutOfRange", + lokiapi.LabelValuesParams{ + Name: "http_method", + Start: lokiapi.NewOptLokiTime(asLokiTime(10)), + End: lokiapi.NewOptLokiTime(asLokiTime(20)), + }, + nil, + false, + }, + { + "OutOfRangeWithQuery", + lokiapi.LabelValuesParams{ + Name: "http_method", + Start: lokiapi.NewOptLokiTime(asLokiTime(10)), + End: lokiapi.NewOptLokiTime(asLokiTime(20)), + }, + nil, + false, + }, + { + "InvalidSelector", + lokiapi.LabelValuesParams{ + Name: "http_method", + Query: lokiapi.NewOptString(`\{\}`), + Start: lokiapi.NewOptLokiTime(asLokiTime(set.Start)), + End: lokiapi.NewOptLokiTime(asLokiTime(set.End)), + }, + nil, + true, + }, + { + "InvalidRange", + lokiapi.LabelValuesParams{ + Name: "http_method", + Query: lokiapi.NewOptString(`\{\}`), + Start: lokiapi.NewOptLokiTime(asLokiTime(20)), + End: lokiapi.NewOptLokiTime(asLokiTime(10)), + }, + nil, + true, + }, + } { + tt := tt + t.Run(tt.name, func(t *testing.T) { + a := require.New(t) + + r, err := c.LabelValues(ctx, tt.params) + if tt.wantErr { + var gotErr *lokiapi.ErrorStatusCode + a.ErrorAs(err, &gotErr) + return + } a.NoError(err) - a.Len(r.Data, len(values)) requirex.Unique(t, r.Data) requirex.Sorted(t, r.Data) - for _, val := range r.Data { - a.Containsf(values, val, "check label %q", labelName) - } + a.ElementsMatch(tt.want, r.Data) }) } }) diff --git a/internal/chstorage/querier_logs.go b/internal/chstorage/querier_logs.go index 158dd763..fd9075a2 100644 --- a/internal/chstorage/querier_logs.go +++ b/internal/chstorage/querier_logs.go @@ -126,6 +126,8 @@ func (q *Querier) LabelValues(ctx context.Context, labelName string, opts logsto attribute.String("chstorage.label", labelName), xattribute.UnixNano("chstorage.range.start", opts.Start), xattribute.UnixNano("chstorage.range.end", opts.End), + attribute.Stringer("chstorage.matchers", opts.Query), + attribute.String("chstorage.table", table), ), ) @@ -140,6 +142,7 @@ func (q *Querier) LabelValues(ctx context.Context, labelName string, opts logsto switch labelName { case logstorage.LabelBody, logstorage.LabelSpanID, logstorage.LabelTraceID: case logstorage.LabelSeverity: + // FIXME(tdakkota): do a proper query with filtering values = []string{ plog.SeverityNumberUnspecified.String(), plog.SeverityNumberTrace.String(), @@ -151,19 +154,23 @@ func (q *Querier) LabelValues(ctx context.Context, labelName string, opts logsto } slices.Sort(values) default: - { - mapping, err := q.getLabelMapping(ctx, []string{labelName}) - if err != nil { - return nil, errors.Wrap(err, "get label mapping") - } - if key, ok := mapping[labelName]; ok { - labelName = key - } + queryLabels := make([]string, 1+len(opts.Query.Matchers)) + queryLabels = append(queryLabels, labelName) + for _, m := range opts.Query.Matchers { + queryLabels = append(queryLabels, string(m.Label)) } - var value proto.ColStr - if err := q.do(ctx, selectQuery{ - Query: chsql.Select(table, chsql.ResultColumn{ + mapping, err := q.getLabelMapping(ctx, queryLabels) + if err != nil { + return nil, errors.Wrap(err, "get label mapping") + } + if key, ok := mapping[labelName]; ok { + labelName = key + } + + var ( + value proto.ColStr + query = chsql.Select(table, chsql.ResultColumn{ Name: "value", Expr: chsql.ArrayJoin(chsql.Array( attrSelector(colAttrs, labelName), @@ -173,9 +180,20 @@ func (q *Querier) LabelValues(ctx context.Context, labelName string, opts logsto Data: &value, }). Distinct(true). - Where(chsql.InTimeRange("timestamp", opts.Start, opts.End)). - Order(chsql.Ident("value"), chsql.Asc). - Limit(1000), + Where(chsql.InTimeRange("timestamp", opts.Start, opts.End)) + ) + for _, m := range opts.Query.Matchers { + expr, err := q.logQLLabelMatcher(m, mapping) + if err != nil { + return nil, err + } + query.Where(expr) + } + query.Order(chsql.Ident("value"), chsql.Asc). + Limit(1000) + + if err := q.do(ctx, selectQuery{ + Query: query, OnResult: func(ctx context.Context, block proto.Block) error { for i := 0; i < value.Rows(); i++ { if v := value.Row(i); v != "" { @@ -250,6 +268,9 @@ func (q *Querier) getLabelMapping(ctx context.Context, labels []string) (_ map[s }); err != nil { return nil, err } + span.AddEvent("mapping_fetched", trace.WithAttributes( + xattribute.StringMap("chstorage.mapping", out), + )) return out, nil } @@ -287,6 +308,10 @@ func (q *Querier) Series(ctx context.Context, opts logstorage.SeriesOptions) (re defer func() { if rerr != nil { span.RecordError(rerr) + } else { + span.AddEvent("series_fetched", trace.WithAttributes( + attribute.Int("chstorage.total_series", len(result)), + )) } span.End() }() @@ -328,9 +353,9 @@ func (q *Querier) Series(ctx context.Context, opts logstorage.SeriesOptions) (re attrStringMap(colScope), ), Data: series, - }).Where( - chsql.InTimeRange("timestamp", opts.Start, opts.End), - ) + }). + Distinct(true). + Where(chsql.InTimeRange("timestamp", opts.Start, opts.End)) ) if sels := opts.Selectors; len(sels) > 0 { // Gather all labels for mapping fetch. @@ -368,6 +393,9 @@ func (q *Querier) Series(ctx context.Context, opts logstorage.SeriesOptions) (re for i := 0; i < series.Rows(); i++ { s := make(map[string]string) forEachColMap(series, i, func(k, v string) { + if k == "" { + return + } s[otelstorage.KeyToLabel(k)] = v }) result = append(result, s) diff --git a/internal/chstorage/querier_logs_query.go b/internal/chstorage/querier_logs_query.go index 57537e02..72fabe5e 100644 --- a/internal/chstorage/querier_logs_query.go +++ b/internal/chstorage/querier_logs_query.go @@ -367,7 +367,7 @@ func (q *Querier) lineFilter(m logql.LineFilter) (e chsql.Expr, rerr error) { case logql.OpRe, logql.OpNotRe: return chsql.Match(chsql.Ident("body"), chsql.String(m.By.Value)), nil default: - return e, errors.Errorf("unexpected op %q", m.Op) + return e, errors.Errorf("unexpected line matcher op %v", m.Op) } } @@ -398,7 +398,7 @@ func (q *Querier) logQLLabelMatcher( chsql.String(m.Value), ), nil default: - return e, errors.Errorf("unexpected op %q", m.Op) + return e, errors.Errorf("unexpected label matcher op %v", m.Op) } } @@ -436,7 +436,7 @@ func (q *Querier) logQLLabelMatcher( } return chsql.In(chsql.Ident("severity_number"), chsql.TupleValues(matches...)), nil default: - return e, errors.Errorf("unexpected op %q", m.Op) + return e, errors.Errorf("unexpected label matcher op %v", m.Op) } case logstorage.LabelBody: switch m.Op { @@ -445,7 +445,7 @@ func (q *Querier) logQLLabelMatcher( case logql.OpRe, logql.OpNotRe: return chsql.Match(chsql.Ident("body"), chsql.String(m.Value)), nil default: - return e, errors.Errorf("unexpected op %q", m.Op) + return e, errors.Errorf("unexpected label matcher op %v", m.Op) } case logstorage.LabelSpanID: return matchHex(chsql.Ident("span_id"), m) @@ -460,7 +460,7 @@ func (q *Querier) logQLLabelMatcher( case logql.OpRe, logql.OpNotRe: return chsql.Match(expr, chsql.String(m.Value)), nil default: - return e, errors.Errorf("unexpected op %q", m.Op) + return e, errors.Errorf("unexpected label matcher op %v", m.Op) } } @@ -482,7 +482,7 @@ func (q *Querier) logQLLabelMatcher( case logql.OpRe, logql.OpNotRe: sub = chsql.Match(selector, chsql.String(m.Value)) default: - return e, errors.Errorf("unexpected op %q", m.Op) + return e, errors.Errorf("unexpected label matcher op %v", m.Op) } exprs = append(exprs, sub) } diff --git a/internal/logstorage/logstorage.go b/internal/logstorage/logstorage.go index 917807d6..b0d30520 100644 --- a/internal/logstorage/logstorage.go +++ b/internal/logstorage/logstorage.go @@ -29,6 +29,8 @@ type LabelsOptions struct { // // Querier ignores parameter, if it is zero. End time.Time + // Selector that selects the streams to match. + Query logql.Selector } // SeriesOptions defines options for [Querier.Series] method. diff --git a/internal/lokihandler/lokihandler.go b/internal/lokihandler/lokihandler.go index 37477573..696411b2 100644 --- a/internal/lokihandler/lokihandler.go +++ b/internal/lokihandler/lokihandler.go @@ -23,8 +23,9 @@ import ( // LokiAPI implements lokiapi.Handler. type LokiAPI struct { - q logstorage.Querier - engine *logqlengine.Engine + q logstorage.Querier + engine *logqlengine.Engine + parseOpts logql.ParseOptions } var _ lokiapi.Handler = (*LokiAPI)(nil) @@ -34,6 +35,8 @@ func NewLokiAPI(q logstorage.Querier, engine *logqlengine.Engine) *LokiAPI { return &LokiAPI{ q: q, engine: engine, + // TODO(tdakkota): configure parse options. + parseOpts: logql.ParseOptions{}, } } @@ -64,9 +67,18 @@ func (h *LokiAPI) LabelValues(ctx context.Context, params lokiapi.LabelValuesPar return nil, validationErr(err, "parse time range") } + var sel logql.Selector + if q := params.Query.Or(""); q != "" { + sel, err = logql.ParseSelector(q, h.parseOpts) + if err != nil { + return nil, validationErr(err, "parse query") + } + } + iter, err := h.q.LabelValues(ctx, params.Name, logstorage.LabelsOptions{ Start: start, End: end, + Query: sel, }) if err != nil { return nil, executionErr(err, "get label values") @@ -230,7 +242,7 @@ func (h *LokiAPI) Series(ctx context.Context, params lokiapi.SeriesParams) (*lok selectors := make([]logql.Selector, len(params.Match)) for i, m := range params.Match { - selectors[i], err = logql.ParseSelector(m, logql.ParseOptions{}) + selectors[i], err = logql.ParseSelector(m, h.parseOpts) if err != nil { return nil, validationErr(err, fmt.Sprintf("invalid match[%d]", i)) }