Skip to content

Commit

Permalink
Add jq function for epoch in seconds (#367)
Browse files Browse the repository at this point in the history
* Rename `epoch` to `epochMillis`

* Add second-granularity epoch function

Including a test of chaining jq commands
  • Loading branch information
colmsnowplow authored Sep 11, 2024
1 parent 32006eb commit b3c04cb
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 2 deletions.
19 changes: 18 additions & 1 deletion pkg/transform/jq.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ func jqMapperConfigFunction(c *JQMapperConfig) (TransformationFunction, error) {
return nil, fmt.Errorf("error parsing jq command: %s", err)
}

// epoch converts a time.Time to an epoch in seconds, as integer type.
// It must be an integer in order to chain with jq-native time functions
withEpochFunction := gojq.WithFunction("epoch", 0, 1, func(a1 any, a2 []any) any {
if a1 == nil {
return nil
Expand All @@ -122,10 +124,25 @@ func jqMapperConfigFunction(c *JQMapperConfig) (TransformationFunction, error) {
return errors.New("Not a valid time input to 'epoch' function")
}

return int(validTime.Unix())
})

// epochMillis converts a time.Time to an epoch in milliseconds
withEpochMillisFunction := gojq.WithFunction("epochMillis", 0, 1, func(a1 any, a2 []any) any {
if a1 == nil {
return nil
}

validTime, ok := a1.(time.Time)

if !ok {
return errors.New("Not a valid time input to 'epochMillis' function")
}

return validTime.UnixMilli()
})

code, err := gojq.Compile(query, withEpochFunction)
code, err := gojq.Compile(query, withEpochMillisFunction, withEpochFunction)
if err != nil {
return nil, fmt.Errorf("error compiling jq query: %s", err)
}
Expand Down
87 changes: 86 additions & 1 deletion pkg/transform/jq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,25 @@ func TestJQRunFunction_SpMode_true(t *testing.T) {
ExpInterState interface{}
Error error
}{
{
Scenario: "test_timestamp_to_epochMillis",
JQCommand: `{ foo: .collector_tstamp | epochMillis }`,
InputMsg: &models.Message{
Data: SnowplowTsv1,
PartitionKey: "some-key",
},
InputInterState: nil,
Expected: map[string]*models.Message{
"success": {
Data: []byte(`{"foo":1557499235972}`),
PartitionKey: "some-key",
},
"filtered": nil,
"failed": nil,
},
ExpInterState: nil,
Error: nil,
},
{
Scenario: "test_timestamp_to_epoch",
JQCommand: `{ foo: .collector_tstamp | epoch }`,
Expand All @@ -43,7 +62,26 @@ func TestJQRunFunction_SpMode_true(t *testing.T) {
InputInterState: nil,
Expected: map[string]*models.Message{
"success": {
Data: []byte(`{"foo":1557499235972}`),
Data: []byte(`{"foo":1557499235}`),
PartitionKey: "some-key",
},
"filtered": nil,
"failed": nil,
},
ExpInterState: nil,
Error: nil,
},
{
Scenario: "test_timestamp_to_epoch_chained",
JQCommand: `{ foo: .collector_tstamp | epoch | todateiso8601 }`,
InputMsg: &models.Message{
Data: SnowplowTsv1,
PartitionKey: "some-key",
},
InputInterState: nil,
Expected: map[string]*models.Message{
"success": {
Data: []byte(`{"foo":"2019-05-10T14:40:35Z"}`),
PartitionKey: "some-key",
},
"filtered": nil,
Expand Down Expand Up @@ -211,6 +249,30 @@ func TestJQRunFunction_SpMode_false(t *testing.T) {
ExpInterState: nil,
Error: nil,
},
{
Scenario: "epochMillis_on_nullable",
JQCommand: `
{
explicit_null: .explicit | epochMillis,
no_such_field: .nonexistent | epochMillis,
non_null: .non_null
}`,
InputMsg: &models.Message{
Data: []byte(`{"explicit": null, "non_null": "hello"}`),
PartitionKey: "some-key",
},
InputInterState: nil,
Expected: map[string]*models.Message{
"success": {
Data: []byte(`{"non_null":"hello"}`),
PartitionKey: "some-key",
},
"filtered": nil,
"failed": nil,
},
ExpInterState: nil,
Error: nil,
},
{
Scenario: "epoch_on_nullable",
JQCommand: `
Expand Down Expand Up @@ -480,6 +542,29 @@ func TestJQRunFunction_errors(t *testing.T) {
ExpInterState: nil,
Error: errors.New("jq query got no output"),
},
{
Scenario: "epochMillis_on_non_time_type",
JQConfig: &JQMapperConfig{
JQCommand: `.str | epochMillis`,
RunTimeoutMs: 100,
SpMode: false,
},
InputMsg: &models.Message{
Data: []byte(`{"str": "value"}`),
PartitionKey: "some-key",
},
InputInterState: nil,
Expected: map[string]*models.Message{
"success": nil,
"filtered": nil,
"failed": {
Data: []byte(`{"str": "value"}`),
PartitionKey: "some-key",
},
},
ExpInterState: nil,
Error: errors.New("Not a valid time input to 'epochMillis' function"),
},
{
Scenario: "epoch_on_non_time_type",
JQConfig: &JQMapperConfig{
Expand Down

0 comments on commit b3c04cb

Please sign in to comment.