Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(logqlmetric): implement absent_over_time #392

Merged
merged 8 commits into from
Apr 26, 2024
16 changes: 16 additions & 0 deletions dev/local/ch-logql-compliance/logql-test-queries.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,29 @@ test_cases:
{job="varlogs"} [{{ .range }}]
)
variant_args: ["simpleRangeAggOp", "range"]
# Absent over time range aggregation.
# NOTE(tdakkota): We do a separate test because `absent_over_time` returns non-empty result
# only if there is NO samples in step and vice versa.
- query: |-
absent_over_time(
{job="varlogs"} [{{ .range }}]
)
variant_args: ["range"]
should_be_empty: true
- query: |-
absent_over_time(
{job="varlogs"} |= "no way line would contain this message" [{{ .range }}]
)
variant_args: ["range"]
# Unwrap sampler.
- query: |-
{{ .unwrapRangeAggOp }}(
{job="varlogs"} | json | unwrap status
[{{ .range }}]
)
variant_args: ["unwrapRangeAggOp", "range"]
skip_comparison: true # It seems, there is some issues with unwrap.
# Vector aggregation.
- query: |-
{{ .simpleVecAggOp }} by (filename) (
Expand Down
3 changes: 2 additions & 1 deletion internal/chstorage/querier_logs_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/logstorage"
)

Expand Down Expand Up @@ -53,7 +54,7 @@ func (n *InputNode) EvalPipeline(ctx context.Context, params logqlengine.EvalPar
}

