Skip to content

Commit

Permalink
Merge branch 'dev' of github.com:SiaFoundation/renterd into pj/autopi…
Browse files Browse the repository at this point in the history
…lot-state
  • Loading branch information
peterjan committed Nov 14, 2024
2 parents 296d400 + 52c3e6e commit da3580d
Show file tree
Hide file tree
Showing 48 changed files with 1,647 additions and 1,974 deletions.
7 changes: 0 additions & 7 deletions api/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,6 @@ type (
RenterFunds types.Currency `json:"renterFunds"`
}

// ContractRootsResponse is the response type for the /contract/:id/roots
// endpoint.
ContractRootsResponse struct {
Roots []types.Hash256 `json:"roots"`
Uploading []types.Hash256 `json:"uploading"`
}

// ContractsArchiveRequest is the request type for the /contracts/archive endpoint.
ContractsArchiveRequest = map[types.FileContractID]string

Expand Down
6 changes: 0 additions & 6 deletions api/slab.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,6 @@ type (
Slabs []UploadedPackedSlab `json:"slabs"`
}

// UploadSectorRequest is the request type for the /upload/:id/sector endpoint.
UploadSectorRequest struct {
ContractID types.FileContractID `json:"contractID"`
Root types.Hash256 `json:"root"`
}

UnhealthySlabsResponse struct {
Slabs []UnhealthySlab `json:"slabs"`
}
Expand Down
7 changes: 2 additions & 5 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,9 @@ type (
}

UploadingSectorsCache interface {
AddSector(uID api.UploadID, fcid types.FileContractID, root types.Hash256) error
AddSectors(uID api.UploadID, roots ...types.Hash256) error
FinishUpload(uID api.UploadID)
HandleRenewal(fcid, renewedFrom types.FileContractID)
Pending(fcid types.FileContractID) (size uint64)
Sectors(fcid types.FileContractID) (roots []types.Hash256)
Sectors() (sectors []types.Hash256)
StartUpload(uID api.UploadID) error
}

Expand Down Expand Up @@ -594,7 +592,6 @@ func (b *Bus) addRenewal(ctx context.Context, renewedFrom types.FileContractID,
return api.ContractMetadata{}, err
}

