Skip to content

Commit

Permalink
Merge pull request #386 from go-faster/feat/logql-planner
Browse files Browse the repository at this point in the history
refactor(logqlengine): introduce planner
  • Loading branch information
tdakkota authored Apr 24, 2024
2 parents 1c49dc2 + 30590bc commit 655c25e
Show file tree
Hide file tree
Showing 47 changed files with 1,860 additions and 881 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/compliance.yml
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,9 @@ jobs:
-end 1m -range 1m \
-config-file logql-test-queries.yml -config-file test-oteldb.yml \
-output-format json -output-file result.oteldb.json \
-output-passing -output-unsupported \
-query-parallelism 2 \
-target 82.97 || true
-target 81.98
- name: Upload artifact
uses: actions/upload-artifact@v4
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs:
env:
E2E: "1"
TESTCONTAINERS_RYUK_DISABLED: "true"
run: go test -timeout 15m -race -v -coverpkg=./... -coverprofile=profile.out ./...
run: go test -timeout 15m -race -v -coverpkg=./... -coverprofile=profile.out ./integration/...

- name: Emit coverage
run: go tool cover -func profile.out
Expand Down Expand Up @@ -71,4 +71,4 @@ jobs:
- name: Send coverage
uses: codecov/codecov-action@v4
with:
file: profile.out
file: profile.out
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ linters-settings:
- octalLiteral
- typeDefFirst
- unnamedResult
- ptrToRefParam

linters:
disable-all: true
Expand Down
4 changes: 4 additions & 0 deletions cmd/logql-compliance-tester/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type OutputConfig struct {
OutputFile string
OutputFormat string
OutputPassing bool
OutputUnsupported bool
MinimumPercentage float64
}

Expand All @@ -41,6 +42,7 @@ type Flags struct {
OutputFile string
OutputFormat string
OutputPassing bool
OutputUnsupported bool
MinimumPercentage float64
}

Expand All @@ -64,6 +66,7 @@ func (f *Flags) Register(set *flag.FlagSet) {
set.StringVar(&f.OutputFile, "output-file", "", "Path to output file")
set.StringVar(&f.OutputFormat, "output-format", "json", "The comparison output format. Valid values: [json]")
set.BoolVar(&f.OutputPassing, "output-passing", false, "Whether to also include passing test cases in the output.")
set.BoolVar(&f.OutputUnsupported, "output-unsupported", false, "Whether to also include unsupported test cases in the output.")
set.Float64Var(&f.MinimumPercentage, "target", math.NaN(), "Minimum compliance percentage")
}

Expand Down Expand Up @@ -95,6 +98,7 @@ func parseConfig() (c Config, _ error) {
OutputFile: flags.OutputFile,
OutputFormat: flags.OutputFormat,
OutputPassing: flags.OutputPassing,
OutputUnsupported: flags.OutputUnsupported,
MinimumPercentage: flags.MinimumPercentage,
}

Expand Down
6 changes: 5 additions & 1 deletion cmd/logql-compliance-tester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func run(ctx context.Context) error {
results = make([]*lokicompliance.Result, len(cfg.TestCases))
progressBar = pb.StartNew(len(results))
)
// Progress bar messes up Github Actions logs, so keep it static.
if os.Getenv("GITHUB_ACTIONS") == "true" {
progressBar.Set(pb.Static, true)
}

grp, grpCtx := errgroup.WithContext(ctx)
if n := cfg.Parallelism; n > 0 {
Expand All @@ -64,7 +68,7 @@ func run(ctx context.Context) error {
if err := grp.Wait(); err != nil {
return errors.Wrap(err, "run queries")
}
progressBar.Finish()
progressBar.Finish().Write()

return printOutput(results, cfg.Output)
}
Expand Down
34 changes: 21 additions & 13 deletions cmd/logql-compliance-tester/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ func printOutput(results []*lokicompliance.Result, cfg OutputConfig) error {
_ = f.Close()
}()

if err := outp(f, results, cfg.OutputPassing); err != nil {
if err := outp(f, results, cfg); err != nil {
return err
}
}

return verifyTargetCompliance(results, cfg.MinimumPercentage)
}

