Skip to content

Commit

Permalink
Experimental: Unix Perm add support for SyncManager's upload (#7948)
Browse files Browse the repository at this point in the history
* Experimental: Unix Perm add support for SyncManager's upload

* CR Fixes
  • Loading branch information
N-o-Z authored Jul 2, 2024
1 parent 7653f2e commit 9bed622
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 29 deletions.
26 changes: 22 additions & 4 deletions pkg/local/sync.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package local

import (
"bytes"
"context"
"encoding/json"
"errors"
Expand Down Expand Up @@ -304,18 +305,35 @@ func (s *SyncManager) upload(ctx context.Context, rootPath string, remote *uri.U
metadata := map[string]string{
ClientMtimeMetadataKey: strconv.FormatInt(fileStat.ModTime().Unix(), 10),
}
reader := fileWrapper{

reader := b.Reader(f)
if s.includePerm {
if strings.HasSuffix(path, uri.PathSeparator) { // Create a 0 byte reader for directories
reader = bytes.NewReader([]byte{})
}
permissions, err := getUnixPermissionFromFileInfo(fileStat)
if err != nil {
return err
}
data, err := json.Marshal(permissions)
if err != nil {
return err
}
metadata[UnixPermissionsMetadataKey] = string(data)
}

readerWrapper := fileWrapper{
file: f,
reader: b.Reader(f),
reader: reader,
}
if s.flags.Presign {
_, err = helpers.ClientUploadPreSign(
ctx, s.client, s.httpClient, remote.Repository, remote.Ref, dest, metadata, "", reader, s.flags.PresignMultipart)
ctx, s.client, s.httpClient, remote.Repository, remote.Ref, dest, metadata, "", readerWrapper, s.flags.PresignMultipart)
return err
}
// not pre-signed
_, err = helpers.ClientUpload(
ctx, s.client, remote.Repository, remote.Ref, dest, metadata, "", reader)
ctx, s.client, remote.Repository, remote.Ref, dest, metadata, "", readerWrapper)
return err
}

Expand Down
167 changes: 146 additions & 21 deletions pkg/local/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ func TestSyncManager_download(t *testing.T) {
ctx := context.Background()

testCases := []struct {
Name string
Contents []byte
Metadata map[string]string
Path string
UnixPerm bool
Name string
Contents []byte
Metadata map[string]string
Path string
UnixPermEnabled bool
}{
{
Name: "basic download",
Expand All @@ -60,36 +60,36 @@ func TestSyncManager_download(t *testing.T) {
Path: "my_object",
},
{
Name: "download file unix perm enabled no metadata",
Contents: []byte("foobar\n"),
Metadata: map[string]string{},
Path: "my_object",
UnixPerm: true,
Name: "download file unix perm enabled no metadata",
Contents: []byte("foobar\n"),
Metadata: map[string]string{},
Path: "my_object",
UnixPermEnabled: true,
},
{
Name: "download file unix perm enabled with metadata",
Contents: []byte("foobar\n"),
Metadata: map[string]string{
local.UnixPermissionsMetadataKey: fmt.Sprintf("{\"UID\":%d, \"GID\": %d, \"Mode\":%d}", currentUID, currentGID, 0o100755),
},
Path: "my_object",
UnixPerm: true,
Path: "my_object",
UnixPermEnabled: true,
},
{
Name: "download folder unix perm no metadata",
Contents: nil,
Metadata: map[string]string{},
Path: "folder1/",
UnixPerm: true,
Name: "download folder unix perm no metadata",
Contents: nil,
Metadata: map[string]string{},
Path: "folder1/",
UnixPermEnabled: true,
},
{
Name: "download folder unix perm with metadata",
Contents: nil,
Metadata: map[string]string{
local.UnixPermissionsMetadataKey: fmt.Sprintf("{\"UID\":%d, \"GID\": %d, \"Mode\":%d}", currentUID, currentGID, 0o40770),
},
Path: "folder2/",
UnixPerm: true,
Path: "folder2/",
UnixPermEnabled: true,
},
}

Expand Down Expand Up @@ -140,7 +140,7 @@ func TestSyncManager_download(t *testing.T) {
Parallelism: 1,
Presign: false,
PresignMultipart: false,
}, tt.UnixPerm)
}, tt.UnixPermEnabled)
u := &uri.URI{
Repository: "repo",
Ref: "main",
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestSyncManager_download(t *testing.T) {
expectedMode = local.DefaultDirectoryPermissions - umask
}

if tt.UnixPerm {
if tt.UnixPermEnabled {
if value, ok := tt.Metadata[local.UnixPermissionsMetadataKey]; ok {
unixPerm := &local.UnixPermissions{}
require.NoError(t, json.Unmarshal([]byte(value), &unixPerm))
Expand All @@ -203,6 +203,131 @@ func TestSyncManager_download(t *testing.T) {
}
}

func TestSyncManager_upload(t *testing.T) {
ctx := context.Background()

testCases := []struct {
Name string
Path string
Permissions *local.UnixPermissions
UnixPermEnabled bool
Mtime int64
}{
{
Name: "basic upload",
Path: "my_object",
Mtime: time.Now().Unix(),
},
{
Name: "download with client mtime",
Mtime: time.Now().Add(24 * time.Hour).Unix(),
Path: "my_object",
},
{
Name: "download file unix perm metadata disabled",
Permissions: &local.UnixPermissions{Mode: local.DefaultDirectoryPermissions},
Path: "my_object",
},
{
Name: "download file unix perm enabled no metadata",
Path: "my_object",
UnixPermEnabled: true,
},
{
Name: "download file unix perm enabled with metadata",
Path: "my_object",
UnixPermEnabled: true,
Permissions: &local.UnixPermissions{Mode: os.FileMode(0o100755)},
},
{
Name: "download folder unix perm no metadata",
Path: "folder1/",
UnixPermEnabled: true,
},
{
Name: "download folder unix perm with metadata",
Path: "folder2/",
UnixPermEnabled: true,
Permissions: &local.UnixPermissions{Mode: os.FileMode(0o40770)},
},
}

for _, tt := range testCases {
umask := syscall.Umask(0)
syscall.Umask(umask)

t.Run(tt.Name, func(t *testing.T) {
// We must create the test at the user home dir otherwise we will file to chown
home, err := os.UserHomeDir()
require.NoError(t, err)
testFolderPath := filepath.Join(home, fmt.Sprintf("sync_manager_test_%s", xid.New().String()))
require.NoError(t, os.MkdirAll(testFolderPath, os.ModePerm))
defer func() {
_ = os.RemoveAll(testFolderPath)
}()
mode := os.FileMode(local.DefaultFilePermissions)
if tt.Permissions != nil {
mode = tt.Permissions.Mode
}
localPath := fmt.Sprintf("%s%c%s", testFolderPath, os.PathSeparator, tt.Path) // Have to build manually due to Clean
isDir := strings.HasSuffix(localPath, uri.PathSeparator)
if isDir {
require.NoError(t, os.Mkdir(localPath, mode))
} else {
require.NoError(t, os.WriteFile(localPath, []byte("foobar\n"), mode))
}
require.NoError(t, os.Chtimes(localPath, time.Now(), time.Unix(tt.Mtime, 0)))

h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case strings.HasSuffix(r.URL.Path, "/objects"):
// Check Chown
perm := local.UnixPermissions{}
data := []byte(r.Header.Get(apiutil.LakeFSHeaderInternalPrefix + "unix-permissions"))
if len(data) > 0 {
require.NoError(t, json.Unmarshal(data, &perm))
} else {
perm = local.GetDefaultPermissions(isDir)
}
expectedPerm := perm
if tt.UnixPermEnabled && tt.Permissions != nil {
expectedPerm.Mode = tt.Permissions.Mode
}
require.Equal(t, expectedPerm, perm)

// Check Mtime
require.Equal(t, fmt.Sprintf("%d", tt.Mtime), r.Header.Get(apiutil.LakeFSHeaderInternalPrefix+"client-mtime"))
default:
t.Fatal("Unexpected request")
}
w.WriteHeader(http.StatusOK)
})
server := httptest.NewServer(h)
defer server.Close()

testClient := getTestClient(t, server.URL)
s := local.NewSyncManager(ctx, testClient, server.Client(), local.SyncFlags{
Parallelism: 1,
Presign: false,
PresignMultipart: false,
}, tt.UnixPermEnabled)
u := &uri.URI{
Repository: "repo",
Ref: "main",
Path: nil,
}
changes := make(chan *local.Change, 2)
changes <- &local.Change{
Source: local.ChangeSourceLocal,
Path: tt.Path,
Type: local.ChangeTypeAdded,
}
close(changes)
require.NoError(t, s.Sync(testFolderPath, u, changes))
})
}
}

