From 57b524b3af5619725119554c39d18b4135826f1f Mon Sep 17 00:00:00 2001 From: tdakkota Date: Fri, 26 Apr 2024 11:42:42 +0300 Subject: [PATCH 1/8] feat(logqlabels): add label utilities package --- .../logqlabels/aggregated_labels.go | 198 ++++++++++++++++++ .../logqlabels/aggregated_labels_test.go | 128 +++++++++++ .../logqlabels/empty_aggregated_labels.go | 33 +++ .../empty_aggregated_labels_test.go | 13 ++ .../logql/logqlengine/logqlabels/label_set.go | 184 ++++++++++++++++ .../logqlengine/logqlabels/logqlabels.go | 27 +++ 6 files changed, 583 insertions(+) create mode 100644 internal/logql/logqlengine/logqlabels/aggregated_labels.go create mode 100644 internal/logql/logqlengine/logqlabels/aggregated_labels_test.go create mode 100644 internal/logql/logqlengine/logqlabels/empty_aggregated_labels.go create mode 100644 internal/logql/logqlengine/logqlabels/empty_aggregated_labels_test.go create mode 100644 internal/logql/logqlengine/logqlabels/label_set.go create mode 100644 internal/logql/logqlengine/logqlabels/logqlabels.go diff --git a/internal/logql/logqlengine/logqlabels/aggregated_labels.go b/internal/logql/logqlengine/logqlabels/aggregated_labels.go new file mode 100644 index 00000000..9245d9aa --- /dev/null +++ b/internal/logql/logqlengine/logqlabels/aggregated_labels.go @@ -0,0 +1,198 @@ +package logqlabels + +import ( + "cmp" + "maps" + "regexp" + "slices" + + "github.com/cespare/xxhash/v2" + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/lokiapi" +) + +type aggregatedLabels struct { + entries []labelEntry + without map[string]struct{} + by map[string]struct{} +} + +type labelEntry struct { + name string + value string +} + +// AggregatedLabelsFromSet creates new [AggregatedLabels] from [LabelSet]. +func AggregatedLabelsFromSet(set LabelSet, by, without map[string]struct{}) AggregatedLabels { + labels := make([]labelEntry, 0, set.Len()) + set.Range(func(l logql.Label, v pcommon.Value) { + labels = append(labels, labelEntry{ + name: string(l), + value: v.AsString(), + }) + }) + slices.SortFunc(labels, func(a, b labelEntry) int { + return cmp.Compare(a.name, b.name) + }) + + return &aggregatedLabels{ + entries: labels, + without: without, + by: by, + } +} + +// AggregatedLabelsFromMap creates new [AggregatedLabels] from label map. +func AggregatedLabelsFromMap(m map[string]string) AggregatedLabels { + labels := make([]labelEntry, 0, len(m)) + for key, value := range m { + labels = append(labels, labelEntry{name: key, value: value}) + } + slices.SortFunc(labels, func(a, b labelEntry) int { + return cmp.Compare(a.name, b.name) + }) + + return &aggregatedLabels{ + entries: labels, + without: nil, + by: nil, + } +} + +// By returns new set of labels containing only given list of labels. +func (a *aggregatedLabels) By(labels ...logql.Label) AggregatedLabels { + if len(labels) == 0 { + return a + } + + sub := &aggregatedLabels{ + entries: a.entries, + without: a.without, + by: buildSet(maps.Clone(a.by), labels...), + } + return sub +} + +// Without returns new set of labels without given list of labels. +func (a *aggregatedLabels) Without(labels ...logql.Label) AggregatedLabels { + if len(labels) == 0 { + return a + } + + sub := &aggregatedLabels{ + entries: a.entries, + without: buildSet(maps.Clone(a.without), labels...), + by: a.by, + } + return sub +} + +// Key computes grouping key from set of labels. +func (a *aggregatedLabels) Key() GroupingKey { + h := xxhash.New() + a.forEach(func(k, v string) { + _, _ = h.WriteString(k) + _, _ = h.WriteString(v) + }) + return h.Sum64() +} + +// Replace replaces labels using given regexp. +func (a *aggregatedLabels) Replace(dstLabel, replacement, srcLabel string, re *regexp.Regexp) AggregatedLabels { + src := a.findEntry(srcLabel) + + idxs := re.FindStringSubmatchIndex(src) + if idxs == nil { + return a + } + + dst := re.ExpandString(nil, replacement, src, idxs) + if len(dst) == 0 { + // Destination value is empty, delete it. + a.deleteEntry(dstLabel) + } else { + a.setEntry(dstLabel, string(dst)) + } + + return a +} + +func (a *aggregatedLabels) findEntry(key string) string { + for _, e := range a.entries { + if e.name == key { + return e.value + } + } + return "" +} + +func (a *aggregatedLabels) deleteEntry(key string) { + n := 0 + for _, e := range a.entries { + if e.name == key { + continue + } + a.entries[n] = e + n++ + } + a.entries = a.entries[:n] +} + +func (a *aggregatedLabels) setEntry(key, value string) { + var entry *labelEntry + for i, e := range a.entries { + if e.name == key { + entry = &a.entries[i] + break + } + } + + replacement := labelEntry{ + name: key, + value: value, + } + if entry == nil { + a.entries = append(a.entries, replacement) + } else { + *entry = replacement + } +} + +// AsLokiAPI returns API structure for label set. +func (a *aggregatedLabels) AsLokiAPI() (r lokiapi.LabelSet) { + r = lokiapi.LabelSet{} + a.forEach(func(k, v string) { + r[k] = v + }) + return r +} + +func (a *aggregatedLabels) forEach(cb func(k, v string)) { + for _, e := range a.entries { + if _, ok := a.without[e.name]; ok { + continue + } + if len(a.by) > 0 { + if _, ok := a.by[e.name]; !ok { + continue + } + } + cb(e.name, e.value) + } +} + +func buildSet[K ~string](r map[string]struct{}, input ...K) map[string]struct{} { + if len(input) == 0 { + return r + } + + if r == nil { + r = make(map[string]struct{}, len(input)) + } + for _, k := range input { + r[string(k)] = struct{}{} + } + return r +} diff --git a/internal/logql/logqlengine/logqlabels/aggregated_labels_test.go b/internal/logql/logqlengine/logqlabels/aggregated_labels_test.go new file mode 100644 index 00000000..358bb917 --- /dev/null +++ b/internal/logql/logqlengine/logqlabels/aggregated_labels_test.go @@ -0,0 +1,128 @@ +package logqlabels + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/go-faster/oteldb/internal/logql" +) + +func TestAggregatedLabels(t *testing.T) { + defaultSet := map[string]string{ + "foo": "1", + "bar": "2", + "baz": "3", + } + tests := []struct { + set map[string]string + by []logql.Label + without []logql.Label + want []string + }{ + // By. + { + defaultSet, + []logql.Label{"foo"}, + nil, + []string{"foo"}, + }, + { + defaultSet, + []logql.Label{"foo", "bar"}, + nil, + []string{"foo", "bar"}, + }, + // Without. + { + defaultSet, + nil, + []logql.Label{"bar", "baz"}, + []string{"foo"}, + }, + // Both. + { + defaultSet, + []logql.Label{"foo", "bar"}, + []logql.Label{"bar"}, + []string{"foo"}, + }, + { + defaultSet, + []logql.Label{"foo", "bar", "baz"}, + []logql.Label{"bar"}, + []string{"foo", "baz"}, + }, + } + for i, tt := range tests { + tt := tt + t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) { + set := NewLabelSet() + for k, v := range tt.set { + set.Set(logql.Label(k), pcommon.NewValueStr(v)) + } + + buildWays := []struct { + name string + build func(set LabelSet, by, without []logql.Label) AggregatedLabels + }{ + { + "ByThenWithout", + func(set LabelSet, by, without []logql.Label) AggregatedLabels { + labels := AggregatedLabelsFromSet( + set, + nil, + nil, + ) + if len(tt.by) > 0 { + labels = labels.By(tt.by...) + } + if len(tt.without) > 0 { + labels = labels.Without(tt.without...) + } + return labels + }, + }, + { + "WithoutThenBy", + func(set LabelSet, by, without []logql.Label) AggregatedLabels { + labels := AggregatedLabelsFromSet( + set, + nil, + nil, + ) + if len(tt.without) > 0 { + labels = labels.Without(tt.without...) + } + if len(tt.by) > 0 { + labels = labels.By(tt.by...) + } + return labels + }, + }, + { + "Constructor", + func(set LabelSet, by, without []logql.Label) AggregatedLabels { + return AggregatedLabelsFromSet( + set, + buildSet(nil, by...), + buildSet(nil, without...), + ) + }, + }, + } + for _, bw := range buildWays { + bw := bw + t.Run(bw.name, func(t *testing.T) { + labels := bw.build(set, tt.by, tt.without) + got := labels.AsLokiAPI() + for _, k := range tt.want { + require.Contains(t, got, k) + } + }) + } + }) + } +} diff --git a/internal/logql/logqlengine/logqlabels/empty_aggregated_labels.go b/internal/logql/logqlengine/logqlabels/empty_aggregated_labels.go new file mode 100644 index 00000000..6500254f --- /dev/null +++ b/internal/logql/logqlengine/logqlabels/empty_aggregated_labels.go @@ -0,0 +1,33 @@ +package logqlabels + +import ( + "regexp" + + "github.com/cespare/xxhash/v2" + + "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/lokiapi" +) + +// EmptyAggregatedLabels returns empty set of aggregated labels. +func EmptyAggregatedLabels() AggregatedLabels { + return emptyAggregatedLabels +} + +var ( + emptyAggregatedLabels = new(emptyLabels) + + zeroHash = xxhash.New().Sum64() +) + +type emptyLabels struct{} + +func (l *emptyLabels) By(...logql.Label) AggregatedLabels { return l } + +func (l *emptyLabels) Without(...logql.Label) AggregatedLabels { return l } + +func (l *emptyLabels) Key() GroupingKey { return zeroHash } + +func (l *emptyLabels) Replace(_, _, _ string, _ *regexp.Regexp) AggregatedLabels { return l } + +func (l *emptyLabels) AsLokiAPI() lokiapi.LabelSet { return lokiapi.LabelSet{} } diff --git a/internal/logql/logqlengine/logqlabels/empty_aggregated_labels_test.go b/internal/logql/logqlengine/logqlabels/empty_aggregated_labels_test.go new file mode 100644 index 00000000..0e94a086 --- /dev/null +++ b/internal/logql/logqlengine/logqlabels/empty_aggregated_labels_test.go @@ -0,0 +1,13 @@ +package logqlabels + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestEmptyAggregatedLabels(t *testing.T) { + al := AggregatedLabelsFromSet(LabelSet{}, nil, nil) + el := EmptyAggregatedLabels() + require.Equal(t, el.Key(), al.Key()) +} diff --git a/internal/logql/logqlengine/logqlabels/label_set.go b/internal/logql/logqlengine/logqlabels/label_set.go new file mode 100644 index 00000000..4f79078f --- /dev/null +++ b/internal/logql/logqlengine/logqlabels/label_set.go @@ -0,0 +1,184 @@ +package logqlabels + +import ( + "slices" + "strconv" + "strings" + + "github.com/go-faster/errors" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "golang.org/x/exp/maps" + + "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logstorage" + "github.com/go-faster/oteldb/internal/lokiapi" + "github.com/go-faster/oteldb/internal/otelstorage" +) + +// LabelSet is a log record's label set. +type LabelSet struct { + labels map[logql.Label]pcommon.Value +} + +// NewLabelSet creates new [LabelSet]. +func NewLabelSet() LabelSet { + return LabelSet{ + labels: map[logql.Label]pcommon.Value{}, + } +} + +// AllowDots whether if dots in labels are allowed. +func (l *LabelSet) AllowDots() bool { + return true +} + +// Reset resets internal state of [LabelSet]. +func (l *LabelSet) Reset() { + if l.labels == nil { + l.labels = map[logql.Label]pcommon.Value{} + } + maps.Clear(l.labels) +} + +// AsLokiAPI returns lokiapi.LabelSet +func (l *LabelSet) AsLokiAPI() lokiapi.LabelSet { + return lokiapi.LabelSet(l.AsMap()) +} + +// AsMap returns labels as strings map. +func (l *LabelSet) AsMap() map[string]string { + set := make(map[string]string, len(l.labels)) + for k, v := range l.labels { + set[string(k)] = v.AsString() + } + return set +} + +// String returns text representation of labels. +func (l *LabelSet) String() string { + var sb strings.Builder + sb.WriteByte('{') + + keys := maps.Keys(l.labels) + slices.Sort(keys) + + i := 0 + for _, k := range keys { + v := l.labels[k] + if i != 0 { + sb.WriteByte(',') + } + sb.WriteString(string(k)) + sb.WriteByte('=') + sb.WriteString(strconv.Quote(v.AsString())) + i++ + } + sb.WriteByte('}') + return sb.String() +} + +// SetFromRecord sets labels from given log record. +func (l *LabelSet) SetFromRecord(record logstorage.Record) { + l.Reset() + + if traceID := record.TraceID; !traceID.IsEmpty() { + l.Set(logstorage.LabelTraceID, pcommon.NewValueStr(traceID.Hex())) + } + if spanID := record.SpanID; !spanID.IsEmpty() { + l.Set(logstorage.LabelSpanID, pcommon.NewValueStr(spanID.Hex())) + } + if severity := record.SeverityNumber; severity != plog.SeverityNumberUnspecified { + l.Set(logstorage.LabelSeverity, pcommon.NewValueStr(severity.String())) + } + l.SetAttrs(record.Attrs, record.ScopeAttrs, record.ResourceAttrs) +} + +// Len returns set length +func (l *LabelSet) Len() int { + return len(l.labels) +} + +// SetAttrs sets labels from attrs. +func (l *LabelSet) SetAttrs(attrMaps ...otelstorage.Attrs) { + for _, attrs := range attrMaps { + m := attrs.AsMap() + if m == (pcommon.Map{}) { + continue + } + m.Range(func(k string, v pcommon.Value) bool { + k = otelstorage.KeyToLabel(k) + l.Set(logql.Label(k), v) + return true + }) + } +} + +// Set sets label. +func (l *LabelSet) Set(s logql.Label, val pcommon.Value) { + l.labels[s] = val +} + +// Delete deletes label. +func (l *LabelSet) Delete(s logql.Label) { + delete(l.labels, s) +} + +// Range iterates over label set. +func (l *LabelSet) Range(cb func(logql.Label, pcommon.Value)) { + for k, v := range l.labels { + cb(k, v) + } +} + +// Get returns attr value. +func (l *LabelSet) Get(name logql.Label) (v pcommon.Value, ok bool) { + v, ok = l.labels[name] + return v, ok +} + +// GetString returns stringified attr value. +func (l *LabelSet) GetString(name logql.Label) (string, bool) { + v, ok := l.Get(name) + if ok { + return v.AsString(), true + } + return "", false +} + +// GetFloat returns number attr value. +func (l *LabelSet) GetFloat(name logql.Label) (_ float64, ok bool, err error) { + v, ok := l.Get(name) + if !ok { + return 0, false, nil + } + switch t := v.Type(); t { + case pcommon.ValueTypeStr: + v, err := strconv.ParseFloat(v.Str(), 64) + return v, true, err + case pcommon.ValueTypeInt: + // TODO(tdakkota): check for overflow. + return float64(v.Int()), true, nil + case pcommon.ValueTypeDouble: + return v.Double(), true, nil + default: + return 0, false, errors.Errorf("can't convert %q to float", t) + } +} + +// SetError sets special error label. +func (l *LabelSet) SetError(typ string, err error) { + if _, ok := l.labels[logql.ErrorLabel]; ok { + // Do not override old error. + return + } + if err != nil { + l.labels[logql.ErrorLabel] = pcommon.NewValueStr(typ) + l.labels[logql.ErrorDetailsLabel] = pcommon.NewValueStr(err.Error()) + } +} + +// GetError returns error label. +func (l *LabelSet) GetError() (string, bool) { + return l.GetString(logql.ErrorLabel) +} diff --git a/internal/logql/logqlengine/logqlabels/logqlabels.go b/internal/logql/logqlengine/logqlabels/logqlabels.go new file mode 100644 index 00000000..5df2975e --- /dev/null +++ b/internal/logql/logqlengine/logqlabels/logqlabels.go @@ -0,0 +1,27 @@ +// Package logqlabels contains LogQL label utilities. +package logqlabels + +import ( + "regexp" + + "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/lokiapi" +) + +// GroupingKey is a key to group metrics by label. +type GroupingKey = uint64 + +// AggregatedLabels is a set of labels. +type AggregatedLabels interface { + // By returns new set of labels containing only given list of labels. + By(...logql.Label) AggregatedLabels + // Without returns new set of labels without given list of labels. + Without(...logql.Label) AggregatedLabels + // Key computes grouping key from set of labels. + Key() GroupingKey + // Replace replaces labels using given regexp. + Replace(dstLabel, replacement, srcLabel string, re *regexp.Regexp) AggregatedLabels + + // AsLokiAPI returns API structure for label set. + AsLokiAPI() lokiapi.LabelSet +} From 8dda095c347807c9e0727db3de8ebe55a34554a5 Mon Sep 17 00:00:00 2001 From: tdakkota Date: Fri, 26 Apr 2024 11:54:48 +0300 Subject: [PATCH 2/8] fix(logqlabels): sort entries after adding a new one --- .../logqlengine/logqlabels/aggregated_labels.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/internal/logql/logqlengine/logqlabels/aggregated_labels.go b/internal/logql/logqlengine/logqlabels/aggregated_labels.go index 9245d9aa..426d3072 100644 --- a/internal/logql/logqlengine/logqlabels/aggregated_labels.go +++ b/internal/logql/logqlengine/logqlabels/aggregated_labels.go @@ -24,6 +24,12 @@ type labelEntry struct { value string } +func sortEntries(entries []labelEntry) { + slices.SortFunc(entries, func(a, b labelEntry) int { + return cmp.Compare(a.name, b.name) + }) +} + // AggregatedLabelsFromSet creates new [AggregatedLabels] from [LabelSet]. func AggregatedLabelsFromSet(set LabelSet, by, without map[string]struct{}) AggregatedLabels { labels := make([]labelEntry, 0, set.Len()) @@ -33,9 +39,7 @@ func AggregatedLabelsFromSet(set LabelSet, by, without map[string]struct{}) Aggr value: v.AsString(), }) }) - slices.SortFunc(labels, func(a, b labelEntry) int { - return cmp.Compare(a.name, b.name) - }) + sortEntries(labels) return &aggregatedLabels{ entries: labels, @@ -158,6 +162,8 @@ func (a *aggregatedLabels) setEntry(key, value string) { } else { *entry = replacement } + // TODO(tdakkota): suboptimal, probably should use heap/tree instead. + sortEntries(a.entries) } // AsLokiAPI returns API structure for label set. From 8bfead6fcaa00e4c2beaefeaa527251ae704a0cf Mon Sep 17 00:00:00 2001 From: tdakkota Date: Fri, 26 Apr 2024 11:55:19 +0300 Subject: [PATCH 3/8] refactor(logqlengine): use new `logqlabels` package --- .../logql/logqlengine/aggregated_labels.go | 199 ------------------ .../logqlengine/aggregated_labels_test.go | 135 ------------ internal/logql/logqlengine/decolorize.go | 3 +- internal/logql/logqlengine/decolorize_test.go | 3 +- internal/logql/logqlengine/distinct.go | 3 +- internal/logql/logqlengine/drop.go | 3 +- internal/logql/logqlengine/drop_test.go | 3 +- internal/logql/logqlengine/engine_plan.go | 3 +- internal/logql/logqlengine/engine_test.go | 3 +- internal/logql/logqlengine/json.go | 9 +- internal/logql/logqlengine/json_test.go | 9 +- internal/logql/logqlengine/keep.go | 3 +- internal/logql/logqlengine/keep_test.go | 3 +- internal/logql/logqlengine/label_filter.go | 15 +- .../logql/logqlengine/label_filter_test.go | 12 +- internal/logql/logqlengine/label_format.go | 5 +- .../logql/logqlengine/label_format_test.go | 3 +- internal/logql/logqlengine/label_set.go | 182 ---------------- internal/logql/logqlengine/line_filter.go | 5 +- .../logql/logqlengine/line_filter_test.go | 5 +- internal/logql/logqlengine/line_format.go | 3 +- .../logql/logqlengine/line_format_test.go | 3 +- internal/logql/logqlengine/logfmt.go | 7 +- internal/logql/logqlengine/logfmt_test.go | 3 +- .../logql/logqlengine/logqlengine_test.go | 16 ++ .../logql/logqlengine/logqlmetric/bin_op.go | 7 +- .../logqlengine/logqlmetric/bin_op_test.go | 58 ++--- .../logqlmetric/label_replace_test.go | 12 +- .../logql/logqlengine/logqlmetric/labels.go | 45 ---- .../logqlengine/logqlmetric/logqlmetric.go | 5 +- .../logqlmetric/logqlmetric_test.go | 11 + .../logql/logqlengine/logqlmetric/metric.go | 7 +- .../logqlengine/logqlmetric/query_test.go | 174 +++++---------- .../logqlengine/logqlmetric/range_agg.go | 13 +- .../logql/logqlengine/logqlmetric/vector.go | 3 +- .../logqlengine/logqlmetric/vector_agg.go | 13 +- .../logqlmetric/vector_agg_test.go | 44 ++-- internal/logql/logqlengine/pattern.go | 3 +- internal/logql/logqlengine/pattern_test.go | 3 +- internal/logql/logqlengine/processor.go | 7 +- internal/logql/logqlengine/regexp.go | 3 +- internal/logql/logqlengine/regexp_test.go | 3 +- internal/logql/logqlengine/sampler.go | 17 +- internal/logql/logqlengine/sampler_test.go | 4 +- internal/logql/logqlengine/unpack.go | 7 +- internal/logql/logqlengine/unpack_test.go | 3 +- 46 files changed, 252 insertions(+), 828 deletions(-) delete mode 100644 internal/logql/logqlengine/aggregated_labels.go delete mode 100644 internal/logql/logqlengine/aggregated_labels_test.go delete mode 100644 internal/logql/logqlengine/label_set.go create mode 100644 internal/logql/logqlengine/logqlengine_test.go delete mode 100644 internal/logql/logqlengine/logqlmetric/labels.go create mode 100644 internal/logql/logqlengine/logqlmetric/logqlmetric_test.go diff --git a/internal/logql/logqlengine/aggregated_labels.go b/internal/logql/logqlengine/aggregated_labels.go deleted file mode 100644 index 12fe213d..00000000 --- a/internal/logql/logqlengine/aggregated_labels.go +++ /dev/null @@ -1,199 +0,0 @@ -package logqlengine - -import ( - "cmp" - "maps" - "regexp" - "slices" - - "github.com/cespare/xxhash/v2" - "go.opentelemetry.io/collector/pdata/pcommon" - - "github.com/go-faster/oteldb/internal/logql" - "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlmetric" - "github.com/go-faster/oteldb/internal/lokiapi" -) - -type aggregatedLabels struct { - entries []labelEntry - without map[string]struct{} - by map[string]struct{} -} - -type labelEntry struct { - name string - value string -} - -// AggregatedLabelsFromSet creates new [logqlmetric.AggregatedLabels] from [LabelSet]. -func AggregatedLabelsFromSet(set LabelSet, by, without map[string]struct{}) logqlmetric.AggregatedLabels { - labels := make([]labelEntry, 0, set.Len()) - set.Range(func(l logql.Label, v pcommon.Value) { - labels = append(labels, labelEntry{ - name: string(l), - value: v.AsString(), - }) - }) - slices.SortFunc(labels, func(a, b labelEntry) int { - return cmp.Compare(a.name, b.name) - }) - - return &aggregatedLabels{ - entries: labels, - without: without, - by: by, - } -} - -// AggregatedLabelsFromMap creates new [logqlmetric.AggregatedLabels] from label map. -func AggregatedLabelsFromMap(m map[string]string) logqlmetric.AggregatedLabels { - labels := make([]labelEntry, 0, len(m)) - for key, value := range m { - labels = append(labels, labelEntry{name: key, value: value}) - } - slices.SortFunc(labels, func(a, b labelEntry) int { - return cmp.Compare(a.name, b.name) - }) - - return &aggregatedLabels{ - entries: labels, - without: nil, - by: nil, - } -} - -// By returns new set of labels containing only given list of labels. -func (a *aggregatedLabels) By(labels ...logql.Label) logqlmetric.AggregatedLabels { - if len(labels) == 0 { - return a - } - - sub := &aggregatedLabels{ - entries: a.entries, - without: a.without, - by: buildSet(maps.Clone(a.by), labels...), - } - return sub -} - -// Without returns new set of labels without given list of labels. -func (a *aggregatedLabels) Without(labels ...logql.Label) logqlmetric.AggregatedLabels { - if len(labels) == 0 { - return a - } - - sub := &aggregatedLabels{ - entries: a.entries, - without: buildSet(maps.Clone(a.without), labels...), - by: a.by, - } - return sub -} - -// Key computes grouping key from set of labels. -func (a *aggregatedLabels) Key() logqlmetric.GroupingKey { - h := xxhash.New() - a.forEach(func(k, v string) { - _, _ = h.WriteString(k) - _, _ = h.WriteString(v) - }) - return h.Sum64() -} - -// Replace replaces labels using given regexp. -func (a *aggregatedLabels) Replace(dstLabel, replacement, srcLabel string, re *regexp.Regexp) logqlmetric.AggregatedLabels { - src := a.findEntry(srcLabel) - - idxs := re.FindStringSubmatchIndex(src) - if idxs == nil { - return a - } - - dst := re.ExpandString(nil, replacement, src, idxs) - if len(dst) == 0 { - // Destination value is empty, delete it. - a.deleteEntry(dstLabel) - } else { - a.setEntry(dstLabel, string(dst)) - } - - return a -} - -func (a *aggregatedLabels) findEntry(key string) string { - for _, e := range a.entries { - if e.name == key { - return e.value - } - } - return "" -} - -func (a *aggregatedLabels) deleteEntry(key string) { - n := 0 - for _, e := range a.entries { - if e.name == key { - continue - } - a.entries[n] = e - n++ - } - a.entries = a.entries[:n] -} - -func (a *aggregatedLabels) setEntry(key, value string) { - var entry *labelEntry - for i, e := range a.entries { - if e.name == key { - entry = &a.entries[i] - break - } - } - - replacement := labelEntry{ - name: key, - value: value, - } - if entry == nil { - a.entries = append(a.entries, replacement) - } else { - *entry = replacement - } -} - -// AsLokiAPI returns API structure for label set. -func (a *aggregatedLabels) AsLokiAPI() (r lokiapi.LabelSet) { - r = lokiapi.LabelSet{} - a.forEach(func(k, v string) { - r[k] = v - }) - return r -} - -func (a *aggregatedLabels) forEach(cb func(k, v string)) { - for _, e := range a.entries { - if _, ok := a.without[e.name]; ok { - continue - } - if len(a.by) > 0 { - if _, ok := a.by[e.name]; !ok { - continue - } - } - cb(e.name, e.value) - } -} - -func buildSet[K ~string](r map[string]struct{}, input ...K) map[string]struct{} { - if len(input) == 0 { - return r - } - - if r == nil { - r = make(map[string]struct{}, len(input)) - } - for _, k := range input { - r[string(k)] = struct{}{} - } - return r -} diff --git a/internal/logql/logqlengine/aggregated_labels_test.go b/internal/logql/logqlengine/aggregated_labels_test.go deleted file mode 100644 index 86505f2d..00000000 --- a/internal/logql/logqlengine/aggregated_labels_test.go +++ /dev/null @@ -1,135 +0,0 @@ -package logqlengine - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/pdata/pcommon" - - "github.com/go-faster/oteldb/internal/logql" - "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlmetric" -) - -func TestAggregatedLabels(t *testing.T) { - defaultSet := map[string]string{ - "foo": "1", - "bar": "2", - "baz": "3", - } - tests := []struct { - set map[string]string - by []logql.Label - without []logql.Label - want []string - }{ - // By. - { - defaultSet, - []logql.Label{"foo"}, - nil, - []string{"foo"}, - }, - { - defaultSet, - []logql.Label{"foo", "bar"}, - nil, - []string{"foo", "bar"}, - }, - // Without. - { - defaultSet, - nil, - []logql.Label{"bar", "baz"}, - []string{"foo"}, - }, - // Both. - { - defaultSet, - []logql.Label{"foo", "bar"}, - []logql.Label{"bar"}, - []string{"foo"}, - }, - { - defaultSet, - []logql.Label{"foo", "bar", "baz"}, - []logql.Label{"bar"}, - []string{"foo", "baz"}, - }, - } - for i, tt := range tests { - tt := tt - t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) { - set := NewLabelSet() - for k, v := range tt.set { - set.Set(logql.Label(k), pcommon.NewValueStr(v)) - } - - buildWays := []struct { - name string - build func(set LabelSet, by, without []logql.Label) logqlmetric.AggregatedLabels - }{ - { - "ByThenWithout", - func(set LabelSet, by, without []logql.Label) logqlmetric.AggregatedLabels { - var labels logqlmetric.AggregatedLabels = AggregatedLabelsFromSet( - set, - nil, - nil, - ) - if len(tt.by) > 0 { - labels = labels.By(tt.by...) - } - if len(tt.without) > 0 { - labels = labels.Without(tt.without...) - } - return labels - }, - }, - { - "WithoutThenBy", - func(set LabelSet, by, without []logql.Label) logqlmetric.AggregatedLabels { - var labels logqlmetric.AggregatedLabels = AggregatedLabelsFromSet( - set, - nil, - nil, - ) - if len(tt.without) > 0 { - labels = labels.Without(tt.without...) - } - if len(tt.by) > 0 { - labels = labels.By(tt.by...) - } - return labels - }, - }, - { - "Constructor", - func(set LabelSet, by, without []logql.Label) logqlmetric.AggregatedLabels { - return AggregatedLabelsFromSet( - set, - buildSet(nil, by...), - buildSet(nil, without...), - ) - }, - }, - } - for _, bw := range buildWays { - bw := bw - t.Run(bw.name, func(t *testing.T) { - labels := bw.build(set, tt.by, tt.without) - got := labels.AsLokiAPI() - for _, k := range tt.want { - require.Contains(t, got, k) - } - }) - } - }) - } -} - -func TestEmptyAggregatedLabels(t *testing.T) { - al := AggregatedLabelsFromSet(LabelSet{}, nil, nil) - el := logqlmetric.EmptyAggregatedLabels() - require.Equal(t, el.Key(), al.Key()) -} diff --git a/internal/logql/logqlengine/decolorize.go b/internal/logql/logqlengine/decolorize.go index c11b0fe8..a6c0e154 100644 --- a/internal/logql/logqlengine/decolorize.go +++ b/internal/logql/logqlengine/decolorize.go @@ -4,6 +4,7 @@ import ( "regexp" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/otelstorage" ) @@ -20,6 +21,6 @@ const ansiPattern = "[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\ var ansiRegex = regexp.MustCompile(ansiPattern) // Process implements Processor. -func (d *Decolorize) Process(_ otelstorage.Timestamp, line string, _ LabelSet) (string, bool) { +func (d *Decolorize) Process(_ otelstorage.Timestamp, line string, _ logqlabels.LabelSet) (string, bool) { return ansiRegex.ReplaceAllString(line, ""), true } diff --git a/internal/logql/logqlengine/decolorize_test.go b/internal/logql/logqlengine/decolorize_test.go index 2321908e..1f9bbeed 100644 --- a/internal/logql/logqlengine/decolorize_test.go +++ b/internal/logql/logqlengine/decolorize_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/require" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" ) func TestDecolorize(t *testing.T) { @@ -26,7 +27,7 @@ func TestDecolorize(t *testing.T) { for i, tt := range tests { tt := tt t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) { - set := NewLabelSet() + set := logqlabels.NewLabelSet() f, err := buildDecolorize(&logql.DecolorizeExpr{}) require.NoError(t, err) diff --git a/internal/logql/logqlengine/distinct.go b/internal/logql/logqlengine/distinct.go index 4aca4349..09b36a79 100644 --- a/internal/logql/logqlengine/distinct.go +++ b/internal/logql/logqlengine/distinct.go @@ -2,6 +2,7 @@ package logqlengine import ( "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/otelstorage" ) @@ -24,7 +25,7 @@ func buildDistinctFilter(stage *logql.DistinctFilter) (Processor, error) { } // Process implements Processor. -func (d *DistinctFilter) Process(_ otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) { +func (d *DistinctFilter) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) { for _, label := range d.labels { val, ok := set.GetString(label) if !ok { diff --git a/internal/logql/logqlengine/drop.go b/internal/logql/logqlengine/drop.go index 806e7e45..5dfb7cb7 100644 --- a/internal/logql/logqlengine/drop.go +++ b/internal/logql/logqlengine/drop.go @@ -4,6 +4,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/otelstorage" ) @@ -36,7 +37,7 @@ func buildDropLabels(stage *logql.DropLabelsExpr) (Processor, error) { } // Process implements Processor. -func (k *DropLabels) Process(_ otelstorage.Timestamp, line string, set LabelSet) (string, bool) { +func (k *DropLabels) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (string, bool) { set.Range(func(label logql.Label, val pcommon.Value) { if k.dropPair(label, val) { set.Delete(label) diff --git a/internal/logql/logqlengine/drop_test.go b/internal/logql/logqlengine/drop_test.go index fcee40d1..d6c51674 100644 --- a/internal/logql/logqlengine/drop_test.go +++ b/internal/logql/logqlengine/drop_test.go @@ -87,8 +87,7 @@ func TestDropLabels(t *testing.T) { }) require.NoError(t, err) - set := NewLabelSet() - set.labels = tt.input + set := newLabelSet(tt.input) newLine, ok := e.Process(0, ``, set) // Ensure that processor does not change the line. require.Equal(t, ``, newLine) diff --git a/internal/logql/logqlengine/engine_plan.go b/internal/logql/logqlengine/engine_plan.go index b6ee5f96..5be63fcf 100644 --- a/internal/logql/logqlengine/engine_plan.go +++ b/internal/logql/logqlengine/engine_plan.go @@ -5,6 +5,7 @@ import ( "time" "github.com/go-faster/oteldb/internal/iterators" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlmetric" "github.com/go-faster/oteldb/internal/lokiapi" "github.com/go-faster/oteldb/internal/otelstorage" @@ -38,7 +39,7 @@ const ( type Entry struct { Timestamp otelstorage.Timestamp Line string - Set LabelSet + Set logqlabels.LabelSet } type ( diff --git a/internal/logql/logqlengine/engine_test.go b/internal/logql/logqlengine/engine_test.go index c23b827f..1f9bf94b 100644 --- a/internal/logql/logqlengine/engine_test.go +++ b/internal/logql/logqlengine/engine_test.go @@ -17,6 +17,7 @@ import ( "github.com/go-faster/oteldb/internal/iterators" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/logstorage" "github.com/go-faster/oteldb/internal/lokiapi" "github.com/go-faster/oteldb/internal/otelstorage" @@ -161,7 +162,7 @@ func (n *mockPipelineNode) EvalPipeline(ctx context.Context, params EvalParams) } } - set := NewLabelSet() + set := logqlabels.NewLabelSet() set.SetAttrs( otelstorage.Attrs(attrs), otelstorage.Attrs(scopeAttrs), diff --git a/internal/logql/logqlengine/json.go b/internal/logql/logqlengine/json.go index ff70f957..096187bd 100644 --- a/internal/logql/logqlengine/json.go +++ b/internal/logql/logqlengine/json.go @@ -7,6 +7,7 @@ import ( "github.com/go-faster/oteldb/internal/logql" "github.com/go-faster/oteldb/internal/logql/logqlengine/jsonexpr" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/otelstorage" ) @@ -52,7 +53,7 @@ func buildJSONExtractor(stage *logql.JSONExpressionParser) (Processor, error) { } // Process implements Processor. -func (e *JSONExtractor) Process(_ otelstorage.Timestamp, line string, set LabelSet) (string, bool) { +func (e *JSONExtractor) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (string, bool) { var err error switch { case len(e.paths) != 0: @@ -68,7 +69,7 @@ func (e *JSONExtractor) Process(_ otelstorage.Timestamp, line string, set LabelS return line, true } -func extractExprs(paths map[logql.Label]jsonexpr.Path, line string, set LabelSet) error { +func extractExprs(paths map[logql.Label]jsonexpr.Path, line string, set logqlabels.LabelSet) error { // TODO(tdakkota): allocates buffer for each line. d := decodeStr(line) return jsonexpr.Extract( @@ -80,7 +81,7 @@ func extractExprs(paths map[logql.Label]jsonexpr.Path, line string, set LabelSet ) } -func extractSome(labels map[logql.Label]struct{}, line string, set LabelSet) error { +func extractSome(labels map[logql.Label]struct{}, line string, set logqlabels.LabelSet) error { d := decodeStr(line) return d.ObjBytes(func(d *jx.Decoder, key []byte) error { if _, ok := labels[logql.Label(key)]; !ok { @@ -101,7 +102,7 @@ func extractSome(labels map[logql.Label]struct{}, line string, set LabelSet) err }) } -func extractAll(line string, set LabelSet) error { +func extractAll(line string, set logqlabels.LabelSet) error { d := decodeStr(line) return d.Obj(func(d *jx.Decoder, key string) error { value, ok, err := parseValue(d) diff --git a/internal/logql/logqlengine/json_test.go b/internal/logql/logqlengine/json_test.go index 43e021f3..f4edcb8c 100644 --- a/internal/logql/logqlengine/json_test.go +++ b/internal/logql/logqlengine/json_test.go @@ -8,6 +8,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" ) func TestJSONExtractor(t *testing.T) { @@ -90,7 +91,7 @@ func TestJSONExtractor(t *testing.T) { }) require.NoError(t, err) - set := NewLabelSet() + set := logqlabels.NewLabelSet() newLine, ok := e.Process(0, tt.input, set) // Ensure that extractor does not change the line. require.Equal(t, tt.input, newLine) @@ -104,7 +105,7 @@ func TestJSONExtractor(t *testing.T) { errMsg, ok := set.GetError() require.False(t, ok, "got error: %s", errMsg) - require.Len(t, set.labels, len(tt.expectLabels)) + require.Equal(t, len(tt.expectLabels), set.Len()) for k, expect := range tt.expectLabels { got, ok := set.Get(k) require.Truef(t, ok, "key %q", k) @@ -172,7 +173,7 @@ func BenchmarkJSONExtractor(b *testing.B) { p, err := buildJSONExtractor(bb.expr) require.NoError(b, err) - set := NewLabelSet() + set := logqlabels.NewLabelSet() var ( line string ok bool @@ -182,7 +183,7 @@ func BenchmarkJSONExtractor(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - set.reset() + set.Reset() line, ok = p.Process(10, benchdata, set) } diff --git a/internal/logql/logqlengine/keep.go b/internal/logql/logqlengine/keep.go index 7366868d..2b5ecfd0 100644 --- a/internal/logql/logqlengine/keep.go +++ b/internal/logql/logqlengine/keep.go @@ -4,6 +4,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/otelstorage" ) @@ -36,7 +37,7 @@ func buildKeepLabels(stage *logql.KeepLabelsExpr) (Processor, error) { } // Process implements Processor. -func (k *KeepLabels) Process(_ otelstorage.Timestamp, line string, set LabelSet) (string, bool) { +func (k *KeepLabels) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (string, bool) { set.Range(func(label logql.Label, val pcommon.Value) { if !k.keepPair(label, val) { set.Delete(label) diff --git a/internal/logql/logqlengine/keep_test.go b/internal/logql/logqlengine/keep_test.go index 057a24b3..1ca90738 100644 --- a/internal/logql/logqlengine/keep_test.go +++ b/internal/logql/logqlengine/keep_test.go @@ -84,8 +84,7 @@ func TestKeepLabels(t *testing.T) { }) require.NoError(t, err) - set := NewLabelSet() - set.labels = tt.input + set := newLabelSet(tt.input) newLine, ok := e.Process(0, ``, set) // Ensure that processor does not change the line. require.Equal(t, ``, newLine) diff --git a/internal/logql/logqlengine/label_filter.go b/internal/logql/logqlengine/label_filter.go index ce0a634f..d87fd3e1 100644 --- a/internal/logql/logqlengine/label_filter.go +++ b/internal/logql/logqlengine/label_filter.go @@ -9,6 +9,7 @@ import ( "github.com/go-faster/errors" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/otelstorage" ) @@ -67,7 +68,7 @@ type AndLabelMatcher struct { } // Process implements Processor. -func (m *AndLabelMatcher) Process(ts otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) { +func (m *AndLabelMatcher) Process(ts otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) { line, keep = m.Left.Process(ts, line, set) if !keep { return line, keep @@ -82,7 +83,7 @@ type OrLabelMatcher struct { } // Process implements Processor. -func (m *OrLabelMatcher) Process(ts otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) { +func (m *OrLabelMatcher) Process(ts otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) { line, keep = m.Left.Process(ts, line, set) if keep { return line, keep @@ -109,7 +110,7 @@ func buildLabelMatcher(pred logql.LabelMatcher) (Processor, error) { } // Process implements Processor. -func (lf *LabelMatcher) Process(_ otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) { +func (lf *LabelMatcher) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) { labelValue, _ := set.GetString(lf.name) keep = lf.matcher.Match(labelValue) return line, keep @@ -160,7 +161,7 @@ func buildDurationLabelFilter(pred *logql.DurationFilter) (Processor, error) { } // Process implements Processor. -func (lf *DurationLabelFilter[C]) Process(_ otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) { +func (lf *DurationLabelFilter[C]) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) { v, ok := set.GetString(lf.name) if !ok { return "", false @@ -222,7 +223,7 @@ func buildBytesLabelFilter(pred *logql.BytesFilter) (Processor, error) { } // Process implements Processor. -func (lf *BytesLabelFilter[C]) Process(_ otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) { +func (lf *BytesLabelFilter[C]) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) { v, ok := set.GetString(lf.name) if !ok { return "", false @@ -284,7 +285,7 @@ func buildNumberLabelFilter(pred *logql.NumberFilter) (Processor, error) { } // Process implements Processor. -func (lf *NumberLabelFilter[C]) Process(_ otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) { +func (lf *NumberLabelFilter[C]) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) { switch val, ok, err := set.GetFloat(lf.name); { case err != nil: // Keep the line, but set error label. @@ -314,7 +315,7 @@ func buildIPLabelFilter(pred *logql.IPFilter) (Processor, error) { } // Process implements Processor. -func (lf *IPLabelFilter) Process(_ otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) { +func (lf *IPLabelFilter) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) { v, ok := set.GetString(lf.name) if !ok { return "", false diff --git a/internal/logql/logqlengine/label_filter_test.go b/internal/logql/logqlengine/label_filter_test.go index 5fe2efb2..606cc75b 100644 --- a/internal/logql/logqlengine/label_filter_test.go +++ b/internal/logql/logqlengine/label_filter_test.go @@ -109,8 +109,7 @@ func TestDurationLabelFilter(t *testing.T) { for i, tt := range tests { tt := tt t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) { - set := NewLabelSet() - set.labels = tt.input + set := newLabelSet(tt.input) f, err := buildDurationLabelFilter(&logql.DurationFilter{ Label: logql.Label(tt.label), @@ -234,8 +233,7 @@ func TestBytesLabelFilter(t *testing.T) { for i, tt := range tests { tt := tt t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) { - set := NewLabelSet() - set.labels = tt.input + set := newLabelSet(tt.input) f, err := buildBytesLabelFilter(&logql.BytesFilter{ Label: logql.Label(tt.label), @@ -379,8 +377,7 @@ func TestNumberLabelFilter(t *testing.T) { for i, tt := range tests { tt := tt t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) { - set := NewLabelSet() - set.labels = tt.input + set := newLabelSet(tt.input) f, err := buildNumberLabelFilter(&logql.NumberFilter{ Label: logql.Label(tt.label), @@ -509,9 +506,8 @@ func TestIPLabelFilter(t *testing.T) { for i, tt := range tests { tt := tt t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) { - set := NewLabelSet() _, hasLabel := tt.input[tt.label] - set.labels = tt.input + set := newLabelSet(tt.input) for _, cse := range []struct { op logql.BinOp diff --git a/internal/logql/logqlengine/label_format.go b/internal/logql/logqlengine/label_format.go index c88e0bf1..86d284f7 100644 --- a/internal/logql/logqlengine/label_format.go +++ b/internal/logql/logqlengine/label_format.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/otelstorage" ) @@ -46,7 +47,7 @@ type RenameLabel struct { } // Process implements Processor. -func (rl *RenameLabel) Process(_ otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) { +func (rl *RenameLabel) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) { for _, p := range rl.pairs { if v, ok := set.Get(p.From); ok { set.Set(p.To, v) @@ -82,7 +83,7 @@ func (lf *LabelFormat) currentTimestamp() time.Time { } // Process implements Processor. -func (lf *LabelFormat) Process(ts otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) { +func (lf *LabelFormat) Process(ts otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) { line, _ = lf.rename.Process(ts, line, set) lf.ts = ts diff --git a/internal/logql/logqlengine/label_format_test.go b/internal/logql/logqlengine/label_format_test.go index 87ef1ee4..8fa95a05 100644 --- a/internal/logql/logqlengine/label_format_test.go +++ b/internal/logql/logqlengine/label_format_test.go @@ -80,8 +80,7 @@ func TestLabelFormat(t *testing.T) { }) require.NoError(t, err) - set := NewLabelSet() - set.labels = tt.input + set := newLabelSet(tt.input) newLine, ok := e.Process(1700000001_000000000, "original line", set) // Ensure that processor does not change the line. require.Equal(t, "original line", newLine) diff --git a/internal/logql/logqlengine/label_set.go b/internal/logql/logqlengine/label_set.go deleted file mode 100644 index 7262d29a..00000000 --- a/internal/logql/logqlengine/label_set.go +++ /dev/null @@ -1,182 +0,0 @@ -package logqlengine - -import ( - "slices" - "strconv" - "strings" - - "github.com/go-faster/errors" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/plog" - "golang.org/x/exp/maps" - - "github.com/go-faster/oteldb/internal/logql" - "github.com/go-faster/oteldb/internal/logstorage" - "github.com/go-faster/oteldb/internal/lokiapi" - "github.com/go-faster/oteldb/internal/otelstorage" -) - -// LabelSet is a log record's label set. -type LabelSet struct { - labels map[logql.Label]pcommon.Value -} - -// NewLabelSet creates new [LabelSet]. -func NewLabelSet() LabelSet { - return LabelSet{ - labels: map[logql.Label]pcommon.Value{}, - } -} - -func (l *LabelSet) allowDots() bool { - return true -} - -func (l *LabelSet) reset() { - if l.labels == nil { - l.labels = map[logql.Label]pcommon.Value{} - } - maps.Clear(l.labels) -} - -// AsLokiAPI returns lokiapi.LabelSet -func (l *LabelSet) AsLokiAPI() lokiapi.LabelSet { - return lokiapi.LabelSet(l.AsMap()) -} - -// AsMap returns labels as strings map. -func (l *LabelSet) AsMap() map[string]string { - set := make(map[string]string, len(l.labels)) - for k, v := range l.labels { - set[string(k)] = v.AsString() - } - return set -} - -// String returns text representation of labels. -func (l *LabelSet) String() string { - var sb strings.Builder - sb.WriteByte('{') - - keys := maps.Keys(l.labels) - slices.Sort(keys) - - i := 0 - for _, k := range keys { - v := l.labels[k] - if i != 0 { - sb.WriteByte(',') - } - sb.WriteString(string(k)) - sb.WriteByte('=') - sb.WriteString(strconv.Quote(v.AsString())) - i++ - } - sb.WriteByte('}') - return sb.String() -} - -// SetFromRecord sets labels from given log record. -func (l *LabelSet) SetFromRecord(record logstorage.Record) { - l.reset() - - if traceID := record.TraceID; !traceID.IsEmpty() { - l.Set(logstorage.LabelTraceID, pcommon.NewValueStr(traceID.Hex())) - } - if spanID := record.SpanID; !spanID.IsEmpty() { - l.Set(logstorage.LabelSpanID, pcommon.NewValueStr(spanID.Hex())) - } - if severity := record.SeverityNumber; severity != plog.SeverityNumberUnspecified { - l.Set(logstorage.LabelSeverity, pcommon.NewValueStr(severity.String())) - } - l.SetAttrs(record.Attrs, record.ScopeAttrs, record.ResourceAttrs) -} - -// Len returns set length -func (l *LabelSet) Len() int { - return len(l.labels) -} - -// SetAttrs sets labels from attrs. -func (l *LabelSet) SetAttrs(attrMaps ...otelstorage.Attrs) { - for _, attrs := range attrMaps { - m := attrs.AsMap() - if m == (pcommon.Map{}) { - continue - } - m.Range(func(k string, v pcommon.Value) bool { - k = otelstorage.KeyToLabel(k) - l.Set(logql.Label(k), v) - return true - }) - } -} - -// Set sets label. -func (l *LabelSet) Set(s logql.Label, val pcommon.Value) { - l.labels[s] = val -} - -// Delete deletes label. -func (l *LabelSet) Delete(s logql.Label) { - delete(l.labels, s) -} - -// Range iterates over label set. -func (l *LabelSet) Range(cb func(logql.Label, pcommon.Value)) { - for k, v := range l.labels { - cb(k, v) - } -} - -// Get returns attr value. -func (l *LabelSet) Get(name logql.Label) (v pcommon.Value, ok bool) { - v, ok = l.labels[name] - return v, ok -} - -// GetString returns stringified attr value. -func (l *LabelSet) GetString(name logql.Label) (string, bool) { - v, ok := l.Get(name) - if ok { - return v.AsString(), true - } - return "", false -} - -// GetFloat returns number attr value. -func (l *LabelSet) GetFloat(name logql.Label) (_ float64, ok bool, err error) { - v, ok := l.Get(name) - if !ok { - return 0, false, nil - } - switch t := v.Type(); t { - case pcommon.ValueTypeStr: - v, err := strconv.ParseFloat(v.Str(), 64) - return v, true, err - case pcommon.ValueTypeInt: - // TODO(tdakkota): check for overflow. - return float64(v.Int()), true, nil - case pcommon.ValueTypeDouble: - return v.Double(), true, nil - default: - return 0, false, errors.Errorf("can't convert %q to float", t) - } -} - -// SetError sets special error label. -func (l *LabelSet) SetError(typ string, err error) { - if _, ok := l.labels[logql.ErrorLabel]; ok { - // Do not override old error. - return - } - if err != nil { - l.labels[logql.ErrorLabel] = pcommon.NewValueStr(typ) - l.labels[logql.ErrorDetailsLabel] = pcommon.NewValueStr(err.Error()) - } -} - -// GetError returns error label. -func (l *LabelSet) GetError() (string, bool) { - return l.GetString(logql.ErrorLabel) -} diff --git a/internal/logql/logqlengine/line_filter.go b/internal/logql/logqlengine/line_filter.go index 7d1a8f71..6aa5fcf5 100644 --- a/internal/logql/logqlengine/line_filter.go +++ b/internal/logql/logqlengine/line_filter.go @@ -5,6 +5,7 @@ import ( "net/netip" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlerrors" "github.com/go-faster/oteldb/internal/otelstorage" ) @@ -41,7 +42,7 @@ type LineFilter struct { } // Process implements Processor. -func (lf *LineFilter) Process(_ otelstorage.Timestamp, line string, _ LabelSet) (_ string, keep bool) { +func (lf *LineFilter) Process(_ otelstorage.Timestamp, line string, _ logqlabels.LabelSet) (_ string, keep bool) { keep = lf.matcher.Match(line) return line, keep } @@ -52,7 +53,7 @@ type IPLineFilter struct { } // Process implements Processor. -func (lf *IPLineFilter) Process(_ otelstorage.Timestamp, line string, _ LabelSet) (_ string, keep bool) { +func (lf *IPLineFilter) Process(_ otelstorage.Timestamp, line string, _ logqlabels.LabelSet) (_ string, keep bool) { for i := 0; i < len(line); { c := line[i] if !isHexDigit(c) && c != ':' { diff --git a/internal/logql/logqlengine/line_filter_test.go b/internal/logql/logqlengine/line_filter_test.go index 508ab504..8689b2a4 100644 --- a/internal/logql/logqlengine/line_filter_test.go +++ b/internal/logql/logqlengine/line_filter_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/require" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" ) var ipLineFilterTests = []struct { @@ -105,7 +106,7 @@ func TestIPLineFilter(t *testing.T) { for i, tt := range ipLineFilterTests { tt := tt t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) { - set := NewLabelSet() + set := logqlabels.NewLabelSet() f, err := buildLineFilter(&logql.LineFilter{ Op: logql.OpEq, @@ -142,7 +143,7 @@ func FuzzIPLineFilter(f *testing.F) { } // Ensure there is no crash. - f.Process(1, line, LabelSet{}) + f.Process(1, line, logqlabels.LabelSet{}) }) } diff --git a/internal/logql/logqlengine/line_format.go b/internal/logql/logqlengine/line_format.go index e207df6a..4d5478ea 100644 --- a/internal/logql/logqlengine/line_format.go +++ b/internal/logql/logqlengine/line_format.go @@ -8,6 +8,7 @@ import ( "github.com/go-faster/errors" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/otelstorage" ) @@ -49,7 +50,7 @@ func (lf *LineFormat) currentTimestamp() time.Time { } // Process implements Processor. -func (lf *LineFormat) Process(ts otelstorage.Timestamp, line string, set LabelSet) (_ string, keep bool) { +func (lf *LineFormat) Process(ts otelstorage.Timestamp, line string, set logqlabels.LabelSet) (_ string, keep bool) { lf.ts = ts lf.line = line lf.buf.Reset() diff --git a/internal/logql/logqlengine/line_format_test.go b/internal/logql/logqlengine/line_format_test.go index 86dff7b1..8234b4b5 100644 --- a/internal/logql/logqlengine/line_format_test.go +++ b/internal/logql/logqlengine/line_format_test.go @@ -56,8 +56,7 @@ func TestLineFormat(t *testing.T) { for i, tt := range tests { tt := tt t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) { - set := NewLabelSet() - set.labels = tt.labels + set := newLabelSet(tt.labels) f, err := buildLineFormat(&logql.LineFormat{ Template: tt.tmpl, diff --git a/internal/logql/logqlengine/logfmt.go b/internal/logql/logqlengine/logfmt.go index 9ca57773..6191ecc9 100644 --- a/internal/logql/logqlengine/logfmt.go +++ b/internal/logql/logqlengine/logfmt.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlerrors" "github.com/go-faster/oteldb/internal/otelstorage" ) @@ -47,7 +48,7 @@ func buildLogfmtExtractor(stage *logql.LogfmtExpressionParser) (Processor, error } // Process implements Processor. -func (e *LogfmtExtractor) Process(_ otelstorage.Timestamp, line string, set LabelSet) (string, bool) { +func (e *LogfmtExtractor) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (string, bool) { var err error if len(e.labels) == 0 { err = e.extractAll(line, set) @@ -60,7 +61,7 @@ func (e *LogfmtExtractor) Process(_ otelstorage.Timestamp, line string, set Labe return line, true } -func (e *LogfmtExtractor) extractSome(line string, set LabelSet) error { +func (e *LogfmtExtractor) extractSome(line string, set logqlabels.LabelSet) error { // TODO(tdakkota): re-use decoder somehow. d := logfmt.NewDecoder(strings.NewReader(line)) @@ -76,7 +77,7 @@ func (e *LogfmtExtractor) extractSome(line string, set LabelSet) error { return d.Err() } -func (e *LogfmtExtractor) extractAll(line string, set LabelSet) error { +func (e *LogfmtExtractor) extractAll(line string, set logqlabels.LabelSet) error { // TODO(tdakkota): re-use decoder somehow. d := logfmt.NewDecoder(strings.NewReader(line)) diff --git a/internal/logql/logqlengine/logfmt_test.go b/internal/logql/logqlengine/logfmt_test.go index b5880fc3..f392292c 100644 --- a/internal/logql/logqlengine/logfmt_test.go +++ b/internal/logql/logqlengine/logfmt_test.go @@ -8,6 +8,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" ) func TestLogfmtExtractor(t *testing.T) { @@ -88,7 +89,7 @@ func TestLogfmtExtractor(t *testing.T) { }) require.NoError(t, err) - set := NewLabelSet() + set := logqlabels.NewLabelSet() newLine, ok := e.Process(0, tt.input, set) // Ensure that extractor does not change the line. require.Equal(t, tt.input, newLine) diff --git a/internal/logql/logqlengine/logqlengine_test.go b/internal/logql/logqlengine/logqlengine_test.go new file mode 100644 index 00000000..c2940263 --- /dev/null +++ b/internal/logql/logqlengine/logqlengine_test.go @@ -0,0 +1,16 @@ +package logqlengine + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" +) + +func newLabelSet[S ~string](m map[S]pcommon.Value) logqlabels.LabelSet { + set := logqlabels.NewLabelSet() + for k, v := range m { + set.Set(logql.Label(k), v) + } + return set +} diff --git a/internal/logql/logqlengine/logqlmetric/bin_op.go b/internal/logql/logqlengine/logqlmetric/bin_op.go index c5f5c0ed..f1cb3a83 100644 --- a/internal/logql/logqlengine/logqlmetric/bin_op.go +++ b/internal/logql/logqlengine/logqlmetric/bin_op.go @@ -5,6 +5,7 @@ import ( "go.uber.org/multierr" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlerrors" ) @@ -57,7 +58,7 @@ func (i *binOpIterator) Next(r *Step) bool { r.Samples = r.Samples[:0] r.Timestamp = left.Timestamp - leftSamples := make(map[GroupingKey]Sample, len(left.Samples)) + leftSamples := make(map[logqlabels.GroupingKey]Sample, len(left.Samples)) for _, s := range left.Samples { key := s.Set.Key() leftSamples[key] = s @@ -181,8 +182,8 @@ func buildMergeSamplesOp(op logql.BinOp, grouper grouperFunc, groupLabels []logq } } -func samplesSet(samples []Sample, grouper grouperFunc, groupLabels []logql.Label) map[GroupingKey]struct{} { - r := make(map[GroupingKey]struct{}, len(samples)) +func samplesSet(samples []Sample, grouper grouperFunc, groupLabels []logql.Label) map[logqlabels.GroupingKey]struct{} { + r := make(map[logqlabels.GroupingKey]struct{}, len(samples)) for _, s := range samples { key := grouper(s.Set, groupLabels...).Key() r[key] = struct{}{} diff --git a/internal/logql/logqlengine/logqlmetric/bin_op_test.go b/internal/logql/logqlengine/logqlmetric/bin_op_test.go index 48f1750a..9988f78c 100644 --- a/internal/logql/logqlengine/logqlmetric/bin_op_test.go +++ b/internal/logql/logqlengine/logqlmetric/bin_op_test.go @@ -15,9 +15,9 @@ func TestMergeBinOp(t *testing.T) { { Timestamp: 1, Samples: []Sample{ - {Data: 1, Set: &testLabels{"sample": "1"}}, - {Data: 2, Set: &testLabels{"sample": "2"}}, - {Data: 3, Set: &testLabels{"sample": "3"}}, + {Data: 1, Set: mapLabels(map[string]string{"sample": "1"})}, + {Data: 2, Set: mapLabels(map[string]string{"sample": "2"})}, + {Data: 3, Set: mapLabels(map[string]string{"sample": "3"})}, }, }, } @@ -25,9 +25,9 @@ func TestMergeBinOp(t *testing.T) { { Timestamp: 1, Samples: []Sample{ - {Data: 10, Set: &testLabels{"sample": "1"}}, - {Data: 30, Set: &testLabels{"sample": "3"}}, - {Data: 40, Set: &testLabels{"sample": "4"}}, + {Data: 10, Set: mapLabels(map[string]string{"sample": "1"})}, + {Data: 30, Set: mapLabels(map[string]string{"sample": "3"})}, + {Data: 40, Set: mapLabels(map[string]string{"sample": "4"})}, }, }, } @@ -44,25 +44,25 @@ func TestMergeBinOp(t *testing.T) { leftSteps, rightSteps, &logql.BinOpExpr{Op: logql.OpAnd}, []Sample{ - {Data: 1, Set: &testLabels{"sample": "1"}}, - {Data: 3, Set: &testLabels{"sample": "3"}}, + {Data: 1, Set: mapLabels(map[string]string{"sample": "1"})}, + {Data: 3, Set: mapLabels(map[string]string{"sample": "3"})}, }, }, { leftSteps, rightSteps, &logql.BinOpExpr{Op: logql.OpOr}, []Sample{ - {Data: 1, Set: &testLabels{"sample": "1"}}, - {Data: 2, Set: &testLabels{"sample": "2"}}, - {Data: 3, Set: &testLabels{"sample": "3"}}, - {Data: 40, Set: &testLabels{"sample": "4"}}, + {Data: 1, Set: mapLabels(map[string]string{"sample": "1"})}, + {Data: 2, Set: mapLabels(map[string]string{"sample": "2"})}, + {Data: 3, Set: mapLabels(map[string]string{"sample": "3"})}, + {Data: 40, Set: mapLabels(map[string]string{"sample": "4"})}, }, }, { leftSteps, rightSteps, &logql.BinOpExpr{Op: logql.OpUnless}, []Sample{ - {Data: 2, Set: &testLabels{"sample": "2"}}, + {Data: 2, Set: mapLabels(map[string]string{"sample": "2"})}, }, }, @@ -71,25 +71,25 @@ func TestMergeBinOp(t *testing.T) { rightSteps, leftSteps, // notice the swap &logql.BinOpExpr{Op: logql.OpAnd}, []Sample{ - {Data: 10, Set: &testLabels{"sample": "1"}}, - {Data: 30, Set: &testLabels{"sample": "3"}}, + {Data: 10, Set: mapLabels(map[string]string{"sample": "1"})}, + {Data: 30, Set: mapLabels(map[string]string{"sample": "3"})}, }, }, { rightSteps, leftSteps, &logql.BinOpExpr{Op: logql.OpOr}, []Sample{ - {Data: 10, Set: &testLabels{"sample": "1"}}, - {Data: 30, Set: &testLabels{"sample": "3"}}, - {Data: 40, Set: &testLabels{"sample": "4"}}, - {Data: 2, Set: &testLabels{"sample": "2"}}, + {Data: 10, Set: mapLabels(map[string]string{"sample": "1"})}, + {Data: 30, Set: mapLabels(map[string]string{"sample": "3"})}, + {Data: 40, Set: mapLabels(map[string]string{"sample": "4"})}, + {Data: 2, Set: mapLabels(map[string]string{"sample": "2"})}, }, }, { rightSteps, leftSteps, &logql.BinOpExpr{Op: logql.OpUnless}, []Sample{ - {Data: 40, Set: &testLabels{"sample": "4"}}, + {Data: 40, Set: mapLabels(map[string]string{"sample": "4"})}, }, }, @@ -103,27 +103,27 @@ func TestMergeBinOp(t *testing.T) { leftSteps, emptyStep, &logql.BinOpExpr{Op: logql.OpOr}, []Sample{ - {Data: 1, Set: &testLabels{"sample": "1"}}, - {Data: 2, Set: &testLabels{"sample": "2"}}, - {Data: 3, Set: &testLabels{"sample": "3"}}, + {Data: 1, Set: mapLabels(map[string]string{"sample": "1"})}, + {Data: 2, Set: mapLabels(map[string]string{"sample": "2"})}, + {Data: 3, Set: mapLabels(map[string]string{"sample": "3"})}, }, }, { emptyStep, leftSteps, &logql.BinOpExpr{Op: logql.OpOr}, []Sample{ - {Data: 1, Set: &testLabels{"sample": "1"}}, - {Data: 2, Set: &testLabels{"sample": "2"}}, - {Data: 3, Set: &testLabels{"sample": "3"}}, + {Data: 1, Set: mapLabels(map[string]string{"sample": "1"})}, + {Data: 2, Set: mapLabels(map[string]string{"sample": "2"})}, + {Data: 3, Set: mapLabels(map[string]string{"sample": "3"})}, }, }, { leftSteps, emptyStep, &logql.BinOpExpr{Op: logql.OpUnless}, []Sample{ - {Data: 1, Set: &testLabels{"sample": "1"}}, - {Data: 2, Set: &testLabels{"sample": "2"}}, - {Data: 3, Set: &testLabels{"sample": "3"}}, + {Data: 1, Set: mapLabels(map[string]string{"sample": "1"})}, + {Data: 2, Set: mapLabels(map[string]string{"sample": "2"})}, + {Data: 3, Set: mapLabels(map[string]string{"sample": "3"})}, }, }, { diff --git a/internal/logql/logqlengine/logqlmetric/label_replace_test.go b/internal/logql/logqlengine/logqlmetric/label_replace_test.go index b51d96c1..a084de09 100644 --- a/internal/logql/logqlengine/logqlmetric/label_replace_test.go +++ b/internal/logql/logqlengine/logqlmetric/label_replace_test.go @@ -22,9 +22,9 @@ func TestLabelReplace(t *testing.T) { { Timestamp: 1, Samples: []Sample{ - {Data: 1, Set: testLabels{"sample": "1"}}, - {Data: 2, Set: testLabels{"sample": "2"}}, - {Data: 3, Set: testLabels{"sample": "3"}}, + {Data: 1, Set: mapLabels(map[string]string{"sample": "1"})}, + {Data: 2, Set: mapLabels(map[string]string{"sample": "2"})}, + {Data: 3, Set: mapLabels(map[string]string{"sample": "3"})}, }, }, }, @@ -35,9 +35,9 @@ func TestLabelReplace(t *testing.T) { Re: regexp.MustCompile(`(?P\d+)`), }, []Sample{ - {Data: 1, Set: testLabels{"sample": "1", "dst_label": "1 th"}}, - {Data: 2, Set: testLabels{"sample": "2", "dst_label": "2 th"}}, - {Data: 3, Set: testLabels{"sample": "3", "dst_label": "3 th"}}, + {Data: 1, Set: mapLabels(map[string]string{"sample": "1", "dst_label": "1 th"})}, + {Data: 2, Set: mapLabels(map[string]string{"sample": "2", "dst_label": "2 th"})}, + {Data: 3, Set: mapLabels(map[string]string{"sample": "3", "dst_label": "3 th"})}, }, }, } diff --git a/internal/logql/logqlengine/logqlmetric/labels.go b/internal/logql/logqlengine/logqlmetric/labels.go deleted file mode 100644 index 4669a971..00000000 --- a/internal/logql/logqlengine/logqlmetric/labels.go +++ /dev/null @@ -1,45 +0,0 @@ -package logqlmetric - -import ( - "regexp" - - "github.com/cespare/xxhash/v2" - - "github.com/go-faster/oteldb/internal/logql" - "github.com/go-faster/oteldb/internal/lokiapi" -) - -// GroupingKey is a key to group metrics by label. -type GroupingKey = uint64 - -// AggregatedLabels is a set of labels. -type AggregatedLabels interface { - // By returns new set of labels containing only given list of labels. - By(...logql.Label) AggregatedLabels - // Without returns new set of labels without given list of labels. - Without(...logql.Label) AggregatedLabels - // Key computes grouping key from set of labels. - Key() GroupingKey - // Replace replaces labels using given regexp. - Replace(dstLabel, replacement, srcLabel string, re *regexp.Regexp) AggregatedLabels - - // AsLokiAPI returns API structure for label set. - AsLokiAPI() lokiapi.LabelSet -} - -// EmptyAggregatedLabels returns empty set of aggregated labels. -func EmptyAggregatedLabels() AggregatedLabels { - return emptyAggregatedLabels -} - -var emptyAggregatedLabels = new(emptyLabels) - -type emptyLabels struct{} - -var zeroHash = xxhash.New().Sum64() - -func (l *emptyLabels) By(...logql.Label) AggregatedLabels { return l } -func (l *emptyLabels) Without(...logql.Label) AggregatedLabels { return l } -func (l *emptyLabels) Key() GroupingKey { return zeroHash } -func (l *emptyLabels) Replace(_, _, _ string, _ *regexp.Regexp) AggregatedLabels { return l } -func (l *emptyLabels) AsLokiAPI() lokiapi.LabelSet { return lokiapi.LabelSet{} } diff --git a/internal/logql/logqlengine/logqlmetric/logqlmetric.go b/internal/logql/logqlengine/logqlmetric/logqlmetric.go index 084939a6..d5f67efb 100644 --- a/internal/logql/logqlengine/logqlmetric/logqlmetric.go +++ b/internal/logql/logqlengine/logqlmetric/logqlmetric.go @@ -8,6 +8,7 @@ import ( "golang.org/x/exp/maps" "github.com/go-faster/oteldb/internal/iterators" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/lokiapi" "github.com/go-faster/oteldb/internal/otelstorage" ) @@ -25,7 +26,7 @@ type StepIterator = iterators.Iterator[Step] func ReadStepResponse(iter iterators.Iterator[Step], instant bool) (s lokiapi.QueryResponseData, _ error) { var ( agg Step - matrixSeries map[GroupingKey]lokiapi.Series + matrixSeries map[logqlabels.GroupingKey]lokiapi.Series ) for { if !iter.Next(&agg) { @@ -55,7 +56,7 @@ func ReadStepResponse(iter iterators.Iterator[Step], instant bool) (s lokiapi.Qu } if matrixSeries == nil { - matrixSeries = map[GroupingKey]lokiapi.Series{} + matrixSeries = map[logqlabels.GroupingKey]lokiapi.Series{} } for _, s := range agg.Samples { key := s.Set.Key() diff --git a/internal/logql/logqlengine/logqlmetric/logqlmetric_test.go b/internal/logql/logqlengine/logqlmetric/logqlmetric_test.go new file mode 100644 index 00000000..e3744a29 --- /dev/null +++ b/internal/logql/logqlengine/logqlmetric/logqlmetric_test.go @@ -0,0 +1,11 @@ +package logqlmetric + +import "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" + +func emptyLabels() logqlabels.AggregatedLabels { + return logqlabels.EmptyAggregatedLabels() +} + +func mapLabels(m map[string]string) logqlabels.AggregatedLabels { + return logqlabels.AggregatedLabelsFromMap(m) +} diff --git a/internal/logql/logqlengine/logqlmetric/metric.go b/internal/logql/logqlengine/logqlmetric/metric.go index 7a4560a1..2c3163fd 100644 --- a/internal/logql/logqlengine/logqlmetric/metric.go +++ b/internal/logql/logqlengine/logqlmetric/metric.go @@ -3,6 +3,7 @@ package logqlmetric import ( "math" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/otelstorage" ) @@ -15,7 +16,7 @@ type FPoint struct { // Sample is a metric sample extracted from logs. type Sample struct { Data float64 - Set AggregatedLabels + Set logqlabels.AggregatedLabels } // Less compares two samples by value. @@ -31,12 +32,12 @@ func (a Sample) Greater(b Sample) bool { // Series is a grouped set of metric points. type Series struct { Data []FPoint - Set AggregatedLabels + Set logqlabels.AggregatedLabels } // SampledEntry is a sampled log entry. type SampledEntry struct { Sample float64 Timestamp otelstorage.Timestamp - Set AggregatedLabels + Set logqlabels.AggregatedLabels } diff --git a/internal/logql/logqlengine/logqlmetric/query_test.go b/internal/logql/logqlengine/logqlmetric/query_test.go index 796aa212..050e9e9d 100644 --- a/internal/logql/logqlengine/logqlmetric/query_test.go +++ b/internal/logql/logqlengine/logqlmetric/query_test.go @@ -3,16 +3,12 @@ package logqlmetric import ( "cmp" "fmt" - "hash/fnv" - "io" - "regexp" "slices" "testing" "time" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" - "golang.org/x/exp/maps" "github.com/go-faster/oteldb/internal/iterators" "github.com/go-faster/oteldb/internal/logql" @@ -67,16 +63,16 @@ func TestInstantAggregation(t *testing.T) { } testSamples = []SampledEntry{ // Would not be used. - {Sample: 10000, Timestamp: 1700000002_000000000, Set: &emptyLabels{}}, + {Sample: 10000, Timestamp: 1700000002_000000000, Set: emptyLabels()}, // Step 1. // 2s Window. - {Sample: 2, Timestamp: 1700000003_000000000, Set: &emptyLabels{}}, - {Sample: 3, Timestamp: 1700000004_000000000, Set: &emptyLabels{}}, + {Sample: 2, Timestamp: 1700000003_000000000, Set: emptyLabels()}, + {Sample: 3, Timestamp: 1700000004_000000000, Set: emptyLabels()}, // Window ends. // Would not be used. - {Sample: 10000, Timestamp: 1700000005_000000000, Set: &emptyLabels{}}, + {Sample: 10000, Timestamp: 1700000005_000000000, Set: emptyLabels()}, } ) @@ -191,17 +187,17 @@ func TestRangeAggregationStep(t *testing.T) { return pcommon.NewTimestampFromTime(time.Unix(1700000000+s, ns)) } testSamples = []SampledEntry{ - {Timestamp: ts(2, 0), Set: &emptyLabels{}, Sample: 1.}, - {Timestamp: ts(5, 0), Set: &emptyLabels{}, Sample: 1.}, - {Timestamp: ts(6, 0), Set: &emptyLabels{}, Sample: 1.}, - {Timestamp: ts(10, 0), Set: &emptyLabels{}, Sample: 1.}, - {Timestamp: ts(10, 1), Set: &emptyLabels{}, Sample: 1.}, - {Timestamp: ts(11, 0), Set: &emptyLabels{}, Sample: 1.}, - {Timestamp: ts(35, 0), Set: &emptyLabels{}, Sample: 1.}, - {Timestamp: ts(35, 1), Set: &emptyLabels{}, Sample: 1.}, - {Timestamp: ts(40, 0), Set: &emptyLabels{}, Sample: 1.}, - {Timestamp: ts(100, 0), Set: &emptyLabels{}, Sample: 1.}, - {Timestamp: ts(100, 1), Set: &emptyLabels{}, Sample: 1.}, + {Timestamp: ts(2, 0), Set: emptyLabels(), Sample: 1.}, + {Timestamp: ts(5, 0), Set: emptyLabels(), Sample: 1.}, + {Timestamp: ts(6, 0), Set: emptyLabels(), Sample: 1.}, + {Timestamp: ts(10, 0), Set: emptyLabels(), Sample: 1.}, + {Timestamp: ts(10, 1), Set: emptyLabels(), Sample: 1.}, + {Timestamp: ts(11, 0), Set: emptyLabels(), Sample: 1.}, + {Timestamp: ts(35, 0), Set: emptyLabels(), Sample: 1.}, + {Timestamp: ts(35, 1), Set: emptyLabels(), Sample: 1.}, + {Timestamp: ts(40, 0), Set: emptyLabels(), Sample: 1.}, + {Timestamp: ts(100, 0), Set: emptyLabels(), Sample: 1.}, + {Timestamp: ts(100, 1), Set: emptyLabels(), Sample: 1.}, } ) @@ -304,25 +300,25 @@ func TestRangeAggregation(t *testing.T) { // No samples. // Step 2. - {Sample: 1, Timestamp: 1700000001_000000000, Set: &emptyLabels{}}, + {Sample: 1, Timestamp: 1700000001_000000000, Set: emptyLabels()}, // 2s window. - {Sample: 2, Timestamp: 1700000002_000000000, Set: &emptyLabels{}}, - {Sample: 3, Timestamp: 1700000003_000000000, Set: &emptyLabels{}}, + {Sample: 2, Timestamp: 1700000002_000000000, Set: emptyLabels()}, + {Sample: 3, Timestamp: 1700000003_000000000, Set: emptyLabels()}, // Window ends. // Step 3. - {Sample: 4, Timestamp: 1700000004_000000000, Set: &emptyLabels{}}, + {Sample: 4, Timestamp: 1700000004_000000000, Set: emptyLabels()}, // 2s window. - {Sample: 5, Timestamp: 1700000005_000000000, Set: &emptyLabels{}}, - {Sample: 6, Timestamp: 1700000006_000000000, Set: &emptyLabels{}}, + {Sample: 5, Timestamp: 1700000005_000000000, Set: emptyLabels()}, + {Sample: 6, Timestamp: 1700000006_000000000, Set: emptyLabels()}, // Window ends. // Step 4. - {Sample: 1, Timestamp: 1700000007_000000000, Set: &emptyLabels{}}, + {Sample: 1, Timestamp: 1700000007_000000000, Set: emptyLabels()}, // 2s window. - {Sample: 2, Timestamp: 1700000008_000000000, Set: &emptyLabels{}}, - {Sample: 3, Timestamp: 1700000008_100000000, Set: &emptyLabels{}}, - {Sample: 4, Timestamp: 1700000009_000000000, Set: &emptyLabels{}}, + {Sample: 2, Timestamp: 1700000008_000000000, Set: emptyLabels()}, + {Sample: 3, Timestamp: 1700000008_100000000, Set: emptyLabels()}, + {Sample: 4, Timestamp: 1700000009_000000000, Set: emptyLabels()}, // Window ends. } ) @@ -363,78 +359,6 @@ func TestRangeAggregation(t *testing.T) { } } -type testLabels map[string]string - -// By returns new set of labels containing only given list of labels. -func (l testLabels) By(labels ...logql.Label) AggregatedLabels { - return l.sub(labels, false) -} - -// Without returns new set of labels without given list of labels. -func (l testLabels) Without(labels ...logql.Label) AggregatedLabels { - return l.sub(labels, true) -} - -func (l testLabels) sub(labels []logql.Label, without bool) AggregatedLabels { - oldLabels := l - - if without { - newLabels := maps.Clone(oldLabels) - - for _, label := range labels { - delete(newLabels, string(label)) - } - - return newLabels - } - - newLabels := testLabels{} - for _, label := range labels { - k := string(label) - if v, ok := oldLabels[k]; ok { - newLabels[k] = v - } - } - return newLabels -} - -// Key computes grouping key from set of labels. -func (l testLabels) Key() GroupingKey { - h := fnv.New64() - - keys := maps.Keys(l) - slices.Sort(keys) - for _, k := range keys { - v := l[k] - io.WriteString(h, k) - io.WriteString(h, v) - } - - return h.Sum64() -} - -// Replace replaces labels using given regexp. -func (l testLabels) Replace(dstLabel, replacement, srcLabel string, re *regexp.Regexp) AggregatedLabels { - src := l[srcLabel] - - idxs := re.FindStringSubmatchIndex(src) - if idxs == nil { - return l - } - - newLabels := maps.Clone(l) - delete(newLabels, dstLabel) - if dst := re.ExpandString([]byte{}, replacement, src, idxs); len(dst) > 0 { - newLabels[dstLabel] = string(dst) - } - return newLabels -} - -// AsLokiAPI returns API structure for label set. -func (l testLabels) AsLokiAPI() lokiapi.LabelSet { - return lokiapi.LabelSet(maps.Clone(l)) -} - func TestGroupedAggregation(t *testing.T) { var ( testParams = EvalParams{ @@ -445,33 +369,33 @@ func TestGroupedAggregation(t *testing.T) { testSamples = []SampledEntry{ // Step 1. // foo=a - {Sample: 1, Timestamp: 1700000001_000000000, Set: testLabels{"foo": "a", "method": "POST"}}, - {Sample: 2, Timestamp: 1700000002_000000000, Set: testLabels{"foo": "a", "method": "GET"}}, - {Sample: 3, Timestamp: 1700000003_000000000, Set: testLabels{"foo": "a", "method": "GET"}}, + {Sample: 1, Timestamp: 1700000001_000000000, Set: mapLabels(map[string]string{"foo": "a", "method": "POST"})}, + {Sample: 2, Timestamp: 1700000002_000000000, Set: mapLabels(map[string]string{"foo": "a", "method": "GET"})}, + {Sample: 3, Timestamp: 1700000003_000000000, Set: mapLabels(map[string]string{"foo": "a", "method": "GET"})}, // foo=b - {Sample: 10, Timestamp: 1700000001_000000000, Set: testLabels{"foo": "b", "method": "GET"}}, - {Sample: 20, Timestamp: 1700000002_000000000, Set: testLabels{"foo": "b", "method": "POST"}}, - {Sample: 30, Timestamp: 1700000003_000000000, Set: testLabels{"foo": "b", "method": "GET"}}, + {Sample: 10, Timestamp: 1700000001_000000000, Set: mapLabels(map[string]string{"foo": "b", "method": "GET"})}, + {Sample: 20, Timestamp: 1700000002_000000000, Set: mapLabels(map[string]string{"foo": "b", "method": "POST"})}, + {Sample: 30, Timestamp: 1700000003_000000000, Set: mapLabels(map[string]string{"foo": "b", "method": "GET"})}, // Step 2. // foo=a - {Sample: 5, Timestamp: 1700000005_000000000, Set: testLabels{"foo": "a", "method": "POST"}}, - {Sample: 6, Timestamp: 1700000006_000000000, Set: testLabels{"foo": "a", "method": "POST"}}, - {Sample: 7, Timestamp: 1700000007_000000000, Set: testLabels{"foo": "a", "method": "GET"}}, + {Sample: 5, Timestamp: 1700000005_000000000, Set: mapLabels(map[string]string{"foo": "a", "method": "POST"})}, + {Sample: 6, Timestamp: 1700000006_000000000, Set: mapLabels(map[string]string{"foo": "a", "method": "POST"})}, + {Sample: 7, Timestamp: 1700000007_000000000, Set: mapLabels(map[string]string{"foo": "a", "method": "GET"})}, // foo=b - {Sample: 50, Timestamp: 1700000005_000000000, Set: testLabels{"foo": "b", "method": "GET"}}, - {Sample: 60, Timestamp: 1700000006_000000000, Set: testLabels{"foo": "b", "method": "GET"}}, - {Sample: 70, Timestamp: 1700000007_000000000, Set: testLabels{"foo": "b", "method": "POST"}}, + {Sample: 50, Timestamp: 1700000005_000000000, Set: mapLabels(map[string]string{"foo": "b", "method": "GET"})}, + {Sample: 60, Timestamp: 1700000006_000000000, Set: mapLabels(map[string]string{"foo": "b", "method": "GET"})}, + {Sample: 70, Timestamp: 1700000007_000000000, Set: mapLabels(map[string]string{"foo": "b", "method": "POST"})}, // Step 3. // foo=a - {Sample: 10, Timestamp: 1700000009_000000000, Set: testLabels{"foo": "a", "method": "POST"}}, - {Sample: 20, Timestamp: 1700000010_000000000, Set: testLabels{"foo": "a", "method": "GET"}}, - {Sample: 30, Timestamp: 1700000011_000000000, Set: testLabels{"foo": "a", "method": "GET"}}, + {Sample: 10, Timestamp: 1700000009_000000000, Set: mapLabels(map[string]string{"foo": "a", "method": "POST"})}, + {Sample: 20, Timestamp: 1700000010_000000000, Set: mapLabels(map[string]string{"foo": "a", "method": "GET"})}, + {Sample: 30, Timestamp: 1700000011_000000000, Set: mapLabels(map[string]string{"foo": "a", "method": "GET"})}, // foo=b - {Sample: 100, Timestamp: 1700000009_000000000, Set: testLabels{"foo": "b", "method": "GET"}}, - {Sample: 200, Timestamp: 1700000010_000000000, Set: testLabels{"foo": "b", "method": "POST"}}, - {Sample: 300, Timestamp: 1700000011_000000000, Set: testLabels{"foo": "b", "method": "GET"}}, + {Sample: 100, Timestamp: 1700000009_000000000, Set: mapLabels(map[string]string{"foo": "b", "method": "GET"})}, + {Sample: 200, Timestamp: 1700000010_000000000, Set: mapLabels(map[string]string{"foo": "b", "method": "POST"})}, + {Sample: 300, Timestamp: 1700000011_000000000, Set: mapLabels(map[string]string{"foo": "b", "method": "GET"})}, } ) @@ -664,12 +588,12 @@ func TestKHeapAggregation(t *testing.T) { Step: 6 * time.Second, } testSamples = []SampledEntry{ - {Sample: 4, Timestamp: 1700000001_000000000, Set: testLabels{"key": "a", "sample": "1"}}, - {Sample: 5, Timestamp: 1700000002_000000000, Set: testLabels{"key": "a", "sample": "2"}}, - {Sample: 6, Timestamp: 1700000003_000000000, Set: testLabels{"key": "a", "sample": "3"}}, - {Sample: 3, Timestamp: 1700000004_000000000, Set: testLabels{"key": "a", "sample": "4"}}, - {Sample: 2, Timestamp: 1700000005_000000000, Set: testLabels{"key": "a", "sample": "5"}}, - {Sample: 1, Timestamp: 1700000006_000000000, Set: testLabels{"key": "a", "sample": "6"}}, + {Sample: 4, Timestamp: 1700000001_000000000, Set: mapLabels(map[string]string{"key": "a", "sample": "1"})}, + {Sample: 5, Timestamp: 1700000002_000000000, Set: mapLabels(map[string]string{"key": "a", "sample": "2"})}, + {Sample: 6, Timestamp: 1700000003_000000000, Set: mapLabels(map[string]string{"key": "a", "sample": "3"})}, + {Sample: 3, Timestamp: 1700000004_000000000, Set: mapLabels(map[string]string{"key": "a", "sample": "4"})}, + {Sample: 2, Timestamp: 1700000005_000000000, Set: mapLabels(map[string]string{"key": "a", "sample": "5"})}, + {Sample: 1, Timestamp: 1700000006_000000000, Set: mapLabels(map[string]string{"key": "a", "sample": "6"})}, } ) diff --git a/internal/logql/logqlengine/logqlmetric/range_agg.go b/internal/logql/logqlengine/logqlmetric/range_agg.go index 682cccdb..a9298be2 100644 --- a/internal/logql/logqlengine/logqlmetric/range_agg.go +++ b/internal/logql/logqlengine/logqlmetric/range_agg.go @@ -7,12 +7,13 @@ import ( "github.com/go-faster/oteldb/internal/iterators" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/otelstorage" ) -type grouperFunc = func(AggregatedLabels, ...logql.Label) AggregatedLabels +type grouperFunc = func(logqlabels.AggregatedLabels, ...logql.Label) logqlabels.AggregatedLabels -var nopGrouper = func(al AggregatedLabels, _ ...logql.Label) AggregatedLabels { +var nopGrouper = func(al logqlabels.AggregatedLabels, _ ...logql.Label) logqlabels.AggregatedLabels { return al } @@ -26,7 +27,7 @@ type rangeAggIterator struct { grouper grouperFunc groupLabels []logql.Label // window state - window map[GroupingKey]Series + window map[logqlabels.GroupingKey]Series interval time.Duration entry SampledEntry // buffered whether last entry is buffered @@ -56,9 +57,9 @@ func RangeAggregation( if g := expr.Grouping; g != nil { groupLabels = g.Labels if g.Without { - grouper = AggregatedLabels.Without + grouper = logqlabels.AggregatedLabels.Without } else { - grouper = AggregatedLabels.By + grouper = logqlabels.AggregatedLabels.By } } @@ -71,7 +72,7 @@ func RangeAggregation( grouper: grouper, groupLabels: groupLabels, - window: map[GroupingKey]Series{}, + window: map[logqlabels.GroupingKey]Series{}, interval: expr.Range.Range, }, nil } diff --git a/internal/logql/logqlengine/logqlmetric/vector.go b/internal/logql/logqlengine/logqlmetric/vector.go index 46f9c094..2e4626da 100644 --- a/internal/logql/logqlengine/logqlmetric/vector.go +++ b/internal/logql/logqlengine/logqlmetric/vector.go @@ -4,6 +4,7 @@ import ( "time" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/otelstorage" ) @@ -34,7 +35,7 @@ func (i *vectorIterator) Next(r *Step) bool { r.Timestamp = otelstorage.NewTimestampFromTime(current) r.Samples = append(r.Samples[:0], Sample{ Data: i.value, - Set: &emptyLabels{}, + Set: logqlabels.EmptyAggregatedLabels(), }) return true } diff --git a/internal/logql/logqlengine/logqlmetric/vector_agg.go b/internal/logql/logqlengine/logqlmetric/vector_agg.go index 16e917ea..d4b09879 100644 --- a/internal/logql/logqlengine/logqlmetric/vector_agg.go +++ b/internal/logql/logqlengine/logqlmetric/vector_agg.go @@ -8,6 +8,7 @@ import ( "github.com/go-faster/oteldb/internal/iterators" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" ) // VectorAggregation creates new Vector aggregation step iterator. @@ -22,9 +23,9 @@ func VectorAggregation( if g := expr.Grouping; g != nil { groupLabels = g.Labels if g.Without { - grouper = AggregatedLabels.Without + grouper = logqlabels.AggregatedLabels.Without } else { - grouper = AggregatedLabels.By + grouper = logqlabels.AggregatedLabels.By } } @@ -80,10 +81,10 @@ func (i *vectorAggIterator) Next(r *Step) bool { } type group struct { - metric AggregatedLabels + metric logqlabels.AggregatedLabels agg Aggregator } - result := map[GroupingKey]*group{} + result := map[logqlabels.GroupingKey]*group{} for _, s := range step.Samples { metric := i.grouper(s.Set, i.groupLabels...) @@ -142,10 +143,10 @@ func (i *vectorAggHeapIterator) Next(r *Step) bool { } type group struct { - metric AggregatedLabels + metric logqlabels.AggregatedLabels heap *sampleHeap } - result := map[GroupingKey]*group{} + result := map[logqlabels.GroupingKey]*group{} for _, s := range step.Samples { metric := i.grouper(s.Set, i.groupLabels...) diff --git a/internal/logql/logqlengine/logqlmetric/vector_agg_test.go b/internal/logql/logqlengine/logqlmetric/vector_agg_test.go index 0dba1a35..3220e68d 100644 --- a/internal/logql/logqlengine/logqlmetric/vector_agg_test.go +++ b/internal/logql/logqlengine/logqlmetric/vector_agg_test.go @@ -19,10 +19,10 @@ func TestSortVectorAggregation(t *testing.T) { { Timestamp: 1, Samples: []Sample{ - {Data: 3, Set: &emptyLabels{}}, - {Data: 1, Set: &emptyLabels{}}, - {Data: 4, Set: &emptyLabels{}}, - {Data: 2, Set: &emptyLabels{}}, + {Data: 3, Set: emptyLabels()}, + {Data: 1, Set: emptyLabels()}, + {Data: 4, Set: emptyLabels()}, + {Data: 2, Set: emptyLabels()}, }, }, } @@ -34,49 +34,49 @@ func TestSortVectorAggregation(t *testing.T) { { &logql.VectorAggregationExpr{Op: logql.VectorOpBottomk, Parameter: ptrTo(2)}, []Sample{ - {Data: 1, Set: &emptyLabels{}}, - {Data: 2, Set: &emptyLabels{}}, + {Data: 1, Set: emptyLabels()}, + {Data: 2, Set: emptyLabels()}, }, }, { &logql.VectorAggregationExpr{Op: logql.VectorOpBottomk, Parameter: ptrTo(3)}, []Sample{ - {Data: 1, Set: &emptyLabels{}}, - {Data: 2, Set: &emptyLabels{}}, - {Data: 3, Set: &emptyLabels{}}, + {Data: 1, Set: emptyLabels()}, + {Data: 2, Set: emptyLabels()}, + {Data: 3, Set: emptyLabels()}, }, }, { &logql.VectorAggregationExpr{Op: logql.VectorOpTopk, Parameter: ptrTo(2)}, []Sample{ - {Data: 4, Set: &emptyLabels{}}, - {Data: 3, Set: &emptyLabels{}}, + {Data: 4, Set: emptyLabels()}, + {Data: 3, Set: emptyLabels()}, }, }, { &logql.VectorAggregationExpr{Op: logql.VectorOpTopk, Parameter: ptrTo(3)}, []Sample{ - {Data: 4, Set: &emptyLabels{}}, - {Data: 3, Set: &emptyLabels{}}, - {Data: 2, Set: &emptyLabels{}}, + {Data: 4, Set: emptyLabels()}, + {Data: 3, Set: emptyLabels()}, + {Data: 2, Set: emptyLabels()}, }, }, { &logql.VectorAggregationExpr{Op: logql.VectorOpSort}, []Sample{ - {Data: 1, Set: &emptyLabels{}}, - {Data: 2, Set: &emptyLabels{}}, - {Data: 3, Set: &emptyLabels{}}, - {Data: 4, Set: &emptyLabels{}}, + {Data: 1, Set: emptyLabels()}, + {Data: 2, Set: emptyLabels()}, + {Data: 3, Set: emptyLabels()}, + {Data: 4, Set: emptyLabels()}, }, }, { &logql.VectorAggregationExpr{Op: logql.VectorOpSortDesc}, []Sample{ - {Data: 4, Set: &emptyLabels{}}, - {Data: 3, Set: &emptyLabels{}}, - {Data: 2, Set: &emptyLabels{}}, - {Data: 1, Set: &emptyLabels{}}, + {Data: 4, Set: emptyLabels()}, + {Data: 3, Set: emptyLabels()}, + {Data: 2, Set: emptyLabels()}, + {Data: 1, Set: emptyLabels()}, }, }, diff --git a/internal/logql/logqlengine/pattern.go b/internal/logql/logqlengine/pattern.go index 8aafbfb0..afeddab4 100644 --- a/internal/logql/logqlengine/pattern.go +++ b/internal/logql/logqlengine/pattern.go @@ -5,6 +5,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlpattern" "github.com/go-faster/oteldb/internal/otelstorage" ) @@ -26,7 +27,7 @@ func buildPatternExtractor(stage *logql.PatternLabelParser) (Processor, error) { } // Process implements Processor. -func (e *PatternExtractor) Process(_ otelstorage.Timestamp, line string, set LabelSet) (string, bool) { +func (e *PatternExtractor) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (string, bool) { logqlpattern.Match(e.pattern, line, func(l logql.Label, s string) { set.Set(l, pcommon.NewValueStr(s)) }) diff --git a/internal/logql/logqlengine/pattern_test.go b/internal/logql/logqlengine/pattern_test.go index 7235ff94..c07ad15e 100644 --- a/internal/logql/logqlengine/pattern_test.go +++ b/internal/logql/logqlengine/pattern_test.go @@ -8,6 +8,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" ) func TestPatternExtractor(t *testing.T) { @@ -40,7 +41,7 @@ func TestPatternExtractor(t *testing.T) { }) require.NoError(t, err) - set := NewLabelSet() + set := logqlabels.NewLabelSet() newLine, ok := e.Process(0, tt.input, set) // Ensure that extractor does not change the line. require.Equal(t, tt.input, newLine) diff --git a/internal/logql/logqlengine/processor.go b/internal/logql/logqlengine/processor.go index d78d8cab..33ab9128 100644 --- a/internal/logql/logqlengine/processor.go +++ b/internal/logql/logqlengine/processor.go @@ -6,13 +6,14 @@ import ( "github.com/go-faster/errors" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlerrors" "github.com/go-faster/oteldb/internal/otelstorage" ) // Processor is a log record processor. type Processor interface { - Process(ts otelstorage.Timestamp, line string, labels LabelSet) (newLine string, keep bool) + Process(ts otelstorage.Timestamp, line string, labels logqlabels.LabelSet) (newLine string, keep bool) } // NopProcessor is a processor that does nothing. @@ -21,7 +22,7 @@ var NopProcessor = &nopProcessor{} type nopProcessor struct{} // Process implements Processor. -func (*nopProcessor) Process(_ otelstorage.Timestamp, line string, _ LabelSet) (string, bool) { +func (*nopProcessor) Process(_ otelstorage.Timestamp, line string, _ logqlabels.LabelSet) (string, bool) { return line, true } @@ -84,7 +85,7 @@ func buildStage(stage logql.PipelineStage) (Processor, error) { } // Process implements Processor. -func (p *Pipeline) Process(ts otelstorage.Timestamp, line string, attrs LabelSet) (_ string, keep bool) { +func (p *Pipeline) Process(ts otelstorage.Timestamp, line string, attrs logqlabels.LabelSet) (_ string, keep bool) { for _, s := range p.Stages { line, keep = s.Process(ts, line, attrs) if !keep { diff --git a/internal/logql/logqlengine/regexp.go b/internal/logql/logqlengine/regexp.go index 70fe9ab3..0959d183 100644 --- a/internal/logql/logqlengine/regexp.go +++ b/internal/logql/logqlengine/regexp.go @@ -6,6 +6,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/otelstorage" ) @@ -23,7 +24,7 @@ func buildRegexpExtractor(stage *logql.RegexpLabelParser) (Processor, error) { } // Process implements Processor. -func (e *RegexpExtractor) Process(_ otelstorage.Timestamp, line string, set LabelSet) (string, bool) { +func (e *RegexpExtractor) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (string, bool) { for i, match := range e.re.FindStringSubmatch(line) { label, ok := e.mapping[i] if !ok { diff --git a/internal/logql/logqlengine/regexp_test.go b/internal/logql/logqlengine/regexp_test.go index 37925fa8..af519128 100644 --- a/internal/logql/logqlengine/regexp_test.go +++ b/internal/logql/logqlengine/regexp_test.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" ) func TestRegexpExtractor(t *testing.T) { @@ -52,7 +53,7 @@ func TestRegexpExtractor(t *testing.T) { }) require.NoError(t, err) - set := NewLabelSet() + set := logqlabels.NewLabelSet() newLine, ok := e.Process(0, tt.input, set) // Ensure that extractor does not change the line. require.Equal(t, tt.input, newLine) diff --git a/internal/logql/logqlengine/sampler.go b/internal/logql/logqlengine/sampler.go index 5c623328..649784b5 100644 --- a/internal/logql/logqlengine/sampler.go +++ b/internal/logql/logqlengine/sampler.go @@ -9,6 +9,7 @@ import ( "github.com/go-faster/errors" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlmetric" ) @@ -84,6 +85,20 @@ func newSampleIterator(iter EntryIterator, expr *logql.RangeAggregationExpr) (*s }, nil } +func buildSet[K ~string](r map[string]struct{}, input ...K) map[string]struct{} { + if len(input) == 0 { + return r + } + + if r == nil { + r = make(map[string]struct{}, len(input)) + } + for _, k := range input { + r[string(k)] = struct{}{} + } + return r +} + func (i *sampleIterator) Next(s *logqlmetric.SampledEntry) bool { var e Entry for { @@ -98,7 +113,7 @@ func (i *sampleIterator) Next(s *logqlmetric.SampledEntry) bool { s.Timestamp = e.Timestamp s.Sample = v - s.Set = AggregatedLabelsFromSet(e.Set, i.by, i.without) + s.Set = logqlabels.AggregatedLabelsFromSet(e.Set, i.by, i.without) return true } } diff --git a/internal/logql/logqlengine/sampler_test.go b/internal/logql/logqlengine/sampler_test.go index 6fe03e53..64154d15 100644 --- a/internal/logql/logqlengine/sampler_test.go +++ b/internal/logql/logqlengine/sampler_test.go @@ -254,9 +254,7 @@ func TestSampleExtractor(t *testing.T) { }) require.NoError(t, err) - set := NewLabelSet() - set.labels = tt.input.labels - + set := newLabelSet(tt.input.labels) got, gotOk := e.Extract(Entry{ Timestamp: 1, Line: tt.input.line, diff --git a/internal/logql/logqlengine/unpack.go b/internal/logql/logqlengine/unpack.go index 61d567e8..f10add24 100644 --- a/internal/logql/logqlengine/unpack.go +++ b/internal/logql/logqlengine/unpack.go @@ -6,6 +6,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/otelstorage" ) @@ -17,7 +18,7 @@ func buildUnpackExtractor(*logql.UnpackLabelParser) (Processor, error) { } // Process implements Processor. -func (e *UnpackExtractor) Process(_ otelstorage.Timestamp, line string, set LabelSet) (string, bool) { +func (e *UnpackExtractor) Process(_ otelstorage.Timestamp, line string, set logqlabels.LabelSet) (string, bool) { newLine, err := parsePackEntry(line, set) if err != nil { set.SetError("unpack JSON parsing error", err) @@ -26,7 +27,7 @@ func (e *UnpackExtractor) Process(_ otelstorage.Timestamp, line string, set Labe return newLine, true } -func parsePackEntry(oldLine string, set LabelSet) (string, error) { +func parsePackEntry(oldLine string, set logqlabels.LabelSet) (string, error) { var ( d = jx.DecodeStr(oldLine) line = oldLine @@ -47,7 +48,7 @@ func parsePackEntry(oldLine string, set LabelSet) (string, error) { return nil } - if err := logql.IsValidLabel(key, set.allowDots()); err != nil { + if err := logql.IsValidLabel(key, set.AllowDots()); err != nil { return errors.Wrapf(err, "invalid label %q", key) } set.Set(logql.Label(key), pcommon.NewValueStr(parsed)) diff --git a/internal/logql/logqlengine/unpack_test.go b/internal/logql/logqlengine/unpack_test.go index 8c1666ac..a741a008 100644 --- a/internal/logql/logqlengine/unpack_test.go +++ b/internal/logql/logqlengine/unpack_test.go @@ -8,6 +8,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "github.com/go-faster/oteldb/internal/logql" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" ) func TestUnpackExtractor(t *testing.T) { @@ -58,7 +59,7 @@ func TestUnpackExtractor(t *testing.T) { e, err := buildUnpackExtractor(&logql.UnpackLabelParser{}) require.NoError(t, err) - set := NewLabelSet() + set := logqlabels.NewLabelSet() newLine, ok := e.Process(0, tt.input, set) // Ensure that extractor does not change the line. require.Equal(t, newLine, tt.expectLine) From f4f52af3bf4e682098b7b5073f2173a37700b23a Mon Sep 17 00:00:00 2001 From: tdakkota Date: Fri, 26 Apr 2024 11:55:24 +0300 Subject: [PATCH 4/8] refactor(chstorage): use new `logqlabels` package --- internal/chstorage/querier_logs_node.go | 3 ++- internal/chstorage/querier_logs_query.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/chstorage/querier_logs_node.go b/internal/chstorage/querier_logs_node.go index 87380786..13ac4b00 100644 --- a/internal/chstorage/querier_logs_node.go +++ b/internal/chstorage/querier_logs_node.go @@ -5,6 +5,7 @@ import ( "github.com/go-faster/oteldb/internal/logql" "github.com/go-faster/oteldb/internal/logql/logqlengine" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/logstorage" ) @@ -53,7 +54,7 @@ func (n *InputNode) EvalPipeline(ctx context.Context, params logqlengine.EvalPar } func entryMapper(r logstorage.Record) (logqlengine.Entry, error) { - set := logqlengine.NewLabelSet() + set := logqlabels.NewLabelSet() e := logqlengine.Entry{ Timestamp: r.Timestamp, Line: r.Body, diff --git a/internal/chstorage/querier_logs_query.go b/internal/chstorage/querier_logs_query.go index c9285983..4ffaa81d 100644 --- a/internal/chstorage/querier_logs_query.go +++ b/internal/chstorage/querier_logs_query.go @@ -21,6 +21,7 @@ import ( "github.com/go-faster/oteldb/internal/iterators" "github.com/go-faster/oteldb/internal/logql" "github.com/go-faster/oteldb/internal/logql/logqlengine" + "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels" "github.com/go-faster/oteldb/internal/logql/logqlengine/logqlmetric" "github.com/go-faster/oteldb/internal/logstorage" "github.com/go-faster/oteldb/internal/otelstorage" @@ -253,7 +254,7 @@ func (v *SampleQuery) Eval(ctx context.Context, q *Querier) (_ logqlengine.Sampl result = append(result, logqlmetric.SampledEntry{ Timestamp: pcommon.NewTimestampFromTime(timestamp), Sample: sample, - Set: logqlengine.AggregatedLabelsFromMap(labels), + Set: logqlabels.AggregatedLabelsFromMap(labels), }) } return nil From b781f381f03944517db798c8205c616076285d75 Mon Sep 17 00:00:00 2001 From: tdakkota Date: Fri, 26 Apr 2024 12:24:25 +0300 Subject: [PATCH 5/8] feat(logqlmetric): implement `absent_over_time` --- .../logqlengine/logqlmetric/aggregator.go | 9 ++++ .../logqlengine/logqlmetric/range_agg.go | 42 +++++++++++++++---- 2 files changed, 43 insertions(+), 8 deletions(-) diff --git a/internal/logql/logqlengine/logqlmetric/aggregator.go b/internal/logql/logqlengine/logqlmetric/aggregator.go index b27c590b..e95c297a 100644 --- a/internal/logql/logqlengine/logqlmetric/aggregator.go +++ b/internal/logql/logqlengine/logqlmetric/aggregator.go @@ -55,6 +55,7 @@ func buildBatchAggregator(expr *logql.RangeAggregationExpr) (BatchAggregator, er case logql.RangeOpLast: return &LastOverTime{}, nil case logql.RangeOpAbsent: + return &absentOverTime{}, nil default: return nil, errors.Errorf("unexpected range operation %q", expr.Op) } @@ -132,3 +133,11 @@ func (LastOverTime) Aggregate(points []FPoint) (last float64) { } return points[len(points)-1].Value } + +// absentOverTime implements `absent_over_time` aggregation. +type absentOverTime struct{} + +// Aggregate implements BatchAggregator. +func (absentOverTime) Aggregate([]FPoint) float64 { + return 1. +} diff --git a/internal/logql/logqlengine/logqlmetric/range_agg.go b/internal/logql/logqlengine/logqlmetric/range_agg.go index a9298be2..236e3cea 100644 --- a/internal/logql/logqlengine/logqlmetric/range_agg.go +++ b/internal/logql/logqlengine/logqlmetric/range_agg.go @@ -20,7 +20,8 @@ var nopGrouper = func(al logqlabels.AggregatedLabels, _ ...logql.Label) logqlabe type rangeAggIterator struct { iter iterators.Iterator[SampledEntry] - agg BatchAggregator + agg BatchAggregator + absentLabels logqlabels.AggregatedLabels // step state stepper stepper @@ -49,6 +50,21 @@ func RangeAggregation( if err != nil { return nil, errors.Wrap(err, "build aggregator") } + var absentLabels logqlabels.AggregatedLabels + if expr.Op == logql.RangeOpAbsent { + matchers := expr.Range.Sel.Matchers + if len(matchers) == 0 { + absentLabels = logqlabels.EmptyAggregatedLabels() + } else { + labels := map[string]string{} + for _, m := range matchers { + if _, ok := labels[string(m.Label)]; !ok && m.Op == logql.OpEq { + labels[string(m.Label)] = m.Value + } + } + absentLabels = logqlabels.AggregatedLabelsFromMap(labels) + } + } var ( grouper = nopGrouper @@ -66,8 +82,9 @@ func RangeAggregation( return &rangeAggIterator{ iter: iter, - agg: agg, - stepper: newStepper(start, end, step), + agg: agg, + absentLabels: absentLabels, + stepper: newStepper(start, end, step), grouper: grouper, groupLabels: groupLabels, @@ -91,11 +108,20 @@ func (i *rangeAggIterator) Next(r *Step) bool { // Aggregate the window. r.Timestamp = otelstorage.NewTimestampFromTime(current) r.Samples = r.Samples[:0] - for _, s := range i.window { - r.Samples = append(r.Samples, Sample{ - Data: i.agg.Aggregate(s.Data), - Set: s.Set, - }) + if set := i.absentLabels; set != nil { + if len(i.window) == 0 { + r.Samples = append(r.Samples, Sample{ + Data: i.agg.Aggregate(nil), + Set: set, + }) + } + } else { + for _, s := range i.window { + r.Samples = append(r.Samples, Sample{ + Data: i.agg.Aggregate(s.Data), + Set: s.Set, + }) + } } return true From 49eae85fa54002f81a291e5432d3e50e0bb9e651 Mon Sep 17 00:00:00 2001 From: tdakkota Date: Fri, 26 Apr 2024 12:25:09 +0300 Subject: [PATCH 6/8] fix(lokicompliance): exclude `absent_over_time` from simple range aggregations --- internal/lokicompliance/expand.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/lokicompliance/expand.go b/internal/lokicompliance/expand.go index 9076e6f3..0054618e 100644 --- a/internal/lokicompliance/expand.go +++ b/internal/lokicompliance/expand.go @@ -19,7 +19,6 @@ var testVariantArgs = map[string][]string{ "simpleRangeAggOp": { "count_over_time", "rate", - "absent_over_time", "bytes_over_time", "bytes_rate", }, From fd40d8d863c13d3714291f4da4b6cfc919e7ede4 Mon Sep 17 00:00:00 2001 From: tdakkota Date: Fri, 26 Apr 2024 12:26:17 +0300 Subject: [PATCH 7/8] feat(lokicompliance): add unexpected non-empty result to report --- internal/lokicompliance/compare.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/internal/lokicompliance/compare.go b/internal/lokicompliance/compare.go index e52f0ba7..502f7b32 100644 --- a/internal/lokicompliance/compare.go +++ b/internal/lokicompliance/compare.go @@ -126,16 +126,6 @@ func (c *Comparer) Compare(ctx context.Context, tc *TestCase) (*Result, error) { return &Result{TestCase: tc}, nil } - if err := checkEmpty(refResult.Data, tc.ShouldBeEmpty); err != nil { - return nil, err - } - if err := checkEmpty(testResult.Data, tc.ShouldBeEmpty); err != nil { - return &Result{ - TestCase: tc, - UnexpectedFailure: err.Error(), - }, nil - } - // Sort responses before comparing. sortResponse(&refResult.Data) sortResponse(&testResult.Data) @@ -144,12 +134,23 @@ func (c *Comparer) Compare(ctx context.Context, tc *TestCase) (*Result, error) { if err != nil { return nil, err } - got, err := json.Marshal(testResult.Data) if err != nil { return nil, err } + if err := checkEmpty(refResult.Data, tc.ShouldBeEmpty); err != nil { + return nil, err + } + if err := checkEmpty(testResult.Data, tc.ShouldBeEmpty); err != nil { + return &Result{ + TestCase: tc, + UnexpectedFailure: err.Error(), + Expected: expected, + Got: got, + }, nil + } + return &Result{ TestCase: tc, Diff: cmp.Diff(refResult, testResult, c.compareOptions), From e970d710a7042cfe278940a384b5f388cbc497c0 Mon Sep 17 00:00:00 2001 From: tdakkota Date: Fri, 26 Apr 2024 12:26:42 +0300 Subject: [PATCH 8/8] ci(ch-logql-compliance): add `absent_over_time` tests --- .../ch-logql-compliance/logql-test-queries.yml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/dev/local/ch-logql-compliance/logql-test-queries.yml b/dev/local/ch-logql-compliance/logql-test-queries.yml index 8ca92fc5..85417083 100644 --- a/dev/local/ch-logql-compliance/logql-test-queries.yml +++ b/dev/local/ch-logql-compliance/logql-test-queries.yml @@ -59,6 +59,21 @@ test_cases: {job="varlogs"} [{{ .range }}] ) variant_args: ["simpleRangeAggOp", "range"] + # Absent over time range aggregation. + # NOTE(tdakkota): We do a separate test because `absent_over_time` returns non-empty result + # only if there is NO samples in step and vice versa. + - query: |- + absent_over_time( + {job="varlogs"} [{{ .range }}] + ) + variant_args: ["range"] + should_be_empty: true + - query: |- + absent_over_time( + {job="varlogs"} |= "no way line would contain this message" [{{ .range }}] + ) + variant_args: ["range"] + # Unwrap sampler. - query: |- {{ .unwrapRangeAggOp }}( {job="varlogs"} | json | unwrap status @@ -66,6 +81,7 @@ test_cases: ) variant_args: ["unwrapRangeAggOp", "range"] skip_comparison: true # It seems, there is some issues with unwrap. + # Vector aggregation. - query: |- {{ .simpleVecAggOp }} by (filename) (