Skip to content

Commit

Permalink
rate limit #monitor update messages
Browse files Browse the repository at this point in the history
  • Loading branch information
ShuhaoQing committed Aug 23, 2024
1 parent 9378069 commit 8b2c60a
Showing 1 changed file with 22 additions and 8 deletions.
30 changes: 22 additions & 8 deletions pkg/cmd_utils/upload_utils/upload_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func NewUploadManager(client *minio.Client, hideMonitor bool, opts *MultipartOpt
// statusMonitorStartSignal is to ensure status monitor is ready before sending messages.
statusMonitorStartSignal := new(sync.WaitGroup)
um.statusMonitorDoneSignal.Add(1)
um.StatusMonitor = tea.NewProgram(NewUploadStatusMonitor(statusMonitorStartSignal), tea.WithFPS(10))
um.StatusMonitor = tea.NewProgram(NewUploadStatusMonitor(statusMonitorStartSignal), tea.WithFPS(2))
go um.runUploadStatusMonitor()
statusMonitorStartSignal.Wait()

Expand Down Expand Up @@ -230,6 +230,7 @@ func (um *UploadManager) FPutObject(absPath string, bucket string, key string, u
} else {
progress := &uploadProgressReader{
absPath: absPath,
total: fileInfo.Size,
monitor: um.StatusMonitor,
}
um.UpdateMonitor(UpdateStatusMsg{Name: absPath, Status: UploadInProgress})
Expand Down Expand Up @@ -433,6 +434,7 @@ func (um *UploadManager) FMultipartPutObject(ctx context.Context, bucket string,
SectionReader: io.NewSectionReader(fileReader, readOffset, curPartSize),
monitor: um.StatusMonitor,
absPath: filePath,
total: curPartSize,
}
um.Debugf("Uploading part: %d", partToUpload)
objPart, err := c.PutObjectPart(ctx, bucket, key, uploadId, partToUpload, sectionReader, curPartSize, minio.PutObjectPartOptions{SSE: opts.ServerSideEncryption})
Expand Down Expand Up @@ -515,16 +517,21 @@ func (um *UploadManager) FMultipartPutObject(ctx context.Context, bucket string,

// uploadProgressReader is a reader that sends progress updates to a channel.
type uploadProgressReader struct {
absPath string
uploaded int64
monitor *tea.Program
absPath string
total int64
prevUploadedCheckpoint int64
uploaded int64
monitor *tea.Program
}

func (r *uploadProgressReader) Read(b []byte) (int, error) {
n := int64(len(b))
r.uploaded += n
if r.monitor != nil {
r.monitor.Send(UpdateStatusMsg{Name: r.absPath, Uploaded: n})
if r.uploaded-r.prevUploadedCheckpoint > r.total/100 || r.uploaded == r.total {
r.monitor.Send(UpdateStatusMsg{Name: r.absPath, Uploaded: r.uploaded - r.prevUploadedCheckpoint})
r.prevUploadedCheckpoint = r.uploaded
}
}
return int(n), nil
}
Expand All @@ -538,14 +545,21 @@ type uploadedPartRes struct {
// uploadProgressSectionReader is a SectionReader that also sends progress updates to a channel.
type uploadProgressSectionReader struct {
*io.SectionReader
absPath string
monitor *tea.Program
absPath string
total int64
prevUploadedCheckpoint int64
uploaded int64
monitor *tea.Program
}

func (r *uploadProgressSectionReader) Read(b []byte) (int, error) {
n, err := r.SectionReader.Read(b)
r.uploaded += int64(n)
if r.monitor != nil {
r.monitor.Send(UpdateStatusMsg{Name: r.absPath, Uploaded: int64(n), Status: UploadInProgress})
if r.uploaded-r.prevUploadedCheckpoint > r.total/100 || r.uploaded == r.total {
r.monitor.Send(UpdateStatusMsg{Name: r.absPath, Uploaded: r.uploaded - r.prevUploadedCheckpoint, Status: UploadInProgress})
r.prevUploadedCheckpoint = r.uploaded
}
}
return n, err
}

0 comments on commit 8b2c60a

Please sign in to comment.