Skip to content

Commit

Permalink
feat: big refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
avtakkar committed Apr 15, 2024
1 parent 26f5338 commit b67c616
Show file tree
Hide file tree
Showing 36 changed files with 842 additions and 222 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ GOLINT = golangci-lint run
# Source repository variables.
ROOT_DIR := $(shell git rev-parse --show-toplevel)
BIN_DIR = $(ROOT_DIR)/bin
TEST_PKGS = $(shell go list ./... | grep -v 'github.com/azure/peerd/api\|github.com/azure/peerd/pkg/mocks') # Exclude generated and mock code.
TEST_PKGS = $(shell go list ./... | grep -v 'github.com/azure/peerd/api\|github.com/azure/peerd/pkg/discovery/routing/mocks') # Exclude generated and mock code.
TESTS_BIN_DIR = $(BIN_DIR)/tests
COVERAGE_DIR=$(BIN_DIR)/coverage
SCRIPTS_DIR=$(ROOT_DIR)/build/ci/scripts
Expand Down
8 changes: 4 additions & 4 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import (
"time"

"github.com/alexflint/go-arg"
p2pcontext "github.com/azure/peerd/internal/context"
"github.com/azure/peerd/internal/files/store"
"github.com/azure/peerd/internal/handlers"
"github.com/azure/peerd/pkg/containerd"
pcontext "github.com/azure/peerd/pkg/context"
"github.com/azure/peerd/pkg/discovery/content/provider"
"github.com/azure/peerd/pkg/discovery/routing"
"github.com/azure/peerd/pkg/k8s"
Expand All @@ -43,10 +43,10 @@ func main() {
zerolog.SetGlobalLevel(ll)
zerolog.TimeFieldFormat = time.RFC3339Nano

l := zerolog.New(os.Stdout).With().Timestamp().Str("self", p2pcontext.NodeName).Str("version", version).Logger()
l := zerolog.New(os.Stdout).With().Timestamp().Str("self", pcontext.NodeName).Str("version", version).Logger()
ctx := l.WithContext(context.Background())

ctx, err = metrics.WithContext(ctx, p2pcontext.NodeName, "peerd")
ctx, err = metrics.WithContext(ctx, pcontext.NodeName, "peerd")
if err != nil {
l.Error().Err(err).Msg("failed to initialize metrics")
os.Exit(1)
Expand Down Expand Up @@ -86,7 +86,7 @@ func serverCommand(ctx context.Context, args *ServerCmd) (err error) {
return err
}

clientset, err := k8s.NewKubernetesInterface(p2pcontext.KubeConfigPath, p2pcontext.NodeName)
clientset, err := k8s.NewKubernetesInterface(pcontext.KubeConfigPath, pcontext.NodeName)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/files/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"fmt"
"io"

"github.com/azure/peerd/internal/remote"
"github.com/azure/peerd/pkg/discovery/content/reader"
"github.com/azure/peerd/pkg/math"
)

Expand All @@ -23,7 +23,7 @@ func FileChunkKey(name string, offset, cacheBlockSize int64) string {
}

// Fetchfile gets the content of a file from the given offset using a remote reader.
func FetchFile(r remote.Reader, name string, offset int64, count int) ([]byte, error) {
func FetchFile(r reader.Reader, name string, offset int64, count int) ([]byte, error) {
d := make([]byte, count)
l := r.Log().With().Str("name", name).Int64("offset", offset).Int("count", count).Logger()
l.Debug().Msg("fetch file start")
Expand Down
4 changes: 2 additions & 2 deletions internal/files/files_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"strconv"
"testing"

"github.com/azure/peerd/internal/remote"
"github.com/azure/peerd/pkg/discovery/content/reader"
"github.com/rs/zerolog"
)

Expand Down Expand Up @@ -83,4 +83,4 @@ func (m *mockReader) PreadRemote(buf []byte, offset int64) (int, error) {
}
}

var _ remote.Reader = &mockReader{}
var _ reader.Reader = &mockReader{}
4 changes: 2 additions & 2 deletions internal/files/store/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"sync"

"github.com/azure/peerd/internal/files"
"github.com/azure/peerd/internal/remote"
"github.com/azure/peerd/pkg/discovery/content/reader"
"github.com/azure/peerd/pkg/math"
)

Expand All @@ -27,7 +27,7 @@ type file struct {

chunkOffset int64

reader remote.Reader
reader reader.Reader
store *store
}

Expand Down
22 changes: 11 additions & 11 deletions internal/files/store/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,24 @@ import (
"testing"

"github.com/azure/peerd/internal/files"
remotetests "github.com/azure/peerd/internal/remote/tests"
"github.com/azure/peerd/pkg/cache"
"github.com/azure/peerd/pkg/discovery/routing/tests"
readermocks "github.com/azure/peerd/pkg/discovery/content/reader/mocks"
"github.com/azure/peerd/pkg/discovery/routing/mocks"
)

func TestReadAtWithChunkOffset(t *testing.T) {
data := []byte("hello world")

files.CacheBlockSize = 1 // 1 byte

s, err := NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
s, err := NewFilesStore(ctxWithMetrics, mocks.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}

fWithChunkOffset := &file{
Name: "test",
reader: remotetests.NewMockReader(data),
reader: readermocks.NewMockReader(data),
store: s.(*store),
chunkOffset: 4,
}
Expand Down Expand Up @@ -78,14 +78,14 @@ func TestReadAt(t *testing.T) {

files.CacheBlockSize = 1 // 1 byte

s, err := NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
s, err := NewFilesStore(ctxWithMetrics, mocks.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}

f := &file{
Name: "test",
reader: remotetests.NewMockReader(data),
reader: readermocks.NewMockReader(data),
store: s.(*store),
}
size, err := f.Fstat()
Expand Down Expand Up @@ -128,14 +128,14 @@ func TestReadAt(t *testing.T) {
func TestSeek(t *testing.T) {
data := []byte("hello world")

s, err := NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
s, err := NewFilesStore(ctxWithMetrics, mocks.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}

f := &file{
Name: "test",
reader: remotetests.NewMockReader(data),
reader: readermocks.NewMockReader(data),
store: s.(*store),
}
size, err := f.Fstat()
Expand Down Expand Up @@ -192,14 +192,14 @@ func TestFstat(t *testing.T) {
t.Fatal(err)
}

s, err := NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
s, err := NewFilesStore(ctxWithMetrics, mocks.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}

f := &file{
Name: "test",
reader: remotetests.NewMockReader(data),
reader: readermocks.NewMockReader(data),
store: s.(*store),
}

Expand All @@ -212,7 +212,7 @@ func TestFstat(t *testing.T) {

f = &file{
Name: "test2",
reader: remotetests.NewMockReader(data),
reader: readermocks.NewMockReader(data),
store: s.(*store),
chunkOffset: 14,
}
Expand Down
6 changes: 3 additions & 3 deletions internal/files/store/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ package store
import (
"time"

"github.com/gin-gonic/gin"
"github.com/azure/peerd/pkg/context"
"github.com/opencontainers/go-digest"
)

// FilesStore describes a store for files.
type FilesStore interface {
// Key tries to find the cache key for the requested content or returns empty.
Key(c *gin.Context) (key string, d digest.Digest, err error)
Key(c context.Context) (key string, d digest.Digest, err error)

// Open opens the requested file and starts prefetching it. It also returns the size of the file.
Open(c *gin.Context) (File, error)
Open(c context.Context) (File, error)

// Subscribe returns a channel that will be notified when a blob is added to the store.
Subscribe() chan string
Expand Down
27 changes: 13 additions & 14 deletions internal/files/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ import (
"strings"
"time"

p2pcontext "github.com/azure/peerd/internal/context"
"github.com/azure/peerd/internal/files"
"github.com/azure/peerd/internal/remote"
"github.com/azure/peerd/pkg/cache"
pcontext "github.com/azure/peerd/pkg/context"
"github.com/azure/peerd/pkg/discovery/content/reader"
"github.com/azure/peerd/pkg/discovery/routing"
"github.com/azure/peerd/pkg/metrics"
"github.com/azure/peerd/pkg/urlparser"
"github.com/gin-gonic/gin"
"github.com/opencontainers/go-digest"
"github.com/rs/zerolog"
)
Expand Down Expand Up @@ -58,7 +57,7 @@ type prefetchableSegment struct {
offset int64
count int

reader remote.Reader
reader reader.Reader
}

// store describes a content store whose contents can come from disk or a remote source.
Expand All @@ -82,15 +81,15 @@ func (s *store) Subscribe() chan string {
}

// Open opens the requested file and starts prefetching it.
func (s *store) Open(c *gin.Context) (File, error) {
func (s *store) Open(c pcontext.Context) (File, error) {

chunkKey := c.GetString(p2pcontext.FileChunkCtxKey)
chunkKey := c.GetString(pcontext.FileChunkCtxKey)
tokens := strings.Split(chunkKey, files.FileChunkKeySep)
name := tokens[0]
alignedOff, _ := strconv.ParseInt(tokens[1], 10, 64)

log := p2pcontext.Logger(c)
if p2pcontext.IsRequestFromAPeer(c) {
log := pcontext.Logger(c)
if pcontext.IsRequestFromAPeer(c) {
// This request came from a peer. Don't serve it unless we have the requested range cached.
if ok := s.cache.Exists(name, alignedOff); !ok {
log.Info().Str("name", name).Msg("peer request not cached")
Expand All @@ -103,10 +102,10 @@ func (s *store) Open(c *gin.Context) (File, error) {
store: s,
cur: 0,
size: 0,
reader: remote.NewReader(c, s.router, s.resolveRetries, s.resolveTimeout, s.metricsRecorder),
reader: reader.NewReader(c, s.router, s.resolveRetries, s.resolveTimeout, s.metricsRecorder),
}

if p2pcontext.IsRequestFromAPeer(c) {
if pcontext.IsRequestFromAPeer(c) {
// Ensure this file can only serve the requested chunk.
// This is to prevent infinite loops when a peer requests a file that is not cached.
f.chunkOffset = alignedOff
Expand All @@ -122,18 +121,18 @@ func (s *store) Open(c *gin.Context) (File, error) {
}

// Key tries to find the cache key for the requested content or returns empty.
func (s *store) Key(c *gin.Context) (string, digest.Digest, error) {
log := p2pcontext.Logger(c)
func (s *store) Key(c pcontext.Context) (string, digest.Digest, error) {
log := pcontext.Logger(c)

blobUrl := p2pcontext.BlobUrl(c)
blobUrl := pcontext.BlobUrl(c)
d, err := s.parser.ParseDigest(blobUrl)
if err != nil {
log.Error().Err(err).Msg("store key")
}

startIndex := int64(0) // Default to 0 for HEADs.
if c.Request.Method == "GET" {
startIndex, err = p2pcontext.RangeStartIndex(c.Request.Header.Get("Range"))
startIndex, err = pcontext.RangeStartIndex(c.Request.Header.Get("Range"))
if err != nil {
return "", "", err
}
Expand Down
24 changes: 12 additions & 12 deletions internal/files/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"os"
"testing"

p2pcontext "github.com/azure/peerd/internal/context"
"github.com/azure/peerd/internal/files"
"github.com/azure/peerd/pkg/discovery/routing/tests"
pcontext "github.com/azure/peerd/pkg/context"
"github.com/azure/peerd/pkg/discovery/routing/mocks"
"github.com/gin-gonic/gin"
"github.com/opencontainers/go-digest"
)
Expand All @@ -29,7 +29,7 @@ func TestOpenP2p(t *testing.T) {
t.Fatal(err)
}
req.Header.Set("Range", fmt.Sprintf("bytes=%v-%v", files.CacheBlockSize, files.CacheBlockSize+172))
req.Header.Set(p2pcontext.P2PHeaderKey, "true")
req.Header.Set(pcontext.P2PHeaderKey, "true")

expD := "sha256:d18c7a64c5158179bdee531a663c5b487de57ff17cff3af29a51c7e70b491d9d"
expK := fmt.Sprintf("%v%v%v", expD, files.FileChunkKeySep, files.CacheBlockSize)
Expand All @@ -40,15 +40,15 @@ func TestOpenP2p(t *testing.T) {
ctx.Params = []gin.Param{
{Key: "url", Value: hostAndPath},
}
ctx.Set(p2pcontext.FileChunkCtxKey, expK)
ctx.Set(pcontext.FileChunkCtxKey, expK)

PrefetchWorkers = 0 // turn off prefetching
s, err := NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
s, err := NewFilesStore(ctxWithMetrics, mocks.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}

_, err = s.Open(ctx)
_, err = s.Open(pcontext.Context{Context: ctx})
if err != os.ErrNotExist {
t.Errorf("expected %v, got %v", os.ErrNotExist, err)
}
Expand All @@ -71,17 +71,17 @@ func TestOpenNonP2p(t *testing.T) {
ctx.Params = []gin.Param{
{Key: "url", Value: hostAndPath},
}
ctx.Set(p2pcontext.FileChunkCtxKey, expK)
ctx.Set(pcontext.FileChunkCtxKey, expK)

PrefetchWorkers = 0 // turn off prefetching
s, err := NewMockStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
s, err := NewMockStore(ctxWithMetrics, mocks.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}

s.Cache().PutSize(expD, 200)

_, err = s.Open(ctx)
_, err = s.Open(pcontext.Context{Context: ctx})
if err != nil {
t.Fatal(err)
}
Expand All @@ -105,12 +105,12 @@ func TestKey(t *testing.T) {
{Key: "url", Value: hostAndPath},
}

s, err := NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
s, err := NewFilesStore(ctxWithMetrics, mocks.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}

k, d, err := s.Key(ctx)
k, d, err := s.Key(pcontext.Context{Context: ctx})
if err != nil {
t.Fatal(err)
}
Expand All @@ -125,7 +125,7 @@ func TestKey(t *testing.T) {
}

func TestSubscribe(t *testing.T) {
s, err := NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
s, err := NewFilesStore(ctxWithMetrics, mocks.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit b67c616

Please sign in to comment.