From 4bddc001288b6c05af85183ee71a44e4780bad21 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Thu, 8 Dec 2022 10:53:55 +0100 Subject: [PATCH 01/11] use decomposedfs pkg from experimental Signed-off-by: jkoberg --- .../utils/decomposedfs/decomposedfs.go | 292 ++++++- .../utils/decomposedfs/lookup/lookup.go | 6 +- .../utils/decomposedfs/lookup/lookup_test.go | 9 +- pkg/storage/utils/decomposedfs/node/node.go | 87 +- .../utils/decomposedfs/options/options.go | 32 + pkg/storage/utils/decomposedfs/revisions.go | 52 ++ pkg/storage/utils/decomposedfs/tree/tree.go | 9 +- pkg/storage/utils/decomposedfs/upload.go | 797 ++---------------- .../utils/decomposedfs/upload/processing.go | 415 +++++++++ .../utils/decomposedfs/upload/upload.go | 471 +++++++++++ pkg/storage/utils/decomposedfs/upload_test.go | 2 - .../utils/decomposedfs/xattrs/xattrs.go | 32 +- 12 files changed, 1413 insertions(+), 791 deletions(-) create mode 100644 pkg/storage/utils/decomposedfs/upload/processing.go create mode 100644 pkg/storage/utils/decomposedfs/upload/upload.go diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index 354c5c20df..60d53fd945 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -23,7 +23,10 @@ package decomposedfs //go:generate make --no-print-directory -C ../../../.. mockery NAME=Tree import ( + "bytes" "context" + "crypto/tls" + "crypto/x509" "io" "net/url" "os" @@ -32,6 +35,7 @@ import ( "strconv" "strings" "syscall" + "time" cs3permissions "github.com/cs3org/go-cs3apis/cs3/permissions/v1beta1" rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" @@ -39,19 +43,24 @@ import ( "github.com/cs3org/reva/v2/pkg/appctx" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/events/server" "github.com/cs3org/reva/v2/pkg/logger" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/cs3org/reva/v2/pkg/storage" + "github.com/cs3org/reva/v2/pkg/storage/cache" "github.com/cs3org/reva/v2/pkg/storage/utils/chunking" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" "github.com/cs3org/reva/v2/pkg/storage/utils/filelocks" "github.com/cs3org/reva/v2/pkg/storage/utils/templates" "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" + "github.com/go-micro/plugins/v4/events/natsjs" "github.com/pkg/errors" "google.golang.org/grpc" ) @@ -99,6 +108,8 @@ type Decomposedfs struct { p PermissionsChecker chunkHandler *chunking.ChunkHandler permissionsClient CS3PermissionsClient + stream events.Stream + cache cache.StatCache } // NewDefault returns an instance with default components @@ -126,10 +137,10 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore) (storage.FS, error) // New returns an implementation of the storage.FS interface that talks to // a local filesystem. func New(o *options.Options, lu *lookup.Lookup, p PermissionsChecker, tp Tree, permissionsClient CS3PermissionsClient) (storage.FS, error) { + log := logger.New() err := tp.Setup() if err != nil { - logger.New().Error().Err(err). 
- Msg("could not setup tree") + log.Error().Err(err).Msg("could not setup tree") return nil, errors.Wrap(err, "could not setup tree") } @@ -141,14 +152,266 @@ func New(o *options.Options, lu *lookup.Lookup, p PermissionsChecker, tp Tree, p filelocks.SetLockCycleDurationFactor(o.LockCycleDurationFactor) } - return &Decomposedfs{ + var ev events.Stream + if o.Events.NatsAddress != "" { + evtsCfg := o.Events + var rootCAPool *x509.CertPool + if evtsCfg.TLSRootCACertificate != "" { + rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate) + if err != nil { + return nil, err + } + + var certBytes bytes.Buffer + if _, err := io.Copy(&certBytes, rootCrtFile); err != nil { + return nil, err + } + + rootCAPool = x509.NewCertPool() + rootCAPool.AppendCertsFromPEM(certBytes.Bytes()) + evtsCfg.TLSInsecure = false + } + + tlsConf := &tls.Config{ + InsecureSkipVerify: evtsCfg.TLSInsecure, //nolint:gosec + RootCAs: rootCAPool, + } + ev, err = server.NewNatsStream( + natsjs.TLSConfig(tlsConf), + natsjs.Address(evtsCfg.NatsAddress), + natsjs.ClusterID(evtsCfg.NatsClusterID), + ) + if err != nil { + return nil, err + } + } + + fs := &Decomposedfs{ tp: tp, lu: lu, o: o, p: p, chunkHandler: chunking.NewChunkHandler(filepath.Join(o.Root, "uploads")), permissionsClient: permissionsClient, - }, nil + stream: ev, + cache: cache.GetStatCache(o.StatCache.CacheStore, o.StatCache.CacheNodes, o.StatCache.CacheDatabase, "stat", 0), + } + + if o.AsyncFileUploads { + if o.Events.NatsAddress == "" { + log.Error().Msg("need nats for async file processing") + return nil, errors.New("need nats for async file processing") + } + + ch, err := events.Consume(ev, "dcfs", events.PostprocessingFinished{}, events.VirusscanFinished{}) + if err != nil { + return nil, err + } + + if o.Events.NumConsumers <= 0 { + o.Events.NumConsumers = 1 + } + + for i := 0; i < o.Events.NumConsumers; i++ { + go fs.Postprocessing(ch) + } + } + + return fs, nil +} + +// Postprocessing starts the postprocessing result collector +func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) { + ctx := context.TODO() + log := logger.New() + for event := range ch { + switch ev := event.(type) { + case events.PostprocessingFinished: + up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") + continue // NOTE: since we can't get the upload, we can't delete the blob + } + + var ( + failed bool + keepUpload bool + ) + + switch ev.Outcome { + default: + log.Error().Str("outcome", string(ev.Outcome)).Str("uploadID", ev.UploadID).Msg("unknown postprocessing outcome - aborting") + fallthrough + case events.PPOutcomeAbort: + failed = true + keepUpload = true + case events.PPOutcomeContinue: + if err := up.Finalize(); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not finalize upload") + keepUpload = true // should we keep the upload when assembling failed? 
+ failed = true + } + case events.PPOutcomeDelete: + failed = true + } + + n, err := node.ReadNode(ctx, fs.lu, up.Info.Storage["SpaceRoot"], up.Info.Storage["NodeId"], false) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read node") + continue + } + up.Node = n + + if p, err := node.ReadNode(ctx, fs.lu, up.Info.Storage["SpaceRoot"], n.ParentID, false); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read parent") + } else { + // update parent tmtime to propagate etag change + now := time.Now() + p.SetTMTime(&now) + if err := fs.tp.Propagate(ctx, p, 0); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate etag change") + } + } + + // remove cache entry in gateway + fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) + + upload.Cleanup(up, failed, keepUpload) + + if err := events.Publish( + fs.stream, + events.UploadReady{ + UploadID: ev.UploadID, + Failed: failed, + ExecutingUser: ev.ExecutingUser, + FileRef: &provider.Reference{ + ResourceId: &provider.ResourceId{ + StorageId: up.Info.MetaData["providerID"], + SpaceId: up.Info.Storage["SpaceRoot"], + OpaqueId: up.Info.Storage["SpaceRoot"], + }, + Path: utils.MakeRelativePath(filepath.Join(up.Info.MetaData["dir"], up.Info.MetaData["filename"])), + }, + }, + ); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event") + } + case events.VirusscanFinished: + if ev.ErrorMsg != "" { + // scan failed somehow + // Should we handle this here? + continue + } + + var n *node.Node + switch ev.UploadID { + case "": + // uploadid is empty -> this was an on-demand scan + ctx := ctxpkg.ContextSetUser(context.Background(), ev.ExecutingUser) + ref := &provider.Reference{ResourceId: ev.ResourceID} + + no, err := fs.lu.NodeFromResource(ctx, ref) + if err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to get node after scan") + continue + + } + n = no + if ev.Outcome == events.PPOutcomeDelete { + // antivir wants us to delete the file. We must obey and need to + + // check if there a previous versions existing + revs, err := fs.ListRevisions(ctx, ref) + if len(revs) == 0 { + if err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to list revisions. 
Fallback to delete file") + } + + // no versions -> trash file + err := fs.Delete(ctx, ref) + if err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to delete infected resource") + continue + } + + // now purge it from the recycle bin + if err := fs.PurgeRecycleItem(ctx, &provider.Reference{ResourceId: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.SpaceID}}, n.ID, "/"); err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to purge infected resource from trash") + } + + // remove cache entry in gateway + fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) + continue + } + + // we have versions - find the newest + versions := make(map[uint64]string) // remember all versions - we need them later + var nv uint64 + for _, v := range revs { + versions[v.Mtime] = v.Key + if v.Mtime > nv { + nv = v.Mtime + } + } + + // restore newest version + if err := fs.RestoreRevision(ctx, ref, versions[nv]); err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Str("revision", versions[nv]).Msg("Failed to restore revision") + continue + } + + // now find infected version + revs, err = fs.ListRevisions(ctx, ref) + if err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Error listing revisions after restore") + } + + for _, v := range revs { + // we looking for a version that was previously not there + if _, ok := versions[v.Mtime]; ok { + continue + } + + if err := fs.DeleteRevision(ctx, ref, v.Key); err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Str("revision", v.Key).Msg("Failed to delete revision") + } + } + + // remove cache entry in gateway + fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) + continue + } + + default: + // uploadid is not empty -> this is an async upload + up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") + continue + } + + no, err := node.ReadNode(up.Ctx, fs.lu, up.Info.Storage["SpaceRoot"], up.Info.Storage["NodeId"], false) + if err != nil { + log.Error().Err(err).Interface("uploadID", ev.UploadID).Msg("Failed to get node after scan") + continue + } + + n = no + } + + if err := n.SetScanData(ev.Description, ev.Scandate); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("resourceID", ev.ResourceID).Msg("Failed to set scan results") + continue + } + + // remove cache entry in gateway + fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) + + default: + log.Error().Interface("event", ev).Msg("Unknown event") + } + } + } // Shutdown shuts down the storage @@ -270,12 +533,11 @@ func (fs *Decomposedfs) GetHome(ctx context.Context) (string, error) { // GetPathByID returns the fn pointed by the file id, without the internal namespace func (fs *Decomposedfs) GetPathByID(ctx context.Context, id *provider.ResourceId) (string, error) { - node, err := fs.lu.NodeFromID(ctx, id) + n, err := fs.lu.NodeFromID(ctx, id) if err != nil { return "", err } - - rp, err := fs.p.AssemblePermissions(ctx, node) + rp, err := fs.p.AssemblePermissions(ctx, n) switch { case err != nil: return "", errtypes.InternalError(err.Error()) @@ -287,7 +549,14 @@ func (fs *Decomposedfs) GetPathByID(ctx context.Context, id *provider.ResourceId 
return "", errtypes.NotFound(f) } - return fs.lu.Path(ctx, node) + hp := func(n *node.Node) bool { + perms, err := fs.p.AssemblePermissions(ctx, n) + if err != nil { + return false + } + return perms.GetPath + } + return fs.lu.Path(ctx, n, hp) } // CreateDir creates the specified directory @@ -555,10 +824,13 @@ func (fs *Decomposedfs) ListFolder(ctx context.Context, ref *provider.Reference, // add this childs permissions pset, _ := n.PermissionSet(ctx) node.AddPermissions(&np, &pset) - if ri, err := children[i].AsResourceInfo(ctx, &np, mdKeys, fieldMask, utils.IsRelativeReference(ref)); err == nil { - finfos = append(finfos, ri) + ri, err := children[i].AsResourceInfo(ctx, &np, mdKeys, fieldMask, utils.IsRelativeReference(ref)) + if err != nil { + return nil, errtypes.InternalError(err.Error()) } + finfos = append(finfos, ri) } + return } diff --git a/pkg/storage/utils/decomposedfs/lookup/lookup.go b/pkg/storage/utils/decomposedfs/lookup/lookup.go index 0bd6ede652..ea0cbe981a 100644 --- a/pkg/storage/utils/decomposedfs/lookup/lookup.go +++ b/pkg/storage/utils/decomposedfs/lookup/lookup.go @@ -105,7 +105,7 @@ func (lu *Lookup) NodeFromSpaceID(ctx context.Context, id *provider.ResourceId) } // Path returns the path for node -func (lu *Lookup) Path(ctx context.Context, n *node.Node) (p string, err error) { +func (lu *Lookup) Path(ctx context.Context, n *node.Node, hasPermission func(*node.Node) bool) (p string, err error) { root := n.SpaceRoot for n.ID != root.ID { p = filepath.Join(n.Name, p) @@ -117,6 +117,10 @@ func (lu *Lookup) Path(ctx context.Context, n *node.Node) (p string, err error) Msg("Path()") return } + + if !hasPermission(n) { + break + } } p = filepath.Join("/", p) return diff --git a/pkg/storage/utils/decomposedfs/lookup/lookup_test.go b/pkg/storage/utils/decomposedfs/lookup/lookup_test.go index aacdcc1789..16aa345951 100644 --- a/pkg/storage/utils/decomposedfs/lookup/lookup_test.go +++ b/pkg/storage/utils/decomposedfs/lookup/lookup_test.go @@ -20,6 +20,7 @@ package lookup_test import ( provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" helpers "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/testhelpers" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" . 
"github.com/onsi/ginkgo/v2" @@ -51,7 +52,7 @@ var _ = Describe("Lookup", func() { }) Expect(err).ToNot(HaveOccurred()) - path, err := env.Lookup.Path(env.Ctx, n) + path, err := env.Lookup.Path(env.Ctx, n, func(n *node.Node) bool { return true }) Expect(err).ToNot(HaveOccurred()) Expect(path).To(Equal("/dir1/file1")) }) @@ -65,7 +66,7 @@ var _ = Describe("Lookup", func() { }) Expect(err).ToNot(HaveOccurred()) - path, err := env.Lookup.Path(env.Ctx, n) + path, err := env.Lookup.Path(env.Ctx, n, func(n *node.Node) bool { return true }) Expect(err).ToNot(HaveOccurred()) Expect(path).To(Equal("/dir1/subdir1/file2")) Expect(n.SpaceRoot.Name).To(Equal("userid")) @@ -88,7 +89,7 @@ var _ = Describe("Lookup", func() { Expect(n.SpaceRoot).ToNot(BeNil()) // Check if we got the right node and spaceRoot - path, err := env.Lookup.Path(env.Ctx, n) + path, err := env.Lookup.Path(env.Ctx, n, func(n *node.Node) bool { return true }) Expect(err).ToNot(HaveOccurred()) Expect(path).To(Equal("/dir1/file1")) Expect(n.SpaceRoot.Name).To(Equal("userid")) @@ -111,7 +112,7 @@ var _ = Describe("Lookup", func() { Expect(n.SpaceRoot).ToNot(BeNil()) // Check if we got the right node and spaceRoot - path, err := env.Lookup.Path(env.Ctx, n) + path, err := env.Lookup.Path(env.Ctx, n, func(n *node.Node) bool { return true }) Expect(err).ToNot(HaveOccurred()) Expect(path).To(Equal("/dir1/file1")) Expect(n.SpaceRoot.Name).To(Equal("userid")) diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index dd0f571003..8777b457e7 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -68,6 +68,9 @@ const ( // RootID defines the root node's ID RootID = "root" + + // ProcessingStatus is the name of the status when processing a file + ProcessingStatus = "processing" ) // Node represents a node in the tree and provides methods to get a Parent or Child instance @@ -90,7 +93,7 @@ type Node struct { type PathLookup interface { InternalRoot() string InternalPath(spaceID, nodeID string) string - Path(ctx context.Context, n *Node) (path string, err error) + Path(ctx context.Context, n *Node, hasPermission func(*Node) bool) (path string, err error) } // New returns a new instance of Node @@ -125,6 +128,33 @@ func (n *Node) ChangeOwner(new *userpb.UserId) (err error) { return } +// SetMetadata populates a given key with its value. +// Note that consumers should be aware of the metadata options on xattrs.go. 
+func (n *Node) SetMetadata(key string, val string) (err error) { + nodePath := n.InternalPath() + if err := xattrs.Set(nodePath, key, val); err != nil { + return errors.Wrap(err, "Decomposedfs: could not set extended attribute") + } + return nil +} + +// RemoveMetadata removes a given key +func (n *Node) RemoveMetadata(key string) (err error) { + if err = xattrs.Remove(n.InternalPath(), key); err == nil || xattrs.IsAttrUnset(err) { + return nil + } + return err +} + +// GetMetadata reads the metadata for the given key +func (n *Node) GetMetadata(key string) (val string, err error) { + nodePath := n.InternalPath() + if val, err = xattrs.Get(nodePath, key); err != nil { + return "", errors.Wrap(err, "Decomposedfs: could not get extended attribute") + } + return val, nil +} + // WriteAllNodeMetadata writes the Node metadata to disk func (n *Node) WriteAllNodeMetadata() (err error) { attribs := make(map[string]string) @@ -632,7 +662,7 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi case returnBasename: fn = n.Name default: - fn, err = n.lu.Path(ctx, n) + fn, err = n.lu.Path(ctx, n, func(*Node) bool { return true }) if err != nil { return nil, err } @@ -659,6 +689,10 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi Name: n.Name, } + if n.IsProcessing() { + ri.Opaque = utils.AppendPlainToOpaque(ri.Opaque, "status", "processing") + } + if nodeType == provider.ResourceType_RESOURCE_TYPE_CONTAINER { ts, err := n.GetTreeSize() if err == nil { @@ -804,6 +838,11 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi Metadata: metadata, } + // add virusscan information + if scanned, _, date := n.ScanData(); scanned { + ri.Opaque = utils.AppendPlainToOpaque(ri.Opaque, "scantime", date.Format(time.RFC3339Nano)) + } + sublog.Debug(). Interface("ri", ri). Msg("AsResourceInfo") @@ -1183,12 +1222,56 @@ func (n *Node) FindStorageSpaceRoot() error { return nil } +// MarkProcessing marks the node as being processed +func (n *Node) MarkProcessing() error { + return n.SetMetadata(xattrs.StatusPrefix, ProcessingStatus) +} + +// UnmarkProcessing removes the processing flag from the node +func (n *Node) UnmarkProcessing() error { + return n.RemoveMetadata(xattrs.StatusPrefix) +} + +// IsProcessing returns true if the node is currently being processed +func (n *Node) IsProcessing() bool { + v, err := n.GetMetadata(xattrs.StatusPrefix) + return err == nil && v == ProcessingStatus +} + // IsSpaceRoot checks if the node is a space root func (n *Node) IsSpaceRoot() bool { _, err := n.Xattr(xattrs.SpaceNameAttr) return err == nil } +// SetScanData sets the virus scan info to the node +func (n *Node) SetScanData(info string, date time.Time) error { + return xattrs.SetMultiple(n.InternalPath(), map[string]string{ + xattrs.ScanStatusPrefix: info, + xattrs.ScanDatePrefix: date.Format(time.RFC3339Nano), + }) +} + +// ScanData returns scanning information of the node +func (n *Node) ScanData() (scanned bool, virus string, scantime time.Time) { + ti, _ := n.GetMetadata(xattrs.ScanDatePrefix) + if ti == "" { + return // not scanned yet + } + + t, err := time.Parse(time.RFC3339Nano, ti) + if err != nil { + return + } + + i, err := n.GetMetadata(xattrs.ScanStatusPrefix) + if err != nil { + return + } + + return true, i, t +} + // CheckQuota checks if both disk space and available quota are sufficient // Overwrite must be set to true if the new file replaces the old file e.g. // when creating a new file version. 
In such a case the function will
diff --git a/pkg/storage/utils/decomposedfs/options/options.go b/pkg/storage/utils/decomposedfs/options/options.go
index 3ae5b5cbdf..4afe2e38d2 100644
--- a/pkg/storage/utils/decomposedfs/options/options.go
+++ b/pkg/storage/utils/decomposedfs/options/options.go
@@ -53,10 +53,42 @@ type Options struct {
 	PersonalSpaceAliasTemplate string `mapstructure:"personalspacealias_template"`
 	GeneralSpaceAliasTemplate  string `mapstructure:"generalspacealias_template"`
 
+	AsyncFileUploads bool `mapstructure:"asyncfileuploads"`
+
+	Events EventOptions `mapstructure:"events"`
+
+	Tokens TokenOptions `mapstructure:"tokens"`
+
+	StatCache CacheOptions `mapstructure:"statcache"`
+
 	MaxAcquireLockCycles    int `mapstructure:"max_acquire_lock_cycles"`
 	LockCycleDurationFactor int `mapstructure:"lock_cycle_duration_factor"`
 }
 
+// EventOptions are the configurable options for events
+type EventOptions struct {
+	NatsAddress          string `mapstructure:"natsaddress"`
+	NatsClusterID        string `mapstructure:"natsclusterid"`
+	TLSInsecure          bool   `mapstructure:"tlsinsecure"`
+	TLSRootCACertificate string `mapstructure:"tlsrootcacertificate"`
+	NumConsumers         int    `mapstructure:"numconsumers"`
+}
+
+// TokenOptions are the configurable options for tokens
+type TokenOptions struct {
+	DownloadEndpoint     string `mapstructure:"download_endpoint"`
+	DataGatewayEndpoint  string `mapstructure:"datagateway_endpoint"`
+	TransferSharedSecret string `mapstructure:"transfer_shared_secret"`
+	TransferExpires      int64  `mapstructure:"transfer_expires"`
+}
+
+// CacheOptions contains options for configuring a cache
+type CacheOptions struct {
+	CacheStore    string   `mapstructure:"cache_store"`
+	CacheNodes    []string `mapstructure:"cache_nodes"`
+	CacheDatabase string   `mapstructure:"cache_database"`
+}
+
 // New returns a new Options instance for the given configuration
 func New(m map[string]interface{}) (*Options, error) {
 	o := &Options{}
diff --git a/pkg/storage/utils/decomposedfs/revisions.go b/pkg/storage/utils/decomposedfs/revisions.go
index dab017d137..e97b970392 100644
--- a/pkg/storage/utils/decomposedfs/revisions.go
+++ b/pkg/storage/utils/decomposedfs/revisions.go
@@ -278,3 +278,55 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer
 	log.Error().Err(err).Interface("ref", ref).Str("originalnode", kp[0]).Str("revisionKey", revisionKey).Msg("original node does not exist")
 	return nil
 }
+
+// DeleteRevision deletes the specified revision of the resource
+func (fs *Decomposedfs) DeleteRevision(ctx context.Context, ref *provider.Reference, revisionKey string) error {
+	n, err := fs.getRevisionNode(ctx, ref, revisionKey, func(rp *provider.ResourcePermissions) bool {
+		return rp.RestoreFileVersion
+	})
+	if err != nil {
+		return err
+	}
+
+	if err := os.RemoveAll(fs.lu.InternalPath(n.SpaceID, revisionKey)); err != nil {
+		return err
+	}
+
+	return fs.tp.DeleteBlob(n)
+}
+
+func (fs *Decomposedfs) getRevisionNode(ctx context.Context, ref *provider.Reference, revisionKey string, hasPermission func(*provider.ResourcePermissions) bool) (*node.Node, error) {
+	log := appctx.GetLogger(ctx)
+
+	// verify revision key format
+	kp := strings.SplitN(revisionKey, node.RevisionIDDelimiter, 2)
+	if len(kp) != 2 {
+		log.Error().Str("revisionKey", revisionKey).Msg("malformed revisionKey")
+		return nil, errtypes.NotFound(revisionKey)
+	}
+	log.Debug().Str("revisionKey", revisionKey).Msg("getRevisionNode")
+
+	spaceID := ref.ResourceId.SpaceId
+	// check if the node is available and has not been deleted
+	n, err :=
node.ReadNode(ctx, fs.lu, spaceID, kp[0], false) + if err != nil { + return nil, err + } + if !n.Exists { + err = errtypes.NotFound(filepath.Join(n.ParentID, n.Name)) + return nil, err + } + + p, err := fs.p.AssemblePermissions(ctx, n) + switch { + case err != nil: + return nil, errtypes.InternalError(err.Error()) + case !hasPermission(&p): + return nil, errtypes.PermissionDenied(filepath.Join(n.ParentID, n.Name)) + } + + // Set space owner in context + storagespace.ContextSendSpaceOwnerID(ctx, n.SpaceOwnerOrManager(ctx)) + + return n, nil +} diff --git a/pkg/storage/utils/decomposedfs/tree/tree.go b/pkg/storage/utils/decomposedfs/tree/tree.go index 7aab6712ba..5fe9e32a3d 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree.go +++ b/pkg/storage/utils/decomposedfs/tree/tree.go @@ -19,6 +19,7 @@ package tree import ( + "bytes" "context" "fmt" "io" @@ -60,7 +61,7 @@ type PathLookup interface { InternalRoot() string InternalPath(spaceID, nodeID string) string - Path(ctx context.Context, n *node.Node) (path string, err error) + Path(ctx context.Context, n *node.Node, hasPermission func(*node.Node) bool) (path string, err error) } // Tree manages a hierarchical tree @@ -478,7 +479,7 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) { } // get the original path - origin, err := t.lookup.Path(ctx, n) + origin, err := t.lookup.Path(ctx, n, func(*node.Node) bool { return true }) if err != nil { return } @@ -915,6 +916,10 @@ func (t *Tree) WriteBlob(node *node.Node, reader io.Reader) error { // ReadBlob reads a blob from the blobstore func (t *Tree) ReadBlob(node *node.Node) (io.ReadCloser, error) { + if node.BlobID == "" { + // there is no blob yet - we are dealing with a 0 byte file + return io.NopCloser(bytes.NewReader([]byte{})), nil + } return t.blobstore.Download(node) } diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index 66c62acc6b..b62dfecfb8 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -20,15 +20,7 @@ package decomposedfs import ( "context" - "crypto/md5" - "crypto/sha1" - "encoding/hex" - "encoding/json" - "fmt" - "hash" - "hash/adler32" "io" - iofs "io/fs" "os" "path/filepath" "regexp" @@ -45,30 +37,24 @@ import ( "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/utils/chunking" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" - "github.com/cs3org/reva/v2/pkg/storage/utils/filelocks" - "github.com/cs3org/reva/v2/pkg/storagespace" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload" "github.com/cs3org/reva/v2/pkg/utils" - "github.com/google/uuid" "github.com/pkg/errors" ) -var defaultFilePerm = os.FileMode(0664) - // Upload uploads data to the given resource // TODO Upload (and InitiateUpload) needs a way to receive the expected checksum. // Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated? 
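 // The format FinishUpload already parses is "<algorithm> <digest>", where the
 // algorithm is one of sha1, md5 or adler32 and the digest is the lowercase hex
 // string, e.g. (digest value made up for illustration):
 //
 //	info.MetaData["checksum"] = "sha1 2fd4e1c67a2d28fced849ee1bb76e7391b93eb12"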
func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, uff storage.UploadFinishedFunc) (provider.ResourceInfo, error) { - upload, err := fs.GetUpload(ctx, ref.GetPath()) + up, err := fs.GetUpload(ctx, ref.GetPath()) if err != nil { return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error retrieving upload") } - uploadInfo := upload.(*fileUpload) + uploadInfo := up.(*upload.Upload) - p := uploadInfo.info.Storage["NodeName"] + p := uploadInfo.Info.Storage["NodeName"] if chunking.IsChunked(p) { // check chunking v1 var assembledFile string p, assembledFile, err = fs.chunkHandler.WriteChunk(p, r) @@ -81,7 +67,7 @@ func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r i } return provider.ResourceInfo{}, errtypes.PartialContent(ref.String()) } - uploadInfo.info.Storage["NodeName"] = p + uploadInfo.Info.Storage["NodeName"] = p fd, err := os.Open(assembledFile) if err != nil { return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error opening assembled file") @@ -100,7 +86,7 @@ func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r i } if uff != nil { - info := uploadInfo.info + info := uploadInfo.Info uploadRef := &provider.Reference{ ResourceId: &provider.ResourceId{ StorageId: info.MetaData["providerID"], @@ -109,7 +95,7 @@ func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r i }, Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])), } - owner, ok := ctxpkg.ContextGetUser(uploadInfo.ctx) + owner, ok := ctxpkg.ContextGetUser(uploadInfo.Ctx) if !ok { return provider.ResourceInfo{}, errtypes.PreconditionFailed("error getting user from uploadinfo context") } @@ -122,14 +108,14 @@ func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r i ri := provider.ResourceInfo{ // fill with at least fileid, mtime and etag Id: &provider.ResourceId{ - StorageId: uploadInfo.info.MetaData["providerID"], - SpaceId: uploadInfo.info.Storage["SpaceRoot"], - OpaqueId: uploadInfo.info.Storage["NodeId"], + StorageId: uploadInfo.Info.MetaData["providerID"], + SpaceId: uploadInfo.Info.Storage["SpaceRoot"], + OpaqueId: uploadInfo.Info.Storage["NodeId"], }, - Etag: uploadInfo.info.MetaData["etag"], + Etag: uploadInfo.Info.MetaData["etag"], } - if mtime, err := utils.MTimeToTS(uploadInfo.info.MetaData["mtime"]); err == nil { + if mtime, err := utils.MTimeToTS(uploadInfo.Info.MetaData["mtime"]); err == nil { ri.Mtime = &mtime } @@ -154,7 +140,7 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere // permissions are checked in NewUpload below - relative, err := fs.lu.Path(ctx, n) + relative, err := fs.lu.Path(ctx, n, func(*node.Node) bool { return true }) if err != nil { return nil, err } @@ -235,225 +221,23 @@ func (fs *Decomposedfs) UseIn(composer *tusd.StoreComposer) { // - the upload needs to implement the tusd.Upload interface: WriteChunk, GetInfo, GetReader and FinishUpload // NewUpload returns a new tus Upload instance -func (fs *Decomposedfs) NewUpload(ctx context.Context, info tusd.FileInfo) (upload tusd.Upload, err error) { - - log := appctx.GetLogger(ctx) - log.Debug().Interface("info", info).Msg("Decomposedfs: NewUpload") - - if info.MetaData["filename"] == "" { - return nil, errors.New("Decomposedfs: missing filename in metadata") - } - if info.MetaData["dir"] == "" { - return nil, errors.New("Decomposedfs: missing dir in metadata") - } - - n, err := fs.lu.NodeFromSpaceID(ctx, 
&provider.ResourceId{ - SpaceId: info.Storage["SpaceRoot"], - OpaqueId: info.Storage["SpaceRoot"], - }) - if err != nil { - return nil, errors.Wrap(err, "Decomposedfs: error getting space root node") - } - - n, err = fs.lookupNode(ctx, n, filepath.Join(info.MetaData["dir"], info.MetaData["filename"])) - if err != nil { - return nil, errors.Wrap(err, "Decomposedfs: error walking path") - } - - log.Debug().Interface("info", info).Interface("node", n).Msg("Decomposedfs: resolved filename") - - // the parent owner will become the new owner - p, perr := n.Parent() - if perr != nil { - return nil, errors.Wrap(perr, "Decomposedfs: error getting parent "+n.ParentID) - } - - // check permissions - var checkNode *node.Node - var f string - if n.Exists { - // check permissions of file to be overwritten - checkNode = n - f, _ = storagespace.FormatReference(&provider.Reference{ResourceId: &provider.ResourceId{ - SpaceId: n.SpaceID, - OpaqueId: n.ID, - }}) - } else { - // check permissions of parent - checkNode = p - f, _ = storagespace.FormatReference(&provider.Reference{ResourceId: &provider.ResourceId{ - SpaceId: p.SpaceID, - OpaqueId: p.ID, - }, Path: n.Name}) - } - rp, err := fs.p.AssemblePermissions(ctx, checkNode) - switch { - case err != nil: - return nil, errtypes.InternalError(err.Error()) - case !rp.InitiateFileUpload: - if rp.Stat { - return nil, errtypes.PermissionDenied(f) - } - return nil, errtypes.NotFound(f) - } - - // if we are trying to overwriting a folder with a file - if n.Exists && n.IsDir() { - return nil, errtypes.PreconditionFailed("resource is not a file") - } - - // check lock - if info.MetaData["lockid"] != "" { - ctx = ctxpkg.ContextSetLockID(ctx, info.MetaData["lockid"]) - } - if err := n.CheckLock(ctx); err != nil { - return nil, err - } - - info.ID = uuid.New().String() - - binPath, err := fs.getUploadPath(ctx, info.ID) - if err != nil { - return nil, errors.Wrap(err, "Decomposedfs: error resolving upload path") - } - usr := ctxpkg.ContextMustGetUser(ctx) - - spaceRoot := n.SpaceRoot.ID - if info.Storage != nil && info.Storage["SpaceRoot"] != "" { - spaceRoot = info.Storage["SpaceRoot"] - } - - info.Storage = map[string]string{ - "Type": "OCISStore", - "BinPath": binPath, - - "NodeId": n.ID, - "NodeParentId": n.ParentID, - "NodeName": n.Name, - "SpaceRoot": spaceRoot, - "SpaceOwnerOrManager": info.Storage["SpaceOwnerOrManager"], - - "Idp": usr.Id.Idp, - "UserId": usr.Id.OpaqueId, - "UserType": utils.UserTypeToString(usr.Id.Type), - "UserName": usr.Username, - - "LogLevel": log.GetLevel().String(), - } - // Create binary file in the upload folder with no content - log.Debug().Interface("info", info).Msg("Decomposedfs: built storage info") - file, err := os.OpenFile(binPath, os.O_CREATE|os.O_WRONLY, defaultFilePerm) - if err != nil { - return nil, err - } - defer file.Close() - - u := &fileUpload{ - info: info, - binPath: binPath, - infoPath: filepath.Join(fs.o.Root, "uploads", info.ID+".info"), - fs: fs, - ctx: ctx, - } - - // writeInfo creates the file by itself if necessary - err = u.writeInfo() - if err != nil { - return nil, err - } - - return u, nil -} - -func (fs *Decomposedfs) getUploadPath(ctx context.Context, uploadID string) (string, error) { - return filepath.Join(fs.o.Root, "uploads", uploadID), nil -} - -func (fs *Decomposedfs) readInfo(id string) (tusd.FileInfo, error) { - infoPath := filepath.Join(fs.o.Root, "uploads", id+".info") - - info := tusd.FileInfo{} - data, err := os.ReadFile(infoPath) - if err != nil { - if errors.Is(err, iofs.ErrNotExist) { - // 
Interpret os.ErrNotExist as 404 Not Found - err = tusd.ErrNotFound - } - return tusd.FileInfo{}, err - } - if err := json.Unmarshal(data, &info); err != nil { - return tusd.FileInfo{}, err - } - - stat, err := os.Stat(info.Storage["BinPath"]) - if err != nil { - return tusd.FileInfo{}, err - } - info.Offset = stat.Size() - - return info, nil +func (fs *Decomposedfs) NewUpload(ctx context.Context, info tusd.FileInfo) (tusd.Upload, error) { + return upload.New(ctx, info, fs.lu, fs.tp, fs.p, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens) } // GetUpload returns the Upload for the given upload id func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload, error) { - l := appctx.GetLogger(ctx) - sub := l.With().Int("pid", os.Getpid()).Logger() - info, err := fs.readInfo(id) - if err != nil { - return nil, err - } - - u := &userpb.User{ - Id: &userpb.UserId{ - Idp: info.Storage["Idp"], - OpaqueId: info.Storage["UserId"], - Type: utils.UserTypeMap(info.Storage["UserType"]), - }, - Username: info.Storage["UserName"], - } - - ctx = ctxpkg.ContextSetUser(ctx, u) - ctx = appctx.WithLogger(ctx, &sub) - - return &fileUpload{ - info: info, - binPath: info.Storage["BinPath"], - infoPath: filepath.Join(fs.o.Root, "uploads", id+".info"), - fs: fs, - ctx: ctx, - }, nil + return upload.Get(ctx, id, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens) } // ListUploads returns a list of all incomplete uploads func (fs *Decomposedfs) ListUploads() ([]tusd.FileInfo, error) { - return fs.uploadInfos() -} - -func (fs *Decomposedfs) uploadInfos() ([]tusd.FileInfo, error) { - infos := []tusd.FileInfo{} - infoFiles, err := filepath.Glob(filepath.Join(fs.o.Root, "uploads", "*.info")) - if err != nil { - return nil, err - } - - idRegexp := regexp.MustCompile(".*/([^/]+).info") - for _, info := range infoFiles { - match := idRegexp.FindStringSubmatch(info) - if match == nil || len(match) < 2 { - continue - } - info, err := fs.readInfo(match[1]) - if err != nil { - return nil, err - } - infos = append(infos, info) - } - return infos, nil + return fs.uploadInfos(context.Background()) } // PurgeExpiredUploads scans the fs for expired downloads and removes any leftovers func (fs *Decomposedfs) PurgeExpiredUploads(purgedChan chan<- tusd.FileInfo) error { - infos, err := fs.uploadInfos() + infos, err := fs.uploadInfos(context.Background()) if err != nil { return err } @@ -478,535 +262,50 @@ func (fs *Decomposedfs) PurgeExpiredUploads(purgedChan chan<- tusd.FileInfo) err return nil } -// lookupNode looks up nodes by path. -// This method can also handle lookups for paths which contain chunking information. 
-func (fs *Decomposedfs) lookupNode(ctx context.Context, spaceRoot *node.Node, path string) (*node.Node, error) { - p := path - isChunked := chunking.IsChunked(path) - if isChunked { - chunkInfo, err := chunking.GetChunkBLOBInfo(path) - if err != nil { - return nil, err - } - p = chunkInfo.Path - } - - n, err := fs.lu.WalkPath(ctx, spaceRoot, p, true, func(ctx context.Context, n *node.Node) error { return nil }) - if err != nil { - return nil, errors.Wrap(err, "Decomposedfs: error walking path") - } - - if isChunked { - n.Name = filepath.Base(path) - } - return n, nil -} - -type fileUpload struct { - // info stores the current information about the upload - info tusd.FileInfo - // infoPath is the path to the .info file - infoPath string - // binPath is the path to the binary file (which has no extension) - binPath string - // only fs knows how to handle metadata and versions - fs *Decomposedfs - // a context with a user - // TODO add logger as well? - ctx context.Context -} - -// GetInfo returns the FileInfo -func (upload *fileUpload) GetInfo(ctx context.Context) (tusd.FileInfo, error) { - return upload.info, nil -} - -// WriteChunk writes the stream from the reader to the given offset of the upload -func (upload *fileUpload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) { - file, err := os.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm) - if err != nil { - return 0, err - } - defer file.Close() - - // calculate cheksum here? needed for the TUS checksum extension. https://tus.io/protocols/resumable-upload.html#checksum - // TODO but how do we get the `Upload-Checksum`? WriteChunk() only has a context, offset and the reader ... - // It is sent with the PATCH request, well or in the POST when the creation-with-upload extension is used - // but the tus handler uses a context.Background() so we cannot really check the header and put it in the context ... - n, err := io.Copy(file, src) - - // If the HTTP PATCH request gets interrupted in the middle (e.g. because - // the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF. - // However, for the ocis driver it's not important whether the stream has ended - // on purpose or accidentally. - if err != nil { - if err != io.ErrUnexpectedEOF { - return n, err - } - } - - upload.info.Offset += n - err = upload.writeInfo() // TODO info is written here ... we need to truncate in DiscardChunk - - return n, err -} - -// GetReader returns an io.Reader for the upload -func (upload *fileUpload) GetReader(ctx context.Context) (io.Reader, error) { - return os.Open(upload.binPath) -} - -// writeInfo updates the entire information. Everything will be overwritten. 
-func (upload *fileUpload) writeInfo() error { - data, err := json.Marshal(upload.info) - if err != nil { - return err - } - return os.WriteFile(upload.infoPath, data, defaultFilePerm) -} - -// FinishUpload finishes an upload and moves the file to the internal destination -// -// # upload steps -// check if match header to fail early -// copy blob -// lock metadata node -// check if match header again as safeguard -// read metadata -// create version node with current metadata -// update node metadata with new blobid etc -// remember size diff -// unlock metadata -// propagate size diff and new etag -// - propagation can happen outside the metadata lock because diff calculation happens inside the lock and the order in which diffs are applied to the parent is irrelvevant -// - propagation needs to propagate the diff -func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { - - // ensure cleanup - defer upload.discardChunk() - - fi, err := os.Stat(upload.binPath) - if err != nil { - appctx.GetLogger(upload.ctx).Err(err).Msg("Decomposedfs: could not stat uploaded file") - return - } - - spaceID := upload.info.Storage["SpaceRoot"] - newNode := node.New( - spaceID, - upload.info.Storage["NodeId"], - upload.info.Storage["NodeParentId"], - upload.info.Storage["NodeName"], - fi.Size(), - "", - nil, - upload.fs.lu, - ) - newNode.SpaceRoot = node.New(spaceID, spaceID, "", "", 0, "", nil, upload.fs.lu) - - // check lock - if upload.info.MetaData["lockid"] != "" { - ctx = ctxpkg.ContextSetLockID(ctx, upload.info.MetaData["lockid"]) - } - if err := newNode.CheckLock(ctx); err != nil { - return err - } - - overwrite := newNode.ID != "" - var oldSize int64 - if overwrite { - // read size from existing node - old, _ := node.ReadNode(ctx, upload.fs.lu, spaceID, newNode.ID, false) - oldSize = old.Blobsize - } else { - // create new fileid - newNode.ID = uuid.New().String() - upload.info.Storage["NodeId"] = newNode.ID - } - - if _, err = node.CheckQuota(newNode.SpaceRoot, overwrite, uint64(oldSize), uint64(fi.Size())); err != nil { - return err - } - - targetPath := newNode.InternalPath() - sublog := appctx.GetLogger(upload.ctx). - With(). - Interface("info", upload.info). - Str("spaceid", spaceID). - Str("nodeid", newNode.ID). - Str("binPath", upload.binPath). - Str("targetPath", targetPath). - Logger() - - // calculate the checksum of the written bytes - // they will all be written to the metadata later, so we cannot omit any of them - // TODO only calculate the checksum in sync that was requested to match, the rest could be async ... but the tests currently expect all to be present - // TODO the hashes all implement BinaryMarshaler so we could try to persist the state for resumable upload. we would neet do keep track of the copied bytes ... - sha1h := sha1.New() - md5h := md5.New() - adler32h := adler32.New() - { - f, err := os.Open(upload.binPath) - if err != nil { - sublog.Err(err).Msg("Decomposedfs: could not open file for checksumming") - // we can continue if no oc checksum header is set - } - defer f.Close() - - r1 := io.TeeReader(f, sha1h) - r2 := io.TeeReader(r1, md5h) - - if _, err := io.Copy(adler32h, r2); err != nil { - sublog.Err(err).Msg("Decomposedfs: could not copy bytes for checksumming") - } - } - // compare if they match the sent checksum - // TODO the tus checksum extension would do this on every chunk, but I currently don't see an easy way to pass in the requested checksum. 
for now we do it in FinishUpload which is also called for chunked uploads - if upload.info.MetaData["checksum"] != "" { - parts := strings.SplitN(upload.info.MetaData["checksum"], " ", 2) - if len(parts) != 2 { - return errtypes.BadRequest("invalid checksum format. must be '[algorithm] [checksum]'") - } - switch parts[0] { - case "sha1": - err = upload.checkHash(parts[1], sha1h) - case "md5": - err = upload.checkHash(parts[1], md5h) - case "adler32": - err = upload.checkHash(parts[1], adler32h) - default: - err = errtypes.BadRequest("unsupported checksum algorithm: " + parts[0]) - } - if err != nil { - return err - } - } - newNode.BlobID = upload.info.ID // This can be changed to a content hash in the future when reference counting for the blobs was added - - // defer writing the checksums until the node is in place - - // upload steps - // check if match header to fail early - - if fi, err = os.Stat(targetPath); err == nil { - // When the if-match header was set we need to check if the - // etag still matches before finishing the upload. - if ifMatch, ok := upload.info.MetaData["if-match"]; ok { - var targetEtag string - targetEtag, err = node.CalculateEtag(newNode.ID, fi.ModTime()) - if err != nil { - return errtypes.InternalError(err.Error()) - } - if ifMatch != targetEtag { - return errtypes.Aborted("etag mismatch") - } - } - } else { - // create dir to node - if err := os.MkdirAll(filepath.Dir(targetPath), 0700); err != nil { - sublog.Err(err).Msg("could not create node dir") - return errtypes.InternalError("could not create node dir") - } - } - - // copy blob - - file, err := os.Open(upload.binPath) - if err != nil { - return err - } - defer file.Close() - err = upload.fs.tp.WriteBlob(newNode, file) - if err != nil { - return errors.Wrap(err, "failed to upload file to blobstore") - } - - // prepare discarding the blob if something changed while writing it - discardBlob := func() { - if err := upload.fs.tp.DeleteBlob(newNode); err != nil { - sublog.Err(err).Str("blobid", newNode.BlobID).Msg("Decomposedfs: failed to discard blob in blobstore") - } - } - - // lock metadata node - lock, err := filelocks.AcquireWriteLock(targetPath) - if err != nil { - discardBlob() - return errtypes.InternalError(err.Error()) - } - releaseLock := func() { - // ReleaseLock returns nil if already unlocked - if err := filelocks.ReleaseLock(lock); err != nil { - sublog.Err(err).Msg("Decomposedfs:could not unlock node") - } - } - defer releaseLock() - - // check if match header again as safeguard - var oldMtime time.Time - versionsPath := "" - if fi, err = os.Stat(targetPath); err == nil { - // When the if-match header was set we need to check if the - // etag still matches before finishing the upload. 
- if ifMatch, ok := upload.info.MetaData["if-match"]; ok { - var targetEtag string - targetEtag, err = node.CalculateEtag(newNode.ID, fi.ModTime()) - if err != nil { - discardBlob() - return errtypes.InternalError(err.Error()) - } - if ifMatch != targetEtag { - discardBlob() - return errtypes.Aborted("etag mismatch") - } - } - - // versions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries - versionsPath = upload.fs.lu.InternalPath(spaceID, newNode.ID+node.RevisionIDDelimiter+fi.ModTime().UTC().Format(time.RFC3339Nano)) - - // remember mtime of existing file so we can apply it to the version - oldMtime = fi.ModTime() - } - - // read metadata - - // attributes that will change - attrs := map[string]string{ - xattrs.BlobIDAttr: newNode.BlobID, - xattrs.BlobsizeAttr: strconv.FormatInt(newNode.Blobsize, 10), - - // update checksums - xattrs.ChecksumPrefix + "sha1": string(sha1h.Sum(nil)), - xattrs.ChecksumPrefix + "md5": string(md5h.Sum(nil)), - xattrs.ChecksumPrefix + "adler32": string(adler32h.Sum(nil)), - } - - // create version node with current metadata - - var newMtime time.Time - // if file already exists - if versionsPath != "" { - // touch version node - file, err := os.Create(versionsPath) - if err != nil { - discardBlob() - sublog.Err(err).Str("version", versionsPath).Msg("could not create version node") - return errtypes.InternalError("could not create version node") - } - defer file.Close() - - fi, err := file.Stat() - if err != nil { - discardBlob() - sublog.Err(err).Str("version", versionsPath).Msg("could not stat version node") - return errtypes.InternalError("could not stat version node") - } - newMtime = fi.ModTime() - - // copy blob metadata to version node - err = xattrs.CopyMetadataWithSourceLock(targetPath, versionsPath, func(attributeName string) bool { - return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) || - attributeName == xattrs.BlobIDAttr || - attributeName == xattrs.BlobsizeAttr - }, lock) - if err != nil { - discardBlob() - sublog.Err(err).Str("version", versionsPath).Msg("failed to copy xattrs to version node") - return errtypes.InternalError("failed to copy blob xattrs to version node") - } - - // keep mtime from previous version - if err := os.Chtimes(versionsPath, oldMtime, oldMtime); err != nil { - discardBlob() - sublog.Err(err).Str("version", versionsPath).Msg("failed to change mtime of version node") - return errtypes.InternalError("failed to change mtime of version node") - } - - // we MUST bypass any cache here as we have to calculate the size diff atomically - oldSize, err = node.ReadBlobSizeAttr(targetPath) - if err != nil { - discardBlob() - sublog.Err(err).Str("version", versionsPath).Msg("failed to read old blobsize") - return errtypes.InternalError("failed to read old blobsize") - } - } else { - // touch metadata node - file, err := os.Create(targetPath) - if err != nil { - discardBlob() - sublog.Err(err).Msg("could not create node") - return errtypes.InternalError("could not create node") - } - file.Close() - - // basic node metadata - attrs[xattrs.ParentidAttr] = newNode.ParentID - attrs[xattrs.NameAttr] = newNode.Name - oldSize = 0 - } - - // update node metadata with new blobid etc - err = newNode.SetXattrsWithLock(attrs, lock) - if err != nil { - discardBlob() - return errors.Wrap(err, "Decomposedfs: could not write metadata") - } - - // update mtime - switch { - case upload.info.MetaData["mtime"] != "": - if err := 
newNode.SetMtimeString(upload.info.MetaData["mtime"]); err != nil { - sublog.Err(err).Interface("info", upload.info).Msg("Decomposedfs: could not apply mtime from metadata") - return err - } - case newMtime != time.Time{}: - // we are creating a version - if err := newNode.SetMtime(newMtime); err != nil { - sublog.Err(err).Interface("info", upload.info).Msg("Decomposedfs: could not change mtime of node") - return err - } - } - - // remember size diff - // old 10, new 5 (upload a smaller file) -> 5-10 = -5 - // old 5, new 10 (upload a bigger file) -> 10-5 = +5 - sizeDiff := newNode.Blobsize - oldSize - - // unlock metadata - err = filelocks.ReleaseLock(lock) - if err != nil { - return errtypes.InternalError(err.Error()) - } - - // link child name to parent if it is new - childNameLink := filepath.Join(newNode.ParentInternalPath(), newNode.Name) - relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(newNode.ID, 4, 2)) - var link string - link, err = os.Readlink(childNameLink) - if err == nil && link != relativeNodePath { - sublog.Err(err). - Interface("node", newNode). - Str("childNameLink", childNameLink). - Str("link", link). - Msg("Decomposedfs: child name link has wrong target id, repairing") - - if err = os.Remove(childNameLink); err != nil { - return errors.Wrap(err, "Decomposedfs: could not remove symlink child entry") - } - } - if errors.Is(err, iofs.ErrNotExist) || link != relativeNodePath { - if err = os.Symlink(relativeNodePath, childNameLink); err != nil { - return errors.Wrap(err, "Decomposedfs: could not symlink child entry") - } - } - - // fill metadata with current mtime - if fi, err = os.Stat(targetPath); err == nil { - upload.info.MetaData["mtime"] = fmt.Sprintf("%d.%d", fi.ModTime().Unix(), fi.ModTime().Nanosecond()) - upload.info.MetaData["etag"], _ = node.CalculateEtag(newNode.ID, fi.ModTime()) - } - - newNode.Exists = true - - // propagate size diff and new etag - // propagation can happen outside the metadata lock because diff calculation happens inside the lock and the order in which diffs are applied to the parent is irrelvevant - sublog.Debug().Int64("sizediff", sizeDiff).Msg("Decomposedfs: propagating size diff") - return upload.fs.tp.Propagate(upload.ctx, newNode, sizeDiff) -} - -func (upload *fileUpload) checkHash(expected string, h hash.Hash) error { - if expected != hex.EncodeToString(h.Sum(nil)) { - upload.discardChunk() - return errtypes.ChecksumMismatch(fmt.Sprintf("invalid checksum: expected %s got %x", upload.info.MetaData["checksum"], h.Sum(nil))) - } - return nil -} - -func (upload *fileUpload) discardChunk() { - if err := os.Remove(upload.binPath); err != nil { - if !errors.Is(err, iofs.ErrNotExist) { - appctx.GetLogger(upload.ctx).Err(err).Interface("info", upload.info).Str("binPath", upload.binPath).Interface("info", upload.info).Msg("Decomposedfs: could not discard chunk") - return - } - } - if err := os.Remove(upload.infoPath); err != nil { - if !errors.Is(err, iofs.ErrNotExist) { - appctx.GetLogger(upload.ctx).Err(err).Interface("info", upload.info).Str("infoPath", upload.infoPath).Interface("info", upload.info).Msg("Decomposedfs: could not discard chunk info") - return - } - } -} - -// To implement the termination extension as specified in https://tus.io/protocols/resumable-upload.html#termination -// - the storage needs to implement AsTerminatableUpload -// - the upload needs to implement Terminate - // AsTerminatableUpload returns a TerminatableUpload -func (fs *Decomposedfs) AsTerminatableUpload(upload tusd.Upload) 
tusd.TerminatableUpload {
-	return upload.(*fileUpload)
-}
-
-// Terminate terminates the upload
-func (upload *fileUpload) Terminate(ctx context.Context) error {
-	if err := os.Remove(upload.infoPath); err != nil {
-		if !errors.Is(err, iofs.ErrNotExist) {
-			return err
-		}
-	}
-	if err := os.Remove(upload.binPath); err != nil {
-		if !errors.Is(err, iofs.ErrNotExist) {
-			return err
-		}
-	}
-	return nil
+// To implement the termination extension as specified in https://tus.io/protocols/resumable-upload.html#termination
+// the storage needs to implement AsTerminatableUpload
+func (fs *Decomposedfs) AsTerminatableUpload(up tusd.Upload) tusd.TerminatableUpload {
+	return up.(*upload.Upload)
 }
 
-// To implement the creation-defer-length extension as specified in https://tus.io/protocols/resumable-upload.html#creation
-// - the storage needs to implement AsLengthDeclarableUpload
-// - the upload needs to implement DeclareLength
-
 // AsLengthDeclarableUpload returns a LengthDeclarableUpload
-func (fs *Decomposedfs) AsLengthDeclarableUpload(upload tusd.Upload) tusd.LengthDeclarableUpload {
-	return upload.(*fileUpload)
-}
-
-// DeclareLength updates the upload length information
-func (upload *fileUpload) DeclareLength(ctx context.Context, length int64) error {
-	upload.info.Size = length
-	upload.info.SizeIsDeferred = false
-	return upload.writeInfo()
+// To implement the creation-defer-length extension as specified in https://tus.io/protocols/resumable-upload.html#creation
+// the storage needs to implement AsLengthDeclarableUpload
+func (fs *Decomposedfs) AsLengthDeclarableUpload(up tusd.Upload) tusd.LengthDeclarableUpload {
+	return up.(*upload.Upload)
 }
 
-// To implement the concatenation extension as specified in https://tus.io/protocols/resumable-upload.html#concatenation
-// - the storage needs to implement AsConcatableUpload
-// - the upload needs to implement ConcatUploads
-
 // AsConcatableUpload returns a ConcatableUpload
-func (fs *Decomposedfs) AsConcatableUpload(upload tusd.Upload) tusd.ConcatableUpload {
-	return upload.(*fileUpload)
+// To implement the concatenation extension as specified in https://tus.io/protocols/resumable-upload.html#concatenation
+// the storage needs to implement AsConcatableUpload
+func (fs *Decomposedfs) AsConcatableUpload(up tusd.Upload) tusd.ConcatableUpload {
+	return up.(*upload.Upload)
 }
 
-// ConcatUploads concatenates multiple uploads
-func (upload *fileUpload) ConcatUploads(ctx context.Context, uploads []tusd.Upload) (err error) {
-	file, err := os.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm)
+func (fs *Decomposedfs) uploadInfos(ctx context.Context) ([]tusd.FileInfo, error) {
+	infos := []tusd.FileInfo{}
+	infoFiles, err := filepath.Glob(filepath.Join(fs.o.Root, "uploads", "*.info"))
 	if err != nil {
-		return err
+		return nil, err
 	}
-	defer file.Close()
-
-	for _, partialUpload := range uploads {
-		fileUpload := partialUpload.(*fileUpload)
-		src, err := os.Open(fileUpload.binPath)
+
+	idRegexp := regexp.MustCompile(".*/([^/]+).info")
+	for _, info := range infoFiles {
+		match := idRegexp.FindStringSubmatch(info)
+		if match == nil || len(match) < 2 {
+			continue
+		}
+		up, err := fs.GetUpload(ctx, match[1])
 		if err != nil {
-			return err
+			return nil, err
 		}
-		defer src.Close()
-
-		if _, err := io.Copy(file, src); err != nil {
-			return err
+		info, err := up.GetInfo(ctx)
+		if err != nil {
+			return nil, err
 		}
-	}
-	return
+		infos = append(infos, info)
+	}
+	return infos, nil
 }
diff --git 
a/pkg/storage/utils/decomposedfs/upload/processing.go b/pkg/storage/utils/decomposedfs/upload/processing.go new file mode 100644 index 0000000000..a7ae23db2d --- /dev/null +++ b/pkg/storage/utils/decomposedfs/upload/processing.go @@ -0,0 +1,415 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package upload + +import ( + "context" + "encoding/json" + iofs "io/fs" + "os" + "path/filepath" + "strconv" + "time" + + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/v2/pkg/appctx" + ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" + "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/logger" + "github.com/cs3org/reva/v2/pkg/storage/utils/chunking" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" + "github.com/cs3org/reva/v2/pkg/storage/utils/filelocks" + "github.com/cs3org/reva/v2/pkg/storagespace" + "github.com/cs3org/reva/v2/pkg/utils" + "github.com/google/uuid" + "github.com/pkg/errors" + tusd "github.com/tus/tusd/pkg/handler" +) + +var defaultFilePerm = os.FileMode(0664) + +// PermissionsChecker defines an interface for checking permissions on a Node +type PermissionsChecker interface { + AssemblePermissions(ctx context.Context, n *node.Node) (ap provider.ResourcePermissions, err error) +} + +// New returns a new processing instance +func New(ctx context.Context, info tusd.FileInfo, lu *lookup.Lookup, tp Tree, p PermissionsChecker, fsRoot string, pub events.Publisher, async bool, tknopts options.TokenOptions) (upload *Upload, err error) { + + log := appctx.GetLogger(ctx) + log.Debug().Interface("info", info).Msg("Decomposedfs: NewUpload") + + if info.MetaData["filename"] == "" { + return nil, errors.New("Decomposedfs: missing filename in metadata") + } + if info.MetaData["dir"] == "" { + return nil, errors.New("Decomposedfs: missing dir in metadata") + } + + n, err := lu.NodeFromSpaceID(ctx, &provider.ResourceId{ + SpaceId: info.Storage["SpaceRoot"], + OpaqueId: info.Storage["SpaceRoot"], + }) + if err != nil { + return nil, errors.Wrap(err, "Decomposedfs: error getting space root node") + } + + n, err = lookupNode(ctx, n, filepath.Join(info.MetaData["dir"], info.MetaData["filename"]), lu) + if err != nil { + return nil, errors.Wrap(err, "Decomposedfs: error walking path") + } + + log.Debug().Interface("info", info).Interface("node", n).Msg("Decomposedfs: resolved filename") + + // the parent owner will become the new owner + parent, perr := n.Parent() + if perr != 
nil { + return nil, errors.Wrap(perr, "Decomposedfs: error getting parent "+n.ParentID) + } + + // check permissions + var ( + checkNode *node.Node + path string + ) + if n.Exists { + // check permissions of file to be overwritten + checkNode = n + path, _ = storagespace.FormatReference(&provider.Reference{ResourceId: &provider.ResourceId{ + SpaceId: checkNode.SpaceID, + OpaqueId: checkNode.ID, + }}) + } else { + // check permissions of parent + checkNode = parent + path, _ = storagespace.FormatReference(&provider.Reference{ResourceId: &provider.ResourceId{ + SpaceId: checkNode.SpaceID, + OpaqueId: checkNode.ID, + }, Path: n.Name}) + } + rp, err := p.AssemblePermissions(ctx, checkNode) + switch { + case err != nil: + return nil, errtypes.InternalError(err.Error()) + case !rp.InitiateFileUpload: + return nil, errtypes.PermissionDenied(path) + } + + // are we trying to overwrite a folder with a file? + if n.Exists && n.IsDir() { + return nil, errtypes.PreconditionFailed("resource is not a file") + } + + // check lock + if info.MetaData["lockid"] != "" { + ctx = ctxpkg.ContextSetLockID(ctx, info.MetaData["lockid"]) + } + if err := n.CheckLock(ctx); err != nil { + return nil, err + } + + info.ID = uuid.New().String() + + binPath := filepath.Join(fsRoot, "uploads", info.ID) + usr := ctxpkg.ContextMustGetUser(ctx) + + var ( + spaceRoot string + ok bool + ) + if info.Storage != nil { + if spaceRoot, ok = info.Storage["SpaceRoot"]; !ok { + spaceRoot = n.SpaceRoot.ID + } + } else { + spaceRoot = n.SpaceRoot.ID + } + + info.Storage = map[string]string{ + "Type": "OCISStore", + "BinPath": binPath, + + "NodeId": n.ID, + "NodeParentId": n.ParentID, + "NodeName": n.Name, + "SpaceRoot": spaceRoot, + "SpaceOwnerOrManager": info.Storage["SpaceOwnerOrManager"], + + "Idp": usr.Id.Idp, + "UserId": usr.Id.OpaqueId, + "UserType": utils.UserTypeToString(usr.Id.Type), + "UserName": usr.Username, + + "LogLevel": log.GetLevel().String(), + } + // Create an empty binary file in the upload folder + log.Debug().Interface("info", info).Msg("Decomposedfs: built storage info") + file, err := os.OpenFile(binPath, os.O_CREATE|os.O_WRONLY, defaultFilePerm) + if err != nil { + return nil, err + } + defer file.Close() + + u := buildUpload(ctx, info, binPath, filepath.Join(fsRoot, "uploads", info.ID+".info"), lu, tp, pub, async, tknopts) + + // writeInfo creates the file by itself if necessary + err = u.writeInfo() + if err != nil { + return nil, err + } + + return u, nil +} + +// Get returns the Upload for the given upload id +func Get(ctx context.Context, id string, lu *lookup.Lookup, tp Tree, fsRoot string, pub events.Publisher, async bool, tknopts options.TokenOptions) (*Upload, error) { + infoPath := filepath.Join(fsRoot, "uploads", id+".info") + + info := tusd.FileInfo{} + data, err := os.ReadFile(infoPath) + if err != nil { + if errors.Is(err, iofs.ErrNotExist) { + // Interpret os.ErrNotExist as 404 Not Found + err = tusd.ErrNotFound + } + return nil, err + } + if err := json.Unmarshal(data, &info); err != nil { + return nil, err + } + + stat, err := os.Stat(info.Storage["BinPath"]) + if err != nil { + return nil, err + } + + info.Offset = stat.Size() + + u := &userpb.User{ + Id: &userpb.UserId{ + Idp: info.Storage["Idp"], + OpaqueId: info.Storage["UserId"], + Type: utils.UserTypeMap(info.Storage["UserType"]), + }, + Username: info.Storage["UserName"], + } + + ctx = ctxpkg.ContextSetUser(ctx, u) + // TODO configure the logger the same way ...
store and add traceid in file info + + var opts []logger.Option + opts = append(opts, logger.WithLevel(info.Storage["LogLevel"])) + opts = append(opts, logger.WithWriter(os.Stderr, logger.ConsoleMode)) + l := logger.New(opts...) + + sub := l.With().Int("pid", os.Getpid()).Logger() + + ctx = appctx.WithLogger(ctx, &sub) + + up := buildUpload(ctx, info, info.Storage["BinPath"], infoPath, lu, tp, pub, async, tknopts) + up.versionsPath = info.MetaData["versionsPath"] + up.sizeDiff, _ = strconv.ParseInt(info.MetaData["sizeDiff"], 10, 64) + return up, nil +} + +// CreateNodeForUpload will create the target node for the Upload +func CreateNodeForUpload(upload *Upload) (*node.Node, error) { + fi, err := os.Stat(upload.binPath) + if err != nil { + return nil, err + } + + fsize := fi.Size() + spaceID := upload.Info.Storage["SpaceRoot"] + n := node.New( + spaceID, + upload.Info.Storage["NodeId"], + upload.Info.Storage["NodeParentId"], + upload.Info.Storage["NodeName"], + fsize, + upload.Info.ID, + nil, + upload.lu, + ) + n.SpaceRoot, err = node.ReadNode(upload.Ctx, upload.lu, spaceID, spaceID, false) + if err != nil { + return nil, err + } + + // check lock + if err := n.CheckLock(upload.Ctx); err != nil { + return nil, err + } + + switch n.ID { + case "": + err = initNewNode(upload, n, uint64(fsize)) + default: + err = updateExistingNode(upload, n, spaceID, uint64(fsize)) + } + + if err != nil { + return nil, err + } + + // create/update node info + if err := n.WriteAllNodeMetadata(); err != nil { + return nil, errors.Wrap(err, "Decomposedfs: could not write metadata") + } + + // update nodeid for later + upload.Info.Storage["NodeId"] = n.ID + if err := upload.writeInfo(); err != nil { + return nil, err + } + + return n, n.MarkProcessing() +} + +func initNewNode(upload *Upload, n *node.Node, fsize uint64) error { + n.ID = uuid.New().String() + + // create folder structure (if needed) + if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil { + return err + } + + if _, err := os.Create(n.InternalPath()); err != nil { + return err + } + + if _, err := node.CheckQuota(n.SpaceRoot, false, 0, fsize); err != nil { + return err + } + + // link child name to parent if it is new + childNameLink := filepath.Join(n.ParentInternalPath(), n.Name) + var link string + link, err := os.Readlink(childNameLink) + if err == nil && link != "../"+n.ID { + if err := os.Remove(childNameLink); err != nil { + return errors.Wrap(err, "Decomposedfs: could not remove symlink child entry") + } + } + if errors.Is(err, iofs.ErrNotExist) || link != "../"+n.ID { + relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2)) + if err = os.Symlink(relativeNodePath, childNameLink); err != nil { + return errors.Wrap(err, "Decomposedfs: could not symlink child entry") + } + } + + // on a new file the sizeDiff is the fileSize + upload.sizeDiff = int64(fsize) + upload.Info.MetaData["sizeDiff"] = strconv.Itoa(int(upload.sizeDiff)) + return nil +} + +func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint64) error { + old, _ := node.ReadNode(upload.Ctx, upload.lu, spaceID, n.ID, false) + if _, err := node.CheckQuota(n.SpaceRoot, true, uint64(old.Blobsize), fsize); err != nil { + return err + } + + vfi, err := os.Stat(old.InternalPath()) + if err != nil { + return err + } + + // When the if-match header was set we need to check if the + // etag still matches before finishing the upload. 
+ if ifMatch, ok := upload.Info.MetaData["if-match"]; ok { + targetEtag, err := node.CalculateEtag(n.ID, vfi.ModTime()) + switch { + case err != nil: + return errtypes.InternalError(err.Error()) + case ifMatch != targetEtag: + return errtypes.Aborted("etag mismatch") + } + } + + upload.versionsPath = upload.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+vfi.ModTime().UTC().Format(time.RFC3339Nano)) + upload.sizeDiff = int64(fsize) - old.Blobsize + upload.Info.MetaData["versionsPath"] = upload.versionsPath + upload.Info.MetaData["sizeDiff"] = strconv.Itoa(int(upload.sizeDiff)) + + targetPath := n.InternalPath() + + lock, err := filelocks.AcquireWriteLock(targetPath) + if err != nil { + // we cannot acquire a lock - we error for safety + return err + } + defer filelocks.ReleaseLock(lock) + + // This move drops all metadata!!! We copy it below with CopyMetadata + if err = os.Rename(targetPath, upload.versionsPath); err != nil { + return err + } + + if _, err := os.Create(targetPath); err != nil { + return err + } + + // copy grant and arbitrary metadata + // NOTE: now restoring an older revision might bring back a grant that was removed! + if err := xattrs.CopyMetadata(upload.versionsPath, targetPath, func(attributeName string) bool { + return true + // TODO determine all attributes that must be copied, currently we just copy all and overwrite changed properties + /* + [> + return strings.HasPrefix(attributeName, xattrs.GrantPrefix) || // for grants + strings.HasPrefix(attributeName, xattrs.MetadataPrefix) || // for arbitrary metadata + strings.HasPrefix(attributeName, xattrs.FavPrefix) || // for favorites + strings.HasPrefix(attributeName, xattrs.SpaceNameAttr) || // for a shared file + */ + }); err != nil { + return err + } + + return nil +} + +// lookupNode looks up nodes by path. +// This method can also handle lookups for paths which contain chunking information. +func lookupNode(ctx context.Context, spaceRoot *node.Node, path string, lu *lookup.Lookup) (*node.Node, error) { + p := path + isChunked := chunking.IsChunked(path) + if isChunked { + chunkInfo, err := chunking.GetChunkBLOBInfo(path) + if err != nil { + return nil, err + } + p = chunkInfo.Path + } + + n, err := lu.WalkPath(ctx, spaceRoot, p, true, func(ctx context.Context, n *node.Node) error { return nil }) + if err != nil { + return nil, errors.Wrap(err, "Decomposedfs: error walking path") + } + + if isChunked { + n.Name = filepath.Base(path) + } + return n, nil +} diff --git a/pkg/storage/utils/decomposedfs/upload/upload.go b/pkg/storage/utils/decomposedfs/upload/upload.go new file mode 100644 index 0000000000..7164396dfd --- /dev/null +++ b/pkg/storage/utils/decomposedfs/upload/upload.go @@ -0,0 +1,471 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
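+ +// Package upload implements tus.io resumable uploads for decomposedfs; the Upload type below backs both the synchronous flow and the new asynchronous postprocessing flow.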
+ +package upload + +import ( + "context" + "crypto/md5" + "crypto/sha1" + "encoding/hex" + "encoding/json" + "fmt" + "hash" + "hash/adler32" + "io" + "io/fs" + "os" + "path/filepath" + "strings" + "time" + + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/v2/pkg/appctx" + ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" + "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" + "github.com/cs3org/reva/v2/pkg/utils" + "github.com/golang-jwt/jwt" + "github.com/pkg/errors" + "github.com/rs/zerolog" + tusd "github.com/tus/tusd/pkg/handler" +) + +// Tree is used to manage a tree hierarchy +type Tree interface { + Setup() error + + GetMD(ctx context.Context, node *node.Node) (os.FileInfo, error) + ListFolder(ctx context.Context, node *node.Node) ([]*node.Node, error) + // CreateHome(owner *userpb.UserId) (n *node.Node, err error) + CreateDir(ctx context.Context, node *node.Node) (err error) + // CreateReference(ctx context.Context, node *node.Node, targetURI *url.URL) error + Move(ctx context.Context, oldNode *node.Node, newNode *node.Node) (err error) + Delete(ctx context.Context, node *node.Node) (err error) + RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, target *node.Node) (*node.Node, *node.Node, func() error, error) + PurgeRecycleItemFunc(ctx context.Context, spaceid, key, purgePath string) (*node.Node, func() error, error) + + WriteBlob(node *node.Node, reader io.Reader) error + ReadBlob(node *node.Node) (io.ReadCloser, error) + DeleteBlob(node *node.Node) error + + Propagate(ctx context.Context, node *node.Node, sizeDiff int64) (err error) +} + +// Upload processes the upload +// it implements the tus tusd.Upload interface https://tus.io/protocols/resumable-upload.html#core-protocol +// it also implements its termination extension as specified in https://tus.io/protocols/resumable-upload.html#termination +// it also implements its creation-defer-length extension as specified in https://tus.io/protocols/resumable-upload.html#creation +// it also implements its concatenation extension as specified in https://tus.io/protocols/resumable-upload.html#concatenation +type Upload struct { + // we use a struct field on the upload as tus pkg will give us an empty context.Background + Ctx context.Context + // info stores the current information about the upload + Info tusd.FileInfo + // node for easy access + Node *node.Node + // infoPath is the path to the .info file + infoPath string + // binPath is the path to the binary file (which has no extension) + binPath string + // lu and tp needed for file operations + lu *lookup.Lookup + tp Tree + // versionsPath will be empty if there was no file before + versionsPath string + // sizeDiff size difference between new and old file version + sizeDiff int64 + // and a logger as well + log zerolog.Logger + // publisher used to publish events + pub events.Publisher + // async determines if uploads should be done asynchronously + async bool + // tknopts hold token signing information + tknopts options.TokenOptions +} + +func buildUpload(ctx context.Context, info tusd.FileInfo, binPath string, infoPath string, lu *lookup.Lookup, tp Tree, pub events.Publisher, async bool, tknopts options.TokenOptions) *Upload { + return &Upload{ + Info: info, + binPath: binPath, +
infoPath: infoPath, + lu: lu, + tp: tp, + Ctx: ctx, + pub: pub, + async: async, + tknopts: tknopts, + log: appctx.GetLogger(ctx). + With(). + Interface("info", info). + Str("binPath", binPath). + Logger(), + } +} + +// Cleanup cleans the upload +func Cleanup(upload *Upload, failure bool, keepUpload bool) { + upload.cleanup(failure, !keepUpload, !keepUpload) + + // unset processing status + if upload.Node != nil { // node can be nil when there was an error before it was created (e.g. checksum-mismatch) + if err := upload.Node.UnmarkProcessing(); err != nil { + upload.log.Info().Str("path", upload.Node.InternalPath()).Err(err).Msg("unmarking processing failed") + } + } +} + +// WriteChunk writes the stream from the reader to the given offset of the upload +func (upload *Upload) WriteChunk(_ context.Context, offset int64, src io.Reader) (int64, error) { + file, err := os.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm) + if err != nil { + return 0, err + } + defer file.Close() + + // calculate checksum here? needed for the TUS checksum extension. https://tus.io/protocols/resumable-upload.html#checksum + // TODO but how do we get the `Upload-Checksum`? WriteChunk() only has a context, offset and the reader ... + // It is sent with the PATCH request, well or in the POST when the creation-with-upload extension is used + // but the tus handler uses a context.Background() so we cannot really check the header and put it in the context ... + n, err := io.Copy(file, src) + + // If the HTTP PATCH request gets interrupted in the middle (e.g. because + // the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF. + // However, for the ocis driver it's not important whether the stream has ended + // on purpose or accidentally. + if err != nil && err != io.ErrUnexpectedEOF { + return n, err + } + + upload.Info.Offset += n + return n, upload.writeInfo() +} + +// GetInfo returns the FileInfo +func (upload *Upload) GetInfo(_ context.Context) (tusd.FileInfo, error) { + return upload.Info, nil +} + +// GetReader returns an io.Reader for the upload +func (upload *Upload) GetReader(_ context.Context) (io.Reader, error) { + return os.Open(upload.binPath) +} + +// FinishUpload finishes an upload and moves the file to the internal destination +func (upload *Upload) FinishUpload(_ context.Context) error { + // set lockID to context + if upload.Info.MetaData["lockid"] != "" { + upload.Ctx = ctxpkg.ContextSetLockID(upload.Ctx, upload.Info.MetaData["lockid"]) + } + + log := appctx.GetLogger(upload.Ctx) + + // calculate the checksum of the written bytes + // they will all be written to the metadata later, so we cannot omit any of them + // TODO only calculate the checksum in sync that was requested to match, the rest could be async ... but the tests currently expect all to be present + // TODO the hashes all implement BinaryMarshaler so we could try to persist the state for resumable upload. we would need to keep track of the copied bytes ...
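+ // hash the bytes in a single pass: the TeeReaders below fan the file out to sha1 and md5 while the io.Copy into adler32 drives the read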
+ sha1h := sha1.New() + md5h := md5.New() + adler32h := adler32.New() + { + f, err := os.Open(upload.binPath) + if err != nil { + // we can continue if no oc checksum header is set + log.Info().Err(err).Str("binPath", upload.binPath).Msg("error opening binPath") + } + defer f.Close() + + r1 := io.TeeReader(f, sha1h) + r2 := io.TeeReader(r1, md5h) + + _, err = io.Copy(adler32h, r2) + if err != nil { + log.Info().Err(err).Msg("error copying checksums") + } + } + + // compare if they match the sent checksum + // TODO the tus checksum extension would do this on every chunk, but I currently don't see an easy way to pass in the requested checksum. For now we do it in FinishUpload which is also called for chunked uploads + if upload.Info.MetaData["checksum"] != "" { + var err error + parts := strings.SplitN(upload.Info.MetaData["checksum"], " ", 2) + if len(parts) != 2 { + return errtypes.BadRequest("invalid checksum format. must be '[algorithm] [checksum]'") + } + switch parts[0] { + case "sha1": + err = upload.checkHash(parts[1], sha1h) + case "md5": + err = upload.checkHash(parts[1], md5h) + case "adler32": + err = upload.checkHash(parts[1], adler32h) + default: + err = errtypes.BadRequest("unsupported checksum algorithm: " + parts[0]) + } + if err != nil { + Cleanup(upload, true, false) + return err + } + } + + n, err := CreateNodeForUpload(upload) + if err != nil { + Cleanup(upload, true, false) + return err + } + + upload.Node = n + + // now try to write all checksums + tryWritingChecksum(log, upload.Node, "sha1", sha1h) + tryWritingChecksum(log, upload.Node, "md5", md5h) + tryWritingChecksum(log, upload.Node, "adler32", adler32h) + + if upload.pub != nil { + u, _ := ctxpkg.ContextGetUser(upload.Ctx) + s, err := upload.URL(upload.Ctx) + if err != nil { + return err + } + + if err := events.Publish(upload.pub, events.BytesReceived{ + UploadID: upload.Info.ID, + URL: s, + SpaceOwner: n.SpaceOwnerOrManager(upload.Ctx), + ExecutingUser: u, + ResourceID: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}, + Filename: upload.Info.Storage["NodeName"], + Filesize: uint64(upload.Info.Size), + }); err != nil { + return err + } + } + + if upload.async { + // handle postprocessing asynchronously but inform there is something in progress + return upload.tp.Propagate(upload.Ctx, n, upload.sizeDiff) + } + + err = upload.Finalize() + Cleanup(upload, err != nil, false) + if err != nil { + return err + } + + return upload.tp.Propagate(upload.Ctx, n, upload.sizeDiff) +} + +// Terminate terminates the upload +func (upload *Upload) Terminate(_ context.Context) error { + upload.cleanup(true, true, true) + return nil +} + +// DeclareLength updates the upload length information +func (upload *Upload) DeclareLength(_ context.Context, length int64) error { + upload.Info.Size = length + upload.Info.SizeIsDeferred = false + return upload.writeInfo() +} + +// ConcatUploads concatenates multiple uploads +func (upload *Upload) ConcatUploads(_ context.Context, uploads []tusd.Upload) (err error) { + file, err := os.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm) + if err != nil { + return err + } + defer file.Close() + + for _, partialUpload := range uploads { + fileUpload := partialUpload.(*Upload) + + src, err := os.Open(fileUpload.binPath) + if err != nil { + return err + } + defer src.Close() + + if _, err := io.Copy(file, src); err != nil { + return err + } + } + + return +} + +// writeInfo updates the entire information. Everything will be overwritten.
+func (upload *Upload) writeInfo() error { + data, err := json.Marshal(upload.Info) + if err != nil { + return err + } + return os.WriteFile(upload.infoPath, data, defaultFilePerm) +} + +// Finalize finalizes the upload (e.g. moves the file to the internal destination) +func (upload *Upload) Finalize() (err error) { + n := upload.Node + if n == nil { + var err error + n, err = node.ReadNode(upload.Ctx, upload.lu, upload.Info.Storage["SpaceRoot"], upload.Info.Storage["NodeId"], false) + if err != nil { + return err + } + upload.Node = n + } + + // upload the data to the blobstore + file, err := os.Open(upload.binPath) + if err != nil { + return err + } + defer file.Close() + + if err := upload.tp.WriteBlob(n, file); err != nil { + return errors.Wrap(err, "failed to upload file to blobstore") + } + + if upload.async { + return nil + } + + sublog := appctx.GetLogger(upload.Ctx). + With(). + Interface("info", upload.Info). + Str("binPath", upload.binPath). + Str("targetPath", n.InternalPath()). + Str("spaceID", upload.Info.Storage["SpaceRoot"]). + Logger() + + // tests sometimes set the mtime + if upload.Info.MetaData["mtime"] != "" { + if err := n.SetMtimeString(upload.Info.MetaData["mtime"]); err != nil { + sublog.Err(err).Interface("info", upload.Info).Msg("Decomposedfs: could not set mtime metadata") + return err + } + } + + // some clients need the etag in the upload metadata + fi, err := os.Stat(n.InternalPath()) + if err != nil { + sublog.Err(err).Interface("info", upload.Info).Str("path", n.InternalPath()).Msg("Decomposedfs: could not stat file") + return err + } + upload.Info.MetaData["etag"], _ = node.CalculateEtag(n.ID, fi.ModTime()) + return nil +} + +func (upload *Upload) checkHash(expected string, h hash.Hash) error { + if expected != hex.EncodeToString(h.Sum(nil)) { + return errtypes.ChecksumMismatch(fmt.Sprintf("invalid checksum: expected %s got %x", upload.Info.MetaData["checksum"], h.Sum(nil))) + } + return nil +} + +// cleanup cleans up after the upload is finished +func (upload *Upload) cleanup(cleanNode, cleanBin, cleanInfo bool) { + if cleanNode && upload.Node != nil { + switch p := upload.versionsPath; p { + case "": + // remove node + if err := utils.RemoveItem(upload.Node.InternalPath()); err != nil { + upload.log.Info().Str("path", upload.Node.InternalPath()).Err(err).Msg("removing node failed") + } + + // no old version was present - remove child entry + src := filepath.Join(upload.Node.ParentInternalPath(), upload.Node.Name) + if err := os.Remove(src); err != nil { + upload.log.Info().Str("path", upload.Node.ParentInternalPath()).Err(err).Msg("removing node from parent failed") + } + default: + // restore old version + if err := os.Rename(p, upload.Node.InternalPath()); err != nil { + upload.log.Info().Str("versionpath", p).Str("nodepath", upload.Node.InternalPath()).Err(err).Msg("renaming version node failed") + } + + } + } + + if cleanBin { + if err := os.Remove(upload.binPath); err != nil && !errors.Is(err, fs.ErrNotExist) { + upload.log.Error().Str("path", upload.binPath).Err(err).Msg("removing upload failed") + } + } + + if cleanInfo { + if err := os.Remove(upload.infoPath); err != nil && !errors.Is(err, fs.ErrNotExist) { + upload.log.Error().Str("path", upload.infoPath).Err(err).Msg("removing upload info failed") + } + } +} + +// URL returns a url to download an upload +func (upload *Upload) URL(_ context.Context) (string, error) { + type transferClaims struct { + jwt.StandardClaims + Target string `json:"target"` + } + + u := 
joinurl(upload.tknopts.DownloadEndpoint, "tus/", upload.Info.ID) + ttl := time.Duration(upload.tknopts.TransferExpires) * time.Second + claims := transferClaims{ + StandardClaims: jwt.StandardClaims{ + ExpiresAt: time.Now().Add(ttl).Unix(), + Audience: "reva", + IssuedAt: time.Now().Unix(), + }, + Target: u, + } + + t := jwt.NewWithClaims(jwt.GetSigningMethod("HS256"), claims) + + tkn, err := t.SignedString([]byte(upload.tknopts.TransferSharedSecret)) + if err != nil { + return "", errors.Wrapf(err, "error signing token with claims %+v", claims) + } + + return joinurl(upload.tknopts.DataGatewayEndpoint, tkn), nil +} + +// replace with url.JoinPath after switching to go1.19 +func joinurl(paths ...string) string { + var s strings.Builder + l := len(paths) + for i, p := range paths { + s.WriteString(p) + if !strings.HasSuffix(p, "/") && i != l-1 { + s.WriteString("/") + } + } + + return s.String() +} + +func tryWritingChecksum(log *zerolog.Logger, n *node.Node, algo string, h hash.Hash) { + if err := n.SetChecksum(algo, h); err != nil { + log.Err(err). + Str("csType", algo). + Bytes("hash", h.Sum(nil)). + Msg("Decomposedfs: could not write checksum") + // this is not critical, the bytes are there so we will continue + } +} diff --git a/pkg/storage/utils/decomposedfs/upload_test.go b/pkg/storage/utils/decomposedfs/upload_test.go index 7f301fbe83..b00d9a4ab7 100644 --- a/pkg/storage/utils/decomposedfs/upload_test.go +++ b/pkg/storage/utils/decomposedfs/upload_test.go @@ -210,7 +210,6 @@ var _ = Describe("File uploads", func() { Expect(uploadIds["tus"]).ToNot(BeEmpty()) resources, err := fs.ListFolder(ctx, rootRef, []string{}, []string{}) - Expect(err).ToNot(HaveOccurred()) Expect(len(resources)).To(Equal(0)) }) @@ -226,7 +225,6 @@ var _ = Describe("File uploads", func() { Expect(uploadIds["tus"]).ToNot(BeEmpty()) resources, err := fs.ListFolder(ctx, rootRef, []string{}, []string{}) - Expect(err).ToNot(HaveOccurred()) Expect(len(resources)).To(Equal(0)) }) diff --git a/pkg/storage/utils/decomposedfs/xattrs/xattrs.go b/pkg/storage/utils/decomposedfs/xattrs/xattrs.go index 8bd24bbb98..496071b37a 100644 --- a/pkg/storage/utils/decomposedfs/xattrs/xattrs.go +++ b/pkg/storage/utils/decomposedfs/xattrs/xattrs.go @@ -50,6 +50,13 @@ const ( BlobIDAttr string = OcisPrefix + "blobid" BlobsizeAttr string = OcisPrefix + "blobsize" + // statusPrefix is the prefix for the node status + StatusPrefix string = OcisPrefix + "nodestatus" + + // scanPrefix is the prefix for the virus scan status and date + ScanStatusPrefix string = OcisPrefix + "scanstatus" + ScanDatePrefix string = OcisPrefix + "scandate" + // grantPrefix is the prefix for sharing related extended attributes GrantPrefix string = OcisPrefix + "grant." GrantUserAcePrefix string = OcisPrefix + "grant." + UserAcePrefix @@ -122,8 +129,8 @@ func refFromCS3(b []byte) (*provider.Reference, error) { // CopyMetadata copies all extended attributes from source to target. // The optional filter function can be used to filter by attribute name, e.g. by checking a prefix -// For the source file, a shared lock is acquired. For the target, an exclusive -// write lock is acquired. +// For the source file, a shared lock is acquired. +// NOTE: target resource is not locked! 
You additionally need to acquire a write lock on the target func CopyMetadata(src, target string, filter func(attributeName string) bool) (err error) { var readLock *flock.Flock @@ -147,8 +154,8 @@ func CopyMetadata(src, target string, filter func(attributeName string) bool) (e // CopyMetadataWithSourceLock copies all extended attributes from source to target. // The optional filter function can be used to filter by attribute name, e.g. by checking a prefix -// For the source file, a shared lock is acquired. For the target, an exclusive -// write lock is acquired. +// For the source file, a shared lock is acquired. +// NOTE: target resource is not locked! You additionally need to acquire a write lock on the target func CopyMetadataWithSourceLock(src, target string, filter func(attributeName string) bool, readLock *flock.Flock) (err error) { switch { case readLock == nil: @@ -159,23 +166,6 @@ func CopyMetadataWithSourceLock(src, target string, filter func(attributeName st return errors.New("not locked") } - var writeLock *flock.Flock - - // Acquire the write log on the target node - writeLock, err = filelocks.AcquireWriteLock(target) - - if err != nil { - return errors.Wrap(err, "xattrs: Unable to lock target to write") - } - defer func() { - rerr := filelocks.ReleaseLock(writeLock) - - // if err is non nil we do not overwrite that - if err == nil { - err = rerr - } - }() - - // both locks are established. Copy. var attrNameList []string if attrNameList, err = xattr.List(src); err != nil { From 3d4d59ba7bf0b103428151e100b8c14f70812f89 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Thu, 8 Dec 2022 10:55:55 +0100 Subject: [PATCH 02/11] checkout postprocessing events from experimental Signed-off-by: jkoberg --- pkg/events/postprocessing.go | 164 +++++++++++++++++++++++++++++++++++ 1 file changed, 164 insertions(+) create mode 100644 pkg/events/postprocessing.go diff --git a/pkg/events/postprocessing.go b/pkg/events/postprocessing.go new file mode 100644 index 0000000000..d49ff9e920 --- /dev/null +++ b/pkg/events/postprocessing.go @@ -0,0 +1,164 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package events + +import ( + "encoding/json" + "time" + + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" +) + +type ( + // Postprocessingstep is one of the available postprocessing steps + Postprocessingstep string + + // PostprocessingOutcome defines the result of the postprocessing + PostprocessingOutcome string +) + +var ( + // PPStepAntivirus is the step that scans for viruses + PPStepAntivirus Postprocessingstep = "virusscan" + // PPStepFTS is the step that indexes files for full text search + PPStepFTS Postprocessingstep = "fts" + // PPStepDelay is the step that delays processing. Useful for testing or user annoyance + PPStepDelay Postprocessingstep = "delay" + + // PPOutcomeDelete means that the file and the upload should be deleted + PPOutcomeDelete PostprocessingOutcome = "delete" + // PPOutcomeAbort means that the upload is cancelled but the bytes are being kept in the upload folder + PPOutcomeAbort PostprocessingOutcome = "abort" + // PPOutcomeContinue means that the upload is moved to its final destination (eventually being marked with pp results) + PPOutcomeContinue PostprocessingOutcome = "continue" +) + +// BytesReceived is emitted by the server when it received all bytes of an upload +type BytesReceived struct { + UploadID string + SpaceOwner *user.UserId + ExecutingUser *user.User + ResourceID *provider.ResourceId + Filename string + Filesize uint64 + URL string +} + +// Unmarshal to fulfill unmarshaller interface +func (BytesReceived) Unmarshal(v []byte) (interface{}, error) { + e := BytesReceived{} + err := json.Unmarshal(v, &e) + return e, err +} + +// VirusscanFinished is emitted by the server when it has completed an antivirus scan +type VirusscanFinished struct { + Infected bool + Outcome PostprocessingOutcome + UploadID string + Filename string + ExecutingUser *user.User + Description string + Scandate time.Time + ResourceID *provider.ResourceId + ErrorMsg string // empty when no error +} + +// Unmarshal to fulfill unmarshaller interface +func (VirusscanFinished) Unmarshal(v []byte) (interface{}, error) { + e := VirusscanFinished{} + err := json.Unmarshal(v, &e) + return e, err +} + +// StartPostprocessingStep can be issued by the server to start a postprocessing step +type StartPostprocessingStep struct { + UploadID string + URL string + ExecutingUser *user.User + Filename string + Filesize uint64 + Token string // for file retrieval in the after-upload case + ResourceID *provider.ResourceId // for file retrieval in the after-upload case + RevaToken string // for file retrieval in the after-upload case + + StepToStart Postprocessingstep +} + +// Unmarshal to fulfill unmarshaller interface +func (StartPostprocessingStep) Unmarshal(v []byte) (interface{}, error) { + e := StartPostprocessingStep{} + err := json.Unmarshal(v, &e) + return e, err +} + +// PostprocessingStepFinished can be issued by the server when a postprocessing step is finished +type PostprocessingStepFinished struct { + UploadID string + ExecutingUser *user.User + Filename string + + FinishedStep Postprocessingstep // name of the step + Result interface{} // result information + Error error // possible error of the step + Outcome PostprocessingOutcome // some services may cause postprocessing to stop +} + +// Unmarshal to fulfill unmarshaller interface +func (PostprocessingStepFinished) Unmarshal(v []byte) (interface{}, error) { + e := PostprocessingStepFinished{} + err := json.Unmarshal(v, &e) + return e, err +} + +// PostprocessingFinished is emitted by *some* service which can decide that postprocessing is finished +type PostprocessingFinished struct { + UploadID string + Filename string + SpaceOwner *user.UserId + ExecutingUser *user.User + Result map[Postprocessingstep]interface{} // it is a map[step]Event + Outcome PostprocessingOutcome +} + +// Unmarshal to fulfill unmarshaller interface +func (PostprocessingFinished) Unmarshal(v []byte) (interface{}, error) { + e := PostprocessingFinished{} + err := json.Unmarshal(v, &e) + return e, err +} + +// UploadReady is emitted by the storage provider when postprocessing is finished +type UploadReady struct { + UploadID string + Filename string + SpaceOwner *user.UserId + 
ExecutingUser *user.User + FileRef *provider.Reference + Failed bool + // add reference here? We could use it to inform the client that postprocessing is finished +} + +// Unmarshal to fulfill unmarshaller interface +func (UploadReady) Unmarshal(v []byte) (interface{}, error) { + e := UploadReady{} + err := json.Unmarshal(v, &e) + return e, err +} From 0b93073558602df81fb627f2dc1614ce507bc713 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Thu, 8 Dec 2022 11:41:50 +0100 Subject: [PATCH 03/11] bring 425 status back Signed-off-by: jkoberg --- .../http/services/appprovider/appprovider.go | 11 +++++++++++ internal/http/services/appprovider/errors.go | 2 ++ .../owncloud/ocdav/propfind/propfind.go | 8 ++++++++ internal/http/services/owncloud/ocdav/put.go | 18 ++++++++++++++++++ pkg/storage/utils/decomposedfs/decomposedfs.go | 14 +++++++++----- 5 files changed, 48 insertions(+), 5 deletions(-) diff --git a/internal/http/services/appprovider/appprovider.go b/internal/http/services/appprovider/appprovider.go index 4a88210bd0..934166c75f 100644 --- a/internal/http/services/appprovider/appprovider.go +++ b/internal/http/services/appprovider/appprovider.go @@ -384,6 +384,17 @@ func (s *svc) handleOpen(openMode int) http.HandlerFunc { Path: ".", } + statRes, err := client.Stat(ctx, &provider.StatRequest{Ref: fileRef}) + if err != nil { + writeError(w, r, appErrorServerError, "Internal error accessing the file, please try again later", err) + return + } + + if status := utils.ReadPlainFromOpaque(statRes.GetInfo().GetOpaque(), "status"); status == "processing" { + writeError(w, r, appErrorTooEarly, "The requested file is not yet available, please try again later", nil) + return + } + viewMode, err := getViewModeFromPublicScope(ctx) if err != nil { writeError(w, r, appErrorPermissionDenied, "permission denied to open the application", err) diff --git a/internal/http/services/appprovider/errors.go b/internal/http/services/appprovider/errors.go index 311a4c6295..69d4ec5306 100644 --- a/internal/http/services/appprovider/errors.go +++ b/internal/http/services/appprovider/errors.go @@ -36,6 +36,7 @@ const ( appErrorUnimplemented appErrorCode = "NOT_IMPLEMENTED" appErrorInvalidParameter appErrorCode = "INVALID_PARAMETER" appErrorServerError appErrorCode = "SERVER_ERROR" + appErrorTooEarly appErrorCode = "TOO_EARLY" ) // appErrorCodeMapping stores the HTTP error code mapping for various APIErrorCodes @@ -47,6 +48,7 @@ var appErrorCodeMapping = map[appErrorCode]int{ appErrorInvalidParameter: http.StatusBadRequest, appErrorServerError: http.StatusInternalServerError, appErrorPermissionDenied: http.StatusForbidden, + appErrorTooEarly: http.StatusTooEarly, } // APIError encompasses the error type and message diff --git a/internal/http/services/owncloud/ocdav/propfind/propfind.go b/internal/http/services/owncloud/ocdav/propfind/propfind.go index 5c0ed7e190..0373e4c92f 100644 --- a/internal/http/services/owncloud/ocdav/propfind/propfind.go +++ b/internal/http/services/owncloud/ocdav/propfind/propfind.go @@ -1476,6 +1476,14 @@ func mdToPropResponse(ctx context.Context, pf *XML, md *provider.ResourceInfo, p } } + if status := utils.ReadPlainFromOpaque(md.Opaque, "status"); status == "processing" { + response.Propstat = append(response.Propstat, PropstatXML{ + Status: "HTTP/1.1 425 TOO EARLY", + Prop: propstatOK.Prop, + }) + return &response, nil + } + if len(propstatOK.Prop) > 0 { response.Propstat = append(response.Propstat, propstatOK) } diff --git a/internal/http/services/owncloud/ocdav/put.go
index 4a94ad0a16..007ec21b3a 100644 --- a/internal/http/services/owncloud/ocdav/put.go +++ b/internal/http/services/owncloud/ocdav/put.go @@ -137,6 +137,24 @@ func (s *svc) handlePut(ctx context.Context, w http.ResponseWriter, r *http.Requ return } + if length == 0 { + tfRes, err := s.gwClient.TouchFile(ctx, &provider.TouchFileRequest{ + Ref: ref, + }) + if err != nil { + log.Error().Err(err).Msg("error sending grpc touch file request") + w.WriteHeader(http.StatusInternalServerError) + return + } + if tfRes.Status.Code != rpc.Code_CODE_OK { + log.Error().Interface("status", tfRes.Status).Msg("error touching file") + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusCreated) + return + } + opaqueMap := map[string]*typespb.OpaqueEntry{ net.HeaderUploadLength: { Decoder: "plain", diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index 60d53fd945..a87a67d3d5 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -155,7 +155,10 @@ func New(o *options.Options, lu *lookup.Lookup, p PermissionsChecker, tp Tree, p var ev events.Stream if o.Events.NatsAddress != "" { evtsCfg := o.Events - var rootCAPool *x509.CertPool + var ( + rootCAPool *x509.CertPool + tlsConf *tls.Config + ) if evtsCfg.TLSRootCACertificate != "" { rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate) if err != nil { @@ -170,12 +173,13 @@ func New(o *options.Options, lu *lookup.Lookup, p PermissionsChecker, tp Tree, p rootCAPool = x509.NewCertPool() rootCAPool.AppendCertsFromPEM(certBytes.Bytes()) evtsCfg.TLSInsecure = false - } - tlsConf := &tls.Config{ - InsecureSkipVerify: evtsCfg.TLSInsecure, //nolint:gosec - RootCAs: rootCAPool, + tlsConf = &tls.Config{ + InsecureSkipVerify: evtsCfg.TLSInsecure, //nolint:gosec + RootCAs: rootCAPool, + } } + ev, err = server.NewNatsStream( natsjs.TLSConfig(tlsConf), natsjs.Address(evtsCfg.NatsAddress), From cbd58c67396155c64820e8a33af8963c42ffbe93 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Thu, 8 Dec 2022 14:32:43 +0100 Subject: [PATCH 04/11] changelog Signed-off-by: jkoberg --- changelog/unreleased/async-postprocessing.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changelog/unreleased/async-postprocessing.md diff --git a/changelog/unreleased/async-postprocessing.md b/changelog/unreleased/async-postprocessing.md new file mode 100644 index 0000000000..4092f38ad6 --- /dev/null +++ b/changelog/unreleased/async-postprocessing.md @@ -0,0 +1,5 @@ +Enhancement: Async Postprocessing + +Provides functionality for async postprocessing. This will allow the system to do the postprocessing (virusscan, copying of bytes to their final destination, ...) asynchronously to the user's request. Major change when active.
+ +https://github.com/cs3org/reva/pull/3531 From 1ff975da762c0b43dbc9c22e8cfbc3ca5a9d1e04 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Thu, 8 Dec 2022 14:54:39 +0100 Subject: [PATCH 05/11] get rid of unneccessary lint issue Signed-off-by: jkoberg --- .golangci.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.golangci.yaml b/.golangci.yaml index 46904acca7..8fbf1c6276 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -1,4 +1,6 @@ issues: + exclude: + - "Error return value of .* is not checked" exclude-rules: - path: internal/http/interceptors/log/log.go text: "SA1019:" From 97ee2a47b2c9752b741969bb782157c474ae7e14 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Fri, 9 Dec 2022 11:28:35 +0100 Subject: [PATCH 06/11] rework locking when uploading Signed-off-by: jkoberg --- .../utils/decomposedfs/decomposedfs.go | 5 +- .../utils/decomposedfs/upload/processing.go | 99 +++++++++++-------- .../utils/decomposedfs/upload/upload.go | 25 ++--- pkg/storage/utils/filelocks/filelocks.go | 4 + 4 files changed, 74 insertions(+), 59 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index a87a67d3d5..d6c4dfa179 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -300,6 +300,8 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) { ); err != nil { log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event") } + + /* LETS KEEP THIS COMMENTED UNTIL VIRUSSCANNING IS BACKMERGED case events.VirusscanFinished: if ev.ErrorMsg != "" { // scan failed somehow @@ -385,7 +387,6 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) { fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) continue } - default: // uploadid is not empty -> this is an async upload up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens) @@ -410,7 +411,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) { // remove cache entry in gateway fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) - + */ default: log.Error().Interface("event", ev).Msg("Unknown event") } diff --git a/pkg/storage/utils/decomposedfs/upload/processing.go b/pkg/storage/utils/decomposedfs/upload/processing.go index a7ae23db2d..2fe437b086 100644 --- a/pkg/storage/utils/decomposedfs/upload/processing.go +++ b/pkg/storage/utils/decomposedfs/upload/processing.go @@ -21,10 +21,12 @@ package upload import ( "context" "encoding/json" + "fmt" iofs "io/fs" "os" "path/filepath" "strconv" + "strings" "time" userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" @@ -42,6 +44,7 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/filelocks" "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" + "github.com/gofrs/flock" "github.com/google/uuid" "github.com/pkg/errors" tusd "github.com/tus/tusd/pkg/handler" @@ -234,7 +237,7 @@ func Get(ctx context.Context, id string, lu *lookup.Lookup, tp Tree, fsRoot stri } // CreateNodeForUpload will create the target node for the Upload -func CreateNodeForUpload(upload *Upload) (*node.Node, error) { +func CreateNodeForUpload(upload *Upload, initAttrs map[string]string) (*node.Node, error) { fi, err := os.Stat(upload.binPath) if err != nil { return nil, err @@ -262,19 +265,29 @@ func CreateNodeForUpload(upload *Upload) (*node.Node, error) { return 
nil, err } + var lock *flock.Flock switch n.ID { case "": - err = initNewNode(upload, n, uint64(fsize)) + lock, err = initNewNode(upload, n, uint64(fsize)) default: - err = updateExistingNode(upload, n, spaceID, uint64(fsize)) + lock, err = updateExistingNode(upload, n, spaceID, uint64(fsize)) } + defer filelocks.ReleaseLock(lock) if err != nil { return nil, err } - // create/update node info - if err := n.WriteAllNodeMetadata(); err != nil { + // overwrite technical information + initAttrs[xattrs.ParentidAttr] = n.ParentID + initAttrs[xattrs.NameAttr] = n.Name + initAttrs[xattrs.BlobIDAttr] = n.BlobID + initAttrs[xattrs.BlobsizeAttr] = strconv.FormatInt(n.Blobsize, 10) + initAttrs[xattrs.StatusPrefix] = node.ProcessingStatus + + // update node metadata with new blobid etc + err = n.SetXattrsWithLock(initAttrs, lock) + if err != nil { return nil, errors.Wrap(err, "Decomposedfs: could not write metadata") } @@ -284,56 +297,61 @@ func CreateNodeForUpload(upload *Upload) (*node.Node, error) { return nil, err } - return n, n.MarkProcessing() + return n, nil } -func initNewNode(upload *Upload, n *node.Node, fsize uint64) error { +func initNewNode(upload *Upload, n *node.Node, fsize uint64) (*flock.Flock, error) { n.ID = uuid.New().String() // create folder structure (if needed) if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil { - return err + return nil, err } if _, err := os.Create(n.InternalPath()); err != nil { - return err + return nil, err + } + + lock, err := filelocks.AcquireWriteLock(n.InternalPath()) + if err != nil { + // we cannot acquire a lock - we error for safety + return lock, err } if _, err := node.CheckQuota(n.SpaceRoot, false, 0, fsize); err != nil { - return err + return lock, err } // link child name to parent if it is new childNameLink := filepath.Join(n.ParentInternalPath(), n.Name) - var link string link, err := os.Readlink(childNameLink) if err == nil && link != "../"+n.ID { if err := os.Remove(childNameLink); err != nil { - return errors.Wrap(err, "Decomposedfs: could not remove symlink child entry") + return lock, errors.Wrap(err, "Decomposedfs: could not remove symlink child entry") } } if errors.Is(err, iofs.ErrNotExist) || link != "../"+n.ID { relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2)) if err = os.Symlink(relativeNodePath, childNameLink); err != nil { - return errors.Wrap(err, "Decomposedfs: could not symlink child entry") + return lock, errors.Wrap(err, "Decomposedfs: could not symlink child entry") } } // on a new file the sizeDiff is the fileSize upload.sizeDiff = int64(fsize) upload.Info.MetaData["sizeDiff"] = strconv.Itoa(int(upload.sizeDiff)) - return nil + return lock, nil } -func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint64) error { +func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint64) (*flock.Flock, error) { old, _ := node.ReadNode(upload.Ctx, upload.lu, spaceID, n.ID, false) if _, err := node.CheckQuota(n.SpaceRoot, true, uint64(old.Blobsize), fsize); err != nil { - return err + return nil, err } vfi, err := os.Stat(old.InternalPath()) if err != nil { - return err + return nil, err } // When the if-match header was set we need to check if the @@ -342,9 +360,9 @@ func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint targetEtag, err := node.CalculateEtag(n.ID, vfi.ModTime()) switch { case err != nil: - return errtypes.InternalError(err.Error()) + return nil, errtypes.InternalError(err.Error()) case ifMatch 
!= targetEtag: - return errtypes.Aborted("etag mismatch") + return nil, errtypes.Aborted("etag mismatch") } } @@ -358,36 +376,35 @@ func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint lock, err := filelocks.AcquireWriteLock(targetPath) if err != nil { // we cannot acquire a lock - we error for safety - return err + return nil, err + } + + // create version node + if _, err := os.Create(upload.versionsPath); err != nil { + return lock, err } - defer filelocks.ReleaseLock(lock) - // This move drops all metadata!!! We copy it below with CopyMetadata - if err = os.Rename(targetPath, upload.versionsPath); err != nil { - return err + // copy blob metadata to version node + if err := xattrs.CopyMetadataWithSourceLock(targetPath, upload.versionsPath, func(attributeName string) bool { + return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) || + attributeName == xattrs.BlobIDAttr || + attributeName == xattrs.BlobsizeAttr + }, lock); err != nil { + return lock, err } - if _, err := os.Create(targetPath); err != nil { - return err + // keep mtime from previous version + if err := os.Chtimes(upload.versionsPath, vfi.ModTime(), vfi.ModTime()); err != nil { + return lock, errtypes.InternalError(fmt.Sprintf("failed to change mtime of version node: %s", err)) } - // copy grant and arbitrary metadata - // NOTE: now restoring an older revision might bring back a grant that was removed! - if err := xattrs.CopyMetadata(upload.versionsPath, targetPath, func(attributeName string) bool { - return true - // TODO determine all attributes that must be copied, currently we just copy all and overwrite changed properties - /* - [> - return strings.HasPrefix(attributeName, xattrs.GrantPrefix) || // for grants - strings.HasPrefix(attributeName, xattrs.MetadataPrefix) || // for arbitrary metadata - strings.HasPrefix(attributeName, xattrs.FavPrefix) || // for favorites - strings.HasPrefix(attributeName, xattrs.SpaceNameAttr) || // for a shared file - */ - }); err != nil { - return err + // update mtime of current version + mtime := time.Now() + if err := os.Chtimes(n.InternalPath(), mtime, mtime); err != nil { + return nil, err } - return nil + return lock, nil } // lookupNode looks up nodes by path. 
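The locking rework above hands the flock from initNewNode/updateExistingNode back to CreateNodeForUpload, which releases it only after the node metadata was written under the lock. A minimal sketch of that acquire-then-defer-release pattern, using only the filelocks API shown in this series; the path is a placeholder and the metadata work is elided:

package main

import (
	"fmt"

	"github.com/cs3org/reva/v2/pkg/storage/utils/filelocks"
)

func main() {
	// acquire the per-file write lock, as initNewNode/updateExistingNode do
	// before touching node metadata
	lock, err := filelocks.AcquireWriteLock("/tmp/example-node")
	if err != nil {
		fmt.Println("cannot lock:", err)
		return
	}
	// since patch 06, ReleaseLock returns an error for a nil lock instead of
	// dereferencing it, so a deferred release is safe on every code path
	defer filelocks.ReleaseLock(lock) //nolint:errcheck

	// ... write xattrs / copy version metadata while the lock is held ...
	fmt.Println("metadata work done under lock")
}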
diff --git a/pkg/storage/utils/decomposedfs/upload/upload.go b/pkg/storage/utils/decomposedfs/upload/upload.go index 7164396dfd..490a1ab9da 100644 --- a/pkg/storage/utils/decomposedfs/upload/upload.go +++ b/pkg/storage/utils/decomposedfs/upload/upload.go @@ -42,6 +42,7 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" "github.com/cs3org/reva/v2/pkg/utils" "github.com/golang-jwt/jwt" "github.com/pkg/errors" @@ -227,7 +228,14 @@ func (upload *Upload) FinishUpload(_ context.Context) error { } } - n, err := CreateNodeForUpload(upload) + // update checksums + attrs := map[string]string{ + xattrs.ChecksumPrefix + "sha1": string(sha1h.Sum(nil)), + xattrs.ChecksumPrefix + "md5": string(md5h.Sum(nil)), + xattrs.ChecksumPrefix + "adler32": string(adler32h.Sum(nil)), + } + + n, err := CreateNodeForUpload(upload, attrs) if err != nil { Cleanup(upload, true, false) return err @@ -235,11 +243,6 @@ func (upload *Upload) FinishUpload(_ context.Context) error { upload.Node = n - // now try write all checksums - tryWritingChecksum(log, upload.Node, "sha1", sha1h) - tryWritingChecksum(log, upload.Node, "md5", md5h) - tryWritingChecksum(log, upload.Node, "adler32", adler32h) - if upload.pub != nil { u, _ := ctxpkg.ContextGetUser(upload.Ctx) s, err := upload.URL(upload.Ctx) @@ -459,13 +462,3 @@ func joinurl(paths ...string) string { return s.String() } - -func tryWritingChecksum(log *zerolog.Logger, n *node.Node, algo string, h hash.Hash) { - if err := n.SetChecksum(algo, h); err != nil { - log.Err(err). - Str("csType", algo). - Bytes("hash", h.Sum(nil)). - Msg("Decomposedfs: could not write checksum") - // this is not critical, the bytes are there so we will continue - } -} diff --git a/pkg/storage/utils/filelocks/filelocks.go b/pkg/storage/utils/filelocks/filelocks.go index 79e91332ea..c98a499851 100644 --- a/pkg/storage/utils/filelocks/filelocks.go +++ b/pkg/storage/utils/filelocks/filelocks.go @@ -164,6 +164,10 @@ func AcquireWriteLock(file string) (*flock.Flock, error) { // ReleaseLock releases a lock from a file that was previously created // by AcquireReadLock or AcquireWriteLock. func ReleaseLock(lock *flock.Flock) error { + if lock == nil { + return errors.New("cannot unlock nil lock") + } + // there is a probability that if the file can not be unlocked, // we also can not remove the file. We will only try to remove if it // was successfully unlocked. 
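At this point the contract with external services is: decomposedfs publishes BytesReceived once all bytes are on disk, keeps the node marked as processing, and finalizes or cleans up when a PostprocessingFinished event arrives. A hedged sketch of a trivial external consumer that approves every upload; it assumes a NATS-backed events.Stream s obtained elsewhere and the Consume/Publish helpers the series uses, and a real service would of course scan or index the bytes before answering:

package main

import (
	"log"

	"github.com/cs3org/reva/v2/pkg/events"
)

// run listens for finished uploads and immediately approves them.
func run(s events.Stream) error {
	ch, err := events.Consume(s, "demo-postprocessing", events.BytesReceived{})
	if err != nil {
		return err
	}

	for e := range ch {
		ev, ok := e.(events.BytesReceived)
		if !ok {
			continue
		}

		log.Printf("upload %s received (%d bytes), approving", ev.UploadID, ev.Filesize)

		// PPOutcomeContinue tells decomposedfs to finalize the upload and
		// move the blob to its final destination
		if err := events.Publish(s, events.PostprocessingFinished{
			UploadID:      ev.UploadID,
			Filename:      ev.Filename,
			SpaceOwner:    ev.SpaceOwner,
			ExecutingUser: ev.ExecutingUser,
			Outcome:       events.PPOutcomeContinue,
		}); err != nil {
			return err
		}
	}
	return nil
}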
From 2d407b6903cb9c6c7e8149baa4ec0bbd55e5ceed Mon Sep 17 00:00:00 2001 From: jkoberg Date: Mon, 12 Dec 2022 10:16:26 +0100 Subject: [PATCH 07/11] disable 0-byte touches Signed-off-by: jkoberg --- internal/http/services/owncloud/ocdav/put.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/http/services/owncloud/ocdav/put.go b/internal/http/services/owncloud/ocdav/put.go index 007ec21b3a..443b1ee627 100644 --- a/internal/http/services/owncloud/ocdav/put.go +++ b/internal/http/services/owncloud/ocdav/put.go @@ -137,6 +137,7 @@ func (s *svc) handlePut(ctx context.Context, w http.ResponseWriter, r *http.Requ return } + /* FIXME: to bring back 0-byte touch instead of upload, return fileid in TouchFileRequest and add it to response headers if length == 0 { tfRes, err := s.gwClient.TouchFile(ctx, &provider.TouchFileRequest{ Ref: ref, }) if err != nil { log.Error().Err(err).Msg("error sending grpc touch file request") w.WriteHeader(http.StatusInternalServerError) return } if tfRes.Status.Code != rpc.Code_CODE_OK { log.Error().Interface("status", tfRes.Status).Msg("error touching file") w.WriteHeader(http.StatusInternalServerError) return } w.WriteHeader(http.StatusCreated) return } + */ opaqueMap := map[string]*typespb.OpaqueEntry{ net.HeaderUploadLength: { Decoder: "plain", From 7900594abc2c1da378fc0d61873196c8b2b3a48a Mon Sep 17 00:00:00 2001 From: jkoberg Date: Mon, 12 Dec 2022 16:14:09 +0100 Subject: [PATCH 08/11] extract regexp to global var Signed-off-by: jkoberg --- pkg/storage/utils/decomposedfs/upload.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index b62dfecfb8..537d873e3f 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -43,6 +43,8 @@ import ( "github.com/pkg/errors" ) +var _idRegexp = regexp.MustCompile(".*/([^/]+).info") + // Upload uploads data to the given resource // TODO Upload (and InitiateUpload) needs a way to receive the expected checksum. // Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated? @@ -290,9 +292,8 @@ func (fs *Decomposedfs) uploadInfos(ctx context.Context) ([]tusd.FileInfo, error return nil, err } - idRegexp := regexp.MustCompile(".*/([^/]+).info") for _, info := range infoFiles { - match := idRegexp.FindStringSubmatch(info) + match := _idRegexp.FindStringSubmatch(info) if match == nil || len(match) < 2 { continue } From 473d092a6b548b115e7e3aff26890f7f92ad7a3d Mon Sep 17 00:00:00 2001 From: jkoberg Date: Tue, 13 Dec 2022 11:14:41 +0100 Subject: [PATCH 09/11] get rid of *Metadata funcs Signed-off-by: jkoberg --- pkg/storage/utils/decomposedfs/node/node.go | 37 +++------------------ 1 file changed, 5 insertions(+), 32 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index 8777b457e7..8d65981747 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -128,33 +128,6 @@ func (n *Node) ChangeOwner(new *userpb.UserId) (err error) { return } -// SetMetadata populates a given key with its value. -// Note that consumers should be aware of the metadata options on xattrs.go.
-func (n *Node) SetMetadata(key string, val string) (err error) { - nodePath := n.InternalPath() - if err := xattrs.Set(nodePath, key, val); err != nil { - return errors.Wrap(err, "Decomposedfs: could not set extended attribute") - } - return nil -} - -// RemoveMetadata removes a given key -func (n *Node) RemoveMetadata(key string) (err error) { - if err = xattrs.Remove(n.InternalPath(), key); err == nil || xattrs.IsAttrUnset(err) { - return nil - } - return err -} - -// GetMetadata reads the metadata for the given key -func (n *Node) GetMetadata(key string) (val string, err error) { - nodePath := n.InternalPath() - if val, err = xattrs.Get(nodePath, key); err != nil { - return "", errors.Wrap(err, "Decomposedfs: could not get extended attribute") - } - return val, nil -} - // WriteAllNodeMetadata writes the Node metadata to disk func (n *Node) WriteAllNodeMetadata() (err error) { attribs := make(map[string]string) @@ -1224,17 +1197,17 @@ func (n *Node) FindStorageSpaceRoot() error { // MarkProcessing marks the node as being processed func (n *Node) MarkProcessing() error { - return n.SetMetadata(xattrs.StatusPrefix, ProcessingStatus) + return n.SetXattr(xattrs.StatusPrefix, ProcessingStatus) } // UnmarkProcessing removes the processing flag from the node func (n *Node) UnmarkProcessing() error { - return n.RemoveMetadata(xattrs.StatusPrefix) + return n.RemoveXattr(xattrs.StatusPrefix) } // IsProcessing returns true if the node is currently being processed func (n *Node) IsProcessing() bool { - v, err := n.GetMetadata(xattrs.StatusPrefix) + v, err := n.Xattr(xattrs.StatusPrefix) return err == nil && v == ProcessingStatus } @@ -1254,7 +1227,7 @@ func (n *Node) SetScanData(info string, date time.Time) error { // ScanData returns scanning information of the node func (n *Node) ScanData() (scanned bool, virus string, scantime time.Time) { - ti, _ := n.GetMetadata(xattrs.ScanDatePrefix) + ti, _ := n.Xattr(xattrs.ScanDatePrefix) if ti == "" { return // not scanned yet } @@ -1264,7 +1237,7 @@ func (n *Node) ScanData() (scanned bool, virus string, scantime time.Time) { return } - i, err := n.GetMetadata(xattrs.ScanStatusPrefix) + i, err := n.Xattr(xattrs.ScanStatusPrefix) if err != nil { return } From a37d40b8795afddff4a5d993681ad132ca303c63 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Tue, 13 Dec 2022 11:32:47 +0100 Subject: [PATCH 10/11] add custom type for PermissionFunc Signed-off-by: jkoberg --- pkg/storage/utils/decomposedfs/lookup/lookup.go | 2 +- pkg/storage/utils/decomposedfs/node/node.go | 4 ++-- pkg/storage/utils/decomposedfs/node/permissions.go | 10 ++++++++++ pkg/storage/utils/decomposedfs/tree/tree.go | 4 ++-- pkg/storage/utils/decomposedfs/upload.go | 2 +- 5 files changed, 16 insertions(+), 6 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/lookup/lookup.go b/pkg/storage/utils/decomposedfs/lookup/lookup.go index ea0cbe981a..f9477ace08 100644 --- a/pkg/storage/utils/decomposedfs/lookup/lookup.go +++ b/pkg/storage/utils/decomposedfs/lookup/lookup.go @@ -105,7 +105,7 @@ func (lu *Lookup) NodeFromSpaceID(ctx context.Context, id *provider.ResourceId) } // Path returns the path for node -func (lu *Lookup) Path(ctx context.Context, n *node.Node, hasPermission func(*node.Node) bool) (p string, err error) { +func (lu *Lookup) Path(ctx context.Context, n *node.Node, hasPermission node.PermissionFunc) (p string, err error) { root := n.SpaceRoot for n.ID != root.ID { p = filepath.Join(n.Name, p) diff --git a/pkg/storage/utils/decomposedfs/node/node.go 
From a37d40b8795afddff4a5d993681ad132ca303c63 Mon Sep 17 00:00:00 2001
From: jkoberg
Date: Tue, 13 Dec 2022 11:32:47 +0100
Subject: [PATCH 10/11] add custom type for PermissionFunc

Signed-off-by: jkoberg
---
 pkg/storage/utils/decomposedfs/lookup/lookup.go    |  2 +-
 pkg/storage/utils/decomposedfs/node/node.go        |  4 ++--
 pkg/storage/utils/decomposedfs/node/permissions.go | 10 ++++++++++
 pkg/storage/utils/decomposedfs/tree/tree.go        |  4 ++--
 pkg/storage/utils/decomposedfs/upload.go           |  2 +-
 5 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/pkg/storage/utils/decomposedfs/lookup/lookup.go b/pkg/storage/utils/decomposedfs/lookup/lookup.go
index ea0cbe981a..f9477ace08 100644
--- a/pkg/storage/utils/decomposedfs/lookup/lookup.go
+++ b/pkg/storage/utils/decomposedfs/lookup/lookup.go
@@ -105,7 +105,7 @@ func (lu *Lookup) NodeFromSpaceID(ctx context.Context, id *provider.ResourceId)
 }

 // Path returns the path for node
-func (lu *Lookup) Path(ctx context.Context, n *node.Node, hasPermission func(*node.Node) bool) (p string, err error) {
+func (lu *Lookup) Path(ctx context.Context, n *node.Node, hasPermission node.PermissionFunc) (p string, err error) {
 	root := n.SpaceRoot
 	for n.ID != root.ID {
 		p = filepath.Join(n.Name, p)
diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go
index 8d65981747..8cefdf5572 100644
--- a/pkg/storage/utils/decomposedfs/node/node.go
+++ b/pkg/storage/utils/decomposedfs/node/node.go
@@ -93,7 +93,7 @@ type Node struct {
 type PathLookup interface {
 	InternalRoot() string
 	InternalPath(spaceID, nodeID string) string
-	Path(ctx context.Context, n *Node, hasPermission func(*Node) bool) (path string, err error)
+	Path(ctx context.Context, n *Node, hasPermission PermissionFunc) (path string, err error)
 }

 // New returns a new instance of Node
@@ -635,7 +635,7 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi
 	case returnBasename:
 		fn = n.Name
 	default:
-		fn, err = n.lu.Path(ctx, n, func(*Node) bool { return true })
+		fn, err = n.lu.Path(ctx, n, NoCheck)
 		if err != nil {
 			return nil, err
 		}
diff --git a/pkg/storage/utils/decomposedfs/node/permissions.go b/pkg/storage/utils/decomposedfs/node/permissions.go
index 9a076f7f65..c42c67f816 100644
--- a/pkg/storage/utils/decomposedfs/node/permissions.go
+++ b/pkg/storage/utils/decomposedfs/node/permissions.go
@@ -30,6 +30,16 @@ import (
 	"github.com/pkg/errors"
 )

+// PermissionFunc should return true when the user has permission to access the node
+type PermissionFunc func(*Node) bool
+
+var (
+	// NoCheck doesn't check permissions, returns true always
+	NoCheck PermissionFunc = func(_ *Node) bool {
+		return true
+	}
+)
+
 // NoPermissions represents an empty set of permissions
 func NoPermissions() provider.ResourcePermissions {
 	return provider.ResourcePermissions{}
diff --git a/pkg/storage/utils/decomposedfs/tree/tree.go b/pkg/storage/utils/decomposedfs/tree/tree.go
index 5fe9e32a3d..331945bac6 100644
--- a/pkg/storage/utils/decomposedfs/tree/tree.go
+++ b/pkg/storage/utils/decomposedfs/tree/tree.go
@@ -61,7 +61,7 @@ type PathLookup interface {
 	InternalRoot() string
 	InternalPath(spaceID, nodeID string) string
-	Path(ctx context.Context, n *node.Node, hasPermission func(*node.Node) bool) (path string, err error)
+	Path(ctx context.Context, n *node.Node, hasPermission node.PermissionFunc) (path string, err error)
 }

 // Tree manages a hierarchical tree
@@ -479,7 +479,7 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) {
 	}

 	// get the original path
-	origin, err := t.lookup.Path(ctx, n, func(*node.Node) bool { return true })
+	origin, err := t.lookup.Path(ctx, n, node.NoCheck)
 	if err != nil {
 		return
 	}
diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go
index 537d873e3f..4e707f3ba5 100644
--- a/pkg/storage/utils/decomposedfs/upload.go
+++ b/pkg/storage/utils/decomposedfs/upload.go
@@ -142,7 +142,7 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere

 	// permissions are checked in NewUpload below

-	relative, err := fs.lu.Path(ctx, n, func(*node.Node) bool { return true })
+	relative, err := fs.lu.Path(ctx, n, node.NoCheck)
 	if err != nil {
 		return nil, err
 	}
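Naming the permission callback makes call sites self-documenting, and the NoCheck sentinel replaces the repeated inline func(*node.Node) bool { return true } literals. A self-contained sketch of the pattern (toy Node and path logic, not the decomposedfs implementation):

package main

import "fmt"

// Node is a toy stand-in for node.Node.
type Node struct{ Name string }

// PermissionFunc reports whether the user may access the node,
// mirroring the named type introduced by the patch.
type PermissionFunc func(*Node) bool

// NoCheck grants access unconditionally, like node.NoCheck.
var NoCheck PermissionFunc = func(_ *Node) bool { return true }

// Path resolves a toy path, failing when the check denies access.
func Path(n *Node, hasPermission PermissionFunc) (string, error) {
	if !hasPermission(n) {
		return "", fmt.Errorf("access denied on %s", n.Name)
	}
	return "/" + n.Name, nil
}

func main() {
	fmt.Println(Path(&Node{Name: "docs"}, NoCheck))

	denyAll := PermissionFunc(func(_ *Node) bool { return false })
	fmt.Println(Path(&Node{Name: "docs"}, denyAll))
}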
From 38111916114165489577a3ca59f72718a94dc6cb Mon Sep 17 00:00:00 2001
From: jkoberg
Date: Tue, 13 Dec 2022 12:09:38 +0100
Subject: [PATCH 11/11] bring annoying linting message back

Signed-off-by: jkoberg
---
 .golangci.yaml                                      | 2 --
 pkg/storage/utils/decomposedfs/decomposedfs.go      | 2 +-
 pkg/storage/utils/decomposedfs/upload/processing.go | 2 +-
 3 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/.golangci.yaml b/.golangci.yaml
index 8fbf1c6276..46904acca7 100644
--- a/.golangci.yaml
+++ b/.golangci.yaml
@@ -1,6 +1,4 @@
 issues:
-  exclude:
-    - "Error return value of .* is not checked"
   exclude-rules:
     - path: internal/http/interceptors/log/log.go
       text: "SA1019:"
diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go
index d6c4dfa179..2c0617bc7f 100644
--- a/pkg/storage/utils/decomposedfs/decomposedfs.go
+++ b/pkg/storage/utils/decomposedfs/decomposedfs.go
@@ -271,7 +271,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) {
 			} else {
 				// update parent tmtime to propagate etag change
 				now := time.Now()
-				p.SetTMTime(&now)
+				_ = p.SetTMTime(&now)
 				if err := fs.tp.Propagate(ctx, p, 0); err != nil {
 					log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate etag change")
 				}
diff --git a/pkg/storage/utils/decomposedfs/upload/processing.go b/pkg/storage/utils/decomposedfs/upload/processing.go
index 2fe437b086..90da63a77a 100644
--- a/pkg/storage/utils/decomposedfs/upload/processing.go
+++ b/pkg/storage/utils/decomposedfs/upload/processing.go
@@ -273,7 +273,7 @@ func CreateNodeForUpload(upload *Upload, initAttrs map[string]string) (*node.Nod
 		lock, err = updateExistingNode(upload, n, spaceID, uint64(fsize))
 	}

-	defer filelocks.ReleaseLock(lock)
+	defer filelocks.ReleaseLock(lock) //nolint:errcheck
 	if err != nil {
 		return nil, err
 	}
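Removing the blanket errcheck exclusion means every unchecked error return must now be acknowledged per call site, which is what the two code changes above do. Both idioms in a runnable sketch (file name and calls are illustrative):

package main

import (
	"fmt"
	"os"
)

func main() {
	f, err := os.CreateTemp("", "errcheck-demo")
	if err != nil {
		fmt.Println(err)
		return
	}

	// Idiom 1: discard the error explicitly so both reader and linter
	// see that ignoring it is deliberate (mirrors _ = p.SetTMTime(&now)).
	_ = os.Remove("does-not-exist")

	// Idiom 2: suppress the finding per call site when the error cannot
	// be handled meaningfully (mirrors the deferred ReleaseLock above).
	defer f.Close() //nolint:errcheck
}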