Skip to content

Commit

Permalink
perf(logqlengine): use index map to avoid result slice allocation
Browse files Browse the repository at this point in the history
```
goos: linux
goarch: amd64
pkg: github.com/go-faster/oteldb/internal/logql/logqlengine
cpu: AMD Ryzen 9 5950X 16-Core Processor
                 │   old.txt   │               new.txt               │
                 │   sec/op    │   sec/op     vs base                │
_groupEntries-32   178.7µ ± 3%   146.6µ ± 3%  -17.95% (p=0.000 n=15)

                 │   old.txt    │               new.txt                │
                 │     B/op     │     B/op      vs base                │
_groupEntries-32   30.88Ki ± 0%   15.27Ki ± 0%  -50.56% (p=0.000 n=15)

                 │   old.txt   │              new.txt               │
                 │  allocs/op  │ allocs/op   vs base                │
_groupEntries-32   191.00 ± 0%   82.00 ± 0%  -57.07% (p=0.000 n=15)
```
  • Loading branch information
tdakkota committed Jul 2, 2024
1 parent 60fcf9a commit b085670
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions internal/logql/logqlengine/engine_log_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/go-faster/errors"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/maps"

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/lokiapi"
Expand Down Expand Up @@ -103,33 +102,38 @@ func (q *LogQuery) eval(ctx context.Context, params EvalParams) (data lokiapi.St

func groupEntries(iter EntryIterator) (s lokiapi.Streams, total int, _ error) {
var (
e Entry
buf = make([]byte, 0, 1024)
streams = map[string]lokiapi.Stream{}
e Entry
buf = make([]byte, 0, 1024)

set = map[string]int{}
streams []lokiapi.Stream
)
for iter.Next(&e) {
buf = e.Set.AppendString(buf[:0])
stream, ok := streams[string(buf)]

idx, ok := set[string(buf)]
if !ok {
stream = lokiapi.Stream{
streams = append(streams, lokiapi.Stream{
Stream: lokiapi.NewOptLabelSet(e.Set.AsLokiAPI()),
}
})
idx = len(streams) - 1
set[string(buf)] = idx
}
stream := &streams[idx]

stream.Values = append(stream.Values, lokiapi.LogEntry{T: uint64(e.Timestamp), V: e.Line})
streams[string(buf)] = stream
total++
}
if err := iter.Err(); err != nil {
return s, 0, err
}

result := maps.Values(streams)
for _, stream := range result {
for _, stream := range streams {
slices.SortFunc(stream.Values, func(a, b lokiapi.LogEntry) int {
return cmp.Compare(a.T, b.T)
})
}
return result, total, nil
return streams, total, nil
}

// ProcessorNode implements [PipelineNode].
Expand Down

0 comments on commit b085670

Please sign in to comment.