diff --git a/changelog/unreleased/async-postprocessing.md b/changelog/unreleased/async-postprocessing.md new file mode 100644 index 0000000000..4092f38ad6 --- /dev/null +++ b/changelog/unreleased/async-postprocessing.md @@ -0,0 +1,5 @@ +Enhancement: Async Postprocessing + +Provides functionality for async postprocessing. This allows the system to do the postprocessing (virusscan, copying of bytes to their final destination, ...) asynchronously to the user's request. This is a major change when active. + +https://github.com/cs3org/reva/pull/3531 diff --git a/internal/http/services/appprovider/appprovider.go b/internal/http/services/appprovider/appprovider.go index 4a88210bd0..934166c75f 100644 --- a/internal/http/services/appprovider/appprovider.go +++ b/internal/http/services/appprovider/appprovider.go @@ -384,6 +384,17 @@ func (s *svc) handleOpen(openMode int) http.HandlerFunc { Path: ".", } + statRes, err := client.Stat(ctx, &provider.StatRequest{Ref: fileRef}) + if err != nil { + writeError(w, r, appErrorServerError, "Internal error accessing the file, please try again later", err) + return + } + + if status := utils.ReadPlainFromOpaque(statRes.GetInfo().GetOpaque(), "status"); status == "processing" { + writeError(w, r, appErrorTooEarly, "The requested file is not yet available, please try again later", nil) + return + } + viewMode, err := getViewModeFromPublicScope(ctx) if err != nil { writeError(w, r, appErrorPermissionDenied, "permission denied to open the application", err) diff --git a/internal/http/services/appprovider/errors.go b/internal/http/services/appprovider/errors.go index 311a4c6295..69d4ec5306 100644 --- a/internal/http/services/appprovider/errors.go +++ b/internal/http/services/appprovider/errors.go @@ -36,6 +36,7 @@ const ( appErrorUnimplemented appErrorCode = "NOT_IMPLEMENTED" appErrorInvalidParameter appErrorCode = "INVALID_PARAMETER" appErrorServerError appErrorCode = "SERVER_ERROR" + appErrorTooEarly appErrorCode = "TOO_EARLY" ) // appErrorCodeMapping stores the HTTP error code mapping for various APIErrorCodes @@ -47,6 +48,7 @@ var appErrorCodeMapping = map[appErrorCode]int{ appErrorInvalidParameter: http.StatusBadRequest, appErrorServerError: http.StatusInternalServerError, appErrorPermissionDenied: http.StatusForbidden, + appErrorTooEarly: http.StatusTooEarly, } // APIError encompasses the error type and message diff --git a/internal/http/services/owncloud/ocdav/propfind/propfind.go b/internal/http/services/owncloud/ocdav/propfind/propfind.go index 5c0ed7e190..0373e4c92f 100644 --- a/internal/http/services/owncloud/ocdav/propfind/propfind.go +++ b/internal/http/services/owncloud/ocdav/propfind/propfind.go @@ -1476,6 +1476,14 @@ func mdToPropResponse(ctx context.Context, pf *XML, md *provider.ResourceInfo, p } } + if status := utils.ReadPlainFromOpaque(md.Opaque, "status"); status == "processing" { + response.Propstat = append(response.Propstat, PropstatXML{ + Status: "HTTP/1.1 425 TOO EARLY", + Prop: propstatOK.Prop, + }) + return &response, nil + } + if len(propstatOK.Prop) > 0 { response.Propstat = append(response.Propstat, propstatOK) } diff --git a/internal/http/services/owncloud/ocdav/put.go b/internal/http/services/owncloud/ocdav/put.go index 4a94ad0a16..443b1ee627 100644 --- a/internal/http/services/owncloud/ocdav/put.go +++ b/internal/http/services/owncloud/ocdav/put.go @@ -137,6 +137,26 @@ func (s *svc) handlePut(ctx context.Context, w http.ResponseWriter, r *http.Requ return } + /* FIXME: to bring back 0-byte touch instead of upload, return the fileid in
TouchFileRequest and add it to response headers + if length == 0 { + tfRes, err := s.gwClient.TouchFile(ctx, &provider.TouchFileRequest{ + Ref: ref, + }) + if err != nil { + log.Error().Err(err).Msg("error sending grpc touch file request") + w.WriteHeader(http.StatusInternalServerError) + return + } + if tfRes.Status.Code != rpc.Code_CODE_OK { + log.Error().Interface("status", tfRes.Status).Msg("error touching file") + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusCreated) + return + } + */ + opaqueMap := map[string]*typespb.OpaqueEntry{ net.HeaderUploadLength: { Decoder: "plain", diff --git a/pkg/events/postprocessing.go b/pkg/events/postprocessing.go new file mode 100644 index 0000000000..d49ff9e920 --- /dev/null +++ b/pkg/events/postprocessing.go @@ -0,0 +1,164 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package events + +import ( + "encoding/json" + "time" + + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" +) + +type ( + // Postprocessingstep is one of the available postprocessing steps + Postprocessingstep string + + // PostprocessingOutcome defines the result of the postprocessing + PostprocessingOutcome string +) + +var ( + // PPStepAntivirus is the step that scans for viruses + PPStepAntivirus Postprocessingstep = "virusscan" + // PPStepFTS is the step that indexes files for full text search + PPStepFTS Postprocessingstep = "fts" + // PPStepDelay is the step that delays processing.
Useful for testing or annoying users + PPStepDelay Postprocessingstep = "delay" + + // PPOutcomeDelete means that the file and the upload should be deleted + PPOutcomeDelete PostprocessingOutcome = "delete" + // PPOutcomeAbort means that the upload is cancelled but the bytes are being kept in the upload folder + PPOutcomeAbort PostprocessingOutcome = "abort" + // PPOutcomeContinue means that the upload is moved to its final destination (eventually being marked with pp results) + PPOutcomeContinue PostprocessingOutcome = "continue" +) + +// BytesReceived is emitted by the server when it received all bytes of an upload +type BytesReceived struct { + UploadID string + SpaceOwner *user.UserId + ExecutingUser *user.User + ResourceID *provider.ResourceId + Filename string + Filesize uint64 + URL string +} + +// Unmarshal to fulfill unmarshaller interface +func (BytesReceived) Unmarshal(v []byte) (interface{}, error) { + e := BytesReceived{} + err := json.Unmarshal(v, &e) + return e, err +} + +// VirusscanFinished is emitted by the server when it has completed an antivirus scan +type VirusscanFinished struct { + Infected bool + Outcome PostprocessingOutcome + UploadID string + Filename string + ExecutingUser *user.User + Description string + Scandate time.Time + ResourceID *provider.ResourceId + ErrorMsg string // empty when no error +} + +// Unmarshal to fulfill unmarshaller interface +func (VirusscanFinished) Unmarshal(v []byte) (interface{}, error) { + e := VirusscanFinished{} + err := json.Unmarshal(v, &e) + return e, err +} + +// StartPostprocessingStep can be issued by the server to start a postprocessing step +type StartPostprocessingStep struct { + UploadID string + URL string + ExecutingUser *user.User + Filename string + Filesize uint64 + Token string // for file retrieval in after upload case + ResourceID *provider.ResourceId // for file retrieval in after upload case + RevaToken string // for file retrieval in after upload case + + StepToStart Postprocessingstep +} + +// Unmarshal to fulfill unmarshaller interface +func (StartPostprocessingStep) Unmarshal(v []byte) (interface{}, error) { + e := StartPostprocessingStep{} + err := json.Unmarshal(v, &e) + return e, err +} + +// PostprocessingStepFinished can be issued by the server when a postprocessing step is finished +type PostprocessingStepFinished struct { + UploadID string + ExecutingUser *user.User + Filename string + + FinishedStep Postprocessingstep // name of the step + Result interface{} // result information + Error error // possible error of the step + Outcome PostprocessingOutcome // some services may cause postprocessing to stop +} + +// Unmarshal to fulfill unmarshaller interface +func (PostprocessingStepFinished) Unmarshal(v []byte) (interface{}, error) { + e := PostprocessingStepFinished{} + err := json.Unmarshal(v, &e) + return e, err +} + +// PostprocessingFinished is emitted by *some* service which can decide that postprocessing is finished +type PostprocessingFinished struct { + UploadID string + Filename string + SpaceOwner *user.UserId + ExecutingUser *user.User + Result map[Postprocessingstep]interface{} // it is a map[step]Event + Outcome PostprocessingOutcome +} + +// Unmarshal to fulfill unmarshaller interface +func (PostprocessingFinished) Unmarshal(v []byte) (interface{}, error) { + e := PostprocessingFinished{} + err := json.Unmarshal(v, &e) + return e, err +} + +// UploadReady is emitted by the storage provider when postprocessing is finished +type UploadReady struct { + UploadID string + Filename string + SpaceOwner *user.UserId +
ExecutingUser *user.User + FileRef *provider.Reference + Failed bool + // add reference here? We could use it to inform client pp is finished +} + +// Unmarshal to fulfill umarshaller interface +func (UploadReady) Unmarshal(v []byte) (interface{}, error) { + e := UploadReady{} + err := json.Unmarshal(v, &e) + return e, err +} diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index 354c5c20df..2c0617bc7f 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -23,7 +23,10 @@ package decomposedfs //go:generate make --no-print-directory -C ../../../.. mockery NAME=Tree import ( + "bytes" "context" + "crypto/tls" + "crypto/x509" "io" "net/url" "os" @@ -32,6 +35,7 @@ import ( "strconv" "strings" "syscall" + "time" cs3permissions "github.com/cs3org/go-cs3apis/cs3/permissions/v1beta1" rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" @@ -39,19 +43,24 @@ import ( "github.com/cs3org/reva/v2/pkg/appctx" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/events/server" "github.com/cs3org/reva/v2/pkg/logger" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/cs3org/reva/v2/pkg/storage" + "github.com/cs3org/reva/v2/pkg/storage/cache" "github.com/cs3org/reva/v2/pkg/storage/utils/chunking" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" "github.com/cs3org/reva/v2/pkg/storage/utils/filelocks" "github.com/cs3org/reva/v2/pkg/storage/utils/templates" "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" + "github.com/go-micro/plugins/v4/events/natsjs" "github.com/pkg/errors" "google.golang.org/grpc" ) @@ -99,6 +108,8 @@ type Decomposedfs struct { p PermissionsChecker chunkHandler *chunking.ChunkHandler permissionsClient CS3PermissionsClient + stream events.Stream + cache cache.StatCache } // NewDefault returns an instance with default components @@ -126,10 +137,10 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore) (storage.FS, error) // New returns an implementation of the storage.FS interface that talks to // a local filesystem. func New(o *options.Options, lu *lookup.Lookup, p PermissionsChecker, tp Tree, permissionsClient CS3PermissionsClient) (storage.FS, error) { + log := logger.New() err := tp.Setup() if err != nil { - logger.New().Error().Err(err). 
- Msg("could not setup tree") + log.Error().Err(err).Msg("could not setup tree") return nil, errors.Wrap(err, "could not setup tree") } @@ -141,14 +152,271 @@ func New(o *options.Options, lu *lookup.Lookup, p PermissionsChecker, tp Tree, p filelocks.SetLockCycleDurationFactor(o.LockCycleDurationFactor) } - return &Decomposedfs{ + var ev events.Stream + if o.Events.NatsAddress != "" { + evtsCfg := o.Events + var ( + rootCAPool *x509.CertPool + tlsConf *tls.Config + ) + if evtsCfg.TLSRootCACertificate != "" { + rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate) + if err != nil { + return nil, err + } + + var certBytes bytes.Buffer + if _, err := io.Copy(&certBytes, rootCrtFile); err != nil { + return nil, err + } + + rootCAPool = x509.NewCertPool() + rootCAPool.AppendCertsFromPEM(certBytes.Bytes()) + evtsCfg.TLSInsecure = false + + tlsConf = &tls.Config{ + InsecureSkipVerify: evtsCfg.TLSInsecure, //nolint:gosec + RootCAs: rootCAPool, + } + } + + ev, err = server.NewNatsStream( + natsjs.TLSConfig(tlsConf), + natsjs.Address(evtsCfg.NatsAddress), + natsjs.ClusterID(evtsCfg.NatsClusterID), + ) + if err != nil { + return nil, err + } + } + + fs := &Decomposedfs{ tp: tp, lu: lu, o: o, p: p, chunkHandler: chunking.NewChunkHandler(filepath.Join(o.Root, "uploads")), permissionsClient: permissionsClient, - }, nil + stream: ev, + cache: cache.GetStatCache(o.StatCache.CacheStore, o.StatCache.CacheNodes, o.StatCache.CacheDatabase, "stat", 0), + } + + if o.AsyncFileUploads { + if o.Events.NatsAddress == "" { + log.Error().Msg("need nats for async file processing") + return nil, errors.New("need nats for async file processing") + } + + ch, err := events.Consume(ev, "dcfs", events.PostprocessingFinished{}, events.VirusscanFinished{}) + if err != nil { + return nil, err + } + + if o.Events.NumConsumers <= 0 { + o.Events.NumConsumers = 1 + } + + for i := 0; i < o.Events.NumConsumers; i++ { + go fs.Postprocessing(ch) + } + } + + return fs, nil +} + +// Postprocessing starts the postprocessing result collector +func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) { + ctx := context.TODO() + log := logger.New() + for event := range ch { + switch ev := event.(type) { + case events.PostprocessingFinished: + up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") + continue // NOTE: since we can't get the upload, we can't delete the blob + } + + var ( + failed bool + keepUpload bool + ) + + switch ev.Outcome { + default: + log.Error().Str("outcome", string(ev.Outcome)).Str("uploadID", ev.UploadID).Msg("unknown postprocessing outcome - aborting") + fallthrough + case events.PPOutcomeAbort: + failed = true + keepUpload = true + case events.PPOutcomeContinue: + if err := up.Finalize(); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not finalize upload") + keepUpload = true // should we keep the upload when assembling failed? 
+ failed = true + } + case events.PPOutcomeDelete: + failed = true + } + + n, err := node.ReadNode(ctx, fs.lu, up.Info.Storage["SpaceRoot"], up.Info.Storage["NodeId"], false) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read node") + continue + } + up.Node = n + + if p, err := node.ReadNode(ctx, fs.lu, up.Info.Storage["SpaceRoot"], n.ParentID, false); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read parent") + } else { + // update parent tmtime to propagate etag change + now := time.Now() + _ = p.SetTMTime(&now) + if err := fs.tp.Propagate(ctx, p, 0); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate etag change") + } + } + + // remove cache entry in gateway + fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) + + upload.Cleanup(up, failed, keepUpload) + + if err := events.Publish( + fs.stream, + events.UploadReady{ + UploadID: ev.UploadID, + Failed: failed, + ExecutingUser: ev.ExecutingUser, + FileRef: &provider.Reference{ + ResourceId: &provider.ResourceId{ + StorageId: up.Info.MetaData["providerID"], + SpaceId: up.Info.Storage["SpaceRoot"], + OpaqueId: up.Info.Storage["SpaceRoot"], + }, + Path: utils.MakeRelativePath(filepath.Join(up.Info.MetaData["dir"], up.Info.MetaData["filename"])), + }, + }, + ); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event") + } + + /* LETS KEEP THIS COMMENTED UNTIL VIRUSSCANNING IS BACKMERGED + case events.VirusscanFinished: + if ev.ErrorMsg != "" { + // scan failed somehow + // Should we handle this here? + continue + } + + var n *node.Node + switch ev.UploadID { + case "": + // uploadid is empty -> this was an on-demand scan + ctx := ctxpkg.ContextSetUser(context.Background(), ev.ExecutingUser) + ref := &provider.Reference{ResourceId: ev.ResourceID} + + no, err := fs.lu.NodeFromResource(ctx, ref) + if err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to get node after scan") + continue + + } + n = no + if ev.Outcome == events.PPOutcomeDelete { + // antivir wants us to delete the file. We must obey and need to + + // check if there a previous versions existing + revs, err := fs.ListRevisions(ctx, ref) + if len(revs) == 0 { + if err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to list revisions. 
Fallback to delete file") + } + + // no versions -> trash file + err := fs.Delete(ctx, ref) + if err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to delete infected resource") + continue + } + + // now purge it from the recycle bin + if err := fs.PurgeRecycleItem(ctx, &provider.Reference{ResourceId: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.SpaceID}}, n.ID, "/"); err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to purge infected resource from trash") + } + + // remove cache entry in gateway + fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) + continue + } + + // we have versions - find the newest + versions := make(map[uint64]string) // remember all versions - we need them later + var nv uint64 + for _, v := range revs { + versions[v.Mtime] = v.Key + if v.Mtime > nv { + nv = v.Mtime + } + } + + // restore newest version + if err := fs.RestoreRevision(ctx, ref, versions[nv]); err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Str("revision", versions[nv]).Msg("Failed to restore revision") + continue + } + + // now find infected version + revs, err = fs.ListRevisions(ctx, ref) + if err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Error listing revisions after restore") + } + + for _, v := range revs { + // we looking for a version that was previously not there + if _, ok := versions[v.Mtime]; ok { + continue + } + + if err := fs.DeleteRevision(ctx, ref, v.Key); err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Str("revision", v.Key).Msg("Failed to delete revision") + } + } + + // remove cache entry in gateway + fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) + continue + } + default: + // uploadid is not empty -> this is an async upload + up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") + continue + } + + no, err := node.ReadNode(up.Ctx, fs.lu, up.Info.Storage["SpaceRoot"], up.Info.Storage["NodeId"], false) + if err != nil { + log.Error().Err(err).Interface("uploadID", ev.UploadID).Msg("Failed to get node after scan") + continue + } + + n = no + } + + if err := n.SetScanData(ev.Description, ev.Scandate); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("resourceID", ev.ResourceID).Msg("Failed to set scan results") + continue + } + + // remove cache entry in gateway + fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) + */ + default: + log.Error().Interface("event", ev).Msg("Unknown event") + } + } + } // Shutdown shuts down the storage @@ -270,12 +538,11 @@ func (fs *Decomposedfs) GetHome(ctx context.Context) (string, error) { // GetPathByID returns the fn pointed by the file id, without the internal namespace func (fs *Decomposedfs) GetPathByID(ctx context.Context, id *provider.ResourceId) (string, error) { - node, err := fs.lu.NodeFromID(ctx, id) + n, err := fs.lu.NodeFromID(ctx, id) if err != nil { return "", err } - - rp, err := fs.p.AssemblePermissions(ctx, node) + rp, err := fs.p.AssemblePermissions(ctx, n) switch { case err != nil: return "", errtypes.InternalError(err.Error()) @@ -287,7 +554,14 @@ func (fs *Decomposedfs) GetPathByID(ctx context.Context, id 
*provider.ResourceId return "", errtypes.NotFound(f) } - return fs.lu.Path(ctx, node) + hp := func(n *node.Node) bool { + perms, err := fs.p.AssemblePermissions(ctx, n) + if err != nil { + return false + } + return perms.GetPath + } + return fs.lu.Path(ctx, n, hp) } // CreateDir creates the specified directory @@ -555,10 +829,13 @@ func (fs *Decomposedfs) ListFolder(ctx context.Context, ref *provider.Reference, // add this childs permissions pset, _ := n.PermissionSet(ctx) node.AddPermissions(&np, &pset) - if ri, err := children[i].AsResourceInfo(ctx, &np, mdKeys, fieldMask, utils.IsRelativeReference(ref)); err == nil { - finfos = append(finfos, ri) + ri, err := children[i].AsResourceInfo(ctx, &np, mdKeys, fieldMask, utils.IsRelativeReference(ref)) + if err != nil { + return nil, errtypes.InternalError(err.Error()) } + finfos = append(finfos, ri) } + return } diff --git a/pkg/storage/utils/decomposedfs/lookup/lookup.go b/pkg/storage/utils/decomposedfs/lookup/lookup.go index 0bd6ede652..f9477ace08 100644 --- a/pkg/storage/utils/decomposedfs/lookup/lookup.go +++ b/pkg/storage/utils/decomposedfs/lookup/lookup.go @@ -105,7 +105,7 @@ func (lu *Lookup) NodeFromSpaceID(ctx context.Context, id *provider.ResourceId) } // Path returns the path for node -func (lu *Lookup) Path(ctx context.Context, n *node.Node) (p string, err error) { +func (lu *Lookup) Path(ctx context.Context, n *node.Node, hasPermission node.PermissionFunc) (p string, err error) { root := n.SpaceRoot for n.ID != root.ID { p = filepath.Join(n.Name, p) @@ -117,6 +117,10 @@ func (lu *Lookup) Path(ctx context.Context, n *node.Node) (p string, err error) Msg("Path()") return } + + if !hasPermission(n) { + break + } } p = filepath.Join("/", p) return diff --git a/pkg/storage/utils/decomposedfs/lookup/lookup_test.go b/pkg/storage/utils/decomposedfs/lookup/lookup_test.go index aacdcc1789..16aa345951 100644 --- a/pkg/storage/utils/decomposedfs/lookup/lookup_test.go +++ b/pkg/storage/utils/decomposedfs/lookup/lookup_test.go @@ -20,6 +20,7 @@ package lookup_test import ( provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" helpers "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/testhelpers" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" . 
"github.com/onsi/ginkgo/v2" @@ -51,7 +52,7 @@ var _ = Describe("Lookup", func() { }) Expect(err).ToNot(HaveOccurred()) - path, err := env.Lookup.Path(env.Ctx, n) + path, err := env.Lookup.Path(env.Ctx, n, func(n *node.Node) bool { return true }) Expect(err).ToNot(HaveOccurred()) Expect(path).To(Equal("/dir1/file1")) }) @@ -65,7 +66,7 @@ var _ = Describe("Lookup", func() { }) Expect(err).ToNot(HaveOccurred()) - path, err := env.Lookup.Path(env.Ctx, n) + path, err := env.Lookup.Path(env.Ctx, n, func(n *node.Node) bool { return true }) Expect(err).ToNot(HaveOccurred()) Expect(path).To(Equal("/dir1/subdir1/file2")) Expect(n.SpaceRoot.Name).To(Equal("userid")) @@ -88,7 +89,7 @@ var _ = Describe("Lookup", func() { Expect(n.SpaceRoot).ToNot(BeNil()) // Check if we got the right node and spaceRoot - path, err := env.Lookup.Path(env.Ctx, n) + path, err := env.Lookup.Path(env.Ctx, n, func(n *node.Node) bool { return true }) Expect(err).ToNot(HaveOccurred()) Expect(path).To(Equal("/dir1/file1")) Expect(n.SpaceRoot.Name).To(Equal("userid")) @@ -111,7 +112,7 @@ var _ = Describe("Lookup", func() { Expect(n.SpaceRoot).ToNot(BeNil()) // Check if we got the right node and spaceRoot - path, err := env.Lookup.Path(env.Ctx, n) + path, err := env.Lookup.Path(env.Ctx, n, func(n *node.Node) bool { return true }) Expect(err).ToNot(HaveOccurred()) Expect(path).To(Equal("/dir1/file1")) Expect(n.SpaceRoot.Name).To(Equal("userid")) diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index dd0f571003..8cefdf5572 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -68,6 +68,9 @@ const ( // RootID defines the root node's ID RootID = "root" + + // ProcessingStatus is the name of the status when processing a file + ProcessingStatus = "processing" ) // Node represents a node in the tree and provides methods to get a Parent or Child instance @@ -90,7 +93,7 @@ type Node struct { type PathLookup interface { InternalRoot() string InternalPath(spaceID, nodeID string) string - Path(ctx context.Context, n *Node) (path string, err error) + Path(ctx context.Context, n *Node, hasPermission PermissionFunc) (path string, err error) } // New returns a new instance of Node @@ -632,7 +635,7 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi case returnBasename: fn = n.Name default: - fn, err = n.lu.Path(ctx, n) + fn, err = n.lu.Path(ctx, n, NoCheck) if err != nil { return nil, err } @@ -659,6 +662,10 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi Name: n.Name, } + if n.IsProcessing() { + ri.Opaque = utils.AppendPlainToOpaque(ri.Opaque, "status", "processing") + } + if nodeType == provider.ResourceType_RESOURCE_TYPE_CONTAINER { ts, err := n.GetTreeSize() if err == nil { @@ -804,6 +811,11 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi Metadata: metadata, } + // add virusscan information + if scanned, _, date := n.ScanData(); scanned { + ri.Opaque = utils.AppendPlainToOpaque(ri.Opaque, "scantime", date.Format(time.RFC3339Nano)) + } + sublog.Debug(). Interface("ri", ri). 
Msg("AsResourceInfo") @@ -1183,12 +1195,56 @@ func (n *Node) FindStorageSpaceRoot() error { return nil } +// MarkProcessing marks the node as being processed +func (n *Node) MarkProcessing() error { + return n.SetXattr(xattrs.StatusPrefix, ProcessingStatus) +} + +// UnmarkProcessing removes the processing flag from the node +func (n *Node) UnmarkProcessing() error { + return n.RemoveXattr(xattrs.StatusPrefix) +} + +// IsProcessing returns true if the node is currently being processed +func (n *Node) IsProcessing() bool { + v, err := n.Xattr(xattrs.StatusPrefix) + return err == nil && v == ProcessingStatus +} + // IsSpaceRoot checks if the node is a space root func (n *Node) IsSpaceRoot() bool { _, err := n.Xattr(xattrs.SpaceNameAttr) return err == nil } +// SetScanData sets the virus scan info to the node +func (n *Node) SetScanData(info string, date time.Time) error { + return xattrs.SetMultiple(n.InternalPath(), map[string]string{ + xattrs.ScanStatusPrefix: info, + xattrs.ScanDatePrefix: date.Format(time.RFC3339Nano), + }) +} + +// ScanData returns scanning information of the node +func (n *Node) ScanData() (scanned bool, virus string, scantime time.Time) { + ti, _ := n.Xattr(xattrs.ScanDatePrefix) + if ti == "" { + return // not scanned yet + } + + t, err := time.Parse(time.RFC3339Nano, ti) + if err != nil { + return + } + + i, err := n.Xattr(xattrs.ScanStatusPrefix) + if err != nil { + return + } + + return true, i, t +} + // CheckQuota checks if both disk space and available quota are sufficient // Overwrite must be set to true if the new file replaces the old file e.g. // when creating a new file version. In such a case the function will diff --git a/pkg/storage/utils/decomposedfs/node/permissions.go b/pkg/storage/utils/decomposedfs/node/permissions.go index 9a076f7f65..c42c67f816 100644 --- a/pkg/storage/utils/decomposedfs/node/permissions.go +++ b/pkg/storage/utils/decomposedfs/node/permissions.go @@ -30,6 +30,16 @@ import ( "github.com/pkg/errors" ) +// PermissionFunc should return true when the user has permission to access the node +type PermissionFunc func(*Node) bool + +var ( + // NoCheck doesn't check permissions, returns true always + NoCheck PermissionFunc = func(_ *Node) bool { + return true + } +) + // NoPermissions represents an empty set of permissions func NoPermissions() provider.ResourcePermissions { return provider.ResourcePermissions{} diff --git a/pkg/storage/utils/decomposedfs/options/options.go b/pkg/storage/utils/decomposedfs/options/options.go index 3ae5b5cbdf..4afe2e38d2 100644 --- a/pkg/storage/utils/decomposedfs/options/options.go +++ b/pkg/storage/utils/decomposedfs/options/options.go @@ -53,10 +53,42 @@ type Options struct { PersonalSpaceAliasTemplate string `mapstructure:"personalspacealias_template"` GeneralSpaceAliasTemplate string `mapstructure:"generalspacealias_template"` + AsyncFileUploads bool `mapstructure:"asyncfileuploads"` + + Events EventOptions `mapstructure:"events"` + + Tokens TokenOptions `mapstructure:"tokens"` + + StatCache CacheOptions `mapstructure:"statcache"` + MaxAcquireLockCycles int `mapstructure:"max_acquire_lock_cycles"` LockCycleDurationFactor int `mapstructure:"lock_cycle_duration_factor"` } +// EventOptions are the configurable options for events +type EventOptions struct { + NatsAddress string `mapstructure:"natsaddress"` + NatsClusterID string `mapstructure:"natsclusterid"` + TLSInsecure bool `mapstructure:"tlsinsecure"` + TLSRootCACertificate string `mapstructure:"tlsrootcacertificate"` + NumConsumers int 
`mapstructure:"numconsumers"` +} + +// TokenOptions are the configurable option for tokens +type TokenOptions struct { + DownloadEndpoint string `mapstructure:"download_endpoint"` + DataGatewayEndpoint string `mapstructure:"datagateway_endpoint"` + TransferSharedSecret string `mapstructure:"transfer_shared_secret"` + TransferExpires int64 `mapstructure:"transfer_expires"` +} + +// CacheOptions contains options of configuring a cache +type CacheOptions struct { + CacheStore string `mapstructure:"cache_store"` + CacheNodes []string `mapstructure:"cache_nodes"` + CacheDatabase string `mapstructure:"cache_database"` +} + // New returns a new Options instance for the given configuration func New(m map[string]interface{}) (*Options, error) { o := &Options{} diff --git a/pkg/storage/utils/decomposedfs/revisions.go b/pkg/storage/utils/decomposedfs/revisions.go index dab017d137..e97b970392 100644 --- a/pkg/storage/utils/decomposedfs/revisions.go +++ b/pkg/storage/utils/decomposedfs/revisions.go @@ -278,3 +278,55 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer log.Error().Err(err).Interface("ref", ref).Str("originalnode", kp[0]).Str("revisionKey", revisionKey).Msg("original node does not exist") return nil } + +// DeleteRevision deletes the specified revision of the resource +func (fs *Decomposedfs) DeleteRevision(ctx context.Context, ref *provider.Reference, revisionKey string) error { + n, err := fs.getRevisionNode(ctx, ref, revisionKey, func(rp *provider.ResourcePermissions) bool { + return rp.RestoreFileVersion + }) + if err != nil { + return err + } + + if err := os.RemoveAll(fs.lu.InternalPath(n.SpaceID, revisionKey)); err != nil { + return err + } + + return fs.tp.DeleteBlob(n) +} + +func (fs *Decomposedfs) getRevisionNode(ctx context.Context, ref *provider.Reference, revisionKey string, hasPermission func(*provider.ResourcePermissions) bool) (*node.Node, error) { + log := appctx.GetLogger(ctx) + + // verify revision key format + kp := strings.SplitN(revisionKey, node.RevisionIDDelimiter, 2) + if len(kp) != 2 { + log.Error().Str("revisionKey", revisionKey).Msg("malformed revisionKey") + return nil, errtypes.NotFound(revisionKey) + } + log.Debug().Str("revisionKey", revisionKey).Msg("DownloadRevision") + + spaceID := ref.ResourceId.SpaceId + // check if the node is available and has not been deleted + n, err := node.ReadNode(ctx, fs.lu, spaceID, kp[0], false) + if err != nil { + return nil, err + } + if !n.Exists { + err = errtypes.NotFound(filepath.Join(n.ParentID, n.Name)) + return nil, err + } + + p, err := fs.p.AssemblePermissions(ctx, n) + switch { + case err != nil: + return nil, errtypes.InternalError(err.Error()) + case !hasPermission(&p): + return nil, errtypes.PermissionDenied(filepath.Join(n.ParentID, n.Name)) + } + + // Set space owner in context + storagespace.ContextSendSpaceOwnerID(ctx, n.SpaceOwnerOrManager(ctx)) + + return n, nil +} diff --git a/pkg/storage/utils/decomposedfs/tree/tree.go b/pkg/storage/utils/decomposedfs/tree/tree.go index 7aab6712ba..331945bac6 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree.go +++ b/pkg/storage/utils/decomposedfs/tree/tree.go @@ -19,6 +19,7 @@ package tree import ( + "bytes" "context" "fmt" "io" @@ -60,7 +61,7 @@ type PathLookup interface { InternalRoot() string InternalPath(spaceID, nodeID string) string - Path(ctx context.Context, n *node.Node) (path string, err error) + Path(ctx context.Context, n *node.Node, hasPermission node.PermissionFunc) (path string, err error) } // Tree manages a 
hierarchical tree @@ -478,7 +479,7 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) { } // get the original path - origin, err := t.lookup.Path(ctx, n) + origin, err := t.lookup.Path(ctx, n, node.NoCheck) if err != nil { return } @@ -915,6 +916,10 @@ func (t *Tree) WriteBlob(node *node.Node, reader io.Reader) error { // ReadBlob reads a blob from the blobstore func (t *Tree) ReadBlob(node *node.Node) (io.ReadCloser, error) { + if node.BlobID == "" { + // there is no blob yet - we are dealing with a 0 byte file + return io.NopCloser(bytes.NewReader([]byte{})), nil + } return t.blobstore.Download(node) } diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index 66c62acc6b..4e707f3ba5 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -20,15 +20,7 @@ package decomposedfs import ( "context" - "crypto/md5" - "crypto/sha1" - "encoding/hex" - "encoding/json" - "fmt" - "hash" - "hash/adler32" "io" - iofs "io/fs" "os" "path/filepath" "regexp" @@ -45,30 +37,26 @@ import ( "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/utils/chunking" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" - "github.com/cs3org/reva/v2/pkg/storage/utils/filelocks" - "github.com/cs3org/reva/v2/pkg/storagespace" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload" "github.com/cs3org/reva/v2/pkg/utils" - "github.com/google/uuid" "github.com/pkg/errors" ) -var defaultFilePerm = os.FileMode(0664) +var _idRegexp = regexp.MustCompile(".*/([^/]+).info") // Upload uploads data to the given resource // TODO Upload (and InitiateUpload) needs a way to receive the expected checksum. // Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated? 
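// Illustrative sketch, not part of this change: the TODO above suggests passing the
// expected checksum as upload metadata in the form "<algorithm> <checksum>", lowercase
// and space separated. FinishUpload (moved to the upload package by this change) already
// splits such a value with strings.SplitN; a minimal, hypothetical parser for that
// convention (assumes "strings" is imported) could look like:
func parseChecksumMetaData(v string) (alg, sum string, ok bool) {
	parts := strings.SplitN(v, " ", 2) // e.g. "sha1 aeosvp45w5xaeoe"
	if len(parts) != 2 {
		return "", "", false
	}
	return parts[0], parts[1], true // alg is one of "sha1", "md5", "adler32"
}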
func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, uff storage.UploadFinishedFunc) (provider.ResourceInfo, error) { - upload, err := fs.GetUpload(ctx, ref.GetPath()) + up, err := fs.GetUpload(ctx, ref.GetPath()) if err != nil { return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error retrieving upload") } - uploadInfo := upload.(*fileUpload) + uploadInfo := up.(*upload.Upload) - p := uploadInfo.info.Storage["NodeName"] + p := uploadInfo.Info.Storage["NodeName"] if chunking.IsChunked(p) { // check chunking v1 var assembledFile string p, assembledFile, err = fs.chunkHandler.WriteChunk(p, r) @@ -81,7 +69,7 @@ func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r i } return provider.ResourceInfo{}, errtypes.PartialContent(ref.String()) } - uploadInfo.info.Storage["NodeName"] = p + uploadInfo.Info.Storage["NodeName"] = p fd, err := os.Open(assembledFile) if err != nil { return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error opening assembled file") @@ -100,7 +88,7 @@ func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r i } if uff != nil { - info := uploadInfo.info + info := uploadInfo.Info uploadRef := &provider.Reference{ ResourceId: &provider.ResourceId{ StorageId: info.MetaData["providerID"], @@ -109,7 +97,7 @@ func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r i }, Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])), } - owner, ok := ctxpkg.ContextGetUser(uploadInfo.ctx) + owner, ok := ctxpkg.ContextGetUser(uploadInfo.Ctx) if !ok { return provider.ResourceInfo{}, errtypes.PreconditionFailed("error getting user from uploadinfo context") } @@ -122,14 +110,14 @@ func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r i ri := provider.ResourceInfo{ // fill with at least fileid, mtime and etag Id: &provider.ResourceId{ - StorageId: uploadInfo.info.MetaData["providerID"], - SpaceId: uploadInfo.info.Storage["SpaceRoot"], - OpaqueId: uploadInfo.info.Storage["NodeId"], + StorageId: uploadInfo.Info.MetaData["providerID"], + SpaceId: uploadInfo.Info.Storage["SpaceRoot"], + OpaqueId: uploadInfo.Info.Storage["NodeId"], }, - Etag: uploadInfo.info.MetaData["etag"], + Etag: uploadInfo.Info.MetaData["etag"], } - if mtime, err := utils.MTimeToTS(uploadInfo.info.MetaData["mtime"]); err == nil { + if mtime, err := utils.MTimeToTS(uploadInfo.Info.MetaData["mtime"]); err == nil { ri.Mtime = &mtime } @@ -154,7 +142,7 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere // permissions are checked in NewUpload below - relative, err := fs.lu.Path(ctx, n) + relative, err := fs.lu.Path(ctx, n, node.NoCheck) if err != nil { return nil, err } @@ -235,225 +223,23 @@ func (fs *Decomposedfs) UseIn(composer *tusd.StoreComposer) { // - the upload needs to implement the tusd.Upload interface: WriteChunk, GetInfo, GetReader and FinishUpload // NewUpload returns a new tus Upload instance -func (fs *Decomposedfs) NewUpload(ctx context.Context, info tusd.FileInfo) (upload tusd.Upload, err error) { - - log := appctx.GetLogger(ctx) - log.Debug().Interface("info", info).Msg("Decomposedfs: NewUpload") - - if info.MetaData["filename"] == "" { - return nil, errors.New("Decomposedfs: missing filename in metadata") - } - if info.MetaData["dir"] == "" { - return nil, errors.New("Decomposedfs: missing dir in metadata") - } - - n, err := fs.lu.NodeFromSpaceID(ctx, &provider.ResourceId{ - SpaceId: 
info.Storage["SpaceRoot"], - OpaqueId: info.Storage["SpaceRoot"], - }) - if err != nil { - return nil, errors.Wrap(err, "Decomposedfs: error getting space root node") - } - - n, err = fs.lookupNode(ctx, n, filepath.Join(info.MetaData["dir"], info.MetaData["filename"])) - if err != nil { - return nil, errors.Wrap(err, "Decomposedfs: error walking path") - } - - log.Debug().Interface("info", info).Interface("node", n).Msg("Decomposedfs: resolved filename") - - // the parent owner will become the new owner - p, perr := n.Parent() - if perr != nil { - return nil, errors.Wrap(perr, "Decomposedfs: error getting parent "+n.ParentID) - } - - // check permissions - var checkNode *node.Node - var f string - if n.Exists { - // check permissions of file to be overwritten - checkNode = n - f, _ = storagespace.FormatReference(&provider.Reference{ResourceId: &provider.ResourceId{ - SpaceId: n.SpaceID, - OpaqueId: n.ID, - }}) - } else { - // check permissions of parent - checkNode = p - f, _ = storagespace.FormatReference(&provider.Reference{ResourceId: &provider.ResourceId{ - SpaceId: p.SpaceID, - OpaqueId: p.ID, - }, Path: n.Name}) - } - rp, err := fs.p.AssemblePermissions(ctx, checkNode) - switch { - case err != nil: - return nil, errtypes.InternalError(err.Error()) - case !rp.InitiateFileUpload: - if rp.Stat { - return nil, errtypes.PermissionDenied(f) - } - return nil, errtypes.NotFound(f) - } - - // if we are trying to overwriting a folder with a file - if n.Exists && n.IsDir() { - return nil, errtypes.PreconditionFailed("resource is not a file") - } - - // check lock - if info.MetaData["lockid"] != "" { - ctx = ctxpkg.ContextSetLockID(ctx, info.MetaData["lockid"]) - } - if err := n.CheckLock(ctx); err != nil { - return nil, err - } - - info.ID = uuid.New().String() - - binPath, err := fs.getUploadPath(ctx, info.ID) - if err != nil { - return nil, errors.Wrap(err, "Decomposedfs: error resolving upload path") - } - usr := ctxpkg.ContextMustGetUser(ctx) - - spaceRoot := n.SpaceRoot.ID - if info.Storage != nil && info.Storage["SpaceRoot"] != "" { - spaceRoot = info.Storage["SpaceRoot"] - } - - info.Storage = map[string]string{ - "Type": "OCISStore", - "BinPath": binPath, - - "NodeId": n.ID, - "NodeParentId": n.ParentID, - "NodeName": n.Name, - "SpaceRoot": spaceRoot, - "SpaceOwnerOrManager": info.Storage["SpaceOwnerOrManager"], - - "Idp": usr.Id.Idp, - "UserId": usr.Id.OpaqueId, - "UserType": utils.UserTypeToString(usr.Id.Type), - "UserName": usr.Username, - - "LogLevel": log.GetLevel().String(), - } - // Create binary file in the upload folder with no content - log.Debug().Interface("info", info).Msg("Decomposedfs: built storage info") - file, err := os.OpenFile(binPath, os.O_CREATE|os.O_WRONLY, defaultFilePerm) - if err != nil { - return nil, err - } - defer file.Close() - - u := &fileUpload{ - info: info, - binPath: binPath, - infoPath: filepath.Join(fs.o.Root, "uploads", info.ID+".info"), - fs: fs, - ctx: ctx, - } - - // writeInfo creates the file by itself if necessary - err = u.writeInfo() - if err != nil { - return nil, err - } - - return u, nil -} - -func (fs *Decomposedfs) getUploadPath(ctx context.Context, uploadID string) (string, error) { - return filepath.Join(fs.o.Root, "uploads", uploadID), nil -} - -func (fs *Decomposedfs) readInfo(id string) (tusd.FileInfo, error) { - infoPath := filepath.Join(fs.o.Root, "uploads", id+".info") - - info := tusd.FileInfo{} - data, err := os.ReadFile(infoPath) - if err != nil { - if errors.Is(err, iofs.ErrNotExist) { - // Interpret os.ErrNotExist as 404 
Not Found - err = tusd.ErrNotFound - } - return tusd.FileInfo{}, err - } - if err := json.Unmarshal(data, &info); err != nil { - return tusd.FileInfo{}, err - } - - stat, err := os.Stat(info.Storage["BinPath"]) - if err != nil { - return tusd.FileInfo{}, err - } - info.Offset = stat.Size() - - return info, nil +func (fs *Decomposedfs) NewUpload(ctx context.Context, info tusd.FileInfo) (tusd.Upload, error) { + return upload.New(ctx, info, fs.lu, fs.tp, fs.p, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens) } // GetUpload returns the Upload for the given upload id func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload, error) { - l := appctx.GetLogger(ctx) - sub := l.With().Int("pid", os.Getpid()).Logger() - info, err := fs.readInfo(id) - if err != nil { - return nil, err - } - - u := &userpb.User{ - Id: &userpb.UserId{ - Idp: info.Storage["Idp"], - OpaqueId: info.Storage["UserId"], - Type: utils.UserTypeMap(info.Storage["UserType"]), - }, - Username: info.Storage["UserName"], - } - - ctx = ctxpkg.ContextSetUser(ctx, u) - ctx = appctx.WithLogger(ctx, &sub) - - return &fileUpload{ - info: info, - binPath: info.Storage["BinPath"], - infoPath: filepath.Join(fs.o.Root, "uploads", id+".info"), - fs: fs, - ctx: ctx, - }, nil + return upload.Get(ctx, id, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens) } // ListUploads returns a list of all incomplete uploads func (fs *Decomposedfs) ListUploads() ([]tusd.FileInfo, error) { - return fs.uploadInfos() -} - -func (fs *Decomposedfs) uploadInfos() ([]tusd.FileInfo, error) { - infos := []tusd.FileInfo{} - infoFiles, err := filepath.Glob(filepath.Join(fs.o.Root, "uploads", "*.info")) - if err != nil { - return nil, err - } - - idRegexp := regexp.MustCompile(".*/([^/]+).info") - for _, info := range infoFiles { - match := idRegexp.FindStringSubmatch(info) - if match == nil || len(match) < 2 { - continue - } - info, err := fs.readInfo(match[1]) - if err != nil { - return nil, err - } - infos = append(infos, info) - } - return infos, nil + return fs.uploadInfos(context.Background()) } // PurgeExpiredUploads scans the fs for expired downloads and removes any leftovers func (fs *Decomposedfs) PurgeExpiredUploads(purgedChan chan<- tusd.FileInfo) error { - infos, err := fs.uploadInfos() + infos, err := fs.uploadInfos(context.Background()) if err != nil { return err } @@ -478,535 +264,49 @@ func (fs *Decomposedfs) PurgeExpiredUploads(purgedChan chan<- tusd.FileInfo) err return nil } -// lookupNode looks up nodes by path. -// This method can also handle lookups for paths which contain chunking information. 
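// Illustrative sketch, not part of this change: the async behaviour is switched on via
// the new decomposedfs options added in options.go above. A minimal configuration map as
// accepted by options.New, using the mapstructure keys from this diff; the concrete
// values are placeholders:
func exampleAsyncOptions() (*options.Options, error) {
	m := map[string]interface{}{
		"asyncfileuploads": true,
		"events": map[string]interface{}{
			"natsaddress":   "127.0.0.1:9233", // placeholder NATS endpoint
			"natsclusterid": "test-cluster",   // placeholder cluster id
			"numconsumers":  3,                // defaults to 1 when unset or <= 0
		},
		"statcache": map[string]interface{}{
			"cache_store": "memory", // placeholder cache store name
		},
	}
	return options.New(m)
}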
-func (fs *Decomposedfs) lookupNode(ctx context.Context, spaceRoot *node.Node, path string) (*node.Node, error) { - p := path - isChunked := chunking.IsChunked(path) - if isChunked { - chunkInfo, err := chunking.GetChunkBLOBInfo(path) - if err != nil { - return nil, err - } - p = chunkInfo.Path - } - - n, err := fs.lu.WalkPath(ctx, spaceRoot, p, true, func(ctx context.Context, n *node.Node) error { return nil }) - if err != nil { - return nil, errors.Wrap(err, "Decomposedfs: error walking path") - } - - if isChunked { - n.Name = filepath.Base(path) - } - return n, nil -} - -type fileUpload struct { - // info stores the current information about the upload - info tusd.FileInfo - // infoPath is the path to the .info file - infoPath string - // binPath is the path to the binary file (which has no extension) - binPath string - // only fs knows how to handle metadata and versions - fs *Decomposedfs - // a context with a user - // TODO add logger as well? - ctx context.Context -} - -// GetInfo returns the FileInfo -func (upload *fileUpload) GetInfo(ctx context.Context) (tusd.FileInfo, error) { - return upload.info, nil -} - -// WriteChunk writes the stream from the reader to the given offset of the upload -func (upload *fileUpload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) { - file, err := os.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm) - if err != nil { - return 0, err - } - defer file.Close() - - // calculate cheksum here? needed for the TUS checksum extension. https://tus.io/protocols/resumable-upload.html#checksum - // TODO but how do we get the `Upload-Checksum`? WriteChunk() only has a context, offset and the reader ... - // It is sent with the PATCH request, well or in the POST when the creation-with-upload extension is used - // but the tus handler uses a context.Background() so we cannot really check the header and put it in the context ... - n, err := io.Copy(file, src) - - // If the HTTP PATCH request gets interrupted in the middle (e.g. because - // the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF. - // However, for the ocis driver it's not important whether the stream has ended - // on purpose or accidentally. - if err != nil { - if err != io.ErrUnexpectedEOF { - return n, err - } - } - - upload.info.Offset += n - err = upload.writeInfo() // TODO info is written here ... we need to truncate in DiscardChunk - - return n, err -} - -// GetReader returns an io.Reader for the upload -func (upload *fileUpload) GetReader(ctx context.Context) (io.Reader, error) { - return os.Open(upload.binPath) -} - -// writeInfo updates the entire information. Everything will be overwritten. 
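// Illustrative sketch, not part of this change: once postprocessing has finished, the
// storage provider publishes the UploadReady event defined in pkg/events/postprocessing.go.
// A hypothetical service could watch for it roughly like this, mirroring the
// events.Consume call used in decomposedfs.go (the consumer group name "example" is made up):
func watchUploadReady(stream events.Stream) error {
	ch, err := events.Consume(stream, "example", events.UploadReady{})
	if err != nil {
		return err
	}
	go func() {
		for e := range ch {
			if ur, ok := e.(events.UploadReady); ok && !ur.Failed {
				// the file behind ur.FileRef is now fully available
				_ = ur.FileRef
			}
		}
	}()
	return nil
}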
-func (upload *fileUpload) writeInfo() error { - data, err := json.Marshal(upload.info) - if err != nil { - return err - } - return os.WriteFile(upload.infoPath, data, defaultFilePerm) -} - -// FinishUpload finishes an upload and moves the file to the internal destination -// -// # upload steps -// check if match header to fail early -// copy blob -// lock metadata node -// check if match header again as safeguard -// read metadata -// create version node with current metadata -// update node metadata with new blobid etc -// remember size diff -// unlock metadata -// propagate size diff and new etag -// - propagation can happen outside the metadata lock because diff calculation happens inside the lock and the order in which diffs are applied to the parent is irrelvevant -// - propagation needs to propagate the diff -func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { - - // ensure cleanup - defer upload.discardChunk() - - fi, err := os.Stat(upload.binPath) - if err != nil { - appctx.GetLogger(upload.ctx).Err(err).Msg("Decomposedfs: could not stat uploaded file") - return - } - - spaceID := upload.info.Storage["SpaceRoot"] - newNode := node.New( - spaceID, - upload.info.Storage["NodeId"], - upload.info.Storage["NodeParentId"], - upload.info.Storage["NodeName"], - fi.Size(), - "", - nil, - upload.fs.lu, - ) - newNode.SpaceRoot = node.New(spaceID, spaceID, "", "", 0, "", nil, upload.fs.lu) - - // check lock - if upload.info.MetaData["lockid"] != "" { - ctx = ctxpkg.ContextSetLockID(ctx, upload.info.MetaData["lockid"]) - } - if err := newNode.CheckLock(ctx); err != nil { - return err - } - - overwrite := newNode.ID != "" - var oldSize int64 - if overwrite { - // read size from existing node - old, _ := node.ReadNode(ctx, upload.fs.lu, spaceID, newNode.ID, false) - oldSize = old.Blobsize - } else { - // create new fileid - newNode.ID = uuid.New().String() - upload.info.Storage["NodeId"] = newNode.ID - } - - if _, err = node.CheckQuota(newNode.SpaceRoot, overwrite, uint64(oldSize), uint64(fi.Size())); err != nil { - return err - } - - targetPath := newNode.InternalPath() - sublog := appctx.GetLogger(upload.ctx). - With(). - Interface("info", upload.info). - Str("spaceid", spaceID). - Str("nodeid", newNode.ID). - Str("binPath", upload.binPath). - Str("targetPath", targetPath). - Logger() - - // calculate the checksum of the written bytes - // they will all be written to the metadata later, so we cannot omit any of them - // TODO only calculate the checksum in sync that was requested to match, the rest could be async ... but the tests currently expect all to be present - // TODO the hashes all implement BinaryMarshaler so we could try to persist the state for resumable upload. we would neet do keep track of the copied bytes ... - sha1h := sha1.New() - md5h := md5.New() - adler32h := adler32.New() - { - f, err := os.Open(upload.binPath) - if err != nil { - sublog.Err(err).Msg("Decomposedfs: could not open file for checksumming") - // we can continue if no oc checksum header is set - } - defer f.Close() - - r1 := io.TeeReader(f, sha1h) - r2 := io.TeeReader(r1, md5h) - - if _, err := io.Copy(adler32h, r2); err != nil { - sublog.Err(err).Msg("Decomposedfs: could not copy bytes for checksumming") - } - } - // compare if they match the sent checksum - // TODO the tus checksum extension would do this on every chunk, but I currently don't see an easy way to pass in the requested checksum. 
for now we do it in FinishUpload which is also called for chunked uploads - if upload.info.MetaData["checksum"] != "" { - parts := strings.SplitN(upload.info.MetaData["checksum"], " ", 2) - if len(parts) != 2 { - return errtypes.BadRequest("invalid checksum format. must be '[algorithm] [checksum]'") - } - switch parts[0] { - case "sha1": - err = upload.checkHash(parts[1], sha1h) - case "md5": - err = upload.checkHash(parts[1], md5h) - case "adler32": - err = upload.checkHash(parts[1], adler32h) - default: - err = errtypes.BadRequest("unsupported checksum algorithm: " + parts[0]) - } - if err != nil { - return err - } - } - newNode.BlobID = upload.info.ID // This can be changed to a content hash in the future when reference counting for the blobs was added - - // defer writing the checksums until the node is in place - - // upload steps - // check if match header to fail early - - if fi, err = os.Stat(targetPath); err == nil { - // When the if-match header was set we need to check if the - // etag still matches before finishing the upload. - if ifMatch, ok := upload.info.MetaData["if-match"]; ok { - var targetEtag string - targetEtag, err = node.CalculateEtag(newNode.ID, fi.ModTime()) - if err != nil { - return errtypes.InternalError(err.Error()) - } - if ifMatch != targetEtag { - return errtypes.Aborted("etag mismatch") - } - } - } else { - // create dir to node - if err := os.MkdirAll(filepath.Dir(targetPath), 0700); err != nil { - sublog.Err(err).Msg("could not create node dir") - return errtypes.InternalError("could not create node dir") - } - } - - // copy blob - - file, err := os.Open(upload.binPath) - if err != nil { - return err - } - defer file.Close() - err = upload.fs.tp.WriteBlob(newNode, file) - if err != nil { - return errors.Wrap(err, "failed to upload file to blobstore") - } - - // prepare discarding the blob if something changed while writing it - discardBlob := func() { - if err := upload.fs.tp.DeleteBlob(newNode); err != nil { - sublog.Err(err).Str("blobid", newNode.BlobID).Msg("Decomposedfs: failed to discard blob in blobstore") - } - } - - // lock metadata node - lock, err := filelocks.AcquireWriteLock(targetPath) - if err != nil { - discardBlob() - return errtypes.InternalError(err.Error()) - } - releaseLock := func() { - // ReleaseLock returns nil if already unlocked - if err := filelocks.ReleaseLock(lock); err != nil { - sublog.Err(err).Msg("Decomposedfs:could not unlock node") - } - } - defer releaseLock() - - // check if match header again as safeguard - var oldMtime time.Time - versionsPath := "" - if fi, err = os.Stat(targetPath); err == nil { - // When the if-match header was set we need to check if the - // etag still matches before finishing the upload. 
- if ifMatch, ok := upload.info.MetaData["if-match"]; ok { - var targetEtag string - targetEtag, err = node.CalculateEtag(newNode.ID, fi.ModTime()) - if err != nil { - discardBlob() - return errtypes.InternalError(err.Error()) - } - if ifMatch != targetEtag { - discardBlob() - return errtypes.Aborted("etag mismatch") - } - } - - // versions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries - versionsPath = upload.fs.lu.InternalPath(spaceID, newNode.ID+node.RevisionIDDelimiter+fi.ModTime().UTC().Format(time.RFC3339Nano)) - - // remember mtime of existing file so we can apply it to the version - oldMtime = fi.ModTime() - } - - // read metadata - - // attributes that will change - attrs := map[string]string{ - xattrs.BlobIDAttr: newNode.BlobID, - xattrs.BlobsizeAttr: strconv.FormatInt(newNode.Blobsize, 10), - - // update checksums - xattrs.ChecksumPrefix + "sha1": string(sha1h.Sum(nil)), - xattrs.ChecksumPrefix + "md5": string(md5h.Sum(nil)), - xattrs.ChecksumPrefix + "adler32": string(adler32h.Sum(nil)), - } - - // create version node with current metadata - - var newMtime time.Time - // if file already exists - if versionsPath != "" { - // touch version node - file, err := os.Create(versionsPath) - if err != nil { - discardBlob() - sublog.Err(err).Str("version", versionsPath).Msg("could not create version node") - return errtypes.InternalError("could not create version node") - } - defer file.Close() - - fi, err := file.Stat() - if err != nil { - discardBlob() - sublog.Err(err).Str("version", versionsPath).Msg("could not stat version node") - return errtypes.InternalError("could not stat version node") - } - newMtime = fi.ModTime() - - // copy blob metadata to version node - err = xattrs.CopyMetadataWithSourceLock(targetPath, versionsPath, func(attributeName string) bool { - return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) || - attributeName == xattrs.BlobIDAttr || - attributeName == xattrs.BlobsizeAttr - }, lock) - if err != nil { - discardBlob() - sublog.Err(err).Str("version", versionsPath).Msg("failed to copy xattrs to version node") - return errtypes.InternalError("failed to copy blob xattrs to version node") - } - - // keep mtime from previous version - if err := os.Chtimes(versionsPath, oldMtime, oldMtime); err != nil { - discardBlob() - sublog.Err(err).Str("version", versionsPath).Msg("failed to change mtime of version node") - return errtypes.InternalError("failed to change mtime of version node") - } - - // we MUST bypass any cache here as we have to calculate the size diff atomically - oldSize, err = node.ReadBlobSizeAttr(targetPath) - if err != nil { - discardBlob() - sublog.Err(err).Str("version", versionsPath).Msg("failed to read old blobsize") - return errtypes.InternalError("failed to read old blobsize") - } - } else { - // touch metadata node - file, err := os.Create(targetPath) - if err != nil { - discardBlob() - sublog.Err(err).Msg("could not create node") - return errtypes.InternalError("could not create node") - } - file.Close() - - // basic node metadata - attrs[xattrs.ParentidAttr] = newNode.ParentID - attrs[xattrs.NameAttr] = newNode.Name - oldSize = 0 - } - - // update node metadata with new blobid etc - err = newNode.SetXattrsWithLock(attrs, lock) - if err != nil { - discardBlob() - return errors.Wrap(err, "Decomposedfs: could not write metadata") - } - - // update mtime - switch { - case upload.info.MetaData["mtime"] != "": - if err := 
newNode.SetMtimeString(upload.info.MetaData["mtime"]); err != nil { - sublog.Err(err).Interface("info", upload.info).Msg("Decomposedfs: could not apply mtime from metadata") - return err - } - case newMtime != time.Time{}: - // we are creating a version - if err := newNode.SetMtime(newMtime); err != nil { - sublog.Err(err).Interface("info", upload.info).Msg("Decomposedfs: could not change mtime of node") - return err - } - } - - // remember size diff - // old 10, new 5 (upload a smaller file) -> 5-10 = -5 - // old 5, new 10 (upload a bigger file) -> 10-5 = +5 - sizeDiff := newNode.Blobsize - oldSize - - // unlock metadata - err = filelocks.ReleaseLock(lock) - if err != nil { - return errtypes.InternalError(err.Error()) - } - - // link child name to parent if it is new - childNameLink := filepath.Join(newNode.ParentInternalPath(), newNode.Name) - relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(newNode.ID, 4, 2)) - var link string - link, err = os.Readlink(childNameLink) - if err == nil && link != relativeNodePath { - sublog.Err(err). - Interface("node", newNode). - Str("childNameLink", childNameLink). - Str("link", link). - Msg("Decomposedfs: child name link has wrong target id, repairing") - - if err = os.Remove(childNameLink); err != nil { - return errors.Wrap(err, "Decomposedfs: could not remove symlink child entry") - } - } - if errors.Is(err, iofs.ErrNotExist) || link != relativeNodePath { - if err = os.Symlink(relativeNodePath, childNameLink); err != nil { - return errors.Wrap(err, "Decomposedfs: could not symlink child entry") - } - } - - // fill metadata with current mtime - if fi, err = os.Stat(targetPath); err == nil { - upload.info.MetaData["mtime"] = fmt.Sprintf("%d.%d", fi.ModTime().Unix(), fi.ModTime().Nanosecond()) - upload.info.MetaData["etag"], _ = node.CalculateEtag(newNode.ID, fi.ModTime()) - } - - newNode.Exists = true - - // propagate size diff and new etag - // propagation can happen outside the metadata lock because diff calculation happens inside the lock and the order in which diffs are applied to the parent is irrelvevant - sublog.Debug().Int64("sizediff", sizeDiff).Msg("Decomposedfs: propagating size diff") - return upload.fs.tp.Propagate(upload.ctx, newNode, sizeDiff) -} - -func (upload *fileUpload) checkHash(expected string, h hash.Hash) error { - if expected != hex.EncodeToString(h.Sum(nil)) { - upload.discardChunk() - return errtypes.ChecksumMismatch(fmt.Sprintf("invalid checksum: expected %s got %x", upload.info.MetaData["checksum"], h.Sum(nil))) - } - return nil -} - -func (upload *fileUpload) discardChunk() { - if err := os.Remove(upload.binPath); err != nil { - if !errors.Is(err, iofs.ErrNotExist) { - appctx.GetLogger(upload.ctx).Err(err).Interface("info", upload.info).Str("binPath", upload.binPath).Interface("info", upload.info).Msg("Decomposedfs: could not discard chunk") - return - } - } - if err := os.Remove(upload.infoPath); err != nil { - if !errors.Is(err, iofs.ErrNotExist) { - appctx.GetLogger(upload.ctx).Err(err).Interface("info", upload.info).Str("infoPath", upload.infoPath).Interface("info", upload.info).Msg("Decomposedfs: could not discard chunk info") - return - } - } -} - -// To implement the termination extension as specified in https://tus.io/protocols/resumable-upload.html#termination -// - the storage needs to implement AsTerminatableUpload -// - the upload needs to implement Terminate - // AsTerminatableUpload returns a TerminatableUpload -func (fs *Decomposedfs) AsTerminatableUpload(upload tusd.Upload) 
tusd.TerminatableUpload { - return upload.(*fileUpload) -} - -// Terminate terminates the upload -func (upload *fileUpload) Terminate(ctx context.Context) error { - if err := os.Remove(upload.infoPath); err != nil { - if !errors.Is(err, iofs.ErrNotExist) { - return err - } - } - if err := os.Remove(upload.binPath); err != nil { - if !errors.Is(err, iofs.ErrNotExist) { - return err - } - } - return nil +// To implement the termination extension as specified in https://tus.io/protocols/resumable-upload.html#termination +// the storage needs to implement AsTerminatableUpload +func (fs *Decomposedfs) AsTerminatableUpload(up tusd.Upload) tusd.TerminatableUpload { + return up.(*upload.Upload) } -// To implement the creation-defer-length extension as specified in https://tus.io/protocols/resumable-upload.html#creation -// - the storage needs to implement AsLengthDeclarableUpload -// - the upload needs to implement DeclareLength - // AsLengthDeclarableUpload returns a LengthDeclarableUpload -func (fs *Decomposedfs) AsLengthDeclarableUpload(upload tusd.Upload) tusd.LengthDeclarableUpload { - return upload.(*fileUpload) -} - -// DeclareLength updates the upload length information -func (upload *fileUpload) DeclareLength(ctx context.Context, length int64) error { - upload.info.Size = length - upload.info.SizeIsDeferred = false - return upload.writeInfo() +// To implement the creation-defer-length extension as specified in https://tus.io/protocols/resumable-upload.html#creation +// the storage needs to implement AsLengthDeclarableUpload +func (fs *Decomposedfs) AsLengthDeclarableUpload(up tusd.Upload) tusd.LengthDeclarableUpload { + return up.(*upload.Upload) } -// To implement the concatenation extension as specified in https://tus.io/protocols/resumable-upload.html#concatenation -// - the storage needs to implement AsConcatableUpload -// - the upload needs to implement ConcatUploads - // AsConcatableUpload returns a ConcatableUpload -func (fs *Decomposedfs) AsConcatableUpload(upload tusd.Upload) tusd.ConcatableUpload { - return upload.(*fileUpload) +// To implement the concatenation extension as specified in https://tus.io/protocols/resumable-upload.html#concatenation +// the storage needs to implement AsConcatableUpload +func (fs *Decomposedfs) AsConcatableUpload(up tusd.Upload) tusd.ConcatableUpload { + return up.(*upload.Upload) } -// ConcatUploads concatenates multiple uploads -func (upload *fileUpload) ConcatUploads(ctx context.Context, uploads []tusd.Upload) (err error) { - file, err := os.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm) +func (fs *Decomposedfs) uploadInfos(ctx context.Context) ([]tusd.FileInfo, error) { + infos := []tusd.FileInfo{} + infoFiles, err := filepath.Glob(filepath.Join(fs.o.Root, "uploads", "*.info")) if err != nil { - return err + return nil, err } - defer file.Close() - - for _, partialUpload := range uploads { - fileUpload := partialUpload.(*fileUpload) - src, err := os.Open(fileUpload.binPath) + for _, info := range infoFiles { + match := _idRegexp.FindStringSubmatch(info) + if match == nil || len(match) < 2 { + continue + } + up, err := fs.GetUpload(ctx, match[1]) if err != nil { - return err + return nil, err } - defer src.Close() - - if _, err := io.Copy(file, src); err != nil { - return err + info, err := up.GetInfo(context.Background()) + if err != nil { + return nil, err } - } - return + infos = append(infos, info) + } + return infos, nil } diff --git a/pkg/storage/utils/decomposedfs/upload/processing.go 
b/pkg/storage/utils/decomposedfs/upload/processing.go new file mode 100644 index 0000000000..90da63a77a --- /dev/null +++ b/pkg/storage/utils/decomposedfs/upload/processing.go @@ -0,0 +1,432 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package upload + +import ( + "context" + "encoding/json" + "fmt" + iofs "io/fs" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/v2/pkg/appctx" + ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" + "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/logger" + "github.com/cs3org/reva/v2/pkg/storage/utils/chunking" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" + "github.com/cs3org/reva/v2/pkg/storage/utils/filelocks" + "github.com/cs3org/reva/v2/pkg/storagespace" + "github.com/cs3org/reva/v2/pkg/utils" + "github.com/gofrs/flock" + "github.com/google/uuid" + "github.com/pkg/errors" + tusd "github.com/tus/tusd/pkg/handler" +) + +var defaultFilePerm = os.FileMode(0664) + +// PermissionsChecker defines an interface for checking permissions on a Node +type PermissionsChecker interface { + AssemblePermissions(ctx context.Context, n *node.Node) (ap provider.ResourcePermissions, err error) +} + +// New returns a new processing instance +func New(ctx context.Context, info tusd.FileInfo, lu *lookup.Lookup, tp Tree, p PermissionsChecker, fsRoot string, pub events.Publisher, async bool, tknopts options.TokenOptions) (upload *Upload, err error) { + + log := appctx.GetLogger(ctx) + log.Debug().Interface("info", info).Msg("Decomposedfs: NewUpload") + + if info.MetaData["filename"] == "" { + return nil, errors.New("Decomposedfs: missing filename in metadata") + } + if info.MetaData["dir"] == "" { + return nil, errors.New("Decomposedfs: missing dir in metadata") + } + + n, err := lu.NodeFromSpaceID(ctx, &provider.ResourceId{ + SpaceId: info.Storage["SpaceRoot"], + OpaqueId: info.Storage["SpaceRoot"], + }) + if err != nil { + return nil, errors.Wrap(err, "Decomposedfs: error getting space root node") + } + + n, err = lookupNode(ctx, n, filepath.Join(info.MetaData["dir"], info.MetaData["filename"]), lu) + if err != nil { + return nil, errors.Wrap(err, "Decomposedfs: error walking path") + } + + log.Debug().Interface("info", info).Interface("node", n).Msg("Decomposedfs: resolved filename") + + // the parent owner will become the new owner + parent, perr := n.Parent() + if perr != nil { + 
return nil, errors.Wrap(perr, "Decomposedfs: error getting parent "+n.ParentID) + } + + // check permissions + var ( + checkNode *node.Node + path string + ) + if n.Exists { + // check permissions of file to be overwritten + checkNode = n + path, _ = storagespace.FormatReference(&provider.Reference{ResourceId: &provider.ResourceId{ + SpaceId: checkNode.SpaceID, + OpaqueId: checkNode.ID, + }}) + } else { + // check permissions of parent + checkNode = parent + path, _ = storagespace.FormatReference(&provider.Reference{ResourceId: &provider.ResourceId{ + SpaceId: checkNode.SpaceID, + OpaqueId: checkNode.ID, + }, Path: n.Name}) + } + rp, err := p.AssemblePermissions(ctx, checkNode) + switch { + case err != nil: + return nil, errtypes.InternalError(err.Error()) + case !rp.InitiateFileUpload: + return nil, errtypes.PermissionDenied(path) + } + + // are we trying to overwriting a folder with a file? + if n.Exists && n.IsDir() { + return nil, errtypes.PreconditionFailed("resource is not a file") + } + + // check lock + if info.MetaData["lockid"] != "" { + ctx = ctxpkg.ContextSetLockID(ctx, info.MetaData["lockid"]) + } + if err := n.CheckLock(ctx); err != nil { + return nil, err + } + + info.ID = uuid.New().String() + + binPath := filepath.Join(fsRoot, "uploads", info.ID) + usr := ctxpkg.ContextMustGetUser(ctx) + + var ( + spaceRoot string + ok bool + ) + if info.Storage != nil { + if spaceRoot, ok = info.Storage["SpaceRoot"]; !ok { + spaceRoot = n.SpaceRoot.ID + } + } else { + spaceRoot = n.SpaceRoot.ID + } + + info.Storage = map[string]string{ + "Type": "OCISStore", + "BinPath": binPath, + + "NodeId": n.ID, + "NodeParentId": n.ParentID, + "NodeName": n.Name, + "SpaceRoot": spaceRoot, + "SpaceOwnerOrManager": info.Storage["SpaceOwnerOrManager"], + + "Idp": usr.Id.Idp, + "UserId": usr.Id.OpaqueId, + "UserType": utils.UserTypeToString(usr.Id.Type), + "UserName": usr.Username, + + "LogLevel": log.GetLevel().String(), + } + // Create binary file in the upload folder with no content + log.Debug().Interface("info", info).Msg("Decomposedfs: built storage info") + file, err := os.OpenFile(binPath, os.O_CREATE|os.O_WRONLY, defaultFilePerm) + if err != nil { + return nil, err + } + defer file.Close() + + u := buildUpload(ctx, info, binPath, filepath.Join(fsRoot, "uploads", info.ID+".info"), lu, tp, pub, async, tknopts) + + // writeInfo creates the file by itself if necessary + err = u.writeInfo() + if err != nil { + return nil, err + } + + return u, nil +} + +// Get returns the Upload for the given upload id +func Get(ctx context.Context, id string, lu *lookup.Lookup, tp Tree, fsRoot string, pub events.Publisher, async bool, tknopts options.TokenOptions) (*Upload, error) { + infoPath := filepath.Join(fsRoot, "uploads", id+".info") + + info := tusd.FileInfo{} + data, err := os.ReadFile(infoPath) + if err != nil { + if errors.Is(err, iofs.ErrNotExist) { + // Interpret os.ErrNotExist as 404 Not Found + err = tusd.ErrNotFound + } + return nil, err + } + if err := json.Unmarshal(data, &info); err != nil { + return nil, err + } + + stat, err := os.Stat(info.Storage["BinPath"]) + if err != nil { + return nil, err + } + + info.Offset = stat.Size() + + u := &userpb.User{ + Id: &userpb.UserId{ + Idp: info.Storage["Idp"], + OpaqueId: info.Storage["UserId"], + Type: utils.UserTypeMap(info.Storage["UserType"]), + }, + Username: info.Storage["UserName"], + } + + ctx = ctxpkg.ContextSetUser(ctx, u) + // TODO configure the logger the same way ... 
store and add traceid in file info + + var opts []logger.Option + opts = append(opts, logger.WithLevel(info.Storage["LogLevel"])) + opts = append(opts, logger.WithWriter(os.Stderr, logger.ConsoleMode)) + l := logger.New(opts...) + + sub := l.With().Int("pid", os.Getpid()).Logger() + + ctx = appctx.WithLogger(ctx, &sub) + + up := buildUpload(ctx, info, info.Storage["BinPath"], infoPath, lu, tp, pub, async, tknopts) + up.versionsPath = info.MetaData["versionsPath"] + up.sizeDiff, _ = strconv.ParseInt(info.MetaData["sizeDiff"], 10, 64) + return up, nil +} + +// CreateNodeForUpload will create the target node for the Upload +func CreateNodeForUpload(upload *Upload, initAttrs map[string]string) (*node.Node, error) { + fi, err := os.Stat(upload.binPath) + if err != nil { + return nil, err + } + + fsize := fi.Size() + spaceID := upload.Info.Storage["SpaceRoot"] + n := node.New( + spaceID, + upload.Info.Storage["NodeId"], + upload.Info.Storage["NodeParentId"], + upload.Info.Storage["NodeName"], + fsize, + upload.Info.ID, + nil, + upload.lu, + ) + n.SpaceRoot, err = node.ReadNode(upload.Ctx, upload.lu, spaceID, spaceID, false) + if err != nil { + return nil, err + } + + // check lock + if err := n.CheckLock(upload.Ctx); err != nil { + return nil, err + } + + var lock *flock.Flock + switch n.ID { + case "": + lock, err = initNewNode(upload, n, uint64(fsize)) + default: + lock, err = updateExistingNode(upload, n, spaceID, uint64(fsize)) + } + + defer filelocks.ReleaseLock(lock) //nolint:errcheck + if err != nil { + return nil, err + } + + // overwrite technical information + initAttrs[xattrs.ParentidAttr] = n.ParentID + initAttrs[xattrs.NameAttr] = n.Name + initAttrs[xattrs.BlobIDAttr] = n.BlobID + initAttrs[xattrs.BlobsizeAttr] = strconv.FormatInt(n.Blobsize, 10) + initAttrs[xattrs.StatusPrefix] = node.ProcessingStatus + + // update node metadata with new blobid etc + err = n.SetXattrsWithLock(initAttrs, lock) + if err != nil { + return nil, errors.Wrap(err, "Decomposedfs: could not write metadata") + } + + // update nodeid for later + upload.Info.Storage["NodeId"] = n.ID + if err := upload.writeInfo(); err != nil { + return nil, err + } + + return n, nil +} + +func initNewNode(upload *Upload, n *node.Node, fsize uint64) (*flock.Flock, error) { + n.ID = uuid.New().String() + + // create folder structure (if needed) + if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil { + return nil, err + } + + if _, err := os.Create(n.InternalPath()); err != nil { + return nil, err + } + + lock, err := filelocks.AcquireWriteLock(n.InternalPath()) + if err != nil { + // we cannot acquire a lock - we error for safety + return lock, err + } + + if _, err := node.CheckQuota(n.SpaceRoot, false, 0, fsize); err != nil { + return lock, err + } + + // link child name to parent if it is new + childNameLink := filepath.Join(n.ParentInternalPath(), n.Name) + link, err := os.Readlink(childNameLink) + if err == nil && link != "../"+n.ID { + if err := os.Remove(childNameLink); err != nil { + return lock, errors.Wrap(err, "Decomposedfs: could not remove symlink child entry") + } + } + if errors.Is(err, iofs.ErrNotExist) || link != "../"+n.ID { + relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2)) + if err = os.Symlink(relativeNodePath, childNameLink); err != nil { + return lock, errors.Wrap(err, "Decomposedfs: could not symlink child entry") + } + } + + // on a new file the sizeDiff is the fileSize + upload.sizeDiff = int64(fsize) + upload.Info.MetaData["sizeDiff"] = 
strconv.Itoa(int(upload.sizeDiff)) + return lock, nil +} + +func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint64) (*flock.Flock, error) { + old, _ := node.ReadNode(upload.Ctx, upload.lu, spaceID, n.ID, false) + if _, err := node.CheckQuota(n.SpaceRoot, true, uint64(old.Blobsize), fsize); err != nil { + return nil, err + } + + vfi, err := os.Stat(old.InternalPath()) + if err != nil { + return nil, err + } + + // When the if-match header was set we need to check if the + // etag still matches before finishing the upload. + if ifMatch, ok := upload.Info.MetaData["if-match"]; ok { + targetEtag, err := node.CalculateEtag(n.ID, vfi.ModTime()) + switch { + case err != nil: + return nil, errtypes.InternalError(err.Error()) + case ifMatch != targetEtag: + return nil, errtypes.Aborted("etag mismatch") + } + } + + upload.versionsPath = upload.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+vfi.ModTime().UTC().Format(time.RFC3339Nano)) + upload.sizeDiff = int64(fsize) - old.Blobsize + upload.Info.MetaData["versionsPath"] = upload.versionsPath + upload.Info.MetaData["sizeDiff"] = strconv.Itoa(int(upload.sizeDiff)) + + targetPath := n.InternalPath() + + lock, err := filelocks.AcquireWriteLock(targetPath) + if err != nil { + // we cannot acquire a lock - we error for safety + return nil, err + } + + // create version node + if _, err := os.Create(upload.versionsPath); err != nil { + return lock, err + } + + // copy blob metadata to version node + if err := xattrs.CopyMetadataWithSourceLock(targetPath, upload.versionsPath, func(attributeName string) bool { + return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) || + attributeName == xattrs.BlobIDAttr || + attributeName == xattrs.BlobsizeAttr + }, lock); err != nil { + return lock, err + } + + // keep mtime from previous version + if err := os.Chtimes(upload.versionsPath, vfi.ModTime(), vfi.ModTime()); err != nil { + return lock, errtypes.InternalError(fmt.Sprintf("failed to change mtime of version node: %s", err)) + } + + // update mtime of current version + mtime := time.Now() + if err := os.Chtimes(n.InternalPath(), mtime, mtime); err != nil { + return nil, err + } + + return lock, nil +} + +// lookupNode looks up nodes by path. +// This method can also handle lookups for paths which contain chunking information. +func lookupNode(ctx context.Context, spaceRoot *node.Node, path string, lu *lookup.Lookup) (*node.Node, error) { + p := path + isChunked := chunking.IsChunked(path) + if isChunked { + chunkInfo, err := chunking.GetChunkBLOBInfo(path) + if err != nil { + return nil, err + } + p = chunkInfo.Path + } + + n, err := lu.WalkPath(ctx, spaceRoot, p, true, func(ctx context.Context, n *node.Node) error { return nil }) + if err != nil { + return nil, errors.Wrap(err, "Decomposedfs: error walking path") + } + + if isChunked { + n.Name = filepath.Base(path) + } + return n, nil +} diff --git a/pkg/storage/utils/decomposedfs/upload/upload.go b/pkg/storage/utils/decomposedfs/upload/upload.go new file mode 100644 index 0000000000..490a1ab9da --- /dev/null +++ b/pkg/storage/utils/decomposedfs/upload/upload.go @@ -0,0 +1,464 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package upload + +import ( + "context" + "crypto/md5" + "crypto/sha1" + "encoding/hex" + "encoding/json" + "fmt" + "hash" + "hash/adler32" + "io" + "io/fs" + "os" + "path/filepath" + "strings" + "time" + + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/v2/pkg/appctx" + ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" + "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" + "github.com/cs3org/reva/v2/pkg/utils" + "github.com/golang-jwt/jwt" + "github.com/pkg/errors" + "github.com/rs/zerolog" + tusd "github.com/tus/tusd/pkg/handler" +) + +// Tree is used to manage a tree hierarchy +type Tree interface { + Setup() error + + GetMD(ctx context.Context, node *node.Node) (os.FileInfo, error) + ListFolder(ctx context.Context, node *node.Node) ([]*node.Node, error) + // CreateHome(owner *userpb.UserId) (n *node.Node, err error) + CreateDir(ctx context.Context, node *node.Node) (err error) + // CreateReference(ctx context.Context, node *node.Node, targetURI *url.URL) error + Move(ctx context.Context, oldNode *node.Node, newNode *node.Node) (err error) + Delete(ctx context.Context, node *node.Node) (err error) + RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, target *node.Node) (*node.Node, *node.Node, func() error, error) + PurgeRecycleItemFunc(ctx context.Context, spaceid, key, purgePath string) (*node.Node, func() error, error) + + WriteBlob(node *node.Node, reader io.Reader) error + ReadBlob(node *node.Node) (io.ReadCloser, error) + DeleteBlob(node *node.Node) error + + Propagate(ctx context.Context, node *node.Node, sizeDiff int64) (err error) +} + +// Upload processes the upload +// it implements tus tusd.Upload interface https://tus.io/protocols/resumable-upload.html#core-protocol +// it also implements its termination extension as specified in https://tus.io/protocols/resumable-upload.html#termination +// it also implements its creation-defer-length extension as specified in https://tus.io/protocols/resumable-upload.html#creation +// it also implements its concatenation extension as specified in https://tus.io/protocols/resumable-upload.html#concatenation +type Upload struct { + // we use a struct field on the upload as tus pkg will give us an empty context.Background + Ctx context.Context + // info stores the current information about the upload + Info tusd.FileInfo + // node for easy access + Node *node.Node + // infoPath is the path to the .info file + infoPath string + // binPath is the path to the binary file (which has no extension) + binPath string + // lu and tp needed for file operations + lu 
*lookup.Lookup + tp Tree + // versionsPath will be empty if there was no file before + versionsPath string + // sizeDiff size difference between new and old file version + sizeDiff int64 + // and a logger as well + log zerolog.Logger + // publisher used to publish events + pub events.Publisher + // async determines if uploads shoud be done asynchronously + async bool + // tknopts hold token signing information + tknopts options.TokenOptions +} + +func buildUpload(ctx context.Context, info tusd.FileInfo, binPath string, infoPath string, lu *lookup.Lookup, tp Tree, pub events.Publisher, async bool, tknopts options.TokenOptions) *Upload { + return &Upload{ + Info: info, + binPath: binPath, + infoPath: infoPath, + lu: lu, + tp: tp, + Ctx: ctx, + pub: pub, + async: async, + tknopts: tknopts, + log: appctx.GetLogger(ctx). + With(). + Interface("info", info). + Str("binPath", binPath). + Logger(), + } +} + +// Cleanup cleans the upload +func Cleanup(upload *Upload, failure bool, keepUpload bool) { + upload.cleanup(failure, !keepUpload, !keepUpload) + + // unset processing status + if upload.Node != nil { // node can be nil when there was an error before it was created (eg. checksum-mismatch) + if err := upload.Node.UnmarkProcessing(); err != nil { + upload.log.Info().Str("path", upload.Node.InternalPath()).Err(err).Msg("unmarking processing failed") + } + } +} + +// WriteChunk writes the stream from the reader to the given offset of the upload +func (upload *Upload) WriteChunk(_ context.Context, offset int64, src io.Reader) (int64, error) { + file, err := os.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm) + if err != nil { + return 0, err + } + defer file.Close() + + // calculate cheksum here? needed for the TUS checksum extension. https://tus.io/protocols/resumable-upload.html#checksum + // TODO but how do we get the `Upload-Checksum`? WriteChunk() only has a context, offset and the reader ... + // It is sent with the PATCH request, well or in the POST when the creation-with-upload extension is used + // but the tus handler uses a context.Background() so we cannot really check the header and put it in the context ... + n, err := io.Copy(file, src) + + // If the HTTP PATCH request gets interrupted in the middle (e.g. because + // the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF. + // However, for the ocis driver it's not important whether the stream has ended + // on purpose or accidentally. + if err != nil && err != io.ErrUnexpectedEOF { + return n, err + } + + upload.Info.Offset += n + return n, upload.writeInfo() +} + +// GetInfo returns the FileInfo +func (upload *Upload) GetInfo(_ context.Context) (tusd.FileInfo, error) { + return upload.Info, nil +} + +// GetReader returns an io.Reader for the upload +func (upload *Upload) GetReader(_ context.Context) (io.Reader, error) { + return os.Open(upload.binPath) +} + +// FinishUpload finishes an upload and moves the file to the internal destination +func (upload *Upload) FinishUpload(_ context.Context) error { + // set lockID to context + if upload.Info.MetaData["lockid"] != "" { + upload.Ctx = ctxpkg.ContextSetLockID(upload.Ctx, upload.Info.MetaData["lockid"]) + } + + log := appctx.GetLogger(upload.Ctx) + + // calculate the checksum of the written bytes + // they will all be written to the metadata later, so we cannot omit any of them + // TODO only calculate the checksum in sync that was requested to match, the rest could be async ... 
but the tests currently expect all to be present + // TODO the hashes all implement BinaryMarshaler so we could try to persist the state for resumable upload. we would neet do keep track of the copied bytes ... + sha1h := sha1.New() + md5h := md5.New() + adler32h := adler32.New() + { + f, err := os.Open(upload.binPath) + if err != nil { + // we can continue if no oc checksum header is set + log.Info().Err(err).Str("binPath", upload.binPath).Msg("error opening binPath") + } + defer f.Close() + + r1 := io.TeeReader(f, sha1h) + r2 := io.TeeReader(r1, md5h) + + _, err = io.Copy(adler32h, r2) + if err != nil { + log.Info().Err(err).Msg("error copying checksums") + } + } + + // compare if they match the sent checksum + // TODO the tus checksum extension would do this on every chunk, but I currently don't see an easy way to pass in the requested checksum. for now we do it in FinishUpload which is also called for chunked uploads + if upload.Info.MetaData["checksum"] != "" { + var err error + parts := strings.SplitN(upload.Info.MetaData["checksum"], " ", 2) + if len(parts) != 2 { + return errtypes.BadRequest("invalid checksum format. must be '[algorithm] [checksum]'") + } + switch parts[0] { + case "sha1": + err = upload.checkHash(parts[1], sha1h) + case "md5": + err = upload.checkHash(parts[1], md5h) + case "adler32": + err = upload.checkHash(parts[1], adler32h) + default: + err = errtypes.BadRequest("unsupported checksum algorithm: " + parts[0]) + } + if err != nil { + Cleanup(upload, true, false) + return err + } + } + + // update checksums + attrs := map[string]string{ + xattrs.ChecksumPrefix + "sha1": string(sha1h.Sum(nil)), + xattrs.ChecksumPrefix + "md5": string(md5h.Sum(nil)), + xattrs.ChecksumPrefix + "adler32": string(adler32h.Sum(nil)), + } + + n, err := CreateNodeForUpload(upload, attrs) + if err != nil { + Cleanup(upload, true, false) + return err + } + + upload.Node = n + + if upload.pub != nil { + u, _ := ctxpkg.ContextGetUser(upload.Ctx) + s, err := upload.URL(upload.Ctx) + if err != nil { + return err + } + + if err := events.Publish(upload.pub, events.BytesReceived{ + UploadID: upload.Info.ID, + URL: s, + SpaceOwner: n.SpaceOwnerOrManager(upload.Ctx), + ExecutingUser: u, + ResourceID: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}, + Filename: upload.Info.Storage["NodeName"], + Filesize: uint64(upload.Info.Size), + }); err != nil { + return err + } + } + + if upload.async { + // handle postprocessing asynchronously but inform there is something in progress + return upload.tp.Propagate(upload.Ctx, n, upload.sizeDiff) + } + + err = upload.Finalize() + Cleanup(upload, err != nil, false) + if err != nil { + return err + } + + return upload.tp.Propagate(upload.Ctx, n, upload.sizeDiff) +} + +// Terminate terminates the upload +func (upload *Upload) Terminate(_ context.Context) error { + upload.cleanup(true, true, true) + return nil +} + +// DeclareLength updates the upload length information +func (upload *Upload) DeclareLength(_ context.Context, length int64) error { + upload.Info.Size = length + upload.Info.SizeIsDeferred = false + return upload.writeInfo() +} + +// ConcatUploads concatenates multiple uploads +func (upload *Upload) ConcatUploads(_ context.Context, uploads []tusd.Upload) (err error) { + file, err := os.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm) + if err != nil { + return err + } + defer file.Close() + + for _, partialUpload := range uploads { + fileUpload := partialUpload.(*Upload) + + src, err := os.Open(fileUpload.binPath) + if err 
!= nil { + return err + } + defer src.Close() + + if _, err := io.Copy(file, src); err != nil { + return err + } + } + + return +} + +// writeInfo updates the entire information. Everything will be overwritten. +func (upload *Upload) writeInfo() error { + data, err := json.Marshal(upload.Info) + if err != nil { + return err + } + return os.WriteFile(upload.infoPath, data, defaultFilePerm) +} + +// Finalize finalizes the upload (eg moves the file to the internal destination) +func (upload *Upload) Finalize() (err error) { + n := upload.Node + if n == nil { + var err error + n, err = node.ReadNode(upload.Ctx, upload.lu, upload.Info.Storage["SpaceRoot"], upload.Info.Storage["NodeId"], false) + if err != nil { + return err + } + upload.Node = n + } + + // upload the data to the blobstore + file, err := os.Open(upload.binPath) + if err != nil { + return err + } + defer file.Close() + + if err := upload.tp.WriteBlob(n, file); err != nil { + return errors.Wrap(err, "failed to upload file to blostore") + } + + if upload.async { + return nil + } + + sublog := appctx.GetLogger(upload.Ctx). + With(). + Interface("info", upload.Info). + Str("binPath", upload.binPath). + Str("targetPath", n.InternalPath()). + Str("spaceID", upload.Info.Storage["SpaceRoot"]). + Logger() + + // tests sometimes set the mtime + if upload.Info.MetaData["mtime"] != "" { + if err := n.SetMtimeString(upload.Info.MetaData["mtime"]); err != nil { + sublog.Err(err).Interface("info", upload.Info).Msg("Decomposedfs: could not set mtime metadata") + return err + } + } + + // some clients need the etag in the upload metadata + fi, err := os.Stat(n.InternalPath()) + if err != nil { + sublog.Err(err).Interface("info", upload.Info).Str("path", n.InternalPath()).Msg("Decomposedfs: could not stat file") + return err + } + upload.Info.MetaData["etag"], _ = node.CalculateEtag(n.ID, fi.ModTime()) + return nil +} + +func (upload *Upload) checkHash(expected string, h hash.Hash) error { + if expected != hex.EncodeToString(h.Sum(nil)) { + return errtypes.ChecksumMismatch(fmt.Sprintf("invalid checksum: expected %s got %x", upload.Info.MetaData["checksum"], h.Sum(nil))) + } + return nil +} + +// cleanup cleans up after the upload is finished +func (upload *Upload) cleanup(cleanNode, cleanBin, cleanInfo bool) { + if cleanNode && upload.Node != nil { + switch p := upload.versionsPath; p { + case "": + // remove node + if err := utils.RemoveItem(upload.Node.InternalPath()); err != nil { + upload.log.Info().Str("path", upload.Node.InternalPath()).Err(err).Msg("removing node failed") + } + + // no old version was present - remove child entry + src := filepath.Join(upload.Node.ParentInternalPath(), upload.Node.Name) + if err := os.Remove(src); err != nil { + upload.log.Info().Str("path", upload.Node.ParentInternalPath()).Err(err).Msg("removing node from parent failed") + } + default: + // restore old version + if err := os.Rename(p, upload.Node.InternalPath()); err != nil { + upload.log.Info().Str("versionpath", p).Str("nodepath", upload.Node.InternalPath()).Err(err).Msg("renaming version node failed") + } + + } + } + + if cleanBin { + if err := os.Remove(upload.binPath); err != nil && !errors.Is(err, fs.ErrNotExist) { + upload.log.Error().Str("path", upload.binPath).Err(err).Msg("removing upload failed") + } + } + + if cleanInfo { + if err := os.Remove(upload.infoPath); err != nil && !errors.Is(err, fs.ErrNotExist) { + upload.log.Error().Str("path", upload.infoPath).Err(err).Msg("removing upload info failed") + } + } +} + +// URL returns a url to 
download an upload +func (upload *Upload) URL(_ context.Context) (string, error) { + type transferClaims struct { + jwt.StandardClaims + Target string `json:"target"` + } + + u := joinurl(upload.tknopts.DownloadEndpoint, "tus/", upload.Info.ID) + ttl := time.Duration(upload.tknopts.TransferExpires) * time.Second + claims := transferClaims{ + StandardClaims: jwt.StandardClaims{ + ExpiresAt: time.Now().Add(ttl).Unix(), + Audience: "reva", + IssuedAt: time.Now().Unix(), + }, + Target: u, + } + + t := jwt.NewWithClaims(jwt.GetSigningMethod("HS256"), claims) + + tkn, err := t.SignedString([]byte(upload.tknopts.TransferSharedSecret)) + if err != nil { + return "", errors.Wrapf(err, "error signing token with claims %+v", claims) + } + + return joinurl(upload.tknopts.DataGatewayEndpoint, tkn), nil +} + +// replace with url.JoinPath after switching to go1.19 +func joinurl(paths ...string) string { + var s strings.Builder + l := len(paths) + for i, p := range paths { + s.WriteString(p) + if !strings.HasSuffix(p, "/") && i != l-1 { + s.WriteString("/") + } + } + + return s.String() +} diff --git a/pkg/storage/utils/decomposedfs/upload_test.go b/pkg/storage/utils/decomposedfs/upload_test.go index 7f301fbe83..b00d9a4ab7 100644 --- a/pkg/storage/utils/decomposedfs/upload_test.go +++ b/pkg/storage/utils/decomposedfs/upload_test.go @@ -210,7 +210,6 @@ var _ = Describe("File uploads", func() { Expect(uploadIds["tus"]).ToNot(BeEmpty()) resources, err := fs.ListFolder(ctx, rootRef, []string{}, []string{}) - Expect(err).ToNot(HaveOccurred()) Expect(len(resources)).To(Equal(0)) }) @@ -226,7 +225,6 @@ var _ = Describe("File uploads", func() { Expect(uploadIds["tus"]).ToNot(BeEmpty()) resources, err := fs.ListFolder(ctx, rootRef, []string{}, []string{}) - Expect(err).ToNot(HaveOccurred()) Expect(len(resources)).To(Equal(0)) }) diff --git a/pkg/storage/utils/decomposedfs/xattrs/xattrs.go b/pkg/storage/utils/decomposedfs/xattrs/xattrs.go index 8bd24bbb98..496071b37a 100644 --- a/pkg/storage/utils/decomposedfs/xattrs/xattrs.go +++ b/pkg/storage/utils/decomposedfs/xattrs/xattrs.go @@ -50,6 +50,13 @@ const ( BlobIDAttr string = OcisPrefix + "blobid" BlobsizeAttr string = OcisPrefix + "blobsize" + // statusPrefix is the prefix for the node status + StatusPrefix string = OcisPrefix + "nodestatus" + + // scanPrefix is the prefix for the virus scan status and date + ScanStatusPrefix string = OcisPrefix + "scanstatus" + ScanDatePrefix string = OcisPrefix + "scandate" + // grantPrefix is the prefix for sharing related extended attributes GrantPrefix string = OcisPrefix + "grant." GrantUserAcePrefix string = OcisPrefix + "grant." + UserAcePrefix @@ -122,8 +129,8 @@ func refFromCS3(b []byte) (*provider.Reference, error) { // CopyMetadata copies all extended attributes from source to target. // The optional filter function can be used to filter by attribute name, e.g. by checking a prefix -// For the source file, a shared lock is acquired. For the target, an exclusive -// write lock is acquired. +// For the source file, a shared lock is acquired. +// NOTE: target resource is not locked! You need to acquire a write lock on the target additionally func CopyMetadata(src, target string, filter func(attributeName string) bool) (err error) { var readLock *flock.Flock @@ -147,8 +154,8 @@ func CopyMetadata(src, target string, filter func(attributeName string) bool) (e // CopyMetadataWithSourceLock copies all extended attributes from source to target. // The optional filter function can be used to filter by attribute name, e.g. 
by checking a prefix -// For the source file, a shared lock is acquired. For the target, an exclusive -// write lock is acquired. +// For the source file, a shared lock is acquired. +// NOTE: target resource is not locked! You need to acquire a write lock on the target additionally func CopyMetadataWithSourceLock(src, target string, filter func(attributeName string) bool, readLock *flock.Flock) (err error) { switch { case readLock == nil: @@ -159,23 +166,6 @@ func CopyMetadataWithSourceLock(src, target string, filter func(attributeName st return errors.New("not locked") } - var writeLock *flock.Flock - - // Acquire the write log on the target node - writeLock, err = filelocks.AcquireWriteLock(target) - - if err != nil { - return errors.Wrap(err, "xattrs: Unable to lock target to write") - } - defer func() { - rerr := filelocks.ReleaseLock(writeLock) - - // if err is non nil we do not overwrite that - if err == nil { - err = rerr - } - }() - // both locks are established. Copy. var attrNameList []string if attrNameList, err = xattr.List(src); err != nil { diff --git a/pkg/storage/utils/filelocks/filelocks.go b/pkg/storage/utils/filelocks/filelocks.go index 79e91332ea..c98a499851 100644 --- a/pkg/storage/utils/filelocks/filelocks.go +++ b/pkg/storage/utils/filelocks/filelocks.go @@ -164,6 +164,10 @@ func AcquireWriteLock(file string) (*flock.Flock, error) { // ReleaseLock releases a lock from a file that was previously created // by AcquireReadLock or AcquireWriteLock. func ReleaseLock(lock *flock.Flock) error { + if lock == nil { + return errors.New("cannot unlock nil lock") + } + // there is a probability that if the file can not be unlocked, // we also can not remove the file. We will only try to remove if it // was successfully unlocked.
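
The nil guard added to ReleaseLock above exists because callers now defer the release before inspecting the error of the acquire step (see `defer filelocks.ReleaseLock(lock) //nolint:errcheck` in CreateNodeForUpload), so the deferred call can run with a nil lock. The sketch below reproduces that pattern under stated assumptions: `acquireWriteLock`, `releaseLock` and `doWork` are simplified stand-ins for the filelocks package, not its real implementation.

```go
package example

import (
	"errors"

	"github.com/gofrs/flock"
)

// acquireWriteLock returns a nil lock when acquisition fails,
// which is exactly the case the nil guard has to cover.
func acquireWriteLock(path string) (*flock.Flock, error) {
	fl := flock.New(path + ".flock")
	if err := fl.Lock(); err != nil {
		return nil, err
	}
	return fl, nil
}

// releaseLock mirrors the guarded ReleaseLock from the hunk above.
func releaseLock(lock *flock.Flock) error {
	if lock == nil {
		return errors.New("cannot unlock nil lock") // guard added in this change
	}
	return lock.Unlock()
}

func doWork(path string) error {
	lock, err := acquireWriteLock(path)
	// the deferred release runs even when err != nil and lock is nil;
	// without the nil guard this sketch would panic inside releaseLock
	defer releaseLock(lock) //nolint:errcheck
	if err != nil {
		return err
	}
	// ... update metadata under the lock ...
	return nil
}
```

With the guard in place, the deferred release degrades to an ordinary error instead of a nil-pointer dereference when lock acquisition failed.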