Skip to content

Commit

Permalink
Remove needless data copying
Browse files Browse the repository at this point in the history
Original implementation did something silly and inefficient.
  • Loading branch information
colmsnowplow committed Jun 27, 2024
1 parent b3829af commit d486101
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 21 deletions.
5 changes: 2 additions & 3 deletions pkg/transform/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ func NewTransformation(tranformFunctions ...TransformationFunction) Transformati
return models.NewTransformationResult(messages, filteredList, failureList)
}

for _, message := range messages {
msg := *message // dereference to avoid amending input
success := &msg // success must be both input and output to a TransformationFunction, so we make this pointer.
for _, success := range messages {
// success is the input and output, so we name it as such here
var failure *models.Message
var filtered *models.Message
var intermediate interface{}
Expand Down
120 changes: 102 additions & 18 deletions pkg/transform/transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package transform

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -24,6 +23,25 @@ import (
func TestNewTransformation_Passthrough(t *testing.T) {
assert := assert.New(t)

Msgs := []*models.Message{
{
Data: SnowplowTsv1,
PartitionKey: "some-key",
},
{
Data: SnowplowTsv2,
PartitionKey: "some-key1",
},
{
Data: SnowplowTsv3,
PartitionKey: "some-key2",
},
{
Data: nonSnowplowString,
PartitionKey: "some-key4",
},
}

// expected is equal to messages, specifying separately to avoid false positive if we accidentally mutate input.
var expected = []*models.Message{
{
Expand All @@ -46,14 +64,33 @@ func TestNewTransformation_Passthrough(t *testing.T) {

expectedNoTransformRes := models.NewTransformationResult(expected, make([]*models.Message, 0, 0), make([]*models.Message, 0, 0))
noTransform := NewTransformation(make([]TransformationFunction, 0, 0)...)
noTransformResult := noTransform(Messages)
noTransformResult := noTransform(Msgs)

assert.Equal(expectedNoTransformRes, noTransformResult)
}

func TestNewTransformation_EnrichedToJson(t *testing.T) {
assert := assert.New(t)

Msgs := []*models.Message{
{
Data: SnowplowTsv1,
PartitionKey: "some-key",
},
{
Data: SnowplowTsv2,
PartitionKey: "some-key1",
},
{
Data: SnowplowTsv3,
PartitionKey: "some-key2",
},
{
Data: nonSnowplowString,
PartitionKey: "some-key4",
},
}

var expectedGood = []*models.Message{
{
Data: snowplowJSON1,
Expand All @@ -69,18 +106,13 @@ func TestNewTransformation_EnrichedToJson(t *testing.T) {
},
}

tranformEnrichJSON := NewTransformation(SpEnrichedToJSON)
enrichJSONRes := tranformEnrichJSON(Messages)
transformEnrichJSON := NewTransformation(SpEnrichedToJSON)
enrichJSONRes := transformEnrichJSON(Msgs)

for index, value := range enrichJSONRes.Result {
assert.JSONEq(string(expectedGood[index].Data), string(value.Data))
assert.Equal(expectedGood[index].PartitionKey, value.PartitionKey)
assert.NotNil(expectedGood[index].TimeTransformed)

// assertions to ensure we don't accidentally modify the input
assert.NotEqual(Messages[index].Data, value.Data)
// assert can't seem to deal with comparing zero value to non-zero value, so assert that it's still zero instead
assert.Equal(time.Time{}, Messages[index].TimeTransformed)
}

// Not matching equivalence of whole object because error stacktrace makes it unfeasible. Doing each component part instead.
Expand All @@ -95,9 +127,28 @@ func TestNewTransformation_EnrichedToJson(t *testing.T) {
}

func Benchmark_Transform_EnrichToJson(b *testing.B) {
// Because we modify input, copy this first
Msgs := []*models.Message{
{
Data: SnowplowTsv1,
PartitionKey: "some-key",
},
{
Data: SnowplowTsv2,
PartitionKey: "some-key1",
},
{
Data: SnowplowTsv3,
PartitionKey: "some-key2",
},
{
Data: nonSnowplowString,
PartitionKey: "some-key4",
},
}
tranformEnrichJSON := NewTransformation(SpEnrichedToJSON)
for i := 0; i < b.N; i++ {
tranformEnrichJSON(Messages)
tranformEnrichJSON(Msgs)
}
}

Expand All @@ -106,15 +157,54 @@ func testfunc(message *models.Message, intermediateState interface{}) (*models.M
}

func Benchmark_Transform_Passthrough(b *testing.B) {
// Because transform funcs modify input, copy this first
Msgs := []*models.Message{
{
Data: SnowplowTsv1,
PartitionKey: "some-key",
},
{
Data: SnowplowTsv2,
PartitionKey: "some-key1",
},
{
Data: SnowplowTsv3,
PartitionKey: "some-key2",
},
{
Data: nonSnowplowString,
PartitionKey: "some-key4",
},
}
tranformPassthrough := NewTransformation(testfunc)
for i := 0; i < b.N; i++ {
tranformPassthrough(Messages)
tranformPassthrough(Msgs)
}
}

func TestNewTransformation_Multiple(t *testing.T) {
assert := assert.New(t)

// Because transform funcs modify input, copy this first
Msgs := []*models.Message{
{
Data: SnowplowTsv1,
PartitionKey: "some-key",
},
{
Data: SnowplowTsv2,
PartitionKey: "some-key1",
},
{
Data: SnowplowTsv3,
PartitionKey: "some-key2",
},
{
Data: nonSnowplowString,
PartitionKey: "some-key4",
},
}

var expectedGood = []*models.Message{
{
Data: snowplowJSON1,
Expand All @@ -133,19 +223,13 @@ func TestNewTransformation_Multiple(t *testing.T) {
setPkToAppID, _ := NewSpEnrichedSetPkFunction("app_id")
tranformMultiple := NewTransformation(setPkToAppID, SpEnrichedToJSON)

enrichJSONRes := tranformMultiple(Messages)
enrichJSONRes := tranformMultiple(Msgs)

for index, value := range enrichJSONRes.Result {
assert.JSONEq(string(expectedGood[index].Data), string(value.Data))
assert.Equal(expectedGood[index].PartitionKey, value.PartitionKey)
assert.NotNil(expectedGood[index].TimeTransformed)
assert.NotNil(value.TimeTransformed)

// assertions to ensure we don't accidentally modify the input
assert.NotEqual(Messages[index].Data, value.Data)
assert.NotEqual(Messages[index].PartitionKey, value.PartitionKey)
// assert can't seem to deal with comparing zero value to non-zero value, so assert that it's still zero instead
assert.Equal(time.Time{}, Messages[index].TimeTransformed)
}

// Not matching equivalence of whole object because error stacktrace makes it unfeasible. Doing each component part instead.
Expand Down

0 comments on commit d486101

Please sign in to comment.