Skip to content

Commit

Permalink
Add in memory source
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Jul 26, 2024
1 parent a16dfc9 commit 80eacaa
Show file tree
Hide file tree
Showing 3 changed files with 257 additions and 74 deletions.
154 changes: 80 additions & 74 deletions cmd/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,12 @@ const (
appCopyright = "(c) 2020-present Snowplow Analytics Ltd. All rights reserved."
)

// RunCli runs the app
// RunCli allows running application from cli
func RunCli(supportedSources []config.ConfigurationPair, supportedTransformations []config.ConfigurationPair) {
cfg, sentryEnabled, err := cmd.Init()
config, sentryEnabled, err := cmd.Init()
if err != nil {
exitWithError(err, sentryEnabled)
}

app := cli.NewApp()
app.Name = appName
app.Usage = appUsage
Expand Down Expand Up @@ -85,100 +84,107 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation
}()
}

s, err := sourceconfig.GetSource(cfg, supportedSources)
if err != nil {
return err
}
return RunApp(config, supportedSources, supportedTransformations)
}

tr, err := transformconfig.GetTransformations(cfg, supportedTransformations)
app.ExitErrHandler = func(context *cli.Context, err error) {
if err != nil {
return err
exitWithError(err, sentryEnabled)
}
}

t, err := cfg.GetTarget()
if err != nil {
return err
}
t.Open()
app.Run(os.Args)
}

