Skip to content

Commit

Permalink
Merge branch 'dev' into pj/usable-hosts
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl authored Nov 8, 2024
2 parents f9a119c + c3c5654 commit aece529
Show file tree
Hide file tree
Showing 19 changed files with 226 additions and 229 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ overview of all settings configurable through the CLI.
| `Bus.UsedUTXOExpiry` | Expiry for used UTXOs in transactions | `24h` | `--bus.usedUTXOExpiry` | - | `bus.usedUtxoExpiry` |
| `Bus.SlabBufferCompletionThreshold` | Threshold for slab buffer upload | `4096` | `--bus.slabBufferCompletionThreshold` | `RENTERD_BUS_SLAB_BUFFER_COMPLETION_THRESHOLD` | `bus.slabBufferCompletionThreshold` |
| `Worker.BusFlushInterval` | Interval for flushing data to bus | `5s` | `--worker.busFlushInterval` | - | `worker.busFlushInterval` |
| `Worker.ContractLockTimeout` | Timeout for locking contracts | `30s` | - | - | `worker.contractLockTimeout` |
| `Worker.DownloadMaxOverdrive` | Max overdrive workers for downloads | `5` | `--worker.downloadMaxOverdrive` | - | `worker.downloadMaxOverdrive` |
| `Worker.DownloadMaxMemory` | Max memory for downloads | `1GiB` | `--worker.downloadMaxMemory` | `RENTERD_WORKER_DOWNLOAD_MAX_MEMORY` | `worker.downloadMaxMemory` |
| `Worker.ID` | Unique ID for worker | `worker` | `--worker.id` | `RENTERD_WORKER_ID` | `worker.id` |
Expand Down
1 change: 0 additions & 1 deletion cmd/renterd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ func defaultConfig() config.Config {

ID: "",
AccountsRefillInterval: defaultAccountRefillInterval,
ContractLockTimeout: 30 * time.Second,
BusFlushInterval: 5 * time.Second,

DownloadMaxOverdrive: 5,
Expand Down
1 change: 0 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ type (
Remotes []RemoteWorker `yaml:"remotes,omitempty"`
AccountsRefillInterval time.Duration `yaml:"accountsRefillInterval,omitempty"`
BusFlushInterval time.Duration `yaml:"busFlushInterval,omitempty"`
ContractLockTimeout time.Duration `yaml:"contractLockTimeout,omitempty"`
DownloadOverdriveTimeout time.Duration `yaml:"downloadOverdriveTimeout,omitempty"`
UploadOverdriveTimeout time.Duration `yaml:"uploadOverdriveTimeout,omitempty"`
DownloadMaxOverdrive uint64 `yaml:"downloadMaxOverdrive,omitempty"`
Expand Down
30 changes: 30 additions & 0 deletions internal/host/host.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package host

import (
"context"
"io"
"time"

rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
)

type (
// Host is the set of operations performed against a single host: sector
// download/upload, price table and revision retrieval, and account
// funding/syncing. Only signatures are declared here; the semantics noted
// below are inferred from them and should be confirmed against the
// implementing type.
Host interface {
// PublicKey returns a types.PublicKey, presumably the host's own key.
PublicKey() types.PublicKey

// DownloadSector presumably writes length bytes, starting at offset, of
// the sector identified by root to w. NOTE(review): the effect of
// overpay is not visible here; confirm with the implementation.
DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) error
// UploadSector takes a pointer to a full rhpv2.SectorSize-byte sector,
// its root and a contract revision; presumably it stores the sector on
// the host under that revision (TODO confirm).
UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) error

// PriceTable returns an api.HostPriceTable and a types.Currency value.
// NOTE(review): the Currency is presumably the cost of obtaining the
// price table, and rev being a pointer suggests it may be optional;
// verify both.
PriceTable(ctx context.Context, rev *types.FileContractRevision) (api.HostPriceTable, types.Currency, error)
// FetchRevision returns a types.FileContractRevision; fetchTimeout
// presumably bounds how long the fetch may take.
FetchRevision(ctx context.Context, fetchTimeout time.Duration) (types.FileContractRevision, error)

// FundAccount presumably funds an account on the host using the given
// contract revision. NOTE(review): whether balance is the amount to add
// or the target balance cannot be told from the signature.
FundAccount(ctx context.Context, balance types.Currency, rev *types.FileContractRevision) error
// SyncAccount presumably reconciles the locally tracked account balance
// with the host's view (TODO confirm).
SyncAccount(ctx context.Context, rev *types.FileContractRevision) error
}

// HostManager hands out a Host for a given host key, contract ID and
// siamux address.
HostManager interface {
// Host returns the Host for hk, fcid and siamuxAddr.
Host(hk types.PublicKey, fcid types.FileContractID, siamuxAddr string) Host
}
)
100 changes: 100 additions & 0 deletions internal/locking/contractlock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package locking

