Skip to content

Commit

Permalink
feat: Added MD5 check after migration
Browse files Browse the repository at this point in the history
  • Loading branch information
abyss-w committed Aug 2, 2024
1 parent 64cc75f commit 4dd91f0
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 15 deletions.
74 changes: 59 additions & 15 deletions migrate/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"context"
"crypto/md5"
"encoding/hex"
"fmt"
"github.com/sirupsen/logrus"
"io"
"strings"
"sync"
"time"

"github.com/sirupsen/logrus"

"github.com/yunify/qscamel/constants"
"github.com/yunify/qscamel/endpoint"
"github.com/yunify/qscamel/model"
Expand Down Expand Up @@ -70,15 +70,15 @@ func checkObject(ctx context.Context, mo model.Object) (ok bool, err error) {

logrus.Infof("Start checking object %s.", o.Key)

so, err := statObject(ctx, src, o)
so, err := statObject(ctx, src, o, false)
if err != nil {
return
}
if so == nil {
return true, nil
}

do, err := statObject(ctx, dst, o)
do, err := statObject(ctx, dst, o, false)
if err != nil {
return
}
Expand All @@ -92,19 +92,17 @@ func checkObject(ctx context.Context, mo model.Object) (ok bool, err error) {
return
}

// Check last modified
if t.IgnoreExisting == constants.TaskIgnoreExistingLastModified {
if so.LastModified > do.LastModified {
logrus.Infof("Object %s was modified, execute an operation on it.", o.Key)
if t.IgnoreBeforeTimestamp != 0 {
if so.LastModified > t.IgnoreBeforeTimestamp {
logrus.Infof("Object %s was modified after %s, execute an operation on it.", o.Key, time.Unix(t.IgnoreBeforeTimestamp, 0))
return
}
logrus.Infof("Object %s check passed, ignore.", o.Key)
return true, nil
}

if t.IgnoreBeforeTimestamp != 0 {
if so.LastModified > t.IgnoreBeforeTimestamp {
logrus.Infof("Object %s was modified after %s, execute an operation on it.", o.Key, time.Unix(t.IgnoreBeforeTimestamp, 0))
// Check last modified
if t.IgnoreExisting == constants.TaskIgnoreExistingLastModified {
if so.LastModified > do.LastModified {
logrus.Infof("Object %s was modified, execute an operation on it.", o.Key)
return
}
logrus.Infof("Object %s check passed, ignore.", o.Key)
Expand All @@ -121,6 +119,34 @@ func checkObject(ctx context.Context, mo model.Object) (ok bool, err error) {
return true, nil
}

// checkObjectAfterMigrate will check whether the MD5 between the migrated src and dst is consistent.
func checkObjectAfterMigrate(ctx context.Context, mo model.Object) (err error) {
if t.Src.Type == "fs" || t.Dst.Type == "fs" {
return nil
}

o := mo.(*model.SingleObject)

rso, err := statObject(ctx, src, o, true)
if err != nil {
logrus.Errorf("Src stat %s failed for %v.", o.Key, err)
return err
}

rdo, err := statObject(ctx, dst, o, true)
if err != nil {
logrus.Errorf("Dst stat %s failed for %v.", o.Key, err)
return err
}

if rdo.MD5 != rso.MD5 {
logrus.Errorf("md5 mismatch between src and dst %s.", o.Key)
return fmt.Errorf("md5 not match")
}

return
}

// copyObject will do a real copy.
func copyObject(ctx context.Context, o model.Object) (err error) {
so := o.(*model.SingleObject)
Expand All @@ -140,6 +166,14 @@ func copyObject(ctx context.Context, o model.Object) (err error) {
return err
}

if t.CheckMD5 {
err = checkObjectAfterMigrate(ctx, o)
if err != nil {
_ = dst.Delete(ctx, so.Key)
return err
}
}

logrus.Infof("Single object %s copied.", so.Key)
return nil
}
Expand Down Expand Up @@ -245,6 +279,16 @@ func copyObject(ctx context.Context, o model.Object) (err error) {
return err
}

if t.CheckMD5 {
if t.CheckMD5 {
err = checkObjectAfterMigrate(ctx, o)
if err != nil {
_ = dst.Delete(ctx, so.Key)
return err
}
}
}

logrus.Infof("Object %s copied.", so.Key)

return
Expand Down Expand Up @@ -299,7 +343,7 @@ func fetchObject(ctx context.Context, o model.Object) (err error) {

// statObject will get an object metadata and try to get it's md5 if available.
func statObject(
ctx context.Context, e endpoint.Base, o *model.SingleObject,
ctx context.Context, e endpoint.Base, o *model.SingleObject, isMD5 bool,
) (ro *model.SingleObject, err error) {
ro, err = e.Stat(ctx, o.Key, o.IsDir)
if err != nil {
Expand All @@ -311,7 +355,7 @@ func statObject(
return
}

if t.IgnoreExisting != constants.TaskIgnoreExistingMD5Sum {
if t.IgnoreExisting != constants.TaskIgnoreExistingMD5Sum && !isMD5 {
return
}

Expand Down
1 change: 1 addition & 0 deletions model/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Task struct {
Src *Endpoint `yaml:"source" msgpack:"src"`
Dst *Endpoint `yaml:"destination" msgpack:"dst"`

CheckMD5 bool `yaml:"check_md5" msgpack:"cm"`
IgnoreExisting string `yaml:"ignore_existing" msgpack:"ie"`
MultipartBoundarySize int64 `yaml:"multipart_boundary_size" msgpack:"mbs"`
// Format: 2006-01-02 15:04:05
Expand Down

0 comments on commit 4dd91f0

Please sign in to comment.