Skip to content

Commit

Permalink
feat(chstorage): offload OR line filters
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Jul 3, 2024
1 parent 0a01405 commit 4753363
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 95 deletions.
12 changes: 2 additions & 10 deletions internal/chstorage/querier_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,7 @@ func (q *Querier) LabelValues(ctx context.Context, labelName string, opts logsto
Where(chsql.InTimeRange("timestamp", opts.Start, opts.End))
)
for _, m := range opts.Query.Matchers {
expr, err := q.logQLLabelMatcher(m, mapping)
if err != nil {
return nil, err
}
query.Where(expr)
query.Where(q.logQLLabelMatcher(m, mapping))
}
query.Order(chsql.Ident("value"), chsql.Asc).
Limit(q.labelLimit)
Expand Down Expand Up @@ -371,11 +367,7 @@ func (q *Querier) Series(ctx context.Context, opts logstorage.SeriesOptions) (re
for _, sel := range sels {
selExprs := make([]chsql.Expr, 0, len(sel.Matchers))
for _, m := range sel.Matchers {
expr, err := q.logQLLabelMatcher(m, mapping)
if err != nil {
return result, err
}
selExprs = append(selExprs, expr)
selExprs = append(selExprs, q.logQLLabelMatcher(m, mapping))
}
sets = append(sets, chsql.JoinAnd(selExprs...))
}
Expand Down
7 changes: 6 additions & 1 deletion internal/chstorage/querier_logs_optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,13 @@ func (o *ClickhouseOptimizer) canOffloadLineFilter(lf *logql.LineFilter) bool {
case logql.OpPattern, logql.OpNotPattern:
return false
}
if lf.By.IP || len(lf.Or) > 0 {
if lf.By.IP {
return false
}
for _, by := range lf.Or {
if by.IP {
return false
}
}
return true
}
163 changes: 79 additions & 84 deletions internal/chstorage/querier_logs_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,12 @@ func (v *LogsQuery[E]) Execute(ctx context.Context, q *Querier) (_ iterators.Ite
out = newLogColumns()
query = chsql.Select(table, out.ChsqlResult()...)
)
if err := (logQueryPredicates{
(logQueryPredicates{
Start: v.Start,
End: v.End,
Labels: v.Labels,
Line: v.Line,
}).write(query, mapping, q); err != nil {
return nil, err
}
}).write(query, mapping, q)

switch d := v.Direction; d {
case logqlengine.DirectionBackward:
Expand Down Expand Up @@ -226,14 +224,12 @@ func (v *SampleQuery) Execute(ctx context.Context, q *Querier) (_ logqlengine.Sa
},
)
)
if err := (logQueryPredicates{
(logQueryPredicates{
Start: v.Start,
End: v.End,
Labels: v.Labels,
Line: v.Line,
}).write(query, mapping, q); err != nil {
return nil, err
}
}).write(query, mapping, q)
query.Order(chsql.Ident("timestamp"), chsql.Asc)

var result []logqlmetric.SampledEntry
Expand Down Expand Up @@ -309,108 +305,107 @@ func (p logQueryPredicates) write(
query *chsql.SelectQuery,
mapping map[string]string,
q *Querier,
) error {
) {
query.Where(
chsql.InTimeRange("timestamp", p.Start, p.End),
)
for _, m := range p.Labels {
expr, err := q.logQLLabelMatcher(m, mapping)
if err != nil {
return err
}
query.Where(expr)
query.Where(q.logQLLabelMatcher(m, mapping))
}
for _, m := range p.Line {
expr, err := q.lineFilter(m)
if err != nil {
return err
}
query.Where(expr)
query.Where(q.lineFilter(m))
}
return nil
}