b.sectors.HandleRenewal(renewal.ID, renewal.RenewedFrom)
b.broadcastAction(webhooks.Event{
Module: api.ModuleContract,
Event: api.EventRenew,
Expand Down
9 changes: 3 additions & 6 deletions bus/client/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,9 @@ func (c *Client) Contract(ctx context.Context, id types.FileContractID) (contrac

// ContractRoots returns the sector roots, as well as the ones that are still
// uploading, for the contract with given id.
func (c *Client) ContractRoots(ctx context.Context, contractID types.FileContractID) (roots, uploading []types.Hash256, err error) {
var resp api.ContractRootsResponse
if err = c.c.WithContext(ctx).GET(fmt.Sprintf("/contract/%s/roots", contractID), &resp); err != nil {
return
}
return resp.Roots, resp.Uploading, nil
func (c *Client) ContractRoots(ctx context.Context, contractID types.FileContractID) (roots []types.Hash256, err error) {
err = c.c.WithContext(ctx).GET(fmt.Sprintf("/contract/%s/roots", contractID), &roots)
return
}

// ContractSets returns the contract sets of the bus.
Expand Down
9 changes: 3 additions & 6 deletions bus/client/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@ import (
"go.sia.tech/renterd/api"
)

// AddUploadingSector adds the given sector to the upload with given id.
func (c *Client) AddUploadingSector(ctx context.Context, uID api.UploadID, id types.FileContractID, root types.Hash256) (err error) {
err = c.c.WithContext(ctx).POST(fmt.Sprintf("/upload/%s/sector", uID), api.UploadSectorRequest{
ContractID: id,
Root: root,
}, nil)
// AddUploadingSectors adds the given sectors to the upload with given id.
func (c *Client) AddUploadingSectors(ctx context.Context, uID api.UploadID, roots []types.Hash256) (err error) {
err = c.c.WithContext(ctx).POST(fmt.Sprintf("/upload/%s/sector", uID), &roots, nil)
return
}

Expand Down
34 changes: 5 additions & 29 deletions bus/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,7 @@ func (b *Bus) contractPruneHandlerPOST(jc jape.Context) {

// build map of uploading sectors
pending := make(map[types.Hash256]struct{})
for _, root := range b.sectors.Sectors(fcid) {
for _, root := range b.sectors.Sectors() {
pending[root] = struct{}{}
}

Expand Down Expand Up @@ -994,16 +994,6 @@ func (b *Bus) contractsPrunableDataHandlerGET(jc jape.Context) {

// build the response
for fcid, size := range sizes {
// adjust the amount of prunable data with the pending uploads, due to
// how we record contract spending a contract's size might already
// include pending sectors
pending := b.sectors.Pending(fcid)
if pending > size.Prunable {
size.Prunable = 0
} else {
size.Prunable -= pending
}

contracts = append(contracts, api.ContractPrunableData{
ID: fcid,
ContractSize: size,
Expand Down Expand Up @@ -1040,17 +1030,6 @@ func (b *Bus) contractSizeHandlerGET(jc jape.Context) {
} else if jc.Check("failed to fetch contract size", err) != nil {
return
}

// adjust the amount of prunable data with the pending uploads, due to how
// we record contract spending a contract's size might already include
// pending sectors
pending := b.sectors.Pending(id)
if pending > size.Prunable {
size.Prunable = 0
} else {
size.Prunable -= pending
}

jc.Encode(size)
}

Expand Down Expand Up @@ -1182,10 +1161,7 @@ func (b *Bus) contractIDRootsHandlerGET(jc jape.Context) {

roots, err := b.store.ContractRoots(jc.Request.Context(), id)
if jc.Check("couldn't fetch contract sectors", err) == nil {
jc.Encode(api.ContractRootsResponse{
Roots: roots,
Uploading: b.sectors.Sectors(id),
})
jc.Encode(roots)
}
}

Expand Down Expand Up @@ -2073,11 +2049,11 @@ func (b *Bus) uploadAddSectorHandlerPOST(jc jape.Context) {
if jc.DecodeParam("id", &id) != nil {
return
}
var req api.UploadSectorRequest
if jc.Decode(&req) != nil {
var roots []types.Hash256
if jc.Decode(&roots) != nil {
return
}
jc.Check("failed to add sector", b.sectors.AddSector(id, req.ContractID, req.Root))
jc.Check("failed to add sectors", b.sectors.AddSectors(id, roots...))
}

func (b *Bus) uploadFinishedHandlerDELETE(jc jape.Context) {
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ require (
go.sia.tech/mux v1.3.0
go.sia.tech/web/renterd v0.66.0
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.28.0
golang.org/x/sys v0.26.0
golang.org/x/term v0.25.0
golang.org/x/crypto v0.29.0
golang.org/x/sys v0.27.0
golang.org/x/term v0.26.0
gopkg.in/yaml.v3 v3.0.1
lukechampine.com/frand v1.5.1
)
Expand All @@ -44,7 +44,7 @@ require (
go.sia.tech/web v0.0.0-20240610131903-5611d44a533e // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/text v0.20.0 // indirect
golang.org/x/time v0.7.0 // indirect
golang.org/x/tools v0.23.0 // indirect
nhooyr.io/websocket v1.8.17 // indirect
Expand Down
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,25 +78,25 @@ go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN8
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ=
golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg=
golang.org/x/mod v0.19.0 h1:fEdghXQSo20giMthA7cd28ZC+jts4amQ3YMXiP5oMQ8=
golang.org/x/mod v0.19.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ=
golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24=
golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M=
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.26.0 h1:WEQa6V3Gja/BhNxg540hBip/kkaYtRg3cxg4oXSw4AU=
golang.org/x/term v0.26.0/go.mod h1:Si5m1o57C5nBNQo5z1iq+XDijt21BDBDp2bK0QI8e3E=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug=
golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4=
golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20190829051458-42f498d34c4d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
Expand Down
76 changes: 10 additions & 66 deletions internal/bus/sectorscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sync"
"time"

"go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
)
Expand All @@ -19,36 +18,23 @@ const (

type (
SectorsCache struct {
mu sync.Mutex
uploads map[api.UploadID]*ongoingUpload
renewedTo map[types.FileContractID]types.FileContractID
mu sync.Mutex
uploads map[api.UploadID]*ongoingUpload
}

ongoingUpload struct {
started time.Time
contractSectors map[types.FileContractID][]types.Hash256
started time.Time
sectors []types.Hash256
}
)

func (ou *ongoingUpload) addSector(fcid types.FileContractID, root types.Hash256) {
ou.contractSectors[fcid] = append(ou.contractSectors[fcid], root)
}

func (ou *ongoingUpload) sectors(fcid types.FileContractID) (roots []types.Hash256) {
if sectors, exists := ou.contractSectors[fcid]; exists && time.Since(ou.started) < cacheExpiry {
roots = append(roots, sectors...)
}
return
}

func NewSectorsCache() *SectorsCache {
return &SectorsCache{
uploads: make(map[api.UploadID]*ongoingUpload),
renewedTo: make(map[types.FileContractID]types.FileContractID),
uploads: make(map[api.UploadID]*ongoingUpload),
}
}

func (sc *SectorsCache) AddSector(uID api.UploadID, fcid types.FileContractID, root types.Hash256) error {
func (sc *SectorsCache) AddSectors(uID api.UploadID, roots ...types.Hash256) error {
sc.mu.Lock()
defer sc.mu.Unlock()

Expand All @@ -57,8 +43,7 @@ func (sc *SectorsCache) AddSector(uID api.UploadID, fcid types.FileContractID, r
return fmt.Errorf("%w; id '%v'", api.ErrUnknownUpload, uID)
}

fcid = sc.latestFCID(fcid)
ongoing.addSector(fcid, root)
ongoing.sectors = append(ongoing.sectors, roots...)
return nil
}

Expand All @@ -73,46 +58,13 @@ func (sc *SectorsCache) FinishUpload(uID api.UploadID) {
delete(sc.uploads, uID)
}
}

// prune renewed to map
for old, new := range sc.renewedTo {
if _, exists := sc.renewedTo[new]; exists {
delete(sc.renewedTo, old)
}
}
}

func (sc *SectorsCache) HandleRenewal(fcid, renewedFrom types.FileContractID) {
sc.mu.Lock()
defer sc.mu.Unlock()

for _, upload := range sc.uploads {
if _, exists := upload.contractSectors[renewedFrom]; exists {
upload.contractSectors[fcid] = upload.contractSectors[renewedFrom]
upload.contractSectors[renewedFrom] = nil
}
}
sc.renewedTo[renewedFrom] = fcid
}

func (sc *SectorsCache) Pending(fcid types.FileContractID) (size uint64) {
sc.mu.Lock()
defer sc.mu.Unlock()

fcid = sc.latestFCID(fcid)
for _, ongoing := range sc.uploads {
size += uint64(len(ongoing.sectors(fcid))) * rhp.SectorSize
}
return
}

func (sc *SectorsCache) Sectors(fcid types.FileContractID) (roots []types.Hash256) {
func (sc *SectorsCache) Sectors() (sectors []types.Hash256) {
sc.mu.Lock()
defer sc.mu.Unlock()

fcid = sc.latestFCID(fcid)
for _, ongoing := range sc.uploads {
roots = append(roots, ongoing.sectors(fcid)...)
sectors = append(sectors, ongoing.sectors...)
}
return
}
Expand All @@ -127,15 +79,7 @@ func (sc *SectorsCache) StartUpload(uID api.UploadID) error {
}

sc.uploads[uID] = &ongoingUpload{
started: time.Now(),
contractSectors: make(map[types.FileContractID][]types.Hash256),
started: time.Now(),
}
return nil
}

func (um *SectorsCache) latestFCID(fcid types.FileContractID) types.FileContractID {
if latest, ok := um.renewedTo[fcid]; ok {
return latest
}
return fcid
}
Loading

0 comments on commit da3580d

Please sign in to comment.