func outp(output io.Writer, results []*lokicompliance.Result, includePassing bool) error {
func outp(output io.Writer, results []*lokicompliance.Result, cfg OutputConfig) error {
for _, r := range results {
if d := r.Diff; d != "" && !r.Unsupported {
fmt.Printf("%q:\n%s\n", r.TestCase.Query, d)
Expand All @@ -44,25 +44,33 @@ func outp(output io.Writer, results []*lokicompliance.Result, includePassing boo

// JSONResult is the JSON output format.
type JSONResult struct {
TotalResults int `json:"totalResults"`
Results []*lokicompliance.Result `json:"results,omitempty"`
IncludePassing bool `json:"includePassing"`
TotalResults int `json:"totalResults"`
Results []*lokicompliance.Result `json:"results,omitempty"`
IncludePassing bool `json:"includePassing"`
IncludeUnsupported bool `json:"includeUnsupported"`
}
total := len(results)
if !includePassing {
var failed []*lokicompliance.Result
if !cfg.OutputPassing || !cfg.OutputUnsupported {
var filter []*lokicompliance.Result
for _, r := range results {
if !r.Success() {
failed = append(failed, r)
if r.Success() && !cfg.OutputPassing {
// Exclude passing.
continue
}
if r.Unsupported && !cfg.OutputUnsupported {
// Exclude unsupported.
continue
}
filter = append(filter, r)
}
results = failed
results = filter
}

buf, err := json.MarshalIndent(JSONResult{
TotalResults: total,
Results: results,
IncludePassing: includePassing,
TotalResults: total,
Results: results,
IncludePassing: cfg.OutputPassing,
IncludeUnsupported: cfg.OutputUnsupported,
}, "", "\t")
if err != nil {
return err
Expand Down
5 changes: 5 additions & 0 deletions cmd/oteldb/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"go.uber.org/zap/zapcore"
"golang.org/x/sync/errgroup"

"github.com/go-faster/oteldb/internal/chstorage"
"github.com/go-faster/oteldb/internal/httpmiddleware"
"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine"
Expand Down Expand Up @@ -175,11 +176,15 @@ func (app *App) trySetupLoki() error {
cfg := app.cfg.LokiConfig
cfg.setDefaults()

var optimizers []logqlengine.Optimizer
optimizers = append(optimizers, logqlengine.DefaultOptimizers()...)
optimizers = append(optimizers, &chstorage.ClickhouseOptimizer{})
engine := logqlengine.NewEngine(q, logqlengine.Options{
ParseOptions: logql.ParseOptions{
AllowDots: true,
},
LookbackDuration: cfg.LookbackDelta,
Optimizers: optimizers,
TracerProvider: app.metrics.TracerProvider(),
})
loki := lokihandler.NewLokiAPI(q, engine)
Expand Down
8 changes: 8 additions & 0 deletions dev/local/ch-logql-compliance/logql-test-queries.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ test_cases:
variant_args: ["unwrapRangeAggOp", "range"]
skip_comparison: true # It seems, there is some issues with unwrap.
# Vector aggregation.
- query: |-
{{ .simpleVecAggOp }} by (filename) (
{{ .simpleRangeAggOp }}(
{job="varlogs"}
[{{ .range }}]
)
)
variant_args: ["simpleVecAggOp", "simpleRangeAggOp", "range"]
- query: |-
{{ .simpleVecAggOp }} by (method) (
{{ .simpleRangeAggOp }}(
Expand Down
18 changes: 12 additions & 6 deletions integration/lokie2e/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/go-faster/oteldb/integration"
"github.com/go-faster/oteldb/integration/lokie2e"
"github.com/go-faster/oteldb/internal/chstorage"
"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine"
"github.com/go-faster/oteldb/internal/logstorage"
Expand Down Expand Up @@ -53,9 +54,12 @@ func setupDB(ctx context.Context, t *testing.T, provider *integration.Provider,

gold.Str(t, out.String(), "logs.yml")

var optimizers []logqlengine.Optimizer
optimizers = append(optimizers, logqlengine.DefaultOptimizers()...)
optimizers = append(optimizers, &chstorage.ClickhouseOptimizer{})
engine := logqlengine.NewEngine(engineQuerier, logqlengine.Options{
ParseOptions: logql.ParseOptions{AllowDots: true},
OTELAdapter: true,
Optimizers: optimizers,
TracerProvider: provider,
})

Expand Down Expand Up @@ -203,8 +207,6 @@ func runTest(ctx context.Context, t *testing.T, provider *integration.Provider,
// Negative line matcher.
{`{http_method=~".+"} != "HEAD"`, len(set.Records) - 22},
{`{http_method=~".+"} !~ "HEAD"`, len(set.Records) - 22},
// IP line filter.
{`{http_method="HEAD"} |= ip("236.7.233.166")`, 1},
// Trace to logs.
{`{http_method=~".+"} |= "af36000000000000c517000000000003"`, 1},

Expand Down Expand Up @@ -328,6 +330,12 @@ type LogQueryTest struct {

func testLogQuery(c *lokiapi.Client, params LogQueryTest) func(*testing.T) {
return func(t *testing.T) {
defer func() {
if t.Failed() {
t.Logf("Query: %s", params.Query)
}
}()

ctx := context.Background()

if d, ok := t.Deadline(); ok {
Expand Down Expand Up @@ -370,9 +378,7 @@ func testLogQuery(c *lokiapi.Client, params LogQueryTest) func(*testing.T) {
record, ok := params.Set.Records[pcommon.Timestamp(entry.T)]
require.Truef(t, ok, "can't find log record %d", entry.T)

line := logqlengine.LineFromRecord(
logstorage.NewRecordFromOTEL(pcommon.NewResource(), pcommon.NewInstrumentationScope(), record),
)
line := record.Body().AsString()
assert.Equal(t, line, entry.V)

labelSetHasAttrs(t, stream.Stream.Value, record.Attributes())
Expand Down
20 changes: 20 additions & 0 deletions internal/chstorage/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chstorage

import (
"fmt"
"strings"

"github.com/ClickHouse/ch-go/proto"
"github.com/go-faster/errors"
Expand Down Expand Up @@ -156,6 +157,25 @@ func attrSelector(name, key string) string {
)
}

func firstAttrSelector(label string) string {
quoted := singleQuoted(label)
var sb strings.Builder
sb.WriteString("coalesce(")
for i, column := range []string{
colAttrs,
colScope,
colResource,
} {
if i != 0 {
sb.WriteString(",")
}
fmt.Fprintf(&sb, "JSONExtract(%s, %s, 'Nullable(String)')", column, quoted)
}
sb.WriteString(",''")
sb.WriteString(")")
return sb.String()
}

// Append adds a new map of attributes.
func (a *Attributes) Append(kv otelstorage.Attrs) {
a.Value.Append(kv)
Expand Down
6 changes: 4 additions & 2 deletions internal/chstorage/columns_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func setStrOrEmpty(col proto.ColumnOf[string], m pcommon.Map, k string) {
col.Append(v.AsString())
}

func (c *logColumns) ForEach(f func(r logstorage.Record)) error {
func (c *logColumns) ForEach(f func(r logstorage.Record) error) error {
for i := 0; i < c.timestamp.Rows(); i++ {
r := logstorage.Record{
Timestamp: otelstorage.NewTimestampFromTime(c.timestamp.Row(i)),
Expand Down Expand Up @@ -103,7 +103,9 @@ func (c *logColumns) ForEach(f func(r logstorage.Record)) error {
// Default just to timestamp.
r.ObservedTimestamp = r.Timestamp
}
f(r)
if err := f(r); err != nil {
return err
}
}
return nil
}
Expand Down
Loading

0 comments on commit 655c25e

Please sign in to comment.