Skip to content

Commit

Permalink
Merge pull request #453 from go-faster/feat/stage-effects
Browse files Browse the repository at this point in the history
feat(chstorage): offload unaffected label filters
  • Loading branch information
tdakkota authored Jul 15, 2024
2 parents 06783f7 + 780cfdd commit ddd413c
Show file tree
Hide file tree
Showing 2 changed files with 396 additions and 20 deletions.
145 changes: 125 additions & 20 deletions internal/chstorage/querier_logs_optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package chstorage

import (
"context"
"slices"

"github.com/go-faster/sdk/zctx"
"go.uber.org/zap"

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlpattern"
)

// ClickhouseOptimizer replaces LogQL engine execution
Expand Down Expand Up @@ -179,44 +181,147 @@ func (o *ClickhouseOptimizer) optimizePipeline(n logqlengine.PipelineNode, lg *z
}

// offloadLabelFilters returns the predicates of label filter stages in
// pipeline that may be offloaded.
//
// A label filter is collected only if collectPredicateLabels accepts its
// predicate and no stage preceding the filter in pipeline affects (as reported
// by affectsLabel) any label referenced by that predicate.
func (o *ClickhouseOptimizer) offloadLabelFilters(pipeline []logql.PipelineStage) (filters []logql.LabelPredicate) {
stageLoop:
for _, stage := range pipeline {
switch stage := stage.(type) {
case *logql.LabelFilter:
if !o.canOffloadLabelPredicate(stage.Pred) {
continue
// Scratch set of labels referenced by the current predicate; it is cleared
// and reused for every label filter stage.
labels := make(map[logql.Label]struct{}, 4)
nextStage:
for i, stage := range pipeline {
f, ok := stage.(*logql.LabelFilter)
if !ok {
continue
}

// Collect the labels that the predicate matches.
clear(labels)
if !o.collectPredicateLabels(labels, f.Pred) {
continue
}

// Ensure that no stage before the filter affects any label
// that the predicate matches.
for label := range labels {
if slices.ContainsFunc(pipeline[:i], func(stage logql.PipelineStage) bool {
return o.affectsLabel(stage, label)
}) {
continue nextStage
}
filters = append(filters, stage.Pred)
case *logql.DecolorizeExpr,
*logql.LineFilter:
// Does nothing to the label set, just skip.
default:
// Stage modifies the label set, can't offload label filters after this stage.
break stageLoop
}

filters = append(filters, f.Pred)
}
return filters
}

func (o *ClickhouseOptimizer) canOffloadLabelPredicate(p logql.LabelPredicate) bool {
switch p := p.(type) {
// affectsLabel reports whether the given pipeline stage may set or change
// the given label.
//
// Stages that can fail are treated as affecting the error labels
// (logql.ErrorLabel and logql.ErrorDetailsLabel). Stage types that are not
// explicitly handled are conservatively reported as affecting every label.
func (o *ClickhouseOptimizer) affectsLabel(stage logql.PipelineStage, label logql.Label) bool {
	errLabel := label == logql.ErrorDetailsLabel ||
		label == logql.ErrorLabel

	// extracts reports whether a parser stage configured with the given
	// label names and extraction expressions may write to label.
	extracts := func(names []logql.Label, exprs []logql.LabelExtractionExpr) bool {
		switch {
		case errLabel:
			// Parsing might fail.
			return true
		case len(names) == 0 && len(exprs) == 0:
			// Nothing is selected explicitly: all parsed fields
			// would be added to label set.
			return true
		case slices.Contains(names, label):
			// Explicitly parsed label is affected.
			return true
		}
		for _, e := range exprs {
			if e.Label == label {
				return true
			}
		}
		return false
	}

	switch s := stage.(type) {
	case *logql.LineFilter, *logql.DecolorizeExpr:
		// These stages do not touch the label set.
		return false
	case *logql.LineFormat:
		return errLabel
	case *logql.UnpackLabelParser:
		return true
	case *logql.JSONExpressionParser:
		return extracts(s.Labels, s.Exprs)
	case *logql.LogfmtExpressionParser:
		return extracts(s.Labels, s.Exprs)
	case *logql.RegexpLabelParser:
		// Any label mapped from the regular expression is affected.
		for _, mapped := range s.Mapping {
			if mapped == label {
				return true
			}
		}
		return false
	case *logql.PatternLabelParser:
		// Any label captured by the pattern is affected.
		for _, part := range s.Pattern.Parts {
			if part.Type != logqlpattern.Capture {
				continue
			}
			if part.Value == string(label) {
				return true
			}
		}
		return false
	case *logql.LabelFilter:
		return errLabel && o.failableLabelPredicate(s.Pred)
	case *logql.LabelFormatExpr:
		// Any renamed-to label is affected.
		for _, rename := range s.Labels {
			if rename.To == label {
				return true
			}
		}
		// Any formatted label is affected.
		for _, tmpl := range s.Values {
			if tmpl.Label == label {
				return true
			}
		}
		// Template might fail, occurred error would be added to label set.
		return errLabel && len(s.Values) > 0
	}
	return true
}

// collectPredicateLabels adds the labels referenced by label matchers in pred
// to labels.
//
// It returns true only if pred, after unwrapping parentheses, consists solely
// of label matchers combined with logql.OpAnd / logql.OpOr. When it returns
// false, labels may already contain some entries and should not be used.
func (o *ClickhouseOptimizer) collectPredicateLabels(labels map[logql.Label]struct{}, pred logql.LabelPredicate) bool {
pred = logql.UnparenLabelPredicate(pred)

switch pred := pred.(type) {
case *logql.LabelPredicateBinOp:
switch p.Op {
switch pred.Op {
// Only and/or combinations are accepted.
case logql.OpAnd, logql.OpOr:
default:
return false
}
return o.canOffloadLabelPredicate(p.Left) &&
o.canOffloadLabelPredicate(p.Right)
case *logql.LabelPredicateParen:
return o.canOffloadLabelPredicate(p.X)
// Both operands must be collectable.
return o.collectPredicateLabels(labels, pred.Left) &&
o.collectPredicateLabels(labels, pred.Right)
case *logql.LabelMatcher:
// Record the matched label.
labels[pred.Label] = struct{}{}
return true
default:
// Any other predicate kind is rejected.
return false
}
}

// failableLabelPredicate reports whether pred is treated as a predicate
// that may fail.
//
// Parentheses are unwrapped first. A plain label matcher is never failable,
// a binary operation is failable if either of its operands is, and every
// other predicate kind is conservatively treated as failable.
func (o *ClickhouseOptimizer) failableLabelPredicate(pred logql.LabelPredicate) bool {
	switch p := logql.UnparenLabelPredicate(pred).(type) {
	case *logql.LabelMatcher:
		return false
	case *logql.LabelPredicateBinOp:
		if o.failableLabelPredicate(p.Left) {
			return true
		}
		return o.failableLabelPredicate(p.Right)
	}
	return true
}

func (o *ClickhouseOptimizer) offloadLineFilters(pipeline []logql.PipelineStage) (line []logql.LineFilter) {
stageLoop:
for _, stage := range pipeline {
Expand Down
Loading

0 comments on commit ddd413c

Please sign in to comment.