From 0f55aab14a9371a7213313e3507ba06e905051f4 Mon Sep 17 00:00:00 2001 From: leaf corcoran Date: Wed, 22 Nov 2023 02:41:59 -0800 Subject: [PATCH] measure transfer speed when copying file --- zipserver/copy_handler.go | 29 +++++++++++++++++++++++++---- zipserver/readers.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/zipserver/copy_handler.go b/zipserver/copy_handler.go index 7fcda0f..0556d91 100644 --- a/zipserver/copy_handler.go +++ b/zipserver/copy_handler.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io/ioutil" "log" "net/http" "net/url" @@ -12,6 +13,19 @@ import ( var copyLockTable = NewLockTable() +func formatBytes(b float64) string { + const unit = 1024 + if b < unit { + return fmt.Sprintf("%.2f B", b) + } + div, exp := float64(unit), 0 + for n := b / unit; n >= unit; n /= unit { + div *= unit + exp++ + } + return fmt.Sprintf("%.2f %cB", b/div, "kMGTPE"[exp]) +} + // notify the callback URL of task completion func notifyCallback(callbackURL string, resValues url.Values) error { notifyCtx, notifyCancel := context.WithTimeout(context.Background(), time.Duration(config.AsyncNotificationTimeout)) @@ -89,15 +103,17 @@ func copyHandler(w http.ResponseWriter, r *http.Request) error { defer reader.Close() if err != nil { - log.Print("Failed to get file", err) + log.Print("Failed to get file: ", err) notifyError(callbackURL, err) return } + mReader := newMeasuredReader(reader) + // transfer the reader to s3 // TODO: get the actual mime type from the GetFile request log.Print("Starting transfer: ", key) - err = targetStorage.PutFile(jobCtx, config.S3Bucket, key, reader, "application/octet-stream") + err = targetStorage.PutFile(jobCtx, config.S3Bucket, key, mReader, "application/octet-stream") if err != nil { log.Print("Failed to copy file: ", err) @@ -105,11 +121,16 @@ func copyHandler(w http.ResponseWriter, r *http.Request) error { return } - log.Print("Transfer complete " + callbackURL) + log.Print("Transfer complete: ", key, + ", bytes read: ", formatBytes(float64(mReader.BytesRead)), + ", duration: ", mReader.Duration.Seconds(), + ", speed: ", formatBytes(mReader.TransferSpeed()), "/s") + resValues := url.Values{} resValues.Add("Success", "true") resValues.Add("Key", key) - resValues.Add("Duration", fmt.Sprintf("%f", time.Since(startTime).Seconds())) + resValues.Add("Duration", fmt.Sprintf("%.4fs", time.Since(startTime).Seconds())) + resValues.Add("Size", fmt.Sprintf("%d", mReader.BytesRead)) notifyCallback(callbackURL, resValues) })() diff --git a/zipserver/readers.go b/zipserver/readers.go index 4b39fab..cbb6f7a 100644 --- a/zipserver/readers.go +++ b/zipserver/readers.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "log" + "time" ) type readerClosure func(p []byte) (int, error) @@ -35,3 +36,34 @@ func limitedReader(reader io.Reader, maxBytes uint64, totalBytes *uint64) reader return bytesRead, err } } + +type measuredReader struct { + reader io.Reader // The underlying reader + BytesRead int64 // Total bytes read + StartTime time.Time // Time when reading started + Duration time.Duration // Duration of the read operation +} + +func newMeasuredReader(r io.Reader) *measuredReader { + return &measuredReader{ + reader: r, + StartTime: time.Now(), + } +} + +// Read reads data from the underlying io.Reader, tracking the bytes read and duration +func (mr *measuredReader) Read(p []byte) (int, error) { + n, err := mr.reader.Read(p) + mr.BytesRead += int64(n) + mr.Duration = time.Since(mr.StartTime) + + return n, err +} + +// TransferSpeed returns the average transfer speed in bytes per second +func (mr *measuredReader) TransferSpeed() float64 { + if mr.Duration.Seconds() == 0 { + return 0 + } + return float64(mr.BytesRead) / mr.Duration.Seconds() +}