import (
"context"
"errors"
"fmt"
"sync"
"time"

"go.sia.tech/core/types"
"go.uber.org/zap"
)

// ContractLocker is the set of lock operations a ContractLock relies on:
// acquiring a lock on a contract, keeping it alive and releasing it.
type ContractLocker interface {
// AcquireContract requests a lock on fcid with the given priority and
// duration d. The returned lockID is what callers pass to
// KeepaliveContract and ReleaseContract.
AcquireContract(ctx context.Context, fcid types.FileContractID, priority int, d time.Duration) (lockID uint64, err error)
// KeepaliveContract is called periodically for a held lock, passing the
// lock duration d; presumably it extends the lock by d (confirm against
// the implementation).
KeepaliveContract(ctx context.Context, fcid types.FileContractID, lockID uint64, d time.Duration) (err error)
// ReleaseContract gives up the lock identified by fcid and lockID.
ReleaseContract(ctx context.Context, fcid types.FileContractID, lockID uint64) (err error)
}

// ContractLock represents a held lock on a single contract. It is created by
// NewContractLock, which also starts a background keepalive goroutine, and
// is given up with Release.
type ContractLock struct {
// lockID is the ID returned by AcquireContract; it is passed to
// KeepaliveContract and ReleaseContract.
lockID uint64
// fcid is the ID of the locked contract.
fcid types.FileContractID
// d is the lock duration passed to the acquire and keepalive calls; the
// keepalive loop ticks every d/5.
d time.Duration
// locker performs the actual acquire/keepalive/release calls.
locker ContractLocker
// logger is used to report keepalive failures.
logger *zap.SugaredLogger

// keepaliveLoopCtx is cancelled (via keepaliveLoopCtxCancel) by Release
// to stop the keepalive loop.
keepaliveLoopCtx context.Context
keepaliveLoopCtxCancel context.CancelFunc
// stopWG lets Release wait for the keepalive goroutine to exit.
stopWG sync.WaitGroup
}

// keepaliveLoop sends a keepalive for the lock every cl.d/5 until the loop
// context is cancelled (which Release does) or a keepalive fails.
//
// A keepalive error other than context.Canceled is logged and ends the loop;
// no further keepalives are sent for this lock after that.
func (cl *ContractLock) keepaliveLoop() {
	// Tick at 20% of the lock duration so several keepalives fit into one
	// lock period.
	start := time.Now()
	// Seed lastUpdate with the loop start. With a zero time.Time a failure of
	// the very first keepalive would log a meaningless "timeSinceLastUpdate"
	// measured from year 1 instead of from when the loop began.
	lastUpdate := start
	tickDuration := cl.d / 5
	t := time.NewTicker(tickDuration)
	// Stop is sufficient cleanup: the ticker is local to this function, so
	// nothing can observe a stale tick after we return.
	defer t.Stop()

	// Loop until stopped.
	for {
		select {
		case <-cl.keepaliveLoopCtx.Done():
			return // released
		case <-t.C:
		}

		err := cl.locker.KeepaliveContract(cl.keepaliveLoopCtx, cl.fcid, cl.lockID, cl.d)
		// context.Canceled means Release raced with this keepalive; that is
		// not a failure, and the next select iteration exits via Done.
		if err != nil && !errors.Is(err, context.Canceled) {
			cl.logger.Errorw(fmt.Sprintf("failed to send keepalive: %v", err),
				"contract", cl.fcid,
				"lockID", cl.lockID,
				"loopStart", start,
				"timeSinceLastUpdate", time.Since(lastUpdate),
				"tickDuration", tickDuration)
			return
		}
		lastUpdate = time.Now()
	}
}