ft, err := cfg.GetFailureTarget(cmd.AppName, cmd.AppVersion)
if err != nil {
return err
}
ft.Open()
// RunApp runs the application directly, without the CLI wrapper.
func RunApp(cfg *config.Config, supportedSources []config.ConfigurationPair, supportedTransformations []config.ConfigurationPair) error {
s, err := sourceconfig.GetSource(cfg, supportedSources)
if err != nil {
return err
}

tags, err := cfg.GetTags()
if err != nil {
return err
}
o, err := cfg.GetObserver(tags)
if err != nil {
return err
}
o.Start()
tr, err := transformconfig.GetTransformations(cfg, supportedTransformations)
if err != nil {
return err
}

stopTelemetry := telemetry.InitTelemetryWithCollector(cfg)
t, err := cfg.GetTarget()
if err != nil {
return err
}
t.Open()

// Handle SIGTERM
sig := make(chan os.Signal)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM, os.Kill)
go func() {
<-sig
log.Warn("SIGTERM called, cleaning up and closing application ...")
ft, err := cfg.GetFailureTarget(cmd.AppName, cmd.AppVersion)
if err != nil {
return err
}
ft.Open()

stop := make(chan struct{}, 1)
go func() {
s.Stop()
stop <- struct{}{}
}()
tags, err := cfg.GetTags()
if err != nil {
return err
}
o, err := cfg.GetObserver(tags)
if err != nil {
return err
}
o.Start()

select {
case <-stop:
log.Debug("source.Stop() finished successfully!")
stopTelemetry := telemetry.InitTelemetryWithCollector(cfg)

stopTelemetry()
if err != nil {
log.Debugf(`error deleting tmp directory: %v`, err)
}
case <-time.After(5 * time.Second):
log.Error("source.Stop() took more than 5 seconds, forcing shutdown ...")
// Handle SIGTERM
sig := make(chan os.Signal)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM, os.Kill)
go func() {
<-sig
log.Warn("SIGTERM called, cleaning up and closing application ...")

t.Close()
ft.Close()
o.Stop()
stopTelemetry()
stop := make(chan struct{}, 1)
go func() {
s.Stop()
stop <- struct{}{}
}()

if err != nil {
log.Debugf(`error deleting tmp directory: %v`, err)
}
select {
case <-stop:
log.Debug("source.Stop() finished successfully!")

os.Exit(1)
stopTelemetry()
if err != nil {
log.Debugf(`error deleting tmp directory: %v`, err)
}
}()
case <-time.After(5 * time.Second):
log.Error("source.Stop() took more than 5 seconds, forcing shutdown ...")

// Callback functions for the source to leverage when writing data
sf := sourceiface.SourceFunctions{
WriteToTarget: sourceWriteFunc(t, ft, tr, o),
}
t.Close()
ft.Close()
o.Stop()
stopTelemetry()

// Read is a long running process and will only return when the source
// is exhausted or if an error occurs
err = s.Read(&sf)
if err != nil {
return err
if err != nil {
log.Debugf(`error deleting tmp directory: %v`, err)
}

os.Exit(1)
}
}()

t.Close()
ft.Close()
o.Stop()
return nil
// Callback functions for the source to leverage when writing data
sf := sourceiface.SourceFunctions{
WriteToTarget: sourceWriteFunc(t, ft, tr, o),
}

err1 := app.Run(os.Args)
if err1 != nil {
exitWithError(err1, sentryEnabled)
// Read is a long running process and will only return when the source
// is exhausted or if an error occurs
err = s.Read(&sf)
if err != nil {
return err
}

t.Close()
ft.Close()
o.Stop()
return nil
}

// sourceWriteFunc builds the function which wraps the different objects together to handle:
Expand Down
117 changes: 117 additions & 0 deletions pkg/source/inmemory/in_memory_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* Copyright (c) 2020-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/

package inmemory

import (
"time"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/models"
"github.com/snowplow/snowbridge/pkg/source/sourceiface"
"github.com/twinj/uuid"
)

// ConfigPair returns the configuration pair registered under the name
// "inMemory". Its handle builds an in memory source that reads from the
// supplied messages channel.
func ConfigPair(messages chan []string) config.ConfigurationPair {
	build := configfunction(messages)
	handle := adapterGenerator(build)

	return config.ConfigurationPair{
		Name:   "inMemory",
		Handle: handle,
	}
}

// configuration is the (empty) configuration of the in memory source; the
// source takes no options, its input channel is supplied in code instead.
type configuration struct{}

// inMemorySource is a source that reads batches of messages from a Go channel.
type inMemorySource struct {
	// messages delivers batches of raw message payloads consumed by Read.
	messages chan []string
	// log is the logger entry used by this source.
	log *log.Entry
	// exitSignal is signalled by Stop to make Read leave its processing loop.
	exitSignal chan struct{}
}

// configfunction returns a constructor that ignores the configuration it is
// given and builds an in memory source bound to the messages channel.
func configfunction(messages chan []string) func(c *configuration) (sourceiface.Source, error) {
	build := func(_ *configuration) (sourceiface.Source, error) {
		return newInMemorySource(messages)
	}
	return build
}

// adapter is a function type wrapping a creation function so that it can carry
// the Create and ProvideDefault methods.
// NOTE(review): presumably this is the shape expected by
// config.ConfigurationPair.Handle — confirm against the config package.
type adapter func(i interface{}) (interface{}, error)

// Create invokes the wrapped function with i and returns its result unchanged.
func (f adapter) Create(i interface{}) (interface{}, error) {
	created, err := f(i)
	return created, err
}

// ProvideDefault returns a pointer to a fresh, empty configuration; it never
// fails.
func (f adapter) ProvideDefault() (interface{}, error) {
	return &configuration{}, nil
}

// adapterGenerator wraps f in an adapter. The adapter accepts an untyped
// value, requires it to be a *configuration and then delegates to f; any other
// type yields an "invalid input" error.
func adapterGenerator(f func(c *configuration) (sourceiface.Source, error)) adapter {
	return func(i interface{}) (interface{}, error) {
		if cfg, ok := i.(*configuration); ok {
			return f(cfg)
		}

		return nil, errors.New("invalid input")
	}
}

// newInMemorySource builds an in memory source reading from messages. The
// returned error is always nil.
func newInMemorySource(messages chan []string) (*inMemorySource, error) {
	logger := log.WithFields(log.Fields{"source": "in_memory"})

	src := &inMemorySource{
		messages:   messages,
		log:        logger,
		exitSignal: make(chan struct{}),
	}
	return src, nil
}

// Read consumes batches of strings from the in memory channel, converts every
// string into a models.Message and hands the batch to sf.WriteToTarget.
//
// The loop ends, and Read returns nil, when Stop signals exit or when the
// messages channel is closed. Errors returned by WriteToTarget are logged and
// processing continues with the next batch.
func (ss *inMemorySource) Read(sf *sourceiface.SourceFunctions) error {
	ss.log.Infof("Reading messages from in memory buffer")

processing:
	for {
		select {
		case <-ss.exitSignal:
			break processing
		case input, ok := <-ss.messages:
			if !ok {
				// A closed channel is always ready to receive and keeps
				// yielding nil, so without this check the loop would spin
				// forever writing empty batches.
				break processing
			}

			// All messages of one batch share the same creation/pull time.
			timeNow := time.Now().UTC()
			messages := make([]*models.Message, 0, len(input))
			for _, single := range input {
				messages = append(messages, &models.Message{
					Data:         []byte(single),
					PartitionKey: uuid.NewV4().String(),
					TimeCreated:  timeNow,
					TimePulled:   timeNow,
				})
			}

			if err := sf.WriteToTarget(messages); err != nil {
				ss.log.WithFields(log.Fields{"error": err}).Error(err)
			}
		}
	}

	ss.log.Infof("Done with processing")
	return nil
}

// Stop asks Read to leave its processing loop by sending on exitSignal.
//
// The send only completes once a receiver takes the value, so when
// exitSignal is unbuffered (as newInMemorySource creates it) Stop blocks
// until Read picks the signal up.
func (ss *inMemorySource) Stop() {
	ss.log.Warn("Stopping in memory source")

	var stop struct{}
	ss.exitSignal <- stop
}

// GetID returns the fixed identifier of the in memory source.
func (ss *inMemorySource) GetID() string {
	const id = "inMemory"
	return id
}
60 changes: 60 additions & 0 deletions pkg/source/inmemory/in_memory_source_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Copyright (c) 2020-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/

package inmemory

import (
"sync"
"testing"

"github.com/snowplow/snowbridge/pkg/models"
"github.com/snowplow/snowbridge/pkg/source/sourceiface"
"github.com/stretchr/testify/assert"
)

// TestInMemorySource_ReadSuccess pushes three batches (six messages in total)
// through the in memory source and checks that the write callback receives
// every payload in order.
func TestInMemorySource_ReadSuccess(t *testing.T) {
	assert := assert.New(t)

	input := make(chan []string)
	source, err := newInMemorySource(input)
	assert.NotNil(source)
	assert.Nil(err)
	assert.Equal("inMemory", source.GetID())
	defer source.Stop()

	var (
		pending  sync.WaitGroup
		received []string
	)
	// One Done per message is expected from the write callback.
	pending.Add(6)

	sf := sourceiface.SourceFunctions{
		WriteToTarget: func(batch []*models.Message) error {
			for _, m := range batch {
				received = append(received, string(m.Data))
				pending.Done()
			}
			return nil
		},
	}

	go func() {
		readErr := source.Read(&sf)
		assert.Nil(readErr)
	}()

	batches := [][]string{
		{"m1", "m2"},
		{"m3", "m4", "m5"},
		{"m6"},
	}
	for _, batch := range batches {
		input <- batch
	}
	pending.Wait()

	assert.Equal([]string{"m1", "m2", "m3", "m4", "m5", "m6"}, received)
}

0 comments on commit 80eacaa

Please sign in to comment.