Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statsd separator 1.15.2 #3

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.17.3
go-version: 1.17.6
- name: Run GoReleaser
uses: goreleaser/goreleaser-action@v2
with:
Expand Down
81 changes: 69 additions & 12 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,36 @@ before:
hooks:
- go mod download
- ./develop/scripts/create_build_info_data.sh

archives:
- id: default
builds:
- temporal-server
- tctl
- temporal-cassandra-tool
- temporal-sql-tool
name_template: "{{ .ProjectName }}_{{ .Version }}_{{ .Os }}_{{ .Arch }}"
format_overrides:
- goos: windows
format: zip
files:
- ./config/*

- id: no-cgo
builds:
- temporal-server-no-cgo
- tctl-no-cgo
- temporal-cassandra-tool-no-cgo
- temporal-sql-tool-no-cgo
name_template: "{{ .ProjectName }}_{{ .Version }}_{{ .Os }}_{{ .Arch }}_no_cgo"
format_overrides:
- goos: windows
format: zip
files:
- ./config/*

builds:
- id: "temporal-server"
- id: temporal-server
dir: cmd/server
binary: temporal-server
goos:
Expand All @@ -13,7 +41,7 @@ builds:
goarch:
- amd64
- arm64
- id: "temporal-server-no-cgo"
- id: temporal-server-no-cgo
dir: cmd/server
binary: temporal-server
env:
Expand All @@ -23,7 +51,7 @@ builds:
goarch:
- amd64
- arm64
- id: "tctl"
- id: tctl
dir: cmd/tools/cli
binary: tctl
goos:
Expand All @@ -33,7 +61,17 @@ builds:
goarch:
- amd64
- arm64
- id: "temporal-cassandra-tool"
- id: tctl-no-cgo
dir: cmd/tools/cli
binary: tctl
env:
- CGO_ENABLED=0
goos:
- linux
goarch:
- amd64
- arm64
- id: temporal-cassandra-tool
dir: cmd/tools/cassandra
binary: temporal-cassandra-tool
goos:
Expand All @@ -43,7 +81,17 @@ builds:
goarch:
- amd64
- arm64
- id: "temporal-sql-tool"
- id: temporal-cassandra-tool-no-cgo
dir: cmd/tools/cassandra
binary: temporal-cassandra-tool
env:
- CGO_ENABLED=0
goos:
- linux
goarch:
- amd64
- arm64
- id: temporal-sql-tool
dir: cmd/tools/sql
binary: temporal-sql-tool
goos:
Expand All @@ -53,14 +101,23 @@ builds:
goarch:
- amd64
- arm64
- id: temporal-sql-tool-no-cgo
dir: cmd/tools/sql
binary: temporal-sql-tool
env:
- CGO_ENABLED=0
goos:
- linux
goarch:
- amd64
- arm64

checksum:
name_template: 'checksums.txt'
algorithm: sha256
snapshot:
name_template: "{{ .Tag }}-next"

changelog:
sort: asc
filters:
exclude:
- '^docs:'
- '^test:'
skip: true

announce:
skip: "true"
11 changes: 10 additions & 1 deletion client/history/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package history
import (
"context"

"go.temporal.io/api/serviceerror"
"google.golang.org/grpc"

"go.temporal.io/server/api/historyservice/v1"
Expand Down Expand Up @@ -617,7 +618,15 @@ func (c *metricClient) finishMetricsRecording(
err error,
) {
if err != nil {
c.throttledLogger.Error("history client encountered error", tag.Error(err), tag.ErrorType(err))
switch err.(type) {
case *serviceerror.Canceled,
*serviceerror.DeadlineExceeded,
*serviceerror.NotFound,
*serviceerror.WorkflowExecutionAlreadyStarted:
// noop - not interest and too many logs
default:
c.throttledLogger.Error("history client encountered error", tag.Error(err), tag.ErrorType(err))
}
scope.Tagged(metrics.ServiceErrorTypeTag(err)).IncCounter(metrics.ClientFailures)
}
stopwatch.Stop()
Expand Down
12 changes: 11 additions & 1 deletion client/matching/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"context"
"strings"

"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"google.golang.org/grpc"

Expand Down Expand Up @@ -256,7 +257,16 @@ func (c *metricClient) finishMetricsRecording(
err error,
) {
if err != nil {
c.throttledLogger.Error("matching client encountered error", tag.Error(err), tag.ErrorType(err))
switch err.(type) {
case *serviceerror.Canceled,
*serviceerror.DeadlineExceeded,
*serviceerror.NotFound,
*serviceerror.WorkflowExecutionAlreadyStarted:
// noop - not interest and too many logs
default:

c.throttledLogger.Error("matching client encountered error", tag.Error(err), tag.ErrorType(err))
}
scope.Tagged(metrics.ServiceErrorTypeTag(err)).IncCounter(metrics.ClientFailures)
}
stopwatch.Stop()
Expand Down
11 changes: 8 additions & 3 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ type (
Frontend GroupTLS `yaml:"frontend"`
// SystemWorker controls TLS setting for System Workers connecting to Frontend.
SystemWorker WorkerTLS `yaml:"systemWorker"`
// RemoteFrontendClients controls TLS setting for talking to remote cluster.
RemoteClusters map[string]GroupTLS `yaml:"remoteClusters"`
// ExpirationChecks defines settings for periodic checks for expiration of certificates
ExpirationChecks CertExpirationValidation `yaml:"expirationChecks"`
// Interval between refreshes of certificates loaded from files
Expand Down Expand Up @@ -484,9 +486,12 @@ func (c *Config) String() string {
return maskedYaml
}

func (r *GroupTLS) IsEnabled() bool {
return r.Server.KeyFile != "" || r.Server.KeyData != "" ||
len(r.Client.RootCAFiles) > 0 || len(r.Client.RootCAData) > 0 ||
func (r *GroupTLS) IsServerEnabled() bool {
return r.Server.KeyFile != "" || r.Server.KeyData != ""
}

func (r *GroupTLS) IsClientEnabled() bool {
return len(r.Client.RootCAFiles) > 0 || len(r.Client.RootCAData) > 0 ||
r.Client.ForceTLS
}

Expand Down
8 changes: 8 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,11 @@ const (
// DefaultTransactionSizeLimit is the largest allowed transaction size to persistence
DefaultTransactionSizeLimit = 4 * 1024 * 1024
)

const (
// TimeoutFailureTypePrefix is the prefix for timeout failure types
// used in retry policy
// the actual failure type will be prefix + enums.TimeoutType.String()
// e.g. "TemporalTimeout:StartToClose" or "TemporalTimeout:Heartbeat"
TimeoutFailureTypePrefix = "TemporalTimeout:"
)
4 changes: 2 additions & 2 deletions common/headers/versionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ const (
ClientNameTypeScriptSDK = "temporal-typescript"
ClientNameCLI = "temporal-cli"

ServerVersion = "1.15.0"
CLIVersion = "1.15.0"
ServerVersion = "1.15.1"
CLIVersion = "1.15.1"

// SupportedServerVersions is used by CLI and inter role communication.
SupportedServerVersions = ">=1.0.0 <2.0.0"
Expand Down
43 changes: 37 additions & 6 deletions common/metrics/tally/statsd/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package statsd
import (
"bytes"
"sort"
"strings"
"time"

"github.com/cactus/go-statsd-client/statsd"
Expand All @@ -37,6 +38,7 @@ import (
type temporalTallyStatsdReporter struct {
//Wrapper on top of "github.com/uber-go/tally/statsd"
tallystatsd tally.StatsReporter
separator string
}

func (r *temporalTallyStatsdReporter) metricNameWithTags(originalName string, tags map[string]string) string {
Expand All @@ -63,22 +65,23 @@ func (r *temporalTallyStatsdReporter) metricNameWithTags(originalName string, ta
func NewReporter(statsd statsd.Statter, opts tallystatsdreporter.Options) tally.StatsReporter {
return &temporalTallyStatsdReporter{
tallystatsd: tallystatsdreporter.NewReporter(statsd, opts),
separator: ".__",
}
}

func (r *temporalTallyStatsdReporter) ReportCounter(name string, tags map[string]string, value int64) {
newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportCounter(newName, map[string]string{}, value)
// newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportCounter(r.taggedName(name, tags), map[string]string{}, value)
}

func (r *temporalTallyStatsdReporter) ReportGauge(name string, tags map[string]string, value float64) {
newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportGauge(newName, map[string]string{}, value)
// newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportGauge(r.taggedName(name, tags), map[string]string{}, value)
}

func (r *temporalTallyStatsdReporter) ReportTimer(name string, tags map[string]string, interval time.Duration) {
newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportTimer(newName, map[string]string{}, interval)
// newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportTimer(r.taggedName(name, tags), map[string]string{}, interval)
}

func (r *temporalTallyStatsdReporter) ReportHistogramValueSamples(
Expand Down Expand Up @@ -112,3 +115,31 @@ func (r *temporalTallyStatsdReporter) Capabilities() tally.Capabilities {
func (r *temporalTallyStatsdReporter) Flush() {
r.tallystatsd.Flush()
}

// https://github.com/influxdata/telegraf/blob/master/plugins/inputs/statsd/README.md#influx-statsd
func (r *temporalTallyStatsdReporter) taggedName(name string, tags map[string]string) string {
var b strings.Builder
b.WriteString(name)
for k, v := range tags {
b.WriteString(r.separator)
b.WriteString(replaceChars(k))
b.WriteByte('=')
b.WriteString(replaceChars(v))
}
return b.String()
}

// Replace problematic characters in tags.
func replaceChars(s string) string {
var b strings.Builder
b.Grow(len(s))
for i := 0; i < len(s); i++ {
switch s[i] {
case '.', ':', '|', '-', '=':
b.WriteByte('_')
default:
b.WriteByte(s[i])
}
}
return b.String()
}
2 changes: 1 addition & 1 deletion common/persistence/cassandra/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func applyWorkflowMutationBatch(
namespaceID,
workflowID,
runID,
workflowMutation.ExecutionInfo,
workflowMutation.ExecutionInfoBlob,
workflowMutation.ExecutionState,
workflowMutation.ExecutionStateBlob,
workflowMutation.NextEventID,
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
UpsertRequestCancelInfos: make(map[int64]*commonpb.DataBlob),
UpsertSignalInfos: make(map[int64]*commonpb.DataBlob),

ExecutionInfo: input.ExecutionInfo,
ExecutionState: input.ExecutionState,

DeleteActivityInfos: input.DeleteActivityInfos,
Expand All @@ -476,7 +477,7 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
NextEventID: input.NextEventID,
}

result.ExecutionInfo, err = m.serializer.WorkflowExecutionInfoToBlob(input.ExecutionInfo, enumspb.ENCODING_TYPE_PROTO3)
result.ExecutionInfoBlob, err = m.serializer.WorkflowExecutionInfoToBlob(input.ExecutionInfo, enumspb.ENCODING_TYPE_PROTO3)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,8 @@ type (
WorkflowID string
RunID string

ExecutionInfo *commonpb.DataBlob
ExecutionInfo *persistencespb.WorkflowExecutionInfo
ExecutionInfoBlob *commonpb.DataBlob
ExecutionState *persistencespb.WorkflowExecutionState
ExecutionStateBlob *commonpb.DataBlob
NextEventID int64
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/size.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func statusOfInternalWorkflowMutation(
return nil
}

executionInfoSize := sizeOfBlob(mutation.ExecutionInfo)
executionInfoSize := sizeOfBlob(mutation.ExecutionInfoBlob)
executionStateSize := sizeOfBlob(mutation.ExecutionStateBlob)

activityInfoCount := len(mutation.UpsertActivityInfos)
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/sql/execution_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func applyWorkflowMutationTx(
tx,
namespaceID,
workflowID,
workflowMutation.ExecutionInfo,
workflowMutation.ExecutionInfoBlob,
workflowMutation.ExecutionState,
workflowMutation.NextEventID,
lastWriteVersion,
Expand Down
4 changes: 2 additions & 2 deletions common/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type (
GetInternodeGRPCServerOptions() ([]grpc.ServerOption, error)
GetGRPCListener() net.Listener
GetRingpopChannel() *tchannel.Channel
CreateFrontendGRPCConnection(hostName string) *grpc.ClientConn
CreateInternodeGRPCConnection(hostName string) *grpc.ClientConn
CreateFrontendGRPCConnection(rpcAddress string) *grpc.ClientConn
CreateInternodeGRPCConnection(rpcAddress string) *grpc.ClientConn
}
)
Loading