// Release stops the keepalive goroutine, waits for it to exit, and then
// releases the contract lock through the locker using ctx. It returns the
// error reported by ReleaseContract, if any.
func (cl *ContractLock) Release(ctx context.Context) error {
	// Shut the keepalive loop down first so that no keepalive can be sent
	// for a lock that is about to be released.
	cl.keepaliveLoopCtxCancel()
	cl.stopWG.Wait()

	// Only now hand the lock back.
	err := cl.locker.ReleaseContract(ctx, cl.fcid, cl.lockID)
	return err
}

// NewContractLock acquires a lock on contract fcid through locker, using the
// given priority and a fixed lock duration of 30 seconds, and starts a
// background goroutine that keeps the lock alive until Release is called.
//
// ctx is only used for the acquire call. If acquiring fails, the error is
// returned and no goroutine is started.
func NewContractLock(ctx context.Context, fcid types.FileContractID, priority int, locker ContractLocker, logger *zap.SugaredLogger) (*ContractLock, error) {
	const lockDuration = 30 * time.Second

	id, err := locker.AcquireContract(ctx, fcid, priority, lockDuration)
	if err != nil {
		return nil, err
	}

	// The keepalive loop runs on its own context derived from
	// context.Background(), so it is not tied to ctx; it ends when Release
	// cancels it.
	loopCtx, stopLoop := context.WithCancel(context.Background())
	lock := &ContractLock{
		lockID: id,
		fcid:   fcid,
		d:      lockDuration,
		locker: locker,
		logger: logger,

		keepaliveLoopCtx:       loopCtx,
		keepaliveLoopCtxCancel: stopLoop,
	}

	lock.stopWG.Add(1)
	go func() {
		defer lock.stopWG.Done()
		lock.keepaliveLoop()
	}()
	return lock, nil
}
1 change: 0 additions & 1 deletion internal/test/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,6 @@ func testDBCfg() dbConfig {
func testWorkerCfg() config.Worker {
return config.Worker{
AccountsRefillInterval: 10 * time.Millisecond,
ContractLockTimeout: 5 * time.Second,
ID: "worker",
BusFlushInterval: testBusFlushInterval,
DownloadOverdriveTimeout: 500 * time.Millisecond,
Expand Down
21 changes: 21 additions & 0 deletions internal/utils/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"errors"
"fmt"
"strings"

"go.sia.tech/core/types"
)

// Common i/o related errors
Expand Down Expand Up @@ -42,3 +44,22 @@ func WrapErr(ctx context.Context, fnName string, err *error) {
}
}
}

// A HostErrorSet is a collection of errors from various hosts, keyed by the
// host's public key.
type HostErrorSet map[types.PublicKey]error

