Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(chstorage): offload unaffected label filters #453

Merged
merged 1 commit into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 125 additions & 20 deletions internal/chstorage/querier_logs_optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package chstorage

import (
"context"
"slices"

"github.com/go-faster/sdk/zctx"
"go.uber.org/zap"

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlpattern"
)

// ClickhouseOptimizer replaces LogQL engine execution
Expand Down Expand Up @@ -179,44 +181,147 @@ func (o *ClickhouseOptimizer) optimizePipeline(n logqlengine.PipelineNode, lg *z
}

func (o *ClickhouseOptimizer) offloadLabelFilters(pipeline []logql.PipelineStage) (filters []logql.LabelPredicate) {
stageLoop:
for _, stage := range pipeline {
switch stage := stage.(type) {
case *logql.LabelFilter:
if !o.canOffloadLabelPredicate(stage.Pred) {
continue
labels := make(map[logql.Label]struct{}, 4)
nextStage:
for i, stage := range pipeline {
f, ok := stage.(*logql.LabelFilter)
if !ok {
continue
}

// Collect labels that predicate matches.
clear(labels)
if !o.collectPredicateLabels(labels, f.Pred) {
continue
}

// Ensure that any stage before filter does not affect labels
// that predicates matches.
for label := range labels {
if slices.ContainsFunc(pipeline[:i], func(stage logql.PipelineStage) bool {
return o.affectsLabel(stage, label)
}) {
continue nextStage
}
filters = append(filters, stage.Pred)
case *logql.DecolorizeExpr,
*logql.LineFilter:
// Do nothing on label set, just skip.
default:
// Stage modify the label set, can't offload label filters after this stage.
break stageLoop
}

filters = append(filters, f.Pred)
}
return filters
}

func (o *ClickhouseOptimizer) canOffloadLabelPredicate(p logql.LabelPredicate) bool {
switch p := p.(type) {
func (o *ClickhouseOptimizer) affectsLabel(stage logql.PipelineStage, label logql.Label) bool {
isErrorLabel := label == logql.ErrorLabel ||
label == logql.ErrorDetailsLabel

inLabelExpr := func(exprs []logql.LabelExtractionExpr, label logql.Label) bool {
return slices.ContainsFunc(exprs, func(expr logql.LabelExtractionExpr) bool {
return expr.Label == label
})
}

parserAffectsLabel := func(isErrorLabel bool, labels []logql.Label, exprs []logql.LabelExtractionExpr) bool {
// Parsing might fail.
if isErrorLabel {
return true
}

// All parsed fields would be added to label set.
if len(labels)+len(exprs) == 0 {
return true
}

// Any parsed label is affected.
return slices.Contains(labels, label) ||
inLabelExpr(exprs, label)
}

switch stage := stage.(type) {
case *logql.LineFilter:
return false
case *logql.JSONExpressionParser:
return parserAffectsLabel(isErrorLabel, stage.Labels, stage.Exprs)
case *logql.LogfmtExpressionParser:
return parserAffectsLabel(isErrorLabel, stage.Labels, stage.Exprs)
case *logql.RegexpLabelParser:
for _, v := range stage.Mapping {
if v == label {
return true
}
}
return false
case *logql.PatternLabelParser:
for _, p := range stage.Pattern.Parts {
if p.Type == logqlpattern.Capture &&
p.Value == string(label) {
return true
}
}
return false
case *logql.UnpackLabelParser:
return true
case *logql.LineFormat:
return isErrorLabel
case *logql.DecolorizeExpr:
return false
case *logql.LabelFilter:
return isErrorLabel && o.failableLabelPredicate(stage.Pred)
case *logql.LabelFormatExpr:
// Template might fail, occurred error would be added to label set.
if len(stage.Values) > 0 && isErrorLabel {
return true
}

// Any renamed-to label is affected.
if slices.ContainsFunc(stage.Labels, func(expr logql.RenameLabel) bool {
return expr.To == label
}) {
return true
}

// Any formatted label is affected.
return slices.ContainsFunc(stage.Values, func(expr logql.LabelTemplate) bool {
return expr.Label == label
})
default:
return true
}
}

func (o *ClickhouseOptimizer) collectPredicateLabels(labels map[logql.Label]struct{}, pred logql.LabelPredicate) bool {
pred = logql.UnparenLabelPredicate(pred)

switch pred := pred.(type) {
case *logql.LabelPredicateBinOp:
switch p.Op {
switch pred.Op {
case logql.OpAnd, logql.OpOr:
default:
return false
}
return o.canOffloadLabelPredicate(p.Left) &&
o.canOffloadLabelPredicate(p.Right)
case *logql.LabelPredicateParen:
return o.canOffloadLabelPredicate(p.X)
return o.collectPredicateLabels(labels, pred.Left) &&
o.collectPredicateLabels(labels, pred.Right)
case *logql.LabelMatcher:
labels[pred.Label] = struct{}{}
return true
default:
return false
}
}

func (o *ClickhouseOptimizer) failableLabelPredicate(pred logql.LabelPredicate) bool {
pred = logql.UnparenLabelPredicate(pred)

switch pred := pred.(type) {
case *logql.LabelPredicateBinOp:
return o.failableLabelPredicate(pred.Left) ||
o.failableLabelPredicate(pred.Right)
case *logql.LabelMatcher:
return false
default:
return true
}
}

func (o *ClickhouseOptimizer) offloadLineFilters(pipeline []logql.PipelineStage) (line []logql.LineFilter) {
stageLoop:
for _, stage := range pipeline {
Expand Down
Loading
Loading