Skip to content

Commit

Permalink
refactor: move metrics and distribution to pkgs (#40)
Browse files Browse the repository at this point in the history
* refactor: move metrics to pkg

* refactor: move distribution to pkgs
  • Loading branch information
avtakkar authored Apr 12, 2024
1 parent 1c858d5 commit 8f5b807
Show file tree
Hide file tree
Showing 29 changed files with 160 additions and 112 deletions.
2 changes: 1 addition & 1 deletion build/ci/scripts/kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ print_p2p_metrics() {
for pod in $( echo "$p" | tr -s " " "\012" ); do
echo "checking pod '$pod' for metrics"
kubectl --context=$KIND_CLUSTER_CONTEXT -n peerd-ns exec -i $pod -- bash -c "cat /var/log/peerdmetrics"
kubectl --context=$KIND_CLUSTER_CONTEXT -n peerd-ns exec -i $pod -- bash -c "curl http://localhost:5004/metrics/prometheus"
kubectl --context=$KIND_CLUSTER_CONTEXT -n peerd-ns exec -i $pod -- bash -c "curl http://localhost:5004/metrics/prometheus" | head -n 10
done
}

Expand Down
7 changes: 7 additions & 0 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/azure/peerd/pkg/discovery/routing"
"github.com/azure/peerd/pkg/k8s"
"github.com/azure/peerd/pkg/k8s/events"
"github.com/azure/peerd/pkg/metrics"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
"github.com/spf13/afero"
Expand All @@ -45,6 +46,12 @@ func main() {
l := zerolog.New(os.Stdout).With().Timestamp().Str("self", p2pcontext.NodeName).Str("version", version).Logger()
ctx := l.WithContext(context.Background())

ctx, err = metrics.WithContext(ctx, p2pcontext.NodeName, "peerd")
if err != nil {
l.Error().Err(err).Msg("failed to initialize metrics")
os.Exit(1)
}

err = run(ctx, args)
if err != nil {
l.Error().Err(err).Msg("server error")
Expand Down
2 changes: 1 addition & 1 deletion internal/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func Logger(c *gin.Context) zerolog.Logger {
l = *ctxLog
}

return l.With().Str("correlationid", c.GetString(CorrelationIdCtxKey)).Str("url", c.Request.URL.String()).Str("range", c.Request.Header.Get("Range")).Bool("p2p", IsRequestFromAPeer(c)).Str("ip", c.ClientIP()).Str("peer", c.Request.Header.Get(NodeHeaderKey)).Logger()
return l.With().Str("correlationid", c.GetString(CorrelationIdCtxKey)).Str("url", c.Request.URL.String()).Str("range", c.Request.Header.Get("Range")).Bool("requestfrompeer", IsRequestFromAPeer(c)).Str("clientip", c.ClientIP()).Str("clientname", c.Request.Header.Get(NodeHeaderKey)).Logger()
}

// BlobUrl extracts the blob URL from the incoming request URL.
Expand Down
5 changes: 2 additions & 3 deletions internal/files/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"sync/atomic"
"time"

syncmap "github.com/azure/peerd/internal/cache"
"github.com/azure/peerd/internal/files"
"github.com/dgraph-io/ristretto"
"github.com/rs/zerolog"
Expand All @@ -22,7 +21,7 @@ import (
// fileCache implements FileCache.
type fileCache struct {
fileCache *ristretto.Cache
metadataCache *syncmap.SyncMap
metadataCache *SyncMap
path string
lock sync.RWMutex
log zerolog.Logger
Expand Down Expand Up @@ -162,7 +161,7 @@ func New(ctx context.Context) Cache {
cache := &fileCache{
log: log,
path: Path,
metadataCache: syncmap.MakeSyncMap(1e7),
metadataCache: NewSyncMap(1e7),
}

var err error
Expand Down
17 changes: 9 additions & 8 deletions internal/cache/syncmap.go → internal/files/cache/syncmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"sync"
)

const defaultEvictionPercentage int = 5 //The default eviction percentage used when map reaches its capacity at insertion
const defaultEvictionPercentage int = 5 // The default eviction percentage. Used when the map reaches its capacity at insertion.

// SyncMap is a map with synchronized access support
// SyncMap is a map that can be safely accessed concurrently.
type SyncMap struct {
mapObj *map[string]interface{}
lock *sync.RWMutex
Expand All @@ -32,14 +32,15 @@ func (sm *SyncMap) Get(key string) (entry interface{}, ok bool) {
func (sm *SyncMap) Set(key string, entry interface{}) {
sm.lock.Lock()
defer sm.lock.Unlock()
if _, ok := (*sm.mapObj)[key]; !ok { //We will need to add an entry
if numEntries := len(*sm.mapObj); numEntries >= sm.capacity { //exceeding capacity, remove evictionPercentage of the entries

if _, ok := (*sm.mapObj)[key]; !ok {
if numEntries := len(*sm.mapObj); numEntries >= sm.capacity {
numToEvict := numEntries * sm.evictionPercentage / 100
if numToEvict <= 1 { //We will evict one as the minimum
if numToEvict <= 1 {
numToEvict = 1
}
numEvicted := 0
for k := range *sm.mapObj { // GO map iterator will randomize the order. We just delete the first in the iterator
for k := range *sm.mapObj {
delete(*sm.mapObj, k)
numEvicted++
if numEvicted >= numToEvict {
Expand All @@ -60,9 +61,9 @@ func (sm *SyncMap) Delete(key string) {
delete(*sm.mapObj, key)
}

// MakeSyncMap creates a new SyncMap with the specified maximum number of entries.
// NewSyncMap creates a new SyncMap with the specified maximum number of entries.
// If the maximum number of entries is less than or equal to 0, it will be set to 1.
func MakeSyncMap(maxEntries int) *SyncMap {
func NewSyncMap(maxEntries int) *SyncMap {
if maxEntries <= 0 {
maxEntries = 1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func TestSyncMapAddEvict(t *testing.T) {
sm := MakeSyncMap(100)
sm := NewSyncMap(100)
sm.evictionPercentage = 10
var wg sync.WaitGroup
addEntry := func(key string, value int) {
Expand All @@ -34,7 +34,7 @@ func TestSyncMapAddEvict(t *testing.T) {
}

func TestSyncMapAddDelete(t *testing.T) {
sm := MakeSyncMap(10)
sm := NewSyncMap(10)
var wg sync.WaitGroup

addEntry := func(key string, value int) {
Expand Down Expand Up @@ -67,7 +67,7 @@ func TestSyncMapAddDelete(t *testing.T) {
}

func TestSyncMapUpdate(t *testing.T) {
sm := MakeSyncMap(10)
sm := NewSyncMap(10)
var wg sync.WaitGroup
addEntry := func(key string, value int) {
sm.Set(key, value)
Expand Down
15 changes: 4 additions & 11 deletions internal/files/store/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package store

import (
"context"
"crypto/rand"
"io"
"os"
Expand All @@ -17,12 +16,11 @@ import (
)

func TestReadAtWithChunkOffset(t *testing.T) {
ctx := context.Background()
data := []byte("hello world")

files.CacheBlockSize = 1 // 1 byte

s, err := NewFilesStore(ctx, tests.NewMockRouter(make(map[string][]string)))
s, err := NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -76,12 +74,11 @@ func TestReadAtWithChunkOffset(t *testing.T) {
}

func TestReadAt(t *testing.T) {
ctx := context.Background()
data := []byte("hello world")

files.CacheBlockSize = 1 // 1 byte

s, err := NewFilesStore(ctx, tests.NewMockRouter(make(map[string][]string)))
s, err := NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -129,11 +126,9 @@ func TestReadAt(t *testing.T) {
}

func TestSeek(t *testing.T) {
ctx := context.Background()

data := []byte("hello world")

s, err := NewFilesStore(ctx, tests.NewMockRouter(make(map[string][]string)))
s, err := NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -192,14 +187,12 @@ func TestSeek(t *testing.T) {
}

func TestFstat(t *testing.T) {
ctx := context.Background()

data, err := randomBytesN(100)
if err != nil {
t.Fatal(err)
}

s, err := NewFilesStore(ctx, tests.NewMockRouter(make(map[string][]string)))
s, err := NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 6 additions & 0 deletions internal/files/store/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@
package store

import (
"context"
"crypto/rand"
"fmt"
"os"
"testing"

"github.com/azure/peerd/internal/files/cache"
"github.com/azure/peerd/pkg/metrics"
)

var (
ctxWithMetrics, _ = metrics.WithContext(context.Background(), "test", "peerd")
)

func TestMain(m *testing.M) {
Expand Down
37 changes: 20 additions & 17 deletions internal/files/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/azure/peerd/internal/files/cache"
"github.com/azure/peerd/internal/remote"
"github.com/azure/peerd/pkg/discovery/routing"
"github.com/azure/peerd/pkg/metrics"
"github.com/azure/peerd/pkg/urlparser"
"github.com/gin-gonic/gin"
"github.com/opencontainers/go-digest"
Expand All @@ -23,14 +24,15 @@ import (
// NewFilesStore creates a new store.
func NewFilesStore(ctx context.Context, r routing.Router) (FilesStore, error) {
fs := &store{
cache: cache.New(ctx),
prefetchChan: make(chan prefetchableSegment, PrefetchWorkers),
prefetchable: PrefetchWorkers > 0,
router: r,
resolveRetries: ResolveRetries,
resolveTimeout: ResolveTimeout,
blobsChan: make(chan string, 1000),
parser: urlparser.New(),
metricsRecorder: metrics.FromContext(ctx),
cache: cache.New(ctx),
prefetchChan: make(chan prefetchableSegment, PrefetchWorkers),
prefetchable: PrefetchWorkers > 0,
router: r,
resolveRetries: ResolveRetries,
resolveTimeout: ResolveTimeout,
blobsChan: make(chan string, 1000),
parser: urlparser.New(),
}

go func() {
Expand Down Expand Up @@ -61,14 +63,15 @@ type prefetchableSegment struct {

// store describes a content store whose contents can come from disk or a remote source.
type store struct {
cache cache.Cache
prefetchable bool
prefetchChan chan prefetchableSegment
router routing.Router
resolveRetries int
resolveTimeout time.Duration
blobsChan chan string
parser urlparser.Parser
metricsRecorder metrics.Metrics
cache cache.Cache
prefetchable bool
prefetchChan chan prefetchableSegment
router routing.Router
resolveRetries int
resolveTimeout time.Duration
blobsChan chan string
parser urlparser.Parser
}

var _ FilesStore = &store{}
Expand Down Expand Up @@ -100,7 +103,7 @@ func (s *store) Open(c *gin.Context) (File, error) {
store: s,
cur: 0,
size: 0,
reader: remote.NewReader(c, s.router, s.resolveRetries, s.resolveTimeout),
reader: remote.NewReader(c, s.router, s.resolveRetries, s.resolveTimeout, s.metricsRecorder),
}

if p2pcontext.IsRequestFromAPeer(c) {
Expand Down
9 changes: 4 additions & 5 deletions internal/files/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package store

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -44,7 +43,7 @@ func TestOpenP2p(t *testing.T) {
ctx.Set(p2pcontext.FileChunkCtxKey, expK)

PrefetchWorkers = 0 // turn off prefetching
s, err := NewFilesStore(context.Background(), tests.NewMockRouter(make(map[string][]string)))
s, err := NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -75,7 +74,7 @@ func TestOpenNonP2p(t *testing.T) {
ctx.Set(p2pcontext.FileChunkCtxKey, expK)

PrefetchWorkers = 0 // turn off prefetching
s, err := NewMockStore(context.Background(), tests.NewMockRouter(make(map[string][]string)))
s, err := NewMockStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -106,7 +105,7 @@ func TestKey(t *testing.T) {
{Key: "url", Value: hostAndPath},
}

s, err := NewFilesStore(context.Background(), tests.NewMockRouter(make(map[string][]string)))
s, err := NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}
Expand All @@ -126,7 +125,7 @@ func TestKey(t *testing.T) {
}

func TestSubscribe(t *testing.T) {
s, err := NewFilesStore(context.Background(), tests.NewMockRouter(make(map[string][]string)))
s, err := NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}
Expand Down
9 changes: 5 additions & 4 deletions internal/handlers/files/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ import (

p2pcontext "github.com/azure/peerd/internal/context"
"github.com/azure/peerd/internal/files/store"
"github.com/azure/peerd/internal/metrics"
"github.com/azure/peerd/pkg/metrics"
"github.com/gin-gonic/gin"
)

// FilesHandler describes a handler for files.
type FilesHandler struct {
store store.FilesStore
store store.FilesStore
metricsRecorder metrics.Metrics
}

var _ gin.HandlerFunc = (&FilesHandler{}).Handle
Expand All @@ -28,7 +29,7 @@ func (h *FilesHandler) Handle(c *gin.Context) {
s := time.Now()
defer func() {
dur := time.Since(s)
metrics.Global.RecordRequest(c.Request.Method, "files", float64(dur.Milliseconds()))
h.metricsRecorder.RecordRequest(c.Request.Method, "files", float64(dur.Milliseconds()))
log.Debug().Dur("duration", dur).Msg("files handler stop")
}()

Expand Down Expand Up @@ -80,5 +81,5 @@ func (h *FilesHandler) fill(c *gin.Context) error {

// New creates a new files handler.
func New(ctx context.Context, fs store.FilesStore) *FilesHandler {
return &FilesHandler{fs}
return &FilesHandler{fs, metrics.FromContext(ctx)}
}
Loading

0 comments on commit 8f5b807

Please sign in to comment.