From 544a68ddf6cd9d1f98426878d049cf5c46e9ef49 Mon Sep 17 00:00:00 2001 From: george-dorin <120329946+george-dorin@users.noreply.github.com> Date: Fri, 13 Oct 2023 09:57:48 +0300 Subject: [PATCH 1/3] Multichain telemetry support (#10623) * Initial draft * Add manager * Update config TOML * Update config tests * Add telemetry manager tests * Fix lint * Add CHANGELOG.md * Update TelemetryService Send interface * Fix bad merge * Fix hanging tests * Change config from TelemetryIngressEndpoint to TelemetryIngress.Endpoints * - Fix wrong context being passed * - Remove protocol prefix from telemetry URL - Drop context from TelemPayload * - Fix context in tests * - Update CHANGELOG.md * - Add fields back and show error * - Fix failing tests * Move changelog entry to [dev] * Update so that telemetry can still be configured the old way * Update CONFIG.md --- core/config/docs/core.toml | 14 +- core/config/docs/docs.go | 22 ++ core/config/docs/docs_test.go | 7 + core/config/mocks/telemetry_ingress.go | 176 +++++++++ .../mocks/telemetry_ingress_endpoint.go | 87 +++++ core/config/telemetry_ingress_config.go | 16 +- core/config/toml/types.go | 42 ++- core/services/chainlink/application.go | 25 +- .../chainlink/config_telemetry_ingress.go | 54 ++- .../config_telemetry_ingress_test.go | 10 +- core/services/chainlink/config_test.go | 33 +- .../testdata/config-empty-effective.toml | 4 +- .../chainlink/testdata/config-full.toml | 10 +- .../config-multi-chain-effective.toml | 4 +- core/services/functions/listener_test.go | 10 +- core/services/ocr/delegate.go | 4 +- core/services/ocr2/delegate.go | 28 +- core/services/ocrcommon/telemetry_test.go | 22 +- core/services/synchronization/common.go | 21 ++ core/services/synchronization/helpers_test.go | 4 +- .../mocks/telemetry_ingress_batch_client.go | 107 ------ ...ingress_client.go => telemetry_service.go} | 28 +- .../telemetry_ingress_batch_client.go | 22 +- .../telemetry_ingress_batch_client_test.go | 16 +- .../telemetry_ingress_batch_worker_test.go | 2 - .../telemetry_ingress_client.go | 37 +- .../telemetry_ingress_client_test.go | 3 +- core/services/telemetry/common.go | 2 +- core/services/telemetry/ingress.go | 24 +- core/services/telemetry/ingress_batch.go | 24 +- core/services/telemetry/ingress_batch_test.go | 12 +- core/services/telemetry/ingress_test.go | 12 +- core/services/telemetry/manager.go | 221 ++++++++++++ core/services/telemetry/manager_test.go | 341 ++++++++++++++++++ core/services/telemetry/noop.go | 2 +- .../testdata/config-empty-effective.toml | 4 +- core/web/resolver/testdata/config-full.toml | 10 +- .../config-multi-chain-effective.toml | 4 +- docs/CHANGELOG.md | 21 +- docs/CONFIG.md | 38 +- testdata/scripts/node/validate/default.txtar | 4 +- .../disk-based-logging-disabled.txtar | 4 +- .../validate/disk-based-logging-no-dir.txtar | 4 +- .../node/validate/disk-based-logging.txtar | 4 +- testdata/scripts/node/validate/invalid.txtar | 4 +- testdata/scripts/node/validate/valid.txtar | 4 +- 46 files changed, 1230 insertions(+), 317 deletions(-) create mode 100644 core/config/mocks/telemetry_ingress.go create mode 100644 core/config/mocks/telemetry_ingress_endpoint.go delete mode 100644 core/services/synchronization/mocks/telemetry_ingress_batch_client.go rename core/services/synchronization/mocks/{telemetry_ingress_client.go => telemetry_service.go} (59%) create mode 100644 core/services/telemetry/manager.go create mode 100644 core/services/telemetry/manager_test.go diff --git a/core/config/docs/core.toml b/core/config/docs/core.toml index 8729b86868..90089ef8a5 100644 --- a/core/config/docs/core.toml +++ b/core/config/docs/core.toml @@ -90,9 +90,9 @@ LeaseRefreshInterval = '1s' # Default UniConn = true # Default # Logging toggles verbose logging of the raw telemetry messages being sent. Logging = false # Default -# ServerPubKey is the public key of the telemetry server. +# ServerPubKey is the public key of the telemetry server. This field will be removed in a furture version ServerPubKey = 'test-pub-key' # Example -# URL is where to send telemetry. +# URL is where to send telemetry. This field will be removed in a furture version URL = 'https://prom.test' # Example # BufferSize is the number of telemetry messages to buffer before dropping new ones. BufferSize = 100 # Default @@ -105,6 +105,16 @@ SendTimeout = '10s' # Default # UseBatchSend toggles sending telemetry to the ingress server using the batch client. UseBatchSend = true # Default +[[TelemetryIngress.Endpoints]] # Example +# Network aka EVM, Solana, Starknet +Network = 'EVM' # Example +# ChainID of the network +ChainID = '111551111' # Example +# ServerPubKey is the public key of the telemetry server. +ServerPubKey = 'test-pub-key-111551111-evm' # Example +# URL is where to send telemetry. +URL = 'localhost-111551111-evm:9000' # Example + [AuditLogger] # Enabled determines if this logger should be configured at all Enabled = false # Default diff --git a/core/config/docs/docs.go b/core/config/docs/docs.go index c3b696e551..df08246503 100644 --- a/core/config/docs/docs.go +++ b/core/config/docs/docs.go @@ -122,6 +122,24 @@ func newTable(line string, desc lines) *table { return t } +func newArrayOfTables(line string, desc lines) *table { + t := &table{ + name: strings.Trim(strings.Trim(line, fieldExample), "[]"), + codes: []string{line}, + desc: desc, + } + if len(desc) > 0 { + if strings.HasPrefix(strings.TrimSpace(desc[0]), tokenAdvanced) { + t.adv = true + t.desc = t.desc[1:] + } else if strings.HasPrefix(strings.TrimSpace(desc[len(desc)-1]), tokenExtended) { + t.ext = true + t.desc = t.desc[:len(desc)-1] + } + } + return t +} + func (t table) advanced() string { if t.adv { return advancedWarning("Do not change these settings unless you know what you are doing.") @@ -217,6 +235,10 @@ func parseTOMLDocs(s string) (items []fmt.Stringer, err error) { items = append(items, desc) desc = nil } + } else if strings.HasPrefix(line, "[[") { + currentTable = newArrayOfTables(line, desc) + items = append(items, currentTable) + desc = nil } else if strings.HasPrefix(line, "[") { currentTable = newTable(line, desc) items = append(items, currentTable) diff --git a/core/config/docs/docs_test.go b/core/config/docs/docs_test.go index f1a3ab1906..0eee33cb72 100644 --- a/core/config/docs/docs_test.go +++ b/core/config/docs/docs_test.go @@ -20,6 +20,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink/cfgtest" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" + "github.com/smartcontractkit/chainlink/v2/core/store/models" "github.com/smartcontractkit/chainlink/v2/core/utils/config" ) @@ -37,6 +38,12 @@ func TestDoc(t *testing.T) { require.NoError(t, err) } + // Except for TelemetryIngress.ServerPubKey and TelemetryIngress.URL as this will be removed in the future + // and its only use is to signal to NOPs that these fields are no longer allowed + emptyString := "" + c.TelemetryIngress.ServerPubKey = &emptyString + c.TelemetryIngress.URL = new(models.URL) + cfgtest.AssertFieldsNotNil(t, c) var defaults chainlink.Config diff --git a/core/config/mocks/telemetry_ingress.go b/core/config/mocks/telemetry_ingress.go new file mode 100644 index 0000000000..eade0bdc25 --- /dev/null +++ b/core/config/mocks/telemetry_ingress.go @@ -0,0 +1,176 @@ +// Code generated by mockery v2.28.1. DO NOT EDIT. + +package mocks + +import ( + config "github.com/smartcontractkit/chainlink/v2/core/config" + mock "github.com/stretchr/testify/mock" + + time "time" + + url "net/url" +) + +// TelemetryIngress is an autogenerated mock type for the TelemetryIngress type +type TelemetryIngress struct { + mock.Mock +} + +// BufferSize provides a mock function with given fields: +func (_m *TelemetryIngress) BufferSize() uint { + ret := _m.Called() + + var r0 uint + if rf, ok := ret.Get(0).(func() uint); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint) + } + + return r0 +} + +// Endpoints provides a mock function with given fields: +func (_m *TelemetryIngress) Endpoints() []config.TelemetryIngressEndpoint { + ret := _m.Called() + + var r0 []config.TelemetryIngressEndpoint + if rf, ok := ret.Get(0).(func() []config.TelemetryIngressEndpoint); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]config.TelemetryIngressEndpoint) + } + } + + return r0 +} + +// Logging provides a mock function with given fields: +func (_m *TelemetryIngress) Logging() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MaxBatchSize provides a mock function with given fields: +func (_m *TelemetryIngress) MaxBatchSize() uint { + ret := _m.Called() + + var r0 uint + if rf, ok := ret.Get(0).(func() uint); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint) + } + + return r0 +} + +// SendInterval provides a mock function with given fields: +func (_m *TelemetryIngress) SendInterval() time.Duration { + ret := _m.Called() + + var r0 time.Duration + if rf, ok := ret.Get(0).(func() time.Duration); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(time.Duration) + } + + return r0 +} + +// SendTimeout provides a mock function with given fields: +func (_m *TelemetryIngress) SendTimeout() time.Duration { + ret := _m.Called() + + var r0 time.Duration + if rf, ok := ret.Get(0).(func() time.Duration); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(time.Duration) + } + + return r0 +} + +// ServerPubKey provides a mock function with given fields: +func (_m *TelemetryIngress) ServerPubKey() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// URL provides a mock function with given fields: +func (_m *TelemetryIngress) URL() *url.URL { + ret := _m.Called() + + var r0 *url.URL + if rf, ok := ret.Get(0).(func() *url.URL); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*url.URL) + } + } + + return r0 +} + +// UniConn provides a mock function with given fields: +func (_m *TelemetryIngress) UniConn() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// UseBatchSend provides a mock function with given fields: +func (_m *TelemetryIngress) UseBatchSend() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +type mockConstructorTestingTNewTelemetryIngress interface { + mock.TestingT + Cleanup(func()) +} + +// NewTelemetryIngress creates a new instance of TelemetryIngress. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewTelemetryIngress(t mockConstructorTestingTNewTelemetryIngress) *TelemetryIngress { + mock := &TelemetryIngress{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/config/mocks/telemetry_ingress_endpoint.go b/core/config/mocks/telemetry_ingress_endpoint.go new file mode 100644 index 0000000000..c941d0b437 --- /dev/null +++ b/core/config/mocks/telemetry_ingress_endpoint.go @@ -0,0 +1,87 @@ +// Code generated by mockery v2.28.1. DO NOT EDIT. + +package mocks + +import ( + url "net/url" + + mock "github.com/stretchr/testify/mock" +) + +// TelemetryIngressEndpoint is an autogenerated mock type for the TelemetryIngressEndpoint type +type TelemetryIngressEndpoint struct { + mock.Mock +} + +// ChainID provides a mock function with given fields: +func (_m *TelemetryIngressEndpoint) ChainID() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// Network provides a mock function with given fields: +func (_m *TelemetryIngressEndpoint) Network() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// ServerPubKey provides a mock function with given fields: +func (_m *TelemetryIngressEndpoint) ServerPubKey() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// URL provides a mock function with given fields: +func (_m *TelemetryIngressEndpoint) URL() *url.URL { + ret := _m.Called() + + var r0 *url.URL + if rf, ok := ret.Get(0).(func() *url.URL); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*url.URL) + } + } + + return r0 +} + +type mockConstructorTestingTNewTelemetryIngressEndpoint interface { + mock.TestingT + Cleanup(func()) +} + +// NewTelemetryIngressEndpoint creates a new instance of TelemetryIngressEndpoint. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewTelemetryIngressEndpoint(t mockConstructorTestingTNewTelemetryIngressEndpoint) *TelemetryIngressEndpoint { + mock := &TelemetryIngressEndpoint{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/config/telemetry_ingress_config.go b/core/config/telemetry_ingress_config.go index f784f996e0..f6c9fa3f85 100644 --- a/core/config/telemetry_ingress_config.go +++ b/core/config/telemetry_ingress_config.go @@ -5,14 +5,26 @@ import ( "time" ) +//go:generate mockery --quiet --name TelemetryIngress --output ./mocks/ --case=underscore --filename telemetry_ingress.go + type TelemetryIngress interface { Logging() bool UniConn() bool - ServerPubKey() string - URL() *url.URL BufferSize() uint MaxBatchSize() uint SendInterval() time.Duration SendTimeout() time.Duration UseBatchSend() bool + Endpoints() []TelemetryIngressEndpoint + + ServerPubKey() string // Deprecated: Use TelemetryIngressEndpoint.ServerPubKey instead, this field will be removed in future versions + URL() *url.URL // Deprecated: Use TelemetryIngressEndpoint.URL instead, this field will be removed in future versions +} + +//go:generate mockery --quiet --name TelemetryIngressEndpoint --output ./mocks/ --case=underscore --filename telemetry_ingress_endpoint.go +type TelemetryIngressEndpoint interface { + Network() string + ChainID() string + ServerPubKey() string + URL() *url.URL } diff --git a/core/config/toml/types.go b/core/config/toml/types.go index 787a52b5c8..4151bf3042 100644 --- a/core/config/toml/types.go +++ b/core/config/toml/types.go @@ -426,13 +426,22 @@ func (d *DatabaseBackup) setFrom(f *DatabaseBackup) { type TelemetryIngress struct { UniConn *bool Logging *bool - ServerPubKey *string - URL *models.URL BufferSize *uint16 MaxBatchSize *uint16 SendInterval *models.Duration SendTimeout *models.Duration UseBatchSend *bool + Endpoints []TelemetryIngressEndpoint `toml:",omitempty"` + + URL *models.URL `toml:",omitempty"` // Deprecated: Use TelemetryIngressEndpoint.URL instead, this field will be removed in future versions + ServerPubKey *string `toml:",omitempty"` // Deprecated: Use TelemetryIngressEndpoint.ServerPubKey instead, this field will be removed in future versions +} + +type TelemetryIngressEndpoint struct { + Network *string + ChainID *string + URL *models.URL + ServerPubKey *string } func (t *TelemetryIngress) setFrom(f *TelemetryIngress) { @@ -442,12 +451,6 @@ func (t *TelemetryIngress) setFrom(f *TelemetryIngress) { if v := f.Logging; v != nil { t.Logging = v } - if v := f.ServerPubKey; v != nil { - t.ServerPubKey = v - } - if v := f.URL; v != nil { - t.URL = v - } if v := f.BufferSize; v != nil { t.BufferSize = v } @@ -463,6 +466,29 @@ func (t *TelemetryIngress) setFrom(f *TelemetryIngress) { if v := f.UseBatchSend; v != nil { t.UseBatchSend = v } + if v := f.Endpoints; v != nil { + t.Endpoints = v + } + if v := f.ServerPubKey; v != nil { + t.ServerPubKey = v + } + if v := f.URL; v != nil { + t.URL = v + } +} + +func (t *TelemetryIngress) ValidateConfig() (err error) { + if (!t.URL.IsZero() || *t.ServerPubKey != "") && len(t.Endpoints) > 0 { + return configutils.ErrInvalid{Name: "URL", Value: t.URL.String(), + Msg: `Cannot set both TelemetryIngress.URL and TelemetryIngress.ServerPubKey alongside TelemetryIngress.Endpoints. Please use only TelemetryIngress.Endpoints: + [[TelemetryIngress.Endpoints]] + Network = '...' # e.g. EVM. Solana, Starknet, Cosmos + ChainID = '...' # e.g. 1, 5, devnet, mainnet-beta + URL = '...' + ServerPubKey = '...'`} + } + + return nil } type AuditLogger struct { diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 814c8a1c10..173b697944 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -47,7 +47,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/services/promreporter" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury" - "github.com/smartcontractkit/chainlink/v2/core/services/synchronization" "github.com/smartcontractkit/chainlink/v2/core/services/telemetry" "github.com/smartcontractkit/chainlink/v2/core/services/vrf" "github.com/smartcontractkit/chainlink/v2/core/services/webhook" @@ -217,24 +216,8 @@ func NewApplication(opts ApplicationOpts) (Application, error) { globalLogger.Info("Nurse service (automatic pprof profiling) is disabled") } - telemetryIngressClient := synchronization.TelemetryIngressClient(&synchronization.NoopTelemetryIngressClient{}) - telemetryIngressBatchClient := synchronization.TelemetryIngressBatchClient(&synchronization.NoopTelemetryIngressBatchClient{}) - monitoringEndpointGen := telemetry.MonitoringEndpointGenerator(&telemetry.NoopAgent{}) - - ticfg := cfg.TelemetryIngress() - if ticfg.URL() != nil { - if ticfg.UseBatchSend() { - telemetryIngressBatchClient = synchronization.NewTelemetryIngressBatchClient(ticfg.URL(), - ticfg.ServerPubKey(), keyStore.CSA(), ticfg.Logging(), globalLogger, ticfg.BufferSize(), ticfg.MaxBatchSize(), ticfg.SendInterval(), ticfg.SendTimeout(), ticfg.UniConn()) - monitoringEndpointGen = telemetry.NewIngressAgentBatchWrapper(telemetryIngressBatchClient) - - } else { - telemetryIngressClient = synchronization.NewTelemetryIngressClient(ticfg.URL(), - ticfg.ServerPubKey(), keyStore.CSA(), ticfg.Logging(), globalLogger, ticfg.BufferSize()) - monitoringEndpointGen = telemetry.NewIngressAgentWrapper(telemetryIngressClient) - } - } - srvcs = append(srvcs, telemetryIngressClient, telemetryIngressBatchClient) + telemetryManager := telemetry.NewManager(cfg.TelemetryIngress(), keyStore.CSA(), globalLogger) + srvcs = append(srvcs, telemetryManager) backupCfg := cfg.Database().Backup() if backupCfg.Mode() != config.DatabaseBackupModeNone && backupCfg.Frequency() > 0 { @@ -359,7 +342,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { keyStore, pipelineRunner, peerWrapper, - monitoringEndpointGen, + telemetryManager, legacyEVMChains, globalLogger, cfg.Database(), @@ -379,7 +362,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { mercuryORM, pipelineRunner, peerWrapper, - monitoringEndpointGen, + telemetryManager, legacyEVMChains, globalLogger, ocr2DelegateConfig, diff --git a/core/services/chainlink/config_telemetry_ingress.go b/core/services/chainlink/config_telemetry_ingress.go index 8557bf0b02..5126833134 100644 --- a/core/services/chainlink/config_telemetry_ingress.go +++ b/core/services/chainlink/config_telemetry_ingress.go @@ -14,6 +14,10 @@ type telemetryIngressConfig struct { c toml.TelemetryIngress } +type telemetryIngressEndpointConfig struct { + c toml.TelemetryIngressEndpoint +} + func (t *telemetryIngressConfig) Logging() bool { return *t.c.Logging } @@ -22,17 +26,6 @@ func (t *telemetryIngressConfig) UniConn() bool { return *t.c.UniConn } -func (t *telemetryIngressConfig) ServerPubKey() string { - return *t.c.ServerPubKey -} - -func (t *telemetryIngressConfig) URL() *url.URL { - if t.c.URL.IsZero() { - return nil - } - return t.c.URL.URL() -} - func (t *telemetryIngressConfig) BufferSize() uint { return uint(*t.c.BufferSize) } @@ -52,3 +45,42 @@ func (t *telemetryIngressConfig) SendTimeout() time.Duration { func (t *telemetryIngressConfig) UseBatchSend() bool { return *t.c.UseBatchSend } + +// Deprecated: Use TelemetryIngressEndpoint.ServerPubKey, this field will be removed in future versions +func (t *telemetryIngressConfig) ServerPubKey() string { + return *t.c.ServerPubKey +} + +// Deprecated: Use TelemetryIngressEndpoint.URL instead, this field will be removed in future versions +func (t *telemetryIngressConfig) URL() *url.URL { + return t.c.URL.URL() +} + +func (t *telemetryIngressConfig) Endpoints() []config.TelemetryIngressEndpoint { + var endpoints []config.TelemetryIngressEndpoint + for _, e := range t.c.Endpoints { + endpoints = append(endpoints, &telemetryIngressEndpointConfig{ + c: e, + }) + } + return endpoints +} + +func (t *telemetryIngressEndpointConfig) Network() string { + return *t.c.Network +} + +func (t *telemetryIngressEndpointConfig) ChainID() string { + return *t.c.ChainID +} + +func (t *telemetryIngressEndpointConfig) URL() *url.URL { + if t.c.URL.IsZero() { + return nil + } + return t.c.URL.URL() +} + +func (t *telemetryIngressEndpointConfig) ServerPubKey() string { + return *t.c.ServerPubKey +} diff --git a/core/services/chainlink/config_telemetry_ingress_test.go b/core/services/chainlink/config_telemetry_ingress_test.go index 5e4561440c..c371b465a2 100644 --- a/core/services/chainlink/config_telemetry_ingress_test.go +++ b/core/services/chainlink/config_telemetry_ingress_test.go @@ -18,11 +18,17 @@ func TestTelemetryIngressConfig(t *testing.T) { ticfg := cfg.TelemetryIngress() assert.True(t, ticfg.Logging()) assert.True(t, ticfg.UniConn()) - assert.Equal(t, "test-pub-key", ticfg.ServerPubKey()) - assert.Equal(t, "https://prom.test", ticfg.URL().String()) assert.Equal(t, uint(1234), ticfg.BufferSize()) assert.Equal(t, uint(4321), ticfg.MaxBatchSize()) assert.Equal(t, time.Minute, ticfg.SendInterval()) assert.Equal(t, 5*time.Second, ticfg.SendTimeout()) assert.True(t, ticfg.UseBatchSend()) + + tec := cfg.TelemetryIngress().Endpoints() + + assert.Equal(t, 1, len(tec)) + assert.Equal(t, "EVM", tec[0].Network()) + assert.Equal(t, "1", tec[0].ChainID()) + assert.Equal(t, "prom.test", tec[0].URL().String()) + assert.Equal(t, "test-pub-key", tec[0].ServerPubKey()) } diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index 624c130162..b42609318c 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -272,14 +272,21 @@ func TestConfig_Marshal(t *testing.T) { full.TelemetryIngress = toml.TelemetryIngress{ UniConn: ptr(true), Logging: ptr(true), - ServerPubKey: ptr("test-pub-key"), - URL: mustURL("https://prom.test"), BufferSize: ptr[uint16](1234), MaxBatchSize: ptr[uint16](4321), SendInterval: models.MustNewDuration(time.Minute), SendTimeout: models.MustNewDuration(5 * time.Second), UseBatchSend: ptr(true), + URL: ptr(models.URL{}), + ServerPubKey: ptr(""), + Endpoints: []toml.TelemetryIngressEndpoint{{ + Network: ptr("EVM"), + ChainID: ptr("1"), + ServerPubKey: ptr("test-pub-key"), + URL: mustURL("prom.test")}, + }, } + full.Log = toml.Log{ Level: ptr(toml.LogLevel(zapcore.DPanicLevel)), JSONConsole: ptr(true), @@ -685,14 +692,21 @@ LeaseRefreshInterval = '1s' {"TelemetryIngress", Config{Core: toml.Core{TelemetryIngress: full.TelemetryIngress}}, `[TelemetryIngress] UniConn = true Logging = true -ServerPubKey = 'test-pub-key' -URL = 'https://prom.test' BufferSize = 1234 MaxBatchSize = 4321 SendInterval = '1m0s' SendTimeout = '5s' UseBatchSend = true +URL = '' +ServerPubKey = '' + +[[TelemetryIngress.Endpoints]] +Network = 'EVM' +ChainID = '1' +URL = 'prom.test' +ServerPubKey = 'test-pub-key' `}, + {"Log", Config{Core: toml.Core{Log: full.Log}}, `[Log] Level = 'crit' JSONConsole = true @@ -1061,6 +1075,17 @@ func TestConfig_full(t *testing.T) { } } + // Except for TelemetryIngress.ServerPubKey as this will be removed in the future + // and its only use is to signal to NOPs that these fields are no longer allowed + if got.TelemetryIngress.ServerPubKey == nil { + got.TelemetryIngress.ServerPubKey = ptr("") + } + // Except for TelemetryIngress.URL as this will be removed in the future + // and its only use is to signal to NOPs that these fields are no longer allowed + if got.TelemetryIngress.URL == nil { + got.TelemetryIngress.URL = new(models.URL) + } + cfgtest.AssertFieldsNotNil(t, got) } diff --git a/core/services/chainlink/testdata/config-empty-effective.toml b/core/services/chainlink/testdata/config-empty-effective.toml index 91383c5fad..ed42a038a1 100644 --- a/core/services/chainlink/testdata/config-empty-effective.toml +++ b/core/services/chainlink/testdata/config-empty-effective.toml @@ -36,13 +36,13 @@ LeaseRefreshInterval = '1s' [TelemetryIngress] UniConn = true Logging = false -ServerPubKey = '' -URL = '' BufferSize = 100 MaxBatchSize = 50 SendInterval = '500ms' SendTimeout = '10s' UseBatchSend = true +URL = '' +ServerPubKey = '' [AuditLogger] Enabled = false diff --git a/core/services/chainlink/testdata/config-full.toml b/core/services/chainlink/testdata/config-full.toml index 51ab9e4847..d37002ffc4 100644 --- a/core/services/chainlink/testdata/config-full.toml +++ b/core/services/chainlink/testdata/config-full.toml @@ -36,13 +36,19 @@ LeaseRefreshInterval = '1s' [TelemetryIngress] UniConn = true Logging = true -ServerPubKey = 'test-pub-key' -URL = 'https://prom.test' BufferSize = 1234 MaxBatchSize = 4321 SendInterval = '1m0s' SendTimeout = '5s' UseBatchSend = true +URL = '' +ServerPubKey = '' + +[[TelemetryIngress.Endpoints]] +Network = 'EVM' +ChainID = '1' +URL = 'prom.test' +ServerPubKey = 'test-pub-key' [AuditLogger] Enabled = true diff --git a/core/services/chainlink/testdata/config-multi-chain-effective.toml b/core/services/chainlink/testdata/config-multi-chain-effective.toml index 2756c6f65b..47e5f008b4 100644 --- a/core/services/chainlink/testdata/config-multi-chain-effective.toml +++ b/core/services/chainlink/testdata/config-multi-chain-effective.toml @@ -36,13 +36,13 @@ LeaseRefreshInterval = '1s' [TelemetryIngress] UniConn = true Logging = false -ServerPubKey = '' -URL = '' BufferSize = 100 MaxBatchSize = 50 SendInterval = '500ms' SendTimeout = '10s' UseBatchSend = true +URL = '' +ServerPubKey = '' [AuditLogger] Enabled = true diff --git a/core/services/functions/listener_test.go b/core/services/functions/listener_test.go index a06fcf3e3b..49df5a28c4 100644 --- a/core/services/functions/listener_test.go +++ b/core/services/functions/listener_test.go @@ -54,7 +54,7 @@ type FunctionsListenerUniverse struct { eaClient *functions_mocks.ExternalAdapterClient pluginORM *functions_mocks.ORM logBroadcaster *log_mocks.Broadcaster - ingressClient *sync_mocks.TelemetryIngressClient + ingressClient *sync_mocks.TelemetryService decryptor *threshold_mocks.Decryptor logPollerWrapper *evmrelay_mocks.LogPollerWrapper contractVersion uint32 @@ -130,9 +130,9 @@ func NewFunctionsListenerUniverse(t *testing.T, timeoutSec int, pruneFrequencySe contractAddress := "0xa" - ingressClient := sync_mocks.NewTelemetryIngressClient(t) + ingressClient := sync_mocks.NewTelemetryService(t) ingressAgent := telemetry.NewIngressAgentWrapper(ingressClient) - monEndpoint := ingressAgent.GenMonitoringEndpoint(contractAddress, synchronization.FunctionsRequests) + monEndpoint := ingressAgent.GenMonitoringEndpoint(contractAddress, synchronization.FunctionsRequests, "test-network", "test-chainID") s4Storage := s4_mocks.NewStorage(t) client := chain.Client() @@ -289,8 +289,8 @@ func TestFunctionsListener_ReportSourceCodeDomains(t *testing.T) { }).Return(nil) var sentMessage []byte - uni.ingressClient.On("Send", mock.Anything).Return().Run(func(args mock.Arguments) { - sentMessage = args[0].(synchronization.TelemPayload).Telemetry + uni.ingressClient.On("Send", mock.Anything, mock.AnythingOfType("[]uint8"), mock.AnythingOfType("string"), mock.AnythingOfType("TelemetryType")).Return().Run(func(args mock.Arguments) { + sentMessage = args[1].([]byte) }) uni.service.HandleLog(log) diff --git a/core/services/ocr/delegate.go b/core/services/ocr/delegate.go index 9ed22d01e7..e1638bdb76 100644 --- a/core/services/ocr/delegate.go +++ b/core/services/ocr/delegate.go @@ -297,7 +297,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job, qopts ...pg.QOpt) (services []job enhancedTelemChan := make(chan ocrcommon.EnhancedTelemetryData, 100) if ocrcommon.ShouldCollectEnhancedTelemetry(&jb) { - enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, enhancedTelemChan, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(concreteSpec.ContractAddress.String(), synchronization.EnhancedEA), lggr.Named("EnhancedTelemetry")) + enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, enhancedTelemChan, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(concreteSpec.ContractAddress.String(), synchronization.EnhancedEA, "EVM", chain.ID().String()), lggr.Named("EnhancedTelemetry")) services = append(services, enhancedTelemService) } @@ -319,7 +319,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job, qopts ...pg.QOpt) (services []job Logger: ocrLogger, V1Bootstrappers: v1BootstrapPeers, V2Bootstrappers: v2Bootstrappers, - MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(concreteSpec.ContractAddress.String(), synchronization.OCR), + MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(concreteSpec.ContractAddress.String(), synchronization.OCR, "EVM", chain.ID().String()), ConfigOverrider: configOverrider, }) if err != nil { diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index e8c50610b3..c21ffd8971 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -560,7 +560,7 @@ func (d *Delegate) newServicesMercury( Database: ocrDB, LocalConfig: lc, Logger: ocrLogger, - MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.FeedID.String(), synchronization.OCR3Mercury), + MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.FeedID.String(), synchronization.OCR3Mercury, rid.Network, rid.ChainID), OffchainConfigDigester: mercuryProvider.OffchainConfigDigester(), OffchainKeyring: kb, OnchainKeyring: kb, @@ -571,7 +571,7 @@ func (d *Delegate) newServicesMercury( mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, chain, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID)) if ocrcommon.ShouldCollectEnhancedTelemetryMercury(&jb) { - enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(spec.FeedID.String(), synchronization.EnhancedEAMercury), lggr.Named("EnhancedTelemetryMercury")) + enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(spec.FeedID.String(), synchronization.EnhancedEAMercury, rid.Network, rid.ChainID), lggr.Named("EnhancedTelemetryMercury")) mercuryServices = append(mercuryServices, enhancedTelemService) } @@ -590,13 +590,19 @@ func (d *Delegate) newServicesMedian( ocrLogger commontypes.Logger, ) ([]job.ServiceCtx, error) { spec := jb.OCR2OracleSpec + + rid, err := spec.RelayID() + if err != nil { + return nil, fmt.Errorf("median services: %w: %w", ErrJobSpecNoRelayer, err) + } + oracleArgsNoPlugin := libocr2.OCR2OracleArgs{ BinaryNetworkEndpointFactory: d.peerWrapper.Peer2, V2Bootstrappers: bootstrapPeers, Database: ocrDB, LocalConfig: lc, Logger: ocrLogger, - MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.OCR2Median), + MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.OCR2Median, rid.Network, rid.ChainID), OffchainKeyring: kb, OnchainKeyring: kb, } @@ -604,10 +610,6 @@ func (d *Delegate) newServicesMedian( enhancedTelemChan := make(chan ocrcommon.EnhancedTelemetryData, 100) mConfig := median.NewMedianConfig(d.cfg.JobPipeline().MaxSuccessfulRuns(), d.cfg) - rid, err := spec.RelayID() - if err != nil { - return nil, fmt.Errorf("median services: %w: %w", ErrJobSpecNoRelayer, err) - } relayer, err := d.RelayGetter.Get(rid) if err != nil { return nil, fmt.Errorf("median services; failed to get relay %s is it enabled?: %w", spec.Relay, err) @@ -616,7 +618,7 @@ func (d *Delegate) newServicesMedian( medianServices, err2 := median.NewMedianServices(ctx, jb, d.isNewlyCreatedJob, relayer, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, mConfig, enhancedTelemChan, errorLog) if ocrcommon.ShouldCollectEnhancedTelemetry(&jb) { - enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, enhancedTelemChan, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.EnhancedEA), lggr.Named("EnhancedTelemetry")) + enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, enhancedTelemChan, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.EnhancedEA, rid.Network, rid.ChainID), lggr.Named("EnhancedTelemetry")) medianServices = append(medianServices, enhancedTelemService) } @@ -837,7 +839,7 @@ func (d *Delegate) newServicesOCR2VRF( VRFContractTransmitter: vrfProvider.ContractTransmitter(), VRFDatabase: ocrDB, VRFLocalConfig: lc, - VRFMonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.OCR2VRF), + VRFMonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.OCR2VRF, rid.Network, rid.ChainID), DKGContractConfigTracker: dkgProvider.ContractConfigTracker(), DKGOffchainConfigDigester: dkgProvider.OffchainConfigDigester(), DKGContract: dkgpkg.NewOnchainContract(dkgContract, &altbn_128.G2{}), @@ -976,7 +978,7 @@ func (d *Delegate) newServicesOCR2Keepers21( ContractConfigTracker: keeperProvider.ContractConfigTracker(), KeepersDatabase: ocrDB, Logger: ocrLogger, - MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.OCR2Automation), + MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.OCR2Automation, rid.Network, rid.ChainID), OffchainConfigDigester: keeperProvider.OffchainConfigDigester(), OffchainKeyring: kb, OnchainKeyring: services.Keyring(), @@ -1121,7 +1123,7 @@ func (d *Delegate) newServicesOCR2Keepers20( KeepersDatabase: ocrDB, LocalConfig: lc, Logger: ocrLogger, - MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.OCR2Automation), + MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.OCR2Automation, rid.Network, rid.ChainID), OffchainConfigDigester: keeperProvider.OffchainConfigDigester(), OffchainKeyring: kb, OnchainKeyring: kb, @@ -1230,7 +1232,7 @@ func (d *Delegate) newServicesOCR2Functions( Database: functionsOcrDB, LocalConfig: lc, Logger: ocrLogger, - MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.OCR2Functions), + MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.OCR2Functions, rid.Network, rid.ChainID), OffchainConfigDigester: functionsProvider.OffchainConfigDigester(), OffchainKeyring: kb, OnchainKeyring: kb, @@ -1294,7 +1296,7 @@ func (d *Delegate) newServicesOCR2Functions( ContractID: spec.ContractID, Logger: lggr, MailMon: d.mailMon, - URLsMonEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.FunctionsRequests), + URLsMonEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.FunctionsRequests, rid.Network, rid.ChainID), EthKeystore: d.ethKs, ThresholdKeyShare: thresholdKeyShare, LogPollerWrapper: functionsProvider.LogPollerWrapper(), diff --git a/core/services/ocrcommon/telemetry_test.go b/core/services/ocrcommon/telemetry_test.go index 495dc6fe78..88fb7de3e3 100644 --- a/core/services/ocrcommon/telemetry_test.go +++ b/core/services/ocrcommon/telemetry_test.go @@ -186,13 +186,13 @@ func TestGetJsonParsedValue(t *testing.T) { func TestSendEATelemetry(t *testing.T) { wg := sync.WaitGroup{} - ingressClient := mocks.NewTelemetryIngressClient(t) + ingressClient := mocks.NewTelemetryService(t) ingressAgent := telemetry.NewIngressAgentWrapper(ingressClient) - monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("0xa", synchronization.EnhancedEA) + monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("0xa", synchronization.EnhancedEA, "test-network", "test-chainID") var sentMessage []byte - ingressClient.On("Send", mock.AnythingOfType("synchronization.TelemPayload")).Return().Run(func(args mock.Arguments) { - sentMessage = args[0].(synchronization.TelemPayload).Telemetry + ingressClient.On("Send", mock.Anything, mock.AnythingOfType("[]uint8"), mock.AnythingOfType("string"), mock.AnythingOfType("TelemetryType")).Return().Run(func(args mock.Arguments) { + sentMessage = args[1].([]byte) wg.Done() }) @@ -302,10 +302,10 @@ func TestGetObservation(t *testing.T) { func TestCollectAndSend(t *testing.T) { wg := sync.WaitGroup{} - ingressClient := mocks.NewTelemetryIngressClient(t) + ingressClient := mocks.NewTelemetryService(t) ingressAgent := telemetry.NewIngressAgentWrapper(ingressClient) - monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("0xa", synchronization.EnhancedEA) - ingressClient.On("Send", mock.AnythingOfType("synchronization.TelemPayload")).Return().Run(func(args mock.Arguments) { + monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("0xa", synchronization.EnhancedEA, "test-network", "test-chainID") + ingressClient.On("Send", mock.Anything, mock.AnythingOfType("[]uint8"), mock.AnythingOfType("string"), mock.AnythingOfType("TelemetryType")).Return().Run(func(args mock.Arguments) { wg.Done() }) @@ -549,13 +549,13 @@ func TestGetAssetSymbolFromRequestData(t *testing.T) { func TestCollectMercuryEnhancedTelemetry(t *testing.T) { wg := sync.WaitGroup{} - ingressClient := mocks.NewTelemetryIngressClient(t) + ingressClient := mocks.NewTelemetryService(t) ingressAgent := telemetry.NewIngressAgentWrapper(ingressClient) - monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("0xa", synchronization.EnhancedEAMercury) + monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("0xa", synchronization.EnhancedEAMercury, "test-network", "test-chainID") var sentMessage []byte - ingressClient.On("Send", mock.AnythingOfType("synchronization.TelemPayload")).Return().Run(func(args mock.Arguments) { - sentMessage = args[0].(synchronization.TelemPayload).Telemetry + ingressClient.On("Send", mock.Anything, mock.AnythingOfType("[]uint8"), mock.AnythingOfType("string"), mock.AnythingOfType("TelemetryType")).Return().Run(func(args mock.Arguments) { + sentMessage = args[1].([]byte) wg.Done() }) diff --git a/core/services/synchronization/common.go b/core/services/synchronization/common.go index 86641dff17..9ae7d5a6c9 100644 --- a/core/services/synchronization/common.go +++ b/core/services/synchronization/common.go @@ -1,5 +1,11 @@ package synchronization +import ( + "context" + + "github.com/smartcontractkit/chainlink/v2/core/services" +) + // TelemetryType defines supported telemetry types type TelemetryType string @@ -18,3 +24,18 @@ const ( OCR2VRF TelemetryType = "ocr2-vrf" AutomationCustom TelemetryType = "automation-custom" ) + +type TelemPayload struct { + Telemetry []byte + TelemType TelemetryType + ContractID string +} + +// TelemetryService encapsulates all the functionality needed to +// send telemetry to the ingress server using wsrpc +// +//go:generate mockery --quiet --name TelemetryService --output ./mocks --case=underscore +type TelemetryService interface { + services.ServiceCtx + Send(ctx context.Context, telemetry []byte, contractID string, telemType TelemetryType) +} diff --git a/core/services/synchronization/helpers_test.go b/core/services/synchronization/helpers_test.go index da8f1bac5a..14aaf5a7a0 100644 --- a/core/services/synchronization/helpers_test.go +++ b/core/services/synchronization/helpers_test.go @@ -11,14 +11,14 @@ import ( ) // NewTestTelemetryIngressClient calls NewTelemetryIngressClient and injects telemClient. -func NewTestTelemetryIngressClient(t *testing.T, url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, telemClient telemPb.TelemClient) TelemetryIngressClient { +func NewTestTelemetryIngressClient(t *testing.T, url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, telemClient telemPb.TelemClient) TelemetryService { tc := NewTelemetryIngressClient(url, serverPubKeyHex, ks, logging, logger.TestLogger(t), 100) tc.(*telemetryIngressClient).telemClient = telemClient return tc } // NewTestTelemetryIngressBatchClient calls NewTelemetryIngressBatchClient and injects telemClient. -func NewTestTelemetryIngressBatchClient(t *testing.T, url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, telemClient telemPb.TelemClient, sendInterval time.Duration, uniconn bool) TelemetryIngressBatchClient { +func NewTestTelemetryIngressBatchClient(t *testing.T, url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, telemClient telemPb.TelemClient, sendInterval time.Duration, uniconn bool) TelemetryService { tc := NewTelemetryIngressBatchClient(url, serverPubKeyHex, ks, logging, logger.TestLogger(t), 100, 50, sendInterval, time.Second, uniconn) tc.(*telemetryIngressBatchClient).close = func() error { return nil } tc.(*telemetryIngressBatchClient).telemClient = telemClient diff --git a/core/services/synchronization/mocks/telemetry_ingress_batch_client.go b/core/services/synchronization/mocks/telemetry_ingress_batch_client.go deleted file mode 100644 index 4f992b0c79..0000000000 --- a/core/services/synchronization/mocks/telemetry_ingress_batch_client.go +++ /dev/null @@ -1,107 +0,0 @@ -// Code generated by mockery v2.28.1. DO NOT EDIT. - -package mocks - -import ( - context "context" - - synchronization "github.com/smartcontractkit/chainlink/v2/core/services/synchronization" - mock "github.com/stretchr/testify/mock" -) - -// TelemetryIngressBatchClient is an autogenerated mock type for the TelemetryIngressBatchClient type -type TelemetryIngressBatchClient struct { - mock.Mock -} - -// Close provides a mock function with given fields: -func (_m *TelemetryIngressBatchClient) Close() error { - ret := _m.Called() - - var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// HealthReport provides a mock function with given fields: -func (_m *TelemetryIngressBatchClient) HealthReport() map[string]error { - ret := _m.Called() - - var r0 map[string]error - if rf, ok := ret.Get(0).(func() map[string]error); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(map[string]error) - } - } - - return r0 -} - -// Name provides a mock function with given fields: -func (_m *TelemetryIngressBatchClient) Name() string { - ret := _m.Called() - - var r0 string - if rf, ok := ret.Get(0).(func() string); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(string) - } - - return r0 -} - -// Ready provides a mock function with given fields: -func (_m *TelemetryIngressBatchClient) Ready() error { - ret := _m.Called() - - var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// Send provides a mock function with given fields: _a0 -func (_m *TelemetryIngressBatchClient) Send(_a0 synchronization.TelemPayload) { - _m.Called(_a0) -} - -// Start provides a mock function with given fields: _a0 -func (_m *TelemetryIngressBatchClient) Start(_a0 context.Context) error { - ret := _m.Called(_a0) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context) error); ok { - r0 = rf(_a0) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -type mockConstructorTestingTNewTelemetryIngressBatchClient interface { - mock.TestingT - Cleanup(func()) -} - -// NewTelemetryIngressBatchClient creates a new instance of TelemetryIngressBatchClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewTelemetryIngressBatchClient(t mockConstructorTestingTNewTelemetryIngressBatchClient) *TelemetryIngressBatchClient { - mock := &TelemetryIngressBatchClient{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/core/services/synchronization/mocks/telemetry_ingress_client.go b/core/services/synchronization/mocks/telemetry_service.go similarity index 59% rename from core/services/synchronization/mocks/telemetry_ingress_client.go rename to core/services/synchronization/mocks/telemetry_service.go index 42eb4616d0..0ab11bde23 100644 --- a/core/services/synchronization/mocks/telemetry_ingress_client.go +++ b/core/services/synchronization/mocks/telemetry_service.go @@ -9,13 +9,13 @@ import ( mock "github.com/stretchr/testify/mock" ) -// TelemetryIngressClient is an autogenerated mock type for the TelemetryIngressClient type -type TelemetryIngressClient struct { +// TelemetryService is an autogenerated mock type for the TelemetryService type +type TelemetryService struct { mock.Mock } // Close provides a mock function with given fields: -func (_m *TelemetryIngressClient) Close() error { +func (_m *TelemetryService) Close() error { ret := _m.Called() var r0 error @@ -29,7 +29,7 @@ func (_m *TelemetryIngressClient) Close() error { } // HealthReport provides a mock function with given fields: -func (_m *TelemetryIngressClient) HealthReport() map[string]error { +func (_m *TelemetryService) HealthReport() map[string]error { ret := _m.Called() var r0 map[string]error @@ -45,7 +45,7 @@ func (_m *TelemetryIngressClient) HealthReport() map[string]error { } // Name provides a mock function with given fields: -func (_m *TelemetryIngressClient) Name() string { +func (_m *TelemetryService) Name() string { ret := _m.Called() var r0 string @@ -59,7 +59,7 @@ func (_m *TelemetryIngressClient) Name() string { } // Ready provides a mock function with given fields: -func (_m *TelemetryIngressClient) Ready() error { +func (_m *TelemetryService) Ready() error { ret := _m.Called() var r0 error @@ -72,13 +72,13 @@ func (_m *TelemetryIngressClient) Ready() error { return r0 } -// Send provides a mock function with given fields: _a0 -func (_m *TelemetryIngressClient) Send(_a0 synchronization.TelemPayload) { - _m.Called(_a0) +// Send provides a mock function with given fields: ctx, telemetry, contractID, telemType +func (_m *TelemetryService) Send(ctx context.Context, telemetry []byte, contractID string, telemType synchronization.TelemetryType) { + _m.Called(ctx, telemetry, contractID, telemType) } // Start provides a mock function with given fields: _a0 -func (_m *TelemetryIngressClient) Start(_a0 context.Context) error { +func (_m *TelemetryService) Start(_a0 context.Context) error { ret := _m.Called(_a0) var r0 error @@ -91,14 +91,14 @@ func (_m *TelemetryIngressClient) Start(_a0 context.Context) error { return r0 } -type mockConstructorTestingTNewTelemetryIngressClient interface { +type mockConstructorTestingTNewTelemetryService interface { mock.TestingT Cleanup(func()) } -// NewTelemetryIngressClient creates a new instance of TelemetryIngressClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewTelemetryIngressClient(t mockConstructorTestingTNewTelemetryIngressClient) *TelemetryIngressClient { - mock := &TelemetryIngressClient{} +// NewTelemetryService creates a new instance of TelemetryService. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewTelemetryService(t mockConstructorTestingTNewTelemetryService) *TelemetryService { + mock := &TelemetryService{} mock.Mock.Test(t) t.Cleanup(func() { mock.AssertExpectations(t) }) diff --git a/core/services/synchronization/telemetry_ingress_batch_client.go b/core/services/synchronization/telemetry_ingress_batch_client.go index ccafc32bc3..35815dac0c 100644 --- a/core/services/synchronization/telemetry_ingress_batch_client.go +++ b/core/services/synchronization/telemetry_ingress_batch_client.go @@ -13,21 +13,11 @@ import ( "github.com/smartcontractkit/wsrpc/examples/simple/keys" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" telemPb "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem" "github.com/smartcontractkit/chainlink/v2/core/utils" ) -//go:generate mockery --quiet --name TelemetryIngressBatchClient --output ./mocks --case=underscore - -// TelemetryIngressBatchClient encapsulates all the functionality needed to -// send telemetry to the ingress server using wsrpc -type TelemetryIngressBatchClient interface { - services.ServiceCtx - Send(TelemPayload) -} - // NoopTelemetryIngressBatchClient is a no-op interface for TelemetryIngressBatchClient type NoopTelemetryIngressBatchClient struct{} @@ -77,7 +67,7 @@ type telemetryIngressBatchClient struct { // NewTelemetryIngressBatchClient returns a client backed by wsrpc that // can send telemetry to the telemetry ingress server -func NewTelemetryIngressBatchClient(url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint, telemMaxBatchSize uint, telemSendInterval time.Duration, telemSendTimeout time.Duration, useUniconn bool) TelemetryIngressBatchClient { +func NewTelemetryIngressBatchClient(url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint, telemMaxBatchSize uint, telemSendInterval time.Duration, telemSendTimeout time.Duration, useUniconn bool) TelemetryService { return &telemetryIngressBatchClient{ telemBufferSize: telemBufferSize, telemMaxBatchSize: telemMaxBatchSize, @@ -182,16 +172,22 @@ func (tc *telemetryIngressBatchClient) getCSAPrivateKey() (privkey []byte, err e // Send directs incoming telmetry messages to the worker responsible for pushing it to // the ingress server. If the worker telemetry buffer is full, messages are dropped // and a warning is logged. -func (tc *telemetryIngressBatchClient) Send(payload TelemPayload) { +func (tc *telemetryIngressBatchClient) Send(ctx context.Context, telemData []byte, contractID string, telemType TelemetryType) { if tc.useUniConn && !tc.connected.Load() { tc.lggr.Warnw("not connected to telemetry endpoint", "endpoint", tc.url.String()) return } + payload := TelemPayload{ + Telemetry: telemData, + TelemType: telemType, + ContractID: contractID, + } worker := tc.findOrCreateWorker(payload) + select { case worker.chTelemetry <- payload: worker.dropMessageCount.Store(0) - case <-payload.Ctx.Done(): + case <-ctx.Done(): return default: worker.logBufferFullWithExpBackoff(payload) diff --git a/core/services/synchronization/telemetry_ingress_batch_client_test.go b/core/services/synchronization/telemetry_ingress_batch_client_test.go index 81a3a950f3..6dd9d401a8 100644 --- a/core/services/synchronization/telemetry_ingress_batch_client_test.go +++ b/core/services/synchronization/telemetry_ingress_batch_client_test.go @@ -41,19 +41,16 @@ func TestTelemetryIngressBatchClient_HappyPath(t *testing.T) { // Create telemetry payloads for different contracts telemPayload1 := synchronization.TelemPayload{ - Ctx: testutils.Context(t), Telemetry: []byte("Mock telem 1"), ContractID: "0x1", TelemType: synchronization.OCR, } telemPayload2 := synchronization.TelemPayload{ - Ctx: testutils.Context(t), Telemetry: []byte("Mock telem 2"), ContractID: "0x2", TelemType: synchronization.OCR2VRF, } telemPayload3 := synchronization.TelemPayload{ - Ctx: testutils.Context(t), Telemetry: []byte("Mock telem 3"), ContractID: "0x3", TelemType: synchronization.OCR2Functions, @@ -90,13 +87,14 @@ func TestTelemetryIngressBatchClient_HappyPath(t *testing.T) { }) // Send telemetry - telemIngressClient.Send(telemPayload1) - telemIngressClient.Send(telemPayload2) - telemIngressClient.Send(telemPayload3) + testCtx := testutils.Context(t) + telemIngressClient.Send(testCtx, telemPayload1.Telemetry, telemPayload1.ContractID, telemPayload1.TelemType) + telemIngressClient.Send(testCtx, telemPayload2.Telemetry, telemPayload2.ContractID, telemPayload2.TelemType) + telemIngressClient.Send(testCtx, telemPayload3.Telemetry, telemPayload3.ContractID, telemPayload3.TelemType) time.Sleep(sendInterval * 2) - telemIngressClient.Send(telemPayload1) - telemIngressClient.Send(telemPayload1) - telemIngressClient.Send(telemPayload2) + telemIngressClient.Send(testCtx, telemPayload1.Telemetry, telemPayload1.ContractID, telemPayload1.TelemType) + telemIngressClient.Send(testCtx, telemPayload1.Telemetry, telemPayload1.ContractID, telemPayload1.TelemType) + telemIngressClient.Send(testCtx, telemPayload2.Telemetry, telemPayload2.ContractID, telemPayload2.TelemType) // Wait for the telemetry to be handled g.Eventually(func() []uint32 { diff --git a/core/services/synchronization/telemetry_ingress_batch_worker_test.go b/core/services/synchronization/telemetry_ingress_batch_worker_test.go index d81321b134..109022c713 100644 --- a/core/services/synchronization/telemetry_ingress_batch_worker_test.go +++ b/core/services/synchronization/telemetry_ingress_batch_worker_test.go @@ -7,7 +7,6 @@ import ( "github.com/stretchr/testify/assert" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/synchronization" "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/mocks" @@ -15,7 +14,6 @@ import ( func TestTelemetryIngressWorker_BuildTelemBatchReq(t *testing.T) { telemPayload := synchronization.TelemPayload{ - Ctx: testutils.Context(t), Telemetry: []byte("Mock telemetry"), ContractID: "0xa", } diff --git a/core/services/synchronization/telemetry_ingress_client.go b/core/services/synchronization/telemetry_ingress_client.go index d5b0ed7d93..ea62ab68ec 100644 --- a/core/services/synchronization/telemetry_ingress_client.go +++ b/core/services/synchronization/telemetry_ingress_client.go @@ -12,23 +12,11 @@ import ( "github.com/smartcontractkit/wsrpc/examples/simple/keys" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" telemPb "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem" "github.com/smartcontractkit/chainlink/v2/core/utils" ) -//go:generate mockery --quiet --dir ./telem --name TelemClient --output ./mocks/ --case=underscore - -//go:generate mockery --quiet --name TelemetryIngressClient --output ./mocks --case=underscore - -// TelemetryIngressClient encapsulates all the functionality needed to -// send telemetry to the ingress server using wsrpc -type TelemetryIngressClient interface { - services.ServiceCtx - Send(TelemPayload) -} - type NoopTelemetryIngressClient struct{} // Start is a no-op @@ -38,7 +26,7 @@ func (NoopTelemetryIngressClient) Start(context.Context) error { return nil } func (NoopTelemetryIngressClient) Close() error { return nil } // Send is a no-op -func (NoopTelemetryIngressClient) Send(TelemPayload) {} +func (NoopTelemetryIngressClient) Send(context.Context, TelemPayload) {} func (NoopTelemetryIngressClient) HealthReport() map[string]error { return map[string]error{} } func (NoopTelemetryIngressClient) Name() string { return "NoopTelemetryIngressClient" } @@ -62,16 +50,9 @@ type telemetryIngressClient struct { chTelemetry chan TelemPayload } -type TelemPayload struct { - Ctx context.Context - Telemetry []byte - TelemType TelemetryType - ContractID string -} - // NewTelemetryIngressClient returns a client backed by wsrpc that // can send telemetry to the telemetry ingress server -func NewTelemetryIngressClient(url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint) TelemetryIngressClient { +func NewTelemetryIngressClient(url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint) TelemetryService { return &telemetryIngressClient{ url: url, ks: ks, @@ -150,6 +131,8 @@ func (tc *telemetryIngressClient) connect(ctx context.Context, clientPrivKey []b func (tc *telemetryIngressClient) handleTelemetry() { go func() { + ctx, cancel := utils.StopChan(tc.chDone).NewCtx() + defer cancel() for { select { case p := <-tc.chTelemetry: @@ -160,7 +143,7 @@ func (tc *telemetryIngressClient) handleTelemetry() { TelemetryType: string(p.TelemType), SentAt: time.Now().UnixNano(), } - _, err := tc.telemClient.Telem(p.Ctx, telemReq) + _, err := tc.telemClient.Telem(ctx, telemReq) if err != nil { tc.lggr.Errorf("Could not send telemetry: %v", err) continue @@ -211,11 +194,17 @@ func (tc *telemetryIngressClient) getCSAPrivateKey() (privkey []byte, err error) // Send sends telemetry to the ingress server using wsrpc if the client is ready. // Also stores telemetry in a small buffer in case of backpressure from wsrpc, // throwing away messages once buffer is full -func (tc *telemetryIngressClient) Send(payload TelemPayload) { +func (tc *telemetryIngressClient) Send(ctx context.Context, telemData []byte, contractID string, telemType TelemetryType) { + payload := TelemPayload{ + Telemetry: telemData, + TelemType: telemType, + ContractID: contractID, + } + select { case tc.chTelemetry <- payload: tc.dropMessageCount.Store(0) - case <-payload.Ctx.Done(): + case <-ctx.Done(): return default: tc.logBufferFullWithExpBackoff(payload) diff --git a/core/services/synchronization/telemetry_ingress_client_test.go b/core/services/synchronization/telemetry_ingress_client_test.go index 83c187baaa..5a0cc23ecd 100644 --- a/core/services/synchronization/telemetry_ingress_client_test.go +++ b/core/services/synchronization/telemetry_ingress_client_test.go @@ -42,7 +42,6 @@ func TestTelemetryIngressClient_Send_HappyPath(t *testing.T) { telemetry := []byte("101010") address := common.HexToAddress("0xa") telemPayload := synchronization.TelemPayload{ - Ctx: testutils.Context(t), Telemetry: telemetry, ContractID: address.String(), TelemType: synchronization.OCR, @@ -60,7 +59,7 @@ func TestTelemetryIngressClient_Send_HappyPath(t *testing.T) { }) // Send telemetry - telemIngressClient.Send(telemPayload) + telemIngressClient.Send(testutils.Context(t), telemPayload.Telemetry, telemPayload.ContractID, telemPayload.TelemType) // Wait for the telemetry to be handled gomega.NewWithT(t).Eventually(called.Load).Should(gomega.BeTrue()) diff --git a/core/services/telemetry/common.go b/core/services/telemetry/common.go index cfbd519130..5a3f6706f7 100644 --- a/core/services/telemetry/common.go +++ b/core/services/telemetry/common.go @@ -7,5 +7,5 @@ import ( ) type MonitoringEndpointGenerator interface { - GenMonitoringEndpoint(contractID string, telemType synchronization.TelemetryType) ocrtypes.MonitoringEndpoint + GenMonitoringEndpoint(contractID string, telemType synchronization.TelemetryType, network string, chainID string) ocrtypes.MonitoringEndpoint } diff --git a/core/services/telemetry/ingress.go b/core/services/telemetry/ingress.go index a640b4e2a2..637fa0dd3b 100644 --- a/core/services/telemetry/ingress.go +++ b/core/services/telemetry/ingress.go @@ -11,38 +11,36 @@ import ( var _ MonitoringEndpointGenerator = &IngressAgentWrapper{} type IngressAgentWrapper struct { - telemetryIngressClient synchronization.TelemetryIngressClient + telemetryIngressClient synchronization.TelemetryService } -func NewIngressAgentWrapper(telemetryIngressClient synchronization.TelemetryIngressClient) *IngressAgentWrapper { +func NewIngressAgentWrapper(telemetryIngressClient synchronization.TelemetryService) *IngressAgentWrapper { return &IngressAgentWrapper{telemetryIngressClient} } -func (t *IngressAgentWrapper) GenMonitoringEndpoint(contractID string, telemType synchronization.TelemetryType) ocrtypes.MonitoringEndpoint { - return NewIngressAgent(t.telemetryIngressClient, contractID, telemType) +func (t *IngressAgentWrapper) GenMonitoringEndpoint(contractID string, telemType synchronization.TelemetryType, network string, chainID string) ocrtypes.MonitoringEndpoint { + return NewIngressAgent(t.telemetryIngressClient, contractID, telemType, network, chainID) } type IngressAgent struct { - telemetryIngressClient synchronization.TelemetryIngressClient + telemetryIngressClient synchronization.TelemetryService contractID string telemType synchronization.TelemetryType + network string + chainID string } -func NewIngressAgent(telemetryIngressClient synchronization.TelemetryIngressClient, contractID string, telemType synchronization.TelemetryType) *IngressAgent { +func NewIngressAgent(telemetryIngressClient synchronization.TelemetryService, contractID string, telemType synchronization.TelemetryType, network string, chainID string) *IngressAgent { return &IngressAgent{ telemetryIngressClient, contractID, telemType, + network, + chainID, } } // SendLog sends a telemetry log to the ingress server func (t *IngressAgent) SendLog(telemetry []byte) { - payload := synchronization.TelemPayload{ - Ctx: context.Background(), - Telemetry: telemetry, - ContractID: t.contractID, - TelemType: t.telemType, - } - t.telemetryIngressClient.Send(payload) + t.telemetryIngressClient.Send(context.Background(), telemetry, t.contractID, t.telemType) } diff --git a/core/services/telemetry/ingress_batch.go b/core/services/telemetry/ingress_batch.go index 0cc100e0b8..df86085359 100644 --- a/core/services/telemetry/ingress_batch.go +++ b/core/services/telemetry/ingress_batch.go @@ -12,42 +12,40 @@ var _ MonitoringEndpointGenerator = &IngressAgentBatchWrapper{} // IngressAgentBatchWrapper provides monitoring endpoint generation for the telemetry batch client type IngressAgentBatchWrapper struct { - telemetryIngressBatchClient synchronization.TelemetryIngressBatchClient + telemetryIngressBatchClient synchronization.TelemetryService } // NewIngressAgentBatchWrapper creates a new IngressAgentBatchWrapper with the provided telemetry batch client -func NewIngressAgentBatchWrapper(telemetryIngressBatchClient synchronization.TelemetryIngressBatchClient) *IngressAgentBatchWrapper { +func NewIngressAgentBatchWrapper(telemetryIngressBatchClient synchronization.TelemetryService) *IngressAgentBatchWrapper { return &IngressAgentBatchWrapper{telemetryIngressBatchClient} } // GenMonitoringEndpoint returns a new ingress batch agent instantiated with the batch client and a contractID -func (t *IngressAgentBatchWrapper) GenMonitoringEndpoint(contractID string, telemType synchronization.TelemetryType) ocrtypes.MonitoringEndpoint { - return NewIngressAgentBatch(t.telemetryIngressBatchClient, contractID, telemType) +func (t *IngressAgentBatchWrapper) GenMonitoringEndpoint(contractID string, telemType synchronization.TelemetryType, network string, chainID string) ocrtypes.MonitoringEndpoint { + return NewIngressAgentBatch(t.telemetryIngressBatchClient, contractID, telemType, network, chainID) } // IngressAgentBatch allows for sending batch telemetry for a given contractID type IngressAgentBatch struct { - telemetryIngressBatchClient synchronization.TelemetryIngressBatchClient + telemetryIngressBatchClient synchronization.TelemetryService contractID string telemType synchronization.TelemetryType + network string + chainID string } // NewIngressAgentBatch creates a new IngressAgentBatch with the given batch client and contractID -func NewIngressAgentBatch(telemetryIngressBatchClient synchronization.TelemetryIngressBatchClient, contractID string, telemType synchronization.TelemetryType) *IngressAgentBatch { +func NewIngressAgentBatch(telemetryIngressBatchClient synchronization.TelemetryService, contractID string, telemType synchronization.TelemetryType, network string, chainID string) *IngressAgentBatch { return &IngressAgentBatch{ telemetryIngressBatchClient, contractID, telemType, + network, + chainID, } } // SendLog sends a telemetry log to the ingress server func (t *IngressAgentBatch) SendLog(telemetry []byte) { - payload := synchronization.TelemPayload{ - Ctx: context.Background(), - Telemetry: telemetry, - ContractID: t.contractID, - TelemType: t.telemType, - } - t.telemetryIngressBatchClient.Send(payload) + t.telemetryIngressBatchClient.Send(context.Background(), telemetry, t.contractID, t.telemType) } diff --git a/core/services/telemetry/ingress_batch_test.go b/core/services/telemetry/ingress_batch_test.go index 70972ecd5a..3923b569fe 100644 --- a/core/services/telemetry/ingress_batch_test.go +++ b/core/services/telemetry/ingress_batch_test.go @@ -12,14 +12,18 @@ import ( ) func TestIngressAgentBatch(t *testing.T) { - telemetryBatchClient := mocks.NewTelemetryIngressBatchClient(t) + telemetryBatchClient := mocks.NewTelemetryService(t) ingressAgentBatch := telemetry.NewIngressAgentWrapper(telemetryBatchClient) - monitoringEndpoint := ingressAgentBatch.GenMonitoringEndpoint("0xa", synchronization.OCR) + monitoringEndpoint := ingressAgentBatch.GenMonitoringEndpoint("0xa", synchronization.OCR, "test-network", "test-chainID") // Handle the Send call and store the telem var telemPayload synchronization.TelemPayload - telemetryBatchClient.On("Send", mock.AnythingOfType("synchronization.TelemPayload")).Return().Run(func(args mock.Arguments) { - telemPayload = args[0].(synchronization.TelemPayload) + telemetryBatchClient.On("Send", mock.Anything, mock.AnythingOfType("[]uint8"), mock.AnythingOfType("string"), mock.AnythingOfType("TelemetryType")).Return().Run(func(args mock.Arguments) { + telemPayload = synchronization.TelemPayload{ + Telemetry: args[1].([]byte), + ContractID: args[2].(string), + TelemType: args[3].(synchronization.TelemetryType), + } }) // Send the log to the monitoring endpoint diff --git a/core/services/telemetry/ingress_test.go b/core/services/telemetry/ingress_test.go index 7531a56798..31028f2f60 100644 --- a/core/services/telemetry/ingress_test.go +++ b/core/services/telemetry/ingress_test.go @@ -12,14 +12,18 @@ import ( ) func TestIngressAgent(t *testing.T) { - telemetryClient := mocks.NewTelemetryIngressClient(t) + telemetryClient := mocks.NewTelemetryService(t) ingressAgent := telemetry.NewIngressAgentWrapper(telemetryClient) - monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("0xa", synchronization.OCR) + monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("0xa", synchronization.OCR, "test-network", "test-chainID") // Handle the Send call and store the telem var telemPayload synchronization.TelemPayload - telemetryClient.On("Send", mock.AnythingOfType("synchronization.TelemPayload")).Return().Run(func(args mock.Arguments) { - telemPayload = args[0].(synchronization.TelemPayload) + telemetryClient.On("Send", mock.Anything, mock.AnythingOfType("[]uint8"), mock.AnythingOfType("string"), mock.AnythingOfType("TelemetryType")).Return().Run(func(args mock.Arguments) { + telemPayload = synchronization.TelemPayload{ + Telemetry: args[1].([]byte), + ContractID: args[2].(string), + TelemType: args[3].(synchronization.TelemetryType), + } }) // Send the log to the monitoring endpoint diff --git a/core/services/telemetry/manager.go b/core/services/telemetry/manager.go new file mode 100644 index 0000000000..3818341f5b --- /dev/null +++ b/core/services/telemetry/manager.go @@ -0,0 +1,221 @@ +package telemetry + +import ( + "context" + "fmt" + "net/url" + "strings" + "time" + + "github.com/pkg/errors" + "github.com/smartcontractkit/libocr/commontypes" + "go.uber.org/multierr" + + "github.com/smartcontractkit/chainlink/v2/core/config" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore" + "github.com/smartcontractkit/chainlink/v2/core/services/synchronization" + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + +//// Client encapsulates all the functionality needed to +//// send telemetry to the ingress server using wsrpc +//type Client interface { +// services.ServiceCtx +// Send(context.Context, synchronization.TelemPayload) +//} + +type Manager struct { + utils.StartStopOnce + bufferSize uint + endpoints []*telemetryEndpoint + ks keystore.CSA + lggr logger.Logger + logging bool + maxBatchSize uint + sendInterval time.Duration + sendTimeout time.Duration + uniConn bool + useBatchSend bool + MonitoringEndpointGenerator MonitoringEndpointGenerator + + //legacyMode means that we are sending all telemetry to a single endpoint. + //In order for this to be set as true, we need to have no endpoints defined with TelemetryIngress.URL and TelemetryIngress.ServerPubKey set. + //This mode will be supported until we completely switch to TelemetryIngress.Endpoints in config.toml + legacyMode bool +} + +type legacyEndpointConfig struct { + Url *url.URL + PubKey string +} + +func (l *legacyEndpointConfig) Network() string { + return "-" +} + +func (l *legacyEndpointConfig) ChainID() string { + return "-" +} + +func (l *legacyEndpointConfig) ServerPubKey() string { + return l.PubKey +} + +func (l *legacyEndpointConfig) URL() *url.URL { + return l.Url +} + +type telemetryEndpoint struct { + utils.StartStopOnce + ChainID string + Network string + URL *url.URL + client synchronization.TelemetryService + PubKey string +} + +// NewManager create a new telemetry manager that is responsible for configuring telemetry agents and generating the defined telemetry endpoints and monitoring endpoints +func NewManager(cfg config.TelemetryIngress, csaKeyStore keystore.CSA, lggr logger.Logger) *Manager { + m := &Manager{ + bufferSize: cfg.BufferSize(), + endpoints: nil, + ks: csaKeyStore, + lggr: lggr.Named("TelemetryManager"), + logging: cfg.Logging(), + maxBatchSize: cfg.MaxBatchSize(), + sendInterval: cfg.SendInterval(), + sendTimeout: cfg.SendTimeout(), + uniConn: cfg.UniConn(), + useBatchSend: cfg.UseBatchSend(), + legacyMode: false, + } + for _, e := range cfg.Endpoints() { + if err := m.addEndpoint(e); err != nil { + m.lggr.Error(err) + } + } + + if len(cfg.Endpoints()) == 0 && cfg.URL() != nil && cfg.ServerPubKey() != "" { + m.lggr.Error(`TelemetryIngress.URL and TelemetryIngress.ServerPubKey will be removed in a future version, please switch to TelemetryIngress.Endpoints: + [[TelemetryIngress.Endpoints]] + Network = '...' # e.g. EVM. Solana, Starknet, Cosmos + ChainID = '...' # e.g. 1, 5, devnet, mainnet-beta + URL = '...' + ServerPubKey = '...'`) + m.legacyMode = true + if err := m.addEndpoint(&legacyEndpointConfig{ + Url: cfg.URL(), + PubKey: cfg.ServerPubKey(), + }); err != nil { + m.lggr.Error(err) + } + } + + return m +} + +func (m *Manager) Start(ctx context.Context) error { + return m.StartOnce("TelemetryManager", func() error { + var err error + for _, e := range m.endpoints { + err = multierr.Append(err, e.client.Start(ctx)) + } + return err + }) +} +func (m *Manager) Close() error { + return m.StopOnce("TelemetryManager", func() error { + var err error + for _, e := range m.endpoints { + err = multierr.Append(err, e.client.Close()) + } + return err + }) +} + +func (m *Manager) Name() string { + return m.lggr.Name() +} + +func (m *Manager) HealthReport() map[string]error { + hr := make(map[string]error) + hr[m.lggr.Name()] = m.Healthy() + for _, e := range m.endpoints { + name := fmt.Sprintf("%s.%s.%s", m.lggr.Name(), e.Network, e.ChainID) + hr[name] = e.StartStopOnce.Healthy() + } + return hr +} + +// GenMonitoringEndpoint creates a new monitoring endpoints based on the existing available endpoints defined in the core config TOML, if no endpoint for the network and chainID exists, a NOOP agent will be used and the telemetry will not be sent +func (m *Manager) GenMonitoringEndpoint(contractID string, telemType synchronization.TelemetryType, network string, chainID string) commontypes.MonitoringEndpoint { + + e, found := m.getEndpoint(network, chainID) + + if !found { + m.lggr.Warnf("no telemetry endpoint found for network %q chainID %q, telemetry %q for contactID %q will NOT be sent", network, chainID, telemType, contractID) + return &NoopAgent{} + } + + if m.useBatchSend { + return NewIngressAgentBatch(e.client, contractID, telemType, network, chainID) + } + + return NewIngressAgent(e.client, contractID, telemType, network, chainID) + +} + +func (m *Manager) addEndpoint(e config.TelemetryIngressEndpoint) error { + if e.Network() == "" && !m.legacyMode { + return errors.New("cannot add telemetry endpoint, network cannot be empty") + } + + if e.ChainID() == "" && !m.legacyMode { + return errors.New("cannot add telemetry endpoint, chainID cannot be empty") + } + + if e.URL() == nil { + return errors.New("cannot add telemetry endpoint, URL cannot be empty") + } + + if e.ServerPubKey() == "" { + return errors.New("cannot add telemetry endpoint, ServerPubKey cannot be empty") + } + + if _, found := m.getEndpoint(e.Network(), e.ChainID()); found { + return errors.Errorf("cannot add telemetry endpoint for network %q and chainID %q, endpoint already exists", e.Network(), e.ChainID()) + } + + var tClient synchronization.TelemetryService + if m.useBatchSend { + tClient = synchronization.NewTelemetryIngressBatchClient(e.URL(), e.ServerPubKey(), m.ks, m.logging, m.lggr, m.bufferSize, m.maxBatchSize, m.sendInterval, m.sendTimeout, m.uniConn) + } else { + tClient = synchronization.NewTelemetryIngressClient(e.URL(), e.ServerPubKey(), m.ks, m.logging, m.lggr, m.bufferSize) + } + + te := telemetryEndpoint{ + Network: strings.ToUpper(e.Network()), + ChainID: strings.ToUpper(e.ChainID()), + URL: e.URL(), + PubKey: e.ServerPubKey(), + client: tClient, + } + + m.endpoints = append(m.endpoints, &te) + return nil +} + +func (m *Manager) getEndpoint(network string, chainID string) (*telemetryEndpoint, bool) { + //in legacy mode we send telemetry to a single endpoint + if m.legacyMode && len(m.endpoints) == 1 { + return m.endpoints[0], true + } + + for _, e := range m.endpoints { + if e.Network == strings.ToUpper(network) && e.ChainID == strings.ToUpper(chainID) { + return e, true + } + } + return nil, false +} diff --git a/core/services/telemetry/manager_test.go b/core/services/telemetry/manager_test.go new file mode 100644 index 0000000000..93527f7043 --- /dev/null +++ b/core/services/telemetry/manager_test.go @@ -0,0 +1,341 @@ +package telemetry + +import ( + "context" + "fmt" + "math/big" + "net/url" + "reflect" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" + + "github.com/smartcontractkit/chainlink/v2/core/config" + "github.com/smartcontractkit/chainlink/v2/core/config/mocks" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/csakey" + mocks3 "github.com/smartcontractkit/chainlink/v2/core/services/keystore/mocks" + "github.com/smartcontractkit/chainlink/v2/core/services/synchronization" + mocks2 "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/mocks" + "github.com/smartcontractkit/chainlink/v2/core/store/models" + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + +func setupMockConfig(t *testing.T, useBatchSend bool) *mocks.TelemetryIngress { + tic := mocks.NewTelemetryIngress(t) + tic.On("BufferSize").Return(uint(123)) + tic.On("Logging").Return(true) + tic.On("MaxBatchSize").Return(uint(51)) + tic.On("SendInterval").Return(time.Millisecond * 512) + tic.On("SendTimeout").Return(time.Second * 7) + tic.On("UniConn").Return(true) + tic.On("UseBatchSend").Return(useBatchSend) + + return tic +} + +func TestManagerAgents(t *testing.T) { + tic := setupMockConfig(t, true) + te := mocks.NewTelemetryIngressEndpoint(t) + te.On("Network").Return("network-1") + te.On("ChainID").Return("network-1-chainID-1") + te.On("ServerPubKey").Return("some-pubkey") + u, _ := url.Parse("http://some-url.test") + te.On("URL").Return(u) + tic.On("Endpoints").Return([]config.TelemetryIngressEndpoint{te}) + + lggr, _ := logger.TestLoggerObserved(t, zapcore.InfoLevel) + + ks := mocks3.NewCSA(t) + + tm := NewManager(tic, ks, lggr) + require.Equal(t, "*synchronization.telemetryIngressBatchClient", reflect.TypeOf(tm.endpoints[0].client).String()) + me := tm.GenMonitoringEndpoint("", "", "network-1", "network-1-chainID-1") + require.Equal(t, "*telemetry.IngressAgentBatch", reflect.TypeOf(me).String()) + + tic = setupMockConfig(t, false) + tic.On("Endpoints").Return([]config.TelemetryIngressEndpoint{te}) + tm = NewManager(tic, ks, lggr) + require.Equal(t, "*synchronization.telemetryIngressClient", reflect.TypeOf(tm.endpoints[0].client).String()) + me = tm.GenMonitoringEndpoint("", "", "network-1", "network-1-chainID-1") + require.Equal(t, "*telemetry.IngressAgent", reflect.TypeOf(me).String()) +} + +func TestNewManager(t *testing.T) { + + type endpointTest struct { + network string + chainID string + url string + pubKey string + shouldError bool + expectedError string + } + + endpoints := []endpointTest{ + { + network: "NETWORK-1", + chainID: "NETWORK-1-CHAINID-1", + url: "http://network-1-chainID-1.test", + pubKey: "network-1-chainID-1-pub-key", + shouldError: false, + }, + { + network: "NETWORK-1", + chainID: "NETWORK-1-CHAINID-2", + url: "http://network-1-chainID-2.test", + pubKey: "network-1-chainID-2-pub-key", + shouldError: false, + }, + { + network: "NETWORK-2", + chainID: "NETWORK-2-CHAINID-1", + url: "http://network-2-chainID-1.test", + pubKey: "network-2-chainID-1-pub-key", + shouldError: false, + }, + { + shouldError: true, + expectedError: "network cannot be empty", + }, + { + network: "ERROR", + shouldError: true, + expectedError: "chainID cannot be empty", + }, + { + network: "ERROR", + chainID: "ERROR", + shouldError: true, + expectedError: "URL cannot be empty", + }, + { + network: "ERROR", + chainID: "ERROR", + url: "http://error.test", + shouldError: true, + expectedError: "cannot add telemetry endpoint, ServerPubKey cannot be empty", + }, + { + network: "NETWORK-1", + chainID: "NETWORK-1-CHAINID-1", + url: "http://network-1-chainID-1.test", + pubKey: "network-1-chainID-1-pub-key", + shouldError: true, + expectedError: "endpoint already exists", + }, + } + + var mockEndpoints []config.TelemetryIngressEndpoint + + for _, e := range endpoints { + te := mocks.NewTelemetryIngressEndpoint(t) + te.On("Network").Maybe().Return(e.network) + te.On("ChainID").Maybe().Return(e.chainID) + te.On("ServerPubKey").Maybe().Return(e.pubKey) + + u, _ := url.Parse(e.url) + if e.url == "" { + u = nil + } + te.On("URL").Maybe().Return(u) + mockEndpoints = append(mockEndpoints, te) + } + + tic := setupMockConfig(t, true) + tic.On("Endpoints").Return(mockEndpoints) + + lggr, logObs := logger.TestLoggerObserved(t, zapcore.InfoLevel) + + ks := mocks3.NewCSA(t) + + ks.On("GetAll").Return([]csakey.KeyV2{csakey.MustNewV2XXXTestingOnly(big.NewInt(0))}, nil) + + m := NewManager(tic, ks, lggr) + + require.Equal(t, uint(123), m.bufferSize) + require.Equal(t, ks, m.ks) + require.Equal(t, "TelemetryManager", m.lggr.Name()) + require.Equal(t, true, m.logging) + require.Equal(t, uint(51), m.maxBatchSize) + require.Equal(t, time.Millisecond*512, m.sendInterval) + require.Equal(t, time.Second*7, m.sendTimeout) + require.Equal(t, true, m.uniConn) + require.Equal(t, true, m.useBatchSend) + + logs := logObs.TakeAll() + for i, e := range endpoints { + if !e.shouldError { + require.Equal(t, e.network, m.endpoints[i].Network) + require.Equal(t, e.chainID, m.endpoints[i].ChainID) + require.Equal(t, e.pubKey, m.endpoints[i].PubKey) + require.Equal(t, e.url, m.endpoints[i].URL.String()) + } else { + found := false + for _, l := range logs { + if strings.Contains(l.Message, e.expectedError) { + found = true + } + } + require.Equal(t, true, found, "cannot find log: %s", e.expectedError) + } + + } + + require.Equal(t, "TelemetryManager", m.Name()) + + require.Nil(t, m.Start(context.Background())) + testutils.WaitForLogMessageCount(t, logObs, "error connecting error while dialing dial tcp", 3) + + hr := m.HealthReport() + require.Equal(t, 4, len(hr)) + require.Nil(t, m.Close()) + time.Sleep(time.Second * 1) +} + +func TestCorrectEndpointRouting(t *testing.T) { + tic := setupMockConfig(t, true) + tic.On("Endpoints").Return(nil) + tic.On("URL").Return(nil) + + lggr, obsLogs := logger.TestLoggerObserved(t, zapcore.InfoLevel) + ks := mocks3.NewCSA(t) + + tm := NewManager(tic, ks, lggr) + + type testEndpoint struct { + network string + chainID string + } + + testEndpoints := []testEndpoint{ + { + network: "NETWORK-1", + chainID: "NETWORK-1-CHAINID-1", + }, + { + network: "NETWORK-1", + chainID: "NETWORK-1-CHAINID-2", + }, + { + network: "NETWORK-2", + chainID: "NETWORK-2-CHAINID-1", + }, + { + network: "NETWORK-2", + chainID: "NETWORK-2-CHAINID-2", + }, + } + + tm.endpoints = make([]*telemetryEndpoint, len(testEndpoints)) + clientSent := make([]synchronization.TelemPayload, 0) + for i, e := range testEndpoints { + clientMock := mocks2.NewTelemetryService(t) + clientMock.On("Send", mock.Anything, mock.AnythingOfType("[]uint8"), mock.AnythingOfType("string"), mock.AnythingOfType("TelemetryType")).Return().Run(func(args mock.Arguments) { + clientSent = append(clientSent, synchronization.TelemPayload{ + Telemetry: args[1].([]byte), + ContractID: args[2].(string), + TelemType: args[3].(synchronization.TelemetryType), + }) + }) + + tm.endpoints[i] = &telemetryEndpoint{ + StartStopOnce: utils.StartStopOnce{}, + ChainID: e.chainID, + Network: e.network, + client: clientMock, + } + + } + //Unknown networks or chainID + noopEndpoint := tm.GenMonitoringEndpoint("some-contractID", "some-type", "unknown-network", "unknown-chainID") + require.Equal(t, "*telemetry.NoopAgent", reflect.TypeOf(noopEndpoint).String()) + require.Equal(t, 1, obsLogs.Len()) + require.Contains(t, obsLogs.TakeAll()[0].Message, "no telemetry endpoint found") + + noopEndpoint = tm.GenMonitoringEndpoint("some-contractID", "some-type", "network-1", "unknown-chainID") + require.Equal(t, "*telemetry.NoopAgent", reflect.TypeOf(noopEndpoint).String()) + require.Equal(t, 1, obsLogs.Len()) + require.Contains(t, obsLogs.TakeAll()[0].Message, "no telemetry endpoint found") + + noopEndpoint = tm.GenMonitoringEndpoint("some-contractID", "some-type", "network-2", "network-1-chainID-1") + require.Equal(t, "*telemetry.NoopAgent", reflect.TypeOf(noopEndpoint).String()) + require.Equal(t, 1, obsLogs.Len()) + require.Contains(t, obsLogs.TakeAll()[0].Message, "no telemetry endpoint found") + + //Known networks and chainID + for i, e := range testEndpoints { + telemType := fmt.Sprintf("TelemType_%s", e.chainID) + contractID := fmt.Sprintf("contractID_%s", e.chainID) + me := tm.GenMonitoringEndpoint( + contractID, + synchronization.TelemetryType(telemType), + e.network, + e.chainID, + ) + me.SendLog([]byte(e.chainID)) + require.Equal(t, 0, obsLogs.Len()) + + require.Equal(t, i+1, len(clientSent)) + require.Equal(t, contractID, clientSent[i].ContractID) + require.Equal(t, telemType, string(clientSent[i].TelemType)) + require.Equal(t, []byte(e.chainID), clientSent[i].Telemetry) + } + +} + +func TestLegacyMode(t *testing.T) { + tic := setupMockConfig(t, true) + tic.On("Endpoints").Return(nil) + url, err := models.ParseURL("test.test") + require.NoError(t, err) + tic.On("URL").Return(url.URL()) + tic.On("ServerPubKey").Return("some-pub-key") + + lggr, obsLogs := logger.TestLoggerObserved(t, zapcore.InfoLevel) + ks := mocks3.NewCSA(t) + + tm := NewManager(tic, ks, lggr) + require.Equal(t, true, tm.legacyMode) + require.Len(t, tm.endpoints, 1) + + clientSent := make([]synchronization.TelemPayload, 0) + clientMock := mocks2.NewTelemetryService(t) + clientMock.On("Send", mock.Anything, mock.AnythingOfType("[]uint8"), mock.AnythingOfType("string"), mock.AnythingOfType("TelemetryType")).Return().Run(func(args mock.Arguments) { + clientSent = append(clientSent, synchronization.TelemPayload{ + Telemetry: args[1].([]byte), + ContractID: args[2].(string), + TelemType: args[3].(synchronization.TelemetryType), + }) + }) + tm.endpoints[0].client = clientMock + + e := tm.GenMonitoringEndpoint("some-contractID", "some-type", "unknown-network", "unknown-chainID") + require.Equal(t, "*telemetry.IngressAgentBatch", reflect.TypeOf(e).String()) + + e.SendLog([]byte("endpoint-1-message-1")) + e.SendLog([]byte("endpoint-1-message-2")) + e.SendLog([]byte("endpoint-1-message-3")) + require.Len(t, clientSent, 3) + + e2 := tm.GenMonitoringEndpoint("another-contractID", "another-type", "another-unknown-network", "another-unknown-chainID") + require.Equal(t, "*telemetry.IngressAgentBatch", reflect.TypeOf(e).String()) + + e2.SendLog([]byte("endpoint-2-message-1")) + e2.SendLog([]byte("endpoint-2-message-2")) + e2.SendLog([]byte("endpoint-2-message-3")) + require.Len(t, clientSent, 6) + + require.Equal(t, 1, obsLogs.Len()) // Deprecation warning for TelemetryIngress.URL and TelemetryIngress.ServerPubKey + require.Equal(t, []byte("endpoint-1-message-1"), clientSent[0].Telemetry) + require.Equal(t, []byte("endpoint-1-message-2"), clientSent[1].Telemetry) + require.Equal(t, []byte("endpoint-1-message-3"), clientSent[2].Telemetry) + require.Equal(t, []byte("endpoint-2-message-1"), clientSent[3].Telemetry) + require.Equal(t, []byte("endpoint-2-message-2"), clientSent[4].Telemetry) + require.Equal(t, []byte("endpoint-2-message-3"), clientSent[5].Telemetry) +} diff --git a/core/services/telemetry/noop.go b/core/services/telemetry/noop.go index 71670f2a19..cbeb038708 100644 --- a/core/services/telemetry/noop.go +++ b/core/services/telemetry/noop.go @@ -16,6 +16,6 @@ func (t *NoopAgent) SendLog(log []byte) { } // GenMonitoringEndpoint creates a monitoring endpoint for telemetry -func (t *NoopAgent) GenMonitoringEndpoint(contractID string, telemType synchronization.TelemetryType) ocrtypes.MonitoringEndpoint { +func (t *NoopAgent) GenMonitoringEndpoint(contractID string, telemType synchronization.TelemetryType, network string, chainID string) ocrtypes.MonitoringEndpoint { return t } diff --git a/core/web/resolver/testdata/config-empty-effective.toml b/core/web/resolver/testdata/config-empty-effective.toml index 91383c5fad..ed42a038a1 100644 --- a/core/web/resolver/testdata/config-empty-effective.toml +++ b/core/web/resolver/testdata/config-empty-effective.toml @@ -36,13 +36,13 @@ LeaseRefreshInterval = '1s' [TelemetryIngress] UniConn = true Logging = false -ServerPubKey = '' -URL = '' BufferSize = 100 MaxBatchSize = 50 SendInterval = '500ms' SendTimeout = '10s' UseBatchSend = true +URL = '' +ServerPubKey = '' [AuditLogger] Enabled = false diff --git a/core/web/resolver/testdata/config-full.toml b/core/web/resolver/testdata/config-full.toml index accdbe180c..b3a186c7c9 100644 --- a/core/web/resolver/testdata/config-full.toml +++ b/core/web/resolver/testdata/config-full.toml @@ -36,13 +36,19 @@ LeaseRefreshInterval = '1s' [TelemetryIngress] UniConn = true Logging = true -ServerPubKey = 'test-pub-key' -URL = 'https://prom.test' BufferSize = 1234 MaxBatchSize = 4321 SendInterval = '1m0s' SendTimeout = '5s' UseBatchSend = true +URL = '' +ServerPubKey = '' + +[[TelemetryIngress.Endpoints]] +Network = 'EVM' +ChainID = '1' +URL = 'endpoint-1.test' +ServerPubKey = 'test-pub-key-1' [AuditLogger] Enabled = true diff --git a/core/web/resolver/testdata/config-multi-chain-effective.toml b/core/web/resolver/testdata/config-multi-chain-effective.toml index 2756c6f65b..47e5f008b4 100644 --- a/core/web/resolver/testdata/config-multi-chain-effective.toml +++ b/core/web/resolver/testdata/config-multi-chain-effective.toml @@ -36,13 +36,13 @@ LeaseRefreshInterval = '1s' [TelemetryIngress] UniConn = true Logging = false -ServerPubKey = '' -URL = '' BufferSize = 100 MaxBatchSize = 50 SendInterval = '500ms' SendTimeout = '10s' UseBatchSend = true +URL = '' +ServerPubKey = '' [AuditLogger] Enabled = true diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 64bace1993..3d920ecde5 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -9,7 +9,26 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [dev] -... +### Added + +- Added new configuration field named `LeaseDuration` for `EVM.NodePool` that will periodically check if internal subscriptions are connected to the "best" (as defined by the `SelectionMode`) node and switch to it if necessary. Setting this value to `0s` will disable this feature. +- Added multichain telemetry support. Each network/chainID pair must be configured using the new fields: +```toml +[[TelemetryIngress.Endpoints]] +Network = '...' # e.g. EVM. Solana, Starknet, Cosmos +ChainID = '...' # e.g. 1, 5, devnet, mainnet-beta +URL = '...' +ServerPubKey = '...' +``` +These will eventually replace `TelemetryIngress.URL` and `TelemetryIngress.ServerPubKey`. Setting `TelemetryIngress.URL` and `TelemetryIngress.ServerPubKey` alongside `[[TelemetryIngress.Endpoints]]` will prevent the node from booting. Only one way of configuring telemetry endpoints is supported. + +### Upcoming Required Configuration Change + +- Starting in 2.8.0, chainlink nodes will no longer allow `TelemetryIngress.URL` and `TelemetryIngress.ServerPubKey`. Any TOML configuration that sets this fields will prevent the node from booting. These fields will be replaced by `[[TelemetryIngress.Endpoints]]` + +### Removed + +- Removed the ability to set a next nonce value for an address through CLI ## 2.6.0 - UNRELEASED diff --git a/docs/CONFIG.md b/docs/CONFIG.md index 5928287f36..5d16199e8a 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -277,13 +277,13 @@ Logging toggles verbose logging of the raw telemetry messages being sent. ```toml ServerPubKey = 'test-pub-key' # Example ``` -ServerPubKey is the public key of the telemetry server. +ServerPubKey is the public key of the telemetry server. This field will be removed in a furture version ### URL ```toml URL = 'https://prom.test' # Example ``` -URL is where to send telemetry. +URL is where to send telemetry. This field will be removed in a furture version ### BufferSize ```toml @@ -315,6 +315,40 @@ UseBatchSend = true # Default ``` UseBatchSend toggles sending telemetry to the ingress server using the batch client. +## TelemetryIngress.Endpoints +```toml +[[TelemetryIngress.Endpoints]] # Example +Network = 'EVM' # Example +ChainID = '111551111' # Example +ServerPubKey = 'test-pub-key-111551111-evm' # Example +URL = 'localhost-111551111-evm:9000' # Example +``` + + +### Network +```toml +Network = 'EVM' # Example +``` +Network aka EVM, Solana, Starknet + +### ChainID +```toml +ChainID = '111551111' # Example +``` +ChainID of the network + +### ServerPubKey +```toml +ServerPubKey = 'test-pub-key-111551111-evm' # Example +``` +ServerPubKey is the public key of the telemetry server. + +### URL +```toml +URL = 'localhost-111551111-evm:9000' # Example +``` +URL is where to send telemetry. + ## AuditLogger ```toml [AuditLogger] diff --git a/testdata/scripts/node/validate/default.txtar b/testdata/scripts/node/validate/default.txtar index 969785a7b5..9d56fdd259 100644 --- a/testdata/scripts/node/validate/default.txtar +++ b/testdata/scripts/node/validate/default.txtar @@ -48,13 +48,13 @@ LeaseRefreshInterval = '1s' [TelemetryIngress] UniConn = true Logging = false -ServerPubKey = '' -URL = '' BufferSize = 100 MaxBatchSize = 50 SendInterval = '500ms' SendTimeout = '10s' UseBatchSend = true +URL = '' +ServerPubKey = '' [AuditLogger] Enabled = false diff --git a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar index aa03e89744..c5dfc593c2 100644 --- a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar @@ -92,13 +92,13 @@ LeaseRefreshInterval = '1s' [TelemetryIngress] UniConn = true Logging = false -ServerPubKey = '' -URL = '' BufferSize = 100 MaxBatchSize = 50 SendInterval = '500ms' SendTimeout = '10s' UseBatchSend = true +URL = '' +ServerPubKey = '' [AuditLogger] Enabled = false diff --git a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar index 63a825951c..49feeda14b 100644 --- a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar @@ -92,13 +92,13 @@ LeaseRefreshInterval = '1s' [TelemetryIngress] UniConn = true Logging = false -ServerPubKey = '' -URL = '' BufferSize = 100 MaxBatchSize = 50 SendInterval = '500ms' SendTimeout = '10s' UseBatchSend = true +URL = '' +ServerPubKey = '' [AuditLogger] Enabled = false diff --git a/testdata/scripts/node/validate/disk-based-logging.txtar b/testdata/scripts/node/validate/disk-based-logging.txtar index cbc342b783..eff430b409 100644 --- a/testdata/scripts/node/validate/disk-based-logging.txtar +++ b/testdata/scripts/node/validate/disk-based-logging.txtar @@ -92,13 +92,13 @@ LeaseRefreshInterval = '1s' [TelemetryIngress] UniConn = true Logging = false -ServerPubKey = '' -URL = '' BufferSize = 100 MaxBatchSize = 50 SendInterval = '500ms' SendTimeout = '10s' UseBatchSend = true +URL = '' +ServerPubKey = '' [AuditLogger] Enabled = false diff --git a/testdata/scripts/node/validate/invalid.txtar b/testdata/scripts/node/validate/invalid.txtar index d6f832ccae..899e106e2e 100644 --- a/testdata/scripts/node/validate/invalid.txtar +++ b/testdata/scripts/node/validate/invalid.txtar @@ -82,13 +82,13 @@ LeaseRefreshInterval = '1s' [TelemetryIngress] UniConn = true Logging = false -ServerPubKey = '' -URL = '' BufferSize = 100 MaxBatchSize = 50 SendInterval = '500ms' SendTimeout = '10s' UseBatchSend = true +URL = '' +ServerPubKey = '' [AuditLogger] Enabled = false diff --git a/testdata/scripts/node/validate/valid.txtar b/testdata/scripts/node/validate/valid.txtar index 5bb13732bb..be53863d13 100644 --- a/testdata/scripts/node/validate/valid.txtar +++ b/testdata/scripts/node/validate/valid.txtar @@ -89,13 +89,13 @@ LeaseRefreshInterval = '1s' [TelemetryIngress] UniConn = true Logging = false -ServerPubKey = '' -URL = '' BufferSize = 100 MaxBatchSize = 50 SendInterval = '500ms' SendTimeout = '10s' UseBatchSend = true +URL = '' +ServerPubKey = '' [AuditLogger] Enabled = false From 5a4e7ec2fbd67035ccecfdf90dc912b8821378f4 Mon Sep 17 00:00:00 2001 From: Vlad Sirenko <821251+sirenko@users.noreply.github.com> Date: Sat, 21 Oct 2023 18:34:24 -0700 Subject: [PATCH 2/3] Fix monitoring endpoint delegation --- core/services/ocr2/delegate.go | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index c21ffd8971..c43eab23f2 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -1359,10 +1359,15 @@ func (d *Delegate) newServicesCCIPCommit(lggr logger.SugaredLogger, jb job.Job, ContractConfigTracker: ccipProvider.ContractConfigTracker(), Database: ocrDB, LocalConfig: lc, - MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.OCR2CCIP), - OffchainConfigDigester: ccipProvider.OffchainConfigDigester(), - OffchainKeyring: kb, - OnchainKeyring: kb, + MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint( + spec.ContractID, + synchronization.OCR2CCIP, + rid.Network, + rid.ChainID, + ), + OffchainConfigDigester: ccipProvider.OffchainConfigDigester(), + OffchainKeyring: kb, + OnchainKeyring: kb, } logError := func(msg string) { lggr.ErrorIf(d.jobORM.RecordError(jb.ID, msg), "unable to record error") @@ -1407,10 +1412,15 @@ func (d *Delegate) newServicesCCIPExecution(lggr logger.SugaredLogger, jb job.Jo ContractConfigTracker: ccipProvider.ContractConfigTracker(), Database: ocrDB, LocalConfig: lc, - MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.OCR2CCIP), - OffchainConfigDigester: ccipProvider.OffchainConfigDigester(), - OffchainKeyring: kb, - OnchainKeyring: kb, + MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint( + spec.ContractID, + synchronization.OCR2CCIP, + rid.Network, + rid.ChainID, + ), + OffchainConfigDigester: ccipProvider.OffchainConfigDigester(), + OffchainKeyring: kb, + OnchainKeyring: kb, } logError := func(msg string) { lggr.ErrorIf(d.jobORM.RecordError(jb.ID, msg), "unable to record error") From f6cb49842f6b2264ce2cc12fed89e1fc3a51a806 Mon Sep 17 00:00:00 2001 From: Vlad Sirenko <821251+sirenko@users.noreply.github.com> Date: Sat, 21 Oct 2023 18:52:39 -0700 Subject: [PATCH 3/3] Fix the test --- core/services/telemetry/manager_test.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/core/services/telemetry/manager_test.go b/core/services/telemetry/manager_test.go index 93527f7043..2325c0d3b0 100644 --- a/core/services/telemetry/manager_test.go +++ b/core/services/telemetry/manager_test.go @@ -332,10 +332,14 @@ func TestLegacyMode(t *testing.T) { require.Len(t, clientSent, 6) require.Equal(t, 1, obsLogs.Len()) // Deprecation warning for TelemetryIngress.URL and TelemetryIngress.ServerPubKey - require.Equal(t, []byte("endpoint-1-message-1"), clientSent[0].Telemetry) - require.Equal(t, []byte("endpoint-1-message-2"), clientSent[1].Telemetry) - require.Equal(t, []byte("endpoint-1-message-3"), clientSent[2].Telemetry) - require.Equal(t, []byte("endpoint-2-message-1"), clientSent[3].Telemetry) - require.Equal(t, []byte("endpoint-2-message-2"), clientSent[4].Telemetry) - require.Equal(t, []byte("endpoint-2-message-3"), clientSent[5].Telemetry) + // disable false positive linter, it misses the size check above + // nolint: gosec + if len(clientSent) >= 6 { + require.Equal(t, []byte("endpoint-1-message-1"), clientSent[0].Telemetry) + require.Equal(t, []byte("endpoint-1-message-2"), clientSent[1].Telemetry) + require.Equal(t, []byte("endpoint-1-message-3"), clientSent[2].Telemetry) + require.Equal(t, []byte("endpoint-2-message-1"), clientSent[3].Telemetry) + require.Equal(t, []byte("endpoint-2-message-2"), clientSent[4].Telemetry) + require.Equal(t, []byte("endpoint-2-message-3"), clientSent[5].Telemetry) + } }