Skip to content

Commit

Permalink
Merge pull request #65 from ripienaar/64
Browse files Browse the repository at this point in the history
(#64) support publishing data to the choria adapter framework
  • Loading branch information
ripienaar authored Nov 18, 2018
2 parents 527b7f7 + 29b446a commit 61c3861
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 5 deletions.
51 changes: 50 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ While a similar outcome can be achieved with a side car model - simply write a R
* Health Check interface
* Shutdown interface
* Ability to switch running log levels of your applications
* Ability to publish data from your app to the Choria Data Adapters that can convert the data to streaming data systems
* Standardised configuration
* TLS using PuppetCA or manual configuration
* Expose Facts to the Choria discovery system
Expand Down Expand Up @@ -251,6 +252,51 @@ func (a *App) Version() string {
}
```

### Publishing Data

You can publish data from your application to the [Choria Data Adapter](https://choria.io/docs/adapters/) system which can receive the data in a scalable manner and transform it to Streaming Data system.

This is useful for publishing IoT environmental data or other similar data to a network, your data will traverse the network maintained by the backplane and will be secured using PKI and TLS.

When you start the backplane (full details below) pass the `backplane.StartDataPublisher()` option to `backplane.Run()`.

From your code you can publish any data:

```go
// initialize the backplane, full example in "Starting the Server" section
opts := []backplane.Option{
// other options
backplane.StartDataPublisher(),
}

pb, err := backplane.Run(ctx, wg, a.config.Management, opts...)
if err != nil {
panic(err)
}

// dat is a []byte with any information you wish to publish
dat := gatherEnvironmentData()

// publishes data on a NATS topica called acme.iot
pb.DataOutbox() <- &backplane.DataItem{
Data: dat,
Destination: "acme.iot",
}
```

You can configure the Choria Broker to receive this data and publish it to NATS Streaming:

```ini
plugin.choria.adapters = iot
plugin.choria.adapter.iot.type = nats_stream
plugin.choria.adapter.iot.stream.servers = stan1:4222,stan2:4222
plugin.choria.adapter.iot.stream.clusterid = prod
plugin.choria.adapter.iot.ingest.topic = acme.iot
plugin.choria.adapter.iot.ingest.protocol = reply
```

This ingest the `acme.iot` topic, rewrite the data and publish it to NATS Streaming `prod` cluster on `stan1:4222` and `stan2:4222`.

### Configure Choria

You have to supply some basic configuration to the Choria framework, you need to implement the `ConfigProvider` interface, you're welcome to do this yourself but we provide one you can use. We recommend you use this one so that all backplane managed interface have the same configuration format:
Expand Down Expand Up @@ -333,6 +379,7 @@ func (a *App) startBackPlane(ctx context.Context, wg *sync.Waitgroup) error {
backplane.ManageHealthCheck(a),
backplane.ManageStopable(a),
backplane.ManageLogLevel(a),
backplane.StartDataPublisher(),
}
_, err := backplane.Run(ctx, wg, a.config.Management, opts...)
Expand All @@ -347,7 +394,7 @@ func (a *App) startBackPlane(ctx context.Context, wg *sync.Waitgroup) error {

Once you call `startBackPlane()` in your startup cycle it will start a Choria instance with the `discovery`, `choria_util` and `backplane` agents, the `backplane` agent will have all the actions listed in the earlier table, your config will be shown in the `info` action and you can discovery it using any of the facts.

If you only supply some of `ManageInfoSource`, `ManagePausable`, `ManageHealthCheck`, `ManageLogLevel` and `ManageStopable` the features of the agent will be selectively disabled as per the table earlier.
If you only supply some of `ManageInfoSource`, `ManagePausable`, `ManageHealthCheck`, `ManageLogLevel`, `StartDataPublisher` and `ManageStopable` the features of the agent will be selectively disabled as per the table earlier.

All backplane managed services will use the `backplane` agent name, to differentiate the `name` will be used to construct a sub collective name so each app is effectively contained. The upcoming CLI will be built around this design.

Expand Down Expand Up @@ -496,3 +543,5 @@ Performing info... ✓ 2 / 2
```
1 is paused and 1 is not, your logs should also confirm.
Additionally on every `doing work` line data gets published to the NATS network topic `myapp.data` in the Choria format. You can view these using the a [nats client](https://github.com/nats-io/go-nats/tree/master/examples/nats-sub) or had this been a Choria Broker you could adapt these messages to a NATS Stream using the Choria Adapter Framework.
13 changes: 11 additions & 2 deletions backplane/backplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ type Management struct {
factsMu *sync.Mutex
log *logrus.Entry
agent *mcorpc.Agent
outbox chan *DataItem
}

// Run creates a new instance of the backplane
func Run(ctx context.Context, wg *sync.WaitGroup, conf ConfigProvider, opts ...Option) (m *Management, err error) {
m = &Management{
mu: &sync.Mutex{},
mu: &sync.Mutex{},
outbox: make(chan *DataItem, 1),
}

m.cfg, err = newConfig("backplane", conf, opts...)
Expand Down Expand Up @@ -69,5 +71,12 @@ func Run(ctx context.Context, wg *sync.WaitGroup, conf ConfigProvider, opts ...O
return nil, fmt.Errorf("could not start backplane agents: %s", err)
}

return
if m.cfg.publishdata {
err = m.startDataPublisher(ctx, wg)
if err != nil {
return nil, fmt.Errorf("could not start data publisher: %s", err)
}
}

return m, nil
}
9 changes: 9 additions & 0 deletions backplane/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Config struct {
factInterval time.Duration
maxStopDelay time.Duration

publishdata bool
pausable Pausable
infosource InfoSource
healthcheckable HealthCheckable
Expand Down Expand Up @@ -131,6 +132,7 @@ func newConfig(name string, cfg ConfigProvider, opts ...Option) (c *Config, err
c.ccfg.LogLevel = c.loglevel
c.ccfg.Choria.UseSRVRecords = false
c.ccfg.Choria.MiddlewareHosts = c.brokers
c.ccfg.RegistrationCollective = c.appname

if c.tls != nil {
c.ccfg.DisableTLS = false
Expand Down Expand Up @@ -220,3 +222,10 @@ func MaxStopDelay(i time.Duration) Option {
c.maxStopDelay = i
}
}

// StartDataPublisher starts a publisher for data to the Choria adapter system
func StartDataPublisher() Option {
return func(c *Config) {
c.publishdata = true
}
}
46 changes: 46 additions & 0 deletions backplane/registration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package backplane

import (
"context"
"sync"

"github.com/choria-io/go-choria/server/data"
)

// DataItem contains a single data message
type DataItem struct {
// Data is the raw data to publish
Data []byte

// Destination let you set custom NATS targets, when this is not set
// the TargetAgent will be used to create a normal agent target
Destination string

// TargetAgent lets you pick where to send the data as a request
TargetAgent string
}

// DataOutbox returns the channel to use for publishing data to the network from the backplane
func (m *Management) DataOutbox() chan *DataItem {
return m.outbox
}

// StartRegistration implements registration.RegistrationDataProvider
func (m *Management) StartRegistration(ctx context.Context, wg *sync.WaitGroup, interval int, output chan *data.RegistrationItem) {
for {
select {
case msg := <-m.outbox:
output <- &data.RegistrationItem{
Data: &msg.Data,
Destination: msg.Destination,
TargetAgent: msg.TargetAgent,
}
case <-ctx.Done():
return
}
}
}

func (m *Management) startDataPublisher(ctx context.Context, wg *sync.WaitGroup) error {
return m.cserver.RegisterRegistrationProvider(ctx, wg, m)
}
25 changes: 23 additions & 2 deletions example/myapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package main

import (
"context"
"encoding/json"
"io/ioutil"
"log"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"
Expand All @@ -26,6 +28,7 @@ type Config struct {
// a circuit breaker, health check and shutdown backplane
type App struct {
config *Config
bp *backplane.Management
paused bool
configured bool
}
Expand All @@ -43,13 +46,30 @@ func (a *App) work(ctx context.Context, wg *sync.WaitGroup) {
continue
}

log.Println(a.config.Name + ": doing work")
dat, err := a.data()
if err != nil {
log.Printf("Could not generate data: %s", err)
continue
}

a.bp.DataOutbox() <- &backplane.DataItem{Data: dat, Destination: "myapp.data"}

log.Println(a.config.Name + ": doing work - published " + string(dat))
case <-ctx.Done():
return
}
}
}

func (a *App) data() ([]byte, error) {
d := make(map[string]string)
d["name"] = a.config.Name
d["timestamp"] = strconv.Itoa(int(time.Now().Unix()))
d["work"] = "sample work"

return json.Marshal(&d)
}

func main() {
if _, err := os.Stat("myapp.yaml"); err != nil {
log.Fatal("Cannot find myapp.yaml")
Expand Down Expand Up @@ -93,9 +113,10 @@ func main() {
backplane.ManageHealthCheck(app),
backplane.ManageStopable(app),
backplane.ManageLogLevel(app),
backplane.StartDataPublisher(),
}

_, err = backplane.Run(ctx, wg, app.config.Management, opts...)
app.bp, err = backplane.Run(ctx, wg, app.config.Management, opts...)
if err != nil {
log.Fatalf("Could not start backplane: %s", err)
}
Expand Down

0 comments on commit 61c3861

Please sign in to comment.