Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Napa #1601

Open
wants to merge 10 commits into
base: napa
Choose a base branch
from
Prev Previous commit
Next Next commit
feat(interfaces/service.go app/service.go): add TriggerMqttClient() (…
…*pahoMqtt.Client, error)
  • Loading branch information
ADAM\Adam.King committed Aug 17, 2024
commit f3184e3a93a5cb4e8557ce8489b47d5f3abd642d
66 changes: 45 additions & 21 deletions internal/app/service.go
Original file line number Diff line number Diff line change
@@ -202,13 +202,13 @@ func (svc *Service) Stop() {
// Run initializes and starts the trigger as specified in the
// configuration. It will also configure the webserver and start listening on
// the specified port.
func (svc *Service) Run() (*pahoMqtt.Client, error) {
func (svc *Service) Run() error {

config := container.ConfigurationFrom(svc.dic.Get)

err := initializeStoreClient(config, svc)
if err != nil {
return nil, err
return err
}

runCtx, stop := context.WithCancel(context.Background())
@@ -220,24 +220,24 @@ func (svc *Service) Run() (*pahoMqtt.Client, error) {

svc.webserver.StartWebServer(httpErrors)

// determine input type and create trigger for it
t := svc.setupTrigger(svc.config)
if t == nil {
return nil, errors.New("failed to create Trigger")
}

// Initialize the trigger (i.e. start a web server, or connect to message bus)
mqttTrigger, deferred, err := t.Initialize(svc.ctx.appWg, svc.ctx.appCtx, svc.backgroundPublishChannel)
if err != nil {
svc.lc.Error(err.Error())
return nil, errors.New("failed to initialize Trigger")
}

x := mqttTrigger.MqttClient
//x.Publish("/sys/xpsYHExTKPFaQMS7/0005002403260001/s/event/rawReport", 0, false, "{\"good\":\"good\"}")

// deferred is a function that needs to be called when services exits.
svc.addDeferred(deferred)
//// determine input type and create trigger for it
//t := svc.setupTrigger(svc.config)
//if t == nil {
// return nil, errors.New("failed to create Trigger")
//}
//
//// Initialize the trigger (i.e. start a web server, or connect to message bus)
//mqttTrigger, deferred, err := t.Initialize(svc.ctx.appWg, svc.ctx.appCtx, svc.backgroundPublishChannel)
//if err != nil {
// svc.lc.Error(err.Error())
// return nil, errors.New("failed to initialize Trigger")
//}
//
//x := mqttTrigger.MqttClient
////x.Publish("/sys/xpsYHExTKPFaQMS7/0005002403260001/s/event/rawReport", 0, false, "{\"good\":\"good\"}")
//
//// deferred is a function that needs to be called when services exits.
//svc.addDeferred(deferred)

if svc.config.Writable.StoreAndForward.Enabled {
svc.startStoreForward()
@@ -277,7 +277,7 @@ func (svc *Service) Run() (*pahoMqtt.Client, error) {
deferredFunc()
}

return &x, err
return err
}

// LoadConfigurableFunctionPipelines return the configured function pipelines (default and per topic) from configuration.
@@ -640,6 +640,30 @@ func (svc *Service) RegistryClient() registry.Client {
return bootstrapContainer.RegistryFrom(svc.dic.Get)
}

// TriggerMqttClient returns the pahoMqtt.Client.
func (svc *Service) TriggerMqttClient() (*pahoMqtt.Client, error) {
// determine input type and create trigger for it
t := svc.setupTrigger(svc.config)
if t == nil {
return nil, errors.New("failed to create Trigger")
}

// Initialize the trigger (i.e. start a web server, or connect to message bus)
mqttTrigger, deferred, err := t.Initialize(svc.ctx.appWg, svc.ctx.appCtx, svc.backgroundPublishChannel)
if err != nil {
svc.lc.Error(err.Error())
return nil, errors.New("failed to initialize Trigger")
}

x := mqttTrigger.MqttClient
//x.Publish("/sys/xpsYHExTKPFaQMS7/0005002403260001/s/event/rawReport", 0, false, "{\"good\":\"good\"}")

// deferred is a function that needs to be called when services exits.
svc.addDeferred(deferred)

return &x, nil
}

// EventClient returns the Event client, which may be nil, from the dependency injection container
func (svc *Service) EventClient() clientInterfaces.EventClient {
return bootstrapContainer.EventClientFrom(svc.dic.Get)
4 changes: 3 additions & 1 deletion pkg/interfaces/service.go
Original file line number Diff line number Diff line change
@@ -130,7 +130,7 @@ type ApplicationService interface {
// the service is stopped or Stop() is called.
// An error is returned if the trigger can not be created or initialized or if the internal webserver
// encounters an error.
Run() (*pahoMqtt.Client, error)
Run() error
// Stop stops the configured trigger so that the functions pipeline no longer executes.
// An error is returned
Stop()
@@ -177,6 +177,8 @@ type ApplicationService interface {
// RegistryClient returns the Registry client. Note the registry must been enable, otherwise this will return nil.
// Useful if service needs to add additional health checks or needs to get endpoint of another registered service
RegistryClient() registry.Client
//TriggerMqttClient returns the pahoMqtt.Client.
TriggerMqttClient() (*pahoMqtt.Client, error)
// MetricsManager returns the Metrics Manager used to register counter, gauge, gaugeFloat64 or timer metric types from
// github.com/rcrowley/go-metrics
MetricsManager() bootstrapInterfaces.MetricsManager