diff --git a/README.md b/README.md index d7ff8df..85799e3 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,7 @@ While a similar outcome can be achieved with a side car model - simply write a R * Health Check interface * Shutdown interface * Ability to switch running log levels of your applications + * Ability to publish data from your app to the Choria Data Adapters that can convert the data to streaming data systems * Standardised configuration * TLS using PuppetCA or manual configuration * Expose Facts to the Choria discovery system @@ -251,6 +252,51 @@ func (a *App) Version() string { } ``` +### Publishing Data + +You can publish data from your application to the [Choria Data Adapter](https://choria.io/docs/adapters/) system which can receive the data in a scalable manner and transform it to Streaming Data system. + +This is useful for publishing IoT environmental data or other similar data to a network, your data will traverse the network maintained by the backplane and will be secured using PKI and TLS. + +When you start the backplane (full details below) pass the `backplane.StartDataPublisher()` option to `backplane.Run()`. + +From your code you can publish any data: + +```go + // initialize the backplane, full example in "Starting the Server" section + opts := []backplane.Option{ + // other options + backplane.StartDataPublisher(), + } + + pb, err := backplane.Run(ctx, wg, a.config.Management, opts...) + if err != nil { + panic(err) + } + + // dat is a []byte with any information you wish to publish + dat := gatherEnvironmentData() + + // publishes data on a NATS topica called acme.iot + pb.DataOutbox() <- &backplane.DataItem{ + Data: dat, + Destination: "acme.iot", + } +``` + +You can configure the Choria Broker to receive this data and publish it to NATS Streaming: + +```ini +plugin.choria.adapters = iot +plugin.choria.adapter.iot.type = nats_stream +plugin.choria.adapter.iot.stream.servers = stan1:4222,stan2:4222 +plugin.choria.adapter.iot.stream.clusterid = prod +plugin.choria.adapter.iot.ingest.topic = acme.iot +plugin.choria.adapter.iot.ingest.protocol = reply +``` + +This ingest the `acme.iot` topic, rewrite the data and publish it to NATS Streaming `prod` cluster on `stan1:4222` and `stan2:4222`. + ### Configure Choria You have to supply some basic configuration to the Choria framework, you need to implement the `ConfigProvider` interface, you're welcome to do this yourself but we provide one you can use. We recommend you use this one so that all backplane managed interface have the same configuration format: @@ -333,6 +379,7 @@ func (a *App) startBackPlane(ctx context.Context, wg *sync.Waitgroup) error { backplane.ManageHealthCheck(a), backplane.ManageStopable(a), backplane.ManageLogLevel(a), + backplane.StartDataPublisher(), } _, err := backplane.Run(ctx, wg, a.config.Management, opts...) @@ -347,7 +394,7 @@ func (a *App) startBackPlane(ctx context.Context, wg *sync.Waitgroup) error { Once you call `startBackPlane()` in your startup cycle it will start a Choria instance with the `discovery`, `choria_util` and `backplane` agents, the `backplane` agent will have all the actions listed in the earlier table, your config will be shown in the `info` action and you can discovery it using any of the facts. -If you only supply some of `ManageInfoSource`, `ManagePausable`, `ManageHealthCheck`, `ManageLogLevel` and `ManageStopable` the features of the agent will be selectively disabled as per the table earlier. +If you only supply some of `ManageInfoSource`, `ManagePausable`, `ManageHealthCheck`, `ManageLogLevel`, `StartDataPublisher` and `ManageStopable` the features of the agent will be selectively disabled as per the table earlier. All backplane managed services will use the `backplane` agent name, to differentiate the `name` will be used to construct a sub collective name so each app is effectively contained. The upcoming CLI will be built around this design. @@ -496,3 +543,5 @@ Performing info... ✓ 2 / 2 ``` 1 is paused and 1 is not, your logs should also confirm. + +Additionally on every `doing work` line data gets published to the NATS network topic `myapp.data` in the Choria format. You can view these using the a [nats client](https://github.com/nats-io/go-nats/tree/master/examples/nats-sub) or had this been a Choria Broker you could adapt these messages to a NATS Stream using the Choria Adapter Framework. \ No newline at end of file diff --git a/backplane/backplane.go b/backplane/backplane.go index c26ec12..623ff15 100644 --- a/backplane/backplane.go +++ b/backplane/backplane.go @@ -35,12 +35,14 @@ type Management struct { factsMu *sync.Mutex log *logrus.Entry agent *mcorpc.Agent + outbox chan *DataItem } // Run creates a new instance of the backplane func Run(ctx context.Context, wg *sync.WaitGroup, conf ConfigProvider, opts ...Option) (m *Management, err error) { m = &Management{ - mu: &sync.Mutex{}, + mu: &sync.Mutex{}, + outbox: make(chan *DataItem, 1), } m.cfg, err = newConfig("backplane", conf, opts...) @@ -69,5 +71,12 @@ func Run(ctx context.Context, wg *sync.WaitGroup, conf ConfigProvider, opts ...O return nil, fmt.Errorf("could not start backplane agents: %s", err) } - return + if m.cfg.publishdata { + err = m.startDataPublisher(ctx, wg) + if err != nil { + return nil, fmt.Errorf("could not start data publisher: %s", err) + } + } + + return m, nil } diff --git a/backplane/config.go b/backplane/config.go index 2980af3..b08ab57 100644 --- a/backplane/config.go +++ b/backplane/config.go @@ -27,6 +27,7 @@ type Config struct { factInterval time.Duration maxStopDelay time.Duration + publishdata bool pausable Pausable infosource InfoSource healthcheckable HealthCheckable @@ -131,6 +132,7 @@ func newConfig(name string, cfg ConfigProvider, opts ...Option) (c *Config, err c.ccfg.LogLevel = c.loglevel c.ccfg.Choria.UseSRVRecords = false c.ccfg.Choria.MiddlewareHosts = c.brokers + c.ccfg.RegistrationCollective = c.appname if c.tls != nil { c.ccfg.DisableTLS = false @@ -220,3 +222,10 @@ func MaxStopDelay(i time.Duration) Option { c.maxStopDelay = i } } + +// StartDataPublisher starts a publisher for data to the Choria adapter system +func StartDataPublisher() Option { + return func(c *Config) { + c.publishdata = true + } +} diff --git a/backplane/registration.go b/backplane/registration.go new file mode 100644 index 0000000..5ef16c2 --- /dev/null +++ b/backplane/registration.go @@ -0,0 +1,46 @@ +package backplane + +import ( + "context" + "sync" + + "github.com/choria-io/go-choria/server/data" +) + +// DataItem contains a single data message +type DataItem struct { + // Data is the raw data to publish + Data []byte + + // Destination let you set custom NATS targets, when this is not set + // the TargetAgent will be used to create a normal agent target + Destination string + + // TargetAgent lets you pick where to send the data as a request + TargetAgent string +} + +// DataOutbox returns the channel to use for publishing data to the network from the backplane +func (m *Management) DataOutbox() chan *DataItem { + return m.outbox +} + +// StartRegistration implements registration.RegistrationDataProvider +func (m *Management) StartRegistration(ctx context.Context, wg *sync.WaitGroup, interval int, output chan *data.RegistrationItem) { + for { + select { + case msg := <-m.outbox: + output <- &data.RegistrationItem{ + Data: &msg.Data, + Destination: msg.Destination, + TargetAgent: msg.TargetAgent, + } + case <-ctx.Done(): + return + } + } +} + +func (m *Management) startDataPublisher(ctx context.Context, wg *sync.WaitGroup) error { + return m.cserver.RegisterRegistrationProvider(ctx, wg, m) +} diff --git a/example/myapp.go b/example/myapp.go index f6271e3..3f71adc 100644 --- a/example/myapp.go +++ b/example/myapp.go @@ -2,10 +2,12 @@ package main import ( "context" + "encoding/json" "io/ioutil" "log" "os" "os/signal" + "strconv" "sync" "syscall" "time" @@ -26,6 +28,7 @@ type Config struct { // a circuit breaker, health check and shutdown backplane type App struct { config *Config + bp *backplane.Management paused bool configured bool } @@ -43,13 +46,30 @@ func (a *App) work(ctx context.Context, wg *sync.WaitGroup) { continue } - log.Println(a.config.Name + ": doing work") + dat, err := a.data() + if err != nil { + log.Printf("Could not generate data: %s", err) + continue + } + + a.bp.DataOutbox() <- &backplane.DataItem{Data: dat, Destination: "myapp.data"} + + log.Println(a.config.Name + ": doing work - published " + string(dat)) case <-ctx.Done(): return } } } +func (a *App) data() ([]byte, error) { + d := make(map[string]string) + d["name"] = a.config.Name + d["timestamp"] = strconv.Itoa(int(time.Now().Unix())) + d["work"] = "sample work" + + return json.Marshal(&d) +} + func main() { if _, err := os.Stat("myapp.yaml"); err != nil { log.Fatal("Cannot find myapp.yaml") @@ -93,9 +113,10 @@ func main() { backplane.ManageHealthCheck(app), backplane.ManageStopable(app), backplane.ManageLogLevel(app), + backplane.StartDataPublisher(), } - _, err = backplane.Run(ctx, wg, app.config.Management, opts...) + app.bp, err = backplane.Run(ctx, wg, app.config.Management, opts...) if err != nil { log.Fatalf("Could not start backplane: %s", err) }