Full auto_date_histogram #1184

Draft · wants to merge 1 commit into base: main
13 changes: 7 additions & 6 deletions quesma/model/bucket_aggregations/auto_date_histogram.go
@@ -10,12 +10,13 @@ import (
 	"time"
 )
 
-// TODO: only bucketsNr=1 is supported for now. Implement other cases.
 type AutoDateHistogram struct {
-	ctx       context.Context
-	field     model.ColumnRef // name of the field, e.g. timestamp
-	bucketsNr int
-	key       int64
+	ctx            context.Context
+	field          model.ColumnRef // name of the field, e.g. timestamp
+	bucketsNr      int
+	lowerBoundInMs *int64 // nil == no bound
+	upperBoundInMs *int64 // nil == no bound
+	key            int64  // TODO: check whether this field is still needed
 }
 
 // NewAutoDateHistogram creates a new AutoDateHistogram aggregation during parsing.
@@ -54,6 +55,6 @@ func (query *AutoDateHistogram) GetField() model.ColumnRef {
 	return query.field
 }
 
-func (query *AutoDateHistogram) SetKey(key int64) {
-	query.key = key
+// SetBounds stores the timestamp bounds (in milliseconds) extracted from the
+// WHERE clause; a nil field afterwards means that bound was not found.
+func (query *AutoDateHistogram) SetBounds(lowerBoundInMs int64, lowerBoundFound bool, upperBoundInMs int64, upperBoundFound bool) {
+	if lowerBoundFound {
+		query.lowerBoundInMs = &lowerBoundInMs
+	}
+	if upperBoundFound {
+		query.upperBoundInMs = &upperBoundInMs
+	}
 }
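
The reason the struct now keeps both bounds: auto_date_histogram takes a target bucket count instead of a fixed calendar_interval, so the interval has to be derived from the time span of the query. That derivation is not included in this diff; below is a minimal sketch of the idea under a simplified interval ladder (Elasticsearch picks from a fixed set of roundings, roughly 1s, 5s, 10s, 30s, 1m, ..., 1h, 3h, 12h, 1d, 7d, plus calendar months and years). The chooseInterval helper and the ladder are illustrative assumptions, not code from this PR.

package bucket_aggregations

import "time"

// Simplified candidate intervals; calendar units (months, years) are omitted
// here because time.Duration cannot represent them.
var intervalLadder = []time.Duration{
	time.Second, 5 * time.Second, 10 * time.Second, 30 * time.Second,
	time.Minute, 5 * time.Minute, 10 * time.Minute, 30 * time.Minute,
	time.Hour, 3 * time.Hour, 12 * time.Hour,
	24 * time.Hour, 7 * 24 * time.Hour,
}

// chooseInterval returns the smallest candidate interval that covers
// [lowerBoundInMs, upperBoundInMs] with at most maxBuckets buckets.
func chooseInterval(lowerBoundInMs, upperBoundInMs int64, maxBuckets int) time.Duration {
	span := time.Duration(upperBoundInMs-lowerBoundInMs) * time.Millisecond
	for _, interval := range intervalLadder {
		if int(span/interval)+1 <= maxBuckets { // +1: a trailing partial bucket still counts
			return interval
		}
	}
	return intervalLadder[len(intervalLadder)-1] // fall back to the coarsest interval
}

For the bounds used in the new test case below (2024-11-20T19:46:30.033Z to 2024-12-05T19:46:30.033Z, a span of exactly 15 days) and buckets: 5, a 1d interval would produce 16 buckets, so the sketch returns 7d, which matches the "interval": "7d" in the expected response.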
65 changes: 43 additions & 22 deletions quesma/model/where_visitor.go
@@ -3,44 +3,65 @@
 package model
 
 import (
 	"math"
 	"quesma/util"
+	"strings"
 )
 
-// FindTimestampLowerBound returns y if there is "x>=y" or "x>y" in the WHERE clause, but only as a single top-level expression.
-// (I mean by that a>=0 is fine, a>=0 AND expr2 [AND ...]] is also fine (AND between all), but a>=0 OR e2 is not fine.
-// a>=0 AND (expr2 OR expr3) is also fine, as on top level it's only an AND.
-// We achieve that by only descending for AND operators.
-// If there are multiple such expressions, we return the smallest one.
-//
-// TODO: add upper bound here too, when bucket_nr=1 in auto_date_histogram (only use case of this function), it's not needed.
-func FindTimestampLowerBound(field ColumnRef, whereClause Expr) (timestampInMillis int64, found bool) {
-	timestampInMillis = math.MaxInt64
+// FindTimestampBounds returns the lower bound y if there is "x>=y" or "x>y" in the WHERE clause,
+// and the upper bound y if there is "x<y" or "x<=y", in both cases only as a single top-level expression.
+// (By that we mean: a>=0 is fine, and so is a>=0 AND expr2 [AND ...] (AND between all of them),
+// but a>=0 OR expr2 is not. a>=0 AND (expr2 OR expr3) is also fine, as at the top level there is only an AND.)
+// We achieve that by descending only into AND operators.
+// If there are multiple such expressions, we return the smallest value for each bound.
+func FindTimestampBounds(field ColumnRef, whereClause Expr) (lowerBoundInMs int64, lowerBoundFound bool,
+	upperBoundInMs int64, upperBoundFound bool) {
+
+	lowerBoundInMs = math.MaxInt64
+	upperBoundInMs = math.MaxInt64
 	visitor := NewBaseVisitor()
 	visitor.OverrideVisitInfix = func(visitor *BaseExprVisitor, e InfixExpr) interface{} {
-		if columnRef, ok := e.Left.(ColumnRef); ok && columnRef == field && e.Op == ">=" || e.Op == ">" {
-			if fun, ok := e.Right.(FunctionExpr); ok && fun.Name == "fromUnixTimestamp64Milli" && len(fun.Args) == 1 {
-				if rhs, ok := fun.Args[0].(LiteralExpr); ok {
-					if rhsInt64, ok := util.ExtractInt64Maybe(rhs.Value); ok {
-						timestampInMillis = min(timestampInMillis, rhsInt64)
-						found = true
-					}
-				}
-			} else if fun, ok := e.Right.(FunctionExpr); ok && fun.Name == "fromUnixTimestamp" && len(fun.Args) == 1 {
-				if rhs, ok := fun.Args[0].(LiteralExpr); ok {
-					if rhsInt64, ok := util.ExtractInt64Maybe(rhs.Value); ok {
-						timestampInMillis = min(timestampInMillis, rhsInt64*1000) // seconds -> milliseconds
-						found = true
-					}
-				}
-			}
-		} else if e.Op == "AND" {
+		if strings.ToUpper(e.Op) == "AND" {
 			e.Left.Accept(visitor)
 			e.Right.Accept(visitor)
 			return nil
 		}
+
+		columnRef, ok := e.Left.(ColumnRef)
+		goodField := ok && columnRef == field
+		goodOp := e.Op == ">=" || e.Op == ">" || e.Op == "<" || e.Op == "<="
+		fun, ok := e.Right.(FunctionExpr)
+		goodFun := ok && len(fun.Args) == 1
+		if !goodField || !goodOp || !goodFun {
+			return nil
+		}
+
+		var value int64
+		var found bool
+		if fun.Name == "fromUnixTimestamp64Milli" {
+			if rhs, ok := fun.Args[0].(LiteralExpr); ok {
+				value, found = util.ExtractInt64Maybe(rhs.Value)
+			}
+		} else if fun.Name == "fromUnixTimestamp" {
+			if rhs, ok := fun.Args[0].(LiteralExpr); ok {
+				value, found = util.ExtractInt64Maybe(rhs.Value)
+				value *= 1000 // seconds -> milliseconds
+			}
+		}
+
+		if found && (e.Op == ">=" || e.Op == ">") {
+			lowerBoundInMs = min(lowerBoundInMs, value)
+			lowerBoundFound = true
+		}
+		if found && (e.Op == "<" || e.Op == "<=") {
+			upperBoundInMs = min(upperBoundInMs, value)
+			upperBoundFound = true
+		}
+
+		return nil
 	}
 
 	whereClause.Accept(visitor)
 	return
 }
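
For orientation, a minimal usage sketch of the new function, assuming the expression types can be value-constructed the way the visitor reads them (the exact shape of ColumnRef, and whether these struct literals satisfy Expr, are assumptions; the diff does not show them):

package model

// Hypothetical example, for illustration only: a WHERE clause equivalent to
//   "@timestamp" >= fromUnixTimestamp64Milli(1732132000000)
//   AND "@timestamp" < fromUnixTimestamp64Milli(1733428000000)
func exampleFindTimestampBounds() (int64, bool, int64, bool) {
	field := ColumnRef{} // stands in for the "@timestamp" column; real construction is not shown in this diff
	lower := InfixExpr{Left: field, Op: ">=",
		Right: FunctionExpr{Name: "fromUnixTimestamp64Milli", Args: []Expr{LiteralExpr{Value: int64(1732132000000)}}}}
	upper := InfixExpr{Left: field, Op: "<",
		Right: FunctionExpr{Name: "fromUnixTimestamp64Milli", Args: []Expr{LiteralExpr{Value: int64(1733428000000)}}}}
	where := InfixExpr{Left: lower, Op: "AND", Right: upper}

	// Expected: lowerBoundInMs == 1732132000000 (found), upperBoundInMs == 1733428000000 (found).
	return FindTimestampBounds(field, where)
}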
4 changes: 4 additions & 0 deletions quesma/queryparser/pancake_sql_query_generation_test.go
@@ -52,6 +52,10 @@ func TestPancakeQueryGeneration(t *testing.T) {
t.Skip("Fix filters")
}

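+// TODO(dev-only): run just the new auto_date_histogram case [70]; remove before merging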
+if i != 70 {
+	t.Skip()
+}
+
if test.TestName == "Line, Y-axis: Min, Buckets: Date Range, X-Axis: Terms, Split Chart: Date Histogram(file:kibana-visualize/agg_req,nr:9)" {
t.Skip("Date range is broken, fix in progress (PR #971)")
}
8 changes: 6 additions & 2 deletions quesma/queryparser/pancake_transformer.go
@@ -361,8 +361,12 @@ func (a *pancakeTransformer) transformAutoDateHistogram(layers []*pancakeModelLa
 	for _, layer := range layers {
 		if layer.nextBucketAggregation != nil {
 			if autoDateHistogram, ok := layer.nextBucketAggregation.queryType.(*bucket_aggregations.AutoDateHistogram); ok {
-				if tsLowerBound, found := model.FindTimestampLowerBound(autoDateHistogram.GetField(), whereClause); found {
-					autoDateHistogram.SetKey(tsLowerBound)
+				// TODO: reconsider whether this if/else is the right structure here
+				if lowerBoundInMs, lbFound, upperBoundInMs, ubFound := model.FindTimestampBounds(autoDateHistogram.GetField(), whereClause); lbFound || ubFound {
+					autoDateHistogram.SetBounds(lowerBoundInMs, lbFound, upperBoundInMs, ubFound)
+					if sql, sqlNeeded := autoDateHistogram.GenerateSql(); sqlNeeded {
+						layer.nextBucketAggregation.selectedColumns = append(layer.nextBucketAggregation.selectedColumns, sql)
+					}
 				} else {
 					logger.WarnWithCtx(a.ctx).Msgf("could not find timestamp bounds (field: %v, where clause: %v)",
 						autoDateHistogram.GetField(), whereClause)
108 changes: 108 additions & 0 deletions quesma/testdata/aggregation_requests_2.go
@@ -4692,6 +4692,7 @@ var AggregationTests2 = []AggregationTestCase{
LIMIT 4`,
},
{ // [70]
<<<<<<< Updated upstream
TestName: "simplest terms with exclude (array of values)",
QueryRequestJson: `
{
@@ -5176,11 +5177,44 @@
"field": "Carrier",
"size": 2
}
=======
TestName: "auto_date_histogram, buckets>1, simplest case: we take bounds from query part",
QueryRequestJson: `
{
"_source": {
"excludes": []
},
"aggs": {
"2": {
"auto_date_histogram": {
"field": "@timestamp",
"buckets": 5
}
}
},
"query": {
"bool": {
"filter": [
{
"range": {
"@timestamp": {
"format": "strict_date_optional_time",
"gte": "2024-11-20T19:46:30.033Z",
"lte": "2024-12-05T19:46:30.033Z"
}
}
}
],
"must": [],
"must_not": [],
"should": []
>>>>>>> Stashed changes
}
},
"size": 0,
"track_total_hits": true
}`,
<<<<<<< Updated upstream
// I omit "took", "timed_out", "_shards", and "hits" from the response for brevity (they can also be easily unit-tested)
ExpectedResponse: `
{
@@ -5325,11 +5359,47 @@
"doc_count": 2
}
]
=======
ExpectedResponse: `
{
"took": 3,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 2781,
"relation": "eq"
},
"max_score": null,
"hits": []
},
"aggregations": {
"2": {
"buckets": [
{
"key_as_string": "2024-11-24T00:00:00.000Z",
"key": 1732406400000,
"doc_count": 1635
},
{
"key_as_string": "2024-12-01T00:00:00.000Z",
"key": 1733011200000,
"doc_count": 1146
}
],
"interval": "7d"
>>>>>>> Stashed changes
}
}
}`,
ExpectedPancakeResults: []model.QueryResultRow{
{Cols: []model.QueryResultCol{
<<<<<<< Updated upstream
model.NewQueryResultCol("aggr__terms__parent_count", int64(50000)),
model.NewQueryResultCol("aggr__terms__key_0", nil),
model.NewQueryResultCol("aggr__terms__count", int64(12000)),
@@ -5423,5 +5493,43 @@
"aggr__terms__key_0"
ORDER BY "aggr__terms__count" DESC, "aggr__terms__key_0" ASC
LIMIT 1`,
=======
model.NewQueryResultCol("aggr__my_buckets__key_0", int64(1734220800000/86400000)),
model.NewQueryResultCol("aggr__my_buckets__key_1", false),
model.NewQueryResultCol("aggr__my_buckets__count", int64(177)),
model.NewQueryResultCol("metric__my_buckets__the_avg_col_0", 780.980444956634),
}},
{Cols: []model.QueryResultCol{
model.NewQueryResultCol("aggr__my_buckets__key_0", int64(1734220800000/86400000)),
model.NewQueryResultCol("aggr__my_buckets__key_1", true),
model.NewQueryResultCol("aggr__my_buckets__count", int64(27)),
model.NewQueryResultCol("metric__my_buckets__the_avg_col_0", 824.6892135054977),
}},
{Cols: []model.QueryResultCol{
model.NewQueryResultCol("aggr__my_buckets__key_0", int64(1734134400000/86400000)),
model.NewQueryResultCol("aggr__my_buckets__key_1", false),
model.NewQueryResultCol("aggr__my_buckets__count", int64(295)),
model.NewQueryResultCol("metric__my_buckets__the_avg_col_0", 793.5536717301708),
}},
{Cols: []model.QueryResultCol{ // should be erased by us because of size=3
model.NewQueryResultCol("aggr__my_buckets__key_0", int64(1934134400000/86400000)),
model.NewQueryResultCol("aggr__my_buckets__key_1", false),
model.NewQueryResultCol("aggr__my_buckets__count", int64(888)),
model.NewQueryResultCol("metric__my_buckets__the_avg_col_0", 100000000),
}},
},
ExpectedPancakeSQL: `
SELECT toInt64(toUnixTimestamp64Milli("timestamp") / 86400000) AS
"aggr__my_buckets__key_0", "product" AS "aggr__my_buckets__key_1",
count(*) AS "aggr__my_buckets__count",
avgOrNull("price") AS "metric__my_buckets__the_avg_col_0"
FROM __quesma_table_name
GROUP BY toInt64(toUnixTimestamp64Milli("timestamp") / 86400000) AS
"aggr__my_buckets__key_0",
"product" AS "aggr__my_buckets__key_1"
ORDER BY "aggr__my_buckets__count" DESC, "aggr__my_buckets__key_0" ASC,
"aggr__my_buckets__key_1" ASC
LIMIT 4`,
>>>>>>> Stashed changes
},
}
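
As a consistency check on the expected response in the stashed test case: the two bucket keys, 1732406400000 and 1733011200000, differ by exactly 604800000 ms, i.e. the 7 days reported as "interval": "7d", and the first key corresponds to 2024-11-24T00:00:00.000Z, matching its key_as_string.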