Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syslog drain app error messages in app log stream #633

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
24 changes: 11 additions & 13 deletions src/cmd/syslog-agent/app/syslog_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,8 @@ func NewSyslogAgent(
cfg Config,
m Metrics,
l *log.Logger,
factory syslog.AppLogEmitterFactory,
) *SyslogAgent {
internalTlsConfig, externalTlsConfig := drainTLSConfig(cfg)
writerFactory := syslog.NewWriterFactory(
internalTlsConfig,
externalTlsConfig,
syslog.NetworkTimeoutConfig{
Keepalive: 10 * time.Second,
DialTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
},
m,
)

ingressTLSConfig, err := loggregator.NewIngressTLSConfig(
cfg.GRPC.CAFile,
cfg.GRPC.CertFile,
Expand All @@ -81,17 +70,25 @@ func NewSyslogAgent(
logClient, err := loggregator.NewIngressClient(
ingressTLSConfig,
loggregator.WithLogger(log.New(os.Stderr, "", log.LstdFlags)),
loggregator.WithAddr(fmt.Sprintf("127.0.0.1:%d", cfg.GRPC.Port)),
)
if err != nil {
l.Panicf("failed to create log client for syslog connector: %q", err)
}

internalTlsConfig, externalTlsConfig := drainTLSConfig(cfg)
writerFactory := syslog.NewWriterFactory(internalTlsConfig, externalTlsConfig, syslog.NetworkTimeoutConfig{
Keepalive: 10 * time.Second,
DialTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}, m)

connector := syslog.NewSyslogConnector(
cfg.DrainSkipCertVerify,
timeoutwaitgroup.New(time.Minute),
writerFactory,
m,
syslog.WithLogClient(logClient, "syslog_agent"),
syslog.WithAppLogEmitter(factory.NewAppLogEmitter(logClient, "syslog_agent")),
)

var cacheClient *cache.CacheClient
Expand All @@ -112,6 +109,7 @@ func NewSyslogAgent(
m,
cfg.WarnOnInvalidDrains,
l,
factory.NewAppLogEmitter(logClient, "syslog_agent"),
)
cupsFetcher = bindings.NewDrainParamParser(cupsFetcher, cfg.DefaultDrainMetadata)
}
Expand Down
9 changes: 8 additions & 1 deletion src/cmd/syslog-agent/app/syslog_agent_mtls_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package app_test

import (
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -41,6 +42,8 @@ var _ = Describe("SyslogAgent with mTLS", func() {
agentMetrics *metricsHelpers.SpyMetricsRegistry
agentLogr *log.Logger
agent *app.SyslogAgent

factory syslog.AppLogEmitterFactory
)

BeforeEach(func() {
Expand Down Expand Up @@ -141,6 +144,9 @@ var _ = Describe("SyslogAgent with mTLS", func() {
}
agentMetrics = metricsHelpers.NewMetricsRegistry()
agentLogr = log.New(GinkgoWriter, "", log.LstdFlags)

defaultFactory := syslog.NewDefaultAppLogEmitterFactory()
factory = &defaultFactory
})

JustBeforeEach(func() {
Expand All @@ -154,7 +160,7 @@ var _ = Describe("SyslogAgent with mTLS", func() {
agentCfg.Cache.PollingInterval = 10 * time.Millisecond
}

agent = app.NewSyslogAgent(agentCfg, agentMetrics, agentLogr)
agent = app.NewSyslogAgent(agentCfg, agentMetrics, agentLogr, factory)
go agent.Run()
})

Expand Down Expand Up @@ -216,6 +222,7 @@ var _ = Describe("SyslogAgent with mTLS", func() {
})

It("will not be able to connect with those drains", func() {
//todo check if pr check runs
ctx, cancel := context.WithCancel(context.Background())
emitLogs(ctx, appIDs, grpcPort, agentCerts)
defer cancel()
Expand Down
18 changes: 16 additions & 2 deletions src/cmd/syslog-agent/app/syslog_agent_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package app_test

import (
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog"
"context"
"crypto/tls"
"fmt"
Expand Down Expand Up @@ -48,6 +49,8 @@ var _ = Describe("SyslogAgent", func() {
agentMetrics *metricsHelpers.SpyMetricsRegistry
agentLogr *log.Logger
agent *app.SyslogAgent

factory syslog.AppLogEmitterFactory
)

BeforeEach(func() {
Expand Down Expand Up @@ -121,6 +124,9 @@ var _ = Describe("SyslogAgent", func() {
}
agentMetrics = metricsHelpers.NewMetricsRegistry()
agentLogr = log.New(GinkgoWriter, "", log.LstdFlags)

defaultFactory := syslog.NewDefaultAppLogEmitterFactory()
factory = &defaultFactory
})

JustBeforeEach(func() {
Expand All @@ -134,7 +140,7 @@ var _ = Describe("SyslogAgent", func() {
agentCfg.Cache.PollingInterval = 10 * time.Millisecond
}

agent = app.NewSyslogAgent(agentCfg, agentMetrics, agentLogr)
agent = app.NewSyslogAgent(agentCfg, agentMetrics, agentLogr, factory)
go agent.Run()
})

Expand Down Expand Up @@ -238,6 +244,14 @@ var _ = Describe("SyslogAgent", func() {
Eventually(agentMetrics.GetDebugMetricsEnabled).Should(BeFalse())
})

It("configures appLogEmitter", func() {
spyFactory := testhelper.SpyAppLogEmitterFactory{}
app.NewSyslogAgent(agentCfg, agentMetrics, agentLogr, &spyFactory)

Expect(spyFactory.SourceIndex()).Should(Equal("syslog_agent"))
Expect(spyFactory.LogClient()).ShouldNot(BeNil())
})

Context("when debug configuration is enabled", func() {
BeforeEach(func() {
agentCfg.MetricsServer.DebugMetrics = true
Expand Down Expand Up @@ -423,7 +437,7 @@ var _ = Describe("SyslogAgent", func() {
cfgCopy.GRPC.KeyFile = "invalid"

msg := `failed to configure client TLS: "failed to load keypair: open invalid: no such file or directory"`
Expect(func() { app.NewSyslogAgent(cfgCopy, agentMetrics, agentLogr) }).To(PanicWith(msg))
Expect(func() { app.NewSyslogAgent(cfgCopy, agentMetrics, agentLogr, factory) }).To(PanicWith(msg))
})
})
})
Expand Down
5 changes: 4 additions & 1 deletion src/cmd/syslog-agent/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog"
"log"
_ "net/http/pprof" //nolint:gosec
"os"
Expand Down Expand Up @@ -33,5 +34,7 @@ func main() {
),
)

app.NewSyslogAgent(cfg, m, logger).Run()
factory := syslog.NewDefaultAppLogEmitterFactory()

app.NewSyslogAgent(cfg, m, logger, &factory).Run()
}
34 changes: 34 additions & 0 deletions src/internal/testhelper/spy_app_log_emitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package testhelper

import "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog"

type SpyAppLogEmitter struct {
}

func (emitter *SpyAppLogEmitter) EmitLog(appID string, message string) {

}

func NewSpyAppEmitter() SpyAppLogEmitter {
return SpyAppLogEmitter{}
}

type SpyAppLogEmitterFactory struct {
logClient syslog.LogClient
sourceIndex string
}

func (factory *SpyAppLogEmitterFactory) LogClient() syslog.LogClient {
return factory.logClient
}

func (factory *SpyAppLogEmitterFactory) SourceIndex() string {
return factory.sourceIndex
}

func (factory *SpyAppLogEmitterFactory) NewAppLogEmitter(logClient syslog.LogClient, sourceIndex string) syslog.AppLogEmitter {
factory.logClient = logClient
factory.sourceIndex = sourceIndex
emitter := NewSpyAppEmitter()
return &emitter
}
82 changes: 82 additions & 0 deletions src/internal/testhelper/spy_log_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package testhelper

import (
"code.cloudfoundry.org/go-loggregator/v10"
v2 "code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2"
"sync"
)

type spyLogClient struct {
mu sync.Mutex
_message []string
_appID []string

// We use maps to ensure that we can query the keys
_sourceType map[string]struct{}
_sourceInstance map[string]struct{}
}

func NewSpyLogClient() *spyLogClient {
return &spyLogClient{
_sourceType: make(map[string]struct{}),
_sourceInstance: make(map[string]struct{}),
}
}

func (s *spyLogClient) EmitLog(message string, opts ...loggregator.EmitLogOption) {
s.mu.Lock()
defer s.mu.Unlock()

env := &v2.Envelope{
Tags: make(map[string]string),
}

for _, o := range opts {
o(env)
}

s._message = append(s._message, message)
s._appID = append(s._appID, env.SourceId)
s._sourceType[env.GetTags()["source_type"]] = struct{}{}
s._sourceInstance[env.GetInstanceId()] = struct{}{}
}

func (s *spyLogClient) Message() []string {
s.mu.Lock()
defer s.mu.Unlock()

return s._message
}

func (s *spyLogClient) AppID() []string {
s.mu.Lock()
defer s.mu.Unlock()

return s._appID
}

func (s *spyLogClient) SourceType() map[string]struct{} {
s.mu.Lock()
defer s.mu.Unlock()

// Copy map so the orig does not escape the mutex and induce a race.
m := make(map[string]struct{})
for k := range s._sourceType {
m[k] = struct{}{}
}

return m
}

func (s *spyLogClient) SourceInstance() map[string]struct{} {
s.mu.Lock()
defer s.mu.Unlock()

// Copy map so the orig does not escape the mutex and induce a race.
m := make(map[string]struct{})
for k := range s._sourceInstance {
m[k] = struct{}{}
}

return m
}
59 changes: 59 additions & 0 deletions src/pkg/egress/syslog/app_log_emitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package syslog

import (
"code.cloudfoundry.org/go-loggregator/v10"
)

// LogClient is used to emit logs.
type LogClient interface {
EmitLog(message string, opts ...loggregator.EmitLogOption)
}

// AppLogEmitter abstracts the sending of a log to the application log stream.
type AppLogEmitter interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a need for an interface here? Who else implements the EmitLog method? Could this interface be deleted and the method attached to the DefaultAppLogEmitter?

EmitLog(appID string, message string)
}

// DefaultAppLogEmitter is an implementation of AppLogEmitter which sends logs to an instance of a LogClient
type DefaultAppLogEmitter struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rename this to something else please? The word default caught my eye...

Usually in the Loggregator's code we have a single struct, from which the default implementation is create and then we apply options... We should be consistent here as well with the naming and the implementation.

If the AppLogEmitter interface above is removed we have a good name for the struct.

logClient LogClient
sourceIndex string
}

// EmitLog writes a message in the application log stream using a LogClient.
func (appLogEmitter *DefaultAppLogEmitter) EmitLog(appID string, message string) {
if appLogEmitter.logClient == nil || appID == "" {
return
}

option := loggregator.WithAppInfo(appID, "LGR", "")
appLogEmitter.logClient.EmitLog(message, option)

option = loggregator.WithAppInfo(
appID,
"SYS",
appLogEmitter.sourceIndex,
)
appLogEmitter.logClient.EmitLog(message, option)
}

// AppLogEmitterFactory is used to create new instances of AppLogEmitter
type AppLogEmitterFactory interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this interface and the DefaultAppLogEmitterFactory and use "pure factory method" NewAppLogEmitter as it is used in the other places across the code base. Example: NewSyslogAgent

If you've used interface for better testing, you might also use reflection in the test and check if the created instance has its fields properly configured.

This will also simplify the other code which calls this method and the need of passing another factory around.

NewAppLogEmitter(logClient LogClient, sourceIndex string) AppLogEmitter
}

// DefaultAppLogEmitterFactory implementation of AppLogEmitterFactory to produce DefaultAppLogEmitter.
type DefaultAppLogEmitterFactory struct {
}

// NewAppLogEmitter creates a new DefaultAppLogEmitter.
func (factory *DefaultAppLogEmitterFactory) NewAppLogEmitter(logClient LogClient, sourceIndex string) AppLogEmitter {
return &DefaultAppLogEmitter{
logClient: logClient,
sourceIndex: sourceIndex,
}
}

func NewDefaultAppLogEmitterFactory() DefaultAppLogEmitterFactory {
return DefaultAppLogEmitterFactory{}
}
Loading