diff --git a/internal/chstorage/querier_logs_optimizer.go b/internal/chstorage/querier_logs_optimizer.go index 3cd7dd9b..316ae610 100644 --- a/internal/chstorage/querier_logs_optimizer.go +++ b/internal/chstorage/querier_logs_optimizer.go @@ -2,12 +2,14 @@ package chstorage import ( "context" + "slices" "github.com/go-faster/sdk/zctx" "go.uber.org/zap" "github.com/go-faster/oteldb/internal/logql" "github.com/go-faster/oteldb/internal/logql/logqlengine" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlpattern" ) // ClickhouseOptimizer replaces LogQL engine execution @@ -179,44 +181,147 @@ func (o *ClickhouseOptimizer) optimizePipeline(n logqlengine.PipelineNode, lg *z } func (o *ClickhouseOptimizer) offloadLabelFilters(pipeline []logql.PipelineStage) (filters []logql.LabelPredicate) { -stageLoop: - for _, stage := range pipeline { - switch stage := stage.(type) { - case *logql.LabelFilter: - if !o.canOffloadLabelPredicate(stage.Pred) { - continue + labels := make(map[logql.Label]struct{}, 4) +nextStage: + for i, stage := range pipeline { + f, ok := stage.(*logql.LabelFilter) + if !ok { + continue + } + + // Collect labels that predicate matches. + clear(labels) + if !o.collectPredicateLabels(labels, f.Pred) { + continue + } + + // Ensure that any stage before filter does not affect labels + // that predicates matches. + for label := range labels { + if slices.ContainsFunc(pipeline[:i], func(stage logql.PipelineStage) bool { + return o.affectsLabel(stage, label) + }) { + continue nextStage } - filters = append(filters, stage.Pred) - case *logql.DecolorizeExpr, - *logql.LineFilter: - // Do nothing on label set, just skip. - default: - // Stage modify the label set, can't offload label filters after this stage. - break stageLoop } + + filters = append(filters, f.Pred) } return filters } -func (o *ClickhouseOptimizer) canOffloadLabelPredicate(p logql.LabelPredicate) bool { - switch p := p.(type) { +func (o *ClickhouseOptimizer) affectsLabel(stage logql.PipelineStage, label logql.Label) bool { + isErrorLabel := label == logql.ErrorLabel || + label == logql.ErrorDetailsLabel + + inLabelExpr := func(exprs []logql.LabelExtractionExpr, label logql.Label) bool { + return slices.ContainsFunc(exprs, func(expr logql.LabelExtractionExpr) bool { + return expr.Label == label + }) + } + + parserAffectsLabel := func(isErrorLabel bool, labels []logql.Label, exprs []logql.LabelExtractionExpr) bool { + // Parsing might fail. + if isErrorLabel { + return true + } + + // All parsed fields would be added to label set. + if len(labels)+len(exprs) == 0 { + return true + } + + // Any parsed label is affected. + return slices.Contains(labels, label) || + inLabelExpr(exprs, label) + } + + switch stage := stage.(type) { + case *logql.LineFilter: + return false + case *logql.JSONExpressionParser: + return parserAffectsLabel(isErrorLabel, stage.Labels, stage.Exprs) + case *logql.LogfmtExpressionParser: + return parserAffectsLabel(isErrorLabel, stage.Labels, stage.Exprs) + case *logql.RegexpLabelParser: + for _, v := range stage.Mapping { + if v == label { + return true + } + } + return false + case *logql.PatternLabelParser: + for _, p := range stage.Pattern.Parts { + if p.Type == logqlpattern.Capture && + p.Value == string(label) { + return true + } + } + return false + case *logql.UnpackLabelParser: + return true + case *logql.LineFormat: + return isErrorLabel + case *logql.DecolorizeExpr: + return false + case *logql.LabelFilter: + return isErrorLabel && o.failableLabelPredicate(stage.Pred) + case *logql.LabelFormatExpr: + // Template might fail, occurred error would be added to label set. + if len(stage.Values) > 0 && isErrorLabel { + return true + } + + // Any renamed-to label is affected. + if slices.ContainsFunc(stage.Labels, func(expr logql.RenameLabel) bool { + return expr.To == label + }) { + return true + } + + // Any formatted label is affected. + return slices.ContainsFunc(stage.Values, func(expr logql.LabelTemplate) bool { + return expr.Label == label + }) + default: + return true + } +} + +func (o *ClickhouseOptimizer) collectPredicateLabels(labels map[logql.Label]struct{}, pred logql.LabelPredicate) bool { + pred = logql.UnparenLabelPredicate(pred) + + switch pred := pred.(type) { case *logql.LabelPredicateBinOp: - switch p.Op { + switch pred.Op { case logql.OpAnd, logql.OpOr: default: return false } - return o.canOffloadLabelPredicate(p.Left) && - o.canOffloadLabelPredicate(p.Right) - case *logql.LabelPredicateParen: - return o.canOffloadLabelPredicate(p.X) + return o.collectPredicateLabels(labels, pred.Left) && + o.collectPredicateLabels(labels, pred.Right) case *logql.LabelMatcher: + labels[pred.Label] = struct{}{} return true default: return false } } +func (o *ClickhouseOptimizer) failableLabelPredicate(pred logql.LabelPredicate) bool { + pred = logql.UnparenLabelPredicate(pred) + + switch pred := pred.(type) { + case *logql.LabelPredicateBinOp: + return o.failableLabelPredicate(pred.Left) || + o.failableLabelPredicate(pred.Right) + case *logql.LabelMatcher: + return false + default: + return true + } +} + func (o *ClickhouseOptimizer) offloadLineFilters(pipeline []logql.PipelineStage) (line []logql.LineFilter) { stageLoop: for _, stage := range pipeline { diff --git a/internal/chstorage/querier_logs_optimizer_test.go b/internal/chstorage/querier_logs_optimizer_test.go new file mode 100644 index 00000000..5a39be30 --- /dev/null +++ b/internal/chstorage/querier_logs_optimizer_test.go @@ -0,0 +1,271 @@ +package chstorage + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/go-faster/oteldb/internal/logql" +) + +func TestClickhouseOptimizer_offloadLabelFilters(t *testing.T) { + var o ClickhouseOptimizer + + tests := []struct { + input string + want []string + }{ + // No label filters: nothing to offload. + {`{job=".+"}`, nil}, + {`{job=".+"} |= "HEAD"`, nil}, + + // Parsing label filters: nothing to offload. + {`{job=".+"} | method == 10`, nil}, + {`{job=".+"} | method == 10s`, nil}, + {`{job=".+"} | method == 10mb`, nil}, + {`{job=".+"} | method = ip("127.0.0.1")`, nil}, + {`{job=".+"} | method = "HEAD" or method == 10`, nil}, + // Ensure that integer label filter affects __error__. + { + `{job=".+"} + | method = "HEAD" + | __error__ = "error" | __error_details__ = "error details" + | method = "GET" or code == 10 + | __error__ != "" | __error_details__ != ""`, + []string{ + `method="HEAD"`, + `__error__="error"`, `__error_details__="error details"`, + }, + }, + + // Simple cases. + {`{job=".+"} | method = "HEAD"`, []string{`method="HEAD"`}}, + {`{job=".+"} | method = "HEAD" or method = "GET"`, []string{`method="HEAD" or method="GET"`}}, + {`{job=".+"} | method = "HEAD", method = "GET"`, []string{`method="HEAD" and method="GET"`}}, + + // Line filter. + // + // Line filters does not affect labels. + { + `{job=".+"} + | method = "HEAD" + |= "error" + | method = "GET"`, + []string{ + `method="HEAD"`, + `method="GET"`, + }, + }, + { + `{job=".+"} + | method = "HEAD" + |= ip("127.0.0.1") + | method = "GET"`, + []string{ + `method="HEAD"`, + `method="GET"`, + }, + }, + { + `{job=".+"} + | method = "HEAD" + |> "<_> foo <_>" + | method = "GET"`, + []string{ + `method="HEAD"`, + `method="GET"`, + }, + }, + + // Parsing. + // + // JSON parser. + { + `{job=".+"} + | method = "HEAD" | status = "ok" + | json + | method != "GET" | status != "error" + | __error__ != "" | __error_details__ != ""`, + []string{ + `method="HEAD"`, `status="ok"`, + }, + }, + { + `{job=".+"} + | method = "HEAD" | status = "ok" + | json status + | method != "GET" | status != "error" + | __error__ != "" | __error_details__ != ""`, + []string{ + `method="HEAD"`, `status="ok"`, + `method!="GET"`, + }, + }, + { + `{job=".+"} + | method = "HEAD" | status = "ok" + | json protocol="request.protocol" + | method != "GET" | status != "error" + | __error__ != "" | __error_details__ != ""`, + []string{ + `method="HEAD"`, `status="ok"`, + `method!="GET"`, `status!="error"`, + }, + }, + // Logfmt parser. + { + `{job=".+"} + | method = "HEAD" | status = "ok" + | logfmt + | method != "GET" | status != "error" + | __error__ != "" | __error_details__ != ""`, + []string{ + `method="HEAD"`, `status="ok"`, + }, + }, + { + `{job=".+"} + | method = "HEAD" | status = "ok" + | logfmt status + | method != "GET" | status != "error" + | __error__ != "" | __error_details__ != ""`, + []string{ + `method="HEAD"`, `status="ok"`, + `method!="GET"`, + }, + }, + { + `{job=".+"} + | method = "HEAD" | status = "ok" + | logfmt protocol="request.protocol" + | method != "GET" | status != "error" + | __error__ != "" | __error_details__ != ""`, + []string{ + `method="HEAD"`, `status="ok"`, + `method!="GET"`, `status!="error"`, + }, + }, + // Regexp parser. + { + `{job=".+"} + | method = "HEAD" | status = "ok" + | regexp "(?P\\w+)" + | method != "GET" | status != "error"`, + []string{ + `method="HEAD"`, `status="ok"`, + `status!="error"`, + }, + }, + // Pattern parser. + { + `{job=".+"} + | method = "HEAD" | status = "ok" + | pattern " " + | method != "GET" | status != "error"`, + []string{ + `method="HEAD"`, `status="ok"`, + `status!="error"`, + }, + }, + // Unpack parser. + { + `{job=".+"} + | method = "HEAD" | status = "ok" + | unpack + | method != "GET" | status != "error"`, + []string{ + `method="HEAD"`, `status="ok"`, + }, + }, + + // Line format. + // + // line_format affects `__error__` and `__error_details__`. + { + `{job=".+"} + | method = "HEAD" + | line_format "{{ . }}" + | method != "GET" + | __error__ != "" | __error_details__ != ""`, + []string{ + `method="HEAD"`, + `method!="GET"`, + }, + }, + + // Decolorize. + // + // Decolorize does not affect labels. + { + `{job=".+"} + | method = "HEAD" + | decolorize + | method = "GET"`, + []string{ + `method="HEAD"`, + `method="GET"`, + }, + }, + + // Label format. + { + `{job=".+"} + | method = "HEAD" + | label_format method="{{ . }}" + | method != "GET"`, + []string{ + `method="HEAD"`, + }, + }, + { + `{job=".+"} + | method = "HEAD" | status = "ok" + | label_format method=status + | method != "GET" | status != "error"`, + []string{ + `method="HEAD"`, `status="ok"`, + `status!="error"`, + }, + }, + // label_format affects __error__. + { + `{job=".+"} + | method = "HEAD" + | label_format status="{{ . }}" + | method != "GET" + | __error__ != "" | __error_details__ != ""`, + []string{ + `method="HEAD"`, + `method!="GET"`, + }, + }, + } + for i, tt := range tests { + tt := tt + t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) { + defer func() { + if t.Failed() { + t.Logf("Query:\n%s", tt.input) + } + }() + + expr, err := logql.Parse(tt.input, logql.ParseOptions{AllowDots: false}) + require.NoError(t, err) + logExpr := expr.(*logql.LogExpr) + + var ( + offloaded = o.offloadLabelFilters(logExpr.Pipeline) + got = make([]string, len(offloaded)) + ) + for i, pred := range offloaded { + got[i] = pred.String() + } + if len(offloaded) == 0 { + got = nil + } + + require.Equal(t, tt.want, got) + }) + } +}