Skip to content

Commit

Permalink
worker,bus: revert changes to usable hosts
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Nov 12, 2024
1 parent 112e12d commit ef8698d
Show file tree
Hide file tree
Showing 25 changed files with 136 additions and 144 deletions.
8 changes: 0 additions & 8 deletions api/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,6 @@ func (cm ContractMetadata) InSet(set string) bool {
return false
}

func (cm ContractMetadata) HostInfo() HostInfo {
return HostInfo{
ContractID: cm.ID,
PublicKey: cm.HostKey,
SiamuxAddr: cm.SiamuxAddr,
}
}

type (
Revision struct {
ContractID types.FileContractID `json:"contractID"`
Expand Down
5 changes: 2 additions & 3 deletions api/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,8 @@ type (
}

HostInfo struct {
ContractID types.FileContractID `json:"contractID"`
PublicKey types.PublicKey `json:"publicKey"`
SiamuxAddr string `json:"siamuxAddr"`
PublicKey types.PublicKey `json:"publicKey"`
SiamuxAddr string `json:"siamuxAddr"`
}

HostInteractions struct {
Expand Down
18 changes: 8 additions & 10 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ type (
UpdateHostAllowlistEntries(ctx context.Context, add, remove []types.PublicKey, clear bool) error
UpdateHostBlocklistEntries(ctx context.Context, add, remove []string, clear bool) error
UpdateHostCheck(ctx context.Context, autopilotID string, hk types.PublicKey, check api.HostCheck) error
UsableHosts(ctx context.Context, minWindowStart uint64) ([]sql.HostInfo, error)
UsableHosts(ctx context.Context) ([]sql.HostInfo, error)
}

// A MetadataStore stores information about contracts and objects.
Expand Down Expand Up @@ -316,10 +316,9 @@ type (
)

type Bus struct {
allowPrivateIPs bool
startTime time.Time
masterKey utils.MasterKey
revisionSubmissionBuffer uint64
allowPrivateIPs bool
startTime time.Time
masterKey utils.MasterKey

alerts alerts.Alerter
alertMgr AlertManager
Expand All @@ -344,15 +343,14 @@ type Bus struct {
}

// New returns a new Bus
func New(ctx context.Context, cfg config.Bus, masterKey [32]byte, am AlertManager, wm WebhooksManager, cm ChainManager, s Syncer, w Wallet, store Store, explorerURL string, revisionSubmissionBuffer uint64, l *zap.Logger) (_ *Bus, err error) {
func New(ctx context.Context, cfg config.Bus, masterKey [32]byte, am AlertManager, wm WebhooksManager, cm ChainManager, s Syncer, w Wallet, store Store, explorerURL string, l *zap.Logger) (_ *Bus, err error) {
l = l.Named("bus")
dialer := rhp.NewFallbackDialer(store, net.Dialer{}, l)

b := &Bus{
allowPrivateIPs: cfg.AllowPrivateIPs,
revisionSubmissionBuffer: revisionSubmissionBuffer,
startTime: time.Now(),
masterKey: masterKey,
allowPrivateIPs: cfg.AllowPrivateIPs,
startTime: time.Now(),
masterKey: masterKey,

s: s,
cm: cm,
Expand Down
8 changes: 1 addition & 7 deletions bus/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,13 +485,7 @@ func (b *Bus) walletPendingHandler(jc jape.Context) {
}

func (b *Bus) hostsHandlerGET(jc jape.Context) {
cs, err := b.consensusState(jc.Request.Context())
if jc.Check("couldn't fetch consensus state", err) != nil {
return
}
minWindowStart := cs.BlockHeight + b.revisionSubmissionBuffer

hosts, err := b.store.UsableHosts(jc.Request.Context(), minWindowStart)
hosts, err := b.store.UsableHosts(jc.Request.Context())
if jc.Check("couldn't fetch hosts", err) != nil {
return
}
Expand Down
3 changes: 1 addition & 2 deletions cmd/renterd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,8 @@ func defaultConfig() config.Config {
AnnouncementMaxAgeHours: 24 * 7 * 52, // 1 year
Bootstrap: true,
GatewayAddr: ":9981",
RevisionSubmissionBuffer: 150, // 144 + 6 blocks leeway
SlabBufferCompletionThreshold: 1 << 12,
UsedUTXOExpiry: 24 * time.Hour,
SlabBufferCompletionThreshold: 1 << 12,
},
Worker: config.Worker{
Enabled: true,
Expand Down
2 changes: 1 addition & 1 deletion cmd/renterd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func newBus(ctx context.Context, cfg config.Config, pk types.PrivateKey, network
}

// create bus
b, err := bus.New(ctx, cfg.Bus, masterKey, alertsMgr, wh, cm, s, w, sqlStore, explorerURL, cfg.Bus.RevisionSubmissionBuffer, logger)
b, err := bus.New(ctx, cfg.Bus, masterKey, alertsMgr, wh, cm, s, w, sqlStore, explorerURL, logger)
if err != nil {
return nil, nil, fmt.Errorf("failed to create bus: %w", err)
}
Expand Down
1 change: 0 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ type (
GatewayAddr string `yaml:"gatewayAddr,omitempty"`
RemoteAddr string `yaml:"remoteAddr,omitempty"`
RemotePassword string `yaml:"remotePassword,omitempty"`
RevisionSubmissionBuffer uint64 `yaml:"revisionSubmissionBuffer,omitempty"`
SlabBufferCompletionThreshold int64 `yaml:"slabBufferCompleionThreshold,omitempty"`
UsedUTXOExpiry time.Duration `yaml:"usedUtxoExpiry,omitempty"`
}
Expand Down
4 changes: 1 addition & 3 deletions internal/test/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,6 @@ func newTestLogger(enable bool) *zap.Logger {

// newTestCluster creates a new cluster without hosts with a funded bus.
func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster {
t.Helper()

// Skip any test that requires a cluster when running short tests.
tt := test.NewTT(t)

Expand Down Expand Up @@ -618,7 +616,7 @@ func newTestBus(ctx context.Context, cm *chain.Manager, genesisBlock types.Block
masterKey := blake2b.Sum256(append([]byte("worker"), pk...))

// create bus
b, err := bus.New(ctx, cfg, masterKey, alertsMgr, wh, cm, s, w, sqlStore, "", cfg.RevisionSubmissionBuffer, logger)
b, err := bus.New(ctx, cfg, masterKey, alertsMgr, wh, cm, s, w, sqlStore, "", logger)
if err != nil {
return nil, nil, nil, nil, err
}
Expand Down
103 changes: 60 additions & 43 deletions internal/worker/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,42 +33,43 @@ var (

type (
AccountFunder interface {
FundAccount(ctx context.Context, hi api.HostInfo, desired types.Currency) error
FundAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, desired types.Currency) error
}

AccountSyncer interface {
SyncAccount(ctx context.Context, hi api.HostInfo) error
SyncAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, siamuxAddr string) error
}

AccountStore interface {
Accounts(context.Context, string) ([]api.Account, error)
UpdateAccounts(context.Context, []api.Account) error
}

ConsensusState interface {
ConsensusStateStore interface {
ConsensusState(ctx context.Context) (api.ConsensusState, error)
}

HostStore interface {
UsableHosts(ctx context.Context) ([]api.HostInfo, error)
ContractStore interface {
Contracts(ctx context.Context, opts api.ContractsOpts) ([]api.ContractMetadata, error)
}
)

type (
AccountMgr struct {
alerts alerts.Alerter
funder AccountFunder
syncer AccountSyncer
cs ConsensusState
s AccountStore
hs HostStore
key utils.AccountsKey
logger *zap.SugaredLogger
owner string
refillInterval time.Duration
shutdownCtx context.Context
shutdownCancel context.CancelFunc
wg sync.WaitGroup
alerts alerts.Alerter
funder AccountFunder
syncer AccountSyncer
cs ContractStore
css ConsensusStateStore
s AccountStore
key utils.AccountsKey
logger *zap.SugaredLogger
owner string
refillInterval time.Duration
revisionSubmissionBuffer uint64
shutdownCtx context.Context
shutdownCancel context.CancelFunc
wg sync.WaitGroup

mu sync.Mutex
byID map[rhpv3.Account]*Account
Expand All @@ -91,7 +92,7 @@ type (
// NewAccountManager creates a new account manager. It will load all accounts
// from the given store and mark the shutdown as unclean. When Shutdown is
// called it will save all accounts.
func NewAccountManager(key utils.AccountsKey, owner string, alerter alerts.Alerter, funder AccountFunder, syncer AccountSyncer, cs ConsensusState, hs HostStore, s AccountStore, refillInterval time.Duration, l *zap.Logger) (*AccountMgr, error) {
func NewAccountManager(key utils.AccountsKey, owner string, alerter alerts.Alerter, funder AccountFunder, syncer AccountSyncer, css ConsensusStateStore, cs ContractStore, s AccountStore, refillInterval time.Duration, l *zap.Logger) (*AccountMgr, error) {
logger := l.Named("accounts").Sugar()

shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
Expand All @@ -100,7 +101,7 @@ func NewAccountManager(key utils.AccountsKey, owner string, alerter alerts.Alert
funder: funder,
syncer: syncer,
cs: cs,
hs: hs,
css: css,
s: s,
key: key,
logger: logger,
Expand Down Expand Up @@ -308,59 +309,75 @@ func (a *AccountMgr) markRefillDone(hk types.PublicKey) {
// goroutine from a previous call, refillWorkerAccounts will skip that account
// until the previously launched goroutine returns.
func (a *AccountMgr) refillAccounts() {
// fetch all usable hosts
hosts, err := a.hs.UsableHosts(a.shutdownCtx)
// fetch config
cs, err := a.css.ConsensusState(a.shutdownCtx)
if err != nil {
a.logger.Errorw(fmt.Sprintf("failed to fetch usable hosts: %v", err))
a.logger.Errorw(fmt.Sprintf("failed to fetch consensus state for refill: %v", err))
return
}

// fetch all contracts
contracts, err := a.cs.Contracts(a.shutdownCtx, api.ContractsOpts{FilterMode: api.ContractFilterModeActive})
if err != nil {
a.logger.Errorw(fmt.Sprintf("failed to fetch contracts for refill: %v", err))
return
} else if len(contracts) == 0 {
return
}

// refill accounts in separate goroutines
for _, hi := range hosts {
for _, c := range contracts {
// launch refill if not already in progress
if a.markRefillInProgress(hi.PublicKey) {
go func() {
defer a.markRefillDone(hi.PublicKey)
if a.markRefillInProgress(c.HostKey) {
go func(contract api.ContractMetadata) {
defer a.markRefillDone(contract.HostKey)

rCtx, cancel := context.WithTimeout(a.shutdownCtx, 5*time.Minute)
defer cancel()

// refill
refilled, err := a.refillAccount(rCtx, hi)
refilled, err := a.refillAccount(rCtx, c, cs.BlockHeight, a.revisionSubmissionBuffer)

// determine whether to log something
shouldLog := true
a.mu.Lock()
if t, exists := a.lastLoggedRefillErr[hi.PublicKey]; !exists || err == nil {
a.lastLoggedRefillErr[hi.PublicKey] = time.Now()
if t, exists := a.lastLoggedRefillErr[contract.HostKey]; !exists || err == nil {
a.lastLoggedRefillErr[contract.HostKey] = time.Now()
} else if time.Since(t) < time.Hour {
// only log error once per hour per account
shouldLog = false
}
a.mu.Unlock()

if err != nil && shouldLog {
a.logger.Error("failed to refill account for host", zap.Stringer("hostKey", hi.PublicKey), zap.Error(err))
a.logger.Error("failed to refill account for host", zap.Stringer("hostKey", contract.HostKey), zap.Error(err))
} else if refilled {
a.logger.Infow("successfully refilled account for host", zap.Stringer("hostKey", hi.PublicKey), zap.Error(err))
a.logger.Infow("successfully refilled account for host", zap.Stringer("hostKey", contract.HostKey), zap.Error(err))
}
}()
}(c)
}
}
}

func (a *AccountMgr) refillAccount(ctx context.Context, hi api.HostInfo) (bool, error) {
func (a *AccountMgr) refillAccount(ctx context.Context, contract api.ContractMetadata, bh, revisionSubmissionBuffer uint64) (bool, error) {
// fetch the account
account := a.Account(hi.PublicKey)
account := a.Account(contract.HostKey)

// check if the contract is too close to the proof window to be revised,
// trying to refill the account would result in the host not returning the
// revision and returning an obfuscated error
if (bh + revisionSubmissionBuffer) > contract.WindowStart {
return false, fmt.Errorf("contract %v is too close to the proof window to be revised", contract.ID)
}

// check if a host is potentially cheating before refilling.
// We only check against the max drift if the account's drift is
// negative because we don't care if we have more money than
// expected.
if account.Drift.Cmp(maxNegDrift) < 0 {
alert := newAccountRefillAlert(account.ID, hi.PublicKey, hi.ContractID, errMaxDriftExceeded,
alert := newAccountRefillAlert(account.ID, contract, errMaxDriftExceeded,
"accountID", account.ID.String(),
"hostKey", hi.PublicKey.String(),
"hostKey", contract.HostKey.String(),
"balance", account.Balance.String(),
"drift", account.Drift.String(),
)
Expand All @@ -373,13 +390,13 @@ func (a *AccountMgr) refillAccount(ctx context.Context, hi api.HostInfo) (bool,
// check if a resync is needed
if account.RequiresSync {
// sync the account
err := a.syncer.SyncAccount(ctx, hi)
err := a.syncer.SyncAccount(ctx, contract.ID, contract.HostKey, contract.SiamuxAddr)
if err != nil {
return false, fmt.Errorf("failed to sync account's balance: %w", err)
}

// refetch the account after syncing
account = a.Account(hi.PublicKey)
account = a.Account(contract.HostKey)
}

// check if refill is needed
Expand All @@ -388,7 +405,7 @@ func (a *AccountMgr) refillAccount(ctx context.Context, hi api.HostInfo) (bool,
}

// fund the account
err := a.funder.FundAccount(ctx, hi, maxBalance)
err := a.funder.FundAccount(ctx, contract.ID, contract.HostKey, maxBalance)
if err != nil {
return false, fmt.Errorf("failed to fund account: %w", err)
}
Expand Down Expand Up @@ -576,12 +593,12 @@ func (a *Account) setBalance(balance *big.Int) {
zap.Stringer("drift", drift))
}

func newAccountRefillAlert(id rhpv3.Account, hk types.PublicKey, fcid types.FileContractID, err error, keysAndValues ...string) alerts.Alert {
func newAccountRefillAlert(id rhpv3.Account, contract api.ContractMetadata, err error, keysAndValues ...string) alerts.Alert {
data := map[string]interface{}{
"error": err.Error(),
"accountID": id.String(),
"contractID": fcid.String(),
"hostKey": hk.String(),
"contractID": contract.ID.String(),
"hostKey": contract.HostKey.String(),
}
for i := 0; i < len(keysAndValues); i += 2 {
data[keysAndValues[i]] = keysAndValues[i+1]
Expand Down
6 changes: 3 additions & 3 deletions internal/worker/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ func (b *mockAccountMgrBackend) RegisterAlert(context.Context, alerts.Alert) err
return nil
}

func (b *mockAccountMgrBackend) FundAccount(ctx context.Context, hi api.HostInfo, balance types.Currency) error {
func (b *mockAccountMgrBackend) FundAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, balance types.Currency) error {
return nil
}
func (b *mockAccountMgrBackend) SyncAccount(ctx context.Context, hi api.HostInfo) error {
func (b *mockAccountMgrBackend) SyncAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, siamuxAddr string) error {
return nil
}
func (b *mockAccountMgrBackend) Accounts(context.Context, string) ([]api.Account, error) {
Expand All @@ -45,7 +45,7 @@ func (b *mockAccountMgrBackend) UpdateAccounts(context.Context, []api.Account) e
func (b *mockAccountMgrBackend) ConsensusState(ctx context.Context) (api.ConsensusState, error) {
return api.ConsensusState{}, nil
}
func (b *mockAccountMgrBackend) DownloadContracts(ctx context.Context) ([]api.ContractMetadata, error) {
func (b *mockAccountMgrBackend) Contracts(ctx context.Context, opts api.ContractsOpts) ([]api.ContractMetadata, error) {
return nil, nil
}
func (b *mockAccountMgrBackend) UsableHosts(ctx context.Context) ([]api.HostInfo, error) {
Expand Down
5 changes: 4 additions & 1 deletion internal/worker/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ func (m *mockBus) GougingParams(ctx context.Context) (api.GougingParams, error)

func (m *mockBus) UsableHosts(ctx context.Context) (hosts []api.HostInfo, _ error) {
for _, c := range m.contracts {
hosts = append(hosts, c.HostInfo())
hosts = append(hosts, api.HostInfo{
PublicKey: c.HostKey,
SiamuxAddr: c.SiamuxAddr,
})
}
return
}
Expand Down
4 changes: 2 additions & 2 deletions stores/hostdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ func (s *SQLStore) RecordPriceTables(ctx context.Context, priceTableUpdate []api
})
}

func (s *SQLStore) UsableHosts(ctx context.Context, minWindowStart uint64) (hosts []sql.HostInfo, err error) {
func (s *SQLStore) UsableHosts(ctx context.Context) (hosts []sql.HostInfo, err error) {
err = s.db.Transaction(ctx, func(tx sql.DatabaseTx) error {
hosts, err = tx.UsableHosts(ctx, minWindowStart)
hosts, err = tx.UsableHosts(ctx)
return err
})
return
Expand Down
Loading

0 comments on commit ef8698d

Please sign in to comment.