func (q *Querier) lineFilter(m logql.LineFilter) (e chsql.Expr, rerr error) {
func (q *Querier) lineFilter(m logql.LineFilter) (e chsql.Expr) {
defer func() {
if rerr == nil {
switch m.Op {
case logql.OpNotEq, logql.OpNotRe:
e = chsql.Not(e)
}
switch m.Op {
case logql.OpNotEq, logql.OpNotRe:
e = chsql.Not(e)
}
}()

switch m.Op {
case logql.OpEq, logql.OpNotEq:
expr := chsql.Contains("body", m.By.Value)

// Clickhouse does not use tokenbf_v1 index to skip blocks
// with position* functions for some reason.
//
// Force to skip using hasToken function.
//
// Note that such optimization is applied only if operation is not negated to
// avoid false-negative skipping.
if val := m.By.Value; m.Op != logql.OpNotEq && chsql.IsSingleToken(val) {
expr = chsql.And(expr,
chsql.HasToken(chsql.Ident("body"), val),
)
}
matcher := func(op logql.BinOp, by logql.LineFilterValue) chsql.Expr {
switch op {
case logql.OpEq, logql.OpNotEq:
expr := chsql.Contains("body", by.Value)

// Clickhouse does not use tokenbf_v1 index to skip blocks
// with position* functions for some reason.
//
// Force to skip using hasToken function.
//
// Note that such optimization is applied only if operation is not negated to
// avoid false-negative skipping.
if val := by.Value; op != logql.OpNotEq && chsql.IsSingleToken(val) {
expr = chsql.And(expr,
chsql.HasToken(chsql.Ident("body"), val),
)
}

{
// HACK: check for special case of hex-encoded trace_id and span_id.
// Like `{http_method=~".+"} |= "af36000000000000c517000000000003"`.
// TODO(ernado): also handle regex?
v, _ := hex.DecodeString(m.By.Value)
switch len(v) {
case len(otelstorage.TraceID{}):
expr = chsql.Or(expr, chsql.Eq(
chsql.Ident("trace_id"),
chsql.Unhex(chsql.String(m.By.Value)),
))
case len(otelstorage.SpanID{}):
expr = chsql.Or(expr, chsql.Eq(
chsql.Ident("span_id"),
chsql.Unhex(chsql.String(m.By.Value)),
))
{
// HACK: check for special case of hex-encoded trace_id and span_id.
// Like `{http_method=~".+"} |= "af36000000000000c517000000000003"`.
// TODO(ernado): also handle regex?
v, _ := hex.DecodeString(by.Value)
switch len(v) {
case len(otelstorage.TraceID{}):
expr = chsql.Or(expr, chsql.Eq(
chsql.Ident("trace_id"),
chsql.Unhex(chsql.String(by.Value)),
))
case len(otelstorage.SpanID{}):
expr = chsql.Or(expr, chsql.Eq(
chsql.Ident("span_id"),
chsql.Unhex(chsql.String(by.Value)),
))
}
}
return expr
case logql.OpRe, logql.OpNotRe:
return chsql.Match(chsql.Ident("body"), chsql.String(by.Value))
default:
panic(fmt.Sprintf("unexpected line matcher op %v", m.Op))
}
return expr, nil
case logql.OpRe, logql.OpNotRe:
return chsql.Match(chsql.Ident("body"), chsql.String(m.By.Value)), nil
default:
return e, errors.Errorf("unexpected line matcher op %v", m.Op)
}

if len(m.Or) == 0 {
return matcher(m.Op, m.By)
}
matchers := make([]chsql.Expr, 0, len(m.Or)+1)
matchers = append(matchers, matcher(m.Op, m.By))
for _, by := range m.Or {
matchers = append(matchers, matcher(m.Op, by))
}
return chsql.JoinOr(matchers...)
}

func (q *Querier) logQLLabelMatcher(
m logql.LabelMatcher,
mapping map[string]string,
) (e chsql.Expr, rerr error) {
) (e chsql.Expr) {
defer func() {
if rerr == nil {
switch m.Op {
case logql.OpNotEq, logql.OpNotRe:
e = chsql.Not(e)
}
switch m.Op {
case logql.OpNotEq, logql.OpNotRe:
e = chsql.Not(e)
}
}()

matchHex := func(column chsql.Expr, m logql.LabelMatcher) (e chsql.Expr, _ error) {
matchHex := func(column chsql.Expr, m logql.LabelMatcher) (e chsql.Expr) {
switch m.Op {
case logql.OpEq, logql.OpNotEq:
return chsql.Eq(
column,
chsql.Unhex(chsql.String(m.Value)),
), nil
)
case logql.OpRe, logql.OpNotRe:
// FIXME(tdakkota): match is case-sensitive
return chsql.Match(
chsql.Hex(column),
chsql.String(m.Value),
), nil
)
default:
return e, errors.Errorf("unexpected label matcher op %v", m.Op)
panic(fmt.Sprintf("unexpected label matcher op %v", m.Op))
}
}

Expand All @@ -431,7 +426,7 @@ func (q *Querier) logQLLabelMatcher(
break
}
}
return chsql.ColumnEq("severity_number", severityNumber), nil
return chsql.ColumnEq("severity_number", severityNumber)
case logql.OpRe, logql.OpNotRe:
matches := make([]int, 0, 6)
for i := plog.SeverityNumberUnspecified; i <= plog.SeverityNumberFatal4; i++ {
Expand All @@ -446,18 +441,18 @@ func (q *Querier) logQLLabelMatcher(
}
}
}
return chsql.In(chsql.Ident("severity_number"), chsql.TupleValues(matches...)), nil
return chsql.In(chsql.Ident("severity_number"), chsql.TupleValues(matches...))
default:
return e, errors.Errorf("unexpected label matcher op %v", m.Op)
panic(fmt.Sprintf("unexpected label matcher op %v", m.Op))
}
case logstorage.LabelBody:
switch m.Op {
case logql.OpEq, logql.OpNotEq:
return chsql.Contains("body", m.Value), nil
return chsql.Contains("body", m.Value)
case logql.OpRe, logql.OpNotRe:
return chsql.Match(chsql.Ident("body"), chsql.String(m.Value)), nil
return chsql.Match(chsql.Ident("body"), chsql.String(m.Value))
default:
return e, errors.Errorf("unexpected label matcher op %v", m.Op)
panic(fmt.Sprintf("unexpected label matcher op %v", m.Op))
}
case logstorage.LabelSpanID:
return matchHex(chsql.Ident("span_id"), m)
Expand All @@ -468,11 +463,11 @@ func (q *Querier) logQLLabelMatcher(
if ok {
switch m.Op {
case logql.OpEq, logql.OpNotEq:
return chsql.Eq(expr, chsql.String(m.Value)), nil
return chsql.Eq(expr, chsql.String(m.Value))
case logql.OpRe, logql.OpNotRe:
return chsql.Match(expr, chsql.String(m.Value)), nil
return chsql.Match(expr, chsql.String(m.Value))
default:
return e, errors.Errorf("unexpected label matcher op %v", m.Op)
panic(fmt.Sprintf("unexpected label matcher op %v", m.Op))
}
}

Expand All @@ -494,10 +489,10 @@ func (q *Querier) logQLLabelMatcher(
case logql.OpRe, logql.OpNotRe:
sub = chsql.Match(selector, chsql.String(m.Value))
default:
return e, errors.Errorf("unexpected label matcher op %v", m.Op)
panic(fmt.Sprintf("unexpected label matcher op %v", m.Op))
}
exprs = append(exprs, sub)
}
return chsql.JoinOr(exprs...), nil
return chsql.JoinOr(exprs...)
}
}

0 comments on commit 4753363

Please sign in to comment.