diff --git a/cmd/cli/cli.go b/cmd/cli/cli.go index 65b4c198..04699b72 100644 --- a/cmd/cli/cli.go +++ b/cmd/cli/cli.go @@ -46,13 +46,12 @@ const ( appCopyright = "(c) 2020-present Snowplow Analytics Ltd. All rights reserved." ) -// RunCli runs the app +// RunCli allows running application from cli func RunCli(supportedSources []config.ConfigurationPair, supportedTransformations []config.ConfigurationPair) { - cfg, sentryEnabled, err := cmd.Init() + config, sentryEnabled, err := cmd.Init() if err != nil { exitWithError(err, sentryEnabled) } - app := cli.NewApp() app.Name = appName app.Usage = appUsage @@ -85,100 +84,107 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation }() } - s, err := sourceconfig.GetSource(cfg, supportedSources) - if err != nil { - return err - } + return RunApp(config, supportedSources, supportedTransformations) + } - tr, err := transformconfig.GetTransformations(cfg, supportedTransformations) + app.ExitErrHandler = func(context *cli.Context, err error) { if err != nil { - return err + exitWithError(err, sentryEnabled) } + } - t, err := cfg.GetTarget() - if err != nil { - return err - } - t.Open() + app.Run(os.Args) +} - ft, err := cfg.GetFailureTarget(cmd.AppName, cmd.AppVersion) - if err != nil { - return err - } - ft.Open() +// RunApp runs application (without cli stuff) +func RunApp(cfg *config.Config, supportedSources []config.ConfigurationPair, supportedTransformations []config.ConfigurationPair) error { + s, err := sourceconfig.GetSource(cfg, supportedSources) + if err != nil { + return err + } - tags, err := cfg.GetTags() - if err != nil { - return err - } - o, err := cfg.GetObserver(tags) - if err != nil { - return err - } - o.Start() + tr, err := transformconfig.GetTransformations(cfg, supportedTransformations) + if err != nil { + return err + } - stopTelemetry := telemetry.InitTelemetryWithCollector(cfg) + t, err := cfg.GetTarget() + if err != nil { + return err + } + t.Open() - // Handle SIGTERM - sig := make(chan os.Signal) - signal.Notify(sig, os.Interrupt, syscall.SIGTERM, os.Kill) - go func() { - <-sig - log.Warn("SIGTERM called, cleaning up and closing application ...") + ft, err := cfg.GetFailureTarget(cmd.AppName, cmd.AppVersion) + if err != nil { + return err + } + ft.Open() - stop := make(chan struct{}, 1) - go func() { - s.Stop() - stop <- struct{}{} - }() + tags, err := cfg.GetTags() + if err != nil { + return err + } + o, err := cfg.GetObserver(tags) + if err != nil { + return err + } + o.Start() - select { - case <-stop: - log.Debug("source.Stop() finished successfully!") + stopTelemetry := telemetry.InitTelemetryWithCollector(cfg) - stopTelemetry() - if err != nil { - log.Debugf(`error deleting tmp directory: %v`, err) - } - case <-time.After(5 * time.Second): - log.Error("source.Stop() took more than 5 seconds, forcing shutdown ...") + // Handle SIGTERM + sig := make(chan os.Signal) + signal.Notify(sig, os.Interrupt, syscall.SIGTERM, os.Kill) + go func() { + <-sig + log.Warn("SIGTERM called, cleaning up and closing application ...") - t.Close() - ft.Close() - o.Stop() - stopTelemetry() + stop := make(chan struct{}, 1) + go func() { + s.Stop() + stop <- struct{}{} + }() - if err != nil { - log.Debugf(`error deleting tmp directory: %v`, err) - } + select { + case <-stop: + log.Debug("source.Stop() finished successfully!") - os.Exit(1) + stopTelemetry() + if err != nil { + log.Debugf(`error deleting tmp directory: %v`, err) } - }() + case <-time.After(5 * time.Second): + log.Error("source.Stop() took more than 5 seconds, forcing shutdown ...") - // Callback functions for the source to leverage when writing data - sf := sourceiface.SourceFunctions{ - WriteToTarget: sourceWriteFunc(t, ft, tr, o), - } + t.Close() + ft.Close() + o.Stop() + stopTelemetry() - // Read is a long running process and will only return when the source - // is exhausted or if an error occurs - err = s.Read(&sf) - if err != nil { - return err + if err != nil { + log.Debugf(`error deleting tmp directory: %v`, err) + } + + os.Exit(1) } + }() - t.Close() - ft.Close() - o.Stop() - return nil + // Callback functions for the source to leverage when writing data + sf := sourceiface.SourceFunctions{ + WriteToTarget: sourceWriteFunc(t, ft, tr, o), } - err1 := app.Run(os.Args) - if err1 != nil { - exitWithError(err1, sentryEnabled) + // Read is a long running process and will only return when the source + // is exhausted or if an error occurs + err = s.Read(&sf) + if err != nil { + return err } + t.Close() + ft.Close() + o.Stop() + return nil } // sourceWriteFunc builds the function which wraps the different objects together to handle: diff --git a/pkg/source/inmemory/in_memory_source.go b/pkg/source/inmemory/in_memory_source.go new file mode 100644 index 00000000..55ec9aaf --- /dev/null +++ b/pkg/source/inmemory/in_memory_source.go @@ -0,0 +1,117 @@ +/** + * Copyright (c) 2020-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ + +package inmemory + +import ( + "time" + + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "github.com/snowplow/snowbridge/config" + "github.com/snowplow/snowbridge/pkg/models" + "github.com/snowplow/snowbridge/pkg/source/sourceiface" + "github.com/twinj/uuid" +) + +// ConfigPair is passed to configuration to determine when to build in memory source. +func ConfigPair(messages chan []string) config.ConfigurationPair { + return config.ConfigurationPair{ + Name: "inMemory", + Handle: adapterGenerator(configfunction(messages)), + } +} + +type configuration struct{} + +type inMemorySource struct { + messages chan []string + log *log.Entry + exitSignal chan struct{} +} + +func configfunction(messages chan []string) func(c *configuration) (sourceiface.Source, error) { + return func(c *configuration) (sourceiface.Source, error) { + return newInMemorySource(messages) + } +} + +type adapter func(i interface{}) (interface{}, error) + +func (f adapter) Create(i interface{}) (interface{}, error) { + return f(i) +} + +func (f adapter) ProvideDefault() (interface{}, error) { + cfg := &configuration{} + + return cfg, nil +} + +func adapterGenerator(f func(c *configuration) (sourceiface.Source, error)) adapter { + return func(i interface{}) (interface{}, error) { + cfg, ok := i.(*configuration) + if !ok { + return nil, errors.New("invalid input") + } + + return f(cfg) + } +} + +func newInMemorySource(messages chan []string) (*inMemorySource, error) { + return &inMemorySource{ + log: log.WithFields(log.Fields{"source": "in_memory"}), + messages: messages, + exitSignal: make(chan struct{}), + }, nil +} + +func (ss *inMemorySource) Read(sf *sourceiface.SourceFunctions) error { + ss.log.Infof("Reading messages from in memory buffer") + +processing: + for { + select { + case <-ss.exitSignal: + break processing + case input := <-ss.messages: + timeNow := time.Now().UTC() + var messages []*models.Message + for _, single := range input { + message := models.Message{ + Data: []byte(single), + PartitionKey: uuid.NewV4().String(), + TimeCreated: timeNow, + TimePulled: timeNow, + } + messages = append(messages, &message) + } + + err := sf.WriteToTarget(messages) + if err != nil { + ss.log.WithFields(log.Fields{"error": err}).Error(err) + } + } + } + + ss.log.Infof("Done with processing") + return nil +} + +func (ss *inMemorySource) Stop() { + ss.log.Warn("Stopping in memory source") + ss.exitSignal <- struct{}{} +} + +func (ss *inMemorySource) GetID() string { + return "inMemory" +} diff --git a/pkg/source/inmemory/in_memory_source_test.go b/pkg/source/inmemory/in_memory_source_test.go new file mode 100644 index 00000000..c8c0a98d --- /dev/null +++ b/pkg/source/inmemory/in_memory_source_test.go @@ -0,0 +1,60 @@ +/** + * Copyright (c) 2020-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ + +package inmemory + +import ( + "sync" + "testing" + + "github.com/snowplow/snowbridge/pkg/models" + "github.com/snowplow/snowbridge/pkg/source/sourceiface" + "github.com/stretchr/testify/assert" +) + +func TestInMemorySource_ReadSuccess(t *testing.T) { + assert := assert.New(t) + + wg := sync.WaitGroup{} + inputChannel := make(chan []string) + source, err := newInMemorySource(inputChannel) + assert.NotNil(source) + assert.Nil(err) + assert.Equal("inMemory", source.GetID()) + defer source.Stop() + + var out []string + + writeFunc := func(messages []*models.Message) error { + for _, msg := range messages { + out = append(out, string(msg.Data)) + wg.Done() + } + return nil + } + + sf := sourceiface.SourceFunctions{ + WriteToTarget: writeFunc, + } + + go func() { + err1 := source.Read(&sf) + assert.Nil(err1) + }() + + wg.Add(6) + inputChannel <- []string{"m1", "m2"} + inputChannel <- []string{"m3", "m4", "m5"} + inputChannel <- []string{"m6"} + wg.Wait() + + assert.Equal([]string{"m1", "m2", "m3", "m4", "m5", "m6"}, out) +}