// Error implements error. Each entry is rendered on its own line as the hex
// encoding of the first four bytes of the host key followed by the host's
// error message. An empty set yields the empty string. Entries appear in map
// iteration order.
func (hes HostErrorSet) Error() string {
	var b strings.Builder
	for hk, hostErr := range hes {
		// Every entry, including the first, is preceded by a newline so that
		// the first error isn't printed on the same line as the error context.
		b.WriteByte('\n')
		fmt.Fprintf(&b, "%x: %v", hk[:4], hostErr.Error())
	}
	return b.String()
}
5 changes: 3 additions & 2 deletions worker/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"go.sia.tech/core/types"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/object"
"lukechampine.com/frand"
)
Expand Down Expand Up @@ -52,7 +53,7 @@ func newMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objec
for errors.Unwrap(hostErr) != nil {
hostErr = errors.Unwrap(hostErr)
}
if set, ok := hostErr.(HostErrorSet); ok {
if set, ok := hostErr.(utils.HostErrorSet); ok {
hostErrors := make(map[string]string, len(set))
for hk, err := range set {
hostErrors[hk.String()] = err.Error()
Expand Down Expand Up @@ -98,7 +99,7 @@ func newUploadFailedAlert(bucket, path, contractSet, mimeType string, minShards,
for errors.Unwrap(hostErr) != nil {
hostErr = errors.Unwrap(hostErr)
}
if set, ok := hostErr.(HostErrorSet); ok {
if set, ok := hostErr.(utils.HostErrorSet); ok {
hostErrors := make(map[string]string, len(set))
for hk, err := range set {
hostErrors[hk.String()] = err.Error()
Expand Down
3 changes: 2 additions & 1 deletion worker/alerts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
"github.com/google/go-cmp/cmp"
"go.sia.tech/core/types"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/internal/utils"
)

// TestUploadFailedAlertErrorSet is a test to verify that an upload failing with
// a HostErrorSet error registers an alert with all the individual errors of any
// host in the payload.
func TestUploadFailedAlertErrorSet(t *testing.T) {
hostErrSet := HostErrorSet{
hostErrSet := utils.HostErrorSet{
types.PublicKey{1, 1, 1}: errors.New("test"),
}
wrapped := fmt.Errorf("wrapped error: %w", hostErrSet)
Expand Down
6 changes: 3 additions & 3 deletions worker/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func BenchmarkDownloaderSingleObject(b *testing.B) {
w.AddHosts(up.rs.TotalShards)

data := bytes.NewReader(frand.Bytes(int(up.rs.SlabSizeNoRedundancy())))
_, _, err := w.uploadManager.Upload(context.Background(), data, w.Contracts(), up, lockingPriorityUpload)
_, _, err := w.uploadManager.Upload(context.Background(), data, w.Contracts(), up)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -69,7 +69,7 @@ func BenchmarkUploaderSingleObject(b *testing.B) {
b.SetBytes(int64(rhpv2.SectorSize * up.rs.MinShards))
b.ResetTimer()

_, _, err := w.uploadManager.Upload(context.Background(), data, w.Contracts(), up, lockingPriorityUpload)
_, _, err := w.uploadManager.Upload(context.Background(), data, w.Contracts(), up)
if err != nil {
b.Fatal(err)
}
Expand All @@ -93,7 +93,7 @@ func BenchmarkUploaderMultiObject(b *testing.B) {

for i := 0; i < b.N; i++ {
data := io.LimitReader(&zeroReader{}, int64(rhpv2.SectorSize*up.rs.MinShards))
_, _, err := w.uploadManager.Upload(context.Background(), data, w.Contracts(), up, lockingPriorityUpload)
_, _, err := w.uploadManager.Upload(context.Background(), data, w.Contracts(), up)
if err != nil {
b.Fatal(err)
}
Expand Down
90 changes: 3 additions & 87 deletions worker/contract_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@ package worker

import (
"context"
"errors"
"fmt"
"sync"
"time"

"go.sia.tech/core/types"
"go.uber.org/zap"
"go.sia.tech/renterd/internal/locking"
)

type ContractLocker interface {
Expand All @@ -19,44 +16,8 @@ type ContractLocker interface {

var _ ContractLocker = (Bus)(nil)

// contractLock represents a held lock on a single contract. It is created by
// newContractLock, which also starts a background keepalive goroutine, and
// is given up with Release.
type contractLock struct {
// lockID identifies the lock; it is passed to KeepaliveContract and
// ReleaseContract.
lockID uint64
// fcid is the ID of the locked contract.
fcid types.FileContractID
// d is the lock duration passed to keepalive calls; the keepalive loop
// ticks every d/5.
d time.Duration
// locker performs the keepalive and release calls.
locker ContractLocker
// logger is used to report keepalive failures.
logger *zap.SugaredLogger

// keepaliveLoopCtx is cancelled (via keepaliveLoopCtxCancel) by Release
// to stop the keepalive loop.
keepaliveLoopCtx context.Context
keepaliveLoopCtxCancel context.CancelFunc
// stopWG lets Release wait for the keepalive goroutine to exit.
stopWG sync.WaitGroup
}

// newContractLock wraps an already acquired lock (lockID on contract fcid,
// held for duration d) and starts a background goroutine that keeps it alive.
// The keepalive loop stops when ctx is cancelled or when Release is called.
func newContractLock(ctx context.Context, fcid types.FileContractID, lockID uint64, d time.Duration, locker ContractLocker, logger *zap.SugaredLogger) *contractLock {
	// Derive a cancellable context so Release can stop the loop on its own.
	loopCtx, stopLoop := context.WithCancel(ctx)
	lock := &contractLock{
		lockID: lockID,
		fcid:   fcid,
		d:      d,
		locker: locker,
		logger: logger,

		keepaliveLoopCtx:       loopCtx,
		keepaliveLoopCtxCancel: stopLoop,
	}

	lock.stopWG.Add(1)
	go func() {
		defer lock.stopWG.Done()
		lock.keepaliveLoop()
	}()
	return lock
}

func (w *Worker) acquireContractLock(ctx context.Context, fcid types.FileContractID, priority int) (_ *contractLock, err error) {
lockID, err := w.bus.AcquireContract(ctx, fcid, priority, w.contractLockingDuration)
if err != nil {
return nil, err
}
return newContractLock(w.shutdownCtx, fcid, lockID, w.contractLockingDuration, w.bus, w.logger), nil
func (w *Worker) acquireContractLock(ctx context.Context, fcid types.FileContractID, priority int) (_ *locking.ContractLock, err error) {
return locking.NewContractLock(ctx, fcid, priority, w.bus, w.logger)
}

func (w *Worker) withContractLock(ctx context.Context, fcid types.FileContractID, priority int, fn func() error) error {
Expand All @@ -72,48 +33,3 @@ func (w *Worker) withContractLock(ctx context.Context, fcid types.FileContractID

return fn()
}

// Release stops the keepalive goroutine, waits for it to exit, and then
// releases the contract lock through the locker using ctx. It returns the
// error reported by ReleaseContract, if any.
func (cl *contractLock) Release(ctx context.Context) error {
	// Shut the keepalive loop down first so that no keepalive can be sent
	// for a lock that is about to be released.
	cl.keepaliveLoopCtxCancel()
	cl.stopWG.Wait()

	// Only now hand the lock back.
	err := cl.locker.ReleaseContract(ctx, cl.fcid, cl.lockID)
	return err
}

// keepaliveLoop sends a keepalive for the lock every cl.d/5 until the loop
// context is cancelled (which Release does) or a keepalive fails.
//
// A keepalive error other than context.Canceled is logged and ends the loop;
// no further keepalives are sent for this lock after that.
func (cl *contractLock) keepaliveLoop() {
	// Tick at 20% of the lock duration so several keepalives fit into one
	// lock period.
	start := time.Now()
	// Seed lastUpdate with the loop start. With a zero time.Time a failure of
	// the very first keepalive would log a meaningless "timeSinceLastUpdate"
	// measured from year 1 instead of from when the loop began.
	lastUpdate := start
	tickDuration := cl.d / 5
	t := time.NewTicker(tickDuration)
	// Stop is sufficient cleanup: the ticker is local to this function, so
	// nothing can observe a stale tick after we return.
	defer t.Stop()

	// Loop until stopped.
	for {
		select {
		case <-cl.keepaliveLoopCtx.Done():
			return // released
		case <-t.C:
		}

		err := cl.locker.KeepaliveContract(cl.keepaliveLoopCtx, cl.fcid, cl.lockID, cl.d)
		// context.Canceled means the loop context was cancelled while the
		// keepalive was in flight; that is not a failure, and the next select
		// iteration exits via Done.
		if err != nil && !errors.Is(err, context.Canceled) {
			cl.logger.Errorw(fmt.Sprintf("failed to send keepalive: %v", err),
				"contract", cl.fcid,
				"lockID", cl.lockID,
				"loopStart", start,
				"timeSinceLastUpdate", time.Since(lastUpdate),
				"tickDuration", tickDuration)
			return
		}
		lastUpdate = time.Now()
	}
}
4 changes: 2 additions & 2 deletions worker/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type (
numRelaunched uint64

sectors []*sectorInfo
errs HostErrorSet
errs utils.HostErrorSet
}

slabDownloadResponse struct {
Expand Down Expand Up @@ -550,7 +550,7 @@ func (mgr *downloadManager) newSlabDownload(slice object.SlabSlice, migration bo
overpay: migration && slice.Health <= downloadOverpayHealthThreshold,

sectors: sectors,
errs: make(HostErrorSet),
errs: make(utils.HostErrorSet),
}
}

Expand Down
Loading

0 comments on commit aece529

Please sign in to comment.