diff --git a/src/cmd/syslog-agent/app/syslog_agent.go b/src/cmd/syslog-agent/app/syslog_agent.go index 277bbdeb1..f3d708423 100644 --- a/src/cmd/syslog-agent/app/syslog_agent.go +++ b/src/cmd/syslog-agent/app/syslog_agent.go @@ -56,19 +56,8 @@ func NewSyslogAgent( cfg Config, m Metrics, l *log.Logger, + factory syslog.AppLogEmitterFactory, ) *SyslogAgent { - internalTlsConfig, externalTlsConfig := drainTLSConfig(cfg) - writerFactory := syslog.NewWriterFactory( - internalTlsConfig, - externalTlsConfig, - syslog.NetworkTimeoutConfig{ - Keepalive: 10 * time.Second, - DialTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - }, - m, - ) - ingressTLSConfig, err := loggregator.NewIngressTLSConfig( cfg.GRPC.CAFile, cfg.GRPC.CertFile, @@ -81,17 +70,25 @@ func NewSyslogAgent( logClient, err := loggregator.NewIngressClient( ingressTLSConfig, loggregator.WithLogger(log.New(os.Stderr, "", log.LstdFlags)), + loggregator.WithAddr(fmt.Sprintf("127.0.0.1:%d", cfg.GRPC.Port)), ) if err != nil { l.Panicf("failed to create log client for syslog connector: %q", err) } + internalTlsConfig, externalTlsConfig := drainTLSConfig(cfg) + writerFactory := syslog.NewWriterFactory(internalTlsConfig, externalTlsConfig, syslog.NetworkTimeoutConfig{ + Keepalive: 10 * time.Second, + DialTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + }, m) + connector := syslog.NewSyslogConnector( cfg.DrainSkipCertVerify, timeoutwaitgroup.New(time.Minute), writerFactory, m, - syslog.WithLogClient(logClient, "syslog_agent"), + syslog.WithAppLogEmitter(factory.NewAppLogEmitter(logClient, "syslog_agent")), ) var cacheClient *cache.CacheClient @@ -112,6 +109,7 @@ func NewSyslogAgent( m, cfg.WarnOnInvalidDrains, l, + factory.NewAppLogEmitter(logClient, "syslog_agent"), ) cupsFetcher = bindings.NewDrainParamParser(cupsFetcher, cfg.DefaultDrainMetadata) } diff --git a/src/cmd/syslog-agent/app/syslog_agent_mtls_test.go b/src/cmd/syslog-agent/app/syslog_agent_mtls_test.go index 4b98a5b78..0fe80147f 100644 --- a/src/cmd/syslog-agent/app/syslog_agent_mtls_test.go +++ b/src/cmd/syslog-agent/app/syslog_agent_mtls_test.go @@ -1,6 +1,7 @@ package app_test import ( + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog" "context" "encoding/json" "fmt" @@ -41,6 +42,8 @@ var _ = Describe("SyslogAgent with mTLS", func() { agentMetrics *metricsHelpers.SpyMetricsRegistry agentLogr *log.Logger agent *app.SyslogAgent + + factory syslog.AppLogEmitterFactory ) BeforeEach(func() { @@ -141,6 +144,9 @@ var _ = Describe("SyslogAgent with mTLS", func() { } agentMetrics = metricsHelpers.NewMetricsRegistry() agentLogr = log.New(GinkgoWriter, "", log.LstdFlags) + + defaultFactory := syslog.NewDefaultAppLogEmitterFactory() + factory = &defaultFactory }) JustBeforeEach(func() { @@ -154,7 +160,7 @@ var _ = Describe("SyslogAgent with mTLS", func() { agentCfg.Cache.PollingInterval = 10 * time.Millisecond } - agent = app.NewSyslogAgent(agentCfg, agentMetrics, agentLogr) + agent = app.NewSyslogAgent(agentCfg, agentMetrics, agentLogr, factory) go agent.Run() }) @@ -216,6 +222,7 @@ var _ = Describe("SyslogAgent with mTLS", func() { }) It("will not be able to connect with those drains", func() { + //todo check if pr check runs ctx, cancel := context.WithCancel(context.Background()) emitLogs(ctx, appIDs, grpcPort, agentCerts) defer cancel() diff --git a/src/cmd/syslog-agent/app/syslog_agent_test.go b/src/cmd/syslog-agent/app/syslog_agent_test.go index 46a6d2c24..83702e700 100644 --- a/src/cmd/syslog-agent/app/syslog_agent_test.go +++ b/src/cmd/syslog-agent/app/syslog_agent_test.go @@ -1,6 +1,7 @@ package app_test import ( + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog" "context" "crypto/tls" "fmt" @@ -48,6 +49,8 @@ var _ = Describe("SyslogAgent", func() { agentMetrics *metricsHelpers.SpyMetricsRegistry agentLogr *log.Logger agent *app.SyslogAgent + + factory syslog.AppLogEmitterFactory ) BeforeEach(func() { @@ -121,6 +124,9 @@ var _ = Describe("SyslogAgent", func() { } agentMetrics = metricsHelpers.NewMetricsRegistry() agentLogr = log.New(GinkgoWriter, "", log.LstdFlags) + + defaultFactory := syslog.NewDefaultAppLogEmitterFactory() + factory = &defaultFactory }) JustBeforeEach(func() { @@ -134,7 +140,7 @@ var _ = Describe("SyslogAgent", func() { agentCfg.Cache.PollingInterval = 10 * time.Millisecond } - agent = app.NewSyslogAgent(agentCfg, agentMetrics, agentLogr) + agent = app.NewSyslogAgent(agentCfg, agentMetrics, agentLogr, factory) go agent.Run() }) @@ -238,6 +244,14 @@ var _ = Describe("SyslogAgent", func() { Eventually(agentMetrics.GetDebugMetricsEnabled).Should(BeFalse()) }) + It("configures appLogEmitter", func() { + spyFactory := testhelper.SpyAppLogEmitterFactory{} + app.NewSyslogAgent(agentCfg, agentMetrics, agentLogr, &spyFactory) + + Expect(spyFactory.SourceIndex()).Should(Equal("syslog_agent")) + Expect(spyFactory.LogClient()).ShouldNot(BeNil()) + }) + Context("when debug configuration is enabled", func() { BeforeEach(func() { agentCfg.MetricsServer.DebugMetrics = true @@ -423,7 +437,7 @@ var _ = Describe("SyslogAgent", func() { cfgCopy.GRPC.KeyFile = "invalid" msg := `failed to configure client TLS: "failed to load keypair: open invalid: no such file or directory"` - Expect(func() { app.NewSyslogAgent(cfgCopy, agentMetrics, agentLogr) }).To(PanicWith(msg)) + Expect(func() { app.NewSyslogAgent(cfgCopy, agentMetrics, agentLogr, factory) }).To(PanicWith(msg)) }) }) }) diff --git a/src/cmd/syslog-agent/main.go b/src/cmd/syslog-agent/main.go index 24fe4f5d8..14d3e7093 100644 --- a/src/cmd/syslog-agent/main.go +++ b/src/cmd/syslog-agent/main.go @@ -1,6 +1,7 @@ package main import ( + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog" "log" _ "net/http/pprof" //nolint:gosec "os" @@ -33,5 +34,7 @@ func main() { ), ) - app.NewSyslogAgent(cfg, m, logger).Run() + factory := syslog.NewDefaultAppLogEmitterFactory() + + app.NewSyslogAgent(cfg, m, logger, &factory).Run() } diff --git a/src/internal/testhelper/spy_app_log_emitter.go b/src/internal/testhelper/spy_app_log_emitter.go new file mode 100644 index 000000000..f53d5b659 --- /dev/null +++ b/src/internal/testhelper/spy_app_log_emitter.go @@ -0,0 +1,34 @@ +package testhelper + +import "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog" + +type SpyAppLogEmitter struct { +} + +func (emitter *SpyAppLogEmitter) EmitLog(appID string, message string) { + +} + +func NewSpyAppEmitter() SpyAppLogEmitter { + return SpyAppLogEmitter{} +} + +type SpyAppLogEmitterFactory struct { + logClient syslog.LogClient + sourceIndex string +} + +func (factory *SpyAppLogEmitterFactory) LogClient() syslog.LogClient { + return factory.logClient +} + +func (factory *SpyAppLogEmitterFactory) SourceIndex() string { + return factory.sourceIndex +} + +func (factory *SpyAppLogEmitterFactory) NewAppLogEmitter(logClient syslog.LogClient, sourceIndex string) syslog.AppLogEmitter { + factory.logClient = logClient + factory.sourceIndex = sourceIndex + emitter := NewSpyAppEmitter() + return &emitter +} diff --git a/src/internal/testhelper/spy_log_client.go b/src/internal/testhelper/spy_log_client.go new file mode 100644 index 000000000..0faba6b91 --- /dev/null +++ b/src/internal/testhelper/spy_log_client.go @@ -0,0 +1,82 @@ +package testhelper + +import ( + "code.cloudfoundry.org/go-loggregator/v10" + v2 "code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2" + "sync" +) + +type spyLogClient struct { + mu sync.Mutex + _message []string + _appID []string + + // We use maps to ensure that we can query the keys + _sourceType map[string]struct{} + _sourceInstance map[string]struct{} +} + +func NewSpyLogClient() *spyLogClient { + return &spyLogClient{ + _sourceType: make(map[string]struct{}), + _sourceInstance: make(map[string]struct{}), + } +} + +func (s *spyLogClient) EmitLog(message string, opts ...loggregator.EmitLogOption) { + s.mu.Lock() + defer s.mu.Unlock() + + env := &v2.Envelope{ + Tags: make(map[string]string), + } + + for _, o := range opts { + o(env) + } + + s._message = append(s._message, message) + s._appID = append(s._appID, env.SourceId) + s._sourceType[env.GetTags()["source_type"]] = struct{}{} + s._sourceInstance[env.GetInstanceId()] = struct{}{} +} + +func (s *spyLogClient) Message() []string { + s.mu.Lock() + defer s.mu.Unlock() + + return s._message +} + +func (s *spyLogClient) AppID() []string { + s.mu.Lock() + defer s.mu.Unlock() + + return s._appID +} + +func (s *spyLogClient) SourceType() map[string]struct{} { + s.mu.Lock() + defer s.mu.Unlock() + + // Copy map so the orig does not escape the mutex and induce a race. + m := make(map[string]struct{}) + for k := range s._sourceType { + m[k] = struct{}{} + } + + return m +} + +func (s *spyLogClient) SourceInstance() map[string]struct{} { + s.mu.Lock() + defer s.mu.Unlock() + + // Copy map so the orig does not escape the mutex and induce a race. + m := make(map[string]struct{}) + for k := range s._sourceInstance { + m[k] = struct{}{} + } + + return m +} diff --git a/src/pkg/egress/syslog/app_log_emitter.go b/src/pkg/egress/syslog/app_log_emitter.go new file mode 100644 index 000000000..ea9182974 --- /dev/null +++ b/src/pkg/egress/syslog/app_log_emitter.go @@ -0,0 +1,59 @@ +package syslog + +import ( + "code.cloudfoundry.org/go-loggregator/v10" +) + +// LogClient is used to emit logs. +type LogClient interface { + EmitLog(message string, opts ...loggregator.EmitLogOption) +} + +// AppLogEmitter abstracts the sending of a log to the application log stream. +type AppLogEmitter interface { + EmitLog(appID string, message string) +} + +// DefaultAppLogEmitter is an implementation of AppLogEmitter which sends logs to an instance of a LogClient +type DefaultAppLogEmitter struct { + logClient LogClient + sourceIndex string +} + +// EmitLog writes a message in the application log stream using a LogClient. +func (appLogEmitter *DefaultAppLogEmitter) EmitLog(appID string, message string) { + if appLogEmitter.logClient == nil || appID == "" { + return + } + + option := loggregator.WithAppInfo(appID, "LGR", "") + appLogEmitter.logClient.EmitLog(message, option) + + option = loggregator.WithAppInfo( + appID, + "SYS", + appLogEmitter.sourceIndex, + ) + appLogEmitter.logClient.EmitLog(message, option) +} + +// AppLogEmitterFactory is used to create new instances of AppLogEmitter +type AppLogEmitterFactory interface { + NewAppLogEmitter(logClient LogClient, sourceIndex string) AppLogEmitter +} + +// DefaultAppLogEmitterFactory implementation of AppLogEmitterFactory to produce DefaultAppLogEmitter. +type DefaultAppLogEmitterFactory struct { +} + +// NewAppLogEmitter creates a new DefaultAppLogEmitter. +func (factory *DefaultAppLogEmitterFactory) NewAppLogEmitter(logClient LogClient, sourceIndex string) AppLogEmitter { + return &DefaultAppLogEmitter{ + logClient: logClient, + sourceIndex: sourceIndex, + } +} + +func NewDefaultAppLogEmitterFactory() DefaultAppLogEmitterFactory { + return DefaultAppLogEmitterFactory{} +} diff --git a/src/pkg/egress/syslog/app_log_emitter_test.go b/src/pkg/egress/syslog/app_log_emitter_test.go new file mode 100644 index 000000000..037904169 --- /dev/null +++ b/src/pkg/egress/syslog/app_log_emitter_test.go @@ -0,0 +1,72 @@ +package syslog_test + +import ( + "code.cloudfoundry.org/loggregator-agent-release/src/internal/testhelper" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Loggregator Emitter", func() { + Describe("DefaultAppLogEmitter", func() { + It("emits a log message", func() { + logClient := testhelper.NewSpyLogClient() + factory := syslog.NewDefaultAppLogEmitterFactory() + emitter := factory.NewAppLogEmitter(logClient, "0") + + emitter.EmitLog("app-id", "some-message") + + messages := logClient.Message() + appIDs := logClient.AppID() + sourceTypes := logClient.SourceType() + Expect(messages).To(HaveLen(2)) + Expect(messages[0]).To(Equal("some-message")) + Expect(messages[1]).To(Equal("some-message")) + Expect(appIDs[0]).To(Equal("app-id")) + Expect(appIDs[1]).To(Equal("app-id")) + Expect(sourceTypes).To(HaveKey("LGR")) + Expect(sourceTypes).To(HaveKey("SYS")) + }) + + It("does not emit a log message if the appID is empty", func() { + logClient := testhelper.NewSpyLogClient() + factory := syslog.NewDefaultAppLogEmitterFactory() + emitter := factory.NewAppLogEmitter(logClient, "0") + + emitter.EmitLog("", "some-message") + + messages := logClient.Message() + appIDs := logClient.AppID() + sourceTypes := logClient.SourceType() + Expect(messages).To(HaveLen(0)) + Expect(appIDs).To(HaveLen(0)) + Expect(sourceTypes).ToNot(HaveKey("LGR")) + Expect(sourceTypes).ToNot(HaveKey("SYS")) + }) + }) + + Describe("DefaultAppLogEmitterFactory", func() { + It("produces a DefaultAppLogEmitter", func() { + factory := syslog.NewDefaultAppLogEmitterFactory() + logClient := testhelper.NewSpyLogClient() + sourceIndex := "test-index" + + emitter := factory.NewAppLogEmitter(logClient, sourceIndex) + emitter.EmitLog("app-id", "some-message") + + messages := logClient.Message() + appIDs := logClient.AppID() + sourceTypes := logClient.SourceType() + sourceInstance := logClient.SourceInstance() + Expect(messages).To(HaveLen(2)) + Expect(messages[0]).To(Equal("some-message")) + Expect(messages[1]).To(Equal("some-message")) + Expect(appIDs[0]).To(Equal("app-id")) + Expect(appIDs[1]).To(Equal("app-id")) + Expect(sourceTypes).To(HaveKey("LGR")) + Expect(sourceTypes).To(HaveKey("SYS")) + Expect(sourceInstance).To(HaveKey("")) + Expect(sourceInstance).To(HaveKey("test-index")) + }) + }) +}) diff --git a/src/pkg/egress/syslog/retry_writer.go b/src/pkg/egress/syslog/retry_writer.go index c97e81cb0..055fc0490 100644 --- a/src/pkg/egress/syslog/retry_writer.go +++ b/src/pkg/egress/syslog/retry_writer.go @@ -1,6 +1,7 @@ package syslog import ( + "fmt" "log" "math" "time" @@ -21,6 +22,7 @@ type RetryWriter struct { retryDuration RetryDuration maxRetries int binding *URLBinding + emitter AppLogEmitter } func NewRetryWriter( @@ -28,12 +30,14 @@ func NewRetryWriter( retryDuration RetryDuration, maxRetries int, writer egress.WriteCloser, + emitter AppLogEmitter, ) (egress.WriteCloser, error) { return &RetryWriter{ Writer: writer, retryDuration: retryDuration, maxRetries: maxRetries, binding: urlBinding, + emitter: emitter, }, nil } @@ -55,6 +59,7 @@ func (r *RetryWriter) Write(e *loggregator_v2.Envelope) error { sleepDuration := r.retryDuration(i) log.Printf(logTemplate, r.binding.URL.Host, sleepDuration, err) + r.emitter.EmitLog(e.SourceId, fmt.Sprintf(logTemplate, r.binding.URL.Host, sleepDuration, err)) time.Sleep(sleepDuration) } diff --git a/src/pkg/egress/syslog/retry_writer_test.go b/src/pkg/egress/syslog/retry_writer_test.go index 65c4752f2..7a8e99923 100644 --- a/src/pkg/egress/syslog/retry_writer_test.go +++ b/src/pkg/egress/syslog/retry_writer_test.go @@ -1,13 +1,12 @@ package syslog_test import ( + "code.cloudfoundry.org/loggregator-agent-release/src/internal/testhelper" "errors" "net/url" - "sync" "sync/atomic" "time" - "code.cloudfoundry.org/go-loggregator/v10" v2 "code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2" "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress" "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog" @@ -165,81 +164,6 @@ func (s *spyWriteCloser) WriteAttempts() int { return int(atomic.LoadInt64(&s.writeAttempts)) } -type spyLogClient struct { - mu sync.Mutex - _message []string - _appID []string - - // We use maps to ensure that we can query the keys - _sourceType map[string]struct{} - _sourceInstance map[string]struct{} -} - -func newSpyLogClient() *spyLogClient { - return &spyLogClient{ - _sourceType: make(map[string]struct{}), - _sourceInstance: make(map[string]struct{}), - } -} - -func (s *spyLogClient) EmitLog(message string, opts ...loggregator.EmitLogOption) { - s.mu.Lock() - defer s.mu.Unlock() - - env := &v2.Envelope{ - Tags: make(map[string]string), - } - - for _, o := range opts { - o(env) - } - - s._message = append(s._message, message) - s._appID = append(s._appID, env.SourceId) - s._sourceType[env.GetTags()["source_type"]] = struct{}{} - s._sourceInstance[env.GetInstanceId()] = struct{}{} -} - -func (s *spyLogClient) message() []string { - s.mu.Lock() - defer s.mu.Unlock() - - return s._message -} - -func (s *spyLogClient) appID() []string { - s.mu.Lock() - defer s.mu.Unlock() - - return s._appID -} - -func (s *spyLogClient) sourceType() map[string]struct{} { - s.mu.Lock() - defer s.mu.Unlock() - - // Copy map so the orig does not escape the mutex and induce a race. - m := make(map[string]struct{}) - for k := range s._sourceType { - m[k] = struct{}{} - } - - return m -} - -func (s *spyLogClient) sourceInstance() map[string]struct{} { - s.mu.Lock() - defer s.mu.Unlock() - - // Copy map so the orig does not escape the mutex and induce a race. - m := make(map[string]struct{}) - for k := range s._sourceInstance { - m[k] = struct{}{} - } - - return m -} - func buildDelay(multiplier time.Duration) func(int) time.Duration { return func(attempt int) time.Duration { return time.Duration(attempt) * multiplier @@ -252,10 +176,12 @@ func buildRetryWriter( maxRetries int, delayMultiplier time.Duration, ) (egress.WriteCloser, error) { + emitter := testhelper.NewSpyAppEmitter() return syslog.NewRetryWriter( urlBinding, syslog.RetryDuration(buildDelay(delayMultiplier)), maxRetries, w, + &emitter, ) } diff --git a/src/pkg/egress/syslog/syslog_connector.go b/src/pkg/egress/syslog/syslog_connector.go index bb06b34a1..d38609c1a 100644 --- a/src/pkg/egress/syslog/syslog_connector.go +++ b/src/pkg/egress/syslog/syslog_connector.go @@ -6,10 +6,8 @@ import ( "golang.org/x/net/context" - metrics "code.cloudfoundry.org/go-metric-registry" - "code.cloudfoundry.org/go-diodes" - "code.cloudfoundry.org/go-loggregator/v10" + metrics "code.cloudfoundry.org/go-metric-registry" "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress" ) @@ -33,32 +31,20 @@ type Credentials struct { CA string `json:"ca"` } -// LogClient is used to emit logs. -type LogClient interface { - EmitLog(message string, opts ...loggregator.EmitLogOption) -} - -// nullLogClient ensures that the LogClient is in fact optional. -type nullLogClient struct{} - -// EmitLog drops all messages into /dev/null. -func (nullLogClient) EmitLog(message string, opts ...loggregator.EmitLogOption) { -} - type writerFactory interface { - NewWriter(*URLBinding) (egress.WriteCloser, error) + NewWriter(*URLBinding, AppLogEmitter) (egress.WriteCloser, error) } // SyslogConnector creates the various egress syslog writers. type SyslogConnector struct { skipCertVerify bool - logClient LogClient wg egress.WaitGroup - sourceIndex string writerFactory writerFactory + metricClient metricClient - metricClient metricClient droppedMetric metrics.Counter + + appLogEmitter AppLogEmitter } // NewSyslogConnector configures and returns a new SyslogConnector. @@ -78,7 +64,6 @@ func NewSyslogConnector( sc := &SyslogConnector{ skipCertVerify: skipCertVerify, wg: wg, - logClient: nullLogClient{}, writerFactory: f, metricClient: m, @@ -93,12 +78,11 @@ func NewSyslogConnector( // ConnectorOption allows a syslog connector to be customized. type ConnectorOption func(*SyslogConnector) -// WithLogClient returns a ConnectorOption that will set up logging for any +// WithAppLogEmitter returns a ConnectorOption that will set up logging for any // information about a binding. -func WithLogClient(logClient LogClient, sourceIndex string) ConnectorOption { +func WithAppLogEmitter(emitter AppLogEmitter) ConnectorOption { return func(sc *SyslogConnector) { - sc.logClient = logClient - sc.sourceIndex = sourceIndex + sc.appLogEmitter = emitter } } @@ -110,7 +94,7 @@ func (w *SyslogConnector) Connect(ctx context.Context, b Binding) (egress.Writer return nil, err } - writer, err := w.writerFactory.NewWriter(urlBinding) + writer, err := w.writerFactory.NewWriter(urlBinding, w.appLogEmitter) if err != nil { return nil, err } @@ -138,7 +122,9 @@ func (w *SyslogConnector) Connect(ctx context.Context, b Binding) (egress.Writer w.droppedMetric.Add(float64(missed)) drainDroppedMetric.Add(float64(missed)) - w.emitLoggregatorErrorLog(b.AppId, fmt.Sprintf("%d messages lost for application %s in user provided syslog drain with url %s", missed, b.AppId, anonymousUrl.String())) + if w.appLogEmitter != nil { + w.appLogEmitter.EmitLog(b.AppId, fmt.Sprintf("%d messages lost for application %s in user provided syslog drain with url %s", missed, b.AppId, anonymousUrl.String())) + } w.emitStandardOutErrorLog(b.AppId, urlBinding.Scheme(), anonymousUrl.String(), missed) }), w.wg) @@ -151,20 +137,6 @@ func (w *SyslogConnector) Connect(ctx context.Context, b Binding) (egress.Writer return filteredWriter, nil } -func (w *SyslogConnector) emitLoggregatorErrorLog(appID, message string) { - if appID == "" { - return - } - option := loggregator.WithAppInfo(appID, "LGR", "") - w.logClient.EmitLog(message, option) - - option = loggregator.WithAppInfo( - appID, - "SYS", - w.sourceIndex, - ) - w.logClient.EmitLog(message, option) -} func (w *SyslogConnector) emitStandardOutErrorLog(appID, scheme, url string, missed int) { errorAppOrAggregate := fmt.Sprintf("for %s's app drain", appID) if appID == "" { diff --git a/src/pkg/egress/syslog/syslog_connector_test.go b/src/pkg/egress/syslog/syslog_connector_test.go index cb58a740d..b12915a4a 100644 --- a/src/pkg/egress/syslog/syslog_connector_test.go +++ b/src/pkg/egress/syslog/syslog_connector_test.go @@ -1,6 +1,7 @@ package syslog_test import ( + "code.cloudfoundry.org/loggregator-agent-release/src/internal/testhelper" "errors" "fmt" "io" @@ -172,13 +173,14 @@ var _ = Describe("SyslogConnector", func() { }) It("emits a LGR and SYS log to the log client about logs that have been dropped", func() { - logClient := newSpyLogClient() + logClient := testhelper.NewSpyLogClient() + factory := syslog.NewDefaultAppLogEmitterFactory() connector := syslog.NewSyslogConnector( true, spyWaitGroup, writerFactory, sm, - syslog.WithLogClient(logClient, "3"), + syslog.WithAppLogEmitter(factory.NewAppLogEmitter(logClient, "3")), ) binding := syslog.Binding{AppId: "app-id", @@ -201,26 +203,27 @@ var _ = Describe("SyslogConnector", func() { } }(writer) - Eventually(logClient.message).Should(ContainElement(MatchRegexp("\\d messages lost for application (.*) in user provided syslog drain with url"))) - Eventually(logClient.appID).Should(ContainElement("app-id")) + Eventually(logClient.Message).Should(ContainElement(MatchRegexp("\\d messages lost for application (.*) in user provided syslog drain with url"))) + Eventually(logClient.AppID).Should(ContainElement("app-id")) - Eventually(logClient.sourceType).Should(HaveLen(2)) - Eventually(logClient.sourceType).Should(HaveKey("LGR")) - Eventually(logClient.sourceType).Should(HaveKey("SYS")) + Eventually(logClient.SourceType).Should(HaveLen(2)) + Eventually(logClient.SourceType).Should(HaveKey("LGR")) + Eventually(logClient.SourceType).Should(HaveKey("SYS")) - Eventually(logClient.sourceInstance).Should(HaveLen(2)) - Eventually(logClient.sourceInstance).Should(HaveKey("")) - Eventually(logClient.sourceInstance).Should(HaveKey("3")) + Eventually(logClient.SourceInstance).Should(HaveLen(2)) + Eventually(logClient.SourceInstance).Should(HaveKey("")) + Eventually(logClient.SourceInstance).Should(HaveKey("3")) }) It("doesn't emit LGR and SYS log to the log client about aggregate drains drops", func() { - logClient := newSpyLogClient() + logClient := testhelper.NewSpyLogClient() + factory := syslog.NewDefaultAppLogEmitterFactory() connector := syslog.NewSyslogConnector( true, spyWaitGroup, writerFactory, sm, - syslog.WithLogClient(logClient, "3"), + syslog.WithAppLogEmitter(factory.NewAppLogEmitter(logClient, "3")), ) binding := syslog.Binding{Drain: syslog.Drain{Url: "dropping://"}} @@ -239,7 +242,7 @@ var _ = Describe("SyslogConnector", func() { } }(writer) - Consistently(logClient.message).ShouldNot(ContainElement(MatchRegexp("\\d messages lost for application (.*) in user provided syslog drain with url"))) + Consistently(logClient.Message()).ShouldNot(ContainElement(MatchRegexp("\\d messages lost for application (.*) in user provided syslog drain with url"))) }) It("does not panic on unknown dropped metrics", func() { @@ -276,6 +279,7 @@ type stubWriterFactory struct { func (f *stubWriterFactory) NewWriter( urlBinding *syslog.URLBinding, + emitter syslog.AppLogEmitter, ) (egress.WriteCloser, error) { f.called = true return f.writer, f.err diff --git a/src/pkg/egress/syslog/writer_factory.go b/src/pkg/egress/syslog/writer_factory.go index 8ec8249ab..c8ec7c804 100644 --- a/src/pkg/egress/syslog/writer_factory.go +++ b/src/pkg/egress/syslog/writer_factory.go @@ -52,7 +52,7 @@ func NewWriterFactory(internalTlsConfig *tls.Config, externalTlsConfig *tls.Conf } } -func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) { +func (f WriterFactory) NewWriter(ub *URLBinding, emitter AppLogEmitter) (egress.WriteCloser, error) { tlsCfg := f.externalTlsConfig.Clone() if ub.InternalTls { tlsCfg = f.internalTlsConfig.Clone() @@ -60,7 +60,9 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) { if len(ub.Certificate) > 0 && len(ub.PrivateKey) > 0 { cert, err := tls.X509KeyPair(ub.Certificate, ub.PrivateKey) if err != nil { - err = NewWriterFactoryErrorf(ub.URL, "failed to load certificate: %s", err.Error()) + errorMessage := err.Error() + err = NewWriterFactoryErrorf(ub.URL, "failed to load certificate: %s", errorMessage) + emitter.EmitLog(ub.AppID, fmt.Sprintf("failed to load certificate: %s", errorMessage)) return nil, err } tlsCfg.Certificates = []tls.Certificate{cert} @@ -69,6 +71,7 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) { ok := tlsCfg.RootCAs.AppendCertsFromPEM(ub.CA) if !ok { err := NewWriterFactoryErrorf(ub.URL, "failed to load root CA") + emitter.EmitLog(ub.AppID, "failed to load root CA") return nil, err } } @@ -140,5 +143,6 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) { ExponentialDuration, maxRetries, w, + emitter, ) } diff --git a/src/pkg/egress/syslog/writer_factory_test.go b/src/pkg/egress/syslog/writer_factory_test.go index 5707bbf56..746ef5ade 100644 --- a/src/pkg/egress/syslog/writer_factory_test.go +++ b/src/pkg/egress/syslog/writer_factory_test.go @@ -1,6 +1,7 @@ package syslog_test import ( + "code.cloudfoundry.org/loggregator-agent-release/src/internal/testhelper" "crypto/tls" "net/url" @@ -13,13 +14,16 @@ import ( var _ = Describe("EgressFactory", func() { var ( - f syslog.WriterFactory - sm *metricsHelpers.SpyMetricsRegistry + f syslog.WriterFactory + sm *metricsHelpers.SpyMetricsRegistry + emitter syslog.AppLogEmitter ) BeforeEach(func() { sm = metricsHelpers.NewMetricsRegistry() f = syslog.NewWriterFactory(&tls.Config{}, &tls.Config{}, syslog.NetworkTimeoutConfig{}, sm) //nolint:gosec + spyEmitter := testhelper.NewSpyAppEmitter() + emitter = &spyEmitter }) Context("when the url begins with https", func() { @@ -30,7 +34,7 @@ var _ = Describe("EgressFactory", func() { URL: url, } - writer, err := f.NewWriter(urlBinding) + writer, err := f.NewWriter(urlBinding, emitter) Expect(err).ToNot(HaveOccurred()) retryWriter, ok := writer.(*syslog.RetryWriter) @@ -49,7 +53,7 @@ var _ = Describe("EgressFactory", func() { URL: url, } - writer, err := f.NewWriter(urlBinding) + writer, err := f.NewWriter(urlBinding, emitter) Expect(err).ToNot(HaveOccurred()) retryWriter, ok := writer.(*syslog.RetryWriter) @@ -68,7 +72,7 @@ var _ = Describe("EgressFactory", func() { URL: url, } - writer, err := f.NewWriter(urlBinding) + writer, err := f.NewWriter(urlBinding, emitter) Expect(err).ToNot(HaveOccurred()) retryWriter, ok := writer.(*syslog.RetryWriter) @@ -87,7 +91,7 @@ var _ = Describe("EgressFactory", func() { URL: url, } - writer, err := f.NewWriter(urlBinding) + writer, err := f.NewWriter(urlBinding, emitter) Expect(err).ToNot(HaveOccurred()) retryWriter, ok := writer.(*syslog.RetryWriter) @@ -106,7 +110,7 @@ var _ = Describe("EgressFactory", func() { Certificate: []byte("invalid-certificate"), } - _, err = f.NewWriter(urlBinding) + _, err = f.NewWriter(urlBinding, emitter) Expect(err).ToNot(HaveOccurred()) }) }) @@ -120,7 +124,7 @@ var _ = Describe("EgressFactory", func() { PrivateKey: []byte("invalid-private-key"), } - _, err = f.NewWriter(urlBinding) + _, err = f.NewWriter(urlBinding, emitter) Expect(err).ToNot(HaveOccurred()) }) }) @@ -143,7 +147,7 @@ var _ = Describe("EgressFactory", func() { urlBinding.CA = []byte("invalid-ca") } - _, err = f.NewWriter(urlBinding) + _, err = f.NewWriter(urlBinding, emitter) Expect(err).To(MatchError(expectedErr)) }, @@ -169,7 +173,7 @@ var _ = Describe("EgressFactory", func() { AppID: appID, } - _, err = f.NewWriter(urlBinding) + _, err = f.NewWriter(urlBinding, emitter) Expect(err).ToNot(HaveOccurred()) metric := sm.GetMetric("egress", tags) diff --git a/src/pkg/ingress/bindings/filtered_binding_fetcher.go b/src/pkg/ingress/bindings/filtered_binding_fetcher.go index 0921c95c3..db400de2b 100644 --- a/src/pkg/ingress/bindings/filtered_binding_fetcher.go +++ b/src/pkg/ingress/bindings/filtered_binding_fetcher.go @@ -1,6 +1,7 @@ package bindings import ( + "fmt" "log" "net" "net/url" @@ -34,9 +35,10 @@ type FilteredBindingFetcher struct { invalidDrains metrics.Gauge blacklistedDrains metrics.Gauge failedHostsCache *simplecache.SimpleCache[string, bool] + emitter syslog.AppLogEmitter } -func NewFilteredBindingFetcher(c IPChecker, b binding.Fetcher, m metricsClient, warn bool, lc *log.Logger) *FilteredBindingFetcher { +func NewFilteredBindingFetcher(c IPChecker, b binding.Fetcher, m metricsClient, warn bool, lc *log.Logger, emitter syslog.AppLogEmitter) *FilteredBindingFetcher { opt := metrics.WithMetricLabels(map[string]string{"unit": "total"}) invalidDrains := m.NewGauge( @@ -57,6 +59,7 @@ func NewFilteredBindingFetcher(c IPChecker, b binding.Fetcher, m metricsClient, invalidDrains: invalidDrains, blacklistedDrains: blacklistedDrains, failedHostsCache: simplecache.New[string, bool](120 * time.Second), + emitter: emitter, } } @@ -87,12 +90,14 @@ func (f *FilteredBindingFetcher) FetchBindings() ([]syslog.Binding, error) { if invalidScheme(u.Scheme) { f.printWarning("Invalid scheme %s in syslog drain url %s for application %s", u.Scheme, anonymousUrl.String(), b.AppId) + f.emitter.EmitLog(b.AppId, fmt.Sprintf("Invalid scheme %s in syslog drain url %s", u.Scheme, anonymousUrl.String())) continue } if len(u.Host) == 0 { invalidDrains += 1 f.printWarning("No hostname found in syslog drain url %s for application %s", anonymousUrl.String(), b.AppId) + f.emitter.EmitLog(b.AppId, fmt.Sprintf("No hostname found in syslog drain url %s", anonymousUrl.String())) continue } @@ -100,6 +105,7 @@ func (f *FilteredBindingFetcher) FetchBindings() ([]syslog.Binding, error) { if exists { invalidDrains += 1 f.printWarning("Skipped resolve ip address for syslog drain with url %s for application %s due to prior failure", anonymousUrl.String(), b.AppId) + f.emitter.EmitLog(b.AppId, fmt.Sprintf("Skipped resolve ip address for syslog drain with url %s due to prior failure", anonymousUrl.String())) continue } @@ -108,6 +114,7 @@ func (f *FilteredBindingFetcher) FetchBindings() ([]syslog.Binding, error) { invalidDrains += 1 f.failedHostsCache.Set(u.Host, true) f.printWarning("Cannot resolve ip address for syslog drain with url %s for application %s", anonymousUrl.String(), b.AppId) + f.emitter.EmitLog(b.AppId, fmt.Sprintf("Cannot resolve ip address for syslog drain with url %s", anonymousUrl.String())) continue } @@ -116,6 +123,7 @@ func (f *FilteredBindingFetcher) FetchBindings() ([]syslog.Binding, error) { invalidDrains += 1 blacklistedDrains += 1 f.printWarning("Resolved ip address for syslog drain with url %s for application %s is blacklisted", anonymousUrl.String(), b.AppId) + f.emitter.EmitLog(b.AppId, fmt.Sprintf("Resolved ip address for syslog drain with url %s is blacklisted", anonymousUrl.String())) continue } diff --git a/src/pkg/ingress/bindings/filtered_binding_fetcher_test.go b/src/pkg/ingress/bindings/filtered_binding_fetcher_test.go index c217d4e84..70a9b6cb9 100644 --- a/src/pkg/ingress/bindings/filtered_binding_fetcher_test.go +++ b/src/pkg/ingress/bindings/filtered_binding_fetcher_test.go @@ -2,6 +2,7 @@ package bindings_test import ( "bytes" + "code.cloudfoundry.org/loggregator-agent-release/src/internal/testhelper" "errors" "log" "net" @@ -19,10 +20,13 @@ var _ = Describe("FilteredBindingFetcher", func() { log = log.New(GinkgoWriter, "", log.LstdFlags) filter *bindings.FilteredBindingFetcher metrics *metricsHelpers.SpyMetricsRegistry + emitter syslog.AppLogEmitter ) BeforeEach(func() { metrics = metricsHelpers.NewMetricsRegistry() + spyEmitter := testhelper.NewSpyAppEmitter() + emitter = &spyEmitter }) It("returns valid bindings", func() { @@ -33,7 +37,14 @@ var _ = Describe("FilteredBindingFetcher", func() { } bindingReader := &SpyBindingReader{bindings: input} - filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, bindingReader, metrics, true, log) + filter = bindings.NewFilteredBindingFetcher( + &spyIPChecker{}, + bindingReader, + metrics, + true, + log, + emitter, + ) actual, err := filter.FetchBindings() Expect(err).ToNot(HaveOccurred()) @@ -43,7 +54,14 @@ var _ = Describe("FilteredBindingFetcher", func() { It("returns an error if the binding reader cannot fetch bindings", func() { bindingReader := &SpyBindingReader{nil, errors.New("Woops")} - filter := bindings.NewFilteredBindingFetcher(&spyIPChecker{}, bindingReader, metrics, true, log) + filter := bindings.NewFilteredBindingFetcher( + &spyIPChecker{}, + bindingReader, + metrics, + true, + log, + emitter, + ) actual, err := filter.FetchBindings() Expect(err).To(HaveOccurred()) @@ -70,6 +88,7 @@ var _ = Describe("FilteredBindingFetcher", func() { metrics, warn, log, + emitter, ) }) @@ -114,6 +133,7 @@ var _ = Describe("FilteredBindingFetcher", func() { metrics, warn, log, + emitter, ) }) @@ -169,6 +189,7 @@ var _ = Describe("FilteredBindingFetcher", func() { metrics, warn, log, + emitter, ) }) @@ -216,6 +237,7 @@ var _ = Describe("FilteredBindingFetcher", func() { metrics, warn, log, + emitter, ) }) @@ -281,6 +303,7 @@ var _ = Describe("FilteredBindingFetcher", func() { metrics, warn, log, + emitter, ) })