Skip to content

Commit

Permalink
memory/worker: move memory manager from worker to dedicated package
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed Nov 4, 2024
1 parent 62577b8 commit e6d1b8e
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 34 deletions.
10 changes: 3 additions & 7 deletions api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
rhpv2 "go.sia.tech/core/rhp/v2"
rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
"go.sia.tech/renterd/internal/memory"
)

var (
Expand Down Expand Up @@ -65,13 +66,8 @@ type (
}

MemoryResponse struct {
Download MemoryStatus `json:"download"`
Upload MemoryStatus `json:"upload"`
}

MemoryStatus struct {
Available uint64 `json:"available"`
Total uint64 `json:"total"`
Download memory.Status `json:"download"`
Upload memory.Status `json:"upload"`
}

// RHPFormResponse is the response type for the /rhp/form endpoint.
Expand Down
18 changes: 11 additions & 7 deletions worker/memory.go → internal/memory/memorymanager.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
package worker
package memory

import (
"context"
"fmt"
"sync"

"go.sia.tech/renterd/api"
"go.uber.org/zap"
)

type (
// MemoryManager helps regulate processes that use a lot of memory. Such as
// uploads and downloads.
MemoryManager interface {
Status() api.MemoryStatus
Status() Status
AcquireMemory(ctx context.Context, amt uint64) Memory
Limit(amt uint64) (MemoryManager, error)
}
Expand All @@ -23,6 +22,11 @@ type (
ReleaseSome(amt uint64)
}

Status struct {
Available uint64 `json:"available"`
Total uint64 `json:"total"`
}

memoryManager struct {
totalAvailable uint64
logger *zap.SugaredLogger
Expand All @@ -41,7 +45,7 @@ type (

var _ MemoryManager = (*memoryManager)(nil)

func newMemoryManager(maxMemory uint64, logger *zap.Logger) MemoryManager {
func NewManager(maxMemory uint64, logger *zap.Logger) MemoryManager {
return newMemoryManagerCustom(maxMemory, logger.Named("memorymanager").Sugar())
}

Expand All @@ -68,10 +72,10 @@ func (mm *memoryManager) Limit(amt uint64) (MemoryManager, error) {
}, nil
}

func (mm *memoryManager) Status() api.MemoryStatus {
func (mm *memoryManager) Status() Status {
mm.mu.Lock()
defer mm.mu.Unlock()
return api.MemoryStatus{
return Status{
Available: mm.available,
Total: mm.totalAvailable,
}
Expand Down Expand Up @@ -144,7 +148,7 @@ type (
}
)

func (lmm *limitMemoryManager) Status() api.MemoryStatus {
func (lmm *limitMemoryManager) Status() Status {
return lmm.child.Status()
}

Expand Down
7 changes: 4 additions & 3 deletions worker/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/memory"
rhp3 "go.sia.tech/renterd/internal/rhp/v3"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/object"
Expand All @@ -33,7 +34,7 @@ var (
type (
downloadManager struct {
hm HostManager
mm MemoryManager
mm memory.MemoryManager
os ObjectStore
uploadKey *utils.UploadKey
logger *zap.SugaredLogger
Expand Down Expand Up @@ -81,7 +82,7 @@ type (
}

slabDownloadResponse struct {
mem Memory
mem memory.Memory
surchargeApplied bool
shards [][]byte
index int
Expand Down Expand Up @@ -151,7 +152,7 @@ func newDownloadManager(ctx context.Context, uploadKey *utils.UploadKey, hm Host
logger = logger.Named("downloadmanager")
return &downloadManager{
hm: hm,
mm: newMemoryManager(maxMemory, logger),
mm: memory.NewManager(maxMemory, logger),
os: os,
uploadKey: uploadKey,
logger: logger.Sugar(),
Expand Down
12 changes: 4 additions & 8 deletions worker/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/gouging"
"go.sia.tech/renterd/internal/memory"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/webhooks"
)
Expand Down Expand Up @@ -293,11 +294,6 @@ func (hs *hostStoreMock) addHost() *hostMock {
return hs.hosts[hk]
}

var (
_ MemoryManager = (*memoryManagerMock)(nil)
_ Memory = (*memoryMock)(nil)
)

type (
memoryMock struct{}
memoryManagerMock struct{ memBlockChan chan struct{} }
Expand All @@ -312,13 +308,13 @@ func newMemoryManagerMock() *memoryManagerMock {
func (m *memoryMock) Release() {}
func (m *memoryMock) ReleaseSome(uint64) {}

func (mm *memoryManagerMock) Limit(amt uint64) (MemoryManager, error) {
func (mm *memoryManagerMock) Limit(amt uint64) (memory.MemoryManager, error) {
return mm, nil
}

func (mm *memoryManagerMock) Status() api.MemoryStatus { return api.MemoryStatus{} }
func (mm *memoryManagerMock) Status() memory.Status { return memory.Status{} }

func (mm *memoryManagerMock) AcquireMemory(ctx context.Context, amt uint64) Memory {
func (mm *memoryManagerMock) AcquireMemory(ctx context.Context, amt uint64) memory.Memory {
<-mm.memBlockChan
return &memoryMock{}
}
Expand Down
19 changes: 10 additions & 9 deletions worker/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/memory"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/object"
"go.uber.org/zap"
Expand All @@ -40,7 +41,7 @@ var (
type (
uploadManager struct {
hm HostManager
mm MemoryManager
mm memory.MemoryManager
os ObjectStore
cl ContractLocker
cs ContractStore
Expand Down Expand Up @@ -98,7 +99,7 @@ type (
numUploaded uint64
numSectors uint64

mem Memory
mem memory.Memory

errs HostErrorSet
}
Expand Down Expand Up @@ -278,7 +279,7 @@ func (w *Worker) threadedUploadPackedSlabs(rs api.RedundancySettings, contractSe
wg.Wait()
}

func (w *Worker) uploadPackedSlab(ctx context.Context, mem Memory, ps api.PackedSlab, rs api.RedundancySettings, contractSet string, lockPriority int) error {
func (w *Worker) uploadPackedSlab(ctx context.Context, mem memory.Memory, ps api.PackedSlab, rs api.RedundancySettings, contractSet string, lockPriority int) error {
// fetch contracts
contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{ContractSet: contractSet})
if err != nil {
Expand Down Expand Up @@ -307,7 +308,7 @@ func newUploadManager(ctx context.Context, uploadKey *utils.UploadKey, hm HostMa
logger = logger.Named("uploadmanager")
return &uploadManager{
hm: hm,
mm: newMemoryManager(maxMemory, logger),
mm: memory.NewManager(maxMemory, logger),
os: os,
cl: cl,
cs: cs,
Expand Down Expand Up @@ -560,7 +561,7 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a
return
}

func (mgr *uploadManager) UploadPackedSlab(ctx context.Context, rs api.RedundancySettings, ps api.PackedSlab, mem Memory, contracts []api.ContractMetadata, bh uint64, lockPriority int) (err error) {
func (mgr *uploadManager) UploadPackedSlab(ctx context.Context, rs api.RedundancySettings, ps api.PackedSlab, mem memory.Memory, contracts []api.ContractMetadata, bh uint64, lockPriority int) (err error) {
// cancel all in-flight requests when the upload is done
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -608,7 +609,7 @@ func (mgr *uploadManager) UploadPackedSlab(ctx context.Context, rs api.Redundanc
return nil
}

func (mgr *uploadManager) UploadShards(ctx context.Context, s object.Slab, shardIndices []int, shards [][]byte, contractSet string, contracts []api.ContractMetadata, bh uint64, lockPriority int, mem Memory) (err error) {
func (mgr *uploadManager) UploadShards(ctx context.Context, s object.Slab, shardIndices []int, shards [][]byte, contractSet string, contracts []api.ContractMetadata, bh uint64, lockPriority int, mem memory.Memory) (err error) {
// cancel all in-flight requests when the upload is done
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -745,7 +746,7 @@ func (mgr *uploadManager) refreshUploaders(contracts []api.ContractMetadata, bh
mgr.uploaders = refreshed
}

func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders []*uploader, mem Memory, maxOverdrive uint64) (*slabUpload, chan sectorUploadResp) {
func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders []*uploader, mem memory.Memory, maxOverdrive uint64) (*slabUpload, chan sectorUploadResp) {
// prepare response channel
responseChan := make(chan sectorUploadResp)

Expand Down Expand Up @@ -801,7 +802,7 @@ func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders [
}, responseChan
}

func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data []byte, length, index int, respChan chan slabUploadResponse, candidates []*uploader, mem Memory, maxOverdrive uint64, overdriveTimeout time.Duration) (uploadSpeed int64, overdrivePct float64) {
func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data []byte, length, index int, respChan chan slabUploadResponse, candidates []*uploader, mem memory.Memory, maxOverdrive uint64, overdriveTimeout time.Duration) (uploadSpeed int64, overdrivePct float64) {
// create the response
resp := slabUploadResponse{
slab: object.SlabSlice{
Expand Down Expand Up @@ -829,7 +830,7 @@ func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data
return
}

func (u *upload) uploadShards(ctx context.Context, shards [][]byte, candidates []*uploader, mem Memory, maxOverdrive uint64, overdriveTimeout time.Duration) (sectors []object.Sector, uploadSpeed int64, overdrivePct float64, err error) {
func (u *upload) uploadShards(ctx context.Context, shards [][]byte, candidates []*uploader, mem memory.Memory, maxOverdrive uint64, overdriveTimeout time.Duration) (sectors []object.Sector, uploadSpeed int64, overdrivePct float64, err error) {
// ensure inflight uploads get cancelled
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down

0 comments on commit e6d1b8e

Please sign in to comment.