Skip to content

Commit

Permalink
add _gte and _lte operators to timestamp (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
hgiasac authored Nov 7, 2024
1 parent 347d930 commit e51590c
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 130 deletions.
4 changes: 2 additions & 2 deletions configuration/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,13 +409,13 @@ func findNativeQueryVariables(nq metadata.NativeQuery) (map[string]metadata.Nati

if match[0] > 0 && nq.Query[match[0]-1] == '[' {
// duration variables should be bounded by square brackets
if match[1] >= queryLength-1 || nq.Query[match[1]] != ']' {
if match[1] >= queryLength || nq.Query[match[1]] != ']' {
return nil, fmt.Errorf("invalid promQL range syntax")
}
argumentInfo.Type = string(metadata.ScalarDuration)
} else if match[0] > 0 && nq.Query[match[0]-1] == '"' {
// duration variables should be bounded by double quotes
if match[1] >= queryLength-1 || nq.Query[match[1]] != '"' {
if match[1] >= queryLength || nq.Query[match[1]] != '"' {
return nil, fmt.Errorf("invalid promQL string syntax")
}
argumentInfo.Type = string(metadata.ScalarString)
Expand Down
103 changes: 23 additions & 80 deletions connector/client/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@ import (
type UnixTimeUnit string

const (
UnixTimeSecond UnixTimeUnit = "s"
UnixTimeMillisecond UnixTimeUnit = "ms"
UnixTimeSecond UnixTimeUnit = "s"
UnixTimeMilli UnixTimeUnit = "ms"
UnixTimeMicro UnixTimeUnit = "us"
UnixTimeNano UnixTimeUnit = "ns"
)

// Duration returns the duration of the unit
func (ut UnixTimeUnit) Duration() time.Duration {
switch ut {
case UnixTimeMillisecond:
case UnixTimeMilli:
return time.Millisecond
case UnixTimeMicro:
return time.Microsecond
case UnixTimeNano:
return time.Nanosecond
default:
return time.Second
}
Expand All @@ -44,40 +50,6 @@ func evalStepFromRange(start time.Time, end time.Time) time.Duration {
}
}

// ParseDuration parses duration from an unknown value
func ParseDuration(value any, unixTimeUnit UnixTimeUnit) (time.Duration, error) {
reflectValue, ok := utils.UnwrapPointerFromReflectValue(reflect.ValueOf(value))
if !ok {
return 0, nil
}

return parseDurationReflection(reflectValue, reflectValue.Kind(), unixTimeUnit)
}

// parseDurationReflection parses duration from a reflection value
func parseDurationReflection(reflectValue reflect.Value, kind reflect.Kind, unixTimeUnit UnixTimeUnit) (time.Duration, error) {
switch kind {
case reflect.Invalid:
return 0, nil
case reflect.String:
strValue := reflectValue.String()
if d, err := model.ParseDuration(strValue); err == nil {
return time.Duration(d), nil
} else {
return 0, fmt.Errorf("unable to parse duration from string %s: %s", strValue, err)
}
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
// or as a number in seconds
return time.Duration(reflectValue.Int()) * unixTimeUnit.Duration(), nil
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
return time.Duration(reflectValue.Uint()) * unixTimeUnit.Duration(), nil
case reflect.Float32, reflect.Float64:
return time.Duration(int64(reflectValue.Float() * float64(unixTimeUnit.Duration()))), nil
default:
return 0, fmt.Errorf("unable to parse duration from kind %v", kind)
}
}

// RangeResolution represents the given range and resolution with format xx:xx
type RangeResolution struct {
Range model.Duration
Expand All @@ -92,6 +64,18 @@ func (rr RangeResolution) String() string {
return fmt.Sprintf("%s:%s", rr.Range.String(), rr.Resolution.String())
}

// ParseDuration parses duration from an unknown value
func ParseDuration(value any, unixTimeUnit UnixTimeUnit) (time.Duration, error) {
result, err := utils.DecodeNullableDuration(value, utils.WithBaseUnix(unixTimeUnit.Duration()))
if err != nil {
return 0, err
}
if result == nil {
return 0, nil
}
return *result, nil
}

// ParseRangeResolution parses the range resolution from a string
func ParseRangeResolution(input any, unixTimeUnit UnixTimeUnit) (*RangeResolution, error) {
reflectValue, ok := utils.UnwrapPointerFromReflectValue(reflect.ValueOf(input))
Expand All @@ -101,7 +85,7 @@ func ParseRangeResolution(input any, unixTimeUnit UnixTimeUnit) (*RangeResolutio

kind := reflectValue.Kind()
if kind != reflect.String {
rng, err := parseDurationReflection(reflectValue, kind, unixTimeUnit)
rng, err := utils.DecodeDuration(reflectValue.Interface(), utils.WithBaseUnix(unixTimeUnit.Duration()))
if err != nil {
return nil, fmt.Errorf("invalid range resolution %v: %s", input, err)
}
Expand Down Expand Up @@ -133,48 +117,7 @@ func ParseRangeResolution(input any, unixTimeUnit UnixTimeUnit) (*RangeResolutio

// ParseTimestamp parses timestamp from an unknown value
func ParseTimestamp(s any, unixTimeUnit UnixTimeUnit) (*time.Time, error) {
reflectValue, ok := utils.UnwrapPointerFromReflectValue(reflect.ValueOf(s))
if !ok {
return nil, nil
}

baseMs := int64(unixTimeUnit.Duration() / time.Millisecond)
kind := reflectValue.Kind()
switch kind {
case reflect.Invalid:
return nil, nil
case reflect.String:
strValue := reflectValue.String()
if strValue == "now" {
now := time.Now()
return &now, nil
}
// Input timestamps may be provided either in RFC3339 format
for _, format := range []string{time.RFC3339, "2006-01-02T15:04:05Z0700", "2006-01-02T15:04:05-0700", time.RFC3339Nano, time.DateOnly} {
if t, err := time.Parse(format, strValue); err == nil {
return &t, nil
}
}
if d, err := time.ParseDuration(strValue); err == nil {
result := time.Now().Add(-d)
return &result, nil
} else {
return nil, fmt.Errorf("unable to parse timestamp from string %s: %s", strValue, err)
}
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
// or as a Unix timestamp,
// with optional decimal places for sub-second precision
result := time.UnixMilli(reflectValue.Int() * baseMs)
return &result, nil
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
result := time.UnixMilli(int64(reflectValue.Uint()) * baseMs)
return &result, nil
case reflect.Float32, reflect.Float64:
result := time.UnixMilli(int64(reflectValue.Float() * float64(baseMs)))
return &result, nil
default:
return nil, fmt.Errorf("unable to parse timestamp from kind %v", kind)
}
return utils.DecodeNullableDateTime(s, utils.WithBaseUnix(unixTimeUnit.Duration()))
}

type apiResponse struct {
Expand Down
8 changes: 4 additions & 4 deletions connector/internal/collection_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ func (pr *CollectionRequest) evalQueryPredicate(expression schema.Expression) er
return errors.New("unsupported multiple equality for the timestamp")
}
pr.Timestamp = expr.Value
case metadata.Least:
case metadata.Least, metadata.LeastOrEqual:
if pr.End != nil {
return errors.New("unsupported multiple _lt expressions for the timestamp")
return errors.New("unsupported multiple _lt or _lte expressions for the timestamp")
}
pr.End = expr.Value
case metadata.Greater:
case metadata.Greater, metadata.GreaterOrEqual:
if pr.Start != nil {
return errors.New("unsupported multiple _gt expressions for the timestamp")
return errors.New("unsupported multiple _gt or _gt expressions for the timestamp")
}
pr.Start = expr.Value
default:
Expand Down
2 changes: 1 addition & 1 deletion connector/internal/native_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (nqe *NativeQueryExecutor) Explain(ctx context.Context) (*NativeQueryReques
}

if unresolvedArguments := metadata.FindNativeQueryVariableNames(queryString); len(unresolvedArguments) > 0 {
return nil, "", schema.BadRequestError(fmt.Sprintf("unresolved variables %v in the Prometheus query", unresolvedArguments), map[string]any{
return nil, "", schema.UnprocessableContentError(fmt.Sprintf("unresolved variables %v in the query", unresolvedArguments), map[string]any{
"collection": nqe.Request.Collection,
"query": queryString,
})
Expand Down
22 changes: 14 additions & 8 deletions connector/internal/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"fmt"
"math"
"slices"
"strconv"
"time"

"github.com/hasura/ndc-prometheus/connector/client"
"github.com/hasura/ndc-prometheus/connector/metadata"
"github.com/hasura/ndc-sdk-go/schema"
"github.com/hasura/ndc-sdk-go/utils"
Expand All @@ -17,7 +17,7 @@ import (
func createQueryResultsFromVector(vector model.Vector, labels map[string]metadata.LabelInfo, runtime *metadata.RuntimeSettings, flat bool) []map[string]any {
results := make([]map[string]any, len(vector))
for i, item := range vector {
ts := formatTimestamp(item.Timestamp, runtime.Format.Timestamp, runtime.UnixTimeUnit)
ts := formatTimestamp(item.Timestamp, runtime.Format.Timestamp)
value := formatValue(item.Value, runtime.Format)
r := map[string]any{
metadata.TimestampKey: ts,
Expand Down Expand Up @@ -65,7 +65,7 @@ func createGroupQueryResultsFromMatrix(matrix model.Matrix, labels map[string]me
valuesLen := len(item.Values)
values := make([]map[string]any, valuesLen)
for i, value := range item.Values {
ts := formatTimestamp(value.Timestamp, runtime.Format.Timestamp, runtime.UnixTimeUnit)
ts := formatTimestamp(value.Timestamp, runtime.Format.Timestamp)
v := formatValue(value.Value, runtime.Format)
values[i] = map[string]any{
metadata.TimestampKey: ts,
Expand All @@ -89,7 +89,7 @@ func createFlatQueryResultsFromMatrix(matrix model.Matrix, labels map[string]met

for _, item := range matrix {
for _, value := range item.Values {
ts := formatTimestamp(value.Timestamp, runtime.Format.Timestamp, runtime.UnixTimeUnit)
ts := formatTimestamp(value.Timestamp, runtime.Format.Timestamp)
v := formatValue(value.Value, runtime.Format)
r := map[string]any{
metadata.LabelsKey: item.Metric,
Expand All @@ -109,12 +109,18 @@ func createFlatQueryResultsFromMatrix(matrix model.Matrix, labels map[string]met
return results
}

func formatTimestamp(ts model.Time, format metadata.TimestampFormat, unixTime client.UnixTimeUnit) any {
func formatTimestamp(ts model.Time, format metadata.TimestampFormat) any {
switch format {
case metadata.TimestampRFC3339:
return ts.Time().Format(time.RFC3339)
case metadata.TimestampUnix:
return ts.Unix()
case metadata.TimestampUnixMilli:
return ts.Time().UnixMilli()
case metadata.TimestampUnixMicro:
return ts.Time().UnixMicro()
case metadata.TimestampUnixNano:
return strconv.FormatInt(ts.UnixNano(), 10)
default:
return ts.Unix() * int64(time.Second/unixTime.Duration())
return ts.Time().Format(time.RFC3339)
}
}

Expand Down
14 changes: 11 additions & 3 deletions connector/metadata/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,16 @@ type GeneratorSettings struct {
type TimestampFormat string

const (
// Represents the timestamp as a Unix timestamp in RFC3339 string.
TimestampRFC3339 TimestampFormat = "rfc3339"
TimestampUnix TimestampFormat = "unix"
// Represents the timestamp as a Unix timestamp in seconds.
TimestampUnix TimestampFormat = "unix"
// Represents the timestamp as a Unix timestamp in milliseconds.
TimestampUnixMilli TimestampFormat = "unix_ms"
// Represents the timestamp as a Unix timestamp in microseconds.
TimestampUnixMicro TimestampFormat = "unix_us"
// Represents the timestamp as a Unix timestamp in nanoseconds.
TimestampUnixNano TimestampFormat = "unix_ns"
)

// ValueFormat the format for value serialization
Expand All @@ -77,7 +85,7 @@ const (
// RuntimeFormatSettings format settings for timestamps and values in runtime
type RuntimeFormatSettings struct {
// The serialization format for timestamp
Timestamp TimestampFormat `json:"timestamp" yaml:"timestamp" jsonschema:"enum=rfc3339,enum=unix,default=unix"`
Timestamp TimestampFormat `json:"timestamp" yaml:"timestamp" jsonschema:"enum=rfc3339,enum=unix,enum=unix_ms,enum=unix_us,enum=unix_ns,default=unix"`
// The serialization format for value
Value ValueFormat `json:"value" yaml:"value" jsonschema:"enum=string,enum=float64,default=string"`
// The serialization format for not-a-number values
Expand All @@ -93,7 +101,7 @@ type RuntimeSettings struct {
// Flatten value points to the root array
Flat bool `json:"flat" yaml:"flat"`
// The default unit for unix timestamp
UnixTimeUnit client.UnixTimeUnit `json:"unix_time_unit" yaml:"unix_time_unit" jsonschema:"enum=s,enum=ms,default=s"`
UnixTimeUnit client.UnixTimeUnit `json:"unix_time_unit" yaml:"unix_time_unit" jsonschema:"enum=s,enum=ms,enum=us,enum=ns,default=s"`
// The serialization format for response fields
Format RuntimeFormatSettings `json:"format" yaml:"format"`
// The concurrency limit of queries if there are many variables in a single query
Expand Down
8 changes: 5 additions & 3 deletions connector/metadata/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@ var defaultScalars = map[string]schema.ScalarType{
string(ScalarTimestamp): {
AggregateFunctions: schema.ScalarTypeAggregateFunctions{},
ComparisonOperators: map[string]schema.ComparisonOperatorDefinition{
Equal: schema.NewComparisonOperatorEqual().Encode(),
Least: schema.NewComparisonOperatorCustom(schema.NewNamedType(string(ScalarTimestamp))).Encode(),
Greater: schema.NewComparisonOperatorCustom(schema.NewNamedType(string(ScalarTimestamp))).Encode(),
Equal: schema.NewComparisonOperatorEqual().Encode(),
Least: schema.NewComparisonOperatorCustom(schema.NewNamedType(string(ScalarTimestamp))).Encode(),
LeastOrEqual: schema.NewComparisonOperatorCustom(schema.NewNamedType(string(ScalarTimestamp))).Encode(),
Greater: schema.NewComparisonOperatorCustom(schema.NewNamedType(string(ScalarTimestamp))).Encode(),
GreaterOrEqual: schema.NewComparisonOperatorCustom(schema.NewNamedType(string(ScalarTimestamp))).Encode(),
},
Representation: schema.NewTypeRepresentationTimestamp().Encode(),
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@
"type": "binary_comparison_operator",
"column": { "type": "column", "name": "timestamp", "path": [] },
"operator": "_gt",
"value": { "type": "scalar", "value": "2024-09-29T00:00:00Z" }
"value": { "type": "scalar", "value": "2024-10-29T00:00:00Z" }
}
},
"arguments": {
"fn": {
"type": "literal",
"value": [{ "increase": "1m" }, { "sum": ["job"] }, { "avg": ["job"] }]
},
"step": { "type": "literal", "value": "5m" },
"step": { "type": "literal", "value": "1d" },
"timeout": { "type": "literal", "value": "1m" },
"flat": { "type": "literal", "value": true }
},
Expand Down
Loading

0 comments on commit e51590c

Please sign in to comment.