From 1d3caadcb72156237789bffea6ed069f97d8fa7c Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Mon, 14 Feb 2022 11:48:20 -0800 Subject: [PATCH 01/14] Split goreleaser archive in two:cgo/nocgo (#2505) --- .goreleaser.yml | 81 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 69 insertions(+), 12 deletions(-) diff --git a/.goreleaser.yml b/.goreleaser.yml index 193e48347a0..97e4f4eafac 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -2,8 +2,36 @@ before: hooks: - go mod download - ./develop/scripts/create_build_info_data.sh + +archives: + - id: default + builds: + - temporal-server + - tctl + - temporal-cassandra-tool + - temporal-sql-tool + name_template: "{{ .ProjectName }}_{{ .Version }}_{{ .Os }}_{{ .Arch }}" + format_overrides: + - goos: windows + format: zip + files: + - ./config/* + + - id: no-cgo + builds: + - temporal-server-no-cgo + - tctl-no-cgo + - temporal-cassandra-tool-no-cgo + - temporal-sql-tool-no-cgo + name_template: "{{ .ProjectName }}_{{ .Version }}_{{ .Os }}_{{ .Arch }}_no_cgo" + format_overrides: + - goos: windows + format: zip + files: + - ./config/* + builds: - - id: "temporal-server" + - id: temporal-server dir: cmd/server binary: temporal-server goos: @@ -13,7 +41,7 @@ builds: goarch: - amd64 - arm64 - - id: "temporal-server-no-cgo" + - id: temporal-server-no-cgo dir: cmd/server binary: temporal-server env: @@ -23,7 +51,7 @@ builds: goarch: - amd64 - arm64 - - id: "tctl" + - id: tctl dir: cmd/tools/cli binary: tctl goos: @@ -33,7 +61,17 @@ builds: goarch: - amd64 - arm64 - - id: "temporal-cassandra-tool" + - id: tctl-no-cgo + dir: cmd/tools/cli + binary: tctl + env: + - CGO_ENABLED=0 + goos: + - linux + goarch: + - amd64 + - arm64 + - id: temporal-cassandra-tool dir: cmd/tools/cassandra binary: temporal-cassandra-tool goos: @@ -43,7 +81,17 @@ builds: goarch: - amd64 - arm64 - - id: "temporal-sql-tool" + - id: temporal-cassandra-tool-no-cgo + dir: cmd/tools/cassandra + binary: temporal-cassandra-tool + env: + - CGO_ENABLED=0 + goos: + - linux + goarch: + - amd64 + - arm64 + - id: temporal-sql-tool dir: cmd/tools/sql binary: temporal-sql-tool goos: @@ -53,14 +101,23 @@ builds: goarch: - amd64 - arm64 + - id: temporal-sql-tool-no-cgo + dir: cmd/tools/sql + binary: temporal-sql-tool + env: + - CGO_ENABLED=0 + goos: + - linux + goarch: + - amd64 + - arm64 + checksum: name_template: 'checksums.txt' algorithm: sha256 -snapshot: - name_template: "{{ .Tag }}-next" + changelog: - sort: asc - filters: - exclude: - - '^docs:' - - '^test:' + skip: true + +announce: + skip: "true" From faf1b3e2cb16e67ea2784c4b92d1ac676fff00b1 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Mon, 14 Feb 2022 13:07:24 -0800 Subject: [PATCH 02/14] Update go version of goreleaser to to 1.17.6 (#2506) --- .github/workflows/goreleaser.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/goreleaser.yml b/.github/workflows/goreleaser.yml index f6c94a6d6bc..804a73f0e35 100644 --- a/.github/workflows/goreleaser.yml +++ b/.github/workflows/goreleaser.yml @@ -14,7 +14,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: 1.17.3 + go-version: 1.17.6 - name: Run GoReleaser uses: goreleaser/goreleaser-action@v2 with: From 1a1aa2f751e3c6b46ea675d1a12d8c86f9e71cd6 Mon Sep 17 00:00:00 2001 From: mastermanu Date: Tue, 22 Feb 2022 19:25:17 -0500 Subject: [PATCH 03/14] Adds ExecutionInfo reference to InternalWorkflowMutation #2530 (#2531) This enables the underlying store to have access to the ExecutionInfo on the Mutation object without having to do deserialization --- common/persistence/cassandra/util.go | 2 +- common/persistence/execution_manager.go | 3 ++- common/persistence/persistenceInterface.go | 3 ++- common/persistence/size.go | 2 +- common/persistence/sql/execution_util.go | 2 +- 5 files changed, 7 insertions(+), 5 deletions(-) diff --git a/common/persistence/cassandra/util.go b/common/persistence/cassandra/util.go index 414b9d6cc88..681dbe1485c 100644 --- a/common/persistence/cassandra/util.go +++ b/common/persistence/cassandra/util.go @@ -54,7 +54,7 @@ func applyWorkflowMutationBatch( namespaceID, workflowID, runID, - workflowMutation.ExecutionInfo, + workflowMutation.ExecutionInfoBlob, workflowMutation.ExecutionState, workflowMutation.ExecutionStateBlob, workflowMutation.NextEventID, diff --git a/common/persistence/execution_manager.go b/common/persistence/execution_manager.go index ee3267efff5..5dd4bd467cc 100644 --- a/common/persistence/execution_manager.go +++ b/common/persistence/execution_manager.go @@ -454,6 +454,7 @@ func (m *executionManagerImpl) SerializeWorkflowMutation( UpsertRequestCancelInfos: make(map[int64]*commonpb.DataBlob), UpsertSignalInfos: make(map[int64]*commonpb.DataBlob), + ExecutionInfo: input.ExecutionInfo, ExecutionState: input.ExecutionState, DeleteActivityInfos: input.DeleteActivityInfos, @@ -476,7 +477,7 @@ func (m *executionManagerImpl) SerializeWorkflowMutation( NextEventID: input.NextEventID, } - result.ExecutionInfo, err = m.serializer.WorkflowExecutionInfoToBlob(input.ExecutionInfo, enumspb.ENCODING_TYPE_PROTO3) + result.ExecutionInfoBlob, err = m.serializer.WorkflowExecutionInfoToBlob(input.ExecutionInfo, enumspb.ENCODING_TYPE_PROTO3) if err != nil { return nil, err } diff --git a/common/persistence/persistenceInterface.go b/common/persistence/persistenceInterface.go index 1da8a1947ab..16557c9a6fd 100644 --- a/common/persistence/persistenceInterface.go +++ b/common/persistence/persistenceInterface.go @@ -380,7 +380,8 @@ type ( WorkflowID string RunID string - ExecutionInfo *commonpb.DataBlob + ExecutionInfo *persistencespb.WorkflowExecutionInfo + ExecutionInfoBlob *commonpb.DataBlob ExecutionState *persistencespb.WorkflowExecutionState ExecutionStateBlob *commonpb.DataBlob NextEventID int64 diff --git a/common/persistence/size.go b/common/persistence/size.go index d668d1e6a0d..77ddd69f32a 100644 --- a/common/persistence/size.go +++ b/common/persistence/size.go @@ -104,7 +104,7 @@ func statusOfInternalWorkflowMutation( return nil } - executionInfoSize := sizeOfBlob(mutation.ExecutionInfo) + executionInfoSize := sizeOfBlob(mutation.ExecutionInfoBlob) executionStateSize := sizeOfBlob(mutation.ExecutionStateBlob) activityInfoCount := len(mutation.UpsertActivityInfos) diff --git a/common/persistence/sql/execution_util.go b/common/persistence/sql/execution_util.go index 964921f7905..1645d99a577 100644 --- a/common/persistence/sql/execution_util.go +++ b/common/persistence/sql/execution_util.go @@ -86,7 +86,7 @@ func applyWorkflowMutationTx( tx, namespaceID, workflowID, - workflowMutation.ExecutionInfo, + workflowMutation.ExecutionInfoBlob, workflowMutation.ExecutionState, workflowMutation.NextEventID, lastWriteVersion, From e7852f73e24b2459c9523c1dd4a83d8948bdcd42 Mon Sep 17 00:00:00 2001 From: Sergey Bykov <8248806+sergeybykov@users.noreply.github.com> Date: Mon, 14 Feb 2022 22:05:51 -0800 Subject: [PATCH 04/14] Treat enablement of TLS separately for server and client config (#2501) --- common/config/config.go | 9 ++++--- .../rpc/encryption/localStoreTlsProvider.go | 8 +++---- common/rpc/encryption/tls_config_test.go | 22 +++++++++++------ common/rpc/test/rpc_localstore_tls_test.go | 24 +++++++++++++++++++ 4 files changed, 49 insertions(+), 14 deletions(-) diff --git a/common/config/config.go b/common/config/config.go index 96634394d5c..3be6b3a3c35 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -484,9 +484,12 @@ func (c *Config) String() string { return maskedYaml } -func (r *GroupTLS) IsEnabled() bool { - return r.Server.KeyFile != "" || r.Server.KeyData != "" || - len(r.Client.RootCAFiles) > 0 || len(r.Client.RootCAData) > 0 || +func (r *GroupTLS) IsServerEnabled() bool { + return r.Server.KeyFile != "" || r.Server.KeyData != "" +} + +func (r *GroupTLS) IsClientEnabled() bool { + return len(r.Client.RootCAFiles) > 0 || len(r.Client.RootCAData) > 0 || r.Client.ForceTLS } diff --git a/common/rpc/encryption/localStoreTlsProvider.go b/common/rpc/encryption/localStoreTlsProvider.go index 9f4556de16f..74273656894 100644 --- a/common/rpc/encryption/localStoreTlsProvider.go +++ b/common/rpc/encryption/localStoreTlsProvider.go @@ -130,7 +130,7 @@ func (s *localStoreTlsProvider) GetInternodeClientConfig() (*tls.Config, error) return newClientTLSConfig(s.internodeClientCertProvider, client.ServerName, s.settings.Internode.Server.RequireClientAuth, false, !client.DisableHostVerification) }, - s.settings.Internode.IsEnabled(), + s.settings.Internode.IsClientEnabled(), ) } @@ -143,7 +143,7 @@ func (s *localStoreTlsProvider) GetFrontendClientConfig() (*tls.Config, error) { useTLS = true } else { client = &s.settings.Frontend.Client - useTLS = s.settings.Frontend.IsEnabled() + useTLS = s.settings.Frontend.IsClientEnabled() } return s.getOrCreateConfig( &s.cachedFrontendClientConfig, @@ -161,7 +161,7 @@ func (s *localStoreTlsProvider) GetFrontendServerConfig() (*tls.Config, error) { func() (*tls.Config, error) { return newServerTLSConfig(s.frontendCertProvider, s.frontendPerHostCertProviderMap, &s.settings.Frontend, s.logger) }, - s.settings.Frontend.IsEnabled()) + s.settings.Frontend.IsServerEnabled()) } func (s *localStoreTlsProvider) GetInternodeServerConfig() (*tls.Config, error) { @@ -170,7 +170,7 @@ func (s *localStoreTlsProvider) GetInternodeServerConfig() (*tls.Config, error) func() (*tls.Config, error) { return newServerTLSConfig(s.internodeCertProvider, nil, &s.settings.Internode, s.logger) }, - s.settings.Internode.IsEnabled()) + s.settings.Internode.IsServerEnabled()) } func (s *localStoreTlsProvider) GetExpiringCerts(timeWindow time.Duration, diff --git a/common/rpc/encryption/tls_config_test.go b/common/rpc/encryption/tls_config_test.go index 6b11c829942..808f1a41dfc 100644 --- a/common/rpc/encryption/tls_config_test.go +++ b/common/rpc/encryption/tls_config_test.go @@ -51,19 +51,27 @@ func (s *tlsConfigTest) SetupTest() { func (s *tlsConfigTest) TestIsEnabled() { emptyCfg := config.GroupTLS{} - s.False(emptyCfg.IsEnabled()) + s.False(emptyCfg.IsServerEnabled()) + s.False(emptyCfg.IsClientEnabled()) cfg := config.GroupTLS{Server: config.ServerTLS{KeyFile: "foo"}} - s.True(cfg.IsEnabled()) + s.True(cfg.IsServerEnabled()) + s.False(cfg.IsClientEnabled()) cfg = config.GroupTLS{Server: config.ServerTLS{KeyData: "foo"}} - s.True(cfg.IsEnabled()) + s.True(cfg.IsServerEnabled()) + s.False(cfg.IsClientEnabled()) cfg = config.GroupTLS{Client: config.ClientTLS{RootCAFiles: []string{"bar"}}} - s.True(cfg.IsEnabled()) + s.False(cfg.IsServerEnabled()) + s.True(cfg.IsClientEnabled()) cfg = config.GroupTLS{Client: config.ClientTLS{RootCAData: []string{"bar"}}} - s.True(cfg.IsEnabled()) + s.False(cfg.IsServerEnabled()) + s.True(cfg.IsClientEnabled()) cfg = config.GroupTLS{Client: config.ClientTLS{ForceTLS: true}} - s.True(cfg.IsEnabled()) + s.False(cfg.IsServerEnabled()) + s.True(cfg.IsClientEnabled()) cfg = config.GroupTLS{Client: config.ClientTLS{ForceTLS: false}} - s.False(cfg.IsEnabled()) + s.False(cfg.IsServerEnabled()) + s.False(cfg.IsClientEnabled()) + } func (s *tlsConfigTest) TestIsSystemWorker() { diff --git a/common/rpc/test/rpc_localstore_tls_test.go b/common/rpc/test/rpc_localstore_tls_test.go index c570eb53dc7..bf37bb80877 100644 --- a/common/rpc/test/rpc_localstore_tls_test.go +++ b/common/rpc/test/rpc_localstore_tls_test.go @@ -76,6 +76,7 @@ type localStoreRPCSuite struct { internodeDynamicTLSFactory *TestFactory internodeMutualTLSRPCRefreshFactory *TestFactory frontendMutualTLSRPCRefreshFactory *TestFactory + frontendConfigRootCAForceTLSFactory *TestFactory internodeCertDir string frontendCertDir string @@ -101,6 +102,7 @@ type localStoreRPCSuite struct { frontendConfigMutualTLS config.GroupTLS frontendConfigPerHostOverrides config.GroupTLS frontendConfigRootCAOnly config.GroupTLS + frontendConfigRootCAForceTLS config.GroupTLS frontendConfigAltRootCAOnly config.GroupTLS frontendConfigSystemWorker config.WorkerTLS frontendConfigMutualTLSRefresh config.GroupTLS @@ -201,6 +203,9 @@ func (s *localStoreRPCSuite) SetupSuite() { RootCAData: []string{convertFileToBase64(s.frontendChain.CaPubFile)}, }, } + s.frontendConfigRootCAForceTLS = s.frontendConfigRootCAOnly + s.frontendConfigRootCAForceTLS.Client.ForceTLS = true + s.frontendConfigAltRootCAOnly = config.GroupTLS{ Server: config.ServerTLS{ RequireClientAuth: true, @@ -319,6 +324,13 @@ func (s *localStoreRPCSuite) setupFrontend() { }, } + localStoreRootCAForceTLS := &config.Global{ + Membership: s.membershipConfig, + TLS: config.RootTLS{ + Frontend: s.frontendConfigRootCAForceTLS, + }, + } + provider, err := encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLS.TLS, nil, s.logger, nil) s.NoError(err) frontendMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) @@ -355,6 +367,12 @@ func (s *localStoreRPCSuite) setupFrontend() { s.internodeDynamicTLSFactory = i(dynamicServerTLSFactory) s.frontendMutualTLSRPCRefreshFactory = f(frontendMutualTLSRefreshFactory) + + provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreRootCAForceTLS.TLS, nil, s.logger, nil) + s.NoError(err) + frontendRootCAForceTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + s.NotNil(frontendServerTLSFactory) + s.frontendConfigRootCAForceTLSFactory = f(frontendRootCAForceTLSFactory) } func (s *localStoreRPCSuite) setupInternode() { @@ -789,3 +807,9 @@ func runRingpopTLSTest(s suite.Suite, logger log.Logger, serverA *TestFactory, s s.NoError(err) } } + +func (s *localStoreRPCSuite) TestClientForceTLS() { + options, err := s.frontendConfigRootCAForceTLSFactory.RPCFactory.GetFrontendGRPCServerOptions() + s.NoError(err) + s.Nil(options) +} From 66ebbd49de0c598cd68c797dc41c12a583bced19 Mon Sep 17 00:00:00 2001 From: Liang Mei Date: Thu, 17 Feb 2022 19:50:25 -0800 Subject: [PATCH 05/14] Add config and handling for remote cluster cert (#2475) --- common/config/config.go | 2 + common/rpc.go | 4 +- .../rpc/encryption/localStoreTlsProvider.go | 102 +++++++++++++++--- .../testDynamicTLSConfigProvider.go | 5 + common/rpc/encryption/tlsFactory.go | 1 + common/rpc/rpc.go | 60 ++++++++--- common/rpc/test/rpc_common_test.go | 29 +++-- common/rpc/test/rpc_localstore_tls_test.go | 53 ++++++--- temporal/server_impl.go | 2 +- 9 files changed, 198 insertions(+), 60 deletions(-) diff --git a/common/config/config.go b/common/config/config.go index 3be6b3a3c35..7780e803b4e 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -116,6 +116,8 @@ type ( Frontend GroupTLS `yaml:"frontend"` // SystemWorker controls TLS setting for System Workers connecting to Frontend. SystemWorker WorkerTLS `yaml:"systemWorker"` + // RemoteFrontendClients controls TLS setting for talking to remote cluster. + RemoteClusters map[string]GroupTLS `yaml:"remoteClusters"` // ExpirationChecks defines settings for periodic checks for expiration of certificates ExpirationChecks CertExpirationValidation `yaml:"expirationChecks"` // Interval between refreshes of certificates loaded from files diff --git a/common/rpc.go b/common/rpc.go index 33a733a65f7..20a374a2711 100644 --- a/common/rpc.go +++ b/common/rpc.go @@ -38,7 +38,7 @@ type ( GetInternodeGRPCServerOptions() ([]grpc.ServerOption, error) GetGRPCListener() net.Listener GetRingpopChannel() *tchannel.Channel - CreateFrontendGRPCConnection(hostName string) *grpc.ClientConn - CreateInternodeGRPCConnection(hostName string) *grpc.ClientConn + CreateFrontendGRPCConnection(rpcAddress string) *grpc.ClientConn + CreateInternodeGRPCConnection(rpcAddress string) *grpc.ClientConn } ) diff --git a/common/rpc/encryption/localStoreTlsProvider.go b/common/rpc/encryption/localStoreTlsProvider.go index 74273656894..65a782fb5cd 100644 --- a/common/rpc/encryption/localStoreTlsProvider.go +++ b/common/rpc/encryption/localStoreTlsProvider.go @@ -51,17 +51,18 @@ type localStoreTlsProvider struct { settings *config.RootTLS - internodeCertProvider CertProvider - internodeClientCertProvider CertProvider - frontendCertProvider CertProvider - workerCertProvider CertProvider - - frontendPerHostCertProviderMap *localStorePerHostCertProviderMap - - cachedInternodeServerConfig *tls.Config - cachedInternodeClientConfig *tls.Config - cachedFrontendServerConfig *tls.Config - cachedFrontendClientConfig *tls.Config + internodeCertProvider CertProvider + internodeClientCertProvider CertProvider + frontendCertProvider CertProvider + workerCertProvider CertProvider + remoteClusterClientCertProvider map[string]CertProvider + frontendPerHostCertProviderMap *localStorePerHostCertProviderMap + + cachedInternodeServerConfig *tls.Config + cachedInternodeClientConfig *tls.Config + cachedFrontendServerConfig *tls.Config + cachedFrontendClientConfig *tls.Config + cachedRemoteClusterClientConfig map[string]*tls.Config ticker *time.Ticker logger log.Logger @@ -84,6 +85,11 @@ func NewLocalStoreTlsProvider(tlsConfig *config.RootTLS, scope metrics.Scope, lo workerProvider = internodeWorkerProvider } + remoteClusterClientCertProvider := make(map[string]CertProvider) + for hostname, groupTLS := range tlsConfig.RemoteClusters { + remoteClusterClientCertProvider[hostname] = certProviderFactory(&groupTLS, nil, nil, tlsConfig.RefreshInterval, logger) + } + provider := &localStoreTlsProvider{ internodeCertProvider: internodeProvider, internodeClientCertProvider: internodeProvider, @@ -91,10 +97,12 @@ func NewLocalStoreTlsProvider(tlsConfig *config.RootTLS, scope metrics.Scope, lo workerCertProvider: workerProvider, frontendPerHostCertProviderMap: newLocalStorePerHostCertProviderMap( tlsConfig.Frontend.PerHostOverrides, certProviderFactory, tlsConfig.RefreshInterval, logger), - RWMutex: sync.RWMutex{}, - settings: tlsConfig, - scope: scope, - logger: logger, + remoteClusterClientCertProvider: remoteClusterClientCertProvider, + RWMutex: sync.RWMutex{}, + settings: tlsConfig, + scope: scope, + logger: logger, + cachedRemoteClusterClientConfig: make(map[string]*tls.Config), } provider.initialize() return provider, nil @@ -155,6 +163,26 @@ func (s *localStoreTlsProvider) GetFrontendClientConfig() (*tls.Config, error) { ) } +func (s *localStoreTlsProvider) GetRemoteClusterClientConfig(hostname string) (*tls.Config, error) { + groupTLS, ok := s.settings.RemoteClusters[hostname] + if !ok { + return nil, nil + } + + return s.getOrCreateRemoteClusterClientConfig( + hostname, + func() (*tls.Config, error) { + return newClientTLSConfig( + s.remoteClusterClientCertProvider[hostname], + groupTLS.Client.ServerName, + groupTLS.Server.RequireClientAuth, + false, + !groupTLS.Client.DisableHostVerification) + }, + groupTLS.IsClientEnabled(), + ) +} + func (s *localStoreTlsProvider) GetFrontendServerConfig() (*tls.Config, error) { return s.getOrCreateConfig( &s.cachedFrontendServerConfig, @@ -239,6 +267,41 @@ func (s *localStoreTlsProvider) getOrCreateConfig( return *cachedConfig, nil } +func (s *localStoreTlsProvider) getOrCreateRemoteClusterClientConfig( + hostname string, + configConstructor tlsConfigConstructor, + isEnabled bool, +) (*tls.Config, error) { + if !isEnabled { + return nil, nil + } + + // Check if exists under a read lock first + s.RLock() + if clientConfig, ok := s.cachedRemoteClusterClientConfig[hostname]; ok { + defer s.RUnlock() + return clientConfig, nil + } + // Not found, promote to write lock to initialize + s.RUnlock() + s.Lock() + defer s.Unlock() + // Check if someone got here first while waiting for write lock + if clientConfig, ok := s.cachedRemoteClusterClientConfig[hostname]; ok { + return clientConfig, nil + } + + // Load configuration + localConfig, err := configConstructor() + + if err != nil { + return nil, err + } + + s.cachedRemoteClusterClientConfig[hostname] = localConfig + return localConfig, nil +} + func newServerTLSConfig( certProvider CertProvider, perHostCertProviderMap PerHostCertProviderMap, @@ -321,8 +384,13 @@ func getServerTLSConfigFromCertProvider( logger), nil } -func newClientTLSConfig(clientProvider CertProvider, serverName string, isAuthRequired bool, - isWorker bool, enableHostVerification bool) (*tls.Config, error) { +func newClientTLSConfig( + clientProvider CertProvider, + serverName string, + isAuthRequired bool, + isWorker bool, + enableHostVerification bool, +) (*tls.Config, error) { // Optional ServerCA for client if not already trusted by host serverCa, err := clientProvider.FetchServerRootCAsForClient(isWorker) if err != nil { diff --git a/common/rpc/encryption/testDynamicTLSConfigProvider.go b/common/rpc/encryption/testDynamicTLSConfigProvider.go index 69e9662773d..279ad413433 100644 --- a/common/rpc/encryption/testDynamicTLSConfigProvider.go +++ b/common/rpc/encryption/testDynamicTLSConfigProvider.go @@ -72,6 +72,11 @@ func (t *TestDynamicTLSConfigProvider) GetExpiringCerts(timeWindow time.Duration panic("implement me") } +func (t *TestDynamicTLSConfigProvider) GetRemoteClusterClientConfig(hostName string) (*tls.Config, error) { + //TODO implement me + panic("implement me") +} + var _ TLSConfigProvider = (*TestDynamicTLSConfigProvider)(nil) func NewTestDynamicTLSConfigProvider( diff --git a/common/rpc/encryption/tlsFactory.go b/common/rpc/encryption/tlsFactory.go index 3b97c8dc7c0..5e7c1bf9a6d 100644 --- a/common/rpc/encryption/tlsFactory.go +++ b/common/rpc/encryption/tlsFactory.go @@ -44,6 +44,7 @@ type ( GetInternodeClientConfig() (*tls.Config, error) GetFrontendServerConfig() (*tls.Config, error) GetFrontendClientConfig() (*tls.Config, error) + GetRemoteClusterClientConfig(hostname string) (*tls.Config, error) GetExpiringCerts(timeWindow time.Duration) (expiring CertExpirationMap, expired CertExpirationMap, err error) } diff --git a/common/rpc/rpc.go b/common/rpc/rpc.go index 297144275b2..6938501fe5c 100644 --- a/common/rpc/rpc.go +++ b/common/rpc/rpc.go @@ -31,23 +31,24 @@ import ( "sync" "github.com/uber/tchannel-go" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - + "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/config" "go.temporal.io/server/common/convert" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/rpc/encryption" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" ) // RPCFactory is an implementation of service.RPCFactory interface type RPCFactory struct { - config *config.RPC - serviceName string - logger log.Logger - dc *dynamicconfig.Collection + config *config.RPC + serviceName string + logger log.Logger + dc *dynamicconfig.Collection + clusterMetadata *cluster.Config sync.Mutex grpcListener net.Listener @@ -57,13 +58,21 @@ type RPCFactory struct { // NewFactory builds a new RPCFactory // conforming to the underlying configuration -func NewFactory(cfg *config.RPC, sName string, logger log.Logger, tlsProvider encryption.TLSConfigProvider, dc *dynamicconfig.Collection) *RPCFactory { +func NewFactory( + cfg *config.RPC, + sName string, + logger log.Logger, + tlsProvider encryption.TLSConfigProvider, + dc *dynamicconfig.Collection, + clusterMetadata *cluster.Config, +) *RPCFactory { return &RPCFactory{ - config: cfg, - serviceName: sName, - logger: logger, - dc: dc, - tlsFactory: tlsProvider, + config: cfg, + serviceName: sName, + logger: logger, + dc: dc, + tlsFactory: tlsProvider, + clusterMetadata: clusterMetadata, } } @@ -92,6 +101,14 @@ func (d *RPCFactory) GetFrontendClientTlsConfig() (*tls.Config, error) { return nil, nil } +func (d *RPCFactory) GetRemoteClusterClientConfig(hostname string) (*tls.Config, error) { + if d.tlsFactory != nil { + return d.tlsFactory.GetRemoteClusterClientConfig(hostname) + } + + return nil, nil +} + func (d *RPCFactory) GetInternodeGRPCServerOptions() ([]grpc.ServerOption, error) { var opts []grpc.ServerOption @@ -237,18 +254,29 @@ func getListenIP(cfg *config.RPC, logger log.Logger) net.IP { } // CreateFrontendGRPCConnection creates connection for gRPC calls -func (d *RPCFactory) CreateFrontendGRPCConnection(hostName string) *grpc.ClientConn { +func (d *RPCFactory) CreateFrontendGRPCConnection(rpcAddress string) *grpc.ClientConn { var tlsClientConfig *tls.Config var err error if d.tlsFactory != nil { - tlsClientConfig, err = d.tlsFactory.GetFrontendClientConfig() + currCluster := d.clusterMetadata.ClusterInformation[d.clusterMetadata.CurrentClusterName] + + if currCluster.RPCAddress == rpcAddress { + tlsClientConfig, err = d.tlsFactory.GetFrontendClientConfig() + } else { + hostname, _, err2 := net.SplitHostPort(rpcAddress) + if err2 != nil { + d.logger.Fatal("Invalid rpcAddress for remote cluster", tag.Error(err2)) + } + tlsClientConfig, err = d.tlsFactory.GetRemoteClusterClientConfig(hostname) + } + if err != nil { d.logger.Fatal("Failed to create tls config for gRPC connection", tag.Error(err)) return nil } } - return d.dial(hostName, tlsClientConfig) + return d.dial(rpcAddress, tlsClientConfig) } // CreateInternodeGRPCConnection creates connection for gRPC calls diff --git a/common/rpc/test/rpc_common_test.go b/common/rpc/test/rpc_common_test.go index accc5c21f4c..2ecdfd698d0 100644 --- a/common/rpc/test/rpc_common_test.go +++ b/common/rpc/test/rpc_common_test.go @@ -28,19 +28,19 @@ import ( "context" "crypto/tls" "math/rand" + "net" "strings" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/peer" - "github.com/stretchr/testify/suite" - "google.golang.org/grpc" - "google.golang.org/grpc/examples/helloworld/helloworld" - + "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/config" "go.temporal.io/server/common/convert" "go.temporal.io/server/common/log" "go.temporal.io/server/common/rpc" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/examples/helloworld/helloworld" + "google.golang.org/grpc/peer" ) // HelloServer is used to implement helloworld.GreeterServer. @@ -53,6 +53,7 @@ type ServerUsageType int32 const ( Frontend ServerUsageType = iota Internode + RemoteCluster ) const ( @@ -82,6 +83,10 @@ var ( BroadcastAddress: localhostIPv4, }, } + clusterMetadata = &cluster.Config{ + CurrentClusterName: "test", + ClusterInformation: map[string]cluster.ClusterInformation{"test": {RPCAddress: localhostIPv4 + ":1234"}}, + } ) func startHelloWorldServer(s suite.Suite, factory *TestFactory) (*grpc.Server, string) { @@ -166,15 +171,21 @@ func dialHelloAndGetTLSInfo( logger := log.NewNoopLogger() var cfg *tls.Config var err error - if serverType == Internode { + switch serverType { + case Internode: cfg, err = clientFactory.GetInternodeClientTlsConfig() - } else { + case Frontend: cfg, err = clientFactory.GetFrontendClientTlsConfig() + case RemoteCluster: + host, _, err := net.SplitHostPort(hostport) + s.NoError(err) + cfg, err = clientFactory.GetRemoteClusterClientConfig(host) } - s.NoError(err) + clientConn, err := rpc.Dial(hostport, cfg, logger) s.NoError(err) + client := helloworld.NewGreeterClient(clientConn) request := &helloworld.HelloRequest{Name: convert.Uint64ToString(rand.Uint64())} diff --git a/common/rpc/test/rpc_localstore_tls_test.go b/common/rpc/test/rpc_localstore_tls_test.go index bf37bb80877..6827b2b4230 100644 --- a/common/rpc/test/rpc_localstore_tls_test.go +++ b/common/rpc/test/rpc_localstore_tls_test.go @@ -76,6 +76,7 @@ type localStoreRPCSuite struct { internodeDynamicTLSFactory *TestFactory internodeMutualTLSRPCRefreshFactory *TestFactory frontendMutualTLSRPCRefreshFactory *TestFactory + remoteClusterMutualTLSRPCFactory *TestFactory frontendConfigRootCAForceTLSFactory *TestFactory internodeCertDir string @@ -138,7 +139,7 @@ func (s *localStoreRPCSuite) SetupSuite() { provider, err := encryption.NewTLSConfigProviderFromConfig(serverCfgInsecure.TLS, nil, s.logger, nil) s.NoError(err) - insecureFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + insecureFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(insecureFactory) s.insecureRPCFactory = i(insecureFactory) @@ -331,24 +332,32 @@ func (s *localStoreRPCSuite) setupFrontend() { }, } + localStoreMutualTLSRemoteCluster := &config.Global{ + Membership: s.membershipConfig, + TLS: config.RootTLS{ + Frontend: s.frontendConfigPerHostOverrides, + RemoteClusters: map[string]config.GroupTLS{localhostIPv4: s.frontendConfigPerHostOverrides}, + }, + } + provider, err := encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLS.TLS, nil, s.logger, nil) s.NoError(err) - frontendMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + frontendMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(frontendMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreServerTLS.TLS, nil, s.logger, nil) s.NoError(err) - frontendServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + frontendServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(frontendServerTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSSystemWorker.TLS, nil, s.logger, nil) s.NoError(err) - frontendSystemWorkerMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + frontendSystemWorkerMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(frontendSystemWorkerMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSWithRefresh.TLS, nil, s.logger, nil) s.NoError(err) - frontendMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + frontendMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(frontendMutualTLSRefreshFactory) s.frontendMutualTLSRPCFactory = f(frontendMutualTLSFactory) @@ -362,7 +371,7 @@ func (s *localStoreRPCSuite) setupFrontend() { s.frontendRollingCerts, s.dynamicCACertPool, s.wrongCACertPool) - dynamicServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, s.dynamicConfigProvider, dynamicconfig.NewNoopCollection()) + dynamicServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, s.dynamicConfigProvider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.frontendDynamicTLSFactory = f(dynamicServerTLSFactory) s.internodeDynamicTLSFactory = i(dynamicServerTLSFactory) @@ -370,9 +379,15 @@ func (s *localStoreRPCSuite) setupFrontend() { provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreRootCAForceTLS.TLS, nil, s.logger, nil) s.NoError(err) - frontendRootCAForceTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + frontendRootCAForceTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(frontendServerTLSFactory) s.frontendConfigRootCAForceTLSFactory = f(frontendRootCAForceTLSFactory) + + provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSRemoteCluster.TLS, nil, s.logger, nil) + s.NoError(err) + remoteClusterMutualTLSRPCFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) + s.NotNil(remoteClusterMutualTLSRPCFactory) + s.remoteClusterMutualTLSRPCFactory = r(remoteClusterMutualTLSRPCFactory) } func (s *localStoreRPCSuite) setupInternode() { @@ -406,22 +421,22 @@ func (s *localStoreRPCSuite) setupInternode() { provider, err := encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLS.TLS, nil, s.logger, nil) s.NoError(err) - internodeMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + internodeMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(internodeMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreServerTLS.TLS, nil, s.logger, nil) s.NoError(err) - internodeServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + internodeServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(internodeServerTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreAltMutualTLS.TLS, nil, s.logger, nil) s.NoError(err) - internodeMutualAltTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + internodeMutualAltTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(internodeMutualAltTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSWithRefresh.TLS, nil, s.logger, nil) s.NoError(err) - internodeMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + internodeMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(internodeMutualTLSRefreshFactory) s.internodeMutualTLSRPCFactory = i(internodeMutualTLSFactory) @@ -454,16 +469,16 @@ func (s *localStoreRPCSuite) setupInternodeRingpop() { provider, err := encryption.NewTLSConfigProviderFromConfig(ringpopMutualTLS.TLS, nil, s.logger, nil) s.NoError(err) - ringpopMutualTLSFactoryA := rpc.NewFactory(rpcCfgA, "tester-A", s.logger, provider, dc) + ringpopMutualTLSFactoryA := rpc.NewFactory(rpcCfgA, "tester-A", s.logger, provider, dc, clusterMetadata) s.NotNil(ringpopMutualTLSFactoryA) - ringpopMutualTLSFactoryB := rpc.NewFactory(rpcCfgB, "tester-B", s.logger, provider, dc) + ringpopMutualTLSFactoryB := rpc.NewFactory(rpcCfgB, "tester-B", s.logger, provider, dc, clusterMetadata) s.NotNil(ringpopMutualTLSFactoryB) provider, err = encryption.NewTLSConfigProviderFromConfig(ringpopServerTLS.TLS, nil, s.logger, nil) s.NoError(err) - ringpopServerTLSFactoryA := rpc.NewFactory(rpcCfgA, "tester-A", s.logger, provider, dc) + ringpopServerTLSFactoryA := rpc.NewFactory(rpcCfgA, "tester-A", s.logger, provider, dc, clusterMetadata) s.NotNil(ringpopServerTLSFactoryA) - ringpopServerTLSFactoryB := rpc.NewFactory(rpcCfgB, "tester-B", s.logger, provider, dc) + ringpopServerTLSFactoryB := rpc.NewFactory(rpcCfgB, "tester-B", s.logger, provider, dc, clusterMetadata) s.NotNil(ringpopServerTLSFactoryB) s.ringpopMutualTLSRPCFactoryA = i(ringpopMutualTLSFactoryA) @@ -580,6 +595,10 @@ func i(r *rpc.RPCFactory) *TestFactory { return &TestFactory{serverUsage: Internode, RPCFactory: r} } +func r(r *rpc.RPCFactory) *TestFactory { + return &TestFactory{serverUsage: RemoteCluster, RPCFactory: r} +} + func convertFileToBase64(file string) string { fileBytes, err := os.ReadFile(file) if err != nil { @@ -605,6 +624,10 @@ func (s *localStoreRPCSuite) TestMutualTLSFrontendToFrontend() { runHelloWorldTest(s.Suite, localhostIPv4, s.frontendMutualTLSRPCFactory, s.frontendMutualTLSRPCFactory, true) } +func (s *localStoreRPCSuite) TestMutualTLSFrontendToRemoteCluster() { + runHelloWorldTest(s.Suite, localhostIPv4, s.remoteClusterMutualTLSRPCFactory, s.remoteClusterMutualTLSRPCFactory, true) +} + func (s *localStoreRPCSuite) TestMutualTLSButClientInsecure() { runHelloWorldTest(s.Suite, localhostIPv4, s.internodeMutualTLSRPCFactory, s.insecureRPCFactory, false) } diff --git a/temporal/server_impl.go b/temporal/server_impl.go index 6a7c4850e09..21af9b10cf1 100644 --- a/temporal/server_impl.go +++ b/temporal/server_impl.go @@ -191,7 +191,7 @@ func newBootstrapParams( } svcCfg := cfg.Services[svcName] - rpcFactory := rpc.NewFactory(&svcCfg.RPC, svcName, logger, tlsConfigProvider, dc) + rpcFactory := rpc.NewFactory(&svcCfg.RPC, svcName, logger, tlsConfigProvider, dc, clusterMetadata) params.RPCFactory = rpcFactory // Ringpop uses a different port to register handlers, this map is needed to resolve From 749d2c602846ab2b3c792810ab16550e73a5cc95 Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Thu, 24 Feb 2022 11:27:26 -0800 Subject: [PATCH 06/14] Preserve the info when updating is_global_namepsace_enabled (#2540) --- temporal/fx.go | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/temporal/fx.go b/temporal/fx.go index 50d53b32314..10521956c0a 100644 --- a/temporal/fx.go +++ b/temporal/fx.go @@ -596,22 +596,15 @@ func ApplyClusterMetadataConfigProvider( // Allow updating cluster metadata if global namespace is disabled if !resp.IsGlobalNamespaceEnabled && clusterData.EnableGlobalNamespace { - resp.InitialFailoverVersion = clusterInfo.InitialFailoverVersion - resp.FailoverVersionIncrement = clusterData.FailoverVersionIncrement + currentMetadata := resp.ClusterMetadata + currentMetadata.IsGlobalNamespaceEnabled = clusterData.EnableGlobalNamespace + currentMetadata.InitialFailoverVersion = clusterInfo.InitialFailoverVersion + currentMetadata.FailoverVersionIncrement = clusterData.FailoverVersionIncrement applied, err = clusterMetadataManager.SaveClusterMetadata( &persistence.SaveClusterMetadataRequest{ - ClusterMetadata: persistencespb.ClusterMetadata{ - HistoryShardCount: resp.HistoryShardCount, - ClusterName: resp.ClusterName, - ClusterId: resp.ClusterId, - ClusterAddress: resp.ClusterAddress, - FailoverVersionIncrement: resp.FailoverVersionIncrement, - InitialFailoverVersion: resp.InitialFailoverVersion, - IsGlobalNamespaceEnabled: clusterData.EnableGlobalNamespace, - IsConnectionEnabled: resp.IsConnectionEnabled, - }, - Version: resp.Version, + ClusterMetadata: currentMetadata, + Version: resp.Version, }) if !applied || err != nil { return config.ClusterMetadata, config.Persistence, fmt.Errorf("error while updating cluster metadata: %w", err) From 3b6d586026a72816a0d0d9d3544ef3a4e6500e1b Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Thu, 24 Feb 2022 12:13:41 -0800 Subject: [PATCH 07/14] Check is global ns enabled in handler (#2541) --- service/history/handler.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/service/history/handler.go b/service/history/handler.go index 1c30376a7de..03031cde558 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -1096,6 +1096,10 @@ func (h *Handler) ReplicateEventsV2(ctx context.Context, request *historyservice return nil, errShuttingDown } + if err := h.validateReplicationConfig(); err != nil { + return nil, err + } + namespaceID := namespace.ID(request.GetNamespaceId()) if namespaceID == "" { return nil, h.convertError(errNamespaceNotSet) @@ -1160,6 +1164,10 @@ func (h *Handler) SyncActivity(ctx context.Context, request *historyservice.Sync return nil, errShuttingDown } + if err := h.validateReplicationConfig(); err != nil { + return nil, err + } + namespaceID := namespace.ID(request.GetNamespaceId()) if request.GetNamespaceId() == "" || uuid.Parse(request.GetNamespaceId()) == nil { return nil, h.convertError(errNamespaceNotSet) @@ -1195,6 +1203,9 @@ func (h *Handler) GetReplicationMessages(ctx context.Context, request *historyse if h.isStopped() { return nil, errShuttingDown } + if err := h.validateReplicationConfig(); err != nil { + return nil, err + } var wg sync.WaitGroup wg.Add(len(request.Tokens)) @@ -1248,6 +1259,9 @@ func (h *Handler) GetDLQReplicationMessages(ctx context.Context, request *histor if h.isStopped() { return nil, errShuttingDown } + if err := h.validateReplicationConfig(); err != nil { + return nil, err + } taskInfoPerShard := map[int32][]*replicationspb.ReplicationTaskInfo{} // do batch based on workflow ID and run ID @@ -1487,6 +1501,9 @@ func (h *Handler) GetReplicationStatus( if h.isStopped() { return nil, errShuttingDown } + if err := h.validateReplicationConfig(); err != nil { + return nil, err + } resp := &historyservice.GetReplicationStatusResponse{} for _, shardID := range h.controller.ShardIDs() { @@ -1525,6 +1542,13 @@ func (h *Handler) convertError(err error) error { return err } +func (h *Handler) validateReplicationConfig() error { + if !h.clusterMetadata.IsGlobalNamespaceEnabled() { + return serviceerror.NewUnavailable("The cluster has global namespace disabled. The operation is not supported.") + } + return nil +} + func validateTaskToken(taskToken *tokenspb.Task) error { if taskToken.GetWorkflowId() == "" { return errWorkflowIDNotSet From 24eb5cfbb7d4e0a01ab13678d82b55bdfdc38876 Mon Sep 17 00:00:00 2001 From: Yimin Chen Date: Fri, 25 Feb 2022 12:50:18 -0800 Subject: [PATCH 08/14] reduce unnecessary logging (#2549) --- client/history/metricClient.go | 11 ++++++++++- client/matching/metricClient.go | 12 +++++++++++- service/history/replicationTaskProcessor.go | 2 +- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/client/history/metricClient.go b/client/history/metricClient.go index 1d3faa931ca..548484e21ff 100644 --- a/client/history/metricClient.go +++ b/client/history/metricClient.go @@ -27,6 +27,7 @@ package history import ( "context" + "go.temporal.io/api/serviceerror" "google.golang.org/grpc" "go.temporal.io/server/api/historyservice/v1" @@ -617,7 +618,15 @@ func (c *metricClient) finishMetricsRecording( err error, ) { if err != nil { - c.throttledLogger.Error("history client encountered error", tag.Error(err), tag.ErrorType(err)) + switch err.(type) { + case *serviceerror.Canceled, + *serviceerror.DeadlineExceeded, + *serviceerror.NotFound, + *serviceerror.WorkflowExecutionAlreadyStarted: + // noop - not interest and too many logs + default: + c.throttledLogger.Error("history client encountered error", tag.Error(err), tag.ErrorType(err)) + } scope.Tagged(metrics.ServiceErrorTypeTag(err)).IncCounter(metrics.ClientFailures) } stopwatch.Stop() diff --git a/client/matching/metricClient.go b/client/matching/metricClient.go index 8c114b1bb81..16c3d97fb1b 100644 --- a/client/matching/metricClient.go +++ b/client/matching/metricClient.go @@ -28,6 +28,7 @@ import ( "context" "strings" + "go.temporal.io/api/serviceerror" taskqueuepb "go.temporal.io/api/taskqueue/v1" "google.golang.org/grpc" @@ -256,7 +257,16 @@ func (c *metricClient) finishMetricsRecording( err error, ) { if err != nil { - c.throttledLogger.Error("matching client encountered error", tag.Error(err), tag.ErrorType(err)) + switch err.(type) { + case *serviceerror.Canceled, + *serviceerror.DeadlineExceeded, + *serviceerror.NotFound, + *serviceerror.WorkflowExecutionAlreadyStarted: + // noop - not interest and too many logs + default: + + c.throttledLogger.Error("matching client encountered error", tag.Error(err), tag.ErrorType(err)) + } scope.Tagged(metrics.ServiceErrorTypeTag(err)).IncCounter(metrics.ClientFailures) } stopwatch.Stop() diff --git a/service/history/replicationTaskProcessor.go b/service/history/replicationTaskProcessor.go index 55dda181ea5..0dc01954696 100644 --- a/service/history/replicationTaskProcessor.go +++ b/service/history/replicationTaskProcessor.go @@ -492,7 +492,7 @@ func (p *ReplicationTaskProcessorImpl) cleanupReplicationTasks() error { return nil } - p.logger.Info("cleaning up replication task queue", tag.ReadLevel(*minAckedTaskID)) + p.logger.Debug("cleaning up replication task queue", tag.ReadLevel(*minAckedTaskID)) p.metricsClient.Scope(metrics.ReplicationTaskCleanupScope).IncCounter(metrics.ReplicationTaskCleanupCount) p.metricsClient.Scope( metrics.ReplicationTaskFetcherScope, From a9b223b98605387cf921a9888cc1b956c2571a26 Mon Sep 17 00:00:00 2001 From: Yimin Chen Date: Fri, 25 Feb 2022 13:00:23 -0800 Subject: [PATCH 09/14] Backoff failed workflow task (#2548) * Backoff failed workflow task --- host/client_integration_test.go | 84 ++++++++++++++++--- .../history/workflowTaskHandlerCallbacks.go | 4 + 2 files changed, 77 insertions(+), 11 deletions(-) diff --git a/host/client_integration_test.go b/host/client_integration_test.go index 6991ed0bbe6..d76f92decc1 100644 --- a/host/client_integration_test.go +++ b/host/client_integration_test.go @@ -585,18 +585,16 @@ func (s *clientIntegrationSuite) Test_ActivityTimeouts() { //s.printHistory(id, workflowRun.GetRunID()) } +// This test simulates workflow try to complete itself while there is buffered event. +// Event sequence: +// 1st WorkflowTask runs a local activity. +// While local activity is running, a signal is received by server. +// After signal is received, local activity completed, and workflow drains signal chan (no signal yet) and complete workflow. +// Server failed the complete request because there is unhandled signal. +// Server rescheduled a new workflow task. +// Workflow runs the local activity again and drain the signal chan (with one signal) and complete workflow. +// Server complete workflow as requested. func (s *clientIntegrationSuite) Test_UnhandledCommandAndNewTask() { - /* - Event sequence: - 1st WorkflowTask runs a local activity. - While local activity is running, a signal is received by server. - After signal is received, local activity completed, and workflow drains signal chan (no signal yet) and complete workflow. - Server failed the complete request because there is unhandled signal. - Server rescheduled a new workflow task. - Workflow runs the local activity again and drain the signal chan (with one signal) and complete workflow. - Server complete workflow as requested. - */ - sigReadyToSendChan := make(chan struct{}, 1) sigSendDoneChan := make(chan struct{}) localActivityFn := func(ctx context.Context) error { @@ -694,6 +692,70 @@ func (s *clientIntegrationSuite) Test_UnhandledCommandAndNewTask() { s.assertHistory(id, workflowRun.GetRunID(), expectedHistory) } +// This test simulates workflow generate command with invalid attributes. +// Server is expected to fail the workflow task and schedule a retry immediately for first attempt, +// but if workflow task keeps failing, server will drop the task and wait for timeout to schedule additional retries. +// This is the same behavior as the SDK used to do, but now we would do on server. +func (s *clientIntegrationSuite) Test_InvalidCommandAttribute() { + activityFn := func(ctx context.Context) error { + return nil + } + + var calledTime []time.Time + workflowFn := func(ctx workflow.Context) error { + calledTime = append(calledTime, time.Now().UTC()) + ao := workflow.ActivityOptions{} // invalid activity option without StartToClose timeout + ctx = workflow.WithActivityOptions(ctx, ao) + + err := workflow.ExecuteActivity(ctx, activityFn).Get(ctx, nil) + return err + } + + s.worker.RegisterWorkflow(workflowFn) + s.worker.RegisterActivity(activityFn) + + id := "integration-test-invalid-command-attributes" + workflowOptions := sdkclient.StartWorkflowOptions{ + ID: id, + TaskQueue: s.taskQueue, + // With 3s TaskTimeout and 5s RunTimeout, we expect to see total of 3 attempts. + // First attempt follow by immediate retry follow by timeout and 3rd attempt after WorkflowTaskTimeout. + WorkflowTaskTimeout: 3 * time.Second, + WorkflowRunTimeout: 5 * time.Second, + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + workflowRun, err := s.sdkClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn) + if err != nil { + s.Logger.Fatal("Start workflow failed with err", tag.Error(err)) + } + + s.NotNil(workflowRun) + s.True(workflowRun.GetRunID() != "") + + // wait until workflow close (it will be timeout) + err = workflowRun.Get(ctx, nil) + s.Error(err) + s.Contains(err.Error(), "timeout") + + // verify event sequence + expectedHistory := []enumspb.EventType{ + enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED, + enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, + enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED, + enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT, + } + s.assertHistory(id, workflowRun.GetRunID(), expectedHistory) + + // assert workflow task retried 3 times + s.Equal(3, len(calledTime)) + + s.True(calledTime[1].Sub(calledTime[0]) < time.Second) // retry immediately + s.True(calledTime[2].Sub(calledTime[1]) > time.Second*3) // retry after WorkflowTaskTimeout +} + func (s *clientIntegrationSuite) Test_BufferedQuery() { localActivityFn := func(ctx context.Context) error { time.Sleep(5 * time.Second) // use local activity sleep to block workflow task to force query to be buffered diff --git a/service/history/workflowTaskHandlerCallbacks.go b/service/history/workflowTaskHandlerCallbacks.go index 70c3fb0fc53..6c5480e143f 100644 --- a/service/history/workflowTaskHandlerCallbacks.go +++ b/service/history/workflowTaskHandlerCallbacks.go @@ -485,6 +485,10 @@ Update_History_Loop: tag.WorkflowID(token.GetWorkflowId()), tag.WorkflowRunID(token.GetRunId()), tag.WorkflowNamespaceID(namespaceID.String())) + if currentWorkflowTask.Attempt > 1 { + // drop this workflow task if it keeps failing. This will cause the workflow task to timeout and get retried after timeout. + return nil, serviceerror.NewInvalidArgument(wtFailedCause.Message()) + } msBuilder, err = handler.historyEngine.failWorkflowTask(weContext, scheduleID, startedID, wtFailedCause, request) if err != nil { return nil, err From ba35d7e13ae86ad3cdac95d70cbb2c918f615aac Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Tue, 22 Feb 2022 13:23:49 -0800 Subject: [PATCH 10/14] Allow customizing retry behavior for timeout failure (#2524) --- common/constants.go | 8 ++++ common/util.go | 12 ++++++ common/util_test.go | 37 +++++++++++++++++++ .../history/timerQueueActiveTaskExecutor.go | 3 +- service/history/workflow/retry.go | 35 ++++++++++++++---- service/history/workflow/retry_test.go | 7 ++++ 6 files changed, 93 insertions(+), 9 deletions(-) diff --git a/common/constants.go b/common/constants.go index a4af323538e..48c446542ab 100644 --- a/common/constants.go +++ b/common/constants.go @@ -111,3 +111,11 @@ const ( // DefaultTransactionSizeLimit is the largest allowed transaction size to persistence DefaultTransactionSizeLimit = 4 * 1024 * 1024 ) + +const ( + // TimeoutFailureTypePrefix is the prefix for timeout failure types + // used in retry policy + // the actual failure type will be prefix + enums.TimeoutType.String() + // e.g. "TemporalTimeout:StartToClose" or "TemporalTimeout:Heartbeat" + TimeoutFailureTypePrefix = "TemporalTimeout:" +) diff --git a/common/util.go b/common/util.go index 196b71e2ace..21bc461ed41 100644 --- a/common/util.go +++ b/common/util.go @@ -30,6 +30,7 @@ import ( "fmt" "math/rand" "sort" + "strings" "sync" "time" @@ -471,6 +472,17 @@ func ValidateRetryPolicy(policy *commonpb.RetryPolicy) error { if policy.GetMaximumAttempts() < 0 { return serviceerror.NewInvalidArgument("MaximumAttempts cannot be negative on retry policy.") } + + for _, nrt := range policy.NonRetryableErrorTypes { + if strings.HasPrefix(nrt, TimeoutFailureTypePrefix) { + timeoutTypeValue := nrt[len(TimeoutFailureTypePrefix):] + timeoutType, ok := enumspb.TimeoutType_value[timeoutTypeValue] + if !ok || enumspb.TimeoutType(timeoutType) == enumspb.TIMEOUT_TYPE_UNSPECIFIED { + return serviceerror.NewInvalidArgument(fmt.Sprintf("Invalid timeout type value: %v.", timeoutTypeValue)) + } + } + } + return nil } diff --git a/common/util_test.go b/common/util_test.go index fe5a524b4a6..01ecfd31e12 100644 --- a/common/util_test.go +++ b/common/util_test.go @@ -33,6 +33,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/server/common/dynamicconfig" @@ -105,6 +106,42 @@ func TestValidateRetryPolicy(t *testing.T) { wantErr: true, wantErrString: "MaximumAttempts cannot be negative on retry policy.", }, + { + name: "timeout nonretryable error - valid type", + input: &commonpb.RetryPolicy{ + BackoffCoefficient: 1, + NonRetryableErrorTypes: []string{ + TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_START_TO_CLOSE.String(), + TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START.String(), + TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE.String(), + TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_HEARTBEAT.String(), + }, + }, + wantErr: false, + wantErrString: "", + }, + { + name: "timeout nonretryable error - unspecified type", + input: &commonpb.RetryPolicy{ + BackoffCoefficient: 1, + NonRetryableErrorTypes: []string{ + TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_UNSPECIFIED.String(), + }, + }, + wantErr: true, + wantErrString: "Invalid timeout type value: Unspecified.", + }, + { + name: "timeout nonretryable error - unknown type", + input: &commonpb.RetryPolicy{ + BackoffCoefficient: 1, + NonRetryableErrorTypes: []string{ + TimeoutFailureTypePrefix + "unknown", + }, + }, + wantErr: true, + wantErrString: "Invalid timeout type value: unknown.", + }, } for _, tt := range testCases { diff --git a/service/history/timerQueueActiveTaskExecutor.go b/service/history/timerQueueActiveTaskExecutor.go index 2bf7d2d55b6..fab78a6f3fe 100644 --- a/service/history/timerQueueActiveTaskExecutor.go +++ b/service/history/timerQueueActiveTaskExecutor.go @@ -241,7 +241,8 @@ Loop: break Loop } - timeoutFailure := failure.NewTimeoutFailure("activity timeout", timerSequenceID.TimerType) + failureMsg := fmt.Sprintf("activity %v timeout", timerSequenceID.TimerType.String()) + timeoutFailure := failure.NewTimeoutFailure(failureMsg, timerSequenceID.TimerType) var retryState enumspb.RetryState if retryState, err = mutableState.RetryActivity( activityInfo, diff --git a/service/history/workflow/retry.go b/service/history/workflow/retry.go index 59c4866bc98..3355f2e160c 100644 --- a/service/history/workflow/retry.go +++ b/service/history/workflow/retry.go @@ -39,6 +39,7 @@ import ( "go.temporal.io/server/api/historyservice/v1" workflowspb "go.temporal.io/server/api/workflow/v1" + "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/primitives/timestamp" @@ -118,8 +119,16 @@ func isRetryable(failure *failurepb.Failure, nonRetryableTypes []string) bool { } if failure.GetTimeoutFailureInfo() != nil { - return failure.GetTimeoutFailureInfo().GetTimeoutType() == enumspb.TIMEOUT_TYPE_START_TO_CLOSE || - failure.GetTimeoutFailureInfo().GetTimeoutType() == enumspb.TIMEOUT_TYPE_HEARTBEAT + timeoutType := failure.GetTimeoutFailureInfo().GetTimeoutType() + if timeoutType == enumspb.TIMEOUT_TYPE_START_TO_CLOSE || + timeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT { + return !matchNonRetryableTypes( + common.TimeoutFailureTypePrefix+timeoutType.String(), + nonRetryableTypes, + ) + } + + return false } if failure.GetServerFailureInfo() != nil { @@ -131,16 +140,26 @@ func isRetryable(failure *failurepb.Failure, nonRetryableTypes []string) bool { return false } - failureType := failure.GetApplicationFailureInfo().GetType() - for _, nrt := range nonRetryableTypes { - if nrt == failureType { - return false - } - } + return !matchNonRetryableTypes( + failure.GetApplicationFailureInfo().GetType(), + nonRetryableTypes, + ) } return true } +func matchNonRetryableTypes( + failureType string, + nonRetryableTypes []string, +) bool { + for _, nrt := range nonRetryableTypes { + if nrt == failureType { + return true + } + } + return false +} + // Helpers for creating new retry/cron workflows: func SetupNewWorkflowForRetryOrCron( diff --git a/service/history/workflow/retry_test.go b/service/history/workflow/retry_test.go index f5adb21bcf6..e06597ca013 100644 --- a/service/history/workflow/retry_test.go +++ b/service/history/workflow/retry_test.go @@ -33,6 +33,7 @@ import ( failurepb "go.temporal.io/api/failure/v1" persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/failure" @@ -65,6 +66,7 @@ func Test_IsRetryable(t *testing.T) { }}, } a.True(isRetryable(f, nil)) + a.False(isRetryable(f, []string{common.TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_START_TO_CLOSE.String()})) f = &failurepb.Failure{ FailureInfo: &failurepb.Failure_TimeoutFailureInfo{TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{ @@ -72,6 +74,7 @@ func Test_IsRetryable(t *testing.T) { }}, } a.False(isRetryable(f, nil)) + a.False(isRetryable(f, []string{common.TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START.String()})) f = &failurepb.Failure{ FailureInfo: &failurepb.Failure_TimeoutFailureInfo{TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{ @@ -79,6 +82,7 @@ func Test_IsRetryable(t *testing.T) { }}, } a.False(isRetryable(f, nil)) + a.False(isRetryable(f, []string{common.TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE.String()})) f = &failurepb.Failure{ FailureInfo: &failurepb.Failure_TimeoutFailureInfo{TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{ @@ -86,6 +90,9 @@ func Test_IsRetryable(t *testing.T) { }}, } a.True(isRetryable(f, nil)) + a.False(isRetryable(f, []string{common.TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_HEARTBEAT.String()})) + a.True(isRetryable(f, []string{common.TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_START_TO_CLOSE.String()})) + a.True(isRetryable(f, []string{common.TimeoutFailureTypePrefix + "unknown timeout type string"})) f = &failurepb.Failure{ FailureInfo: &failurepb.Failure_ServerFailureInfo{ServerFailureInfo: &failurepb.ServerFailureInfo{ From aa9a8b1b56187d138cb111ff015858d1fa5c5ab1 Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Fri, 25 Feb 2022 20:13:45 -0800 Subject: [PATCH 11/14] Bump server and cli version to 1.15.1 (#2554) --- common/headers/versionChecker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/headers/versionChecker.go b/common/headers/versionChecker.go index eb4a116ad99..b679c2389a5 100644 --- a/common/headers/versionChecker.go +++ b/common/headers/versionChecker.go @@ -41,8 +41,8 @@ const ( ClientNameTypeScriptSDK = "temporal-typescript" ClientNameCLI = "temporal-cli" - ServerVersion = "1.15.0" - CLIVersion = "1.15.0" + ServerVersion = "1.15.1" + CLIVersion = "1.15.1" // SupportedServerVersions is used by CLI and inter role communication. SupportedServerVersions = ">=1.0.0 <2.0.0" From 9d892c634e82e3744a408ffeb488263fb4f46d0c Mon Sep 17 00:00:00 2001 From: Yimin Chen Date: Mon, 28 Feb 2022 18:09:58 -0800 Subject: [PATCH 12/14] Return NOT_FOUND for invalid workflow ID (#2559) --- service/history/workflow/cache.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/service/history/workflow/cache.go b/service/history/workflow/cache.go index 5b5e9c54080..6af597ff3dc 100644 --- a/service/history/workflow/cache.go +++ b/service/history/workflow/cache.go @@ -30,6 +30,7 @@ import ( "context" "sync/atomic" "time" + "unicode/utf8" "github.com/pborman/uuid" commonpb "go.temporal.io/api/common/v1" @@ -234,6 +235,11 @@ func (c *CacheImpl) validateWorkflowExecutionInfo( return serviceerror.NewInvalidArgument("Can't load workflow execution. WorkflowId not set.") } + if !utf8.ValidString(execution.GetWorkflowId()) { + // We know workflow cannot exist with invalid utf8 string as WorkflowID. + return serviceerror.NewNotFound("Workflow not exists.") + } + // RunID is not provided, lets try to retrieve the RunID for current active execution if execution.GetRunId() == "" { response, err := c.getCurrentExecutionWithRetry(&persistence.GetCurrentExecutionRequest{ From 6d90f6540ff3813ab83810b788e661ca29436e22 Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Fri, 4 Mar 2022 08:54:19 -0800 Subject: [PATCH 13/14] Fix potential shard deadlock issue (#2570) --- service/history/shard/context_impl.go | 1 + 1 file changed, 1 insertion(+) diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index b6ac2a2bf7d..1beed7d74b8 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -1472,6 +1472,7 @@ func (s *ContextImpl) loadShardMetadata(ownershipChanged *bool) error { s.rLock() if s.state >= contextStateStopping { + s.rUnlock() return errStoppingContext } From 6788cbb69ad7b5fd5a52c2100d7047aeb9b98838 Mon Sep 17 00:00:00 2001 From: Mike Cutalo Date: Thu, 14 Apr 2022 08:42:57 -0700 Subject: [PATCH 14/14] support statd seperator --- common/metrics/tally/statsd/reporter.go | 43 +++++++++++++++++++++---- 1 file changed, 37 insertions(+), 6 deletions(-) diff --git a/common/metrics/tally/statsd/reporter.go b/common/metrics/tally/statsd/reporter.go index 546d683e97c..096a29c6b3d 100644 --- a/common/metrics/tally/statsd/reporter.go +++ b/common/metrics/tally/statsd/reporter.go @@ -27,6 +27,7 @@ package statsd import ( "bytes" "sort" + "strings" "time" "github.com/cactus/go-statsd-client/statsd" @@ -37,6 +38,7 @@ import ( type temporalTallyStatsdReporter struct { //Wrapper on top of "github.com/uber-go/tally/statsd" tallystatsd tally.StatsReporter + separator string } func (r *temporalTallyStatsdReporter) metricNameWithTags(originalName string, tags map[string]string) string { @@ -63,22 +65,23 @@ func (r *temporalTallyStatsdReporter) metricNameWithTags(originalName string, ta func NewReporter(statsd statsd.Statter, opts tallystatsdreporter.Options) tally.StatsReporter { return &temporalTallyStatsdReporter{ tallystatsd: tallystatsdreporter.NewReporter(statsd, opts), + separator: ".__", } } func (r *temporalTallyStatsdReporter) ReportCounter(name string, tags map[string]string, value int64) { - newName := r.metricNameWithTags(name, tags) - r.tallystatsd.ReportCounter(newName, map[string]string{}, value) + // newName := r.metricNameWithTags(name, tags) + r.tallystatsd.ReportCounter(r.taggedName(name, tags), map[string]string{}, value) } func (r *temporalTallyStatsdReporter) ReportGauge(name string, tags map[string]string, value float64) { - newName := r.metricNameWithTags(name, tags) - r.tallystatsd.ReportGauge(newName, map[string]string{}, value) + // newName := r.metricNameWithTags(name, tags) + r.tallystatsd.ReportGauge(r.taggedName(name, tags), map[string]string{}, value) } func (r *temporalTallyStatsdReporter) ReportTimer(name string, tags map[string]string, interval time.Duration) { - newName := r.metricNameWithTags(name, tags) - r.tallystatsd.ReportTimer(newName, map[string]string{}, interval) + // newName := r.metricNameWithTags(name, tags) + r.tallystatsd.ReportTimer(r.taggedName(name, tags), map[string]string{}, interval) } func (r *temporalTallyStatsdReporter) ReportHistogramValueSamples( @@ -112,3 +115,31 @@ func (r *temporalTallyStatsdReporter) Capabilities() tally.Capabilities { func (r *temporalTallyStatsdReporter) Flush() { r.tallystatsd.Flush() } + +// https://github.com/influxdata/telegraf/blob/master/plugins/inputs/statsd/README.md#influx-statsd +func (r *temporalTallyStatsdReporter) taggedName(name string, tags map[string]string) string { + var b strings.Builder + b.WriteString(name) + for k, v := range tags { + b.WriteString(r.separator) + b.WriteString(replaceChars(k)) + b.WriteByte('=') + b.WriteString(replaceChars(v)) + } + return b.String() +} + +// Replace problematic characters in tags. +func replaceChars(s string) string { + var b strings.Builder + b.Grow(len(s)) + for i := 0; i < len(s); i++ { + switch s[i] { + case '.', ':', '|', '-', '=': + b.WriteByte('_') + default: + b.WriteByte(s[i]) + } + } + return b.String() +}