
Add spEnrichedFilter #60

Status: Closed (wants to merge 20 commits)

Commits
ba71200  Add spEnrichedFilter (colmsnowplow, Jun 16, 2021)
fc495fd  Address type coercion in valueFound and add test case for it (colmsnowplow, Jun 23, 2021)
d9cd827  Remove unnecessary note (colmsnowplow, Jun 23, 2021)
cd5e8a4  Move initial evaluation of filterConfig outside returned function and… (colmsnowplow, Jun 28, 2021)
39c4c8c  Add missing copyright notice (colmsnowplow, Jun 28, 2021)
d09af8a  Move interpretation of intermediateState into utility function (colmsnowplow, Jun 28, 2021)
1469b99  Use utility function to handle intermediateState for legacy implement… (colmsnowplow, Jun 28, 2021)
5a5002a  Refactor filtering (colmsnowplow, Jul 7, 2021)
03793bf  Cleanup (colmsnowplow, Jul 7, 2021)
c5f7064  Break transformation loop if message is filtered (colmsnowplow, Jul 8, 2021)
0e019f7  Update observer model to accommodate filtered messages (colmsnowplow, Jul 8, 2021)
e98d953  Fix bug which fails messages where the requested field is empty (colmsnowplow, Jul 8, 2021)
8428a31  Cleanup (colmsnowplow, Jul 8, 2021)
3b10912  Improve readability of filtering logic (colmsnowplow, Jul 13, 2021)
2d46288  Improve invalid filter config error message (colmsnowplow, Jul 13, 2021)
79ebc62  Fix doc for observer Filtered function (colmsnowplow, Jul 13, 2021)
09a8e0d  Patch broken config test (colmsnowplow, Jul 13, 2021)
9e49dfd  Rename intermediateAsParsed to intermediateAsSpEnrichedParsed (colmsnowplow, Jul 13, 2021)
83f36db  Fix lint issues (colmsnowplow, Jul 13, 2021)
1c40689  Remove unnecessary else block in intermediateAsSpEnrichedParsed (colmsnowplow, Jul 13, 2021)
4 changes: 3 additions & 1 deletion cmd/config.go
@@ -389,9 +389,11 @@ func (c *Config) GetTransformations() (transform.TransformationApplyFunction, er
funcs = append(funcs, transform.SpEnrichedToJson)
case "spEnrichedSetPk":
funcs = append(funcs, transform.NewSpEnrichedSetPkFunction(funcOpts[1]))
+ case "spEnrichedFilter":
+ funcs = append(funcs, transform.NewSpEnrichedFilterFunction(funcOpts[1]))
case "none":
default:
- return nil, errors.New(fmt.Sprintf("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}' and got '%s'", c.Transformation))
+ return nil, errors.New(fmt.Sprintf("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}', spEnrichedFilter:{option} and got '%s'", c.Transformation))
}
}
return transform.NewTransformation(funcs...), nil
2 changes: 1 addition & 1 deletion cmd/config_test.go
@@ -109,7 +109,7 @@ func TestNewConfig_InvalidTransformation(t *testing.T) {
transformation, err := c.GetTransformations()
assert.Nil(transformation)
assert.NotNil(err)
- assert.Equal("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}' and got 'fake'", err.Error())
+ assert.Equal("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}', spEnrichedFilter:{option} and got 'fake'", err.Error())
}

func TestNewConfig_InvalidTarget(t *testing.T) {
80 changes: 80 additions & 0 deletions pkg/transform/snowplow_enriched_filter.go
@@ -0,0 +1,80 @@
// PROPRIETARY AND CONFIDENTIAL
//
// Unauthorized copying of this file via any medium is strictly prohibited.
//
// Copyright (c) 2020-2021 Snowplow Analytics Ltd. All rights reserved.

package transform

import (
"fmt"
"strings"

"github.com/snowplow-devops/stream-replicator/pkg/models"
"github.com/snowplow/snowplow-golang-analytics-sdk/analytics"
)

// NewSpEnrichedFilterFunction returns a TransformationFunction which filters messages based on a field in the Snowplow enriched event.
// The filterConfig argument describes the conditions for including a message.
// For example "aid==abc|def" includes all events with app IDs of abc or def, and filters out the rest.
// "aid!=abc|def" includes all events whose app IDs do not match abc or def, and filters out the rest.
func NewSpEnrichedFilterFunction(filterConfig string) TransformationFunction {
return func(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, interface{}) {

// Check for a negation condition first
keyValues := strings.SplitN(filterConfig, "!=", 2)
Contributor:

When I first read the PR description, I thought it was multiple statements split with |, but now I realise it's a single field with multiple options. I think it might be worth adding multiple field filters rather than multiple values per field, as that offers more flexibility without too much additional complexity (a little more, though).

e.g. rather than MESSAGE_TRANSFORMATION=spEnrichedFilter:{field}=={value1}|{value2}|... we'd have MESSAGE_TRANSFORMATION=spEnrichedFilter:{field}=={value1}|{field}=={value2}|{field2}=={value25}

Collaborator Author:

So following our chat - some clarification. An individual filter can only deal with one field, but can match n values. So an individual filter can either:

  • Include the event if the value is one of a set of values,
    OR
  • Exclude the event if the value is one of a set of values

However, we can apply as many filters as we like in a row - essentially chaining the above one after another. So your above example would be configured like so:

MESSAGE_TRANSFORMATION=spEnrichedFilter:{field}=={value1}|{value2},spEnrichedFilter:{field2}=={value25}

So if we have conditions which depend on multiple fields, we can do it as long as we require both conditions to be met. If we require that one or the other condition is met, we currently cannot configure that. So, if the example requirement was:

[field == value1|value2] OR [field2 == value25]

Then we'd be out of luck.
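The chaining semantics described above (comma-separated filters applied one after another, all of which must keep the event) can be sketched with a self-contained toy model. The message type and helpers below are hypothetical stand-ins for illustration, not the stream-replicator API:

```go
package main

import (
	"fmt"
	"strings"
)

// message is a simplified stand-in for models.Message.
type message struct {
	appID  string
	field2 string
}

// filter reports whether a message should be kept.
type filter func(m message) bool

// fieldIn keeps messages whose field value is one of the allowed values,
// mirroring spEnrichedFilter:{field}=={value1}|{value2}.
func fieldIn(get func(message) string, values string) filter {
	allowed := strings.Split(values, "|")
	return func(m message) bool {
		for _, v := range allowed {
			if get(m) == v {
				return true
			}
		}
		return false
	}
}

// chain applies filters one after another: every filter must keep the
// message (AND semantics), matching comma-separated transformations.
func chain(filters ...filter) filter {
	return func(m message) bool {
		for _, f := range filters {
			if !f(m) {
				return false
			}
		}
		return true
	}
}

func main() {
	combined := chain(
		fieldIn(func(m message) string { return m.appID }, "value1|value2"),
		fieldIn(func(m message) string { return m.field2 }, "value25"),
	)
	fmt.Println(combined(message{appID: "value1", field2: "value25"})) // true
	fmt.Println(combined(message{appID: "value1", field2: "other"}))   // false
}
```

This also makes the limitation concrete: chaining gives only AND across fields, so an OR condition across two fields cannot be expressed.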


var keepMessage bool
if len(keyValues) > 1 {
// If negation condition is found, default to keep the message, and change this when match found
keepMessage = true
} else {
// Otherwise, look for affirmation condition, default to drop the message and change when match found
keyValues = strings.SplitN(filterConfig, "==", 2)
keepMessage = false
}
// TODO: Design - Should there be validation of the input here, or perhaps in the config? Or at all?
Contributor:

Yes, we should definitely make sure the input is valid. It'd be pretty easy to make a mistake when configuring this. Even if it's just ensuring all the characters are valid and it's been parsed reasonably; if not, responding with an example.

Collaborator Author:

Agreed - needs testing but as we discussed moving this section of code out of the returned function and into the body of NewSpEnrichedFilterFunction should achieve what we need.
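The change agreed here (evaluate filterConfig once in the constructor body, so an invalid config can be rejected up front rather than on every message) might look roughly like the sketch below. parseFilterConfig and its error text are illustrative assumptions, not the PR's final code:

```go
package main

import (
	"fmt"
	"strings"
)

// parseFilterConfig splits a config like "app_id==a|b" or "app_id!=a|b"
// into field, candidate values, and a negation flag, validating it up front.
func parseFilterConfig(cfg string) (field string, values []string, negate bool, err error) {
	// Check for a negation condition first, as in the returned-function version.
	if parts := strings.SplitN(cfg, "!=", 2); len(parts) == 2 {
		return parts[0], strings.Split(parts[1], "|"), true, nil
	}
	if parts := strings.SplitN(cfg, "==", 2); len(parts) == 2 {
		return parts[0], strings.Split(parts[1], "|"), false, nil
	}
	return "", nil, false, fmt.Errorf("invalid filter config %q; expected {field}=={values} or {field}!={values}", cfg)
}

func main() {
	field, values, negate, err := parseFilterConfig("app_id==test-data3|other")
	fmt.Println(field, values, negate, err) // app_id [test-data3 other] false <nil>

	_, _, _, err = parseFilterConfig("garbage")
	fmt.Println(err != nil) // true
}
```

A constructor doing this parse before returning the closure could surface the error at startup, when the config is first read.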


// Todo: make this its own function and DRY across all the transformations?
parsedMessage, ok := intermediateState.(analytics.ParsedEvent)
var parseErr error
if !ok {
parsedMessage, parseErr = analytics.ParseEvent(string(message.Data))
if parseErr != nil {
message.SetError(parseErr)
return nil, message, nil
}
intermediateState = parsedMessage
}

valueFound, err := parsedMessage.GetValue(keyValues[0])
if err != nil {
message.SetError(err)
return nil, message, nil
}

evaluation:
for _, valueToMatch := range strings.Split(keyValues[1], "|") {
if valueToMatch == fmt.Sprintf("%v", valueFound) { // coerce to string as valueFound may be any type found in a Snowplow event
keepMessage = !keepMessage
Contributor:

nit:

if isNegationFilter {
  shouldKeepMessage = false
} else {
  shouldKeepMessage = true
} 

For me this makes it clearer what this does, rather than flipping a boolean. I think this is worth the extra check for the sake of readability, especially since it will only be called once per filter match. If you don't want the if, then shouldKeepMessage = !isNegationFilter is clearer than flipping the same boolean.

Collaborator Author:

I prefer the brevity of shouldKeepMessage = !isNegationFilter, but I think you're right that the if statement is much better for readability so going with that. :)

break evaluation
// Once config value is matched once, change keepMessage then break out of the loop to avoid reverting back when we have two matches
}
}

// If message is not to be kept, ack it and return a nil result.
if !keepMessage {
if message.AckFunc != nil {
message.AckFunc()
}
return nil, nil, nil
Contributor:

This is quite clear, marking it as neither a success nor a failure. It seems a little odd that a "transformation" can ack a message though; it feels like a bit of an unexpected side effect of a transformation.
I wonder if we can take the same logic out of the transformation section and pull it into a filter section in the code base, to keep the two concerns separated? This might also afford us some future flexibility in filtering, without having to squeeze it into the same shape as a transformation.

Collaborator Author:

Yeah, I think I agree, but I'd consider it out of scope of what we're doing here, which is to implement whatever sensible filtering we can in the short term.

I think it makes sense to separate the concept of a filter from the concept of a transformation. The only time I can really see this becoming a problem is if a filter were to depend on a transformation... but the current implementation doesn't allow for this anyway. So certainly food for thought, and a more nuanced design is well worth thinking over.

Member:

As a stop-gap, how do you feel about extending the interface to return a filtered message list, so that we can handle what to do with the filtered set outside of the transformations?

Collaborator Author:

Worth considering... I did knock the idea about in my head, but ultimately figured that if we're gonna just ack them and ignore them, it's more efficient to do it straight away rather than pass more data around.

Having said that I'm not opposed to doing it that way for this instrumentation!

Collaborator Author:

On further reflection, I think I'm actually opposed to doing it this way, for the simple reason that it involves a lot of changes in a lot of places that we'd presumably want to revert should we redesign it out:

  • Change the model for transformations
  • Change each transformation to suit the new model
  • Change the transformationApplyFunction to handle slices of filtered data
  • Change the main function code in cmd/serverless and cmd/cli to ack the filtered data

Ultimately, it's not that these aren't worth doing, but that's as much work or more than just implementing Paul's suggestion that filters are done separately from transformations - and if we're to decide that's the best design, we'd need to undo all of the above in order to re-implement against that design.

Thoughts @jbeemster ?

Collaborator Author (Jul 5, 2021):

Just popping an update here for clarity since I'm working on another branch atm, and we've progressed the discussion elsewhere.

We have discussed this, and leaving things as they are isn't really an option, so we'll need to refactor something. At the moment I'm leaning towards Josh's suggestion of modifying the transformationResult model to allow a 'filtered' slice of messages, which are subsequently acked outside of the function.

Separating filters out completely means that we can't share information across a filter and a transformation, but filters can be much more powerful if they're allowed to depend on transformations IMO.
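The direction settled on above (and reflected in the later "Update observer model to accommodate filtered messages" commit) is to track filtered messages in the result model and ack them outside the transformation. A rough, hypothetical sketch of that shape; the real TransformationResult fields and names may differ:

```go
package main

import "fmt"

// Message is a simplified stand-in for models.Message.
type Message struct {
	Data         []byte
	PartitionKey string
}

// TransformationResult sketches a result model that tracks filtered
// messages separately from successes and invalid messages.
type TransformationResult struct {
	Result   []*Message
	Invalid  []*Message
	Filtered []*Message
}

// splitByFilter partitions messages using a keep predicate; the caller
// can then ack res.Filtered outside the transformation itself.
func splitByFilter(msgs []*Message, keep func(*Message) bool) TransformationResult {
	var res TransformationResult
	for _, m := range msgs {
		if keep(m) {
			res.Result = append(res.Result, m)
		} else {
			res.Filtered = append(res.Filtered, m)
		}
	}
	return res
}

func main() {
	msgs := []*Message{
		{Data: []byte("keep-me")},
		{Data: []byte("drop-me")},
	}
	res := splitByFilter(msgs, func(m *Message) bool { return string(m.Data) == "keep-me" })
	fmt.Println(len(res.Result), len(res.Filtered)) // 1 1
}
```

This keeps the transformation free of acking side effects while still letting filters share the parsed intermediateState with transformations.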

}

// Otherwise, return the message and intermediateState for further processing.
return message, nil, intermediateState
}
}
155 changes: 155 additions & 0 deletions pkg/transform/snowplow_enriched_filter_test.go
@@ -0,0 +1,155 @@
package transform

import (
"testing"

"github.com/snowplow-devops/stream-replicator/pkg/models"
"github.com/stretchr/testify/assert"
)

func TestNewSpEnrichedFilterFunction(t *testing.T) {
assert := assert.New(t)

var messageGood = models.Message{
Data: snowplowTsv3,
PartitionKey: "some-key",
}

// Single value cases
aidFilterFuncKeep := NewSpEnrichedFilterFunction("app_id==test-data3")

aidFilteredIn, fail, _ := aidFilterFuncKeep(&messageGood, nil)

assert.Equal(snowplowTsv3, aidFilteredIn.Data)
assert.Nil(fail)

aidFilterFuncDiscard := NewSpEnrichedFilterFunction("app_id==failThis")

aidFilteredOut, fail2, _ := aidFilterFuncDiscard(&messageGood, nil)

assert.Nil(aidFilteredOut)
assert.Nil(fail2)

// int value
urlPrtFilterFuncKeep := NewSpEnrichedFilterFunction("page_urlport==80")

urlPrtFilteredIn, fail, _ := urlPrtFilterFuncKeep(&messageGood, nil)

assert.Equal(snowplowTsv3, urlPrtFilteredIn.Data)
assert.Nil(fail)

// Multiple value cases
aidFilterFuncKeepWithMultiple := NewSpEnrichedFilterFunction("app_id==someotherValue|test-data3")

aidFilteredKeptWithMultiple, fail3, _ := aidFilterFuncKeepWithMultiple(&messageGood, nil)

assert.Equal(snowplowTsv3, aidFilteredKeptWithMultiple.Data)
assert.Nil(fail3)

aidFilterFuncDiscardWithMultiple := NewSpEnrichedFilterFunction("app_id==someotherValue|failThis")

aidFilteredDiscardedWithMultiple, fail3, _ := aidFilterFuncDiscardWithMultiple(&messageGood, nil)

assert.Nil(aidFilteredDiscardedWithMultiple)
assert.Nil(fail3)

// Single value negation cases

aidFilterFuncNegationDiscard := NewSpEnrichedFilterFunction("app_id!=test-data3")

aidFilteredOutNegated, fail4, _ := aidFilterFuncNegationDiscard(&messageGood, nil)

assert.Nil(aidFilteredOutNegated)
assert.Nil(fail4)

aidFilterFuncNegationKeep := NewSpEnrichedFilterFunction("app_id!=failThis")

aidFilteredInNegated, fail5, _ := aidFilterFuncNegationKeep(&messageGood, nil)

assert.Equal(snowplowTsv3, aidFilteredInNegated.Data)
assert.Nil(fail5)

// Multiple value negation cases
aidFilterFuncNegationDiscardMultiple := NewSpEnrichedFilterFunction("app_id!=someotherValue|test-data1|test-data2|test-data3")

aidFilteredDiscardedWithMultiple, fail6, _ := aidFilterFuncNegationDiscardMultiple(&messageGood, nil)

assert.Nil(aidFilteredDiscardedWithMultiple)
assert.Nil(fail6)

aidFilterFuncNegationKeptMultiple := NewSpEnrichedFilterFunction("app_id!=someotherValue|failThis")

aidFilteredKeptWithMultiple, fail7, _ := aidFilterFuncNegationKeptMultiple(&messageGood, nil)

assert.Equal(snowplowTsv3, aidFilteredKeptWithMultiple.Data)
assert.Nil(fail7)
}

func TestSpEnrichedFilterFunction_Slice(t *testing.T) {
assert := assert.New(t)

var expectedFilter1 = []*models.Message{
{
Data: snowplowTsv1,
PartitionKey: "some-key",
},
}

filter1 := NewTransformation(NewSpEnrichedFilterFunction("app_id==test-data1"))
filter1Res := filter1(messages)

assert.Equal(len(expectedFilter1), len(filter1Res.Result))
assert.Equal(1, len(filter1Res.Invalid))

var expectedFilter2 = []*models.Message{
{
Data: snowplowTsv1,
PartitionKey: "some-key",
},
{
Data: snowplowTsv2,
PartitionKey: "some-key1",
},
}

filter2 := NewTransformation(NewSpEnrichedFilterFunction("app_id==test-data1|test-data2"))
filter2Res := filter2(messages)

assert.Equal(len(expectedFilter2), len(filter2Res.Result))
assert.Equal(1, len(filter2Res.Invalid))

var expectedFilter3 = []*models.Message{
{
Data: snowplowTsv3,
PartitionKey: "some-key3",
},
}

filter3 := NewTransformation(NewSpEnrichedFilterFunction("app_id!=test-data1|test-data2"))
filter3Res := filter3(messages)

assert.Equal(len(expectedFilter3), len(filter3Res.Result))
assert.Equal(1, len(filter3Res.Invalid))

/*
for index, value := range enrichJsonRes.Result {
assert.Equal(expectedGood[index].Data, value.Data)
assert.Equal(expectedGood[index].PartitionKey, value.PartitionKey)
assert.NotNil(expectedGood[index].TimeTransformed)

// assertions to ensure we don't accidentally modify the input
assert.NotEqual(messages[index].Data, value.Data)
// assert can't seem to deal with comparing zero value to non-zero value, so assert that it's still zero instead
assert.Equal(time.Time{}, messages[index].TimeTransformed)
}

// Not matching equivalence of whole object because error stacktrace makes it unfeasible. Doing each component part instead.
assert.Equal(1, len(enrichJsonRes.Invalid))
assert.Equal(int64(1), enrichJsonRes.InvalidCount)
assert.Equal("Cannot parse tsv event - wrong number of fields provided: 4", enrichJsonRes.Invalid[0].GetError().Error())
assert.Equal([]byte("not a snowplow event"), enrichJsonRes.Invalid[0].Data)
assert.Equal("some-key4", enrichJsonRes.Invalid[0].PartitionKey)
*/
}

// TODO: add tests checking slice of messages against output.
4 changes: 2 additions & 2 deletions pkg/transform/snowplow_enriched_set_pk_test.go
@@ -31,7 +31,7 @@ func TestNewSpEnrichedSetPkFunction(t *testing.T) {

stringAsPk, fail, intermediate := aidSetPkFunc(&messageGood, nil)

- assert.Equal("test-data", stringAsPk.PartitionKey)
+ assert.Equal("test-data3", stringAsPk.PartitionKey)
assert.Equal(spTsv3Parsed, intermediate)
assert.Nil(fail)

@@ -68,7 +68,7 @@ func TestNewSpEnrichedSetPkFunction(t *testing.T) {

expected := models.Message{
Data: snowplowTsv1,
- PartitionKey: "test-data",
+ PartitionKey: "test-data1",
}
incompatibleIntermediate := "Incompatible intermediate state"

43 changes: 21 additions & 22 deletions pkg/transform/transform_test.go
@@ -14,25 +14,6 @@ import (
"github.com/stretchr/testify/assert"
)

- var messages = []*models.Message{
- {
- Data: snowplowTsv1,
- PartitionKey: "some-key",
- },
- {
- Data: snowplowTsv2,
- PartitionKey: "some-key1",
- },
- {
- Data: snowplowTsv3,
- PartitionKey: "some-key2",
- },
- {
- Data: nonSnowplowString,
- PartitionKey: "some-key4",
- },
- }

// To test a function which creates a function, we're creating the function then testing that. Not sure if there's a better way?
func TestNewTransformation_Passthrough(t *testing.T) {
assert := assert.New(t)
@@ -104,21 +85,39 @@ func TestNewTransformation_EnrichedToJson(t *testing.T) {
assert.Equal("some-key4", enrichJsonRes.Invalid[0].PartitionKey)
}

+ func Benchmark_Transform_EnrichToJson(b *testing.B) {
+ tranformEnrichJson := NewTransformation(SpEnrichedToJson)
+ for i := 0; i < b.N; i++ {
+ tranformEnrichJson(messages)
+ }
+ }
+
+ func testfunc(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, interface{}) {
+ return message, nil, nil
+ }
+
+ func Benchmark_Transform_Passthrough(b *testing.B) {
+ tranformPassthrough := NewTransformation(testfunc)
+ for i := 0; i < b.N; i++ {
+ tranformPassthrough(messages)
+ }
+ }

func TestNewTransformation_Multiple(t *testing.T) {
assert := assert.New(t)

var expectedGood = []*models.Message{
{
Data: snowplowJson1,
- PartitionKey: "test-data",
+ PartitionKey: "test-data1",
},
{
Data: snowplowJson2,
- PartitionKey: "test-data",
+ PartitionKey: "test-data2",
},
{
Data: snowplowJson3,
- PartitionKey: "test-data",
+ PartitionKey: "test-data3",
},
}
