Skip to content

Commit

Permalink
feat: Support args in cp (#109)
Browse files Browse the repository at this point in the history
* feat: Support args in cp

* Copy one by one
  • Loading branch information
JinnyYi authored Nov 16, 2021
1 parent f87be95 commit a1ebea8
Showing 1 changed file with 80 additions and 54 deletions.
134 changes: 80 additions & 54 deletions cmd/byctl/cp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"path/filepath"
"strings"

"github.com/docker/go-units"
Expand Down Expand Up @@ -44,7 +45,7 @@ var cpCmd = &cli.Command{
Flags: mergeFlags(globalFlags, ioFlags, cpFlags),
Before: func(c *cli.Context) error {
if args := c.Args().Len(); args < 2 {
return fmt.Errorf("cp command wants two args, but got %d", args)
return fmt.Errorf("cp command wants at least two args, but got %d", args)
}
return nil
},
Expand All @@ -57,24 +58,11 @@ var cpCmd = &cli.Command{
return err
}

srcConn, srcKey, err := cfg.ParseProfileInput(c.Args().Get(0))
if err != nil {
logger.Error("parse profile input from src", zap.Error(err))
return err
}
dstConn, dstKey, err := cfg.ParseProfileInput(c.Args().Get(1))
if err != nil {
logger.Error("parse profile input from dst", zap.Error(err))
return err
}
argsNum := c.Args().Len()

if c.Bool(cpFlagRecursive) && !strings.HasSuffix(srcKey, "/") {
srcKey += "/"
}

src, err := services.NewStoragerFromString(srcConn)
dstConn, dstKey, err := cfg.ParseProfileInput(c.Args().Get(argsNum - 1))
if err != nil {
logger.Error("init src storager", zap.Error(err), zap.String("conn string", srcConn))
logger.Error("parse profile input from dst", zap.Error(err))
return err
}

Expand All @@ -84,23 +72,16 @@ var cpCmd = &cli.Command{
return err
}

so := operations.NewSingleOperator(src)
dstSo := operations.NewSingleOperator(dst)

srcObject, err := so.Stat(srcKey)
if err != nil {
logger.Error("stat", zap.String("path", srcKey), zap.Error(err))
return err
}

size, ok := srcObject.GetContentLength()
if !ok {
logger.Error("can't get object content length", zap.String("path", srcKey))
return fmt.Errorf("get object content length failed")
}

do := operations.NewDualOperator(src, dst)
if c.IsSet(flagWorkersName) {
do.WithWorkers(c.Int(flagWorkersName))
if argsNum > 2 {
dstObject, err := dstSo.Stat(dstKey)
if err != nil {
return fmt.Errorf("copy: target '%s' is not a directory", dstKey)
}
if !dstObject.Mode.IsDir() {
return fmt.Errorf("copy: target '%s' is not a directory", dstKey)
}
}

// Handle read pairs.
Expand All @@ -116,7 +97,6 @@ var cpCmd = &cli.Command{

readPairs = append(readPairs, limitPair)
}
do.WithReadPairs(readPairs...)

// Handle write pairs.
var writePairs []types.Pair
Expand All @@ -131,7 +111,6 @@ var cpCmd = &cli.Command{

writePairs = append(writePairs, limitPair)
}
do.WithWritePairs(writePairs...)

// parse flag multipart-threshold, 1GB is the default value
multipartThreshold, err := units.FromHumanSize(c.String(cpFlagMultipartThresholdName))
Expand All @@ -142,27 +121,74 @@ var cpCmd = &cli.Command{
return err
}

var ch chan *operations.EmptyResult
if c.Bool(cpFlagRecursive) {
ch, err = do.CopyRecursively(srcKey, dstKey, multipartThreshold)
} else if size < multipartThreshold {
ch, err = do.CopyFileViaWrite(srcKey, dstKey, size)
} else {
// TODO: we will support other copy method later.
ch, err = do.CopyFileViaMultipart(srcKey, dstKey, size)
}
if err != nil {
logger.Error("start copy",
zap.String("src", srcKey),
zap.String("dst", dstKey),
zap.Error(err))
return err
}
for i := 0; i < argsNum-1; i++ {
srcConn, srcKey, err := cfg.ParseProfileInput(c.Args().Get(i))
if err != nil {
logger.Error("parse profile input from src", zap.Error(err))
continue
}

if c.Bool(cpFlagRecursive) && !strings.HasSuffix(srcKey, "/") {
srcKey += "/"
}

src, err := services.NewStoragerFromString(srcConn)
if err != nil {
logger.Error("init src storager", zap.Error(err), zap.String("conn string", srcConn))
continue
}

so := operations.NewSingleOperator(src)

srcObject, err := so.Stat(srcKey)
if err != nil {
logger.Error("stat", zap.String("path", srcKey), zap.Error(err))
continue
}

size, ok := srcObject.GetContentLength()
if !ok {
logger.Error("can't get object content length", zap.String("path", srcKey))
continue
}

for v := range ch {
logger.Error("read next result", zap.Error(v.Error))
return v.Error
do := operations.NewDualOperator(src, dst)
if c.IsSet(flagWorkersName) {
do.WithWorkers(c.Int(flagWorkersName))
}

// set read pairs
do.WithReadPairs(readPairs...)
// set write pairs
do.WithWritePairs(writePairs...)

realDstKey := dstKey
if argsNum > 2 {
realDstKey = filepath.Join(dstKey, srcKey)
}

var ch chan *operations.EmptyResult
if c.Bool(cpFlagRecursive) {
ch, err = do.CopyRecursively(srcKey, realDstKey, multipartThreshold)
} else if size < multipartThreshold {
ch, err = do.CopyFileViaWrite(srcKey, realDstKey, size)
} else {
// TODO: we will support other copy method later.
ch, err = do.CopyFileViaMultipart(srcKey, realDstKey, size)
}
if err != nil {
logger.Error("start copy",
zap.String("src", srcKey),
zap.String("dst", realDstKey),
zap.Error(err))
continue
}

for v := range ch {
logger.Error("read next result", zap.Error(v.Error))
}
}

return
},
}

0 comments on commit a1ebea8

Please sign in to comment.