Skip to content

Commit

Permalink
Metric for open in-use connections (#7913)
Browse files Browse the repository at this point in the history
* Add metric for open connections

* Add metric for open connections

* Add metric for open connections

* Add metric for open connections

* fix comments

* fix comments

* fix comments

* fix comments

* fix comments
  • Loading branch information
itaiad200 authored Jun 26, 2024
1 parent 9fffdab commit c7fb429
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 5 deletions.
43 changes: 42 additions & 1 deletion pkg/auth/service.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/auth/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,7 @@ func TestAPIAuthService_DeleteUser(t *testing.T) {
},
}
ctx := context.Background()
mockClient.EXPECT().DeleteUserWithResponse(ctx, tt.userName).Return(response, nil)
mockClient.EXPECT().DeleteUserWithResponse(gomock.Any(), tt.userName).Return(response, nil)
err := s.DeleteUser(ctx, tt.userName)
if !errors.Is(err, tt.expectedErr) {
t.Fatalf("DeleteUser: expected err: %v got: %v", tt.expectedErr, err)
Expand Down
3 changes: 3 additions & 0 deletions pkg/block/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ type BlockstoreMetadata struct {
type Adapter interface {
Put(ctx context.Context, obj ObjectPointer, sizeBytes int64, reader io.Reader, opts PutOpts) error
Get(ctx context.Context, obj ObjectPointer) (io.ReadCloser, error)

// GetWalker is never called on the server side.
// TODO(itaiad200): Remove it from this interface.
GetWalker(uri *url.URL) (Walker, error)

// GetPreSignedURL returns a pre-signed URL for accessing obj with mode, and the
Expand Down
9 changes: 9 additions & 0 deletions pkg/block/factory/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ const (
)

// BuildBlockAdapter builds the block adapter described by c via
// buildBlockAdapter and returns it wrapped by block.NewMetricsAdapter.
// If construction of the underlying adapter fails, that error is returned
// unchanged together with a nil adapter.
func BuildBlockAdapter(ctx context.Context, statsCollector stats.Collector, c params.AdapterConfig) (block.Adapter, error) {
	inner, err := buildBlockAdapter(ctx, statsCollector, c)
	if err != nil {
		return nil, err
	}

	// Callers only ever see the metrics wrapper, never the raw adapter.
	wrapped := block.NewMetricsAdapter(inner)
	return wrapped, nil
}

func buildBlockAdapter(ctx context.Context, statsCollector stats.Collector, c params.AdapterConfig) (block.Adapter, error) {
blockstore := c.BlockstoreType()
logging.FromContext(ctx).
WithField("type", blockstore).
Expand Down
133 changes: 133 additions & 0 deletions pkg/block/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package block

import (
"context"
"io"
"net/http"
"net/url"
"time"

"github.com/treeverse/lakefs/pkg/httputil"
)

// MetricsAdapter is an Adapter that decorates another Adapter. Every method
// that receives a context attaches an HTTP client trace to it (see
// httputil.SetClientTrace), labelled with the wrapped adapter's blockstore
// type, and then delegates to the wrapped adapter. Methods that take no
// context are forwarded as-is.
type MetricsAdapter struct {
	adapter Adapter
}

// NewMetricsAdapter wraps adapter in a MetricsAdapter and returns it as an
// Adapter.
func NewMetricsAdapter(adapter Adapter) Adapter {
	return &MetricsAdapter{adapter: adapter}
}

// InnerAdapter returns the Adapter wrapped by m.
func (m *MetricsAdapter) InnerAdapter() Adapter {
	return m.adapter
}

// withTrace returns ctx with a client trace attached, labelled with the
// wrapped adapter's blockstore type.
func (m *MetricsAdapter) withTrace(ctx context.Context) context.Context {
	return httputil.SetClientTrace(ctx, m.adapter.BlockstoreType())
}

// Put delegates to the wrapped adapter using a traced context.
func (m *MetricsAdapter) Put(ctx context.Context, obj ObjectPointer, sizeBytes int64, reader io.Reader, opts PutOpts) error {
	return m.adapter.Put(m.withTrace(ctx), obj, sizeBytes, reader, opts)
}

// Get delegates to the wrapped adapter using a traced context.
func (m *MetricsAdapter) Get(ctx context.Context, obj ObjectPointer) (io.ReadCloser, error) {
	return m.adapter.Get(m.withTrace(ctx), obj)
}

// GetWalker delegates to the wrapped adapter. It takes no context, so no
// trace is attached.
func (m *MetricsAdapter) GetWalker(uri *url.URL) (Walker, error) {
	return m.adapter.GetWalker(uri)
}

// GetPreSignedURL delegates to the wrapped adapter using a traced context.
func (m *MetricsAdapter) GetPreSignedURL(ctx context.Context, obj ObjectPointer, mode PreSignMode) (string, time.Time, error) {
	return m.adapter.GetPreSignedURL(m.withTrace(ctx), obj, mode)
}

// GetPresignUploadPartURL delegates to the wrapped adapter using a traced
// context.
func (m *MetricsAdapter) GetPresignUploadPartURL(ctx context.Context, obj ObjectPointer, uploadID string, partNumber int) (string, error) {
	return m.adapter.GetPresignUploadPartURL(m.withTrace(ctx), obj, uploadID, partNumber)
}

// Exists delegates to the wrapped adapter using a traced context.
func (m *MetricsAdapter) Exists(ctx context.Context, obj ObjectPointer) (bool, error) {
	return m.adapter.Exists(m.withTrace(ctx), obj)
}

// GetRange delegates to the wrapped adapter using a traced context.
func (m *MetricsAdapter) GetRange(ctx context.Context, obj ObjectPointer, startPosition, endPosition int64) (io.ReadCloser, error) {
	return m.adapter.GetRange(m.withTrace(ctx), obj, startPosition, endPosition)
}

// GetProperties delegates to the wrapped adapter using a traced context.
func (m *MetricsAdapter) GetProperties(ctx context.Context, obj ObjectPointer) (Properties, error) {
	return m.adapter.GetProperties(m.withTrace(ctx), obj)
}

// Remove delegates to the wrapped adapter using a traced context.
func (m *MetricsAdapter) Remove(ctx context.Context, obj ObjectPointer) error {
	return m.adapter.Remove(m.withTrace(ctx), obj)
}

// Copy delegates to the wrapped adapter using a traced context.
func (m *MetricsAdapter) Copy(ctx context.Context, sourceObj, destinationObj ObjectPointer) error {
	return m.adapter.Copy(m.withTrace(ctx), sourceObj, destinationObj)
}

// CreateMultiPartUpload delegates to the wrapped adapter using a traced
// context.
func (m *MetricsAdapter) CreateMultiPartUpload(ctx context.Context, obj ObjectPointer, r *http.Request, opts CreateMultiPartUploadOpts) (*CreateMultiPartUploadResponse, error) {
	return m.adapter.CreateMultiPartUpload(m.withTrace(ctx), obj, r, opts)
}

// UploadPart delegates to the wrapped adapter using a traced context.
func (m *MetricsAdapter) UploadPart(ctx context.Context, obj ObjectPointer, sizeBytes int64, reader io.Reader, uploadID string, partNumber int) (*UploadPartResponse, error) {
	return m.adapter.UploadPart(m.withTrace(ctx), obj, sizeBytes, reader, uploadID, partNumber)
}

// ListParts delegates to the wrapped adapter using a traced context.
func (m *MetricsAdapter) ListParts(ctx context.Context, obj ObjectPointer, uploadID string, opts ListPartsOpts) (*ListPartsResponse, error) {
	return m.adapter.ListParts(m.withTrace(ctx), obj, uploadID, opts)
}

// UploadCopyPart delegates to the wrapped adapter using a traced context.
func (m *MetricsAdapter) UploadCopyPart(ctx context.Context, sourceObj, destinationObj ObjectPointer, uploadID string, partNumber int) (*UploadPartResponse, error) {
	return m.adapter.UploadCopyPart(m.withTrace(ctx), sourceObj, destinationObj, uploadID, partNumber)
}

// UploadCopyPartRange delegates to the wrapped adapter using a traced
// context.
func (m *MetricsAdapter) UploadCopyPartRange(ctx context.Context, sourceObj, destinationObj ObjectPointer, uploadID string, partNumber int, startPosition, endPosition int64) (*UploadPartResponse, error) {
	return m.adapter.UploadCopyPartRange(m.withTrace(ctx), sourceObj, destinationObj, uploadID, partNumber, startPosition, endPosition)
}

// AbortMultiPartUpload delegates to the wrapped adapter using a traced
// context.
func (m *MetricsAdapter) AbortMultiPartUpload(ctx context.Context, obj ObjectPointer, uploadID string) error {
	return m.adapter.AbortMultiPartUpload(m.withTrace(ctx), obj, uploadID)
}

// CompleteMultiPartUpload delegates to the wrapped adapter using a traced
// context.
func (m *MetricsAdapter) CompleteMultiPartUpload(ctx context.Context, obj ObjectPointer, uploadID string, multipartList *MultipartUploadCompletion) (*CompleteMultiPartUploadResponse, error) {
	return m.adapter.CompleteMultiPartUpload(m.withTrace(ctx), obj, uploadID, multipartList)
}

// BlockstoreType returns the blockstore type reported by the wrapped adapter.
func (m *MetricsAdapter) BlockstoreType() string {
	return m.adapter.BlockstoreType()
}

// BlockstoreMetadata delegates to the wrapped adapter using a traced context.
func (m *MetricsAdapter) BlockstoreMetadata(ctx context.Context) (*BlockstoreMetadata, error) {
	return m.adapter.BlockstoreMetadata(m.withTrace(ctx))
}

// GetStorageNamespaceInfo delegates to the wrapped adapter. It takes no
// context, so no trace is attached.
func (m *MetricsAdapter) GetStorageNamespaceInfo() StorageNamespaceInfo {
	return m.adapter.GetStorageNamespaceInfo()
}

// ResolveNamespace delegates to the wrapped adapter. It takes no context, so
// no trace is attached.
func (m *MetricsAdapter) ResolveNamespace(storageNamespace, key string, identifierType IdentifierType) (QualifiedKey, error) {
	return m.adapter.ResolveNamespace(storageNamespace, key, identifierType)
}

// GetRegion delegates to the wrapped adapter using a traced context.
func (m *MetricsAdapter) GetRegion(ctx context.Context, storageNamespace string) (string, error) {
	return m.adapter.GetRegion(m.withTrace(ctx), storageNamespace)
}

// RuntimeStats delegates to the wrapped adapter. It takes no context, so no
// trace is attached.
func (m *MetricsAdapter) RuntimeStats() map[string]string {
	return m.adapter.RuntimeStats()
}
16 changes: 13 additions & 3 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/go-test/deep"
"github.com/spf13/viper"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/block/factory"
"github.com/treeverse/lakefs/pkg/block/gs"
"github.com/treeverse/lakefs/pkg/block/local"
Expand Down Expand Up @@ -113,8 +114,12 @@ func TestConfig_BuildBlockAdapter(t *testing.T) {
testutil.Must(t, err)
adapter, err := factory.BuildBlockAdapter(ctx, nil, c)
testutil.Must(t, err)
if _, ok := adapter.(*local.Adapter); !ok {
t.Fatalf("expected a local block adapter, got something else instead")
metricsAdapter, ok := adapter.(*block.MetricsAdapter)
if !ok {
t.Fatalf("got a %T when expecting a MetricsAdapter", adapter)
}
if _, ok := metricsAdapter.InnerAdapter().(*local.Adapter); !ok {
t.Fatalf("got %T expected a local block adapter", metricsAdapter.InnerAdapter())
}
})

Expand All @@ -134,7 +139,12 @@ func TestConfig_BuildBlockAdapter(t *testing.T) {
testutil.Must(t, err)
adapter, err := factory.BuildBlockAdapter(ctx, nil, c)
testutil.Must(t, err)
if _, ok := adapter.(*gs.Adapter); !ok {

metricsAdapter, ok := adapter.(*block.MetricsAdapter)
if !ok {
t.Fatalf("expected a metrics block adapter, got something else instead")
}
if _, ok := metricsAdapter.InnerAdapter().(*gs.Adapter); !ok {
t.Fatalf("expected an gs block adapter, got something else instead")
}
})
Expand Down
29 changes: 29 additions & 0 deletions pkg/httputil/client_trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package httputil

import (
"context"
"net/http/httptrace"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// connectionGauge is a gauge vector partitioned by the "service" label. The
// trace hooks installed by SetClientTrace increment it when a connection is
// obtained and decrement it when a connection is put back as idle.
//
// Known limitation: for HTTP2 services, like cosmosDB,
// the gauge is never reduced. Hence, we'll treat it as a counter for new connections created.
var connectionGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "out_in_use_conns",
Help: "A gauge of in-use TCP connections",
}, []string{"service"})

// SetClientTrace returns a context derived from ctx that carries an
// httptrace.ClientTrace reporting to connectionGauge under the given service
// label: the GotConn hook increments the gauge and the PutIdleConn hook
// decrements it. The arguments passed to the hooks (connection info and the
// PutIdleConn error) are ignored, so the gauge is decremented whether or not
// PutIdleConn reports an error.
func SetClientTrace(ctx context.Context, service string) context.Context {
	// The label lookup is done inside each hook, so a series for service is
	// only touched when a hook actually fires.
	onGotConn := func(httptrace.GotConnInfo) {
		connectionGauge.WithLabelValues(service).Inc()
	}
	onPutIdleConn := func(error) {
		connectionGauge.WithLabelValues(service).Dec()
	}

	return httptrace.WithClientTrace(ctx, &httptrace.ClientTrace{
		GotConn:     onGotConn,
		PutIdleConn: onPutIdleConn,
	})
}
6 changes: 6 additions & 0 deletions pkg/kv/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/treeverse/lakefs/pkg/httputil"
)

var (
Expand All @@ -31,6 +32,7 @@ type StoreMetricsWrapper struct {
func (s *StoreMetricsWrapper) Get(ctx context.Context, partitionKey, key []byte) (*ValueWithPredicate, error) {
const operation = "Get"
timer := prometheus.NewTimer(requestDuration.WithLabelValues(s.StoreType, operation))
ctx = httputil.SetClientTrace(ctx, s.StoreType)
defer timer.ObserveDuration()
res, err := s.Store.Get(ctx, partitionKey, key)
if err != nil {
Expand All @@ -42,6 +44,7 @@ func (s *StoreMetricsWrapper) Get(ctx context.Context, partitionKey, key []byte)
func (s *StoreMetricsWrapper) Set(ctx context.Context, partitionKey, key, value []byte) error {
const operation = "Set"
timer := prometheus.NewTimer(requestDuration.WithLabelValues(s.StoreType, operation))
ctx = httputil.SetClientTrace(ctx, s.StoreType)
defer timer.ObserveDuration()
err := s.Store.Set(ctx, partitionKey, key, value)
if err != nil {
Expand All @@ -53,6 +56,7 @@ func (s *StoreMetricsWrapper) Set(ctx context.Context, partitionKey, key, value
func (s *StoreMetricsWrapper) SetIf(ctx context.Context, partitionKey, key, value []byte, valuePredicate Predicate) error {
const operation = "SetIf"
timer := prometheus.NewTimer(requestDuration.WithLabelValues(s.StoreType, operation))
ctx = httputil.SetClientTrace(ctx, s.StoreType)
defer timer.ObserveDuration()
err := s.Store.SetIf(ctx, partitionKey, key, value, valuePredicate)
if err != nil {
Expand All @@ -64,6 +68,7 @@ func (s *StoreMetricsWrapper) SetIf(ctx context.Context, partitionKey, key, valu
func (s *StoreMetricsWrapper) Delete(ctx context.Context, partitionKey, key []byte) error {
const operation = "Delete"
timer := prometheus.NewTimer(requestDuration.WithLabelValues(s.StoreType, operation))
ctx = httputil.SetClientTrace(ctx, s.StoreType)
defer timer.ObserveDuration()
err := s.Store.Delete(ctx, partitionKey, key)
if err != nil {
Expand All @@ -75,6 +80,7 @@ func (s *StoreMetricsWrapper) Delete(ctx context.Context, partitionKey, key []by
func (s *StoreMetricsWrapper) Scan(ctx context.Context, partitionKey []byte, options ScanOptions) (EntriesIterator, error) {
const operation = "Scan"
timer := prometheus.NewTimer(requestDuration.WithLabelValues(s.StoreType, operation))
ctx = httputil.SetClientTrace(ctx, s.StoreType)
defer timer.ObserveDuration()
res, err := s.Store.Scan(ctx, partitionKey, options)
if err != nil {
Expand Down

0 comments on commit c7fb429

Please sign in to comment.