func getTestClient(t *testing.T, endpoint string) *apigen.ClientWithResponses {
t.Helper()
transport := http.DefaultTransport.(*http.Transport).Clone()
Expand Down
24 changes: 20 additions & 4 deletions pkg/local/unix_permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package local

import (
"encoding/json"
"errors"
"os"
"strings"
"sync"
Expand All @@ -19,11 +20,13 @@ const (
)

var (
// umask - internal, init only once. Use only via getDefaultPermissions call
// umask - internal, init only once. Use only via GetDefaultPermissions call
umask = -1
// defaultOwnership - internal, init only once. Use only via getDefaultPermissions call
// defaultOwnership - internal, init only once. Use only via GetDefaultPermissions call
defaultOwnership *unixOwnership
getOwnershipMutex sync.Mutex

ErrUnsupportedFS = errors.New("unsupported filesystem")
)

type unixOwnership struct {
Expand All @@ -44,7 +47,8 @@ func getUmask() int {
return umask
}

func getDefaultPermissions(isDir bool) UnixPermissions {
// GetDefaultPermissions - returns default permissions as defined by file system. Public for testing purposes
func GetDefaultPermissions(isDir bool) UnixPermissions {
getOwnershipMutex.Lock()
defer getOwnershipMutex.Unlock()
mode := DefaultFilePermissions - getUmask()
Expand All @@ -65,7 +69,7 @@ func getDefaultPermissions(isDir bool) UnixPermissions {

// getUnixPermissionFromStats - Get unix mode and ownership from object metadata, fallback to default permissions in case metadata doesn't exist
func getUnixPermissionFromStats(stats apigen.ObjectStats) (*UnixPermissions, error) {
permissions := getDefaultPermissions(strings.HasSuffix(stats.Path, uri.PathSeparator))
permissions := GetDefaultPermissions(strings.HasSuffix(stats.Path, uri.PathSeparator))
if stats.Metadata != nil {
unixPermissions, ok := stats.Metadata.Get(UnixPermissionsMetadataKey)
if ok {
Expand All @@ -78,3 +82,15 @@ func getUnixPermissionFromStats(stats apigen.ObjectStats) (*UnixPermissions, err

return &permissions, nil
}

func getUnixPermissionFromFileInfo(info os.FileInfo) (*UnixPermissions, error) {
p := UnixPermissions{}
if stat, ok := info.Sys().(*syscall.Stat_t); ok {
p.UID = int(stat.Uid)
p.GID = int(stat.Gid)
p.Mode = os.FileMode(stat.Mode)
} else {
return nil, ErrUnsupportedFS
}
return &p, nil
}

0 comments on commit 9bed622

Please sign in to comment.