Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move memory manager from worker to dedicated package #1651

Merged
merged 1 commit into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
rhpv2 "go.sia.tech/core/rhp/v2"
rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
"go.sia.tech/renterd/internal/memory"
)

var (
Expand Down Expand Up @@ -59,13 +60,8 @@ type (
}

MemoryResponse struct {
Download MemoryStatus `json:"download"`
Upload MemoryStatus `json:"upload"`
}

MemoryStatus struct {
Available uint64 `json:"available"`
Total uint64 `json:"total"`
Download memory.Status `json:"download"`
Upload memory.Status `json:"upload"`
}

// RHPFormResponse is the response type for the /rhp/form endpoint.
Expand Down
18 changes: 11 additions & 7 deletions worker/memory.go → internal/memory/memory.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
package worker
package memory

import (
"context"
"fmt"
"sync"

"go.sia.tech/renterd/api"
"go.uber.org/zap"
)

type (
// MemoryManager helps regulate processes that use a lot of memory. Such as
// uploads and downloads.
MemoryManager interface {
Status() api.MemoryStatus
Status() Status
AcquireMemory(ctx context.Context, amt uint64) Memory
Limit(amt uint64) (MemoryManager, error)
}
Expand All @@ -23,6 +22,11 @@ type (
ReleaseSome(amt uint64)
}

Status struct {
Available uint64 `json:"available"`
Total uint64 `json:"total"`
}

memoryManager struct {
totalAvailable uint64
logger *zap.SugaredLogger
Expand All @@ -41,7 +45,7 @@ type (

var _ MemoryManager = (*memoryManager)(nil)

func newMemoryManager(maxMemory uint64, logger *zap.Logger) MemoryManager {
func NewManager(maxMemory uint64, logger *zap.Logger) MemoryManager {
return newMemoryManagerCustom(maxMemory, logger.Named("memorymanager").Sugar())
}

Expand All @@ -68,10 +72,10 @@ func (mm *memoryManager) Limit(amt uint64) (MemoryManager, error) {
}, nil
}

func (mm *memoryManager) Status() api.MemoryStatus {
func (mm *memoryManager) Status() Status {
mm.mu.Lock()
defer mm.mu.Unlock()
return api.MemoryStatus{
return Status{
Available: mm.available,
Total: mm.totalAvailable,
}
Expand Down Expand Up @@ -144,7 +148,7 @@ type (
}
)

func (lmm *limitMemoryManager) Status() api.MemoryStatus {
func (lmm *limitMemoryManager) Status() Status {
return lmm.child.Status()
}

Expand Down
7 changes: 4 additions & 3 deletions worker/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/memory"
rhp3 "go.sia.tech/renterd/internal/rhp/v3"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/object"
Expand All @@ -33,7 +34,7 @@ var (
type (
downloadManager struct {
hm HostManager
mm MemoryManager
mm memory.MemoryManager
os ObjectStore
uploadKey *utils.UploadKey
logger *zap.SugaredLogger
Expand Down Expand Up @@ -81,7 +82,7 @@ type (
}

slabDownloadResponse struct {
mem Memory
mem memory.Memory
surchargeApplied bool
shards [][]byte
index int
Expand Down Expand Up @@ -151,7 +152,7 @@ func newDownloadManager(ctx context.Context, uploadKey *utils.UploadKey, hm Host
logger = logger.Named("downloadmanager")
return &downloadManager{
hm: hm,
mm: newMemoryManager(maxMemory, logger),
mm: memory.NewManager(maxMemory, logger),
os: os,
uploadKey: uploadKey,
logger: logger.Sugar(),
Expand Down
12 changes: 4 additions & 8 deletions worker/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/gouging"
"go.sia.tech/renterd/internal/memory"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/webhooks"
)
Expand Down Expand Up @@ -293,11 +294,6 @@ func (hs *hostStoreMock) addHost() *hostMock {
return hs.hosts[hk]
}

var (
_ MemoryManager = (*memoryManagerMock)(nil)
_ Memory = (*memoryMock)(nil)
)

type (
memoryMock struct{}
memoryManagerMock struct{ memBlockChan chan struct{} }
Expand All @@ -312,13 +308,13 @@ func newMemoryManagerMock() *memoryManagerMock {
func (m *memoryMock) Release() {}
func (m *memoryMock) ReleaseSome(uint64) {}

func (mm *memoryManagerMock) Limit(amt uint64) (MemoryManager, error) {
func (mm *memoryManagerMock) Limit(amt uint64) (memory.MemoryManager, error) {
return mm, nil
}

func (mm *memoryManagerMock) Status() api.MemoryStatus { return api.MemoryStatus{} }
func (mm *memoryManagerMock) Status() memory.Status { return memory.Status{} }

func (mm *memoryManagerMock) AcquireMemory(ctx context.Context, amt uint64) Memory {
func (mm *memoryManagerMock) AcquireMemory(ctx context.Context, amt uint64) memory.Memory {
<-mm.memBlockChan
return &memoryMock{}
}
Expand Down
19 changes: 10 additions & 9 deletions worker/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/memory"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/object"
"go.uber.org/zap"
Expand All @@ -40,7 +41,7 @@ var (
type (
uploadManager struct {
hm HostManager
mm MemoryManager
mm memory.MemoryManager
os ObjectStore
cl ContractLocker
cs ContractStore
Expand Down Expand Up @@ -98,7 +99,7 @@ type (
numUploaded uint64
numSectors uint64

mem Memory
mem memory.Memory

errs HostErrorSet
}
Expand Down Expand Up @@ -278,7 +279,7 @@ func (w *Worker) threadedUploadPackedSlabs(rs api.RedundancySettings, contractSe
wg.Wait()
}

func (w *Worker) uploadPackedSlab(ctx context.Context, mem Memory, ps api.PackedSlab, rs api.RedundancySettings, contractSet string, lockPriority int) error {
func (w *Worker) uploadPackedSlab(ctx context.Context, mem memory.Memory, ps api.PackedSlab, rs api.RedundancySettings, contractSet string, lockPriority int) error {
// fetch contracts
contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{ContractSet: contractSet})
if err != nil {
Expand Down Expand Up @@ -307,7 +308,7 @@ func newUploadManager(ctx context.Context, uploadKey *utils.UploadKey, hm HostMa
logger = logger.Named("uploadmanager")
return &uploadManager{
hm: hm,
mm: newMemoryManager(maxMemory, logger),
mm: memory.NewManager(maxMemory, logger),
os: os,
cl: cl,
cs: cs,
Expand Down Expand Up @@ -560,7 +561,7 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a
return
}

func (mgr *uploadManager) UploadPackedSlab(ctx context.Context, rs api.RedundancySettings, ps api.PackedSlab, mem Memory, contracts []api.ContractMetadata, bh uint64, lockPriority int) (err error) {
func (mgr *uploadManager) UploadPackedSlab(ctx context.Context, rs api.RedundancySettings, ps api.PackedSlab, mem memory.Memory, contracts []api.ContractMetadata, bh uint64, lockPriority int) (err error) {
// cancel all in-flight requests when the upload is done
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -608,7 +609,7 @@ func (mgr *uploadManager) UploadPackedSlab(ctx context.Context, rs api.Redundanc
return nil
}

func (mgr *uploadManager) UploadShards(ctx context.Context, s object.Slab, shardIndices []int, shards [][]byte, contractSet string, contracts []api.ContractMetadata, bh uint64, lockPriority int, mem Memory) (err error) {
func (mgr *uploadManager) UploadShards(ctx context.Context, s object.Slab, shardIndices []int, shards [][]byte, contractSet string, contracts []api.ContractMetadata, bh uint64, lockPriority int, mem memory.Memory) (err error) {
// cancel all in-flight requests when the upload is done
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -745,7 +746,7 @@ func (mgr *uploadManager) refreshUploaders(contracts []api.ContractMetadata, bh
mgr.uploaders = refreshed
}

func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders []*uploader, mem Memory, maxOverdrive uint64) (*slabUpload, chan sectorUploadResp) {
func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders []*uploader, mem memory.Memory, maxOverdrive uint64) (*slabUpload, chan sectorUploadResp) {
// prepare response channel
responseChan := make(chan sectorUploadResp)

Expand Down Expand Up @@ -801,7 +802,7 @@ func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders [
}, responseChan
}

func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data []byte, length, index int, respChan chan slabUploadResponse, candidates []*uploader, mem Memory, maxOverdrive uint64, overdriveTimeout time.Duration) (uploadSpeed int64, overdrivePct float64) {
func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data []byte, length, index int, respChan chan slabUploadResponse, candidates []*uploader, mem memory.Memory, maxOverdrive uint64, overdriveTimeout time.Duration) (uploadSpeed int64, overdrivePct float64) {
// create the response
resp := slabUploadResponse{
slab: object.SlabSlice{
Expand Down Expand Up @@ -829,7 +830,7 @@ func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data
return
}

func (u *upload) uploadShards(ctx context.Context, shards [][]byte, candidates []*uploader, mem Memory, maxOverdrive uint64, overdriveTimeout time.Duration) (sectors []object.Sector, uploadSpeed int64, overdrivePct float64, err error) {
func (u *upload) uploadShards(ctx context.Context, shards [][]byte, candidates []*uploader, mem memory.Memory, maxOverdrive uint64, overdriveTimeout time.Duration) (sectors []object.Sector, uploadSpeed int64, overdrivePct float64, err error) {
// ensure inflight uploads get cancelled
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down
Loading