func entryMapper(r logstorage.Record) (logqlengine.Entry, error) {
set := logqlengine.NewLabelSet()
set := logqlabels.NewLabelSet()
e := logqlengine.Entry{
Timestamp: r.Timestamp,
Line: r.Body,
Expand Down
3 changes: 2 additions & 1 deletion internal/chstorage/querier_logs_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/go-faster/oteldb/internal/iterators"
"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlmetric"
"github.com/go-faster/oteldb/internal/logstorage"
"github.com/go-faster/oteldb/internal/otelstorage"
Expand Down Expand Up @@ -253,7 +254,7 @@ func (v *SampleQuery) Eval(ctx context.Context, q *Querier) (_ logqlengine.Sampl
result = append(result, logqlmetric.SampledEntry{
Timestamp: pcommon.NewTimestampFromTime(timestamp),
Sample: sample,
Set: logqlengine.AggregatedLabelsFromMap(labels),
Set: logqlabels.AggregatedLabelsFromMap(labels),
})
}
return nil
Expand Down
3 changes: 2 additions & 1 deletion internal/logql/logqlengine/decolorize.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"regexp"

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/otelstorage"
)

Expand All @@ -20,6 +21,6 @@ const ansiPattern = "[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\
var ansiRegex = regexp.MustCompile(ansiPattern)

// Process implements Processor.
func (d *Decolorize) Process(_ otelstorage.Timestamp, line string, _ LabelSet) (string, bool) {
func (d *Decolorize) Process(_ otelstorage.Timestamp, line string, _ logqlabels.LabelSet) (string, bool) {
return ansiRegex.ReplaceAllString(line, ""), true
}
3 changes: 2 additions & 1 deletion internal/logql/logqlengine/decolorize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
)

func TestDecolorize(t *testing.T) {
Expand All @@ -26,7 +27,7 @@ func TestDecolorize(t *testing.T) {
for i, tt := range tests {
tt := tt
t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) {
set := NewLabelSet()
set := logqlabels.NewLabelSet()

f, err := buildDecolorize(&logql.DecolorizeExpr{})
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion internal/logql/logqlengine/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logqlengine

import (
"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/otelstorage"
)

Expand All @@ -24,7 +25,7 @@ func buildDistinctFilter(stage *logql.DistinctFilter) (Processor, error) {
}

// Process implements Processor.
func (d *DistinctFilter) Process(_ otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) {
func (d *DistinctFilter) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) {
for _, label := range d.labels {
val, ok := set.GetString(label)
if !ok {
Expand Down
3 changes: 2 additions & 1 deletion internal/logql/logqlengine/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/otelstorage"
)

Expand Down Expand Up @@ -36,7 +37,7 @@ func buildDropLabels(stage *logql.DropLabelsExpr) (Processor, error) {
}

// Process implements Processor.
func (k *DropLabels) Process(_ otelstorage.Timestamp, line string, set LabelSet) (string, bool) {
func (k *DropLabels) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (string, bool) {
set.Range(func(label logql.Label, val pcommon.Value) {
if k.dropPair(label, val) {
set.Delete(label)
Expand Down
3 changes: 1 addition & 2 deletions internal/logql/logqlengine/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ func TestDropLabels(t *testing.T) {
})
require.NoError(t, err)

set := NewLabelSet()
set.labels = tt.input
set := newLabelSet(tt.input)
newLine, ok := e.Process(0, ``, set)
// Ensure that processor does not change the line.
require.Equal(t, ``, newLine)
Expand Down
3 changes: 2 additions & 1 deletion internal/logql/logqlengine/engine_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/go-faster/oteldb/internal/iterators"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlmetric"
"github.com/go-faster/oteldb/internal/lokiapi"
"github.com/go-faster/oteldb/internal/otelstorage"
Expand Down Expand Up @@ -38,7 +39,7 @@ const (
type Entry struct {
Timestamp otelstorage.Timestamp
Line string
Set LabelSet
Set logqlabels.LabelSet
}

type (
Expand Down
3 changes: 2 additions & 1 deletion internal/logql/logqlengine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/go-faster/oteldb/internal/iterators"
"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/logstorage"
"github.com/go-faster/oteldb/internal/lokiapi"
"github.com/go-faster/oteldb/internal/otelstorage"
Expand Down Expand Up @@ -161,7 +162,7 @@ func (n *mockPipelineNode) EvalPipeline(ctx context.Context, params EvalParams)
}
}

set := NewLabelSet()
set := logqlabels.NewLabelSet()
set.SetAttrs(
otelstorage.Attrs(attrs),
otelstorage.Attrs(scopeAttrs),
Expand Down
9 changes: 5 additions & 4 deletions internal/logql/logqlengine/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine/jsonexpr"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/otelstorage"
)

Expand Down Expand Up @@ -52,7 +53,7 @@ func buildJSONExtractor(stage *logql.JSONExpressionParser) (Processor, error) {
}

// Process implements Processor.
func (e *JSONExtractor) Process(_ otelstorage.Timestamp, line string, set LabelSet) (string, bool) {
func (e *JSONExtractor) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (string, bool) {
var err error
switch {
case len(e.paths) != 0:
Expand All @@ -68,7 +69,7 @@ func (e *JSONExtractor) Process(_ otelstorage.Timestamp, line string, set LabelS
return line, true
}

func extractExprs(paths map[logql.Label]jsonexpr.Path, line string, set LabelSet) error {
func extractExprs(paths map[logql.Label]jsonexpr.Path, line string, set logqlabels.LabelSet) error {
// TODO(tdakkota): allocates buffer for each line.
d := decodeStr(line)
return jsonexpr.Extract(
Expand All @@ -80,7 +81,7 @@ func extractExprs(paths map[logql.Label]jsonexpr.Path, line string, set LabelSet
)
}

func extractSome(labels map[logql.Label]struct{}, line string, set LabelSet) error {
func extractSome(labels map[logql.Label]struct{}, line string, set logqlabels.LabelSet) error {
d := decodeStr(line)
return d.ObjBytes(func(d *jx.Decoder, key []byte) error {
if _, ok := labels[logql.Label(key)]; !ok {
Expand All @@ -101,7 +102,7 @@ func extractSome(labels map[logql.Label]struct{}, line string, set LabelSet) err
})
}

func extractAll(line string, set LabelSet) error {
func extractAll(line string, set logqlabels.LabelSet) error {
d := decodeStr(line)
return d.Obj(func(d *jx.Decoder, key string) error {
value, ok, err := parseValue(d)
Expand Down
9 changes: 5 additions & 4 deletions internal/logql/logqlengine/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
)

func TestJSONExtractor(t *testing.T) {
Expand Down Expand Up @@ -90,7 +91,7 @@ func TestJSONExtractor(t *testing.T) {
})
require.NoError(t, err)

set := NewLabelSet()
set := logqlabels.NewLabelSet()
newLine, ok := e.Process(0, tt.input, set)
// Ensure that extractor does not change the line.
require.Equal(t, tt.input, newLine)
Expand All @@ -104,7 +105,7 @@ func TestJSONExtractor(t *testing.T) {
errMsg, ok := set.GetError()
require.False(t, ok, "got error: %s", errMsg)

require.Len(t, set.labels, len(tt.expectLabels))
require.Equal(t, len(tt.expectLabels), set.Len())
for k, expect := range tt.expectLabels {
got, ok := set.Get(k)
require.Truef(t, ok, "key %q", k)
Expand Down Expand Up @@ -172,7 +173,7 @@ func BenchmarkJSONExtractor(b *testing.B) {
p, err := buildJSONExtractor(bb.expr)
require.NoError(b, err)

set := NewLabelSet()
set := logqlabels.NewLabelSet()
var (
line string
ok bool
Expand All @@ -182,7 +183,7 @@ func BenchmarkJSONExtractor(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
set.reset()
set.Reset()
line, ok = p.Process(10, benchdata, set)
}

Expand Down
3 changes: 2 additions & 1 deletion internal/logql/logqlengine/keep.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/otelstorage"
)

Expand Down Expand Up @@ -36,7 +37,7 @@ func buildKeepLabels(stage *logql.KeepLabelsExpr) (Processor, error) {
}

// Process implements Processor.
func (k *KeepLabels) Process(_ otelstorage.Timestamp, line string, set LabelSet) (string, bool) {
func (k *KeepLabels) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (string, bool) {
set.Range(func(label logql.Label, val pcommon.Value) {
if !k.keepPair(label, val) {
set.Delete(label)
Expand Down
3 changes: 1 addition & 2 deletions internal/logql/logqlengine/keep_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ func TestKeepLabels(t *testing.T) {
})
require.NoError(t, err)

set := NewLabelSet()
set.labels = tt.input
set := newLabelSet(tt.input)
newLine, ok := e.Process(0, ``, set)
// Ensure that processor does not change the line.
require.Equal(t, ``, newLine)
Expand Down
15 changes: 8 additions & 7 deletions internal/logql/logqlengine/label_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-faster/errors"

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/otelstorage"
)

Expand Down Expand Up @@ -67,7 +68,7 @@ type AndLabelMatcher struct {
}

// Process implements Processor.
func (m *AndLabelMatcher) Process(ts otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) {
func (m *AndLabelMatcher) Process(ts otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) {
line, keep = m.Left.Process(ts, line, set)
if !keep {
return line, keep
Expand All @@ -82,7 +83,7 @@ type OrLabelMatcher struct {
}

// Process implements Processor.
func (m *OrLabelMatcher) Process(ts otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) {
func (m *OrLabelMatcher) Process(ts otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) {
line, keep = m.Left.Process(ts, line, set)
if keep {
return line, keep
Expand All @@ -109,7 +110,7 @@ func buildLabelMatcher(pred logql.LabelMatcher) (Processor, error) {
}

// Process implements Processor.
func (lf *LabelMatcher) Process(_ otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) {
func (lf *LabelMatcher) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) {
labelValue, _ := set.GetString(lf.name)
keep = lf.matcher.Match(labelValue)
return line, keep
Expand Down Expand Up @@ -160,7 +161,7 @@ func buildDurationLabelFilter(pred *logql.DurationFilter) (Processor, error) {
}

// Process implements Processor.
func (lf *DurationLabelFilter[C]) Process(_ otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) {
func (lf *DurationLabelFilter[C]) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) {
v, ok := set.GetString(lf.name)
if !ok {
return "", false
Expand Down Expand Up @@ -222,7 +223,7 @@ func buildBytesLabelFilter(pred *logql.BytesFilter) (Processor, error) {
}

// Process implements Processor.
func (lf *BytesLabelFilter[C]) Process(_ otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) {
func (lf *BytesLabelFilter[C]) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) {
v, ok := set.GetString(lf.name)
if !ok {
return "", false
Expand Down Expand Up @@ -284,7 +285,7 @@ func buildNumberLabelFilter(pred *logql.NumberFilter) (Processor, error) {
}

// Process implements Processor.
func (lf *NumberLabelFilter[C]) Process(_ otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) {
func (lf *NumberLabelFilter[C]) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) {
switch val, ok, err := set.GetFloat(lf.name); {
case err != nil:
// Keep the line, but set error label.
Expand Down Expand Up @@ -314,7 +315,7 @@ func buildIPLabelFilter(pred *logql.IPFilter) (Processor, error) {
}

// Process implements Processor.
func (lf *IPLabelFilter) Process(_ otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) {
func (lf *IPLabelFilter) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) {
v, ok := set.GetString(lf.name)
if !ok {
return "", false
Expand Down
Loading
Loading