Skip to content

Commit

Permalink
feat: refactor internal packages to pkg (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
avtakkar authored Apr 15, 2024
1 parent e0efc39 commit 0a8a32c
Show file tree
Hide file tree
Showing 38 changed files with 389 additions and 344 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ GOLINT = golangci-lint run
# Source repository variables.
ROOT_DIR := $(shell git rev-parse --show-toplevel)
BIN_DIR = $(ROOT_DIR)/bin
TEST_PKGS = $(shell go list ./... | grep -v 'github.com/azure/peerd/api\|github.com/azure/peerd/pkg/mocks') # Exclude generated and mock code.
TEST_PKGS = $(shell go list ./... | grep -v 'github.com/azure/peerd/api\|github.com/azure/peerd/pkg/discovery/routing/mocks') # Exclude generated and mock code.
TESTS_BIN_DIR = $(BIN_DIR)/tests
COVERAGE_DIR=$(BIN_DIR)/coverage
SCRIPTS_DIR=$(ROOT_DIR)/build/ci/scripts
Expand Down
14 changes: 7 additions & 7 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import (
"time"

"github.com/alexflint/go-arg"
p2pcontext "github.com/azure/peerd/internal/context"
"github.com/azure/peerd/internal/files/store"
"github.com/azure/peerd/internal/handlers"
"github.com/azure/peerd/pkg/containerd"
"github.com/azure/peerd/pkg/discovery"
pcontext "github.com/azure/peerd/pkg/context"
"github.com/azure/peerd/pkg/discovery/content/provider"
"github.com/azure/peerd/pkg/discovery/routing"
"github.com/azure/peerd/pkg/files/store"
"github.com/azure/peerd/pkg/k8s"
"github.com/azure/peerd/pkg/k8s/events"
"github.com/azure/peerd/pkg/metrics"
Expand All @@ -43,10 +43,10 @@ func main() {
zerolog.SetGlobalLevel(ll)
zerolog.TimeFieldFormat = time.RFC3339Nano

l := zerolog.New(os.Stdout).With().Timestamp().Str("self", p2pcontext.NodeName).Str("version", version).Logger()
l := zerolog.New(os.Stdout).With().Timestamp().Str("self", pcontext.NodeName).Str("version", version).Logger()
ctx := l.WithContext(context.Background())

ctx, err = metrics.WithContext(ctx, p2pcontext.NodeName, "peerd")
ctx, err = metrics.WithContext(ctx, pcontext.NodeName, "peerd")
if err != nil {
l.Error().Err(err).Msg("failed to initialize metrics")
os.Exit(1)
Expand Down Expand Up @@ -86,7 +86,7 @@ func serverCommand(ctx context.Context, args *ServerCmd) (err error) {
return err
}

clientset, err := k8s.NewKubernetesInterface(p2pcontext.KubeConfigPath, p2pcontext.NodeName)
clientset, err := k8s.NewKubernetesInterface(pcontext.KubeConfigPath, pcontext.NodeName)
if err != nil {
return err
}
Expand Down Expand Up @@ -157,7 +157,7 @@ func serverCommand(ctx context.Context, args *ServerCmd) (err error) {
g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
discovery.Provide(ctx, r, containerdStore, filesStore.Subscribe())
provider.Provide(ctx, r, containerdStore, filesStore.Subscribe())
return nil
})

Expand Down
25 changes: 11 additions & 14 deletions internal/handlers/files/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import (
"os"
"time"

p2pcontext "github.com/azure/peerd/internal/context"
"github.com/azure/peerd/internal/files/store"
pcontext "github.com/azure/peerd/pkg/context"
"github.com/azure/peerd/pkg/files/store"
"github.com/azure/peerd/pkg/metrics"
"github.com/gin-gonic/gin"
)

// FilesHandler describes a handler for files.
Expand All @@ -20,11 +19,9 @@ type FilesHandler struct {
metricsRecorder metrics.Metrics
}

var _ gin.HandlerFunc = (&FilesHandler{}).Handle

// Handle handles a request for a file.
func (h *FilesHandler) Handle(c *gin.Context) {
log := p2pcontext.Logger(c).With().Str("blob", p2pcontext.BlobUrl(c)).Bool("p2p", p2pcontext.IsRequestFromAPeer(c)).Logger()
func (h *FilesHandler) Handle(c pcontext.Context) {
log := pcontext.Logger(c).With().Str("blob", pcontext.BlobUrl(c)).Bool("p2p", pcontext.IsRequestFromAPeer(c)).Logger()
log.Debug().Msg("files handler start")
s := time.Now()
defer func() {
Expand Down Expand Up @@ -56,25 +53,25 @@ func (h *FilesHandler) Handle(c *gin.Context) {

w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Del("Content-Length")
w.Header().Set(p2pcontext.NodeHeaderKey, p2pcontext.NodeName)
w.Header().Set(p2pcontext.CorrelationHeaderKey, c.GetString(p2pcontext.CorrelationIdCtxKey))
w.Header().Set(pcontext.NodeHeaderKey, pcontext.NodeName)
w.Header().Set(pcontext.CorrelationHeaderKey, c.GetString(pcontext.CorrelationIdCtxKey))

http.ServeContent(w, c.Request, "file", time.Now(), f)
}

// fill fills the context with handler specific information.
func (h *FilesHandler) fill(c *gin.Context) error {
func (h *FilesHandler) fill(c pcontext.Context) error {
c.Set("handler", "files")

key, d, err := h.store.Key(c)
if err != nil {
return err
}

c.Set(p2pcontext.DigestCtxKey, d.String())
c.Set(p2pcontext.FileChunkCtxKey, key)
c.Set(p2pcontext.BlobUrlCtxKey, p2pcontext.BlobUrl(c))
c.Set(p2pcontext.BlobRangeCtxKey, c.Request.Header.Get("Range"))
c.Set(pcontext.DigestCtxKey, d.String())
c.Set(pcontext.FileChunkCtxKey, key)
c.Set(pcontext.BlobUrlCtxKey, pcontext.BlobUrl(c))
c.Set(pcontext.BlobRangeCtxKey, c.Request.Header.Get("Range"))

return nil
}
Expand Down
46 changes: 26 additions & 20 deletions internal/handlers/files/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
"net/http/httptest"
"testing"

p2pcontext "github.com/azure/peerd/internal/context"
"github.com/azure/peerd/internal/files"
"github.com/azure/peerd/internal/files/store"
"github.com/azure/peerd/pkg/discovery/routing/tests"
pcontext "github.com/azure/peerd/pkg/context"
"github.com/azure/peerd/pkg/discovery/routing/mocks"
"github.com/azure/peerd/pkg/files"
"github.com/azure/peerd/pkg/files/store"
"github.com/azure/peerd/pkg/metrics"
"github.com/gin-gonic/gin"
)
Expand All @@ -34,7 +34,7 @@ func TestPartialContentResponseInP2PMode(t *testing.T) {
}
expRange := fmt.Sprintf("bytes=%v-%v", 12, 100)
req.Header.Set("Range", expRange)
req.Header.Set(p2pcontext.P2PHeaderKey, "true")
req.Header.Set(pcontext.P2PHeaderKey, "true")

expD := "sha256:d18c7a64c5158179bdee531a663c5b487de57ff17cff3af29a51c7e70b491d9d"

Expand All @@ -47,7 +47,7 @@ func TestPartialContentResponseInP2PMode(t *testing.T) {
}

store.PrefetchWorkers = 0 // turn off prefetching
s, err := store.NewMockStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
s, err := store.NewMockStore(ctxWithMetrics, mocks.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}
Expand All @@ -63,11 +63,13 @@ func TestPartialContentResponseInP2PMode(t *testing.T) {
return []byte(content), nil
})

h.Handle(ctx)
pctx := pcontext.FromContext(ctx)

h.Handle(pctx)
resp := recorder.Result()

if resp.StatusCode != http.StatusPartialContent {
t.Errorf("expected %v, got %v", http.StatusOK, ctx.Writer.Status())
t.Errorf("expected %v, got %v", http.StatusOK, pctx.Writer.Status())
}

ret, err := io.ReadAll(resp.Body)
Expand All @@ -87,7 +89,7 @@ func TestNotFoundInP2PMode(t *testing.T) {
}
expRange := fmt.Sprintf("bytes=%v-%v", files.CacheBlockSize, files.CacheBlockSize+172)
req.Header.Set("Range", expRange)
req.Header.Set(p2pcontext.P2PHeaderKey, "true")
req.Header.Set(pcontext.P2PHeaderKey, "true")

// Create a new context with the request.
ctx, _ := gin.CreateTestContext(httptest.NewRecorder())
Expand All @@ -97,14 +99,16 @@ func TestNotFoundInP2PMode(t *testing.T) {
}

store.PrefetchWorkers = 0 // turn off prefetching
s, err := store.NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
s, err := store.NewFilesStore(ctxWithMetrics, mocks.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}

h := New(ctxWithMetrics, s)

h.Handle(ctx)
pmc := pcontext.FromContext(ctx)

h.Handle(pmc)
if ctx.Writer.Status() != http.StatusNotFound {
t.Errorf("expected %v, got %v", http.StatusNotFound, ctx.Writer.Status())
}
Expand All @@ -118,7 +122,7 @@ func TestFill(t *testing.T) {
}
expRange := fmt.Sprintf("bytes=%v-%v", files.CacheBlockSize, files.CacheBlockSize+172)
req.Header.Set("Range", expRange)
req.Header.Set(p2pcontext.P2PHeaderKey, "true")
req.Header.Set(pcontext.P2PHeaderKey, "true")

expD := "sha256:d18c7a64c5158179bdee531a663c5b487de57ff17cff3af29a51c7e70b491d9d"
expK := fmt.Sprintf("%v_%v", expD, files.CacheBlockSize)
Expand All @@ -131,26 +135,28 @@ func TestFill(t *testing.T) {
}

store.PrefetchWorkers = 0 // turn off prefetching
s, err := store.NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
s, err := store.NewFilesStore(ctxWithMetrics, mocks.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}

h := New(ctxWithMetrics, s)

err = h.fill(ctx)
pmc := pcontext.FromContext(ctx)

err = h.fill(pmc)
if err != nil {
t.Fatal(err)
}
if ctx.GetString(p2pcontext.FileChunkCtxKey) != expK {
t.Errorf("expected %v, got %v", expK, ctx.GetString(p2pcontext.FileChunkCtxKey))
if ctx.GetString(pcontext.FileChunkCtxKey) != expK {
t.Errorf("expected %v, got %v", expK, ctx.GetString(pcontext.FileChunkCtxKey))
}

if ctx.GetString(p2pcontext.BlobRangeCtxKey) != expRange {
t.Errorf("expected %v, got %v", expRange, ctx.GetString(p2pcontext.BlobRangeCtxKey))
if ctx.GetString(pcontext.BlobRangeCtxKey) != expRange {
t.Errorf("expected %v, got %v", expRange, ctx.GetString(pcontext.BlobRangeCtxKey))
}

if ctx.GetString(p2pcontext.BlobUrlCtxKey) != hostAndPath+query {
t.Errorf("expected %v, got %v", hostAndPath+query, ctx.GetString(p2pcontext.BlobUrlCtxKey))
if ctx.GetString(pcontext.BlobUrlCtxKey) != hostAndPath+query {
t.Errorf("expected %v, got %v", hostAndPath+query, ctx.GetString(pcontext.BlobUrlCtxKey))
}
}
17 changes: 10 additions & 7 deletions internal/handlers/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"net/http"
"time"

p2pcontext "github.com/azure/peerd/internal/context"
filesStore "github.com/azure/peerd/internal/files/store"
"github.com/azure/peerd/internal/handlers/files"
v2 "github.com/azure/peerd/internal/handlers/v2"
"github.com/azure/peerd/pkg/containerd"
pcontext "github.com/azure/peerd/pkg/context"
"github.com/azure/peerd/pkg/discovery/routing"
filesStore "github.com/azure/peerd/pkg/files/store"
"github.com/gin-gonic/gin"
"github.com/rs/zerolog"
)
Expand Down Expand Up @@ -44,10 +44,13 @@ func newEngine(ctx context.Context) *gin.Engine {
baseLog := zerolog.Ctx(ctx)

engine.Use(func(c *gin.Context) {
p2pcontext.FillCorrelationId(c)
c.Set(p2pcontext.LoggerCtxKey, baseLog)

l := p2pcontext.Logger(c)
pc := pcontext.FromContext(c)

pcontext.FillCorrelationId(pc)
c.Set(pcontext.LoggerCtxKey, baseLog)

l := pcontext.Logger(pc)
l.Debug().Msg("request start")
s := time.Now()

Expand Down Expand Up @@ -94,7 +97,7 @@ func registerRoutes(engine *gin.Engine, f, v gin.HandlerFunc) {
// @Failure 404 {string} string "Not Found"
// @Router /blobs/{url} [get]
func fileHandler(c *gin.Context) {
fh.Handle(c)
fh.Handle(pcontext.FromContext(c))
}

// v2Handler is a handler function for the /v2 API
Expand All @@ -107,5 +110,5 @@ func fileHandler(c *gin.Context) {
// @Router /v2/{repo}/manifests/{reference} [get]
// @Router /v2/{repo}/blobs/{digest} [get]
func v2Handler(c *gin.Context) {
v2h.Handle(c)
v2h.Handle(pcontext.FromContext(c))
}
6 changes: 3 additions & 3 deletions internal/handlers/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"net/http/httptest"
"testing"

"github.com/azure/peerd/internal/files/store"
"github.com/azure/peerd/pkg/containerd"
"github.com/azure/peerd/pkg/discovery/routing/tests"
"github.com/azure/peerd/pkg/discovery/routing/mocks"
"github.com/azure/peerd/pkg/files/store"
"github.com/azure/peerd/pkg/metrics"
"github.com/gin-gonic/gin"
)
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestNewEngine(t *testing.T) {
}

func TestHandler(t *testing.T) {
mr := tests.NewMockRouter(map[string][]string{})
mr := mocks.NewMockRouter(map[string][]string{})
ms := containerd.NewMockContainerdStore(nil)
mfs, err := store.NewMockStore(ctxWithMetrics, mr)
if err != nil {
Expand Down
23 changes: 10 additions & 13 deletions internal/handlers/v2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ import (
"path"
"time"

p2pcontext "github.com/azure/peerd/internal/context"
"github.com/azure/peerd/pkg/containerd"
pcontext "github.com/azure/peerd/pkg/context"
"github.com/azure/peerd/pkg/discovery/routing"
"github.com/azure/peerd/pkg/metrics"
"github.com/azure/peerd/pkg/oci/distribution"
"github.com/gin-gonic/gin"
)

// V2Handler describes a handler for OCI content.
Expand All @@ -23,17 +22,15 @@ type V2Handler struct {
metricsRecorder metrics.Metrics
}

var _ gin.HandlerFunc = (&V2Handler{}).Handle

// Handle handles a request for a file.
func (h *V2Handler) Handle(c *gin.Context) {
l := p2pcontext.Logger(c).With().Bool("p2p", p2pcontext.IsRequestFromAPeer(c)).Logger()
func (h *V2Handler) Handle(c pcontext.Context) {
l := pcontext.Logger(c).With().Bool("p2p", pcontext.IsRequestFromAPeer(c)).Logger()
l.Debug().Msg("v2 handler start")
s := time.Now()
defer func() {
dur := time.Since(s)
h.metricsRecorder.RecordRequest(c.Request.Method, "oci", dur.Seconds())
l.Debug().Dur("duration", dur).Str("ns", c.GetString(p2pcontext.NamespaceCtxKey)).Str("ref", c.GetString(p2pcontext.ReferenceCtxKey)).Str("digest", c.GetString(p2pcontext.DigestCtxKey)).Msg("v2 handler stop")
l.Debug().Dur("duration", dur).Str("ns", c.GetString(pcontext.NamespaceCtxKey)).Str("ref", c.GetString(pcontext.ReferenceCtxKey)).Str("digest", c.GetString(pcontext.DigestCtxKey)).Msg("v2 handler stop")
}()

p := path.Clean(c.Request.URL.Path)
Expand All @@ -54,7 +51,7 @@ func (h *V2Handler) Handle(c *gin.Context) {
return
}

if p2pcontext.IsRequestFromAPeer(c) {
if pcontext.IsRequestFromAPeer(c) {
h.registry.Handle(c)
return
} else {
Expand All @@ -64,24 +61,24 @@ func (h *V2Handler) Handle(c *gin.Context) {
}

// fill fills the context with handler specific information.
func (h *V2Handler) fill(c *gin.Context) error {
func (h *V2Handler) fill(c pcontext.Context) error {
c.Set("handler", "v2")

ns := c.Query("ns")
if ns == "" {
ns = "docker.io"
}

c.Set(p2pcontext.NamespaceCtxKey, ns)
c.Set(pcontext.NamespaceCtxKey, ns)

ref, dgst, refType, err := distribution.ParsePathComponents(ns, c.Request.URL.Path)
if err != nil {
return err
}

c.Set(p2pcontext.ReferenceCtxKey, ref)
c.Set(p2pcontext.DigestCtxKey, dgst.String())
c.Set(p2pcontext.RefTypeCtxKey, refType)
c.Set(pcontext.ReferenceCtxKey, ref)
c.Set(pcontext.DigestCtxKey, dgst.String())
c.Set(pcontext.RefTypeCtxKey, refType)

return nil
}
Expand Down
Loading

0 comments on commit 0a8a32c

Please sign in to comment.