Skip to content

Commit

Permalink
Test debt (#7889)
Browse files Browse the repository at this point in the history
* resolve rebase conflict

* Test multipart copy (#5326)

* Test multipart copy range (#5326)

* Review fixes (#5326)

* Abort case (#5326)

* Separate source for mp tests (#5326)

* presigned URL test (#5326)

* azure tests (#5326)

* review fixes (#5326)

* review fixes (#5326)

* review fixes (#5326)

* review fixes (#5326)

* review fixes (#5326)
  • Loading branch information
nadavsteindler authored Jun 30, 2024
1 parent 5831446 commit 7121a94
Show file tree
Hide file tree
Showing 8 changed files with 395 additions and 138 deletions.
3 changes: 3 additions & 0 deletions pkg/block/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ type BlockstoreMetadata struct {
Region *string
}

// Adapter abstracts a storage backend used to persist version-controlled data. Its methods generally map to S3 API methods.
// - Generally some type of Object Storage
// - Can also be block storage or even in-memory
type Adapter interface {
Put(ctx context.Context, obj ObjectPointer, sizeBytes int64, reader io.Reader, opts PutOpts) error
Get(ctx context.Context, obj ObjectPointer) (io.ReadCloser, error)
Expand Down
3 changes: 2 additions & 1 deletion pkg/block/azure/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func createDNSResolver() {

func runAzurite(dockerPool *dockertest.Pool) (string, func()) {
ctx := context.Background()
resource, err := dockerPool.Run("mcr.microsoft.com/azure-storage/azurite", "3.26.0", []string{
resource, err := dockerPool.Run("mcr.microsoft.com/azure-storage/azurite", "3.31.0", []string{
fmt.Sprintf("AZURITE_ACCOUNTS=%s:%s", accountName, accountKey),
})
if err != nil {
Expand Down Expand Up @@ -99,6 +99,7 @@ func runAzurite(dockerPool *dockertest.Pool) (string, func()) {
return url, closer
}

// TestMain runs a container with a mock Azure Blob Storage service for use in the package tests.
func TestMain(m *testing.M) {
var err error
pool, err = dockertest.NewPool("")
Expand Down
203 changes: 70 additions & 133 deletions pkg/block/blocktest/adapter.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,39 @@
package blocktest

import (
"bytes"
"context"
"fmt"
"io"
"net/url"
"path"
"path/filepath"
"sort"
"strconv"
"strings"
"testing"
"time"

"github.com/go-test/deep"
"github.com/stretchr/testify/require"
"github.com/thanhpk/randstr"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/ingest/store"
)

// AdapterTest runs the suite of basic adapter functionality checks against
// the given adapter. Every capability is exercised in its own subtest, in the
// order listed below. storageNamespace is handed to every case; externalPath
// is only consumed by the Put/Get case.
func AdapterTest(t *testing.T, adapter block.Adapter, storageNamespace, externalPath string) {
	suite := []struct {
		name string
		run  func(t *testing.T)
	}{
		{"Adapter_PutGet", func(t *testing.T) { testAdapterPutGet(t, adapter, storageNamespace, externalPath) }},
		{"Adapter_Copy", func(t *testing.T) { testAdapterCopy(t, adapter, storageNamespace) }},
		{"Adapter_Remove", func(t *testing.T) { testAdapterRemove(t, adapter, storageNamespace) }},
		{"Adapter_MultipartUpload", func(t *testing.T) { testAdapterMultipartUpload(t, adapter, storageNamespace) }},
		{"Adapter_AbortMultiPartUpload", func(t *testing.T) { testAdapterAbortMultipartUpload(t, adapter, storageNamespace) }},
		{"Adapter_CopyPart", func(t *testing.T) { testAdapterCopyPart(t, adapter, storageNamespace) }},
		{"Adapter_CopyPartRange", func(t *testing.T) { testAdapterCopyPartRange(t, adapter, storageNamespace) }},
		{"Adapter_Exists", func(t *testing.T) { testAdapterExists(t, adapter, storageNamespace) }},
		{"Adapter_GetRange", func(t *testing.T) { testAdapterGetRange(t, adapter, storageNamespace) }},
		{"Adapter_Walker", func(t *testing.T) { testAdapterWalker(t, adapter, storageNamespace) }},
		{"Adapter_GetPreSignedURL", func(t *testing.T) { testGetPreSignedURL(t, adapter, storageNamespace) }},
	}
	for _, sub := range suite {
		t.Run(sub.name, sub.run)
	}
}

// Parameterized test to first Put object via Storage Adapter then Get it and check that the contents match
func testAdapterPutGet(t *testing.T, adapter block.Adapter, storageNamespace, externalPath string) {
ctx := context.Background()
const contents = "test_file"
Expand Down Expand Up @@ -69,6 +73,7 @@ func testAdapterPutGet(t *testing.T, adapter block.Adapter, storageNamespace, ex
}
}

// Test to Copy an object via Storage Adapter, then check that the contents of the copied object matches the original
func testAdapterCopy(t *testing.T, adapter block.Adapter, storageNamespace string) {
ctx := context.Background()
contents := "foo bar baz quux"
Expand All @@ -93,6 +98,7 @@ func testAdapterCopy(t *testing.T, adapter block.Adapter, storageNamespace strin
require.Equal(t, contents, string(got))
}

// Parameterized test covering valid and invalid cases for removing an object via the adapter
func testAdapterRemove(t *testing.T, adapter block.Adapter, storageNamespace string) {
ctx := context.Background()
const content = "Content used for testing"
Expand Down Expand Up @@ -166,136 +172,7 @@ func testAdapterRemove(t *testing.T, adapter block.Adapter, storageNamespace str
}
}

// dumpPathTree walks everything the adapter's walker reports under qk and
// returns a sorted list with one element per visited entry. Each element is
// the part of the entry's address that follows the first occurrence of the
// base URI (qk.Format()); it is empty when the address does not contain the
// base URI. Any failure to parse the URI, obtain the walker, or walk aborts
// the test.
func dumpPathTree(t testing.TB, ctx context.Context, adapter block.Adapter, qk block.QualifiedKey) []string {
	t.Helper()
	tree := make([]string, 0)

	uri, err := url.Parse(qk.Format())
	require.NoError(t, err)

	w, err := adapter.GetWalker(uri)
	require.NoError(t, err)

	// NewWrapper yields only the wrapper, so there is no error to assert on here.
	walker := store.NewWrapper(w, uri)

	err = walker.Walk(ctx, block.WalkOptions{}, func(e block.ObjectStoreEntry) error {
		// Keep only what comes after the base URI.
		_, p, _ := strings.Cut(e.Address, uri.String())
		tree = append(tree, p)
		return nil
	})
	if err != nil {
		t.Fatalf("walking on '%s': %s", uri.String(), err)
	}
	// Sort so callers get a deterministic order regardless of walk order.
	sort.Strings(tree)
	return tree
}

// createMultipartFile generates the random payload used by the multipart
// tests. It returns the individual parts and, as the second value, the
// concatenation of all parts in order. Part i (zero-based) is requested with
// multipartPartSize+i bytes, so the parts differ slightly in size.
func createMultipartFile() ([][]byte, []byte) {
	const (
		multipartNumberOfParts = 3
		multipartPartSize      = 5 * 1024 * 1024
		// Sum of the requested part sizes: n*size + (0 + 1 + ... + n-1).
		multipartTotalSize = multipartNumberOfParts*multipartPartSize +
			multipartNumberOfParts*(multipartNumberOfParts-1)/2
	)
	parts := make([][]byte, multipartNumberOfParts)
	// Reserve the full buffer up front so appending ~15 MiB does not trigger
	// repeated grow-and-copy cycles.
	partsConcat := make([]byte, 0, multipartTotalSize)
	for i := range parts {
		parts[i] = randstr.Bytes(multipartPartSize + i)
		partsConcat = append(partsConcat, parts[i]...)
	}
	return parts, partsConcat
}

// testAdapterMultipartUpload drives a complete multipart upload through the
// adapter for a simple and a nested object path: it creates the upload,
// uploads every part produced by createMultipartFile, completes the upload and
// verifies that reading the object back yields the concatenation of all parts.
//
// ListParts is probed before the upload exists, after all parts are uploaded
// (full and truncated listing) and after completion. Only the S3 blockstore
// type is expected to serve it; every other blockstore type must answer with
// block.ErrOperationNotSupported.
func testAdapterMultipartUpload(t *testing.T, adapter block.Adapter, storageNamespace string) {
	ctx := context.Background()
	parts, full := createMultipartFile()

	cases := []struct {
		name string
		path string
	}{
		{"simple", "abc"},
		{"nested", "foo/bar"},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			listPartsSupported := adapter.BlockstoreType() == block.BlockstoreTypeS3
			obj := block.ObjectPointer{
				StorageNamespace: storageNamespace,
				Identifier:       c.path,
				IdentifierType:   block.IdentifierTypeRelative,
			}

			// List parts on a non-existing upload
			_, err := adapter.ListParts(ctx, obj, "invalidId", block.ListPartsOpts{})
			if listPartsSupported {
				require.Error(t, err)
			} else {
				require.ErrorIs(t, err, block.ErrOperationNotSupported)
			}

			resp, err := adapter.CreateMultiPartUpload(ctx, obj, nil, block.CreateMultiPartUploadOpts{})
			require.NoError(t, err)

			// Upload every part and remember its number and ETag for completion.
			multiParts := make([]block.MultipartPart, len(parts))
			for i, content := range parts {
				partNumber := i + 1 // part numbers are 1-based
				partResp, err := adapter.UploadPart(ctx, obj, int64(len(content)), bytes.NewReader(content), resp.UploadID, partNumber)
				require.NoError(t, err)
				multiParts[i].PartNumber = partNumber
				multiParts[i].ETag = partResp.ETag
			}

			// List parts after upload
			listResp, err := adapter.ListParts(ctx, obj, resp.UploadID, block.ListPartsOpts{})
			if listPartsSupported {
				require.NoError(t, err)
				require.Len(t, listResp.Parts, len(parts))
				// The truncation flag belongs to the response, not to a part: check it once.
				require.False(t, listResp.IsTruncated)
				for i, part := range listResp.Parts {
					require.Equal(t, multiParts[i].PartNumber, part.PartNumber)
					require.Equal(t, int64(len(parts[i])), part.Size)
					require.Equal(t, multiParts[i].ETag, part.ETag)
				}
			} else {
				require.ErrorIs(t, err, block.ErrOperationNotSupported)
			}

			// List parts partial
			const maxPartsConst = 2
			maxParts := int32(maxPartsConst)
			listResp, err = adapter.ListParts(ctx, obj, resp.UploadID, block.ListPartsOpts{MaxParts: &maxParts})
			if listPartsSupported {
				require.NoError(t, err)
				require.Len(t, listResp.Parts, int(maxParts))
				require.True(t, listResp.IsTruncated)
				// Fail the test instead of panicking on a missing marker.
				require.NotNil(t, listResp.NextPartNumberMarker)
				require.Equal(t, strconv.Itoa(int(maxParts)), *listResp.NextPartNumberMarker)
			} else {
				require.ErrorIs(t, err, block.ErrOperationNotSupported)
			}

			_, err = adapter.CompleteMultiPartUpload(ctx, obj, resp.UploadID, &block.MultipartUploadCompletion{
				Part: multiParts,
			})
			require.NoError(t, err)

			// List parts after complete should fail
			_, err = adapter.ListParts(ctx, obj, resp.UploadID, block.ListPartsOpts{})
			if listPartsSupported {
				require.Error(t, err)
			} else {
				require.ErrorIs(t, err, block.ErrOperationNotSupported)
			}

			reader, err := adapter.Get(ctx, obj)
			require.NoError(t, err)
			// Release the underlying resource even when a later assertion fails;
			// a close error is not relevant to what this test verifies.
			defer func() { _ = reader.Close() }()

			got, err := io.ReadAll(reader)
			require.NoError(t, err)

			require.Equal(t, full, got)
		})
	}
}

// Parameterized test of the object Exists method of the Storage adapter
func testAdapterExists(t *testing.T, adapter block.Adapter, storageNamespace string) {
// TODO (niro): Test abs paths
const contents = "exists"
Expand Down Expand Up @@ -338,6 +215,7 @@ func testAdapterExists(t *testing.T, adapter block.Adapter, storageNamespace str
}
}

// Parameterized test of the GetRange functionality
func testAdapterGetRange(t *testing.T, adapter block.Adapter, storageNamespace string) {
ctx := context.Background()
part1 := "this is the first part "
Expand Down Expand Up @@ -381,6 +259,7 @@ func testAdapterGetRange(t *testing.T, adapter block.Adapter, storageNamespace s
}
}

// Parameterized test to GetWalker from the Storage Adapter and check that it works
func testAdapterWalker(t *testing.T, adapter block.Adapter, storageNamespace string) {
ctx := context.Background()
const (
Expand Down Expand Up @@ -463,3 +342,61 @@ func testAdapterWalker(t *testing.T, adapter block.Adapter, storageNamespace str
})
}
}

// testGetPreSignedURL requests a read-mode pre-signed URL from the adapter and
// checks the outcome per blockstore type: GS must fail with a
// "no credentials found" error, Local must report
// block.ErrOperationNotSupported, and every other type must succeed with the
// expiry given by expectedURLExp and a URL that parses.
func testGetPreSignedURL(t *testing.T, adapter block.Adapter, storageNamespace string) {
	ctx := context.Background()
	obj, _ := objPointers(storageNamespace)

	signedURL, expiry, err := adapter.GetPreSignedURL(ctx, obj, block.PreSignModeRead)

	// Blockstores that are expected to reject the request end the test here.
	switch adapter.BlockstoreType() {
	case block.BlockstoreTypeGS:
		require.ErrorContains(t, err, "no credentials found")
		return
	case block.BlockstoreTypeLocal:
		require.ErrorIs(t, err, block.ErrOperationNotSupported)
		return
	}

	require.NoError(t, err)
	require.Equal(t, expectedURLExp(adapter), expiry)

	_, err = url.Parse(signedURL)
	require.NoError(t, err)
}

// expectedURLExp returns the expiry the pre-signed URL test expects from the
// given adapter: NowMockDefault plus block.DefaultPreSignExpiryDuration for
// every blockstore type except Azure, for which the zero time is expected.
func expectedURLExp(adapter block.Adapter) time.Time {
	if adapter.BlockstoreType() != block.BlockstoreTypeAzure {
		return NowMockDefault().Add(block.DefaultPreSignExpiryDuration)
	}
	// we didn't implement expiry for Azure yet
	return time.Time{}
}

// dumpPathTree lists what the adapter's walker finds under qk. The result is
// sorted and holds one element per visited entry: the portion of the entry's
// address after the first occurrence of the base URI (qk.Format()), or an
// empty string when the address does not contain it. Parse, walker-creation
// and walk failures abort the test.
func dumpPathTree(t testing.TB, ctx context.Context, adapter block.Adapter, qk block.QualifiedKey) []string {
	t.Helper()

	base, err := url.Parse(qk.Format())
	require.NoError(t, err, "URL Parse Error")

	rawWalker, err := adapter.GetWalker(base)
	require.NoError(t, err, "GetWalker failed")

	paths := []string{}
	collect := func(entry block.ObjectStoreEntry) error {
		_, rel, _ := strings.Cut(entry.Address, base.String())
		paths = append(paths, rel)
		return nil
	}

	wrapped := store.NewWrapper(rawWalker, base)
	if err := wrapped.Walk(ctx, block.WalkOptions{}, collect); err != nil {
		t.Fatalf("walking on '%s': %s", base.String(), err)
	}

	// Sorted output keeps comparisons independent of walk order.
	sort.Strings(paths)
	return paths
}

// NowMockDefault returns the fixed reference instant that expectedURLExp
// offsets to compute the expected pre-signed URL expiry:
// 2023-12-31 00:00:00 UTC.
func NowMockDefault() time.Time {
	// This is exactly the instant that time.Date(2024, time.January, 0, ...)
	// normalizes to, since day 0 rolls back to the last day of the previous
	// month; it is spelled out here so the value is not misread as January 1st.
	return time.Date(2023, time.December, 31, 0, 0, 0, 0, time.UTC)
}
Loading

0 comments on commit 7121a94

Please sign in to comment.