diff --git a/.github/workflows/compliance.yml b/.github/workflows/compliance.yml index 8cb5a0bc..f39595ad 100644 --- a/.github/workflows/compliance.yml +++ b/.github/workflows/compliance.yml @@ -135,8 +135,9 @@ jobs: -end 1m -range 1m \ -config-file logql-test-queries.yml -config-file test-oteldb.yml \ -output-format json -output-file result.oteldb.json \ + -output-passing -output-unsupported \ -query-parallelism 2 \ - -target 82.97 || true + -target 81.98 - name: Upload artifact uses: actions/upload-artifact@v4 diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 90919a36..fc464af3 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -42,7 +42,7 @@ jobs: env: E2E: "1" TESTCONTAINERS_RYUK_DISABLED: "true" - run: go test -timeout 15m -race -v -coverpkg=./... -coverprofile=profile.out ./... + run: go test -timeout 15m -race -v -coverpkg=./... -coverprofile=profile.out ./integration/... - name: Emit coverage run: go tool cover -func profile.out @@ -71,4 +71,4 @@ jobs: - name: Send coverage uses: codecov/codecov-action@v4 with: - file: profile.out \ No newline at end of file + file: profile.out diff --git a/.golangci.yml b/.golangci.yml index f7b2ecb3..50abe889 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -41,6 +41,7 @@ linters-settings: - octalLiteral - typeDefFirst - unnamedResult + - ptrToRefParam linters: disable-all: true diff --git a/cmd/logql-compliance-tester/config.go b/cmd/logql-compliance-tester/config.go index 942ad50a..c983576c 100644 --- a/cmd/logql-compliance-tester/config.go +++ b/cmd/logql-compliance-tester/config.go @@ -28,6 +28,7 @@ type OutputConfig struct { OutputFile string OutputFormat string OutputPassing bool + OutputUnsupported bool MinimumPercentage float64 } @@ -41,6 +42,7 @@ type Flags struct { OutputFile string OutputFormat string OutputPassing bool + OutputUnsupported bool MinimumPercentage float64 } @@ -64,6 +66,7 @@ func (f *Flags) Register(set *flag.FlagSet) { set.StringVar(&f.OutputFile, 
"output-file", "", "Path to output file") set.StringVar(&f.OutputFormat, "output-format", "json", "The comparison output format. Valid values: [json]") set.BoolVar(&f.OutputPassing, "output-passing", false, "Whether to also include passing test cases in the output.") + set.BoolVar(&f.OutputUnsupported, "output-unsupported", false, "Whether to also include unsupported test cases in the output.") set.Float64Var(&f.MinimumPercentage, "target", math.NaN(), "Minimum compliance percentage") } @@ -95,6 +98,7 @@ func parseConfig() (c Config, _ error) { OutputFile: flags.OutputFile, OutputFormat: flags.OutputFormat, OutputPassing: flags.OutputPassing, + OutputUnsupported: flags.OutputUnsupported, MinimumPercentage: flags.MinimumPercentage, } diff --git a/cmd/logql-compliance-tester/main.go b/cmd/logql-compliance-tester/main.go index 22b3f747..c88d7723 100644 --- a/cmd/logql-compliance-tester/main.go +++ b/cmd/logql-compliance-tester/main.go @@ -39,6 +39,10 @@ func run(ctx context.Context) error { results = make([]*lokicompliance.Result, len(cfg.TestCases)) progressBar = pb.StartNew(len(results)) ) + // Progress bar messes up Github Actions logs, so keep it static. 
+ if os.Getenv("GITHUB_ACTIONS") == "true" { + progressBar.Set(pb.Static, true) + } grp, grpCtx := errgroup.WithContext(ctx) if n := cfg.Parallelism; n > 0 { @@ -64,7 +68,7 @@ func run(ctx context.Context) error { if err := grp.Wait(); err != nil { return errors.Wrap(err, "run queries") } - progressBar.Finish() + progressBar.Finish().Write() return printOutput(results, cfg.Output) } diff --git a/cmd/logql-compliance-tester/output.go b/cmd/logql-compliance-tester/output.go index 721b5a24..5a0c3595 100644 --- a/cmd/logql-compliance-tester/output.go +++ b/cmd/logql-compliance-tester/output.go @@ -27,7 +27,7 @@ func printOutput(results []*lokicompliance.Result, cfg OutputConfig) error { _ = f.Close() }() - if err := outp(f, results, cfg.OutputPassing); err != nil { + if err := outp(f, results, cfg); err != nil { return err } } @@ -35,7 +35,7 @@ func printOutput(results []*lokicompliance.Result, cfg OutputConfig) error { return verifyTargetCompliance(results, cfg.MinimumPercentage) } -func outp(output io.Writer, results []*lokicompliance.Result, includePassing bool) error { +func outp(output io.Writer, results []*lokicompliance.Result, cfg OutputConfig) error { for _, r := range results { if d := r.Diff; d != "" && !r.Unsupported { fmt.Printf("%q:\n%s\n", r.TestCase.Query, d) @@ -44,25 +44,33 @@ func outp(output io.Writer, results []*lokicompliance.Result, includePassing boo // JSONResult is the JSON output format. 
type JSONResult struct { - TotalResults int `json:"totalResults"` - Results []*lokicompliance.Result `json:"results,omitempty"` - IncludePassing bool `json:"includePassing"` + TotalResults int `json:"totalResults"` + Results []*lokicompliance.Result `json:"results,omitempty"` + IncludePassing bool `json:"includePassing"` + IncludeUnsupported bool `json:"includeUnsupported"` } total := len(results) - if !includePassing { - var failed []*lokicompliance.Result + if !cfg.OutputPassing || !cfg.OutputUnsupported { + var filter []*lokicompliance.Result for _, r := range results { - if !r.Success() { - failed = append(failed, r) + if r.Success() && !cfg.OutputPassing { + // Exclude passing. + continue } + if r.Unsupported && !cfg.OutputUnsupported { + // Exclude unsupported. + continue + } + filter = append(filter, r) } - results = failed + results = filter } buf, err := json.MarshalIndent(JSONResult{ - TotalResults: total, - Results: results, - IncludePassing: includePassing, + TotalResults: total, + Results: results, + IncludePassing: cfg.OutputPassing, + IncludeUnsupported: cfg.OutputUnsupported, }, "", "\t") if err != nil { return err diff --git a/cmd/oteldb/app.go b/cmd/oteldb/app.go index 8f82daf7..823f5ac7 100644 --- a/cmd/oteldb/app.go +++ b/cmd/oteldb/app.go @@ -21,6 +21,7 @@ import ( "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" + "github.com/go-faster/oteldb/internal/chstorage" "github.com/go-faster/oteldb/internal/httpmiddleware" "github.com/go-faster/oteldb/internal/logql" "github.com/go-faster/oteldb/internal/logql/logqlengine" @@ -175,11 +176,15 @@ func (app *App) trySetupLoki() error { cfg := app.cfg.LokiConfig cfg.setDefaults() + var optimizers []logqlengine.Optimizer + optimizers = append(optimizers, logqlengine.DefaultOptimizers()...) 
+ optimizers = append(optimizers, &chstorage.ClickhouseOptimizer{}) engine := logqlengine.NewEngine(q, logqlengine.Options{ ParseOptions: logql.ParseOptions{ AllowDots: true, }, LookbackDuration: cfg.LookbackDelta, + Optimizers: optimizers, TracerProvider: app.metrics.TracerProvider(), }) loki := lokihandler.NewLokiAPI(q, engine) diff --git a/dev/local/ch-logql-compliance/logql-test-queries.yml b/dev/local/ch-logql-compliance/logql-test-queries.yml index ff6b4981..f68465f9 100644 --- a/dev/local/ch-logql-compliance/logql-test-queries.yml +++ b/dev/local/ch-logql-compliance/logql-test-queries.yml @@ -62,6 +62,14 @@ test_cases: variant_args: ["unwrapRangeAggOp", "range"] skip_comparison: true # It seems, there is some issues with unwrap. # Vector aggregation. + - query: |- + {{ .simpleVecAggOp }} by (filename) ( + {{ .simpleRangeAggOp }}( + {job="varlogs"} + [{{ .range }}] + ) + ) + variant_args: ["simpleVecAggOp", "simpleRangeAggOp", "range"] - query: |- {{ .simpleVecAggOp }} by (method) ( {{ .simpleRangeAggOp }}( diff --git a/integration/lokie2e/common_test.go b/integration/lokie2e/common_test.go index 7fd6e14d..0e627e28 100644 --- a/integration/lokie2e/common_test.go +++ b/integration/lokie2e/common_test.go @@ -19,6 +19,7 @@ import ( "github.com/go-faster/oteldb/integration" "github.com/go-faster/oteldb/integration/lokie2e" + "github.com/go-faster/oteldb/internal/chstorage" "github.com/go-faster/oteldb/internal/logql" "github.com/go-faster/oteldb/internal/logql/logqlengine" "github.com/go-faster/oteldb/internal/logstorage" @@ -53,9 +54,12 @@ func setupDB(ctx context.Context, t *testing.T, provider *integration.Provider, gold.Str(t, out.String(), "logs.yml") + var optimizers []logqlengine.Optimizer + optimizers = append(optimizers, logqlengine.DefaultOptimizers()...) 
+ optimizers = append(optimizers, &chstorage.ClickhouseOptimizer{}) engine := logqlengine.NewEngine(engineQuerier, logqlengine.Options{ ParseOptions: logql.ParseOptions{AllowDots: true}, - OTELAdapter: true, + Optimizers: optimizers, TracerProvider: provider, }) @@ -203,8 +207,6 @@ func runTest(ctx context.Context, t *testing.T, provider *integration.Provider, // Negative line matcher. {`{http_method=~".+"} != "HEAD"`, len(set.Records) - 22}, {`{http_method=~".+"} !~ "HEAD"`, len(set.Records) - 22}, - // IP line filter. - {`{http_method="HEAD"} |= ip("236.7.233.166")`, 1}, // Trace to logs. {`{http_method=~".+"} |= "af36000000000000c517000000000003"`, 1}, @@ -328,6 +330,12 @@ type LogQueryTest struct { func testLogQuery(c *lokiapi.Client, params LogQueryTest) func(*testing.T) { return func(t *testing.T) { + defer func() { + if t.Failed() { + t.Logf("Query: %s", params.Query) + } + }() + ctx := context.Background() if d, ok := t.Deadline(); ok { @@ -370,9 +378,7 @@ func testLogQuery(c *lokiapi.Client, params LogQueryTest) func(*testing.T) { record, ok := params.Set.Records[pcommon.Timestamp(entry.T)] require.Truef(t, ok, "can't find log record %d", entry.T) - line := logqlengine.LineFromRecord( - logstorage.NewRecordFromOTEL(pcommon.NewResource(), pcommon.NewInstrumentationScope(), record), - ) + line := record.Body().AsString() assert.Equal(t, line, entry.V) labelSetHasAttrs(t, stream.Stream.Value, record.Attributes()) diff --git a/internal/chstorage/attributes.go b/internal/chstorage/attributes.go index 9addc9f8..c533288f 100644 --- a/internal/chstorage/attributes.go +++ b/internal/chstorage/attributes.go @@ -2,6 +2,7 @@ package chstorage import ( "fmt" + "strings" "github.com/ClickHouse/ch-go/proto" "github.com/go-faster/errors" @@ -156,6 +157,25 @@ func attrSelector(name, key string) string { ) } +func firstAttrSelector(label string) string { + quoted := singleQuoted(label) + var sb strings.Builder + sb.WriteString("coalesce(") + for i, column := range []string{ 
+ colAttrs, + colScope, + colResource, + } { + if i != 0 { + sb.WriteString(",") + } + fmt.Fprintf(&sb, "JSONExtract(%s, %s, 'Nullable(String)')", column, quoted) + } + sb.WriteString(",''") + sb.WriteString(")") + return sb.String() +} + // Append adds a new map of attributes. func (a *Attributes) Append(kv otelstorage.Attrs) { a.Value.Append(kv) diff --git a/internal/chstorage/columns_logs.go b/internal/chstorage/columns_logs.go index 78b98bf0..b6d54d37 100644 --- a/internal/chstorage/columns_logs.go +++ b/internal/chstorage/columns_logs.go @@ -65,7 +65,7 @@ func setStrOrEmpty(col proto.ColumnOf[string], m pcommon.Map, k string) { col.Append(v.AsString()) } -func (c *logColumns) ForEach(f func(r logstorage.Record)) error { +func (c *logColumns) ForEach(f func(r logstorage.Record) error) error { for i := 0; i < c.timestamp.Rows(); i++ { r := logstorage.Record{ Timestamp: otelstorage.NewTimestampFromTime(c.timestamp.Row(i)), @@ -103,7 +103,9 @@ func (c *logColumns) ForEach(f func(r logstorage.Record)) error { // Default just to timestamp. 
r.ObservedTimestamp = r.Timestamp } - f(r) + if err := f(r); err != nil { + return err + } } return nil } diff --git a/internal/chstorage/querier_logs.go b/internal/chstorage/querier_logs.go index c2ea5110..ba709489 100644 --- a/internal/chstorage/querier_logs.go +++ b/internal/chstorage/querier_logs.go @@ -2,12 +2,8 @@ package chstorage import ( "context" - "encoding/hex" "fmt" - "regexp" "slices" - "strings" - "time" "github.com/ClickHouse/ch-go" "github.com/ClickHouse/ch-go/proto" @@ -15,11 +11,9 @@ import ( "github.com/go-faster/sdk/zctx" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" "github.com/go-faster/oteldb/internal/iterators" - "github.com/go-faster/oteldb/internal/logql" "github.com/go-faster/oteldb/internal/logql/logqlengine" "github.com/go-faster/oteldb/internal/logstorage" "github.com/go-faster/oteldb/internal/otelstorage" @@ -122,46 +116,6 @@ func (l *labelStaticIterator) Next(t *logstorage.Label) bool { func (l *labelStaticIterator) Err() error { return nil } func (l *labelStaticIterator) Close() error { return nil } -func (q *Querier) getLabelMapping(ctx context.Context, labels []string) (_ map[string]string, rerr error) { - ctx, span := q.tracer.Start(ctx, "getLabelMapping", - trace.WithAttributes( - attribute.Int("chstorage.labels_count", len(labels)), - ), - ) - defer func() { - if rerr != nil { - span.RecordError(rerr) - } - span.End() - }() - - out := make(map[string]string, len(labels)) - attrs := newLogAttrMapColumns() - var inputData proto.ColStr - for _, label := range labels { - inputData.Append(label) - } - if err := q.ch.Do(ctx, ch.Query{ - Logger: zctx.From(ctx).Named("ch"), - Result: attrs.Result(), - OnResult: func(ctx context.Context, block proto.Block) error { - attrs.ForEach(func(name, key string) { - out[name] = key - }) - return nil - }, - ExternalTable: "labels", - ExternalData: []proto.InputColumn{ - {Name: "name", Data: 
&inputData}, - }, - Body: fmt.Sprintf(`SELECT name, key FROM %[1]s WHERE name IN labels`, q.tables.LogAttrs), - }); err != nil { - return nil, errors.Wrap(err, "select") - } - - return out, nil -} - // LabelValues implements logstorage.Querier. func (q *Querier) LabelValues(ctx context.Context, labelName string, opts logstorage.LabelsOptions) (_ iterators.Iterator[logstorage.Label], rerr error) { table := q.tables.Logs @@ -248,39 +202,10 @@ WHERE (toUnixTimestamp64Nano(timestamp) >= %d AND toUnixTimestamp64Nano(timestam }, nil } -// Capabilities implements logqlengine.Querier. -func (q *Querier) Capabilities() (caps logqlengine.QuerierCapabilities) { - caps.Label.Add(logql.OpEq, logql.OpNotEq, logql.OpRe, logql.OpNotRe) - caps.Line.Add(logql.OpEq, logql.OpNotEq, logql.OpRe, logql.OpNotRe) - return caps -} - -type logStaticIterator struct { - data []logstorage.Record -} - -func (l *logStaticIterator) Next(t *logstorage.Record) bool { - if len(l.data) == 0 { - return false - } - *t = l.data[0] - l.data = l.data[1:] - return true -} - -func (l *logStaticIterator) Err() error { return nil } -func (l *logStaticIterator) Close() error { return nil } - -// SelectLogs implements logqlengine.Querier. 
-func (q *Querier) SelectLogs(ctx context.Context, start, end otelstorage.Timestamp, direction logqlengine.Direction, params logqlengine.SelectLogsParams) (_ iterators.Iterator[logstorage.Record], rerr error) { - table := q.tables.Logs - - ctx, span := q.tracer.Start(ctx, "SelectLogs", +func (q *Querier) getLabelMapping(ctx context.Context, labels []string) (_ map[string]string, rerr error) { + ctx, span := q.tracer.Start(ctx, "getLabelMapping", trace.WithAttributes( - attribute.Int("chstorage.labels_count", len(params.Labels)), - attribute.Int64("chstorage.range.start", int64(start)), - attribute.Int64("chstorage.range.end", int64(end)), - attribute.String("chstorage.table", table), + attribute.Int("chstorage.labels_count", len(labels)), ), ) defer func() { @@ -290,209 +215,46 @@ func (q *Querier) SelectLogs(ctx context.Context, start, end otelstorage.Timesta span.End() }() - // Gather all labels for mapping fetch. - var labels []string - for _, m := range params.Labels { - labels = append(labels, string(m.Label)) - } - mapping, err := q.getLabelMapping(ctx, labels) - if err != nil { - return nil, errors.Wrap(err, "get label mapping") - } - - out := newLogColumns() - var query strings.Builder - query.WriteString("SELECT ") - for i, column := range out.StaticColumns() { - if i != 0 { - query.WriteByte(',') - } - query.WriteString(column) - } - fmt.Fprintf(&query, " FROM %s WHERE (toUnixTimestamp64Nano(timestamp) >= %d AND toUnixTimestamp64Nano(timestamp) <= %d)", table, start, end) - for _, m := range params.Labels { - labelName := string(m.Label) - if key, ok := mapping[labelName]; ok { - labelName = key - } - switch m.Op { - case logql.OpEq, logql.OpRe: - query.WriteString(" AND (") - case logql.OpNotEq, logql.OpNotRe: - query.WriteString(" AND NOT (") - default: - return nil, errors.Errorf("unexpected op %q", m.Op) - } - switch labelName { - case logstorage.LabelTraceID: - switch m.Op { - case logql.OpEq, logql.OpNotEq: - fmt.Fprintf(&query, "trace_id = 
unhex(%s)", singleQuoted(m.Value)) - case logql.OpRe, logql.OpNotRe: - fmt.Fprintf(&query, "match(hex(trace_id), %s)", singleQuoted(m.Value)) - default: - return nil, errors.Errorf("unexpected op %q", m.Op) - } - case logstorage.LabelSpanID: - switch m.Op { - case logql.OpEq, logql.OpNotEq: - fmt.Fprintf(&query, "span_id = unhex(%s)", singleQuoted(m.Value)) - case logql.OpRe, logql.OpNotRe: - fmt.Fprintf(&query, "match(hex(span_id), %s)", singleQuoted(m.Value)) - default: - return nil, errors.Errorf("unexpected op %q", m.Op) - } - case logstorage.LabelSeverity: - switch m.Op { - case logql.OpEq, logql.OpNotEq: - // Direct comparison with severity number. - var severityNumber uint8 - for i := plog.SeverityNumberUnspecified; i <= plog.SeverityNumberFatal4; i++ { - if strings.EqualFold(i.String(), m.Value) { - severityNumber = uint8(i) - break - } - } - fmt.Fprintf(&query, "severity_number = %d", severityNumber) - case logql.OpRe, logql.OpNotRe: - re, err := regexp.Compile(m.Value) - if err != nil { - return nil, errors.Wrap(err, "compile regex") - } - var matches []int - for i := plog.SeverityNumberUnspecified; i <= plog.SeverityNumberFatal4; i++ { - for _, s := range []string{ - i.String(), - strings.ToLower(i.String()), - strings.ToUpper(i.String()), - } { - if re.MatchString(s) { - matches = append(matches, int(i)) - break - } - } - } - query.WriteString("severity_number IN (") - for i, v := range matches { - if i != 0 { - query.WriteByte(',') - } - fmt.Fprintf(&query, "%d", v) - } - query.WriteByte(')') - default: - return nil, errors.Errorf("unexpected op %q", m.Op) - } - case logstorage.LabelBody: - switch m.Op { - case logql.OpEq, logql.OpNotEq: - fmt.Fprintf(&query, "positionUTF8(body, %s) > 0", singleQuoted(m.Value)) - case logql.OpRe, logql.OpNotRe: - fmt.Fprintf(&query, "match(body, %s)", singleQuoted(m.Value)) - default: - return nil, errors.Errorf("unexpected op %q", m.Op) - } - case logstorage.LabelServiceName, logstorage.LabelServiceNamespace, 
logstorage.LabelServiceInstanceID: - // Materialized from resource.service.{name,namespace,instance.id}. - switch m.Op { - case logql.OpEq, logql.OpNotEq: - fmt.Fprintf(&query, "%s = %s", labelName, singleQuoted(m.Value)) - case logql.OpRe, logql.OpNotRe: - fmt.Fprintf(&query, "match(%s, %s)", labelName, singleQuoted(m.Value)) - default: - return nil, errors.Errorf("unexpected op %q", m.Op) - } - default: - // Search in all attributes. - for i, column := range []string{ - colAttrs, - colResource, - colScope, - } { - if i != 0 { - query.WriteString(" OR ") - } - // TODO: how to match integers, booleans, floats, arrays? - - selector := attrSelector(column, labelName) - switch m.Op { - case logql.OpEq, logql.OpNotEq: - fmt.Fprintf(&query, "%s = %s", selector, singleQuoted(m.Value)) - case logql.OpRe, logql.OpNotRe: - fmt.Fprintf(&query, "match(%s, %s)", selector, singleQuoted(m.Value)) - default: - return nil, errors.Errorf("unexpected op %q", m.Op) - } - } - } - query.WriteByte(')') - } - - for _, m := range params.Line { - switch m.Op { - case logql.OpEq, logql.OpRe: - query.WriteString(" AND (") - case logql.OpNotEq, logql.OpNotRe: - query.WriteString(" AND NOT (") - default: - return nil, errors.Errorf("unexpected op %q", m.Op) - } - - switch m.Op { - case logql.OpEq, logql.OpNotEq: - fmt.Fprintf(&query, "positionUTF8(body, %s) > 0", singleQuoted(m.Value)) - { - // HACK: check for special case of hex-encoded trace_id and span_id. - // Like `{http_method=~".+"} |= "af36000000000000c517000000000003"`. - // TODO(ernado): also handle regex? 
- encoded := strings.ToLower(m.Value) - v, _ := hex.DecodeString(encoded) - switch len(v) { - case len(otelstorage.TraceID{}): - fmt.Fprintf(&query, " OR trace_id = unhex(%s)", singleQuoted(encoded)) - case len(otelstorage.SpanID{}): - fmt.Fprintf(&query, " OR span_id = unhex(%s)", singleQuoted(encoded)) - } - } - case logql.OpRe, logql.OpNotRe: - fmt.Fprintf(&query, "match(body, %s)", singleQuoted(m.Value)) - } - query.WriteByte(')') - } - - query.WriteString(" ORDER BY timestamp ") - switch direction { - case logqlengine.DirectionBackward: - query.WriteString("DESC") - case logqlengine.DirectionForward: - query.WriteString("ASC") - default: - return nil, errors.Errorf("unexpected direction %q", direction) + out := make(map[string]string, len(labels)) + attrs := newLogAttrMapColumns() + var inputData proto.ColStr + for _, label := range labels { + inputData.Append(label) } - - var data []logstorage.Record - queryStartTime := time.Now() if err := q.ch.Do(ctx, ch.Query{ Logger: zctx.From(ctx).Named("ch"), - Body: query.String(), - Result: out.Result(), + Result: attrs.Result(), OnResult: func(ctx context.Context, block proto.Block) error { - if err := out.ForEach(func(r logstorage.Record) { - data = append(data, r) - }); err != nil { - return errors.Wrap(err, "for each") - } + attrs.ForEach(func(name, key string) { + out[name] = key + }) return nil }, + ExternalTable: "labels", + ExternalData: []proto.InputColumn{ + {Name: "name", Data: &inputData}, + }, + Body: fmt.Sprintf(`SELECT name, key FROM %[1]s WHERE name IN labels`, q.tables.LogAttrs), }); err != nil { return nil, errors.Wrap(err, "select") } - q.clickhouseRequestHistogram.Record(ctx, time.Since(queryStartTime).Seconds(), - metric.WithAttributes( - attribute.String("chstorage.table", table), - attribute.String("chstorage.signal", "logs"), - ), - ) - return &logStaticIterator{data: data}, nil + return out, nil +} + +func (q *Querier) getMaterializedLabelColumn(labelName string) (column string, isColumn bool) 
{ + switch labelName { + case logstorage.LabelTraceID: + return "hex(trace_id)", true + case logstorage.LabelSpanID: + return "hex(span_id)", true + case logstorage.LabelSeverity: + return "severity_number", true + case logstorage.LabelBody: + return "body", true + case logstorage.LabelServiceName, logstorage.LabelServiceNamespace, logstorage.LabelServiceInstanceID: + return labelName, true + default: + return "", false + } } diff --git a/internal/chstorage/querier_logs_node.go b/internal/chstorage/querier_logs_node.go new file mode 100644 index 00000000..766dc12f --- /dev/null +++ b/internal/chstorage/querier_logs_node.go @@ -0,0 +1,95 @@ +package chstorage + +import ( + "context" + + "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine" + "github.com/go-faster/oteldb/internal/logstorage" +) + +var _ logqlengine.Querier = (*Querier)(nil) + +// Capabilities implements logqlengine.Querier. +func (q *Querier) Capabilities() (caps logqlengine.QuerierCapabilities) { + caps.Label.Add(logql.OpEq, logql.OpNotEq, logql.OpRe, logql.OpNotRe) + caps.Line.Add(logql.OpEq, logql.OpNotEq, logql.OpRe, logql.OpNotRe) + return caps +} + +// Query creates new [InputNode]. +func (q *Querier) Query(ctx context.Context, labels []logql.LabelMatcher) (logqlengine.PipelineNode, error) { + return &InputNode{ + Labels: labels, + q: q, + }, nil +} + +// InputNode rebuilds LogQL pipeline in as Clickhouse query. +type InputNode struct { + Labels []logql.LabelMatcher + Line []logql.LineFilter + q *Querier +} + +var _ logqlengine.PipelineNode = (*InputNode)(nil) + +// Traverse implements [logqlengine.Node]. +func (n *InputNode) Traverse(cb logqlengine.NodeVisitor) error { + return cb(n) +} + +// EvalPipeline implements [logqlengine.PipelineNode]. 
+func (n *InputNode) EvalPipeline(ctx context.Context, params logqlengine.EvalParams) (logqlengine.EntryIterator, error) { + q := LogsQuery[logqlengine.Entry]{ + Start: params.Start, + End: params.End, + Direction: params.Direction, + Labels: n.Labels, + Line: n.Line, + Mapper: entryMapper, + } + return q.Eval(ctx, n.q) +} + +func entryMapper(r logstorage.Record) (logqlengine.Entry, error) { + set := logqlengine.NewLabelSet() + e := logqlengine.Entry{ + Timestamp: r.Timestamp, + Line: r.Body, + Set: set, + } + set.SetFromRecord(r) + return e, nil +} + +// SamplingNode is a [MetricNode], which offloads sampling to Clickhouse +type SamplingNode struct { + Sampling SamplingOp + GroupingLabels []logql.Label + + Labels []logql.LabelMatcher + Line []logql.LineFilter + + q *Querier +} + +var _ logqlengine.SampleNode = (*SamplingNode)(nil) + +// Traverse implements [logqlengine.Node]. +func (n *SamplingNode) Traverse(cb logqlengine.NodeVisitor) error { + return cb(n) +} + +// EvalSample implements [logqlengine.SampleNode]. +func (n *SamplingNode) EvalSample(ctx context.Context, params logqlengine.EvalParams) (logqlengine.SampleIterator, error) { + q := SampleQuery{ + Start: params.Start, + End: params.End, + Sampling: n.Sampling, + GroupingLabels: n.GroupingLabels, + Labels: n.Labels, + Line: n.Line, + } + return q.Eval(ctx, n.q) +} diff --git a/internal/chstorage/querier_logs_optimizer.go b/internal/chstorage/querier_logs_optimizer.go new file mode 100644 index 00000000..9a5ff9d8 --- /dev/null +++ b/internal/chstorage/querier_logs_optimizer.go @@ -0,0 +1,172 @@ +package chstorage + +import ( + "context" + + "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine" +) + +// ClickhouseOptimizer replaces LogQL engine execution +// nodes with optimzied Clickhouse queries. +type ClickhouseOptimizer struct{} + +var _ logqlengine.Optimizer = (*ClickhouseOptimizer)(nil) + +// Name returns optimizer name. 
+func (o *ClickhouseOptimizer) Name() string { + return "ClickhouseOptimizer" +} + +// Optimize implements [Optimizer]. +func (o *ClickhouseOptimizer) Optimize(ctx context.Context, q logqlengine.Query) (logqlengine.Query, error) { + switch q := q.(type) { + case *logqlengine.LogQuery: + q.Root = o.optimizePipeline(q.Root) + case *logqlengine.MetricQuery: + if err := logqlengine.VisitNode(q.Root, func(n *logqlengine.SamplingNode) error { + n.Input = o.optimizePipeline(n.Input) + return nil + }); err != nil { + return nil, err + } + q.Root = o.optimizeSampling(q.Root, nil) + } + return q, nil +} + +func (o *ClickhouseOptimizer) optimizeSampling(n logqlengine.MetricNode, grouping []logql.Label) logqlengine.MetricNode { + switch n := n.(type) { + case *logqlengine.RangeAggregation: + sampleNode, ok := n.Input.(*logqlengine.SamplingNode) + if !ok { + return n + } + + // If it is possible to offload the pipeline to Clickhouse entirely + // preceding optimizer should replace node with [InputNode]. + pipelineNode, ok := sampleNode.Input.(*InputNode) + if !ok { + return n + } + + samplingOp, ok := getSamplingOp(n.Expr) + if !ok { + return n + } + + labels, ok := getGroupByLabels(n.Expr.Grouping) + switch { + case ok: + // Use grouping labels from range expression. + grouping = labels + case len(grouping) > 0: + // Use grouping labels from parent vector expression. 
+ default: + return n + } + + n.Input = &SamplingNode{ + Sampling: samplingOp, + GroupingLabels: grouping, + Labels: pipelineNode.Labels, + Line: pipelineNode.Line, + q: pipelineNode.q, + } + return n + case *logqlengine.VectorAggregation: + if labels, ok := getGroupByLabels(n.Expr.Grouping); ok { + grouping = labels + n.Input = o.optimizeSampling(n.Input, grouping) + } + return n + case *logqlengine.LabelReplace: + n.Input = o.optimizeSampling(n.Input, grouping) + return n + case *logqlengine.LiteralBinOp: + n.Input = o.optimizeSampling(n.Input, grouping) + return n + case *logqlengine.BinOp: + n.Left = o.optimizeSampling(n.Left, grouping) + n.Right = o.optimizeSampling(n.Right, grouping) + return n + default: + return n + } +} + +func getGroupByLabels(g *logql.Grouping) ([]logql.Label, bool) { + if g == nil || g.Without || len(g.Labels) == 0 { + return nil, false + } + return g.Labels, true +} + +func getSamplingOp(e *logql.RangeAggregationExpr) (op SamplingOp, _ bool) { + if er := e.Range; er.Unwrap != nil || er.Offset != nil { + return op, false + } + switch e.Op { + case logql.RangeOpCount: + return CountSampling, true + case logql.RangeOpBytes: + return BytesSampling, true + default: + return op, false + } +} + +func (o *ClickhouseOptimizer) optimizePipeline(n logqlengine.PipelineNode) logqlengine.PipelineNode { + pn, ok := n.(*logqlengine.ProcessorNode) + if !ok { + return n + } + + sn, ok := pn.Input.(*InputNode) + if !ok { + // NOTE(tdakkota): this should not happen as long + // as there is only one possible node made by storage. + return n + } + + var ( + line []logql.LineFilter + skippedStages int + ) +stageLoop: + for _, stage := range pn.Pipeline { + switch stage := stage.(type) { + case *logql.LineFilter: + if stage.IP { + skippedStages++ + continue + } + // TODO(tdakkota): remove stages from pipeline. 
+ line = append(line, *stage) + case *logql.JSONExpressionParser, + *logql.LogfmtExpressionParser, + *logql.RegexpLabelParser, + *logql.PatternLabelParser, + *logql.LabelFilter, + *logql.LabelFormatExpr, + *logql.DropLabelsExpr, + *logql.KeepLabelsExpr, + *logql.DistinctFilter: + // Do nothing on line, just skip. + skippedStages++ + case *logql.LineFormat, + *logql.DecolorizeExpr, + *logql.UnpackLabelParser: + // Stage modify the line, can't offload line filters after this stage. + skippedStages++ + break stageLoop + } + } + sn.Line = line + // Replace original node with [InputNode], since we can execute filtering entirely in + // Clickhouse. + if skippedStages == 0 && !pn.EnableOTELAdapter { + return sn + } + return n +} diff --git a/internal/chstorage/querier_logs_query.go b/internal/chstorage/querier_logs_query.go new file mode 100644 index 00000000..09a28b6e --- /dev/null +++ b/internal/chstorage/querier_logs_query.go @@ -0,0 +1,471 @@ +package chstorage + +import ( + "context" + "encoding/hex" + "fmt" + "regexp" + "strings" + "time" + + "github.com/ClickHouse/ch-go" + "github.com/ClickHouse/ch-go/proto" + "github.com/go-faster/errors" + "github.com/go-faster/sdk/zctx" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" + + "github.com/go-faster/oteldb/internal/iterators" + "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlmetric" + "github.com/go-faster/oteldb/internal/logstorage" + "github.com/go-faster/oteldb/internal/otelstorage" +) + +// LogsQuery defines a logs query. 
+type LogsQuery[E any] struct { + Start, End time.Time + Direction logqlengine.Direction + + Labels []logql.LabelMatcher + Line []logql.LineFilter + + Mapper func(logstorage.Record) (E, error) +} + +// Eval evaluates the query using given querier. +func (v *LogsQuery[E]) Eval(ctx context.Context, q *Querier) (_ iterators.Iterator[E], rerr error) { + table := q.tables.Logs + + ctx, span := q.tracer.Start(ctx, "LogsQuery", + trace.WithAttributes( + attribute.Int64("chstorage.range.start", v.Start.UnixNano()), + attribute.Int64("chstorage.range.end", v.End.UnixNano()), + attribute.String("chstorage.direction", string(v.Direction)), + attribute.String("chstorage.table", table), + ), + ) + defer func() { + if rerr != nil { + span.RecordError(rerr) + } + span.End() + }() + + // Gather all labels for mapping fetch. + var labels []string + for _, m := range v.Labels { + labels = append(labels, string(m.Label)) + } + mapping, err := q.getLabelMapping(ctx, labels) + if err != nil { + return nil, errors.Wrap(err, "get label mapping") + } + + out := newLogColumns() + var query strings.Builder + query.WriteString("SELECT ") + for i, column := range out.StaticColumns() { + if i != 0 { + query.WriteByte(',') + } + query.WriteString(column) + } + fmt.Fprintf(&query, "\nFROM %s WHERE\n", table) + + if err := (logQueryPredicates{ + Start: v.Start, + End: v.End, + Labels: v.Labels, + Line: v.Line, + }).write(&query, mapping); err != nil { + return nil, err + } + + query.WriteString(" ORDER BY timestamp ") + switch d := v.Direction; d { + case logqlengine.DirectionBackward: + query.WriteString("DESC") + case logqlengine.DirectionForward: + query.WriteString("ASC") + default: + return nil, errors.Errorf("unexpected direction %q", d) + } + + var ( + queryStartTime = time.Now() + data []E + ) + if err := q.ch.Do(ctx, ch.Query{ + Logger: zctx.From(ctx).Named("ch"), + Body: query.String(), + Result: out.Result(), + OnResult: func(ctx context.Context, block proto.Block) error { + if err := 
out.ForEach(func(r logstorage.Record) error { + e, err := v.Mapper(r) + if err != nil { + return err + } + data = append(data, e) + return nil + }); err != nil { + return errors.Wrap(err, "for each") + } + return rerr + }, + }); err != nil { + return nil, err + } + + q.clickhouseRequestHistogram.Record(ctx, time.Since(queryStartTime).Seconds(), + metric.WithAttributes( + attribute.String("chstorage.query_type", fmt.Sprintf("%T", v)), + attribute.String("chstorage.table", table), + attribute.String("chstorage.signal", "logs"), + ), + ) + return iterators.Slice(data), nil +} + +// SampleQuery defines a sample query. +type SampleQuery struct { + Start, End time.Time + + Sampling SamplingOp + GroupingLabels []logql.Label + + Labels []logql.LabelMatcher + Line []logql.LineFilter +} + +// sampleQueryColumns defines result columns of [SampleQuery]. +type sampleQueryColumns struct { + Timestamp proto.ColDateTime64 + Sample proto.ColFloat64 + Labels *proto.ColMap[string, string] +} + +func (c *sampleQueryColumns) Result() proto.Results { + return proto.Results{ + {Name: "timestamp", Data: &c.Timestamp}, + {Name: "sample", Data: &c.Sample}, + {Name: "labels", Data: c.Labels}, + } +} + +// Eval evaluates the query using given querier. +func (v *SampleQuery) Eval(ctx context.Context, q *Querier) (_ logqlengine.SampleIterator, rerr error) { + table := q.tables.Logs + + ctx, span := q.tracer.Start(ctx, "SampleQuery", + trace.WithAttributes( + attribute.Int64("chstorage.range.start", v.Start.UnixNano()), + attribute.Int64("chstorage.range.end", v.End.UnixNano()), + attribute.String("chstorage.sampling", v.Sampling.String()), + attribute.String("chstorage.table", table), + ), + ) + defer func() { + if rerr != nil { + span.RecordError(rerr) + } + span.End() + }() + + // Gather all labels for mapping fetch. 
+ var labels []string + for _, m := range v.Labels { + labels = append(labels, string(m.Label)) + } + for _, l := range v.GroupingLabels { + labels = append(labels, string(l)) + } + mapping, err := q.getLabelMapping(ctx, labels) + if err != nil { + return nil, errors.Wrap(err, "get label mapping") + } + + var query strings.Builder + sampleExpr, err := getSampleExpr(v.Sampling) + if err != nil { + return nil, err + } + fmt.Fprintf(&query, "SELECT timestamp, toFloat64(%s) AS sample, map(\n", sampleExpr) + for i, key := range v.GroupingLabels { + label := string(key) + if key, ok := mapping[label]; ok { + label = key + } + + labelExpr, ok := q.getMaterializedLabelColumn(label) + if !ok { + labelExpr = firstAttrSelector(label) + } + + if i != 0 { + query.WriteString(",\n") + } + quotedLabel := singleQuoted(key) + fmt.Fprintf(&query, "%s, toString(%s)", quotedLabel, labelExpr) + } + query.WriteString("\n) AS labels\n") + + fmt.Fprintf(&query, "FROM %s WHERE\n", table) + pred := logQueryPredicates{ + Start: v.Start, + End: v.End, + Labels: v.Labels, + Line: v.Line, + } + if err := pred.write(&query, mapping); err != nil { + return nil, err + } + query.WriteString(`ORDER BY timestamp ASC;`) + + var ( + queryStartTime = time.Now() + result []logqlmetric.SampledEntry + + columns = sampleQueryColumns{ + Timestamp: proto.ColDateTime64{}, + Sample: proto.ColFloat64{}, + Labels: proto.NewMap( + new(proto.ColStr), + new(proto.ColStr), + ), + } + ) + if err := q.ch.Do(ctx, ch.Query{ + Logger: zctx.From(ctx).Named("ch"), + Body: query.String(), + Result: columns.Result(), + OnResult: func(ctx context.Context, block proto.Block) error { + for i := 0; i < columns.Timestamp.Rows(); i++ { + timestamp := columns.Timestamp.Row(i) + sample := columns.Sample.Row(i) + // FIXME(tdakkota): allocates unnecessary map. 
+				labels := columns.Labels.Row(i)
+
+				result = append(result, logqlmetric.SampledEntry{
+					Timestamp: pcommon.NewTimestampFromTime(timestamp),
+					Sample:    sample,
+					Set:       logqlengine.AggregatedLabelsFromMap(labels),
+				})
+			}
+			return nil
+		},
+	}); err != nil {
+		return nil, err
+	}
+
+	q.clickhouseRequestHistogram.Record(ctx, time.Since(queryStartTime).Seconds(),
+		metric.WithAttributes(
+			attribute.String("chstorage.query_type", fmt.Sprintf("%T", v)),
+			attribute.String("chstorage.table", table),
+			attribute.String("chstorage.signal", "logs"),
+		),
+	)
+	return iterators.Slice(result), nil
+}
+
+// SamplingOp defines a sampler operation.
+type SamplingOp int
+
+const (
+	// CountSampling counts lines.
+	CountSampling SamplingOp = iota + 1
+	// BytesSampling counts line lengths in bytes.
+	BytesSampling
+)
+
+// String implements [fmt.Stringer].
+func (s SamplingOp) String() string {
+	switch s {
+	case CountSampling:
+		return "count"
+	case BytesSampling:
+		return "bytes"
+	default:
+		return fmt.Sprintf("unknown(%d)", int(s))
+	}
+}
+
+func getSampleExpr(op SamplingOp) (string, error) {
+	switch op {
+	case CountSampling:
+		return "1", nil
+	case BytesSampling:
+		return "length(body)", nil
+	default:
+		return "", errors.Errorf("unexpected sampling op: %v", op)
+	}
+}
+
+// logQueryPredicates translates common predicates for log querying.
+type logQueryPredicates struct { + Start, End time.Time + Labels []logql.LabelMatcher + Line []logql.LineFilter +} + +func (q logQueryPredicates) write( + query *strings.Builder, + mapping map[string]string, +) error { + fmt.Fprintf(query, "(toUnixTimestamp64Nano(timestamp) >= %d AND toUnixTimestamp64Nano(timestamp) <= %d)", + q.Start.UnixNano(), q.End.UnixNano()) + for _, m := range q.Labels { + labelName := string(m.Label) + if key, ok := mapping[labelName]; ok { + labelName = key + } + switch m.Op { + case logql.OpEq, logql.OpRe: + query.WriteString(" AND (") + case logql.OpNotEq, logql.OpNotRe: + query.WriteString(" AND NOT (") + default: + return errors.Errorf("unexpected op %q", m.Op) + } + switch labelName { + case logstorage.LabelTraceID: + switch m.Op { + case logql.OpEq, logql.OpNotEq: + fmt.Fprintf(query, "trace_id = unhex(%s)", singleQuoted(m.Value)) + case logql.OpRe, logql.OpNotRe: + fmt.Fprintf(query, "match(hex(trace_id), %s)", singleQuoted(m.Value)) + default: + return errors.Errorf("unexpected op %q", m.Op) + } + case logstorage.LabelSpanID: + switch m.Op { + case logql.OpEq, logql.OpNotEq: + fmt.Fprintf(query, "span_id = unhex(%s)", singleQuoted(m.Value)) + case logql.OpRe, logql.OpNotRe: + fmt.Fprintf(query, "match(hex(span_id), %s)", singleQuoted(m.Value)) + default: + return errors.Errorf("unexpected op %q", m.Op) + } + case logstorage.LabelSeverity: + switch m.Op { + case logql.OpEq, logql.OpNotEq: + // Direct comparison with severity number. 
+ var severityNumber uint8 + for i := plog.SeverityNumberUnspecified; i <= plog.SeverityNumberFatal4; i++ { + if strings.EqualFold(i.String(), m.Value) { + severityNumber = uint8(i) + break + } + } + fmt.Fprintf(query, "severity_number = %d", severityNumber) + case logql.OpRe, logql.OpNotRe: + re, err := regexp.Compile(m.Value) + if err != nil { + return errors.Wrap(err, "compile regex") + } + var matches []int + for i := plog.SeverityNumberUnspecified; i <= plog.SeverityNumberFatal4; i++ { + for _, s := range []string{ + i.String(), + strings.ToLower(i.String()), + strings.ToUpper(i.String()), + } { + if re.MatchString(s) { + matches = append(matches, int(i)) + break + } + } + } + query.WriteString("severity_number IN (") + for i, v := range matches { + if i != 0 { + query.WriteByte(',') + } + fmt.Fprintf(query, "%d", v) + } + query.WriteByte(')') + default: + return errors.Errorf("unexpected op %q", m.Op) + } + case logstorage.LabelBody: + switch m.Op { + case logql.OpEq, logql.OpNotEq: + fmt.Fprintf(query, "positionUTF8(body, %s) > 0", singleQuoted(m.Value)) + case logql.OpRe, logql.OpNotRe: + fmt.Fprintf(query, "match(body, %s)", singleQuoted(m.Value)) + default: + return errors.Errorf("unexpected op %q", m.Op) + } + case logstorage.LabelServiceName, logstorage.LabelServiceNamespace, logstorage.LabelServiceInstanceID: + // Materialized from resource.service.{name,namespace,instance.id}. + switch m.Op { + case logql.OpEq, logql.OpNotEq: + fmt.Fprintf(query, "%s = %s", labelName, singleQuoted(m.Value)) + case logql.OpRe, logql.OpNotRe: + fmt.Fprintf(query, "match(%s, %s)", labelName, singleQuoted(m.Value)) + default: + return errors.Errorf("unexpected op %q", m.Op) + } + default: + // Search in all attributes. + for i, column := range []string{ + colAttrs, + colResource, + colScope, + } { + if i != 0 { + query.WriteString(" OR ") + } + // TODO: how to match integers, booleans, floats, arrays? 
+ + selector := attrSelector(column, labelName) + switch m.Op { + case logql.OpEq, logql.OpNotEq: + fmt.Fprintf(query, "%s = %s", selector, singleQuoted(m.Value)) + case logql.OpRe, logql.OpNotRe: + fmt.Fprintf(query, "match(%s, %s)", selector, singleQuoted(m.Value)) + default: + return errors.Errorf("unexpected op %q", m.Op) + } + } + } + query.WriteByte(')') + } + + for _, m := range q.Line { + switch m.Op { + case logql.OpEq, logql.OpRe: + query.WriteString(" AND (") + case logql.OpNotEq, logql.OpNotRe: + query.WriteString(" AND NOT (") + default: + return errors.Errorf("unexpected op %q", m.Op) + } + + switch m.Op { + case logql.OpEq, logql.OpNotEq: + fmt.Fprintf(query, "positionUTF8(body, %s) > 0", singleQuoted(m.Value)) + { + // HACK: check for special case of hex-encoded trace_id and span_id. + // Like `{http_method=~".+"} |= "af36000000000000c517000000000003"`. + // TODO(ernado): also handle regex? + encoded := strings.ToLower(m.Value) + v, _ := hex.DecodeString(encoded) + switch len(v) { + case len(otelstorage.TraceID{}): + fmt.Fprintf(query, " OR trace_id = unhex(%s)", singleQuoted(encoded)) + case len(otelstorage.SpanID{}): + fmt.Fprintf(query, " OR span_id = unhex(%s)", singleQuoted(encoded)) + } + } + case logql.OpRe, logql.OpNotRe: + fmt.Fprintf(query, "match(body, %s)", singleQuoted(m.Value)) + } + query.WriteByte(')') + } + return nil +} diff --git a/internal/logql/logqlengine/aggregated_labels.go b/internal/logql/logqlengine/aggregated_labels.go index 02f3e95d..12fe213d 100644 --- a/internal/logql/logqlengine/aggregated_labels.go +++ b/internal/logql/logqlengine/aggregated_labels.go @@ -25,8 +25,9 @@ type labelEntry struct { value string } -func newAggregatedLabels(set LabelSet, by, without map[string]struct{}) *aggregatedLabels { - labels := make([]labelEntry, 0, len(set.labels)) +// AggregatedLabelsFromSet creates new [logqlmetric.AggregatedLabels] from [LabelSet]. 
+func AggregatedLabelsFromSet(set LabelSet, by, without map[string]struct{}) logqlmetric.AggregatedLabels { + labels := make([]labelEntry, 0, set.Len()) set.Range(func(l logql.Label, v pcommon.Value) { labels = append(labels, labelEntry{ name: string(l), @@ -34,10 +35,7 @@ func newAggregatedLabels(set LabelSet, by, without map[string]struct{}) *aggrega }) }) slices.SortFunc(labels, func(a, b labelEntry) int { - if c := cmp.Compare(a.name, b.name); c != 0 { - return c - } - return cmp.Compare(a.value, b.value) + return cmp.Compare(a.name, b.name) }) return &aggregatedLabels{ @@ -47,6 +45,23 @@ func newAggregatedLabels(set LabelSet, by, without map[string]struct{}) *aggrega } } +// AggregatedLabelsFromMap creates new [logqlmetric.AggregatedLabels] from label map. +func AggregatedLabelsFromMap(m map[string]string) logqlmetric.AggregatedLabels { + labels := make([]labelEntry, 0, len(m)) + for key, value := range m { + labels = append(labels, labelEntry{name: key, value: value}) + } + slices.SortFunc(labels, func(a, b labelEntry) int { + return cmp.Compare(a.name, b.name) + }) + + return &aggregatedLabels{ + entries: labels, + without: nil, + by: nil, + } +} + // By returns new set of labels containing only given list of labels. 
func (a *aggregatedLabels) By(labels ...logql.Label) logqlmetric.AggregatedLabels { if len(labels) == 0 { diff --git a/internal/logql/logqlengine/aggregated_labels_test.go b/internal/logql/logqlengine/aggregated_labels_test.go index 04fadacf..86505f2d 100644 --- a/internal/logql/logqlengine/aggregated_labels_test.go +++ b/internal/logql/logqlengine/aggregated_labels_test.go @@ -60,7 +60,7 @@ func TestAggregatedLabels(t *testing.T) { for i, tt := range tests { tt := tt t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) { - set := newLabelSet() + set := NewLabelSet() for k, v := range tt.set { set.Set(logql.Label(k), pcommon.NewValueStr(v)) } @@ -72,7 +72,7 @@ func TestAggregatedLabels(t *testing.T) { { "ByThenWithout", func(set LabelSet, by, without []logql.Label) logqlmetric.AggregatedLabels { - var labels logqlmetric.AggregatedLabels = newAggregatedLabels( + var labels logqlmetric.AggregatedLabels = AggregatedLabelsFromSet( set, nil, nil, @@ -89,7 +89,7 @@ func TestAggregatedLabels(t *testing.T) { { "WithoutThenBy", func(set LabelSet, by, without []logql.Label) logqlmetric.AggregatedLabels { - var labels logqlmetric.AggregatedLabels = newAggregatedLabels( + var labels logqlmetric.AggregatedLabels = AggregatedLabelsFromSet( set, nil, nil, @@ -106,7 +106,7 @@ func TestAggregatedLabels(t *testing.T) { { "Constructor", func(set LabelSet, by, without []logql.Label) logqlmetric.AggregatedLabels { - return newAggregatedLabels( + return AggregatedLabelsFromSet( set, buildSet(nil, by...), buildSet(nil, without...), @@ -129,7 +129,7 @@ func TestAggregatedLabels(t *testing.T) { } func TestEmptyAggregatedLabels(t *testing.T) { - al := newAggregatedLabels(LabelSet{}, nil, nil) + al := AggregatedLabelsFromSet(LabelSet{}, nil, nil) el := logqlmetric.EmptyAggregatedLabels() require.Equal(t, el.Key(), al.Key()) } diff --git a/internal/logql/logqlengine/decolorize_test.go b/internal/logql/logqlengine/decolorize_test.go index 06872be1..2321908e 100644 --- 
a/internal/logql/logqlengine/decolorize_test.go +++ b/internal/logql/logqlengine/decolorize_test.go @@ -26,7 +26,7 @@ func TestDecolorize(t *testing.T) { for i, tt := range tests { tt := tt t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) { - set := newLabelSet() + set := NewLabelSet() f, err := buildDecolorize(&logql.DecolorizeExpr{}) require.NoError(t, err) diff --git a/internal/logql/logqlengine/drop_test.go b/internal/logql/logqlengine/drop_test.go index f24a3fd1..fcee40d1 100644 --- a/internal/logql/logqlengine/drop_test.go +++ b/internal/logql/logqlengine/drop_test.go @@ -87,7 +87,7 @@ func TestDropLabels(t *testing.T) { }) require.NoError(t, err) - set := newLabelSet() + set := NewLabelSet() set.labels = tt.input newLine, ok := e.Process(0, ``, set) // Ensure that processor does not change the line. diff --git a/internal/logql/logqlengine/engine.go b/internal/logql/logqlengine/engine.go index 74b44ce0..945f6ec3 100644 --- a/internal/logql/logqlengine/engine.go +++ b/internal/logql/logqlengine/engine.go @@ -3,20 +3,14 @@ package logqlengine import ( "context" - "fmt" - "strconv" "time" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "github.com/go-faster/errors" "github.com/go-faster/oteldb/internal/logql" - "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlmetric" - "github.com/go-faster/oteldb/internal/lokiapi" - "github.com/go-faster/oteldb/internal/otelstorage" ) // Engine is a LogQL evaluation engine. @@ -28,6 +22,8 @@ type Engine struct { otelAdapter bool parseOpts logql.ParseOptions + optimizers []Optimizer + tracer trace.Tracer } @@ -44,6 +40,9 @@ type Options struct { // OTELAdapter enables 'otel adapter' whatever it is. OTELAdapter bool + // Optimizers defines a list of optimiziers to use. + Optimizers []Optimizer + // TracerProvider provides OpenTelemetry tracer for this engine. 
TracerProvider trace.TracerProvider } @@ -52,6 +51,9 @@ func (o *Options) setDefaults() { if o.LookbackDuration >= 0 { o.LookbackDuration = -30 * time.Second } + if o.Optimizers == nil { + o.Optimizers = DefaultOptimizers() + } if o.TracerProvider == nil { o.TracerProvider = otel.GetTracerProvider() } @@ -67,158 +69,40 @@ func NewEngine(querier Querier, opts Options) *Engine { lookbackDuration: opts.LookbackDuration, otelAdapter: opts.OTELAdapter, parseOpts: opts.ParseOptions, + optimizers: opts.Optimizers, tracer: opts.TracerProvider.Tracer("logql.Engine"), } } -// EvalParams sets evaluation parameters. -type EvalParams struct { - Start otelstorage.Timestamp - End otelstorage.Timestamp - Step time.Duration - Direction Direction // forward, backward - Limit int -} - -// IsInstant whether query is instant. -func (p EvalParams) IsInstant() bool { - return p.Start == p.End && p.Step == 0 -} - -// Direction describe log ordering. -type Direction string - -const ( - // DirectionBackward sorts records in descending order. - DirectionBackward Direction = "backward" - // DirectionForward sorts records in ascending order. - DirectionForward Direction = "forward" -) +// NewQuery creates new [Query]. +func (e *Engine) NewQuery(ctx context.Context, query string) (Query, error) { + expr, err := logql.Parse(query, e.parseOpts) + if err != nil { + return nil, errors.Wrap(err, "parse") + } -// Eval parses and evaluates query. 
-func (e *Engine) Eval(ctx context.Context, query string, params EvalParams) (data lokiapi.QueryResponseData, rerr error) { - ctx, span := e.tracer.Start(ctx, "Eval", - trace.WithAttributes( - attribute.String("logql.query", query), - attribute.Int64("logql.start", int64(params.Start)), - attribute.Int64("logql.end", int64(params.End)), - attribute.Int64("logql.step", int64(params.Step)), - attribute.String("logql.direction", string(params.Direction)), - attribute.Int("logql.limit", params.Limit), - ), - ) - defer func() { - if rerr != nil { - span.RecordError(rerr) - } else if streams, ok := data.GetStreamsResult(); ok { - var entries int - for _, stream := range streams.Result { - entries += len(stream.Values) - } - span.SetAttributes( - attribute.Int("logql.returned_entries", entries), - attribute.Int("logql.returned_streams", len(streams.Result)), - ) - } - span.End() - }() + q, err := e.buildQuery(ctx, expr) + if err != nil { + return nil, err + } - expr, err := logql.Parse(query, e.parseOpts) + q, err = e.applyOptimizers(ctx, q) if err != nil { - return data, errors.Wrap(err, "parse") + return nil, errors.Wrap(err, "optimize") } - return e.evalExpr(ctx, expr, params) + return q, nil } -func (e *Engine) evalExpr(ctx context.Context, expr logql.Expr, params EvalParams) (data lokiapi.QueryResponseData, _ error) { - ctx, span := e.tracer.Start(ctx, "evalExpr", - trace.WithAttributes( - attribute.String("logql.expr", fmt.Sprintf("%T", expr)), - ), - ) - defer span.End() +func (e *Engine) buildQuery(ctx context.Context, expr logql.Expr) (Query, error) { switch expr := logql.UnparenExpr(expr).(type) { case *logql.LogExpr: - streams, err := e.evalLogExpr(ctx, expr, params) - if err != nil { - return data, errors.Wrap(err, "evaluate log query") - } - - data.SetStreamsResult(lokiapi.StreamsResult{ - Result: streams, - }) - return data, nil + return e.buildLogQuery(ctx, expr) case *logql.LiteralExpr: - return e.evalLiteral(expr, params), nil + return 
&LiteralQuery{Value: expr.Value}, nil case logql.MetricExpr: - iter, err := logqlmetric.Build(expr, e.sampleSelector(ctx, params), logqlmetric.EvalParams{ - // NOTE(tdakkota): for some reason, timestamps in Loki appear truncated by step. - // Do the same thing. - Start: params.Start.AsTime().Truncate(params.Step), - End: params.End.AsTime().Truncate(params.Step).Add(params.Step), - Step: params.Step, - }) - if err != nil { - return data, errors.Wrap(err, "build metric query") - } - - data, err = logqlmetric.ReadStepResponse(iter, params.IsInstant()) - if err != nil { - return data, errors.Wrap(err, "evaluate metric query") - } - - return data, nil + return e.buildMetricQuery(ctx, expr) default: - return data, errors.Errorf("unexpected expression %T", expr) - } -} - -func addDuration(ts otelstorage.Timestamp, d time.Duration) otelstorage.Timestamp { - return otelstorage.NewTimestampFromTime(ts.AsTime().Add(d)) -} - -func (e *Engine) evalLiteral(expr *logql.LiteralExpr, params EvalParams) (data lokiapi.QueryResponseData) { - if params.IsInstant() { - data.SetScalarResult(lokiapi.ScalarResult{ - Result: lokiapi.FPoint{ - T: getPrometheusTimestamp(params.Start.AsTime()), - V: strconv.FormatFloat(expr.Value, 'f', -1, 64), - }, - }) - return data + return nil, errors.Errorf("unexpected expression %T", expr) } - data.SetMatrixResult(lokiapi.MatrixResult{ - Result: generateLiteralMatrix(expr.Value, params), - }) - return data -} - -func generateLiteralMatrix(value float64, params EvalParams) lokiapi.Matrix { - var ( - start = params.Start.AsTime() - end = params.End.AsTime() - - series = lokiapi.Series{ - Metric: lokiapi.NewOptLabelSet(lokiapi.LabelSet{}), - } - ) - - var ( - until = end.Add(params.Step) - strValue = strconv.FormatFloat(value, 'f', -1, 64) - ) - for ts := start; !ts.After(until); ts = ts.Add(params.Step) { - series.Values = append(series.Values, lokiapi.FPoint{ - T: getPrometheusTimestamp(ts), - V: strValue, - }) - } - - return lokiapi.Matrix{series} -} - 
-func getPrometheusTimestamp(t time.Time) float64 { - // Pass milliseconds as fraction part. - return float64(t.UnixMilli()) / 1000 } diff --git a/internal/logql/logqlengine/engine_literal_query.go b/internal/logql/logqlengine/engine_literal_query.go new file mode 100644 index 00000000..33faecf4 --- /dev/null +++ b/internal/logql/logqlengine/engine_literal_query.go @@ -0,0 +1,62 @@ +package logqlengine + +import ( + "context" + "strconv" + "time" + + "github.com/go-faster/oteldb/internal/lokiapi" +) + +// LiteralQuery is simple literal expression query. +type LiteralQuery struct { + Value float64 +} + +var _ Query = (*LiteralQuery)(nil) + +// Eval implements [Query]. +func (q *LiteralQuery) Eval(ctx context.Context, params EvalParams) (data lokiapi.QueryResponseData, _ error) { + if params.IsInstant() { + data.SetScalarResult(lokiapi.ScalarResult{ + Result: lokiapi.FPoint{ + T: getPrometheusTimestamp(params.Start), + V: strconv.FormatFloat(q.Value, 'f', -1, 64), + }, + }) + return data, nil + } + data.SetMatrixResult(lokiapi.MatrixResult{ + Result: generateLiteralMatrix(q.Value, params), + }) + return data, nil +} + +func generateLiteralMatrix(value float64, params EvalParams) lokiapi.Matrix { + var ( + start = params.Start + end = params.End + + series = lokiapi.Series{ + Metric: lokiapi.NewOptLabelSet(lokiapi.LabelSet{}), + } + ) + + var ( + until = end.Add(params.Step) + strValue = strconv.FormatFloat(value, 'f', -1, 64) + ) + for ts := start; !ts.After(until); ts = ts.Add(params.Step) { + series.Values = append(series.Values, lokiapi.FPoint{ + T: getPrometheusTimestamp(ts), + V: strValue, + }) + } + + return lokiapi.Matrix{series} +} + +func getPrometheusTimestamp(t time.Time) float64 { + // Pass milliseconds as fraction part. 
+ return float64(t.UnixMilli()) / 1000 +} diff --git a/internal/logql/logqlengine/engine_log_query.go b/internal/logql/logqlengine/engine_log_query.go new file mode 100644 index 00000000..02695e08 --- /dev/null +++ b/internal/logql/logqlengine/engine_log_query.go @@ -0,0 +1,218 @@ +package logqlengine + +import ( + "cmp" + "context" + "slices" + "time" + + "github.com/go-faster/errors" + "golang.org/x/exp/maps" + + "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/lokiapi" +) + +// LogQuery represents a log query. +type LogQuery struct { + Root PipelineNode + LookbackDuration time.Duration +} + +var _ Query = (*LogQuery)(nil) + +func (e *Engine) buildLogQuery(ctx context.Context, expr *logql.LogExpr) (Query, error) { + root, err := e.buildPipelineNode(ctx, expr.Sel, expr.Pipeline) + if err != nil { + return nil, err + } + + return &LogQuery{ + Root: root, + LookbackDuration: e.lookbackDuration, + }, nil +} + +// Eval implements [Query]. +func (q *LogQuery) Eval(ctx context.Context, params EvalParams) (data lokiapi.QueryResponseData, _ error) { + // Instant query, sub lookback duration from Start. 
+ if params.IsInstant() { + params.Start = params.Start.Add(q.LookbackDuration) + } + if params.Direction == "" { + params.Direction = DirectionForward + } + + streams, err := q.eval(ctx, params) + if err != nil { + return data, errors.Wrap(err, "evaluate log query") + } + + data.SetStreamsResult(lokiapi.StreamsResult{ + Result: streams, + }) + return data, nil +} + +func (q *LogQuery) eval(ctx context.Context, params EvalParams) (data lokiapi.Streams, _ error) { + iter, err := q.Root.EvalPipeline(ctx, params) + if err != nil { + return data, err + } + defer func() { + _ = iter.Close() + }() + + streams, err := groupEntries(iter) + if err != nil { + return data, err + } + + return streams, nil +} + +func groupEntries(iter EntryIterator) (s lokiapi.Streams, _ error) { + var ( + e Entry + streams = map[string]lokiapi.Stream{} + ) + for iter.Next(&e) { + // FIXME(tdakkota): allocates a string for every record. + key := e.Set.String() + stream, ok := streams[key] + if !ok { + stream = lokiapi.Stream{ + Stream: lokiapi.NewOptLabelSet(e.Set.AsLokiAPI()), + } + } + stream.Values = append(stream.Values, lokiapi.LogEntry{T: uint64(e.Timestamp), V: e.Line}) + streams[key] = stream + } + if err := iter.Err(); err != nil { + return s, err + } + + result := maps.Values(streams) + for _, stream := range result { + slices.SortFunc(stream.Values, func(a, b lokiapi.LogEntry) int { + return cmp.Compare(a.T, b.T) + }) + } + return result, nil +} + +// ProcessorNode implements [PipelineNode]. 
+type ProcessorNode struct { + Input PipelineNode + Prefilter Processor + Selector logql.Selector + Pipeline []logql.PipelineStage + EnableOTELAdapter bool +} + +var _ PipelineNode = (*ProcessorNode)(nil) + +func (e *Engine) buildPipelineNode(ctx context.Context, sel logql.Selector, stages []logql.PipelineStage) (PipelineNode, error) { + cond, err := extractQueryConditions(e.querierCaps, sel) + if err != nil { + return nil, errors.Wrap(err, "extract preconditions") + } + + input, err := e.querier.Query(ctx, sel.Matchers) + if err != nil { + return nil, errors.Wrap(err, "create input node") + } + + if p := cond.prefilter; (p == nil || p == NopProcessor) && len(stages) == 0 && !e.otelAdapter { + // Empty processing pipeline, get data directly from storage. + return input, nil + } + + return &ProcessorNode{ + Input: input, + Prefilter: cond.prefilter, + Selector: sel, + Pipeline: stages, + EnableOTELAdapter: e.otelAdapter, + }, nil +} + +// Traverse implements [Node]. +func (n *ProcessorNode) Traverse(cb NodeVisitor) error { + if err := cb(n); err != nil { + return err + } + return n.Input.Traverse(cb) +} + +// EvalPipeline implements [PipelineNode]. +func (n *ProcessorNode) EvalPipeline(ctx context.Context, params EvalParams) (_ EntryIterator, rerr error) { + pipeline, err := BuildPipeline(n.Pipeline...) + if err != nil { + return nil, errors.Wrap(err, "build pipeline") + } + + iter, err := n.Input.EvalPipeline(ctx, params) + if err != nil { + return nil, err + } + defer closeOnError(iter, &rerr) + + return &entryIterator{ + iter: iter, + prefilter: n.Prefilter, + pipeline: pipeline, + entries: 0, + limit: params.Limit, + otelAdapter: n.EnableOTELAdapter, + }, nil +} + +type entryIterator struct { + iter EntryIterator + + prefilter Processor + pipeline Processor + + entries int + limit int + // TODO(tdakkota): what? 
+ otelAdapter bool +} + +func (i *entryIterator) Next(e *Entry) bool { + for { + if !i.iter.Next(e) || (i.limit > 0 && i.entries >= i.limit) { + return false + } + + var ( + ts = e.Timestamp + keep bool + ) + if i.otelAdapter { + e.Line = LineFromEntry(*e) + } + + e.Line, keep = i.prefilter.Process(ts, e.Line, e.Set) + if !keep { + continue + } + + e.Line, keep = i.pipeline.Process(ts, e.Line, e.Set) + if !keep { + continue + } + + i.entries++ + return true + } +} + +func (i *entryIterator) Err() error { + return i.iter.Err() +} + +func (i *entryIterator) Close() error { + return i.iter.Close() +} diff --git a/internal/logql/logqlengine/engine_metric_nodes.go b/internal/logql/logqlengine/engine_metric_nodes.go new file mode 100644 index 00000000..61aab27a --- /dev/null +++ b/internal/logql/logqlengine/engine_metric_nodes.go @@ -0,0 +1,196 @@ +package logqlengine + +import ( + "context" + "io" + + "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlmetric" +) + +var ( + _ MetricNode = (*RangeAggregation)(nil) + _ MetricNode = (*VectorAggregation)(nil) + _ MetricNode = (*LabelReplace)(nil) + _ MetricNode = (*Vector)(nil) + _ MetricNode = (*LiteralBinOp)(nil) + _ MetricNode = (*BinOp)(nil) +) + +// RangeAggregation is a [MetricNode] implementing range aggregation. +type RangeAggregation struct { + Input SampleNode + Expr *logql.RangeAggregationExpr +} + +// Traverse implements [Node]. +func (n *RangeAggregation) Traverse(cb NodeVisitor) error { + if err := cb(n); err != nil { + return err + } + return n.Input.Traverse(cb) +} + +// EvalMetric implements [EvalMetric]. +func (n *RangeAggregation) EvalMetric(ctx context.Context, params MetricParams) (_ StepIterator, rerr error) { + var ( + qrange = n.Expr.Range + start = params.Start + end = params.End + ) + if o := qrange.Offset; o != nil { + start = start.Add(-o.Duration) + end = end.Add(-o.Duration) + } + // Query samples for first step. 
+ qstart := start.Add(-qrange.Range) + + iter, err := n.Input.EvalSample(ctx, EvalParams{ + Start: qstart, + End: end, + Step: params.Step, + Direction: DirectionForward, + // Do not limit sample queries. + Limit: -1, + }) + if err != nil { + return nil, err + } + defer closeOnError(iter, &rerr) + + return logqlmetric.RangeAggregation(iter, n.Expr, start, end, params.Step) +} + +// VectorAggregation is a [MetricNode] implementing vector aggregation. +type VectorAggregation struct { + Input MetricNode + Expr *logql.VectorAggregationExpr +} + +// Traverse implements [Node]. +func (n *VectorAggregation) Traverse(cb NodeVisitor) error { + if err := cb(n); err != nil { + return err + } + return n.Input.Traverse(cb) +} + +// EvalMetric implements [EvalMetric]. +func (n *VectorAggregation) EvalMetric(ctx context.Context, params MetricParams) (_ StepIterator, rerr error) { + iter, err := n.Input.EvalMetric(ctx, params) + if err != nil { + return nil, err + } + defer closeOnError(iter, &rerr) + + return logqlmetric.VectorAggregation(iter, n.Expr) +} + +// LabelReplace is a [MetricNode] implementing `label_replace` function. +type LabelReplace struct { + Input MetricNode + Expr *logql.LabelReplaceExpr +} + +// Traverse implements [Node]. +func (n *LabelReplace) Traverse(cb NodeVisitor) error { + if err := cb(n); err != nil { + return err + } + return n.Input.Traverse(cb) +} + +// EvalMetric implements [EvalMetric]. +func (n *LabelReplace) EvalMetric(ctx context.Context, params MetricParams) (_ StepIterator, rerr error) { + iter, err := n.Input.EvalMetric(ctx, params) + if err != nil { + return nil, err + } + defer closeOnError(iter, &rerr) + + return logqlmetric.LabelReplace(iter, n.Expr) +} + +// Vector is a [MetricNode] implementing vector literal. +type Vector struct { + Expr *logql.VectorExpr +} + +// Traverse implements [Node]. +func (n *Vector) Traverse(cb NodeVisitor) error { + return cb(n) +} + +// EvalMetric implements [EvalMetric]. 
+func (n *Vector) EvalMetric(ctx context.Context, params MetricParams) (_ StepIterator, rerr error) { + return logqlmetric.Vector(n.Expr, params.Start, params.End, params.Step), nil +} + +// LiteralBinOp is a [MetricNode] implementing binary operation on literal. +type LiteralBinOp struct { + Input MetricNode + Literal float64 + IsLiteralOnLeft bool + Expr *logql.BinOpExpr +} + +// Traverse implements [Node]. +func (n *LiteralBinOp) Traverse(cb NodeVisitor) error { + if err := cb(n); err != nil { + return err + } + return n.Input.Traverse(cb) +} + +// EvalMetric implements [EvalMetric]. +func (n *LiteralBinOp) EvalMetric(ctx context.Context, params MetricParams) (_ StepIterator, rerr error) { + iter, err := n.Input.EvalMetric(ctx, params) + if err != nil { + return nil, err + } + defer closeOnError(iter, &rerr) + + return logqlmetric.LiteralBinOp(iter, n.Expr, n.Literal, n.IsLiteralOnLeft) +} + +// BinOp is a [MetricNode] implementing binary operation. +type BinOp struct { + Left, Right MetricNode + Expr *logql.BinOpExpr +} + +// Traverse implements [Node]. +func (n *BinOp) Traverse(cb NodeVisitor) error { + if err := cb(n); err != nil { + return err + } + if err := n.Left.Traverse(cb); err != nil { + return err + } + return n.Right.Traverse(cb) +} + +// EvalMetric implements [EvalMetric]. +func (n *BinOp) EvalMetric(ctx context.Context, params MetricParams) (_ StepIterator, rerr error) { + // TODO(tdakkota): it is likely it would make a query to storage, so + // probably we should do it concurrently. 
+ left, err := n.Left.EvalMetric(ctx, params) + if err != nil { + return nil, err + } + defer closeOnError(left, &rerr) + + right, err := n.Right.EvalMetric(ctx, params) + if err != nil { + return nil, err + } + defer closeOnError(right, &rerr) + + return logqlmetric.BinOp(left, right, n.Expr) +} + +func closeOnError[C io.Closer](c C, rerr *error) { + if *rerr != nil { + _ = c.Close() + } +} diff --git a/internal/logql/logqlengine/engine_metric_query.go b/internal/logql/logqlengine/engine_metric_query.go new file mode 100644 index 00000000..a49b1747 --- /dev/null +++ b/internal/logql/logqlengine/engine_metric_query.go @@ -0,0 +1,142 @@ +package logqlengine + +import ( + "context" + "fmt" + + "github.com/go-faster/errors" + + "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlerrors" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlmetric" + "github.com/go-faster/oteldb/internal/lokiapi" +) + +// MetricQuery represents a metric query. +type MetricQuery struct { + Root MetricNode +} + +func (e *Engine) buildMetricQuery(ctx context.Context, expr logql.MetricExpr) (Query, error) { + root, err := e.buildMetricNode(ctx, expr) + if err != nil { + return nil, err + } + + return &MetricQuery{ + Root: root, + }, nil +} + +var _ Query = (*MetricQuery)(nil) + +// Eval implements [Query]. +func (q *MetricQuery) Eval(ctx context.Context, params EvalParams) (lokiapi.QueryResponseData, error) { + data, err := q.eval(ctx, params) + if err != nil { + return data, errors.Wrap(err, "evaluate metric query") + } + return data, nil +} + +func (q *MetricQuery) eval(ctx context.Context, params EvalParams) (data lokiapi.QueryResponseData, _ error) { + iter, err := q.Root.EvalMetric(ctx, MetricParams{ + // NOTE(tdakkota): for some reason, timestamps in Loki appear truncated by step. + // Do the same thing. 
+ Start: params.Start.Truncate(params.Step), + End: params.End.Truncate(params.Step).Add(params.Step), + Step: params.Step, + }) + if err != nil { + return data, err + } + defer func() { + _ = iter.Close() + }() + + data, err = logqlmetric.ReadStepResponse(iter, params.IsInstant()) + if err != nil { + return data, err + } + + return data, nil +} + +func (e *Engine) buildMetricNode(ctx context.Context, expr logql.MetricExpr) (MetricNode, error) { + switch expr := logql.UnparenExpr(expr).(type) { + case *logql.RangeAggregationExpr: + node, err := e.buildSampleNode(ctx, expr) + if err != nil { + return nil, err + } + return &RangeAggregation{ + Input: node, + Expr: expr, + }, nil + case *logql.VectorAggregationExpr: + node, err := e.buildMetricNode(ctx, expr.Expr) + if err != nil { + return nil, err + } + return &VectorAggregation{ + Input: node, + Expr: expr, + }, nil + case *logql.LiteralExpr: + case *logql.LabelReplaceExpr: + node, err := e.buildMetricNode(ctx, expr.Expr) + if err != nil { + return nil, err + } + return &LabelReplace{ + Input: node, + Expr: expr, + }, nil + case *logql.VectorExpr: + return &Vector{ + Expr: expr, + }, nil + case *logql.BinOpExpr: + if lit, ok := expr.Left.(*logql.LiteralExpr); ok { + right, err := e.buildMetricNode(ctx, expr.Right) + if err != nil { + return nil, err + } + return &LiteralBinOp{ + Input: right, + Literal: lit.Value, + IsLiteralOnLeft: true, + Expr: expr, + }, nil + } + if lit, ok := expr.Right.(*logql.LiteralExpr); ok { + left, err := e.buildMetricNode(ctx, expr.Left) + if err != nil { + return nil, err + } + return &LiteralBinOp{ + Input: left, + Literal: lit.Value, + IsLiteralOnLeft: false, + Expr: expr, + }, nil + } + + left, err := e.buildMetricNode(ctx, expr.Left) + if err != nil { + return nil, err + } + right, err := e.buildMetricNode(ctx, expr.Right) + if err != nil { + return nil, err + } + return &BinOp{ + Left: left, + Right: right, + Expr: expr, + }, nil + default: + return nil, errors.Errorf("unexpected 
expression %T", expr) + } + return nil, &logqlerrors.UnsupportedError{Msg: fmt.Sprintf("expression %T is not supported yet", expr)} +} diff --git a/internal/logql/logqlengine/engine_optimizer.go b/internal/logql/logqlengine/engine_optimizer.go new file mode 100644 index 00000000..b96dde89 --- /dev/null +++ b/internal/logql/logqlengine/engine_optimizer.go @@ -0,0 +1,30 @@ +package logqlengine + +import ( + "context" + + "github.com/go-faster/errors" +) + +// Optimizer defines an interface for optimizer. +type Optimizer interface { + // Name returns optimizer name. + Name() string + Optimize(ctx context.Context, q Query) (Query, error) +} + +// DefaultOptimizers returns slice of default [Optimizer]s. +func DefaultOptimizers() []Optimizer { + return []Optimizer{} +} + +func (e *Engine) applyOptimizers(ctx context.Context, q Query) (Query, error) { + var err error + for _, o := range e.optimizers { + q, err = o.Optimize(ctx, q) + if err != nil { + return nil, errors.Wrapf(err, "optimizer %q failed", o.Name()) + } + } + return q, nil +} diff --git a/internal/logql/logqlengine/engine_plan.go b/internal/logql/logqlengine/engine_plan.go new file mode 100644 index 00000000..b6ee5f96 --- /dev/null +++ b/internal/logql/logqlengine/engine_plan.go @@ -0,0 +1,101 @@ +package logqlengine + +import ( + "context" + "time" + + "github.com/go-faster/oteldb/internal/iterators" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlmetric" + "github.com/go-faster/oteldb/internal/lokiapi" + "github.com/go-faster/oteldb/internal/otelstorage" +) + +// EvalParams sets evaluation parameters. +type EvalParams struct { + Start time.Time + End time.Time + Step time.Duration + Direction Direction // forward, backward + Limit int +} + +// IsInstant whether query is instant. +func (p EvalParams) IsInstant() bool { + return p.Start == p.End && p.Step == 0 +} + +// Direction describe log ordering. +type Direction string + +const ( + // DirectionBackward sorts records in descending order. 
+ DirectionBackward Direction = "backward" + // DirectionForward sorts records in ascending order. + DirectionForward Direction = "forward" +) + +// Entry represents a log entry. +type Entry struct { + Timestamp otelstorage.Timestamp + Line string + Set LabelSet +} + +type ( + // EntryIterator represents a LogQL entry log stream. + EntryIterator = iterators.Iterator[Entry] + // SampleIterator represents a samples stream. + SampleIterator = iterators.Iterator[logqlmetric.SampledEntry] + // StepIterator represents a metric stream. + StepIterator = logqlmetric.StepIterator +) + +// NodeVisitor is a callback to traverse [Node]. +type NodeVisitor = func(n Node) error + +// Node is a generic node interface. +type Node interface { + // Traverse calls given callback on child nodes. + Traverse(cb NodeVisitor) error +} + +// VisitNode visits nodes of given type. +func VisitNode[N Node](root Node, cb func(N) error) error { + return root.Traverse(func(n Node) error { + match, ok := n.(N) + if !ok { + return nil + } + return cb(match) + }) +} + +// PipelineNode represents a LogQL pipeline node. +type PipelineNode interface { + Node + EvalPipeline(ctx context.Context, params EvalParams) (EntryIterator, error) +} + +// SampleNode represents a log sampling node. +type SampleNode interface { + Node + EvalSample(ctx context.Context, params EvalParams) (SampleIterator, error) +} + +// MetricParams defines [MetricNode] parameters. +type MetricParams struct { + Start time.Time + End time.Time + Step time.Duration +} + +// MetricNode represents a LogQL metric function node. +type MetricNode interface { + Node + EvalMetric(ctx context.Context, params MetricParams) (StepIterator, error) +} + +// Query is a LogQL query. 
+type Query interface { + Eval(ctx context.Context, params EvalParams) (lokiapi.QueryResponseData, error) +} diff --git a/internal/logql/logqlengine/engine_test.go b/internal/logql/logqlengine/engine_test.go index cbfbf0d3..c23b827f 100644 --- a/internal/logql/logqlengine/engine_test.go +++ b/internal/logql/logqlengine/engine_test.go @@ -37,44 +37,61 @@ type mockQuerier struct { step time.Duration } +var _ Querier = (*mockQuerier)(nil) + func (m *mockQuerier) Capabilities() (caps QuerierCapabilities) { return caps } -func (m *mockQuerier) SelectLogs(_ context.Context, start, _ otelstorage.Timestamp, direction Direction, _ SelectLogsParams) (iterators.Iterator[logstorage.Record], error) { - step := m.step +func (m *mockQuerier) Query(ctx context.Context, selector []logql.LabelMatcher) (PipelineNode, error) { + return &mockPipelineNode{ + querier: m, + }, nil +} + +type mockPipelineNode struct { + querier *mockQuerier +} + +var _ PipelineNode = (*mockPipelineNode)(nil) + +func (n *mockPipelineNode) Traverse(cb NodeVisitor) error { + return cb(n) +} + +func (n *mockPipelineNode) EvalPipeline(ctx context.Context, params EvalParams) (EntryIterator, error) { + var ( + step = n.querier.step + ts = params.Start + direction = params.Direction + ) if step == 0 { step = time.Millisecond } - ts := start.AsTime() if direction != DirectionForward { return nil, errors.Errorf("test: direction %q is unsupported", direction) } var ( - records []logstorage.Record + entries []Entry scopeAttrs = pcommon.NewMap() resAttrs = pcommon.NewMap() ) scopeAttrs.PutStr("scope", "test") resAttrs.PutStr("resource", "test") - for _, l := range m.lines { + for _, l := range n.querier.lines { + var ( + line = l.line + attrs = pcommon.NewMap() + ) ts = ts.Add(step) - body := l.line - rec := logstorage.Record{ - Timestamp: otelstorage.NewTimestampFromTime(ts), - Body: l.line, - Attrs: otelstorage.Attrs(l.attrs), - ScopeAttrs: otelstorage.Attrs(scopeAttrs), - ResourceAttrs: otelstorage.Attrs(resAttrs), 
- } - if rec.Attrs == otelstorage.Attrs(pcommon.Map{}) { - rec.Attrs = otelstorage.Attrs(pcommon.NewMap()) + if l.attrs != (pcommon.Map{}) { + l.attrs.CopyTo(attrs) } - if dec := jx.DecodeStr(body); dec.Next() == jx.Object { - rec.Body = "" + + if dec := jx.DecodeStr(line); dec.Next() == jx.Object { if err := dec.Obj(func(d *jx.Decoder, key string) error { switch key { case logstorage.LabelBody: @@ -82,7 +99,7 @@ func (m *mockQuerier) SelectLogs(_ context.Context, start, _ otelstorage.Timesta if err != nil { return err } - rec.Body = v + line = v return nil case logstorage.LabelTraceID: v, err := d.Str() @@ -93,7 +110,7 @@ func (m *mockQuerier) SelectLogs(_ context.Context, start, _ otelstorage.Timesta if err != nil { return err } - rec.TraceID = traceID + attrs.PutStr(logstorage.LabelTraceID, traceID.Hex()) return nil default: switch d.Next() { @@ -102,14 +119,14 @@ func (m *mockQuerier) SelectLogs(_ context.Context, start, _ otelstorage.Timesta if err != nil { return err } - rec.Attrs.AsMap().PutStr(key, v) + attrs.PutStr(key, v) return nil case jx.Bool: v, err := d.Bool() if err != nil { return err } - rec.Attrs.AsMap().PutBool(key, v) + attrs.PutBool(key, v) return nil case jx.Number: v, err := d.Num() @@ -121,13 +138,13 @@ func (m *mockQuerier) SelectLogs(_ context.Context, start, _ otelstorage.Timesta if err != nil { return err } - rec.Attrs.AsMap().PutInt(key, n) + attrs.PutInt(key, n) } else { n, err := v.Float64() if err != nil { return err } - rec.Attrs.AsMap().PutDouble(key, n) + attrs.PutDouble(key, n) } return nil default: @@ -135,7 +152,7 @@ func (m *mockQuerier) SelectLogs(_ context.Context, start, _ otelstorage.Timesta if err != nil { return err } - rec.Attrs.AsMap().PutStr(key, string(v)) + attrs.PutStr(key, string(v)) return nil } } @@ -143,10 +160,21 @@ func (m *mockQuerier) SelectLogs(_ context.Context, start, _ otelstorage.Timesta return nil, err } } - records = append(records, rec) + + set := NewLabelSet() + set.SetAttrs( + 
otelstorage.Attrs(attrs), + otelstorage.Attrs(scopeAttrs), + otelstorage.Attrs(resAttrs), + ) + entries = append(entries, Entry{ + Timestamp: otelstorage.NewTimestampFromTime(ts), + Line: line, + Set: set, + }) } - return iterators.Slice(records), nil + return iterators.Slice(entries), nil } func justLines(lines ...string) []inputLine { @@ -197,7 +225,7 @@ var ( ) func TestEngineEvalStream(t *testing.T) { - startTime := otelstorage.Timestamp(1688833731000000000) + startTime := otelstorage.Timestamp(1688833731000000000).AsTime() tests := []struct { query string @@ -443,11 +471,10 @@ func TestEngineEvalStream(t *testing.T) { opts := Options{ ParseOptions: logql.ParseOptions{AllowDots: true}, - OTELAdapter: true, } e := NewEngine(&mockQuerier{lines: tt.input}, opts) - gotData, err := e.Eval(ctx, tt.query, EvalParams{ + gotData, err := eval(ctx, e, tt.query, EvalParams{ Start: startTime, End: startTime, Limit: 1000, @@ -509,6 +536,14 @@ func TestEngineEvalStream(t *testing.T) { } } +func eval(ctx context.Context, e *Engine, query string, params EvalParams) (r lokiapi.QueryResponseData, _ error) { + q, err := e.NewQuery(ctx, query) + if err != nil { + return r, err + } + return q.Eval(ctx, params) +} + type timeRange struct { start uint64 end uint64 @@ -622,6 +657,9 @@ func TestEngineEvalLiteral(t *testing.T) { }, test3steps(`vector(3.14)`, "3.14"), + // Literal binary operation test. + test3steps(`label_replace(vector(3), "dst", "$0", "src", ".+") * vector(3)`, "9"), + // Precedence tests. 
test3steps(`vector(2)+vector(3)*vector(4)`, "14"), test3steps(`vector(2)*vector(3)+vector(4)`, "10"), @@ -642,9 +680,9 @@ func TestEngineEvalLiteral(t *testing.T) { } e := NewEngine(&mockQuerier{}, opts) - gotData, err := e.Eval(ctx, tt.query, EvalParams{ - Start: otelstorage.Timestamp(tt.tsRange.start), - End: otelstorage.Timestamp(tt.tsRange.end), + gotData, err := eval(ctx, e, tt.query, EvalParams{ + Start: otelstorage.Timestamp(tt.tsRange.start).AsTime(), + End: otelstorage.Timestamp(tt.tsRange.end).AsTime(), Step: tt.tsRange.step, Limit: 1000, }) diff --git a/internal/logql/logqlengine/eval_streams.go b/internal/logql/logqlengine/eval_streams.go deleted file mode 100644 index 7422f514..00000000 --- a/internal/logql/logqlengine/eval_streams.go +++ /dev/null @@ -1,170 +0,0 @@ -package logqlengine - -import ( - "cmp" - "context" - "slices" - - "github.com/go-faster/errors" - "golang.org/x/exp/maps" - - "github.com/go-faster/oteldb/internal/iterators" - "github.com/go-faster/oteldb/internal/logql" - "github.com/go-faster/oteldb/internal/logstorage" - "github.com/go-faster/oteldb/internal/lokiapi" - "github.com/go-faster/oteldb/internal/otelstorage" -) - -type entry struct { - ts otelstorage.Timestamp - line string - set LabelSet -} - -type entryIterator struct { - iter iterators.Iterator[logstorage.Record] - - prefilter Processor - pipeline Processor - - entries int - limit int - // TODO(tdakkota): what? 
- otelAdapter bool -} - -func (i *entryIterator) Next(e *entry) bool { - var record logstorage.Record - - for { - if !i.iter.Next(&record) || (i.limit > 0 && i.entries >= i.limit) { - return false - } - - ts := record.Timestamp - line := record.Body - if i.otelAdapter { - line = LineFromRecord(record) - } - e.set.SetFromRecord(record) - - line, keep := i.prefilter.Process(ts, line, e.set) - if !keep { - continue - } - - line, keep = i.pipeline.Process(ts, line, e.set) - if !keep { - continue - } - - e.ts = ts - e.line = line - - i.entries++ - return true - } -} - -func (i *entryIterator) Err() error { - return i.iter.Err() -} - -func (i *entryIterator) Close() error { - return i.iter.Close() -} - -type selectLogsParams struct { - Start, End otelstorage.Timestamp - Direction Direction - Instant bool - Limit int -} - -func (e *Engine) selectLogs(ctx context.Context, sel logql.Selector, stages []logql.PipelineStage, params selectLogsParams) (*entryIterator, error) { - direction := params.Direction - if direction == "" { - params.Direction = DirectionForward - } - - // Instant query, sub lookback duration from Start. - if params.Instant { - params.Start = addDuration(params.Start, e.lookbackDuration) - } - - cond, err := extractQueryConditions(e.querierCaps, sel, stages) - if err != nil { - return nil, errors.Wrap(err, "extract preconditions") - } - - pipeline, err := BuildPipeline(stages...) 
- if err != nil { - return nil, errors.Wrap(err, "build pipeline") - } - - iter, err := e.querier.SelectLogs(ctx, - params.Start, - params.End, - params.Direction, - cond.params, - ) - if err != nil { - return nil, errors.Wrap(err, "get logs") - } - - return &entryIterator{ - iter: iter, - prefilter: cond.prefilter, - pipeline: pipeline, - entries: 0, - limit: params.Limit, - otelAdapter: e.otelAdapter, - }, nil -} - -func (e *Engine) evalLogExpr(ctx context.Context, expr *logql.LogExpr, params EvalParams) (s lokiapi.Streams, _ error) { - iter, err := e.selectLogs(ctx, expr.Sel, expr.Pipeline, selectLogsParams{ - Start: params.Start, - End: params.End, - Direction: params.Direction, - Instant: params.IsInstant(), - Limit: params.Limit, - }) - if err != nil { - return nil, errors.Wrap(err, "select logs") - } - defer func() { - _ = iter.Close() - }() - return groupEntries(iter) -} - -func groupEntries(iter *entryIterator) (s lokiapi.Streams, _ error) { - var ( - e entry - streams = map[string]lokiapi.Stream{} - ) - for iter.Next(&e) { - // FIXME(tdakkota): allocates a string for every record. 
- key := e.set.String() - stream, ok := streams[key] - if !ok { - stream = lokiapi.Stream{ - Stream: lokiapi.NewOptLabelSet(e.set.AsLokiAPI()), - } - } - stream.Values = append(stream.Values, lokiapi.LogEntry{T: uint64(e.ts), V: e.line}) - streams[key] = stream - } - if err := iter.Err(); err != nil { - return s, err - } - - result := maps.Values(streams) - for _, stream := range result { - slices.SortFunc(stream.Values, func(a, b lokiapi.LogEntry) int { - return cmp.Compare(a.T, b.T) - }) - } - return result, nil -} diff --git a/internal/logql/logqlengine/json_test.go b/internal/logql/logqlengine/json_test.go index a1a44b42..43e021f3 100644 --- a/internal/logql/logqlengine/json_test.go +++ b/internal/logql/logqlengine/json_test.go @@ -90,7 +90,7 @@ func TestJSONExtractor(t *testing.T) { }) require.NoError(t, err) - set := newLabelSet() + set := NewLabelSet() newLine, ok := e.Process(0, tt.input, set) // Ensure that extractor does not change the line. require.Equal(t, tt.input, newLine) @@ -172,7 +172,7 @@ func BenchmarkJSONExtractor(b *testing.B) { p, err := buildJSONExtractor(bb.expr) require.NoError(b, err) - set := newLabelSet() + set := NewLabelSet() var ( line string ok bool diff --git a/internal/logql/logqlengine/keep_test.go b/internal/logql/logqlengine/keep_test.go index 3ec7ea92..057a24b3 100644 --- a/internal/logql/logqlengine/keep_test.go +++ b/internal/logql/logqlengine/keep_test.go @@ -84,7 +84,7 @@ func TestKeepLabels(t *testing.T) { }) require.NoError(t, err) - set := newLabelSet() + set := NewLabelSet() set.labels = tt.input newLine, ok := e.Process(0, ``, set) // Ensure that processor does not change the line. 
diff --git a/internal/logql/logqlengine/label_filter_test.go b/internal/logql/logqlengine/label_filter_test.go index 8c6d4a22..5fe2efb2 100644 --- a/internal/logql/logqlengine/label_filter_test.go +++ b/internal/logql/logqlengine/label_filter_test.go @@ -109,7 +109,7 @@ func TestDurationLabelFilter(t *testing.T) { for i, tt := range tests { tt := tt t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) { - set := newLabelSet() + set := NewLabelSet() set.labels = tt.input f, err := buildDurationLabelFilter(&logql.DurationFilter{ @@ -234,7 +234,7 @@ func TestBytesLabelFilter(t *testing.T) { for i, tt := range tests { tt := tt t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) { - set := newLabelSet() + set := NewLabelSet() set.labels = tt.input f, err := buildBytesLabelFilter(&logql.BytesFilter{ @@ -379,7 +379,7 @@ func TestNumberLabelFilter(t *testing.T) { for i, tt := range tests { tt := tt t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) { - set := newLabelSet() + set := NewLabelSet() set.labels = tt.input f, err := buildNumberLabelFilter(&logql.NumberFilter{ @@ -509,7 +509,7 @@ func TestIPLabelFilter(t *testing.T) { for i, tt := range tests { tt := tt t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) { - set := newLabelSet() + set := NewLabelSet() _, hasLabel := tt.input[tt.label] set.labels = tt.input diff --git a/internal/logql/logqlengine/label_format_test.go b/internal/logql/logqlengine/label_format_test.go index 9181a2c6..87ef1ee4 100644 --- a/internal/logql/logqlengine/label_format_test.go +++ b/internal/logql/logqlengine/label_format_test.go @@ -80,7 +80,7 @@ func TestLabelFormat(t *testing.T) { }) require.NoError(t, err) - set := newLabelSet() + set := NewLabelSet() set.labels = tt.input newLine, ok := e.Process(1700000001_000000000, "original line", set) // Ensure that processor does not change the line. 
diff --git a/internal/logql/logqlengine/label_set.go b/internal/logql/logqlengine/label_set.go index 6d337eec..7262d29a 100644 --- a/internal/logql/logqlengine/label_set.go +++ b/internal/logql/logqlengine/label_set.go @@ -21,7 +21,8 @@ type LabelSet struct { labels map[logql.Label]pcommon.Value } -func newLabelSet() LabelSet { +// NewLabelSet creates new [LabelSet]. +func NewLabelSet() LabelSet { return LabelSet{ labels: map[logql.Label]pcommon.Value{}, } @@ -91,6 +92,11 @@ func (l *LabelSet) SetFromRecord(record logstorage.Record) { l.SetAttrs(record.Attrs, record.ScopeAttrs, record.ResourceAttrs) } +// Len returns set length +func (l *LabelSet) Len() int { + return len(l.labels) +} + // SetAttrs sets labels from attrs. func (l *LabelSet) SetAttrs(attrMaps ...otelstorage.Attrs) { for _, attrs := range attrMaps { diff --git a/internal/logql/logqlengine/line_filter_test.go b/internal/logql/logqlengine/line_filter_test.go index 231c7b11..36ef4bd8 100644 --- a/internal/logql/logqlengine/line_filter_test.go +++ b/internal/logql/logqlengine/line_filter_test.go @@ -105,7 +105,7 @@ func TestIPLineFilter(t *testing.T) { for i, tt := range ipLineFilterTests { tt := tt t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) { - set := newLabelSet() + set := NewLabelSet() f, err := buildLineFilter(&logql.LineFilter{ Op: logql.OpEq, diff --git a/internal/logql/logqlengine/line_format_test.go b/internal/logql/logqlengine/line_format_test.go index e803d976..86dff7b1 100644 --- a/internal/logql/logqlengine/line_format_test.go +++ b/internal/logql/logqlengine/line_format_test.go @@ -56,7 +56,7 @@ func TestLineFormat(t *testing.T) { for i, tt := range tests { tt := tt t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) { - set := newLabelSet() + set := NewLabelSet() set.labels = tt.labels f, err := buildLineFormat(&logql.LineFormat{ diff --git a/internal/logql/logqlengine/logfmt_test.go b/internal/logql/logqlengine/logfmt_test.go index 5d12c96d..b5880fc3 100644 --- 
a/internal/logql/logqlengine/logfmt_test.go +++ b/internal/logql/logqlengine/logfmt_test.go @@ -88,7 +88,7 @@ func TestLogfmtExtractor(t *testing.T) { }) require.NoError(t, err) - set := newLabelSet() + set := NewLabelSet() newLine, ok := e.Process(0, tt.input, set) // Ensure that extractor does not change the line. require.Equal(t, tt.input, newLine) diff --git a/internal/logql/logqlengine/otel_adapter.go b/internal/logql/logqlengine/otel_adapter.go index b4e496ae..0f3904d4 100644 --- a/internal/logql/logqlengine/otel_adapter.go +++ b/internal/logql/logqlengine/otel_adapter.go @@ -4,51 +4,37 @@ import ( "github.com/go-faster/jx" "go.opentelemetry.io/collector/pdata/pcommon" + "github.com/go-faster/oteldb/internal/logql" "github.com/go-faster/oteldb/internal/logstorage" ) -// LineFromRecord returns a JSON line from a log record. -func LineFromRecord(record logstorage.Record) string { +// LineFromEntry returns a JSON line from a log record. +func LineFromEntry(entry Entry) string { // Create JSON object from record. e := &jx.Encoder{} e.Obj(func(e *jx.Encoder) { - if record.Body != "" { + if entry.Line != "" { e.Field(logstorage.LabelBody, func(e *jx.Encoder) { - e.Str(record.Body) + e.Str(entry.Line) }) } - if m := record.Attrs.AsMap(); m != (pcommon.Map{}) { - record.Attrs.AsMap().Range(func(k string, v pcommon.Value) bool { - e.Field(k, func(e *jx.Encoder) { - switch v.Type() { - case pcommon.ValueTypeStr: - e.Str(v.Str()) - case pcommon.ValueTypeBool: - e.Bool(v.Bool()) - case pcommon.ValueTypeInt: - e.Int64(v.Int()) - case pcommon.ValueTypeDouble: - e.Float64(v.Double()) - default: - // Fallback. 
- e.Str(v.AsString()) - } - }) - return true + entry.Set.Range(func(k logql.Label, v pcommon.Value) { + e.Field(string(k), func(e *jx.Encoder) { + switch v.Type() { + case pcommon.ValueTypeStr: + e.Str(v.Str()) + case pcommon.ValueTypeBool: + e.Bool(v.Bool()) + case pcommon.ValueTypeInt: + e.Int64(v.Int()) + case pcommon.ValueTypeDouble: + e.Float64(v.Double()) + default: + // Fallback. + e.Str(v.AsString()) + } }) - } - // HACK: add trace_id, span_id so "trace to logs" metrics work. - // Like `{http_method=~".+"} |= "af36000000000000c517000000000003"`. - if !record.TraceID.IsEmpty() { - e.Field(logstorage.LabelTraceID, func(e *jx.Encoder) { - e.Str(record.TraceID.Hex()) - }) - } - if !record.SpanID.IsEmpty() { - e.Field(logstorage.LabelSpanID, func(e *jx.Encoder) { - e.Str(record.SpanID.Hex()) - }) - } + }) }) return e.String() } diff --git a/internal/logql/logqlengine/pattern_test.go b/internal/logql/logqlengine/pattern_test.go index 1e7190fc..7235ff94 100644 --- a/internal/logql/logqlengine/pattern_test.go +++ b/internal/logql/logqlengine/pattern_test.go @@ -40,7 +40,7 @@ func TestPatternExtractor(t *testing.T) { }) require.NoError(t, err) - set := newLabelSet() + set := NewLabelSet() newLine, ok := e.Process(0, tt.input, set) // Ensure that extractor does not change the line. 
require.Equal(t, tt.input, newLine) diff --git a/internal/logql/logqlengine/precondition.go b/internal/logql/logqlengine/precondition.go index 7a158004..39e42e44 100644 --- a/internal/logql/logqlengine/precondition.go +++ b/internal/logql/logqlengine/precondition.go @@ -6,16 +6,16 @@ import ( type queryConditions struct { prefilter Processor - params SelectLogsParams + Labels []logql.LabelMatcher } -func extractQueryConditions(caps QuerierCapabilities, sel logql.Selector, stages []logql.PipelineStage) (cond queryConditions, _ error) { +func extractQueryConditions(caps QuerierCapabilities, sel logql.Selector) (cond queryConditions, _ error) { var prefilters []Processor for _, lm := range sel.Matchers { // If storage supports the operation, just pass it as a parameter. if caps.Label.Supports(lm.Op) { - cond.params.Labels = append(cond.params.Labels, lm) + cond.Labels = append(cond.Labels, lm) continue } @@ -35,36 +35,5 @@ func extractQueryConditions(caps QuerierCapabilities, sel logql.Selector, stages default: cond.prefilter = &Pipeline{Stages: prefilters} } - -stageLoop: - for _, stage := range stages { - switch stage := stage.(type) { - case *logql.LineFilter: - if stage.IP { - // Do not offload IP line filter. - continue - } - if !caps.Line.Supports(stage.Op) { - continue - } - cond.params.Line = append(cond.params.Line, *stage) - case *logql.JSONExpressionParser, - *logql.LogfmtExpressionParser, - *logql.RegexpLabelParser, - *logql.PatternLabelParser, - *logql.LabelFilter, - *logql.LabelFormatExpr, - *logql.DropLabelsExpr, - *logql.KeepLabelsExpr, - *logql.DistinctFilter: - // Do nothing on line, just skip. - case *logql.LineFormat, - *logql.DecolorizeExpr, - *logql.UnpackLabelParser: - // Stage modify the line, can't offload line filters after this stage. 
- break stageLoop - } - } - return cond, nil } diff --git a/internal/logql/logqlengine/precondition_test.go b/internal/logql/logqlengine/precondition_test.go index 2b395bda..de0cffb7 100644 --- a/internal/logql/logqlengine/precondition_test.go +++ b/internal/logql/logqlengine/precondition_test.go @@ -15,7 +15,7 @@ func TestExtractLabelQueryConditions(t *testing.T) { sel logql.Selector labelCaps []logql.BinOp wantPrefilter bool - conds SelectLogsParams + conds queryConditions wantErr bool }{ { @@ -27,7 +27,7 @@ func TestExtractLabelQueryConditions(t *testing.T) { }, []logql.BinOp{logql.OpEq, logql.OpNotEq}, false, - SelectLogsParams{ + queryConditions{ Labels: []logql.LabelMatcher{ {Label: "foo", Op: logql.OpEq, Value: "bar"}, {Label: "bar", Op: logql.OpNotEq, Value: "foo"}, @@ -44,7 +44,7 @@ func TestExtractLabelQueryConditions(t *testing.T) { }, []logql.BinOp{logql.OpEq, logql.OpNotEq}, true, - SelectLogsParams{ + queryConditions{ Labels: []logql.LabelMatcher{ {Label: "bar", Op: logql.OpNotEq, Value: "foo"}, }, @@ -60,7 +60,7 @@ func TestExtractLabelQueryConditions(t *testing.T) { }, []logql.BinOp{logql.OpEq, logql.OpNotEq}, true, - SelectLogsParams{}, + queryConditions{}, false, }, } @@ -70,7 +70,7 @@ func TestExtractLabelQueryConditions(t *testing.T) { var caps QuerierCapabilities caps.Label.Add(tt.labelCaps...) 
- conds, err := extractQueryConditions(caps, tt.sel, nil) + conds, err := extractQueryConditions(caps, tt.sel) if tt.wantErr { require.Error(t, err) return @@ -82,78 +82,7 @@ func TestExtractLabelQueryConditions(t *testing.T) { } else { require.Equal(t, NopProcessor, conds.prefilter) } - require.Equal(t, tt.conds, conds.params) - }) - } -} - -func TestExtractLineQueryConditions(t *testing.T) { - tests := []struct { - stages []logql.PipelineStage - lineCaps []logql.BinOp - conds SelectLogsParams - wantErr bool - }{ - { - []logql.PipelineStage{ - &logql.DropLabelsExpr{}, - &logql.LineFilter{Op: logql.OpEq, Value: "first"}, - &logql.LineFilter{Op: logql.OpRe, Value: "regular.+", Re: regexp.MustCompile(`regular.+`)}, - &logql.DecolorizeExpr{}, - // These would not be offloaded. - &logql.LineFilter{Op: logql.OpEq, Value: "second"}, - &logql.LineFilter{Op: logql.OpRe, Value: "no+", Re: regexp.MustCompile(`no.+`)}, - }, - []logql.BinOp{ - logql.OpEq, - logql.OpRe, - }, - SelectLogsParams{ - Line: []logql.LineFilter{ - {Op: logql.OpEq, Value: "first"}, - {Op: logql.OpRe, Value: "regular.+", Re: regexp.MustCompile(`regular.+`)}, - }, - }, - false, - }, - { - []logql.PipelineStage{ - &logql.LineFilter{Op: logql.OpRe, Value: "a.+", Re: regexp.MustCompile(`a.+`)}, - &logql.DecolorizeExpr{}, - &logql.LineFilter{Op: logql.OpRe, Value: "b+", Re: regexp.MustCompile(`b.+`)}, - }, - []logql.BinOp{ - logql.OpEq, - }, - SelectLogsParams{}, - false, - }, - { - []logql.PipelineStage{ - &logql.LineFilter{Op: logql.OpEq, Value: "127.0.0.1", IP: true}, - }, - []logql.BinOp{ - logql.OpEq, - }, - SelectLogsParams{}, - false, - }, - } - for i, tt := range tests { - tt := tt - t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) { - var caps QuerierCapabilities - caps.Line.Add(tt.lineCaps...) 
- - conds, err := extractQueryConditions(caps, logql.Selector{}, tt.stages) - if tt.wantErr { - require.Error(t, err) - return - } - require.NoError(t, err) - - require.Equal(t, NopProcessor, conds.prefilter) - require.Equal(t, tt.conds, conds.params) + require.Equal(t, tt.conds.Labels, conds.Labels) }) } } diff --git a/internal/logql/logqlengine/regexp_test.go b/internal/logql/logqlengine/regexp_test.go index c9e2b163..37925fa8 100644 --- a/internal/logql/logqlengine/regexp_test.go +++ b/internal/logql/logqlengine/regexp_test.go @@ -52,7 +52,7 @@ func TestRegexpExtractor(t *testing.T) { }) require.NoError(t, err) - set := newLabelSet() + set := NewLabelSet() newLine, ok := e.Process(0, tt.input, set) // Ensure that extractor does not change the line. require.Equal(t, tt.input, newLine) diff --git a/internal/logql/logqlengine/sampler.go b/internal/logql/logqlengine/sampler.go index 34f83312..5c623328 100644 --- a/internal/logql/logqlengine/sampler.go +++ b/internal/logql/logqlengine/sampler.go @@ -8,39 +8,49 @@ import ( "github.com/dustin/go-humanize" "github.com/go-faster/errors" - "github.com/go-faster/oteldb/internal/iterators" "github.com/go-faster/oteldb/internal/logql" "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlmetric" - "github.com/go-faster/oteldb/internal/otelstorage" ) -func (e *Engine) sampleSelector(ctx context.Context, params EvalParams) logqlmetric.SampleSelector { - return func(expr *logql.RangeAggregationExpr, start, end time.Time) (_ iterators.Iterator[logqlmetric.SampledEntry], rerr error) { - qrange := expr.Range - - iter, err := e.selectLogs(ctx, qrange.Sel, qrange.Pipeline, selectLogsParams{ - Start: otelstorage.NewTimestampFromTime(start), - End: otelstorage.NewTimestampFromTime(end), - Instant: params.IsInstant(), - Direction: params.Direction, - // Do not limit sample queries. 
- Limit: -1, - }) - if err != nil { - return nil, errors.Wrap(err, "select logs") - } - defer func() { - if rerr != nil { - _ = iter.Close() - } - }() +// SamplingNode implements entry sampling. +type SamplingNode struct { + Input PipelineNode + Expr *logql.RangeAggregationExpr +} + +var _ SampleNode = (*SamplingNode)(nil) + +func (e *Engine) buildSampleNode(ctx context.Context, expr *logql.RangeAggregationExpr) (SampleNode, error) { + root, err := e.buildPipelineNode(ctx, expr.Range.Sel, expr.Range.Pipeline) + if err != nil { + return nil, err + } - return newSampleIterator(iter, expr) + return &SamplingNode{ + Input: root, + Expr: expr, + }, nil +} + +// Traverse implements [Node]. +func (n *SamplingNode) Traverse(cb NodeVisitor) error { + if err := cb(n); err != nil { + return err + } + return n.Input.Traverse(cb) +} + +// EvalSample implements [SampleNode]. +func (n *SamplingNode) EvalSample(ctx context.Context, params EvalParams) (SampleIterator, error) { + iter, err := n.Input.EvalPipeline(ctx, params) + if err != nil { + return nil, err } + return newSampleIterator(iter, n.Expr) } type sampleIterator struct { - iter iterators.Iterator[entry] + iter EntryIterator sampler sampleExtractor // grouping parameters. 
@@ -48,7 +58,7 @@ type sampleIterator struct { without map[string]struct{} } -func newSampleIterator(iter iterators.Iterator[entry], expr *logql.RangeAggregationExpr) (*sampleIterator, error) { +func newSampleIterator(iter EntryIterator, expr *logql.RangeAggregationExpr) (*sampleIterator, error) { sampler, err := buildSampleExtractor(expr) if err != nil { return nil, errors.Wrap(err, "build sample extractor") @@ -75,7 +85,7 @@ func newSampleIterator(iter iterators.Iterator[entry], expr *logql.RangeAggregat } func (i *sampleIterator) Next(s *logqlmetric.SampledEntry) bool { - var e entry + var e Entry for { if !i.iter.Next(&e) { return false @@ -86,9 +96,9 @@ func (i *sampleIterator) Next(s *logqlmetric.SampledEntry) bool { continue } - s.Timestamp = e.ts + s.Timestamp = e.Timestamp s.Sample = v - s.Set = newAggregatedLabels(e.set, i.by, i.without) + s.Set = AggregatedLabelsFromSet(e.Set, i.by, i.without) return true } } @@ -103,7 +113,7 @@ func (i *sampleIterator) Close() error { // sampleExtractor extracts samples from log records. 
type sampleExtractor interface { - Extract(e entry) (float64, bool) + Extract(e Entry) (float64, bool) } func buildSampleExtractor(expr *logql.RangeAggregationExpr) (sampleExtractor, error) { @@ -190,14 +200,14 @@ func convertDuration(s string) (float64, error) { type lineCounterExtractor struct{} -func (*lineCounterExtractor) Extract(entry) (float64, bool) { +func (*lineCounterExtractor) Extract(Entry) (float64, bool) { return 1., true } type bytesCounterExtractor struct{} -func (*bytesCounterExtractor) Extract(e entry) (float64, bool) { - return float64(len(e.line)), true +func (*bytesCounterExtractor) Extract(e Entry) (float64, bool) { + return float64(len(e.Line)), true } type labelsExtractor struct { @@ -206,8 +216,8 @@ type labelsExtractor struct { postfilter Processor } -func (l *labelsExtractor) Extract(e entry) (p float64, _ bool) { - v, ok := e.set.GetString(l.label) +func (l *labelsExtractor) Extract(e Entry) (p float64, _ bool) { + v, ok := e.Set.GetString(l.label) if !ok { return p, false } @@ -215,6 +225,6 @@ func (l *labelsExtractor) Extract(e entry) (p float64, _ bool) { p, _ = l.converter(v) // TODO(tdakkota): save error - _, ok = l.postfilter.Process(e.ts, e.line, e.set) + _, ok = l.postfilter.Process(e.Timestamp, e.Line, e.Set) return p, ok } diff --git a/internal/logql/logqlengine/sampler_test.go b/internal/logql/logqlengine/sampler_test.go index e67bcb21..6fe03e53 100644 --- a/internal/logql/logqlengine/sampler_test.go +++ b/internal/logql/logqlengine/sampler_test.go @@ -254,13 +254,13 @@ func TestSampleExtractor(t *testing.T) { }) require.NoError(t, err) - set := newLabelSet() + set := NewLabelSet() set.labels = tt.input.labels - got, gotOk := e.Extract(entry{ - ts: 1, - line: tt.input.line, - set: set, + got, gotOk := e.Extract(Entry{ + Timestamp: 1, + Line: tt.input.line, + Set: set, }) if !tt.wantOk { require.False(t, gotOk) diff --git a/internal/logql/logqlengine/storage.go b/internal/logql/logqlengine/storage.go index 313e5a6d..80054e42 
100644 --- a/internal/logql/logqlengine/storage.go +++ b/internal/logql/logqlengine/storage.go @@ -3,10 +3,7 @@ package logqlengine import ( "context" - "github.com/go-faster/oteldb/internal/iterators" "github.com/go-faster/oteldb/internal/logql" - "github.com/go-faster/oteldb/internal/logstorage" - "github.com/go-faster/oteldb/internal/otelstorage" ) // SupportedOps is a bitset defining ops supported by Querier. @@ -38,12 +35,6 @@ type Querier interface { // NOTE: engine would call once and then save value. // Capabilities should not change over time. Capabilities() QuerierCapabilities - // SelectLogs selects log records from storage. - SelectLogs(ctx context.Context, start, end otelstorage.Timestamp, direction Direction, params SelectLogsParams) (iterators.Iterator[logstorage.Record], error) -} - -// SelectLogsParams is a storage query params. -type SelectLogsParams struct { - Labels []logql.LabelMatcher - Line []logql.LineFilter + // Query creates new [PipelineNode]. + Query(ctx context.Context, selector []logql.LabelMatcher) (PipelineNode, error) } diff --git a/internal/logql/logqlengine/unpack_test.go b/internal/logql/logqlengine/unpack_test.go index 5ac68ca6..8c1666ac 100644 --- a/internal/logql/logqlengine/unpack_test.go +++ b/internal/logql/logqlengine/unpack_test.go @@ -58,7 +58,7 @@ func TestUnpackExtractor(t *testing.T) { e, err := buildUnpackExtractor(&logql.UnpackLabelParser{}) require.NoError(t, err) - set := newLabelSet() + set := NewLabelSet() newLine, ok := e.Process(0, tt.input, set) // Ensure that extractor does not change the line. require.Equal(t, newLine, tt.expectLine) diff --git a/internal/logql/metric_expr.go b/internal/logql/metric_expr.go index 4b5da273..436f647d 100644 --- a/internal/logql/metric_expr.go +++ b/internal/logql/metric_expr.go @@ -145,10 +145,10 @@ type VectorExpr struct { // BinOpExpr defines a binary operation between two Expr. 
type BinOpExpr struct { - Left Expr + Left MetricExpr Op BinOp Modifier BinOpModifier - Right Expr + Right MetricExpr } // ReduceBinOp recursively precomputes literal expression. diff --git a/internal/lokihandler/lokihandler.go b/internal/lokihandler/lokihandler.go index 54c66c1a..e4b84f40 100644 --- a/internal/lokihandler/lokihandler.go +++ b/internal/lokihandler/lokihandler.go @@ -158,16 +158,17 @@ func (h *LokiAPI) Query(ctx context.Context, params lokiapi.QueryParams) (*lokia return nil, errors.Errorf("invalid direction %q", d) } - data, err := h.engine.Eval(ctx, params.Query, logqlengine.EvalParams{ - Start: otelstorage.NewTimestampFromTime(ts), - End: otelstorage.NewTimestampFromTime(ts), + data, err := h.eval(ctx, params.Query, logqlengine.EvalParams{ + Start: ts, + End: ts, Step: 0, Direction: direction, Limit: params.Limit.Or(100), }) if err != nil { - return nil, errors.Wrap(err, "eval") + return nil, err } + lg.Debug("Query", zap.String("type", string(data.Type))) return &lokiapi.QueryResponse{ @@ -209,15 +210,15 @@ func (h *LokiAPI) QueryRange(ctx context.Context, params lokiapi.QueryRangeParam return nil, errors.Errorf("invalid direction %q", d) } - data, err := h.engine.Eval(ctx, params.Query, logqlengine.EvalParams{ - Start: otelstorage.NewTimestampFromTime(start), - End: otelstorage.NewTimestampFromTime(end), + data, err := h.eval(ctx, params.Query, logqlengine.EvalParams{ + Start: start, + End: end, Step: step, Direction: direction, Limit: params.Limit.Or(100), }) if err != nil { - return nil, errors.Wrap(err, "eval") + return nil, err } lg.Debug("Query range", zap.String("type", string(data.Type))) @@ -246,9 +247,9 @@ func (h *LokiAPI) Series(ctx context.Context, params lokiapi.SeriesParams) (*lok zctx.From(ctx).Info("Series", zap.Int("match", len(params.Match))) for _, q := range params.Match { // TODO(ernado): offload - data, err := h.engine.Eval(ctx, q, logqlengine.EvalParams{ - Start: otelstorage.NewTimestampFromTime(start), - End: 
otelstorage.NewTimestampFromTime(end), + data, err := h.eval(ctx, q, logqlengine.EvalParams{ + Start: start, + End: end, Direction: logqlengine.DirectionBackward, Limit: 1_000, }) @@ -270,6 +271,18 @@ func (h *LokiAPI) Series(ctx context.Context, params lokiapi.SeriesParams) (*lok }, nil } +func (h *LokiAPI) eval(ctx context.Context, query string, params logqlengine.EvalParams) (r lokiapi.QueryResponseData, _ error) { + q, err := h.engine.NewQuery(ctx, query) + if err != nil { + return r, errors.Wrap(err, "compile query") + } + r, err = q.Eval(ctx, params) + if err != nil { + return r, errors.Wrap(err, "eval") + } + return r, nil +} + // NewError creates *ErrorStatusCode from error returned by handler. // // Used for common default response.