From 4753363f971e50da534c41483dbf3265b0df9acf Mon Sep 17 00:00:00 2001 From: tdakkota Date: Wed, 3 Jul 2024 18:25:33 +0300 Subject: [PATCH] feat(chstorage): offload OR line filters --- internal/chstorage/querier_logs.go | 12 +- internal/chstorage/querier_logs_optimizer.go | 7 +- internal/chstorage/querier_logs_query.go | 163 +++++++++---------- 3 files changed, 87 insertions(+), 95 deletions(-) diff --git a/internal/chstorage/querier_logs.go b/internal/chstorage/querier_logs.go index 4ad1e87c..2df84944 100644 --- a/internal/chstorage/querier_logs.go +++ b/internal/chstorage/querier_logs.go @@ -183,11 +183,7 @@ func (q *Querier) LabelValues(ctx context.Context, labelName string, opts logsto Where(chsql.InTimeRange("timestamp", opts.Start, opts.End)) ) for _, m := range opts.Query.Matchers { - expr, err := q.logQLLabelMatcher(m, mapping) - if err != nil { - return nil, err - } - query.Where(expr) + query.Where(q.logQLLabelMatcher(m, mapping)) } query.Order(chsql.Ident("value"), chsql.Asc). Limit(q.labelLimit) @@ -371,11 +367,7 @@ func (q *Querier) Series(ctx context.Context, opts logstorage.SeriesOptions) (re for _, sel := range sels { selExprs := make([]chsql.Expr, 0, len(sel.Matchers)) for _, m := range sel.Matchers { - expr, err := q.logQLLabelMatcher(m, mapping) - if err != nil { - return result, err - } - selExprs = append(selExprs, expr) + selExprs = append(selExprs, q.logQLLabelMatcher(m, mapping)) } sets = append(sets, chsql.JoinAnd(selExprs...)) } diff --git a/internal/chstorage/querier_logs_optimizer.go b/internal/chstorage/querier_logs_optimizer.go index 95610b3b..70fa27b1 100644 --- a/internal/chstorage/querier_logs_optimizer.go +++ b/internal/chstorage/querier_logs_optimizer.go @@ -184,8 +184,13 @@ func (o *ClickhouseOptimizer) canOffloadLineFilter(lf *logql.LineFilter) bool { case logql.OpPattern, logql.OpNotPattern: return false } - if lf.By.IP || len(lf.Or) > 0 { + if lf.By.IP { return false } + for _, by := range lf.Or { + if by.IP { + return false + } + } return true } diff --git a/internal/chstorage/querier_logs_query.go b/internal/chstorage/querier_logs_query.go index c0a7597a..4a186b6b 100644 --- a/internal/chstorage/querier_logs_query.go +++ b/internal/chstorage/querier_logs_query.go @@ -74,14 +74,12 @@ func (v *LogsQuery[E]) Execute(ctx context.Context, q *Querier) (_ iterators.Ite out = newLogColumns() query = chsql.Select(table, out.ChsqlResult()...) ) - if err := (logQueryPredicates{ + (logQueryPredicates{ Start: v.Start, End: v.End, Labels: v.Labels, Line: v.Line, - }).write(query, mapping, q); err != nil { - return nil, err - } + }).write(query, mapping, q) switch d := v.Direction; d { case logqlengine.DirectionBackward: @@ -226,14 +224,12 @@ func (v *SampleQuery) Execute(ctx context.Context, q *Querier) (_ logqlengine.Sa }, ) ) - if err := (logQueryPredicates{ + (logQueryPredicates{ Start: v.Start, End: v.End, Labels: v.Labels, Line: v.Line, - }).write(query, mapping, q); err != nil { - return nil, err - } + }).write(query, mapping, q) query.Order(chsql.Ident("timestamp"), chsql.Asc) var result []logqlmetric.SampledEntry @@ -309,108 +305,107 @@ func (p logQueryPredicates) write( query *chsql.SelectQuery, mapping map[string]string, q *Querier, -) error { +) { query.Where( chsql.InTimeRange("timestamp", p.Start, p.End), ) for _, m := range p.Labels { - expr, err := q.logQLLabelMatcher(m, mapping) - if err != nil { - return err - } - query.Where(expr) + query.Where(q.logQLLabelMatcher(m, mapping)) } for _, m := range p.Line { - expr, err := q.lineFilter(m) - if err != nil { - return err - } - query.Where(expr) + query.Where(q.lineFilter(m)) } - return nil } -func (q *Querier) lineFilter(m logql.LineFilter) (e chsql.Expr, rerr error) { +func (q *Querier) lineFilter(m logql.LineFilter) (e chsql.Expr) { defer func() { - if rerr == nil { - switch m.Op { - case logql.OpNotEq, logql.OpNotRe: - e = chsql.Not(e) - } + switch m.Op { + case logql.OpNotEq, logql.OpNotRe: + e = chsql.Not(e) } }() - switch m.Op { - case logql.OpEq, logql.OpNotEq: - expr := chsql.Contains("body", m.By.Value) - - // Clickhouse does not use tokenbf_v1 index to skip blocks - // with position* functions for some reason. - // - // Force to skip using hasToken function. - // - // Note that such optimization is applied only if operation is not negated to - // avoid false-negative skipping. - if val := m.By.Value; m.Op != logql.OpNotEq && chsql.IsSingleToken(val) { - expr = chsql.And(expr, - chsql.HasToken(chsql.Ident("body"), val), - ) - } + matcher := func(op logql.BinOp, by logql.LineFilterValue) chsql.Expr { + switch op { + case logql.OpEq, logql.OpNotEq: + expr := chsql.Contains("body", by.Value) + + // Clickhouse does not use tokenbf_v1 index to skip blocks + // with position* functions for some reason. + // + // Force to skip using hasToken function. + // + // Note that such optimization is applied only if operation is not negated to + // avoid false-negative skipping. + if val := by.Value; op != logql.OpNotEq && chsql.IsSingleToken(val) { + expr = chsql.And(expr, + chsql.HasToken(chsql.Ident("body"), val), + ) + } - { - // HACK: check for special case of hex-encoded trace_id and span_id. - // Like `{http_method=~".+"} |= "af36000000000000c517000000000003"`. - // TODO(ernado): also handle regex? - v, _ := hex.DecodeString(m.By.Value) - switch len(v) { - case len(otelstorage.TraceID{}): - expr = chsql.Or(expr, chsql.Eq( - chsql.Ident("trace_id"), - chsql.Unhex(chsql.String(m.By.Value)), - )) - case len(otelstorage.SpanID{}): - expr = chsql.Or(expr, chsql.Eq( - chsql.Ident("span_id"), - chsql.Unhex(chsql.String(m.By.Value)), - )) + { + // HACK: check for special case of hex-encoded trace_id and span_id. + // Like `{http_method=~".+"} |= "af36000000000000c517000000000003"`. + // TODO(ernado): also handle regex? + v, _ := hex.DecodeString(by.Value) + switch len(v) { + case len(otelstorage.TraceID{}): + expr = chsql.Or(expr, chsql.Eq( + chsql.Ident("trace_id"), + chsql.Unhex(chsql.String(by.Value)), + )) + case len(otelstorage.SpanID{}): + expr = chsql.Or(expr, chsql.Eq( + chsql.Ident("span_id"), + chsql.Unhex(chsql.String(by.Value)), + )) + } } + return expr + case logql.OpRe, logql.OpNotRe: + return chsql.Match(chsql.Ident("body"), chsql.String(by.Value)) + default: + panic(fmt.Sprintf("unexpected line matcher op %v", m.Op)) } - return expr, nil - case logql.OpRe, logql.OpNotRe: - return chsql.Match(chsql.Ident("body"), chsql.String(m.By.Value)), nil - default: - return e, errors.Errorf("unexpected line matcher op %v", m.Op) } + + if len(m.Or) == 0 { + return matcher(m.Op, m.By) + } + matchers := make([]chsql.Expr, 0, len(m.Or)+1) + matchers = append(matchers, matcher(m.Op, m.By)) + for _, by := range m.Or { + matchers = append(matchers, matcher(m.Op, by)) + } + return chsql.JoinOr(matchers...) } func (q *Querier) logQLLabelMatcher( m logql.LabelMatcher, mapping map[string]string, -) (e chsql.Expr, rerr error) { +) (e chsql.Expr) { defer func() { - if rerr == nil { - switch m.Op { - case logql.OpNotEq, logql.OpNotRe: - e = chsql.Not(e) - } + switch m.Op { + case logql.OpNotEq, logql.OpNotRe: + e = chsql.Not(e) } }() - matchHex := func(column chsql.Expr, m logql.LabelMatcher) (e chsql.Expr, _ error) { + matchHex := func(column chsql.Expr, m logql.LabelMatcher) (e chsql.Expr) { switch m.Op { case logql.OpEq, logql.OpNotEq: return chsql.Eq( column, chsql.Unhex(chsql.String(m.Value)), - ), nil + ) case logql.OpRe, logql.OpNotRe: // FIXME(tdakkota): match is case-sensitive return chsql.Match( chsql.Hex(column), chsql.String(m.Value), - ), nil + ) default: - return e, errors.Errorf("unexpected label matcher op %v", m.Op) + panic(fmt.Sprintf("unexpected label matcher op %v", m.Op)) } } @@ -431,7 +426,7 @@ func (q *Querier) logQLLabelMatcher( break } } - return chsql.ColumnEq("severity_number", severityNumber), nil + return chsql.ColumnEq("severity_number", severityNumber) case logql.OpRe, logql.OpNotRe: matches := make([]int, 0, 6) for i := plog.SeverityNumberUnspecified; i <= plog.SeverityNumberFatal4; i++ { @@ -446,18 +441,18 @@ func (q *Querier) logQLLabelMatcher( } } } - return chsql.In(chsql.Ident("severity_number"), chsql.TupleValues(matches...)), nil + return chsql.In(chsql.Ident("severity_number"), chsql.TupleValues(matches...)) default: - return e, errors.Errorf("unexpected label matcher op %v", m.Op) + panic(fmt.Sprintf("unexpected label matcher op %v", m.Op)) } case logstorage.LabelBody: switch m.Op { case logql.OpEq, logql.OpNotEq: - return chsql.Contains("body", m.Value), nil + return chsql.Contains("body", m.Value) case logql.OpRe, logql.OpNotRe: - return chsql.Match(chsql.Ident("body"), chsql.String(m.Value)), nil + return chsql.Match(chsql.Ident("body"), chsql.String(m.Value)) default: - return e, errors.Errorf("unexpected label matcher op %v", m.Op) + panic(fmt.Sprintf("unexpected label matcher op %v", m.Op)) } case logstorage.LabelSpanID: return matchHex(chsql.Ident("span_id"), m) @@ -468,11 +463,11 @@ func (q *Querier) logQLLabelMatcher( if ok { switch m.Op { case logql.OpEq, logql.OpNotEq: - return chsql.Eq(expr, chsql.String(m.Value)), nil + return chsql.Eq(expr, chsql.String(m.Value)) case logql.OpRe, logql.OpNotRe: - return chsql.Match(expr, chsql.String(m.Value)), nil + return chsql.Match(expr, chsql.String(m.Value)) default: - return e, errors.Errorf("unexpected label matcher op %v", m.Op) + panic(fmt.Sprintf("unexpected label matcher op %v", m.Op)) } } @@ -494,10 +489,10 @@ func (q *Querier) logQLLabelMatcher( case logql.OpRe, logql.OpNotRe: sub = chsql.Match(selector, chsql.String(m.Value)) default: - return e, errors.Errorf("unexpected label matcher op %v", m.Op) + panic(fmt.Sprintf("unexpected label matcher op %v", m.Op)) } exprs = append(exprs, sub) } - return chsql.JoinOr(exprs...), nil + return chsql.JoinOr(exprs...) } }