diff --git a/client.go b/client.go index 5b861b7..8df63d6 100644 --- a/client.go +++ b/client.go @@ -20,6 +20,9 @@ var ( // StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs. StreamClientInterceptor = DefaultClientMetrics.StreamClientInterceptor() + + // ClientStatsHandler is a gRPC client-side stats.Handler that provides Prometheus monitoring for RPCs. + ClientStatsHandler = DefaultClientMetrics.NewClientStatsHandler() ) func init() { @@ -55,3 +58,21 @@ func EnableClientStreamSendTimeHistogram(opts ...HistogramOption) { DefaultClientMetrics.EnableClientStreamSendTimeHistogram(opts...) prom.Register(DefaultClientMetrics.clientStreamSendHistogram) } + +// EnableClientMsgSizeReceivedBytesHistogram turns on recording of +// single message send time of streaming RPCs. +// This function acts on the DefaultClientMetrics variable and the +// default Prometheus metrics registry. +func EnableClientMsgSizeReceivedBytesHistogram(opts ...HistogramOption) { + DefaultClientMetrics.EnableMsgSizeReceivedBytesHistogram(opts...) + prom.Register(DefaultClientMetrics.clientMsgSizeReceivedHistogram) +} + +// EnableClientMsgSizeSentBytesHistogram turns on recording of +// single message send time of streaming RPCs. +// This function acts on the DefaultClientMetrics variable and the +// default Prometheus metrics registry. +func EnableClientMsgSizeSentBytesHistogram(opts ...HistogramOption) { + DefaultClientMetrics.EnableMsgSizeSentBytesHistogram(opts...) + prom.Register(DefaultClientMetrics.clientMsgSizeSentHistogram) +} diff --git a/client_metrics.go b/client_metrics.go index a344084..75aa2c8 100644 --- a/client_metrics.go +++ b/client_metrics.go @@ -7,16 +7,18 @@ import ( prom "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/stats" "google.golang.org/grpc/status" ) // ClientMetrics represents a collection of metrics to be registered on a // Prometheus metrics registry for a gRPC client. type ClientMetrics struct { - clientStartedCounter *prom.CounterVec - clientHandledCounter *prom.CounterVec - clientStreamMsgReceived *prom.CounterVec - clientStreamMsgSent *prom.CounterVec + clientStartedCounter *prom.CounterVec + clientStartedCounterOpts prom.CounterOpts + clientHandledCounter *prom.CounterVec + clientStreamMsgReceived *prom.CounterVec + clientStreamMsgSent *prom.CounterVec clientHandledHistogramEnabled bool clientHandledHistogramOpts prom.HistogramOpts @@ -29,6 +31,14 @@ type ClientMetrics struct { clientStreamSendHistogramEnabled bool clientStreamSendHistogramOpts prom.HistogramOpts clientStreamSendHistogram *prom.HistogramVec + + clientMsgSizeReceivedHistogramEnabled bool + clientMsgSizeReceivedHistogramOpts prom.HistogramOpts + clientMsgSizeReceivedHistogram *prom.HistogramVec + + clientMsgSizeSentHistogramEnabled bool + clientMsgSizeSentHistogramOpts prom.HistogramOpts + clientMsgSizeSentHistogram *prom.HistogramVec } // NewClientMetrics returns a ClientMetrics object. Use a new instance of @@ -82,7 +92,21 @@ func NewClientMetrics(counterOpts ...CounterOption) *ClientMetrics { Help: "Histogram of response latency (seconds) of the gRPC single message send.", Buckets: prom.DefBuckets, }, - clientStreamSendHistogram: nil, + clientStreamSendHistogram: nil, + clientMsgSizeReceivedHistogramEnabled: false, + clientMsgSizeReceivedHistogramOpts: prom.HistogramOpts{ + Name: "grpc_client_msg_size_received_bytes", + Help: "Histogram of message sizes received by the client.", + Buckets: defMsgBytesBuckets, + }, + clientMsgSizeReceivedHistogram: nil, + clientMsgSizeSentHistogramEnabled: false, + clientMsgSizeSentHistogramOpts: prom.HistogramOpts{ + Name: "grpc_client_msg_size_sent_bytes", + Help: "Histogram of message sizes sent by the client.", + Buckets: defMsgBytesBuckets, + }, + clientMsgSizeSentHistogram: nil, } } @@ -103,6 +127,12 @@ func (m *ClientMetrics) Describe(ch chan<- *prom.Desc) { if m.clientStreamSendHistogramEnabled { m.clientStreamSendHistogram.Describe(ch) } + if m.clientMsgSizeReceivedHistogramEnabled { + m.clientMsgSizeReceivedHistogram.Describe(ch) + } + if m.clientMsgSizeSentHistogramEnabled { + m.clientMsgSizeSentHistogram.Describe(ch) + } } // Collect is called by the Prometheus registry when collecting @@ -122,6 +152,12 @@ func (m *ClientMetrics) Collect(ch chan<- prom.Metric) { if m.clientStreamSendHistogramEnabled { m.clientStreamSendHistogram.Collect(ch) } + if m.clientMsgSizeReceivedHistogramEnabled { + m.clientMsgSizeReceivedHistogram.Collect(ch) + } + if m.clientMsgSizeSentHistogramEnabled { + m.clientMsgSizeSentHistogram.Collect(ch) + } } // EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs. @@ -173,6 +209,38 @@ func (m *ClientMetrics) EnableClientStreamSendTimeHistogram(opts ...HistogramOpt m.clientStreamSendHistogramEnabled = true } +// EnableMsgSizeReceivedBytesHistogram turns on recording of received message size of RPCs. +// Histogram metrics can be very expensive for Prometheus to retain and query. It takes +// options to configure histogram options such as the defined buckets. +func (m *ClientMetrics) EnableMsgSizeReceivedBytesHistogram(opts ...HistogramOption) { + for _, o := range opts { + o(&m.clientMsgSizeReceivedHistogramOpts) + } + if !m.clientMsgSizeReceivedHistogramEnabled { + m.clientMsgSizeReceivedHistogram = prom.NewHistogramVec( + m.clientMsgSizeReceivedHistogramOpts, + []string{"grpc_service", "grpc_method", "grpc_stats"}, + ) + } + m.clientMsgSizeReceivedHistogramEnabled = true +} + +// EnableMsgSizeSentBytesHistogram turns on recording of sent message size of RPCs. +// Histogram metrics can be very expensive for Prometheus to retain and query. It +// takes options to configure histogram options such as the defined buckets. +func (m *ClientMetrics) EnableMsgSizeSentBytesHistogram(opts ...HistogramOption) { + for _, o := range opts { + o(&m.clientMsgSizeSentHistogramOpts) + } + if !m.clientMsgSizeSentHistogramEnabled { + m.clientMsgSizeSentHistogram = prom.NewHistogramVec( + m.clientMsgSizeSentHistogramOpts, + []string{"grpc_service", "grpc_method", "grpc_stats"}, + ) + } + m.clientMsgSizeSentHistogramEnabled = true +} + // UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs. func (m *ClientMetrics) UnaryClientInterceptor() func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { @@ -202,6 +270,13 @@ func (m *ClientMetrics) StreamClientInterceptor() func(ctx context.Context, desc } } +// NewClientStatsHandler is a gRPC client-side stats.Handler that providers Prometheus monitoring for RPCs. +func (m *ClientMetrics) NewClientStatsHandler() stats.Handler { + return &clientStatsHandler{ + clientMetrics: m, + } +} + func clientStreamType(desc *grpc.StreamDesc) grpcType { if desc.ClientStreams && !desc.ServerStreams { return ClientStream diff --git a/client_reporter.go b/client_reporter.go index 286d657..7364595 100644 --- a/client_reporter.go +++ b/client_reporter.go @@ -4,6 +4,7 @@ package grpc_prometheus import ( + "fmt" "time" "github.com/prometheus/client_golang/prometheus" @@ -31,6 +32,16 @@ func newClientReporter(m *ClientMetrics, rpcType grpcType, fullMethod string) *c return r } +func newClientReporterForStatsHanlder(startTime time.Time, m *ClientMetrics, fullMethod string) *clientReporter { + r := &clientReporter{ + metrics: m, + rpcType: Unary, + startTime: startTime, + } + r.serviceName, r.methodName = splitMethodName(fullMethod) + return r +} + // timer is a helper interface to time functions. type timer interface { ObserveDuration() time.Duration @@ -54,10 +65,25 @@ func (r *clientReporter) ReceiveMessageTimer() timer { return emptyTimer } +func (r *clientReporter) StartedConn() { + r.metrics.clientStartedCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc() +} + func (r *clientReporter) ReceivedMessage() { r.metrics.clientStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc() } +// ReceivedMessageSize counts the size of received messages on client-side +func (r *clientReporter) ReceivedMessageSize(rpcStats grpcStats, size float64) { + if rpcStats == Payload { + r.ReceivedMessage() + } + + if r.metrics.clientMsgSizeReceivedHistogramEnabled { + r.metrics.clientMsgSizeReceivedHistogram.WithLabelValues(r.serviceName, r.methodName, string(rpcStats)).Observe(size) + } +} + func (r *clientReporter) SendMessageTimer() timer { if r.metrics.clientStreamSendHistogramEnabled { hist := r.metrics.clientStreamSendHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName) @@ -71,9 +97,26 @@ func (r *clientReporter) SentMessage() { r.metrics.clientStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc() } +// SentMessageSize counts the size of sent messages on client-side +func (r *clientReporter) SentMessageSize(rpcStats grpcStats, size float64) { + if rpcStats == Payload { + r.SentMessage() + } + + if r.metrics.clientMsgSizeSentHistogramEnabled { + r.metrics.clientMsgSizeSentHistogram.WithLabelValues(r.serviceName, r.methodName, string(rpcStats)).Observe(size) + } +} + +// StartTime is used to reset the value of the startTime +func (r *clientReporter) StartTime(t time.Time) { + r.startTime = t +} + func (r *clientReporter) Handled(code codes.Code) { r.metrics.clientHandledCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, code.String()).Inc() if r.metrics.clientHandledHistogramEnabled { + fmt.Printf("client handled count + 1: %v,%f\n", code, time.Since(r.startTime).Seconds()) r.metrics.clientHandledHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Observe(time.Since(r.startTime).Seconds()) } } diff --git a/client_stats_handler.go b/client_stats_handler.go new file mode 100644 index 0000000..23a6516 --- /dev/null +++ b/client_stats_handler.go @@ -0,0 +1,59 @@ +package grpc_prometheus + +import ( + "context" + + "google.golang.org/grpc/stats" + "google.golang.org/grpc/status" +) + +type clientStatsHandler struct { + clientMetrics *ClientMetrics +} + +// TagRPC implements the stats.Hanlder interface. +func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + rpcInfo := newRPCInfo(info.FullMethodName) + return context.WithValue(ctx, &rpcInfoKey, rpcInfo) +} + +// HandleRPC implements the stats.Hanlder interface. +func (h *clientStatsHandler) HandleRPC(ctx context.Context, s stats.RPCStats) { + v, ok := ctx.Value(&rpcInfoKey).(*rpcInfo) + if !ok { + return + } + monitor := newClientReporterForStatsHanlder(v.startTime, h.clientMetrics, v.fullMethodName) + switch s := s.(type) { + case *stats.Begin: + v.startTime = s.BeginTime + monitor.StartedConn() + case *stats.End: + monitor.Handled(status.Code(s.Error)) + case *stats.InHeader: + monitor.ReceivedMessageSize(Header, float64(s.WireLength)) + case *stats.InPayload: + // TODO: remove the +5 offset on wire length here, which is a temporary stand-in for the missing grpc framing offset + // See: https://github.com/grpc/grpc-go/issues/1647 + // TODO(tonywang): response latency (seconds) of the gRPC single message received + monitor.ReceivedMessageSize(Payload, float64(s.WireLength+5)) + case *stats.InTrailer: + monitor.ReceivedMessageSize(Tailer, float64(s.WireLength)) + case *stats.OutHeader: + // TODO: Add the sent header message size stats, if the wire length of the send header is provided + case *stats.OutPayload: + // TODO(tonywang): response latency (seconds) of the gRPC single message send + monitor.SentMessageSize(Payload, float64(s.WireLength)) + case *stats.OutTrailer: + monitor.SentMessageSize(Tailer, float64(s.WireLength)) + } +} + +// TagConn implements the stats.Hanlder interface. +func (h *clientStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { + return ctx +} + +// HandleConn implements the stats.Hanlder interface. +func (h *clientStatsHandler) HandleConn(ctx context.Context, s stats.ConnStats) { +} diff --git a/client_stats_handler_test.go b/client_stats_handler_test.go new file mode 100644 index 0000000..f4c1b9b --- /dev/null +++ b/client_stats_handler_test.go @@ -0,0 +1,174 @@ +// Copyright 2016 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. + +package grpc_prometheus + +import ( + "context" + "io" + "net" + "testing" + "time" + + pb_testproto "github.com/grpc-ecosystem/go-grpc-prometheus/examples/testproto" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var ( + // client metrics must satisfy the Collector interface + _ prometheus.Collector = NewClientMetrics() +) + +func TestClientStatsHandlerSuite(t *testing.T) { + suite.Run(t, &ClientStatsHandlerTestSuite{}) +} + +type ClientStatsHandlerTestSuite struct { + suite.Suite + + serverListener net.Listener + server *grpc.Server + clientConn *grpc.ClientConn + testClient pb_testproto.TestServiceClient + ctx context.Context + cancel context.CancelFunc +} + +func (s *ClientStatsHandlerTestSuite) SetupSuite() { + var err error + + EnableClientHandlingTimeHistogram() + + EnableClientMsgSizeSentBytesHistogram() + + EnableClientMsgSizeReceivedBytesHistogram() + + s.serverListener, err = net.Listen("tcp", "127.0.0.1:0") + require.NoError(s.T(), err, "must be able to allocate a port for serverListener") + + // This is the point where we hook up the interceptor + s.server = grpc.NewServer() + pb_testproto.RegisterTestServiceServer(s.server, &testService{t: s.T()}) + + go func() { + s.server.Serve(s.serverListener) + }() + + s.clientConn, err = grpc.Dial( + s.serverListener.Addr().String(), + grpc.WithInsecure(), + grpc.WithBlock(), + grpc.WithStatsHandler(ClientStatsHandler), + grpc.WithTimeout(2*time.Second)) + require.NoError(s.T(), err, "must not error on client Dial") + s.testClient = pb_testproto.NewTestServiceClient(s.clientConn) +} + +func (s *ClientStatsHandlerTestSuite) SetupTest() { + // Make all RPC calls last at most 2 sec, meaning all async issues or deadlock will not kill tests. + s.ctx, s.cancel = context.WithTimeout(context.TODO(), 2*time.Second) + + // Make sure every test starts with same fresh, intialized metric state. + DefaultClientMetrics.clientStartedCounter.Reset() + DefaultClientMetrics.clientHandledCounter.Reset() + DefaultClientMetrics.clientHandledHistogram.Reset() + DefaultClientMetrics.clientStreamMsgReceived.Reset() + DefaultClientMetrics.clientStreamMsgSent.Reset() + DefaultClientMetrics.clientMsgSizeReceivedHistogram.Reset() + DefaultClientMetrics.clientMsgSizeSentHistogram.Reset() + +} + +func (s *ClientStatsHandlerTestSuite) TearDownSuite() { + if s.serverListener != nil { + s.server.Stop() + s.T().Logf("stopped grpc.Server at: %v", s.serverListener.Addr().String()) + s.serverListener.Close() + + } + if s.clientConn != nil { + s.clientConn.Close() + } +} + +func (s *ClientStatsHandlerTestSuite) TearDownTest() { + s.cancel() +} + +func (s *ClientStatsHandlerTestSuite) TestUnaryIncrementsMetrics() { + _, err := s.testClient.PingEmpty(s.ctx, &pb_testproto.Empty{}) // should return with code=OK + require.NoError(s.T(), err) + requireValue(s.T(), 1, DefaultClientMetrics.clientStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingEmpty")) + requireValue(s.T(), 1, DefaultClientMetrics.clientHandledCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingEmpty", "OK")) + requireValueHistCount(s.T(), 1, DefaultClientMetrics.clientHandledHistogram.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingEmpty")) + + requireValueHistCount(s.T(), 1, DefaultClientMetrics.clientMsgSizeSentHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Payload.String())) + requireValueHistCount(s.T(), 1, DefaultClientMetrics.clientMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Header.String())) + requireValueHistCount(s.T(), 1, DefaultClientMetrics.clientMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Payload.String())) + requireValueHistCount(s.T(), 1, DefaultClientMetrics.clientMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Tailer.String())) + + _, err = s.testClient.PingError(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition + require.Error(s.T(), err) + requireValue(s.T(), 1, DefaultClientMetrics.clientStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingError")) + requireValue(s.T(), 1, DefaultClientMetrics.clientHandledCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingError", "FailedPrecondition")) + requireValueHistCount(s.T(), 1, DefaultClientMetrics.clientHandledHistogram.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingError")) + + requireValueHistCount(s.T(), 1, DefaultClientMetrics.clientMsgSizeSentHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingError", Payload.String())) + requireValueHistCount(s.T(), 1, DefaultClientMetrics.clientMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingError", Tailer.String())) +} + +func (s *ClientStatsHandlerTestSuite) TestStartedStreamingIncrementsStarted() { + _, err := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{}) + require.NoError(s.T(), err) + requireValue(s.T(), 1, DefaultClientMetrics.clientStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList")) + + _, err = s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition + require.NoError(s.T(), err, "PingList must not fail immediately") + requireValue(s.T(), 2, DefaultClientMetrics.clientStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList")) +} + +func (s *ClientStatsHandlerTestSuite) TestStreamingIncrementsMetrics() { + ss, _ := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{}) // should return with code=OK + // Do a read, just for kicks. + count := 0 + for { + _, err := ss.Recv() + if err == io.EOF { + break + } + require.NoError(s.T(), err, "reading pingList shouldn't fail") + count++ + } + require.EqualValues(s.T(), countListResponses, count, "Number of received msg on the wire must match") + + requireValue(s.T(), 1, DefaultClientMetrics.clientStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList")) + requireValue(s.T(), 1, DefaultClientMetrics.clientHandledCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList", "OK")) + requireValue(s.T(), countListResponses, DefaultClientMetrics.clientStreamMsgReceived.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList")) + requireValue(s.T(), 1, DefaultClientMetrics.clientStreamMsgSent.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList")) + requireValueWithRetryHistCount(s.ctx, s.T(), 1, DefaultClientMetrics.clientHandledHistogram.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList")) + + requireValueWithRetryHistCount(s.ctx, s.T(), 1, DefaultClientMetrics.clientMsgSizeSentHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Payload.String())) + requireValueWithRetryHistCount(s.ctx, s.T(), 1, DefaultClientMetrics.clientMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Header.String())) + requireValueWithRetryHistCount(s.ctx, s.T(), countListResponses, DefaultClientMetrics.clientMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Payload.String())) + requireValueWithRetryHistCount(s.ctx, s.T(), 1, DefaultClientMetrics.clientMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Tailer.String())) + + ss, err := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition + require.NoError(s.T(), err, "PingList must not fail immediately") + + // Do a read, just to progate errors. + _, err = ss.Recv() + st, _ := status.FromError(err) + require.Equal(s.T(), codes.FailedPrecondition, st.Code(), "Recv must return FailedPrecondition, otherwise the test is wrong") + + requireValue(s.T(), 2, DefaultClientMetrics.clientStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList")) + requireValue(s.T(), 1, DefaultClientMetrics.clientHandledCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList", "FailedPrecondition")) + requireValueWithRetryHistCount(s.ctx, s.T(), 2, DefaultClientMetrics.clientHandledHistogram.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList")) + + requireValueWithRetryHistCount(s.ctx, s.T(), 2, DefaultClientMetrics.clientMsgSizeSentHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Payload.String())) + requireValueWithRetryHistCount(s.ctx, s.T(), 2, DefaultClientMetrics.clientMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Tailer.String())) +} diff --git a/client_test.go b/client_test.go index dc6e0b2..ab222e3 100644 --- a/client_test.go +++ b/client_test.go @@ -150,3 +150,33 @@ func (s *ClientInterceptorTestSuite) TestStreamingIncrementsMetrics() { requireValue(s.T(), 1, DefaultClientMetrics.clientHandledCounter.WithLabelValues("server_stream", "mwitkow.testproto.TestService", "PingList", "FailedPrecondition")) requireValueHistCount(s.T(), 2, DefaultClientMetrics.clientHandledHistogram.WithLabelValues("server_stream", "mwitkow.testproto.TestService", "PingList")) } + +func (s *ClientInterceptorTestSuite) SetupSuiteWithStatsHanlder() { + var err error + + EnableClientHandlingTimeHistogram() + + EnableClientMsgSizeSentBytesHistogram() + + EnableClientMsgSizeReceivedBytesHistogram() + + s.serverListener, err = net.Listen("tcp", "127.0.0.1:0") + require.NoError(s.T(), err, "must be able to allocate a port for serverListener") + + // This is the point where we hook up the interceptor + s.server = grpc.NewServer() + pb_testproto.RegisterTestServiceServer(s.server, &testService{t: s.T()}) + + go func() { + s.server.Serve(s.serverListener) + }() + + s.clientConn, err = grpc.Dial( + s.serverListener.Addr().String(), + grpc.WithInsecure(), + grpc.WithBlock(), + grpc.WithStatsHandler(ClientStatsHandler), + grpc.WithTimeout(2*time.Second)) + require.NoError(s.T(), err, "must not error on client Dial") + s.testClient = pb_testproto.NewTestServiceClient(s.clientConn) +} diff --git a/go.mod b/go.mod index c49d110..b0540df 100644 --- a/go.mod +++ b/go.mod @@ -8,3 +8,5 @@ require ( golang.org/x/net v0.0.0-20190213061140-3a22650c66bd google.golang.org/grpc v1.18.0 ) + +go 1.13 diff --git a/go.sum b/go.sum index 485e90a..11d9de6 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,7 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740= github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= +github.com/prometheus/client_golang v1.5.1 h1:bdHYieyGlH+6OLEk2YQha8THib30KP0/yD0YH9m6xcA= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= diff --git a/server.go b/server.go index 322f990..4c7ef9f 100644 --- a/server.go +++ b/server.go @@ -21,6 +21,9 @@ var ( // StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs. StreamServerInterceptor = DefaultServerMetrics.StreamServerInterceptor() + + // ServerStatsHandler is a gRPC server-side stats.Handler that provides Prometheus monitoring + ServerStatsHandler = DefaultServerMetrics.NewServerStatsHandler() ) func init() { @@ -46,3 +49,21 @@ func EnableHandlingTimeHistogram(opts ...HistogramOption) { DefaultServerMetrics.EnableHandlingTimeHistogram(opts...) prom.Register(DefaultServerMetrics.serverHandledHistogram) } + +// EnableServerMsgSizeReceivedBytesHistogram turns on recording of handling time +// of RPCs. Histogram metrics can be very expensive for Prometheus +// to retain and query. This function acts on the DefaultServerMetrics +// variable and the default Prometheus metrics registry. +func EnableServerMsgSizeReceivedBytesHistogram(opts ...HistogramOption) { + DefaultServerMetrics.EnableMsgSizeReceivedBytesHistogram(opts...) + prom.Register(DefaultServerMetrics.serverMsgSizeReceivedHistogram) +} + +// EnableServerMsgSizeSentBytesHistogram turns on recording of handling time +// of RPCs. Histogram metrics can be very expensive for Prometheus +// to retain and query. This function acts on the DefaultServerMetrics +// variable and the default Prometheus metrics registry. +func EnableServerMsgSizeSentBytesHistogram(opts ...HistogramOption) { + DefaultServerMetrics.EnableMsgSizeSentBytesHistogram(opts...) + prom.Register(DefaultServerMetrics.serverMsgSizeSentHistogram) +} diff --git a/server_metrics.go b/server_metrics.go index d28a46e..cdbdeb1 100644 --- a/server_metrics.go +++ b/server_metrics.go @@ -2,22 +2,30 @@ package grpc_prometheus import ( "context" + "github.com/grpc-ecosystem/go-grpc-prometheus/packages/grpcstatus" prom "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc" + "google.golang.org/grpc/stats" ) // ServerMetrics represents a collection of metrics to be registered on a // Prometheus metrics registry for a gRPC server. type ServerMetrics struct { - serverStartedCounter *prom.CounterVec - serverHandledCounter *prom.CounterVec - serverStreamMsgReceived *prom.CounterVec - serverStreamMsgSent *prom.CounterVec - serverHandledHistogramEnabled bool - serverHandledHistogramOpts prom.HistogramOpts - serverHandledHistogram *prom.HistogramVec + serverStartedCounter *prom.CounterVec + serverHandledCounter *prom.CounterVec + serverStreamMsgReceived *prom.CounterVec + serverStreamMsgSent *prom.CounterVec + serverHandledHistogramEnabled bool + serverHandledHistogramOpts prom.HistogramOpts + serverHandledHistogram *prom.HistogramVec + serverMsgSizeReceivedHistogramEnabled bool + serverMsgSizeReceivedHistogramOpts prom.HistogramOpts + serverMsgSizeReceivedHistogram *prom.HistogramVec + serverMsgSizeSentHistogramEnabled bool + serverMsgSizeSentHistogramOpts prom.HistogramOpts + serverMsgSizeSentHistogram *prom.HistogramVec } // NewServerMetrics returns a ServerMetrics object. Use a new instance of @@ -53,8 +61,54 @@ func NewServerMetrics(counterOpts ...CounterOption) *ServerMetrics { Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.", Buckets: prom.DefBuckets, }, - serverHandledHistogram: nil, + serverHandledHistogram: nil, + serverMsgSizeReceivedHistogramEnabled: false, + serverMsgSizeReceivedHistogramOpts: prom.HistogramOpts{ + Name: "grpc_server_msg_size_received_bytes", + Help: "Histogram of message sizes received by the server.", + Buckets: defMsgBytesBuckets, + }, + serverMsgSizeReceivedHistogram: nil, + serverMsgSizeSentHistogramEnabled: false, + serverMsgSizeSentHistogramOpts: prom.HistogramOpts{ + Name: "grpc_server_msg_size_sent_bytes", + Help: "Histogram of message sizes sent by the server.", + Buckets: defMsgBytesBuckets, + }, + serverMsgSizeSentHistogram: nil, + } +} + +// EnableMsgSizeReceivedBytesHistogram turns on recording of received message size of RPCs. +// Histogram metrics can be very expensive for Prometheus to retain and query. It takes +// options to configure histogram options such as the defined buckets. +func (m *ServerMetrics) EnableMsgSizeReceivedBytesHistogram(opts ...HistogramOption) { + for _, o := range opts { + o(&m.serverMsgSizeReceivedHistogramOpts) + } + if !m.serverMsgSizeReceivedHistogramEnabled { + m.serverMsgSizeReceivedHistogram = prom.NewHistogramVec( + m.serverMsgSizeReceivedHistogramOpts, + []string{"grpc_service", "grpc_method", "grpc_stats"}, + ) + } + m.serverMsgSizeReceivedHistogramEnabled = true +} + +// EnableMsgSizeSentBytesHistogram turns on recording of sent message size of RPCs. +// Histogram metrics can be very expensive for Prometheus to retain and query. It takes +// options to configure histogram options such as the defined buckets. +func (m *ServerMetrics) EnableMsgSizeSentBytesHistogram(opts ...HistogramOption) { + for _, o := range opts { + o(&m.serverMsgSizeSentHistogramOpts) } + if !m.serverMsgSizeSentHistogramEnabled { + m.serverMsgSizeSentHistogram = prom.NewHistogramVec( + m.serverMsgSizeSentHistogramOpts, + []string{"grpc_service", "grpc_method", "grpc_stats"}, + ) + } + m.serverMsgSizeSentHistogramEnabled = true } // EnableHandlingTimeHistogram enables histograms being registered when @@ -85,6 +139,12 @@ func (m *ServerMetrics) Describe(ch chan<- *prom.Desc) { if m.serverHandledHistogramEnabled { m.serverHandledHistogram.Describe(ch) } + if m.serverMsgSizeReceivedHistogramEnabled { + m.serverMsgSizeReceivedHistogram.Describe(ch) + } + if m.serverMsgSizeSentHistogramEnabled { + m.serverMsgSizeSentHistogram.Describe(ch) + } } // Collect is called by the Prometheus registry when collecting @@ -98,6 +158,12 @@ func (m *ServerMetrics) Collect(ch chan<- prom.Metric) { if m.serverHandledHistogramEnabled { m.serverHandledHistogram.Collect(ch) } + if m.serverMsgSizeReceivedHistogramEnabled { + m.serverMsgSizeReceivedHistogram.Collect(ch) + } + if m.serverMsgSizeSentHistogramEnabled { + m.serverMsgSizeSentHistogram.Collect(ch) + } } // UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs. @@ -126,6 +192,13 @@ func (m *ServerMetrics) StreamServerInterceptor() func(srv interface{}, ss grpc. } } +// NewServerStatsHandler is a gRPC server-side stats.Handler that providers Prometheus monitoring for RPCs. +func (m *ServerMetrics) NewServerStatsHandler() stats.Handler { + return &serverStatsHandler{ + serverMetrics: m, + } +} + // InitializeMetrics initializes all metrics, with their appropriate null // value, for all gRPC methods registered on a gRPC server. This is useful, to // ensure that all metrics exist when collecting and querying. @@ -180,6 +253,16 @@ func preRegisterMethod(metrics *ServerMetrics, serviceName string, mInfo *grpc.M if metrics.serverHandledHistogramEnabled { metrics.serverHandledHistogram.GetMetricWithLabelValues(methodType, serviceName, methodName) } + if metrics.serverMsgSizeReceivedHistogramEnabled { + for _, stats := range allStatss { + metrics.serverMsgSizeReceivedHistogram.GetMetricWithLabelValues(serviceName, methodName, stats.String()) + } + } + if metrics.serverMsgSizeSentHistogramEnabled { + for _, stats := range allStatss { + metrics.serverMsgSizeSentHistogram.GetMetricWithLabelValues(serviceName, methodName, stats.String()) + } + } for _, code := range allCodes { metrics.serverHandledCounter.GetMetricWithLabelValues(methodType, serviceName, methodName, code.String()) } diff --git a/server_reporter.go b/server_reporter.go index aa9db54..53815a4 100644 --- a/server_reporter.go +++ b/server_reporter.go @@ -4,6 +4,7 @@ package grpc_prometheus import ( + "fmt" "time" "google.golang.org/grpc/codes" @@ -30,17 +31,57 @@ func newServerReporter(m *ServerMetrics, rpcType grpcType, fullMethod string) *s return r } +func newServerReporterForStatsHanlder(startTime time.Time, m *ServerMetrics, fullMethod string) *serverReporter { + r := &serverReporter{ + metrics: m, + rpcType: Unary, + startTime: startTime, + } + r.serviceName, r.methodName = splitMethodName(fullMethod) + return r +} + +func (r *serverReporter) StartedConn() { + r.metrics.serverStartedCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc() +} + func (r *serverReporter) ReceivedMessage() { r.metrics.serverStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc() } +// ReceivedMessageSize counts the size of received messages on server-side +func (r *serverReporter) ReceivedMessageSize(rpcStats grpcStats, size float64) { + if rpcStats == Payload { + r.ReceivedMessage() + } + if r.metrics.serverMsgSizeReceivedHistogramEnabled { + r.metrics.serverMsgSizeReceivedHistogram.WithLabelValues(r.serviceName, r.methodName, rpcStats.String()).Observe(size) + } +} + func (r *serverReporter) SentMessage() { r.metrics.serverStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc() } +// SentMessageSize counts the size of sent messages on server-side +func (r *serverReporter) SentMessageSize(rpcStats grpcStats, size float64) { + if rpcStats == Payload { + r.SentMessage() + } + if r.metrics.serverMsgSizeSentHistogramEnabled { + r.metrics.serverMsgSizeSentHistogram.WithLabelValues(r.serviceName, r.methodName, rpcStats.String()).Observe(size) + } +} + +// StartTime is used to reset the value of the startTime +func (r *serverReporter) StartTime(t time.Time) { + r.startTime = t +} + func (r *serverReporter) Handled(code codes.Code) { r.metrics.serverHandledCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, code.String()).Inc() if r.metrics.serverHandledHistogramEnabled { + fmt.Printf("server handled count + 1: %v,%f\n", code, time.Since(r.startTime).Seconds()) r.metrics.serverHandledHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Observe(time.Since(r.startTime).Seconds()) } } diff --git a/server_stats_handler.go b/server_stats_handler.go new file mode 100644 index 0000000..b1c7175 --- /dev/null +++ b/server_stats_handler.go @@ -0,0 +1,58 @@ +package grpc_prometheus + +import ( + "context" + + "google.golang.org/grpc/stats" + "google.golang.org/grpc/status" +) + +type serverStatsHandler struct { + serverMetrics *ServerMetrics +} + +// TagRPC implements the stats.Hanlder interface. +func (h *serverStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + rpcInfo := newRPCInfo(info.FullMethodName) + return context.WithValue(ctx, &rpcInfoKey, rpcInfo) +} + +// HandleRPC implements the stats.Hanlder interface. +func (h *serverStatsHandler) HandleRPC(ctx context.Context, s stats.RPCStats) { + v, ok := ctx.Value(&rpcInfoKey).(*rpcInfo) + if !ok { + return + } + monitor := newServerReporterForStatsHanlder(v.startTime, h.serverMetrics, v.fullMethodName) + switch s := s.(type) { + case *stats.Begin: + v.startTime = s.BeginTime + monitor.StartedConn() + case *stats.End: + monitor.Handled(status.Code(s.Error)) + case *stats.InHeader: + monitor.ReceivedMessageSize(Header, float64(s.WireLength)) + case *stats.InPayload: + monitor.ReceivedMessage() + // TODO: remove the +5 offset on wire length here, which is a temporary stand-in for the missing grpc framing offset + // See: https://github.com/grpc/grpc-go/issues/1647 + monitor.ReceivedMessageSize(Payload, float64(s.WireLength+5)) + case *stats.InTrailer: + monitor.ReceivedMessageSize(Tailer, float64(s.WireLength)) + case *stats.OutHeader: + // TODO: Add the sent header message size stats, if the wire length of the send header is provided + case *stats.OutPayload: + monitor.SentMessageSize(Payload, float64(s.WireLength)) + case *stats.OutTrailer: + monitor.SentMessageSize(Tailer, float64(s.WireLength)) + } +} + +// TagConn implements the stats.Hanlder interface. +func (h *serverStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { + return ctx +} + +// HandleConn implements the stats.Hanlder interface. +func (h *serverStatsHandler) HandleConn(ctx context.Context, s stats.ConnStats) { +} diff --git a/server_stats_handler_test.go b/server_stats_handler_test.go new file mode 100644 index 0000000..7936d01 --- /dev/null +++ b/server_stats_handler_test.go @@ -0,0 +1,173 @@ +// Copyright 2016 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. + +package grpc_prometheus + +import ( + "context" + "io" + "net" + "testing" + "time" + + pb_testproto "github.com/grpc-ecosystem/go-grpc-prometheus/examples/testproto" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" +) + +func TestServerStatsHandlerSuite(t *testing.T) { + suite.Run(t, &ServerStatsHandlerTestSuite{}) +} + +type ServerStatsHandlerTestSuite struct { + suite.Suite + + serverListener net.Listener + server *grpc.Server + clientConn *grpc.ClientConn + testClient pb_testproto.TestServiceClient + ctx context.Context + cancel context.CancelFunc +} + +func (s *ServerStatsHandlerTestSuite) SetupSuite() { + var err error + + EnableHandlingTimeHistogram() + + EnableServerMsgSizeReceivedBytesHistogram() + + EnableServerMsgSizeSentBytesHistogram() + + s.serverListener, err = net.Listen("tcp", "127.0.0.1:0") + require.NoError(s.T(), err, "must be able to allocate a port for serverListener") + + // This is the point where we hook up the interceptor + s.server = grpc.NewServer( + grpc.StatsHandler(ServerStatsHandler), + ) + pb_testproto.RegisterTestServiceServer(s.server, &testService{t: s.T()}) + + go func() { + s.server.Serve(s.serverListener) + }() + + s.clientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second)) + require.NoError(s.T(), err, "must not error on client Dial") + s.testClient = pb_testproto.NewTestServiceClient(s.clientConn) +} + +func (s *ServerStatsHandlerTestSuite) SetupTest() { + // Make all RPC calls last at most 2 sec, meaning all async issues or deadlock will not kill tests. + s.ctx, s.cancel = context.WithTimeout(context.TODO(), 2*time.Second) + + // Make sure every test starts with same fresh, intialized metric state. + DefaultServerMetrics.serverStartedCounter.Reset() + DefaultServerMetrics.serverHandledCounter.Reset() + DefaultServerMetrics.serverHandledHistogram.Reset() + DefaultServerMetrics.serverStreamMsgReceived.Reset() + DefaultServerMetrics.serverStreamMsgSent.Reset() + DefaultServerMetrics.serverMsgSizeReceivedHistogram.Reset() + DefaultServerMetrics.serverMsgSizeSentHistogram.Reset() + Register(s.server) +} + +func (s *ServerStatsHandlerTestSuite) TearDownSuite() { + if s.serverListener != nil { + s.server.Stop() + s.T().Logf("stopped grpc.Server at: %v", s.serverListener.Addr().String()) + s.serverListener.Close() + + } + if s.clientConn != nil { + s.clientConn.Close() + } +} + +func (s *ServerStatsHandlerTestSuite) TearDownTest() { + s.cancel() +} + +func (s *ServerStatsHandlerTestSuite) TestUnaryIncrementsMetrics() { + _, err := s.testClient.PingEmpty(s.ctx, &pb_testproto.Empty{}) // should return with code=OK + require.NoError(s.T(), err) + requireValue(s.T(), 1, DefaultServerMetrics.serverStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingEmpty")) + requireValue(s.T(), 1, DefaultServerMetrics.serverHandledCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingEmpty", "OK")) + requireValueHistCount(s.T(), 1, DefaultServerMetrics.serverHandledHistogram.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingEmpty")) + + requireValueHistCount(s.T(), 1, DefaultServerMetrics.serverMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Header.String())) + requireValueHistCount(s.T(), 1, DefaultServerMetrics.serverMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Payload.String())) + requireValueHistCount(s.T(), 1, DefaultServerMetrics.serverMsgSizeSentHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Payload.String())) + requireValueHistCount(s.T(), 1, DefaultServerMetrics.serverMsgSizeSentHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Tailer.String())) + + _, err = s.testClient.PingError(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition + require.Error(s.T(), err) + requireValue(s.T(), 1, DefaultServerMetrics.serverStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingError")) + requireValue(s.T(), 1, DefaultServerMetrics.serverHandledCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingError", "FailedPrecondition")) + requireValueHistCount(s.T(), 1, DefaultServerMetrics.serverHandledHistogram.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingError")) + + requireValueHistCount(s.T(), 1, DefaultServerMetrics.serverMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Header.String())) + requireValueHistCount(s.T(), 1, DefaultServerMetrics.serverMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Payload.String())) + requireValueHistCount(s.T(), 1, DefaultServerMetrics.serverMsgSizeSentHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Tailer.String())) + +} + +func (s *ServerStatsHandlerTestSuite) TestStartedStreamingIncrementsStarted() { + _, err := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{}) + require.NoError(s.T(), err) + requireValueWithRetry(s.ctx, s.T(), 1, + DefaultServerMetrics.serverStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList")) + + _, err = s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition + require.NoError(s.T(), err, "PingList must not fail immediately") + requireValueWithRetry(s.ctx, s.T(), 2, + DefaultServerMetrics.serverStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList")) +} + +func (s *ServerStatsHandlerTestSuite) TestStreamingIncrementsMetrics() { + ss, _ := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{}) // should return with code=OK + // Do a read, just for kicks. + count := 0 + for { + _, err := ss.Recv() + if err == io.EOF { + break + } + require.NoError(s.T(), err, "reading pingList shouldn't fail") + count++ + } + require.EqualValues(s.T(), countListResponses, count, "Number of received msg on the wire must match") + + requireValueWithRetry(s.ctx, s.T(), 1, + DefaultServerMetrics.serverStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList")) + requireValueWithRetry(s.ctx, s.T(), 1, + DefaultServerMetrics.serverHandledCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList", "OK")) + requireValueWithRetry(s.ctx, s.T(), countListResponses, + DefaultServerMetrics.serverStreamMsgSent.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList")) + requireValueWithRetry(s.ctx, s.T(), 1, + DefaultServerMetrics.serverStreamMsgReceived.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList")) + requireValueWithRetryHistCount(s.ctx, s.T(), 1, + DefaultServerMetrics.serverHandledHistogram.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList")) + + requireValueWithRetryHistCount(s.ctx, s.T(), 1, DefaultServerMetrics.serverMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Header.String())) + requireValueWithRetryHistCount(s.ctx, s.T(), 1, DefaultServerMetrics.serverMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Payload.String())) + requireValueWithRetryHistCount(s.ctx, s.T(), countListResponses, DefaultServerMetrics.serverMsgSizeSentHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Payload.String())) + requireValueWithRetryHistCount(s.ctx, s.T(), 1, DefaultServerMetrics.serverMsgSizeSentHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Tailer.String())) + + _, err := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition + require.NoError(s.T(), err, "PingList must not fail immediately") + + requireValueWithRetry(s.ctx, s.T(), 2, + DefaultServerMetrics.serverStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList")) + requireValueWithRetry(s.ctx, s.T(), 1, + DefaultServerMetrics.serverHandledCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList", "FailedPrecondition")) + requireValueWithRetryHistCount(s.ctx, s.T(), 2, + DefaultServerMetrics.serverHandledHistogram.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList")) + + requireValueWithRetryHistCount(s.ctx, s.T(), 2, DefaultServerMetrics.serverMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Header.String())) + requireValueWithRetryHistCount(s.ctx, s.T(), 2, DefaultServerMetrics.serverMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Payload.String())) + requireValueWithRetryHistCount(s.ctx, s.T(), 2, DefaultServerMetrics.serverMsgSizeSentHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Tailer.String())) + +} diff --git a/util.go b/util.go index 7987de3..41c86cf 100644 --- a/util.go +++ b/util.go @@ -5,6 +5,7 @@ package grpc_prometheus import ( "strings" + "time" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -26,6 +27,12 @@ var ( codes.FailedPrecondition, codes.Aborted, codes.OutOfRange, codes.Unimplemented, codes.Internal, codes.Unavailable, codes.DataLoss, } + + allStatss = []grpcStats{Header, Payload, Tailer} + + rpcInfoKey = "rpc-info" + + defMsgBytesBuckets = []float64{0, 32, 64, 128, 256, 512, 1024, 2048, 8192, 32768, 131072, 524288} ) func splitMethodName(fullMethodName string) (string, string) { @@ -48,3 +55,30 @@ func typeFromMethodInfo(mInfo *grpc.MethodInfo) grpcType { } return BidiStream } + +type grpcStats string + +const ( + // Header indicates that the stats is the header + Header grpcStats = "header" + + // Payload indicates that the stats is the Payload + Payload grpcStats = "payload" + + // Tailer indicates that the stats is the Payload + Tailer grpcStats = "tailer" +) + +// String function returns the grpcStats with string format. +func (s grpcStats) String() string { + return string(s) +} + +type rpcInfo struct { + fullMethodName string + startTime time.Time +} + +func newRPCInfo(fullMethodName string) *rpcInfo { + return &rpcInfo{fullMethodName: fullMethodName} +}