From 2196367c98efedf386ed4202d2e083d140b9154f Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Mon, 4 Nov 2024 10:23:22 +0100 Subject: [PATCH] memory/worker: move memory manager from worker to dedicated package --- api/worker.go | 10 +++------- {worker => internal/memory}/memory.go | 18 +++++++++++------- worker/download.go | 7 ++++--- worker/mocks_test.go | 12 ++++-------- worker/upload.go | 19 ++++++++++--------- 5 files changed, 32 insertions(+), 34 deletions(-) rename {worker => internal/memory}/memory.go (92%) diff --git a/api/worker.go b/api/worker.go index 8b3234a18..1863ac6cc 100644 --- a/api/worker.go +++ b/api/worker.go @@ -12,6 +12,7 @@ import ( rhpv2 "go.sia.tech/core/rhp/v2" rhpv3 "go.sia.tech/core/rhp/v3" "go.sia.tech/core/types" + "go.sia.tech/renterd/internal/memory" ) var ( @@ -59,13 +60,8 @@ type ( } MemoryResponse struct { - Download MemoryStatus `json:"download"` - Upload MemoryStatus `json:"upload"` - } - - MemoryStatus struct { - Available uint64 `json:"available"` - Total uint64 `json:"total"` + Download memory.Status `json:"download"` + Upload memory.Status `json:"upload"` } // RHPFormResponse is the response type for the /rhp/form endpoint. diff --git a/worker/memory.go b/internal/memory/memory.go similarity index 92% rename from worker/memory.go rename to internal/memory/memory.go index d6f459bc8..55b2ca65b 100644 --- a/worker/memory.go +++ b/internal/memory/memory.go @@ -1,11 +1,10 @@ -package worker +package memory import ( "context" "fmt" "sync" - "go.sia.tech/renterd/api" "go.uber.org/zap" ) @@ -13,7 +12,7 @@ type ( // MemoryManager helps regulate processes that use a lot of memory. Such as // uploads and downloads. MemoryManager interface { - Status() api.MemoryStatus + Status() Status AcquireMemory(ctx context.Context, amt uint64) Memory Limit(amt uint64) (MemoryManager, error) } @@ -23,6 +22,11 @@ type ( ReleaseSome(amt uint64) } + Status struct { + Available uint64 `json:"available"` + Total uint64 `json:"total"` + } + memoryManager struct { totalAvailable uint64 logger *zap.SugaredLogger @@ -41,7 +45,7 @@ type ( var _ MemoryManager = (*memoryManager)(nil) -func newMemoryManager(maxMemory uint64, logger *zap.Logger) MemoryManager { +func NewManager(maxMemory uint64, logger *zap.Logger) MemoryManager { return newMemoryManagerCustom(maxMemory, logger.Named("memorymanager").Sugar()) } @@ -68,10 +72,10 @@ func (mm *memoryManager) Limit(amt uint64) (MemoryManager, error) { }, nil } -func (mm *memoryManager) Status() api.MemoryStatus { +func (mm *memoryManager) Status() Status { mm.mu.Lock() defer mm.mu.Unlock() - return api.MemoryStatus{ + return Status{ Available: mm.available, Total: mm.totalAvailable, } @@ -144,7 +148,7 @@ type ( } ) -func (lmm *limitMemoryManager) Status() api.MemoryStatus { +func (lmm *limitMemoryManager) Status() Status { return lmm.child.Status() } diff --git a/worker/download.go b/worker/download.go index ca8d9a213..08503394c 100644 --- a/worker/download.go +++ b/worker/download.go @@ -14,6 +14,7 @@ import ( rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" "go.sia.tech/renterd/api" + "go.sia.tech/renterd/internal/memory" rhp3 "go.sia.tech/renterd/internal/rhp/v3" "go.sia.tech/renterd/internal/utils" "go.sia.tech/renterd/object" @@ -33,7 +34,7 @@ var ( type ( downloadManager struct { hm HostManager - mm MemoryManager + mm memory.MemoryManager os ObjectStore uploadKey *utils.UploadKey logger *zap.SugaredLogger @@ -81,7 +82,7 @@ type ( } slabDownloadResponse struct { - mem Memory + mem memory.Memory surchargeApplied bool shards [][]byte index int @@ -151,7 +152,7 @@ func newDownloadManager(ctx context.Context, uploadKey *utils.UploadKey, hm Host logger = logger.Named("downloadmanager") return &downloadManager{ hm: hm, - mm: newMemoryManager(maxMemory, logger), + mm: memory.NewManager(maxMemory, logger), os: os, uploadKey: uploadKey, logger: logger.Sugar(), diff --git a/worker/mocks_test.go b/worker/mocks_test.go index d93e18c9e..cd4a4ce3c 100644 --- a/worker/mocks_test.go +++ b/worker/mocks_test.go @@ -15,6 +15,7 @@ import ( "go.sia.tech/renterd/alerts" "go.sia.tech/renterd/api" "go.sia.tech/renterd/internal/gouging" + "go.sia.tech/renterd/internal/memory" "go.sia.tech/renterd/object" "go.sia.tech/renterd/webhooks" ) @@ -293,11 +294,6 @@ func (hs *hostStoreMock) addHost() *hostMock { return hs.hosts[hk] } -var ( - _ MemoryManager = (*memoryManagerMock)(nil) - _ Memory = (*memoryMock)(nil) -) - type ( memoryMock struct{} memoryManagerMock struct{ memBlockChan chan struct{} } @@ -312,13 +308,13 @@ func newMemoryManagerMock() *memoryManagerMock { func (m *memoryMock) Release() {} func (m *memoryMock) ReleaseSome(uint64) {} -func (mm *memoryManagerMock) Limit(amt uint64) (MemoryManager, error) { +func (mm *memoryManagerMock) Limit(amt uint64) (memory.MemoryManager, error) { return mm, nil } -func (mm *memoryManagerMock) Status() api.MemoryStatus { return api.MemoryStatus{} } +func (mm *memoryManagerMock) Status() memory.Status { return memory.Status{} } -func (mm *memoryManagerMock) AcquireMemory(ctx context.Context, amt uint64) Memory { +func (mm *memoryManagerMock) AcquireMemory(ctx context.Context, amt uint64) memory.Memory { <-mm.memBlockChan return &memoryMock{} } diff --git a/worker/upload.go b/worker/upload.go index 8fdfefe2e..ab5f2b41b 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -17,6 +17,7 @@ import ( rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" "go.sia.tech/renterd/api" + "go.sia.tech/renterd/internal/memory" "go.sia.tech/renterd/internal/utils" "go.sia.tech/renterd/object" "go.uber.org/zap" @@ -40,7 +41,7 @@ var ( type ( uploadManager struct { hm HostManager - mm MemoryManager + mm memory.MemoryManager os ObjectStore cl ContractLocker cs ContractStore @@ -98,7 +99,7 @@ type ( numUploaded uint64 numSectors uint64 - mem Memory + mem memory.Memory errs HostErrorSet } @@ -278,7 +279,7 @@ func (w *Worker) threadedUploadPackedSlabs(rs api.RedundancySettings, contractSe wg.Wait() } -func (w *Worker) uploadPackedSlab(ctx context.Context, mem Memory, ps api.PackedSlab, rs api.RedundancySettings, contractSet string, lockPriority int) error { +func (w *Worker) uploadPackedSlab(ctx context.Context, mem memory.Memory, ps api.PackedSlab, rs api.RedundancySettings, contractSet string, lockPriority int) error { // fetch contracts contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{ContractSet: contractSet}) if err != nil { @@ -307,7 +308,7 @@ func newUploadManager(ctx context.Context, uploadKey *utils.UploadKey, hm HostMa logger = logger.Named("uploadmanager") return &uploadManager{ hm: hm, - mm: newMemoryManager(maxMemory, logger), + mm: memory.NewManager(maxMemory, logger), os: os, cl: cl, cs: cs, @@ -560,7 +561,7 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a return } -func (mgr *uploadManager) UploadPackedSlab(ctx context.Context, rs api.RedundancySettings, ps api.PackedSlab, mem Memory, contracts []api.ContractMetadata, bh uint64, lockPriority int) (err error) { +func (mgr *uploadManager) UploadPackedSlab(ctx context.Context, rs api.RedundancySettings, ps api.PackedSlab, mem memory.Memory, contracts []api.ContractMetadata, bh uint64, lockPriority int) (err error) { // cancel all in-flight requests when the upload is done ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -608,7 +609,7 @@ func (mgr *uploadManager) UploadPackedSlab(ctx context.Context, rs api.Redundanc return nil } -func (mgr *uploadManager) UploadShards(ctx context.Context, s object.Slab, shardIndices []int, shards [][]byte, contractSet string, contracts []api.ContractMetadata, bh uint64, lockPriority int, mem Memory) (err error) { +func (mgr *uploadManager) UploadShards(ctx context.Context, s object.Slab, shardIndices []int, shards [][]byte, contractSet string, contracts []api.ContractMetadata, bh uint64, lockPriority int, mem memory.Memory) (err error) { // cancel all in-flight requests when the upload is done ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -745,7 +746,7 @@ func (mgr *uploadManager) refreshUploaders(contracts []api.ContractMetadata, bh mgr.uploaders = refreshed } -func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders []*uploader, mem Memory, maxOverdrive uint64) (*slabUpload, chan sectorUploadResp) { +func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders []*uploader, mem memory.Memory, maxOverdrive uint64) (*slabUpload, chan sectorUploadResp) { // prepare response channel responseChan := make(chan sectorUploadResp) @@ -801,7 +802,7 @@ func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders [ }, responseChan } -func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data []byte, length, index int, respChan chan slabUploadResponse, candidates []*uploader, mem Memory, maxOverdrive uint64, overdriveTimeout time.Duration) (uploadSpeed int64, overdrivePct float64) { +func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data []byte, length, index int, respChan chan slabUploadResponse, candidates []*uploader, mem memory.Memory, maxOverdrive uint64, overdriveTimeout time.Duration) (uploadSpeed int64, overdrivePct float64) { // create the response resp := slabUploadResponse{ slab: object.SlabSlice{ @@ -829,7 +830,7 @@ func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data return } -func (u *upload) uploadShards(ctx context.Context, shards [][]byte, candidates []*uploader, mem Memory, maxOverdrive uint64, overdriveTimeout time.Duration) (sectors []object.Sector, uploadSpeed int64, overdrivePct float64, err error) { +func (u *upload) uploadShards(ctx context.Context, shards [][]byte, candidates []*uploader, mem memory.Memory, maxOverdrive uint64, overdriveTimeout time.Duration) (sectors []object.Sector, uploadSpeed int64, overdrivePct float64, err error) { // ensure inflight uploads get cancelled ctx, cancel := context.WithCancel(ctx) defer cancel()