Skip to content

Commit

Permalink
feat(traceql): parse pipeline operations
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Jul 29, 2023
1 parent b27386e commit 4c1959c
Show file tree
Hide file tree
Showing 3 changed files with 255 additions and 18 deletions.
128 changes: 123 additions & 5 deletions internal/traceql/parser_pipeline.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,86 @@
package traceql

import "github.com/go-faster/oteldb/internal/traceql/lexer"
import (
"github.com/go-faster/errors"

"github.com/go-faster/oteldb/internal/traceql/lexer"
)

func (p *parser) parsePipeline() (stages []PipelineStage, _ error) {
for {
n := 0
for {
if t := p.peek(); t.Type != lexer.OpenParen {
switch t.Type {
case lexer.OpenBrace,
lexer.Integer,
lexer.Number,
lexer.Duration,
lexer.Count,
lexer.Max,
lexer.Min,
lexer.Avg,
lexer.Sum:
default:
if n > 0 {
p.unread()
return stages, p.unexpectedToken(p.next())
}
}
break
}
p.next()
n++
}
unreadParens := func() {
for i := 0; i < n; i++ {
p.unread()
}
}
switch t := p.peek(); t.Type {
case lexer.OpenParen, lexer.OpenBrace:
case lexer.OpenBrace:
unreadParens()
expr, err := p.parseSpansetExpr()
if err != nil {
return stages, err
}
stages = append(stages, expr)
case
lexer.Integer,
case lexer.By:
p.next()

op, err := p.parseGroupOperation()
if err != nil {
return stages, err
}
stages = append(stages, op)
case lexer.Coalesce:
if len(stages) < 1 {
return stages, errors.Errorf("coalesce cannot be first operation: at %s", t.Pos)
}
p.next()

op, err := p.parseCoalesceOperation()
if err != nil {
return stages, err
}
stages = append(stages, op)
case lexer.Select:
p.next()

op, err := p.parseSelectOperation()
if err != nil {
return stages, err
}
stages = append(stages, op)
case lexer.Integer,
lexer.Number,
lexer.Duration,
lexer.Count,
lexer.Max,
lexer.Min,
lexer.Avg,
lexer.Sum:
unreadParens()
expr, err := p.parseScalarFilter()
if err != nil {
return stages, err
Expand Down Expand Up @@ -56,7 +117,7 @@ func (p *parser) parseSpansetExpr1() (SpansetExpr, error) {
if err := p.consume(lexer.CloseParen); err != nil {
return nil, err
}
return expr, nil
return &ParenSpansetExpr{Expr: expr}, nil
case lexer.OpenBrace:
var filter SpansetFilter
if t2 := p.peek(); t2.Type != lexer.CloseBrace {
Expand Down Expand Up @@ -159,3 +220,60 @@ func (p *parser) parseScalarFilter() (*ScalarFilter, error) {

return &ScalarFilter{Left: left, Op: op, Right: right}, nil
}

func (p *parser) parseGroupOperation() (*GroupOperation, error) {
if err := p.consume(lexer.OpenParen); err != nil {
return nil, err
}

field, err := p.parseFieldExpr()
if err != nil {
return nil, err
}

if err := p.consume(lexer.CloseParen); err != nil {
return nil, err
}

return &GroupOperation{By: field}, nil
}

func (p *parser) parseCoalesceOperation() (*CoalesceOperation, error) {
if err := p.consume(lexer.OpenParen); err != nil {
return nil, err
}

if err := p.consume(lexer.CloseParen); err != nil {
return nil, err
}

return &CoalesceOperation{}, nil
}

func (p *parser) parseSelectOperation() (s *SelectOperation, _ error) {
s = new(SelectOperation)

if err := p.consume(lexer.OpenParen); err != nil {
return nil, err
}

for {
field, err := p.parseFieldExpr()
if err != nil {
return nil, err
}
s.Args = append(s.Args, field)

if t := p.peek(); t.Type != lexer.Comma {
break
}
// Consume comma.
p.next()
}

if err := p.consume(lexer.CloseParen); err != nil {
return nil, err
}

return s, nil
}
122 changes: 109 additions & 13 deletions internal/traceql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,31 @@ var tests = []TestCase{
},
false,
},
{
`({ .a } || { .b }) > { .c }`,
&SpansetPipeline{
Pipeline: []PipelineStage{
&BinarySpansetExpr{
Left: &ParenSpansetExpr{
Expr: &BinarySpansetExpr{
Left: &SpansetFilter{
Expr: &Attribute{Name: "a"},
},
Op: SpansetOpUnion,
Right: &SpansetFilter{
Expr: &Attribute{Name: "b"},
},
},
},
Op: SpansetOpChild,
Right: &SpansetFilter{
Expr: &Attribute{Name: "c"},
},
},
},
},
false,
},
{
`{ .a } && { .b } ~ { .c }`,
&SpansetPipeline{
Expand Down Expand Up @@ -394,30 +419,36 @@ var tests = []TestCase{
false,
},
{
`avg(.foo) > count() + sum(.bar)`,
`-2 = -2`,
&SpansetPipeline{
Pipeline: []PipelineStage{
&ScalarFilter{
Left: &AggregateScalarExpr{Op: AggregateOpAvg, Field: &Attribute{Name: "foo"}},
Op: OpGt,
Right: &BinaryScalarExpr{
Left: &AggregateScalarExpr{Op: AggregateOpCount},
Op: OpAdd,
Right: &AggregateScalarExpr{Op: AggregateOpSum, Field: &Attribute{Name: "bar"}},
},
Left: &Static{Type: StaticInteger, Data: uint64(-2 + noConst)},
Op: OpEq,
Right: &Static{Type: StaticInteger, Data: uint64(-2 + noConst)},
},
},
},
false,
},
{
`-2 = -2`,
`(1+2)^3 = 27`,
&SpansetPipeline{
Pipeline: []PipelineStage{
&ScalarFilter{
Left: &Static{Type: StaticInteger, Data: uint64(-2 + noConst)},
Left: &BinaryScalarExpr{
Left: &ParenScalarExpr{
Expr: &BinaryScalarExpr{
Left: &Static{Type: StaticInteger, Data: uint64(1)},
Op: OpAdd,
Right: &Static{Type: StaticInteger, Data: uint64(2)},
},
},
Op: OpPow,
Right: &Static{Type: StaticInteger, Data: uint64(3)},
},
Op: OpEq,
Right: &Static{Type: StaticInteger, Data: uint64(-2 + noConst)},
Right: &Static{Type: StaticInteger, Data: uint64(27)},
},
},
},
Expand Down Expand Up @@ -449,12 +480,29 @@ var tests = []TestCase{
false,
},
{
`avg(.foo) + count() > sum(.bar)`,
`max(.foo) > count() + sum(.bar)`,
&SpansetPipeline{
Pipeline: []PipelineStage{
&ScalarFilter{
Left: &AggregateScalarExpr{Op: AggregateOpMax, Field: &Attribute{Name: "foo"}},
Op: OpGt,
Right: &BinaryScalarExpr{
Left: &AggregateScalarExpr{Op: AggregateOpCount},
Op: OpAdd,
Right: &AggregateScalarExpr{Op: AggregateOpSum, Field: &Attribute{Name: "bar"}},
},
},
},
},
false,
},
{
`min(.foo) + count() > sum(.bar)`,
&SpansetPipeline{
Pipeline: []PipelineStage{
&ScalarFilter{
Left: &BinaryScalarExpr{
Left: &AggregateScalarExpr{Op: AggregateOpAvg, Field: &Attribute{Name: "foo"}},
Left: &AggregateScalarExpr{Op: AggregateOpMin, Field: &Attribute{Name: "foo"}},
Op: OpAdd,
Right: &AggregateScalarExpr{Op: AggregateOpCount},
},
Expand All @@ -465,13 +513,61 @@ var tests = []TestCase{
},
false,
},
{
`{ .a } | by(.b) | coalesce() | select(.c, .d) | avg(duration) = 1s`,
&SpansetPipeline{
Pipeline: []PipelineStage{
&SpansetFilter{
Expr: &Attribute{Name: "a"},
},
&GroupOperation{
By: &Attribute{Name: "b"},
},
&CoalesceOperation{},
&SelectOperation{
Args: []FieldExpr{
&Attribute{Name: "c"},
&Attribute{Name: "d"},
},
},
&ScalarFilter{
Left: &AggregateScalarExpr{
Op: AggregateOpAvg,
Field: &Attribute{Prop: SpanDuration},
},
Op: OpEq,
Right: &Static{Type: StaticDuration, Data: uint64(time.Second)},
},
},
},
false,
},

// Invalid syntax.
{`{`, nil, true},
{`{ 1+ }`, nil, true},
{`{ -- }`, nil, true},
{`{ (1+) }`, nil, true},
{`{ (1+1 }`, nil, true},
{`{} | `, nil, true},
{`{} | by`, nil, true},
{`{} | coalesce`, nil, true},
{`{} | select`, nil, true},
{`{} | by(.foo`, nil, true},
{`{} | coalesce(`, nil, true},
{`{} | select(.foo`, nil, true},
// Parameter is required,
{`{} | max()`, nil, true},
{`{} | min()`, nil, true},
{`{} | avg()`, nil, true},
{`{} | sum()`, nil, true},
{`{} | by()`, nil, true},
{`{} | select()`, nil, true},
{`{} | select(.foo,)`, nil, true},
// Parameter is not allowed.
{`{} | count(.foo) = 10`, nil, true},
// Stage `coalesce`` cannot be first stage.
{`coalesce()`, nil, true},
}

func TestParse(t *testing.T) {
Expand Down
23 changes: 23 additions & 0 deletions internal/traceql/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ type PipelineStage interface {
}

func (*BinarySpansetExpr) pipelineStage() {}
func (*ParenSpansetExpr) pipelineStage() {}
func (*SpansetFilter) pipelineStage() {}
func (*ScalarFilter) pipelineStage() {}
func (*GroupOperation) pipelineStage() {}
func (*CoalesceOperation) pipelineStage() {}
func (*SelectOperation) pipelineStage() {}

// SpansetExpr is a spanset expression.
type SpansetExpr interface {
Expand All @@ -16,6 +20,7 @@ type SpansetExpr interface {
}

func (*BinarySpansetExpr) spansetExpr() {}
func (*ParenSpansetExpr) spansetExpr() {}
func (*SpansetFilter) spansetExpr() {}

// BinarySpansetExpr is a binary operation between two spanset expressions.
Expand All @@ -25,6 +30,11 @@ type BinarySpansetExpr struct {
Right SpansetExpr
}

// ParenSpansetExpr is a parenthesized spanset expression.
type ParenSpansetExpr struct {
Expr SpansetExpr
}

// SpansetFilter is a spanset filter.
type SpansetFilter struct {
Expr FieldExpr // if filter is empty, expr is True
Expand All @@ -36,3 +46,16 @@ type ScalarFilter struct {
Op BinaryOp
Right ScalarExpr
}

// GroupOperation is a `by()` operation.
type GroupOperation struct {
By FieldExpr
}

// CoalesceOperation is a `colaesce()` operation.
type CoalesceOperation struct{}

// SelectOperation is a `select()` operation.
type SelectOperation struct {
Args []FieldExpr
}

0 comments on commit 4c1959c

Please sign in to comment.