From 37ae74f3d666e459d893afe5ae996dcfb8f3f37c Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 25 Sep 2024 15:48:02 +0200 Subject: [PATCH 1/7] stores: add Backup method --- api/bus.go | 7 ++ bus/bus.go | 55 +++++----- bus/client/client.go | 9 ++ bus/routes.go | 200 ++++++++++++++++++++---------------- internal/sql/sql.go | 4 + stores/backups.go | 29 ++++++ stores/sql/sqlite/backup.go | 134 ++++++++++++++++++++++++ stores/sql/sqlite/common.go | 2 +- 8 files changed, 320 insertions(+), 120 deletions(-) create mode 100644 stores/backups.go create mode 100644 stores/sql/sqlite/backup.go diff --git a/api/bus.go b/api/bus.go index 3b73469e3..cf2552b03 100644 --- a/api/bus.go +++ b/api/bus.go @@ -10,6 +10,8 @@ import ( var ( ErrMarkerNotFound = errors.New("marker not found") ErrMaxFundAmountExceeded = errors.New("renewal exceeds max fund amount") + ErrInvalidDatabase = errors.New("invalid database type") + ErrBackupNotSupported = errors.New("backups not supported for used database") ) type ( @@ -60,6 +62,11 @@ type ( Accounts []Account `json:"accounts"` } + BackupRequest struct { + Database string `json:"database"` + Path string `json:"path"` + } + // BusStateResponse is the response type for the /bus/state endpoint. BusStateResponse struct { StartTime TimeRFC3339 `json:"startTime"` diff --git a/bus/bus.go b/bus/bus.go index 832b603b0..a9faecf32 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -164,6 +164,7 @@ type ( Store interface { AccountStore AutopilotStore + BackupStore ChainStore HostStore MetadataStore @@ -186,6 +187,11 @@ type ( UpdateAutopilot(ctx context.Context, ap api.Autopilot) error } + // BackupStore is the interface of a store that can be backed up. + BackupStore interface { + Backup(ctx context.Context, dbID, destPath string) error + } + // A ChainStore stores information about the chain. ChainStore interface { ChainIndex(ctx context.Context) (types.ChainIndex, error) @@ -314,13 +320,7 @@ type Bus struct { cs ChainSubscriber s Syncer w Wallet - - accounts AccountStore - as AutopilotStore - hs HostStore - ms MetadataStore - mtrcs MetricsStore - ss SettingStore + store Store rhp2 *rhp2.Client rhp3 *rhp3.Client @@ -340,15 +340,10 @@ func New(ctx context.Context, masterKey [32]byte, am AlertManager, wm WebhooksMa startTime: time.Now(), masterKey: masterKey, - accounts: store, - s: s, - cm: cm, - w: w, - hs: store, - as: store, - ms: store, - mtrcs: store, - ss: store, + s: s, + cm: cm, + w: w, + store: store, alerts: alerts.WithOrigin(am, "bus"), alertMgr: am, @@ -497,6 +492,8 @@ func (b *Bus) Handler() http.Handler { "POST /syncer/connect": b.syncerConnectHandler, "GET /syncer/peers": b.syncerPeersHandler, + "POST /system/database/backup": b.postSystemSQLite3BackupHandler, + "GET /txpool/recommendedfee": b.txpoolFeeHandler, "GET /txpool/transactions": b.txpoolTransactionsHandler, "POST /txpool/broadcast": b.txpoolBroadcastHandler, @@ -533,7 +530,7 @@ func (b *Bus) Shutdown(ctx context.Context) error { } func (b *Bus) addContract(ctx context.Context, rev rhpv2.ContractRevision, contractPrice, totalCost types.Currency, startHeight uint64, state string) (api.ContractMetadata, error) { - c, err := b.ms.AddContract(ctx, rev, contractPrice, totalCost, startHeight, state) + c, err := b.store.AddContract(ctx, rev, contractPrice, totalCost, startHeight, state) if err != nil { return api.ContractMetadata{}, err } @@ -550,7 +547,7 @@ func (b *Bus) addContract(ctx context.Context, rev rhpv2.ContractRevision, contr } func (b *Bus) addRenewedContract(ctx context.Context, renewedFrom types.FileContractID, rev rhpv2.ContractRevision, contractPrice, totalCost types.Currency, startHeight uint64, state string) (api.ContractMetadata, error) { - r, err := b.ms.AddRenewedContract(ctx, rev, contractPrice, totalCost, startHeight, renewedFrom, state) + r, err := b.store.AddRenewedContract(ctx, rev, contractPrice, totalCost, startHeight, renewedFrom, state) if err != nil { return api.ContractMetadata{}, err } @@ -580,7 +577,7 @@ func (b *Bus) broadcastContract(ctx context.Context, fcid types.FileContractID) }() // fetch contract - c, err := b.ms.Contract(ctx, fcid) + c, err := b.store.Contract(ctx, fcid) if err != nil { return types.TransactionID{}, fmt.Errorf("couldn't fetch contract; %w", err) } @@ -685,10 +682,10 @@ func (b *Bus) initSettings(ctx context.Context) error { api.SettingRedundancy: defaultRedundancySettings, api.SettingUploadPacking: api.DefaultUploadPackingSettings, } { - if _, err := b.ss.Setting(ctx, key); errors.Is(err, api.ErrSettingNotFound) { + if _, err := b.store.Setting(ctx, key); errors.Is(err, api.ErrSettingNotFound) { if bytes, err := json.Marshal(value); err != nil { panic("failed to marshal default settings") // should never happen - } else if err := b.ss.UpdateSetting(ctx, key, string(bytes)); err != nil { + } else if err := b.store.UpdateSetting(ctx, key, string(bytes)); err != nil { return err } } @@ -696,21 +693,21 @@ func (b *Bus) initSettings(ctx context.Context) error { // check redundancy settings for validity var rs api.RedundancySettings - if rss, err := b.ss.Setting(ctx, api.SettingRedundancy); err != nil { + if rss, err := b.store.Setting(ctx, api.SettingRedundancy); err != nil { return err } else if err := json.Unmarshal([]byte(rss), &rs); err != nil { return err } else if err := rs.Validate(); err != nil { b.logger.Warn(fmt.Sprintf("invalid redundancy setting found '%v', overwriting the redundancy settings with the default settings", rss)) bytes, _ := json.Marshal(defaultRedundancySettings) - if err := b.ss.UpdateSetting(ctx, api.SettingRedundancy, string(bytes)); err != nil { + if err := b.store.UpdateSetting(ctx, api.SettingRedundancy, string(bytes)); err != nil { return err } } // check gouging settings for validity var gs api.GougingSettings - if gss, err := b.ss.Setting(ctx, api.SettingGouging); err != nil { + if gss, err := b.store.Setting(ctx, api.SettingGouging); err != nil { return err } else if err := json.Unmarshal([]byte(gss), &gs); err != nil { return err @@ -722,7 +719,7 @@ func (b *Bus) initSettings(ctx context.Context) error { if err := gs.Validate(); err == nil { b.logger.Info(fmt.Sprintf("updating gouging settings with default EA settings: %+v", gs)) bytes, _ := json.Marshal(gs) - if err := b.ss.UpdateSetting(ctx, api.SettingGouging, string(bytes)); err != nil { + if err := b.store.UpdateSetting(ctx, api.SettingGouging, string(bytes)); err != nil { return err } } else { @@ -731,13 +728,13 @@ func (b *Bus) initSettings(ctx context.Context) error { if err := gs.Validate(); err == nil { b.logger.Info(fmt.Sprintf("updating gouging settings with default HostBlockHeightLeeway settings: %v", gs)) bytes, _ := json.Marshal(gs) - if err := b.ss.UpdateSetting(ctx, api.SettingGouging, string(bytes)); err != nil { + if err := b.store.UpdateSetting(ctx, api.SettingGouging, string(bytes)); err != nil { return err } } else { b.logger.Warn(fmt.Sprintf("invalid gouging setting found '%v', overwriting the gouging settings with the default settings", gss)) bytes, _ := json.Marshal(api.DefaultGougingSettings) - if err := b.ss.UpdateSetting(ctx, api.SettingGouging, string(bytes)); err != nil { + if err := b.store.UpdateSetting(ctx, api.SettingGouging, string(bytes)); err != nil { return err } } @@ -746,7 +743,7 @@ func (b *Bus) initSettings(ctx context.Context) error { // compat: default price pin settings var pps api.PricePinSettings - if pss, err := b.ss.Setting(ctx, api.SettingPricePinning); err != nil { + if pss, err := b.store.Setting(ctx, api.SettingPricePinning); err != nil { return err } else if err := json.Unmarshal([]byte(pss), &pps); err != nil { return err @@ -775,7 +772,7 @@ func (b *Bus) initSettings(ctx context.Context) error { updated, _ = json.Marshal(api.DefaultPricePinSettings) } - if err := b.ss.UpdateSetting(ctx, api.SettingPricePinning, string(updated)); err != nil { + if err := b.store.UpdateSetting(ctx, api.SettingPricePinning, string(updated)); err != nil { return err } } diff --git a/bus/client/client.go b/bus/client/client.go index c31b6d4a2..ee3dacc6c 100644 --- a/bus/client/client.go +++ b/bus/client/client.go @@ -1,6 +1,7 @@ package client import ( + "context" "net/http" "go.sia.tech/jape" @@ -21,6 +22,14 @@ func New(addr, password string) *Client { }} } +func (c *Client) Backup(ctx context.Context, database, dstPath string) (err error) { + err = c.c.WithContext(ctx).POST("/system/database/backup", api.BackupRequest{ + Database: database, + Path: dstPath, + }, nil) + return +} + // State returns the current state of the bus. func (c *Client) State() (state api.BusStateResponse, err error) { err = c.c.GET("/state", &state) diff --git a/bus/routes.go b/bus/routes.go index d8ffd2997..d521c01a7 100644 --- a/bus/routes.go +++ b/bus/routes.go @@ -36,7 +36,7 @@ import ( ) func (b *Bus) fetchSetting(ctx context.Context, key string, value interface{}) error { - if val, err := b.ss.Setting(ctx, key); err != nil { + if val, err := b.store.Setting(ctx, key); err != nil { return fmt.Errorf("could not get contract set settings: %w", err) } else if err := json.Unmarshal([]byte(val), &value); err != nil { b.logger.Panicf("failed to unmarshal %v settings '%s': %v", key, val, err) @@ -51,7 +51,7 @@ func (b *Bus) accountsFundHandler(jc jape.Context) { } // contract metadata - cm, err := b.ms.Contract(jc.Request.Context(), req.ContractID) + cm, err := b.store.Contract(jc.Request.Context(), req.ContractID) if jc.Check("failed to fetch contract metadata", err) != nil { return } @@ -104,7 +104,7 @@ func (b *Bus) accountsFundHandler(jc jape.Context) { } // record spending - err = b.ms.RecordContractSpending(jc.Request.Context(), []api.ContractSpendingRecord{ + err = b.store.RecordContractSpending(jc.Request.Context(), []api.ContractSpendingRecord{ { ContractSpending: api.ContractSpending{ FundAccount: deposit.Add(cost), @@ -181,6 +181,26 @@ func (b *Bus) consensusNetworkHandler(jc jape.Context) { }) } +func (b *Bus) postSystemSQLite3BackupHandler(jc jape.Context) { + var req api.BackupRequest + if jc.Decode(&req) != nil { + return + } + switch req.Database { + case "main", "metrics": + default: + jc.Error(api.ErrInvalidDatabase, http.StatusBadRequest) + return + } + err := b.store.Backup(jc.Request.Context(), req.Database, req.Path) + if errors.Is(err, api.ErrBackupNotSupported) { + jc.Error(err, http.StatusNotFound) + return + } else if jc.Check("failed to backup", err) != nil { + return + } +} + func (b *Bus) txpoolFeeHandler(jc jape.Context) { jc.Encode(b.cm.RecommendedFee()) } @@ -204,7 +224,7 @@ func (b *Bus) txpoolBroadcastHandler(jc jape.Context) { } func (b *Bus) bucketsHandlerGET(jc jape.Context) { - resp, err := b.ms.ListBuckets(jc.Request.Context()) + resp, err := b.store.ListBuckets(jc.Request.Context()) if jc.Check("couldn't list buckets", err) != nil { return } @@ -218,7 +238,7 @@ func (b *Bus) bucketsHandlerPOST(jc jape.Context) { } else if bucket.Name == "" { jc.Error(errors.New("no name provided"), http.StatusBadRequest) return - } else if jc.Check("failed to create bucket", b.ms.CreateBucket(jc.Request.Context(), bucket.Name, bucket.Policy)) != nil { + } else if jc.Check("failed to create bucket", b.store.CreateBucket(jc.Request.Context(), bucket.Name, bucket.Policy)) != nil { return } } @@ -230,7 +250,7 @@ func (b *Bus) bucketsHandlerPolicyPUT(jc jape.Context) { } else if bucket := jc.PathParam("name"); bucket == "" { jc.Error(errors.New("no bucket name provided"), http.StatusBadRequest) return - } else if jc.Check("failed to create bucket", b.ms.UpdateBucketPolicy(jc.Request.Context(), bucket, req.Policy)) != nil { + } else if jc.Check("failed to create bucket", b.store.UpdateBucketPolicy(jc.Request.Context(), bucket, req.Policy)) != nil { return } } @@ -242,7 +262,7 @@ func (b *Bus) bucketHandlerDELETE(jc jape.Context) { } else if name == "" { jc.Error(errors.New("no name provided"), http.StatusBadRequest) return - } else if jc.Check("failed to delete bucket", b.ms.DeleteBucket(jc.Request.Context(), name)) != nil { + } else if jc.Check("failed to delete bucket", b.store.DeleteBucket(jc.Request.Context(), name)) != nil { return } } @@ -255,7 +275,7 @@ func (b *Bus) bucketHandlerGET(jc jape.Context) { jc.Error(errors.New("parameter 'name' is required"), http.StatusBadRequest) return } - bucket, err := b.ms.Bucket(jc.Request.Context(), name) + bucket, err := b.store.Bucket(jc.Request.Context(), name) if errors.Is(err, api.ErrBucketNotFound) { jc.Error(err, http.StatusNotFound) return @@ -598,7 +618,7 @@ func (b *Bus) hostsHandlerGETDeprecated(jc jape.Context) { } // fetch hosts - hosts, err := b.hs.SearchHosts(jc.Request.Context(), "", api.HostFilterModeAllowed, api.UsabilityFilterModeAll, "", nil, offset, limit) + hosts, err := b.store.SearchHosts(jc.Request.Context(), "", api.HostFilterModeAllowed, api.UsabilityFilterModeAll, "", nil, offset, limit) if jc.Check(fmt.Sprintf("couldn't fetch hosts %d-%d", offset, offset+limit), err) != nil { return } @@ -615,7 +635,7 @@ func (b *Bus) searchHostsHandlerPOST(jc jape.Context) { // - properly default search params (currently no defaults are set) // - properly validate and return 400 (currently validation is done in autopilot and the store) - hosts, err := b.hs.SearchHosts(jc.Request.Context(), req.AutopilotID, req.FilterMode, req.UsabilityMode, req.AddressContains, req.KeyIn, req.Offset, req.Limit) + hosts, err := b.store.SearchHosts(jc.Request.Context(), req.AutopilotID, req.FilterMode, req.UsabilityMode, req.AddressContains, req.KeyIn, req.Offset, req.Limit) if jc.Check(fmt.Sprintf("couldn't fetch hosts %d-%d", req.Offset, req.Offset+req.Limit), err) != nil { return } @@ -635,7 +655,7 @@ func (b *Bus) hostsRemoveHandlerPOST(jc jape.Context) { jc.Error(errors.New("maxConsecutiveScanFailures must be non-zero"), http.StatusBadRequest) return } - removed, err := b.hs.RemoveOfflineHosts(jc.Request.Context(), hrr.MaxConsecutiveScanFailures, time.Duration(hrr.MaxDowntimeHours)) + removed, err := b.store.RemoveOfflineHosts(jc.Request.Context(), hrr.MaxConsecutiveScanFailures, time.Duration(hrr.MaxDowntimeHours)) if jc.Check("couldn't remove offline hosts", err) != nil { return } @@ -649,7 +669,7 @@ func (b *Bus) hostsScanningHandlerGET(jc jape.Context) { if jc.DecodeForm("offset", &offset) != nil || jc.DecodeForm("limit", &limit) != nil || jc.DecodeForm("lastScan", (*api.TimeRFC3339)(&maxLastScan)) != nil { return } - hosts, err := b.hs.HostsForScanning(jc.Request.Context(), maxLastScan, offset, limit) + hosts, err := b.store.HostsForScanning(jc.Request.Context(), maxLastScan, offset, limit) if jc.Check(fmt.Sprintf("couldn't fetch hosts %d-%d", offset, offset+limit), err) != nil { return } @@ -661,7 +681,7 @@ func (b *Bus) hostsPubkeyHandlerGET(jc jape.Context) { if jc.DecodeParam("hostkey", &hostKey) != nil { return } - host, err := b.hs.Host(jc.Request.Context(), hostKey) + host, err := b.store.Host(jc.Request.Context(), hostKey) if jc.Check("couldn't load host", err) == nil { jc.Encode(host) } @@ -672,7 +692,7 @@ func (b *Bus) hostsResetLostSectorsPOST(jc jape.Context) { if jc.DecodeParam("hostkey", &hostKey) != nil { return } - err := b.hs.ResetLostSectors(jc.Request.Context(), hostKey) + err := b.store.ResetLostSectors(jc.Request.Context(), hostKey) if jc.Check("couldn't reset lost sectors", err) != nil { return } @@ -683,7 +703,7 @@ func (b *Bus) hostsScanHandlerPOST(jc jape.Context) { if jc.Decode(&req) != nil { return } - if jc.Check("failed to record scans", b.hs.RecordHostScans(jc.Request.Context(), req.Scans)) != nil { + if jc.Check("failed to record scans", b.store.RecordHostScans(jc.Request.Context(), req.Scans)) != nil { return } } @@ -693,7 +713,7 @@ func (b *Bus) hostsPricetableHandlerPOST(jc jape.Context) { if jc.Decode(&req) != nil { return } - if jc.Check("failed to record interactions", b.hs.RecordPriceTables(jc.Request.Context(), req.PriceTableUpdates)) != nil { + if jc.Check("failed to record interactions", b.store.RecordPriceTables(jc.Request.Context(), req.PriceTableUpdates)) != nil { return } } @@ -703,13 +723,13 @@ func (b *Bus) contractsSpendingHandlerPOST(jc jape.Context) { if jc.Decode(&records) != nil { return } - if jc.Check("failed to record spending metrics for contract", b.ms.RecordContractSpending(jc.Request.Context(), records)) != nil { + if jc.Check("failed to record spending metrics for contract", b.store.RecordContractSpending(jc.Request.Context(), records)) != nil { return } } func (b *Bus) hostsAllowlistHandlerGET(jc jape.Context) { - allowlist, err := b.hs.HostAllowlist(jc.Request.Context()) + allowlist, err := b.store.HostAllowlist(jc.Request.Context()) if jc.Check("couldn't load allowlist", err) == nil { jc.Encode(allowlist) } @@ -722,14 +742,14 @@ func (b *Bus) hostsAllowlistHandlerPUT(jc jape.Context) { if len(req.Add)+len(req.Remove) > 0 && req.Clear { jc.Error(errors.New("cannot add or remove entries while clearing the allowlist"), http.StatusBadRequest) return - } else if jc.Check("couldn't update allowlist entries", b.hs.UpdateHostAllowlistEntries(ctx, req.Add, req.Remove, req.Clear)) != nil { + } else if jc.Check("couldn't update allowlist entries", b.store.UpdateHostAllowlistEntries(ctx, req.Add, req.Remove, req.Clear)) != nil { return } } } func (b *Bus) hostsBlocklistHandlerGET(jc jape.Context) { - blocklist, err := b.hs.HostBlocklist(jc.Request.Context()) + blocklist, err := b.store.HostBlocklist(jc.Request.Context()) if jc.Check("couldn't load blocklist", err) == nil { jc.Encode(blocklist) } @@ -742,7 +762,7 @@ func (b *Bus) hostsBlocklistHandlerPUT(jc jape.Context) { if len(req.Add)+len(req.Remove) > 0 && req.Clear { jc.Error(errors.New("cannot add or remove entries while clearing the blocklist"), http.StatusBadRequest) return - } else if jc.Check("couldn't update blocklist entries", b.hs.UpdateHostBlocklistEntries(ctx, req.Add, req.Remove, req.Clear)) != nil { + } else if jc.Check("couldn't update blocklist entries", b.store.UpdateHostBlocklistEntries(ctx, req.Add, req.Remove, req.Clear)) != nil { return } } @@ -753,7 +773,7 @@ func (b *Bus) contractsHandlerGET(jc jape.Context) { if jc.DecodeForm("contractset", &cs) != nil { return } - contracts, err := b.ms.Contracts(jc.Request.Context(), api.ContractsOpts{ + contracts, err := b.store.Contracts(jc.Request.Context(), api.ContractsOpts{ ContractSet: cs, }) if jc.Check("couldn't load contracts", err) == nil { @@ -767,7 +787,7 @@ func (b *Bus) contractsRenewedIDHandlerGET(jc jape.Context) { return } - md, err := b.ms.RenewedContract(jc.Request.Context(), id) + md, err := b.store.RenewedContract(jc.Request.Context(), id) if jc.Check("faild to fetch renewed contract", err) == nil { jc.Encode(md) } @@ -779,7 +799,7 @@ func (b *Bus) contractsArchiveHandlerPOST(jc jape.Context) { return } - if jc.Check("failed to archive contracts", b.ms.ArchiveContracts(jc.Request.Context(), toArchive)) == nil { + if jc.Check("failed to archive contracts", b.store.ArchiveContracts(jc.Request.Context(), toArchive)) == nil { for fcid, reason := range toArchive { b.broadcastAction(webhooks.Event{ Module: api.ModuleContract, @@ -795,7 +815,7 @@ func (b *Bus) contractsArchiveHandlerPOST(jc jape.Context) { } func (b *Bus) contractsSetsHandlerGET(jc jape.Context) { - sets, err := b.ms.ContractSets(jc.Request.Context()) + sets, err := b.store.ContractSets(jc.Request.Context()) if jc.Check("couldn't fetch contract sets", err) == nil { jc.Encode(sets) } @@ -808,7 +828,7 @@ func (b *Bus) contractsSetHandlerPUT(jc jape.Context) { return } else if jc.Decode(&req) != nil { return - } else if jc.Check("could not add contracts to set", b.ms.UpdateContractSet(jc.Request.Context(), set, req.ToAdd, req.ToRemove)) != nil { + } else if jc.Check("could not add contracts to set", b.store.UpdateContractSet(jc.Request.Context(), set, req.ToAdd, req.ToRemove)) != nil { return } else { b.broadcastAction(webhooks.Event{ @@ -826,7 +846,7 @@ func (b *Bus) contractsSetHandlerPUT(jc jape.Context) { func (b *Bus) contractsSetHandlerDELETE(jc jape.Context) { if set := jc.PathParam("set"); set != "" { - jc.Check("could not remove contract set", b.ms.RemoveContractSet(jc.Request.Context(), set)) + jc.Check("could not remove contract set", b.store.RemoveContractSet(jc.Request.Context(), set)) } } @@ -907,7 +927,7 @@ func (b *Bus) contractPruneHandlerPOST(jc jape.Context) { }() // fetch the contract from the bus - c, err := b.ms.Contract(ctx, fcid) + c, err := b.store.Contract(ctx, fcid) if errors.Is(err, api.ErrContractNotFound) { jc.Error(err, http.StatusNotFound) return @@ -923,7 +943,7 @@ func (b *Bus) contractPruneHandlerPOST(jc jape.Context) { // prune the contract rev, spending, pruned, remaining, err := b.rhp2.PruneContract(pruneCtx, b.deriveRenterKey(c.HostKey), gc, c.HostIP, c.HostKey, fcid, c.RevisionNumber, func(fcid types.FileContractID, roots []types.Hash256) ([]uint64, error) { - indices, err := b.ms.PrunableContractRoots(ctx, fcid, roots) + indices, err := b.store.PrunableContractRoots(ctx, fcid, roots) if err != nil { return nil, err } else if len(indices) > len(roots) { @@ -951,7 +971,7 @@ func (b *Bus) contractPruneHandlerPOST(jc jape.Context) { // record spending if !spending.Total().IsZero() { - b.ms.RecordContractSpending(jc.Request.Context(), []api.ContractSpendingRecord{ + b.store.RecordContractSpending(jc.Request.Context(), []api.ContractSpendingRecord{ { ContractSpending: spending, ContractID: fcid, @@ -977,7 +997,7 @@ func (b *Bus) contractPruneHandlerPOST(jc jape.Context) { } func (b *Bus) contractsPrunableDataHandlerGET(jc jape.Context) { - sizes, err := b.ms.ContractSizes(jc.Request.Context()) + sizes, err := b.store.ContractSizes(jc.Request.Context()) if jc.Check("failed to fetch contract sizes", err) != nil { return } @@ -1027,7 +1047,7 @@ func (b *Bus) contractSizeHandlerGET(jc jape.Context) { return } - size, err := b.ms.ContractSize(jc.Request.Context(), id) + size, err := b.store.ContractSize(jc.Request.Context(), id) if errors.Is(err, api.ErrContractNotFound) { jc.Error(err, http.StatusNotFound) return @@ -1067,7 +1087,7 @@ func (b *Bus) contractIDHandlerGET(jc jape.Context) { if jc.DecodeParam("id", &id) != nil { return } - c, err := b.ms.Contract(jc.Request.Context(), id) + c, err := b.store.Contract(jc.Request.Context(), id) if jc.Check("couldn't load contract", err) == nil { jc.Encode(c) } @@ -1125,7 +1145,7 @@ func (b *Bus) contractIDRenewHandlerPOST(jc jape.Context) { } // fetch the contract - c, err := b.ms.Contract(ctx, fcid) + c, err := b.store.Contract(ctx, fcid) if errors.Is(err, api.ErrContractNotFound) { jc.Error(err, http.StatusNotFound) return @@ -1134,7 +1154,7 @@ func (b *Bus) contractIDRenewHandlerPOST(jc jape.Context) { } // fetch the host - h, err := b.hs.Host(ctx, c.HostKey) + h, err := b.store.Host(ctx, c.HostKey) if jc.Check("couldn't fetch host", err) != nil { return } @@ -1204,7 +1224,7 @@ func (b *Bus) contractIDRootsHandlerGET(jc jape.Context) { return } - roots, err := b.ms.ContractRoots(jc.Request.Context(), id) + roots, err := b.store.ContractRoots(jc.Request.Context(), id) if jc.Check("couldn't fetch contract sectors", err) == nil { jc.Encode(api.ContractRootsResponse{ Roots: roots, @@ -1218,11 +1238,11 @@ func (b *Bus) contractIDHandlerDELETE(jc jape.Context) { if jc.DecodeParam("id", &id) != nil { return } - jc.Check("couldn't remove contract", b.ms.ArchiveContract(jc.Request.Context(), id, api.ContractArchivalReasonRemoved)) + jc.Check("couldn't remove contract", b.store.ArchiveContract(jc.Request.Context(), id, api.ContractArchivalReasonRemoved)) } func (b *Bus) contractsAllHandlerDELETE(jc jape.Context) { - jc.Check("couldn't remove contracts", b.ms.ArchiveAllContracts(jc.Request.Context(), api.ContractArchivalReasonRemoved)) + jc.Check("couldn't remove contracts", b.store.ArchiveAllContracts(jc.Request.Context(), api.ContractArchivalReasonRemoved)) } func (b *Bus) searchObjectsHandlerGET(jc jape.Context) { @@ -1236,7 +1256,7 @@ func (b *Bus) searchObjectsHandlerGET(jc jape.Context) { if jc.DecodeForm("bucket", &bucket) != nil { return } - keys, err := b.ms.SearchObjects(jc.Request.Context(), bucket, key, offset, limit) + keys, err := b.store.SearchObjects(jc.Request.Context(), bucket, key, offset, limit) if jc.Check("couldn't list objects", err) != nil { return } @@ -1265,9 +1285,9 @@ func (b *Bus) objectsHandlerGET(jc jape.Context) { var o api.Object var err error if onlymetadata { - o, err = b.ms.ObjectMetadata(jc.Request.Context(), bucket, path) + o, err = b.store.ObjectMetadata(jc.Request.Context(), bucket, path) } else { - o, err = b.ms.Object(jc.Request.Context(), bucket, path) + o, err = b.store.Object(jc.Request.Context(), bucket, path) } if errors.Is(err, api.ErrObjectNotFound) { jc.Error(err, http.StatusNotFound) @@ -1314,7 +1334,7 @@ func (b *Bus) objectEntriesHandlerGET(jc jape.Context, path string) { } // look for object entries - entries, hasMore, err := b.ms.ObjectEntries(jc.Request.Context(), bucket, path, prefix, sortBy, sortDir, marker, offset, limit) + entries, hasMore, err := b.store.ObjectEntries(jc.Request.Context(), bucket, path, prefix, sortBy, sortDir, marker, offset, limit) if jc.Check("couldn't list object entries", err) != nil { return } @@ -1329,7 +1349,7 @@ func (b *Bus) objectsHandlerPUT(jc jape.Context) { } else if aor.Bucket == "" { aor.Bucket = api.DefaultBucketName } - jc.Check("couldn't store object", b.ms.UpdateObject(jc.Request.Context(), aor.Bucket, jc.PathParam("path"), aor.ContractSet, aor.ETag, aor.MimeType, aor.Metadata, aor.Object)) + jc.Check("couldn't store object", b.store.UpdateObject(jc.Request.Context(), aor.Bucket, jc.PathParam("path"), aor.ContractSet, aor.ETag, aor.MimeType, aor.Metadata, aor.Object)) } func (b *Bus) objectsCopyHandlerPOST(jc jape.Context) { @@ -1337,7 +1357,7 @@ func (b *Bus) objectsCopyHandlerPOST(jc jape.Context) { if jc.Decode(&orr) != nil { return } - om, err := b.ms.CopyObject(jc.Request.Context(), orr.SourceBucket, orr.DestinationBucket, orr.SourcePath, orr.DestinationPath, orr.MimeType, orr.Metadata) + om, err := b.store.CopyObject(jc.Request.Context(), orr.SourceBucket, orr.DestinationBucket, orr.SourcePath, orr.DestinationPath, orr.MimeType, orr.Metadata) if jc.Check("couldn't copy object", err) != nil { return } @@ -1355,7 +1375,7 @@ func (b *Bus) objectsListHandlerPOST(jc jape.Context) { if req.Bucket == "" { req.Bucket = api.DefaultBucketName } - resp, err := b.ms.ListObjects(jc.Request.Context(), req.Bucket, req.Prefix, req.SortBy, req.SortDir, req.Marker, req.Limit) + resp, err := b.store.ListObjects(jc.Request.Context(), req.Bucket, req.Prefix, req.SortBy, req.SortDir, req.Marker, req.Limit) if errors.Is(err, api.ErrMarkerNotFound) { jc.Error(err, http.StatusBadRequest) return @@ -1378,7 +1398,7 @@ func (b *Bus) objectsRenameHandlerPOST(jc jape.Context) { jc.Error(fmt.Errorf("can't rename dirs with mode %v", orr.Mode), http.StatusBadRequest) return } - jc.Check("couldn't rename object", b.ms.RenameObject(jc.Request.Context(), orr.Bucket, orr.From, orr.To, orr.Force)) + jc.Check("couldn't rename object", b.store.RenameObject(jc.Request.Context(), orr.Bucket, orr.From, orr.To, orr.Force)) return } else if orr.Mode == api.ObjectsRenameModeMulti { // Multi object rename. @@ -1386,7 +1406,7 @@ func (b *Bus) objectsRenameHandlerPOST(jc jape.Context) { jc.Error(fmt.Errorf("can't rename file with mode %v", orr.Mode), http.StatusBadRequest) return } - jc.Check("couldn't rename objects", b.ms.RenameObjects(jc.Request.Context(), orr.Bucket, orr.From, orr.To, orr.Force)) + jc.Check("couldn't rename objects", b.store.RenameObjects(jc.Request.Context(), orr.Bucket, orr.From, orr.To, orr.Force)) return } else { // Invalid mode. @@ -1406,9 +1426,9 @@ func (b *Bus) objectsHandlerDELETE(jc jape.Context) { } var err error if batch { - err = b.ms.RemoveObjects(jc.Request.Context(), bucket, jc.PathParam("path")) + err = b.store.RemoveObjects(jc.Request.Context(), bucket, jc.PathParam("path")) } else { - err = b.ms.RemoveObject(jc.Request.Context(), bucket, jc.PathParam("path")) + err = b.store.RemoveObject(jc.Request.Context(), bucket, jc.PathParam("path")) } if errors.Is(err, api.ErrObjectNotFound) { jc.Error(err, http.StatusNotFound) @@ -1418,7 +1438,7 @@ func (b *Bus) objectsHandlerDELETE(jc jape.Context) { } func (b *Bus) slabbuffersHandlerGET(jc jape.Context) { - buffers, err := b.ms.SlabBuffers(jc.Request.Context()) + buffers, err := b.store.SlabBuffers(jc.Request.Context()) if jc.Check("couldn't get slab buffers info", err) != nil { return } @@ -1430,7 +1450,7 @@ func (b *Bus) objectsStatshandlerGET(jc jape.Context) { if jc.DecodeForm("bucket", &opts.Bucket) != nil { return } - info, err := b.ms.ObjectsStats(jc.Request.Context(), opts) + info, err := b.store.ObjectsStats(jc.Request.Context(), opts) if jc.Check("couldn't get objects stats", err) != nil { return } @@ -1454,7 +1474,7 @@ func (b *Bus) packedSlabsHandlerFetchPOST(jc jape.Context) { jc.Error(fmt.Errorf("contract_set must be non-empty"), http.StatusBadRequest) return } - slabs, err := b.ms.PackedSlabsForUpload(jc.Request.Context(), time.Duration(psrg.LockingDuration), psrg.MinShards, psrg.TotalShards, psrg.ContractSet, psrg.Limit) + slabs, err := b.store.PackedSlabsForUpload(jc.Request.Context(), time.Duration(psrg.LockingDuration), psrg.MinShards, psrg.TotalShards, psrg.ContractSet, psrg.Limit) if jc.Check("couldn't get packed slabs", err) != nil { return } @@ -1466,7 +1486,7 @@ func (b *Bus) packedSlabsHandlerDonePOST(jc jape.Context) { if jc.Decode(&psrp) != nil { return } - jc.Check("failed to mark packed slab(s) as uploaded", b.ms.MarkPackedSlabsUploaded(jc.Request.Context(), psrp.Slabs)) + jc.Check("failed to mark packed slab(s) as uploaded", b.store.MarkPackedSlabsUploaded(jc.Request.Context(), psrp.Slabs)) } func (b *Bus) sectorsHostRootHandlerDELETE(jc jape.Context) { @@ -1477,7 +1497,7 @@ func (b *Bus) sectorsHostRootHandlerDELETE(jc jape.Context) { } else if jc.DecodeParam("root", &root) != nil { return } - n, err := b.ms.DeleteHostSector(jc.Request.Context(), hk, root) + n, err := b.store.DeleteHostSector(jc.Request.Context(), hk, root) if jc.Check("failed to mark sector as lost", err) != nil { return } else if n > 0 { @@ -1494,7 +1514,7 @@ func (b *Bus) slabObjectsHandlerGET(jc jape.Context) { if jc.DecodeForm("bucket", &bucket) != nil { return } - objects, err := b.ms.ObjectsBySlabKey(jc.Request.Context(), bucket, key) + objects, err := b.store.ObjectsBySlabKey(jc.Request.Context(), bucket, key) if jc.Check("failed to retrieve objects by slab", err) != nil { return } @@ -1506,7 +1526,7 @@ func (b *Bus) slabHandlerGET(jc jape.Context) { if jc.DecodeParam("key", &key) != nil { return } - slab, err := b.ms.Slab(jc.Request.Context(), key) + slab, err := b.store.Slab(jc.Request.Context(), key) if errors.Is(err, api.ErrSlabNotFound) { jc.Error(err, http.StatusNotFound) return @@ -1520,18 +1540,18 @@ func (b *Bus) slabHandlerGET(jc jape.Context) { func (b *Bus) slabHandlerPUT(jc jape.Context) { var usr api.UpdateSlabRequest if jc.Decode(&usr) == nil { - jc.Check("couldn't update slab", b.ms.UpdateSlab(jc.Request.Context(), usr.Slab, usr.ContractSet)) + jc.Check("couldn't update slab", b.store.UpdateSlab(jc.Request.Context(), usr.Slab, usr.ContractSet)) } } func (b *Bus) slabsRefreshHealthHandlerPOST(jc jape.Context) { - jc.Check("failed to recompute health", b.ms.RefreshHealth(jc.Request.Context())) + jc.Check("failed to recompute health", b.store.RefreshHealth(jc.Request.Context())) } func (b *Bus) slabsMigrationHandlerPOST(jc jape.Context) { var msr api.MigrationSlabsRequest if jc.Decode(&msr) == nil { - if slabs, err := b.ms.UnhealthySlabs(jc.Request.Context(), msr.HealthCutoff, msr.ContractSet, msr.Limit); jc.Check("couldn't fetch slabs for migration", err) == nil { + if slabs, err := b.store.UnhealthySlabs(jc.Request.Context(), msr.HealthCutoff, msr.ContractSet, msr.Limit); jc.Check("couldn't fetch slabs for migration", err) == nil { jc.Encode(api.UnhealthySlabsResponse{ Slabs: slabs, }) @@ -1558,7 +1578,7 @@ func (b *Bus) slabsPartialHandlerGET(jc jape.Context) { jc.Error(fmt.Errorf("length must be positive and offset must be non-negative"), http.StatusBadRequest) return } - data, err := b.ms.FetchPartialSlab(jc.Request.Context(), key, uint32(offset), uint32(length)) + data, err := b.store.FetchPartialSlab(jc.Request.Context(), key, uint32(offset), uint32(length)) if errors.Is(err, api.ErrObjectNotFound) { jc.Error(err, http.StatusNotFound) return @@ -1598,7 +1618,7 @@ func (b *Bus) slabsPartialHandlerPOST(jc jape.Context) { if jc.Check("failed to read request body", err) != nil { return } - slabs, bufferSize, err := b.ms.AddPartialSlab(jc.Request.Context(), data, uint8(minShards), uint8(totalShards), contractSet) + slabs, bufferSize, err := b.store.AddPartialSlab(jc.Request.Context(), data, uint8(minShards), uint8(totalShards), contractSet) if jc.Check("failed to add partial slab", err) != nil { return } @@ -1614,7 +1634,7 @@ func (b *Bus) slabsPartialHandlerPOST(jc jape.Context) { } func (b *Bus) settingsHandlerGET(jc jape.Context) { - if settings, err := b.ss.Settings(jc.Request.Context()); jc.Check("couldn't load settings", err) == nil { + if settings, err := b.store.Settings(jc.Request.Context()); jc.Check("couldn't load settings", err) == nil { jc.Encode(settings) } } @@ -1628,7 +1648,7 @@ func (b *Bus) settingKeyHandlerGET(jc jape.Context) { return } - setting, err := b.ss.Setting(jc.Request.Context(), jc.PathParam("key")) + setting, err := b.store.Setting(jc.Request.Context(), jc.PathParam("key")) if errors.Is(err, api.ErrSettingNotFound) { jc.Error(err, http.StatusNotFound) return @@ -1648,7 +1668,7 @@ func (b *Bus) settingKeyHandlerGET(jc jape.Context) { pps.Autopilots = make(map[string]api.AutopilotPins) } // populate the Autopilots map with the current autopilots - aps, err := b.as.Autopilots(jc.Request.Context()) + aps, err := b.store.Autopilots(jc.Request.Context()) if jc.Check("failed to fetch autopilots", err) != nil { return } @@ -1731,7 +1751,7 @@ func (b *Bus) settingKeyHandlerPUT(jc jape.Context) { b.pinMgr.TriggerUpdate() } - if jc.Check("could not update setting", b.ss.UpdateSetting(jc.Request.Context(), key, string(data))) == nil { + if jc.Check("could not update setting", b.store.UpdateSetting(jc.Request.Context(), key, string(data))) == nil { b.broadcastAction(webhooks.Event{ Module: api.ModuleSetting, Event: api.EventUpdate, @@ -1751,7 +1771,7 @@ func (b *Bus) settingKeyHandlerDELETE(jc jape.Context) { return } - if jc.Check("could not delete setting", b.ss.DeleteSetting(jc.Request.Context(), key)) == nil { + if jc.Check("could not delete setting", b.store.DeleteSetting(jc.Request.Context(), key)) == nil { b.broadcastAction(webhooks.Event{ Module: api.ModuleSetting, Event: api.EventDelete, @@ -1772,7 +1792,7 @@ func (b *Bus) contractIDAncestorsHandler(jc jape.Context) { if jc.DecodeForm("minStartHeight", &minStartHeight) != nil { return } - ancestors, err := b.ms.AncestorContracts(jc.Request.Context(), fcid, uint64(minStartHeight)) + ancestors, err := b.store.AncestorContracts(jc.Request.Context(), fcid, uint64(minStartHeight)) if jc.Check("failed to fetch ancestor contracts", err) != nil { return } @@ -1852,14 +1872,14 @@ func (b *Bus) paramsHandlerGougingGET(jc jape.Context) { func (b *Bus) gougingParams(ctx context.Context) (api.GougingParams, error) { var gs api.GougingSettings - if gss, err := b.ss.Setting(ctx, api.SettingGouging); err != nil { + if gss, err := b.store.Setting(ctx, api.SettingGouging); err != nil { return api.GougingParams{}, err } else if err := json.Unmarshal([]byte(gss), &gs); err != nil { b.logger.Panicf("failed to unmarshal gouging settings '%s': %v", gss, err) } var rs api.RedundancySettings - if rss, err := b.ss.Setting(ctx, api.SettingRedundancy); err != nil { + if rss, err := b.store.Setting(ctx, api.SettingRedundancy); err != nil { return api.GougingParams{}, err } else if err := json.Unmarshal([]byte(rss), &rs); err != nil { b.logger.Panicf("failed to unmarshal redundancy settings '%s': %v", rss, err) @@ -1935,7 +1955,7 @@ func (b *Bus) accountsHandlerGET(jc jape.Context) { if jc.DecodeForm("owner", &owner) != nil { return } - accounts, err := b.accounts.Accounts(jc.Request.Context(), owner) + accounts, err := b.store.Accounts(jc.Request.Context(), owner) if err != nil { jc.Error(err, http.StatusInternalServerError) return @@ -1954,13 +1974,13 @@ func (b *Bus) accountsHandlerPOST(jc jape.Context) { return } } - if b.accounts.SaveAccounts(jc.Request.Context(), req.Accounts) != nil { + if b.store.SaveAccounts(jc.Request.Context(), req.Accounts) != nil { return } } func (b *Bus) autopilotsListHandlerGET(jc jape.Context) { - if autopilots, err := b.as.Autopilots(jc.Request.Context()); jc.Check("failed to fetch autopilots", err) == nil { + if autopilots, err := b.store.Autopilots(jc.Request.Context()); jc.Check("failed to fetch autopilots", err) == nil { jc.Encode(autopilots) } } @@ -1970,7 +1990,7 @@ func (b *Bus) autopilotsHandlerGET(jc jape.Context) { if jc.DecodeParam("id", &id) != nil { return } - ap, err := b.as.Autopilot(jc.Request.Context(), id) + ap, err := b.store.Autopilot(jc.Request.Context(), id) if errors.Is(err, api.ErrAutopilotNotFound) { jc.Error(err, http.StatusNotFound) return @@ -1998,7 +2018,7 @@ func (b *Bus) autopilotsHandlerPUT(jc jape.Context) { return } - if jc.Check("failed to update autopilot", b.as.UpdateAutopilot(jc.Request.Context(), ap)) == nil { + if jc.Check("failed to update autopilot", b.store.UpdateAutopilot(jc.Request.Context(), ap)) == nil { b.pinMgr.TriggerUpdate() } } @@ -2017,7 +2037,7 @@ func (b *Bus) autopilotHostCheckHandlerPUT(jc jape.Context) { return } - err := b.hs.UpdateHostCheck(jc.Request.Context(), id, hk, hc) + err := b.store.UpdateHostCheck(jc.Request.Context(), id, hk, hc) if errors.Is(err, api.ErrAutopilotNotFound) { jc.Error(err, http.StatusNotFound) return @@ -2147,7 +2167,7 @@ func (b *Bus) metricsHandlerDELETE(jc jape.Context) { return } - err := b.mtrcs.PruneMetrics(jc.Request.Context(), metric, cutoff) + err := b.store.PruneMetrics(jc.Request.Context(), metric, cutoff) if jc.Check("failed to prune metrics", err) != nil { return } @@ -2164,7 +2184,7 @@ func (b *Bus) metricsHandlerPUT(jc jape.Context) { if err := json.NewDecoder(jc.Request.Body).Decode(&req); err != nil { jc.Error(fmt.Errorf("couldn't decode request type (%T): %w", req, err), http.StatusBadRequest) return - } else if jc.Check("failed to record contract prune metric", b.mtrcs.RecordContractPruneMetric(jc.Request.Context(), req.Metrics...)) != nil { + } else if jc.Check("failed to record contract prune metric", b.store.RecordContractPruneMetric(jc.Request.Context(), req.Metrics...)) != nil { return } case api.MetricContractSetChurn: @@ -2173,7 +2193,7 @@ func (b *Bus) metricsHandlerPUT(jc jape.Context) { if err := json.NewDecoder(jc.Request.Body).Decode(&req); err != nil { jc.Error(fmt.Errorf("couldn't decode request type (%T): %w", req, err), http.StatusBadRequest) return - } else if jc.Check("failed to record contract churn metric", b.mtrcs.RecordContractSetChurnMetric(jc.Request.Context(), req.Metrics...)) != nil { + } else if jc.Check("failed to record contract churn metric", b.store.RecordContractSetChurnMetric(jc.Request.Context(), req.Metrics...)) != nil { return } default: @@ -2270,15 +2290,15 @@ func (b *Bus) metricsHandlerGET(jc jape.Context) { func (b *Bus) metrics(ctx context.Context, key string, start time.Time, n uint64, interval time.Duration, opts interface{}) (interface{}, error) { switch key { case api.MetricContract: - return b.mtrcs.ContractMetrics(ctx, start, n, interval, opts.(api.ContractMetricsQueryOpts)) + return b.store.ContractMetrics(ctx, start, n, interval, opts.(api.ContractMetricsQueryOpts)) case api.MetricContractPrune: - return b.mtrcs.ContractPruneMetrics(ctx, start, n, interval, opts.(api.ContractPruneMetricsQueryOpts)) + return b.store.ContractPruneMetrics(ctx, start, n, interval, opts.(api.ContractPruneMetricsQueryOpts)) case api.MetricContractSet: - return b.mtrcs.ContractSetMetrics(ctx, start, n, interval, opts.(api.ContractSetMetricsQueryOpts)) + return b.store.ContractSetMetrics(ctx, start, n, interval, opts.(api.ContractSetMetricsQueryOpts)) case api.MetricContractSetChurn: - return b.mtrcs.ContractSetChurnMetrics(ctx, start, n, interval, opts.(api.ContractSetChurnMetricsQueryOpts)) + return b.store.ContractSetChurnMetrics(ctx, start, n, interval, opts.(api.ContractSetChurnMetricsQueryOpts)) case api.MetricWallet: - return b.mtrcs.WalletMetrics(ctx, start, n, interval, opts.(api.WalletMetricsQueryOpts)) + return b.store.WalletMetrics(ctx, start, n, interval, opts.(api.WalletMetricsQueryOpts)) } return nil, fmt.Errorf("unknown metric '%s'", key) } @@ -2298,7 +2318,7 @@ func (b *Bus) multipartHandlerCreatePOST(jc jape.Context) { key = *req.Key } - resp, err := b.ms.CreateMultipartUpload(jc.Request.Context(), req.Bucket, req.Path, key, req.MimeType, req.Metadata) + resp, err := b.store.CreateMultipartUpload(jc.Request.Context(), req.Bucket, req.Path, key, req.MimeType, req.Metadata) if jc.Check("failed to create multipart upload", err) != nil { return } @@ -2310,7 +2330,7 @@ func (b *Bus) multipartHandlerAbortPOST(jc jape.Context) { if jc.Decode(&req) != nil { return } - err := b.ms.AbortMultipartUpload(jc.Request.Context(), req.Bucket, req.Path, req.UploadID) + err := b.store.AbortMultipartUpload(jc.Request.Context(), req.Bucket, req.Path, req.UploadID) if jc.Check("failed to abort multipart upload", err) != nil { return } @@ -2321,7 +2341,7 @@ func (b *Bus) multipartHandlerCompletePOST(jc jape.Context) { if jc.Decode(&req) != nil { return } - resp, err := b.ms.CompleteMultipartUpload(jc.Request.Context(), req.Bucket, req.Path, req.UploadID, req.Parts, api.CompleteMultipartOptions{ + resp, err := b.store.CompleteMultipartUpload(jc.Request.Context(), req.Bucket, req.Path, req.UploadID, req.Parts, api.CompleteMultipartOptions{ Metadata: req.Metadata, }) if jc.Check("failed to complete multipart upload", err) != nil { @@ -2350,14 +2370,14 @@ func (b *Bus) multipartHandlerUploadPartPUT(jc jape.Context) { jc.Error(errors.New("upload_id must be non-empty"), http.StatusBadRequest) return } - err := b.ms.AddMultipartPart(jc.Request.Context(), req.Bucket, req.Path, req.ContractSet, req.ETag, req.UploadID, req.PartNumber, req.Slices) + err := b.store.AddMultipartPart(jc.Request.Context(), req.Bucket, req.Path, req.ContractSet, req.ETag, req.UploadID, req.PartNumber, req.Slices) if jc.Check("failed to upload part", err) != nil { return } } func (b *Bus) multipartHandlerUploadGET(jc jape.Context) { - resp, err := b.ms.MultipartUpload(jc.Request.Context(), jc.PathParam("id")) + resp, err := b.store.MultipartUpload(jc.Request.Context(), jc.PathParam("id")) if jc.Check("failed to get multipart upload", err) != nil { return } @@ -2369,7 +2389,7 @@ func (b *Bus) multipartHandlerListUploadsPOST(jc jape.Context) { if jc.Decode(&req) != nil { return } - resp, err := b.ms.MultipartUploads(jc.Request.Context(), req.Bucket, req.Prefix, req.PathMarker, req.UploadIDMarker, req.Limit) + resp, err := b.store.MultipartUploads(jc.Request.Context(), req.Bucket, req.Prefix, req.PathMarker, req.UploadIDMarker, req.Limit) if jc.Check("failed to list multipart uploads", err) != nil { return } @@ -2381,7 +2401,7 @@ func (b *Bus) multipartHandlerListPartsPOST(jc jape.Context) { if jc.Decode(&req) != nil { return } - resp, err := b.ms.MultipartUploadParts(jc.Request.Context(), req.Bucket, req.Path, req.UploadID, req.PartNumberMarker, int64(req.Limit)) + resp, err := b.store.MultipartUploadParts(jc.Request.Context(), req.Bucket, req.Path, req.UploadID, req.PartNumberMarker, int64(req.Limit)) if jc.Check("failed to list multipart upload parts", err) != nil { return } diff --git a/internal/sql/sql.go b/internal/sql/sql.go index 2b5920f9d..8859f0a2f 100644 --- a/internal/sql/sql.go +++ b/internal/sql/sql.go @@ -78,6 +78,10 @@ func NewDB(db *sql.DB, log *zap.Logger, dbLockedMsgs []string, longQueryDuration }, nil } +func (s *DB) DB() *sql.DB { + return s.db +} + // exec executes a query without returning any rows. The args are for // any placeholder parameters in the query. func (s *DB) Exec(ctx context.Context, query string, args ...any) (sql.Result, error) { diff --git a/stores/backups.go b/stores/backups.go new file mode 100644 index 000000000..bf4c9a348 --- /dev/null +++ b/stores/backups.go @@ -0,0 +1,29 @@ +package stores + +import ( + "context" + + "go.sia.tech/renterd/api" + "go.sia.tech/renterd/stores/sql/sqlite" +) + +func (s *SQLStore) Backup(ctx context.Context, dbID, destPath string) error { + switch dbID { + case "main": + switch db := s.db.(type) { + case *sqlite.MainDatabase: + return db.Backup(ctx, dbID, destPath) + default: + return api.ErrBackupNotSupported + } + case "metrics": + switch db := s.dbMetrics.(type) { + case *sqlite.MetricsDatabase: + return db.Backup(ctx, dbID, destPath) + default: + return api.ErrBackupNotSupported + } + default: + return api.ErrInvalidDatabase + } +} diff --git a/stores/sql/sqlite/backup.go b/stores/sql/sqlite/backup.go new file mode 100644 index 000000000..37236b719 --- /dev/null +++ b/stores/sql/sqlite/backup.go @@ -0,0 +1,134 @@ +package sqlite + +import ( + "context" + dsql "database/sql" + "errors" + "fmt" + "os" + + "github.com/mattn/go-sqlite3" +) + +func sqlConn(ctx context.Context, db *dsql.DB) (c *sqlite3.SQLiteConn, err error) { + if err := db.PingContext(ctx); err != nil { + return nil, fmt.Errorf("failed to ping database: %w", err) + } + + raw, err := db.Conn(ctx) + if err != nil { + return nil, fmt.Errorf("failed to create connection: %w", err) + } + err = raw.Raw(func(driverConn any) error { + var ok bool + c, ok = driverConn.(*sqlite3.SQLiteConn) + if !ok { + return errors.New("connection is not a SQLiteConn") + } + return nil + }) + return +} + +// backupDB is a helper function that creates a backup of the source database at +// the specified path. The backup is created using the SQLite backup API, which +// is safe to use with a live database. +func backupDB(ctx context.Context, src *dsql.DB, destPath string) (err error) { + // create the destination database + dest, err := dsql.Open("sqlite3", destPath) + if err != nil { + return fmt.Errorf("failed to open destination database: %w", err) + } + defer func() { + // errors are ignored + dest.Close() + if err != nil { + // remove the destination file if an error occurred during backup + os.Remove(destPath) + } + }() + + // initialize the source conn + sc, err := sqlConn(ctx, src) + if err != nil { + return fmt.Errorf("failed to create source connection: %w", err) + } + defer sc.Close() + + // initialize the destination conn + dc, err := sqlConn(ctx, dest) + if err != nil { + return fmt.Errorf("failed to create destination connection: %w", err) + } + defer dc.Close() + + // start the backup + // NOTE: 'main' referes to the schema of the database + backup, err := dc.Backup("main", sc, "main") + if err != nil { + return fmt.Errorf("failed to create backup: %w", err) + } + // ensure the backup is closed + defer func() { + if err := backup.Finish(); err != nil { + panic(fmt.Errorf("failed to finish backup: %w", err)) + } + }() + + for step := 1; ; step++ { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if done, err := backup.Step(100); err != nil { + return fmt.Errorf("backup step %d failed: %w", step, err) + } else if done { + break + } + } + if _, err := dest.Exec("VACUUM"); err != nil { + return fmt.Errorf("failed to vacuum destination database: %w", err) + } + return nil +} + +func (s *MainDatabase) Backup(ctx context.Context, dbID, destPath string) error { + return backupDB(ctx, s.db.DB(), destPath) +} + +func (s *MetricsDatabase) Backup(ctx context.Context, dbID, destPath string) error { + return backupDB(ctx, s.db.DB(), destPath) +} + +// Backup creates a backup of the database at the specified path. The backup is +// created using the SQLite backup API, which is safe to use with a +// live database. +// +// This function should be used if the database is not already open in the +// current process. If the database is already open, use Store.Backup. +func Backup(ctx context.Context, srcPath, destPath string) (err error) { + // ensure the source file exists + if _, err := os.Stat(srcPath); err != nil { + return fmt.Errorf("source file does not exist: %w", err) + } + + // prevent overwriting the destination file + if _, err := os.Stat(destPath); !errors.Is(err, os.ErrNotExist) { + return errors.New("destination file already exists") + } else if destPath == "" { + return errors.New("empty destination path") + } + + // open a new connection to the source database. We don't want to run + // any migrations or other operations on the source database since it + // might be open in another process. + src, err := dsql.Open("sqlite3", srcPath) + if err != nil { + return fmt.Errorf("failed to open source database: %w", err) + } + defer src.Close() + + return backupDB(ctx, src, destPath) +} diff --git a/stores/sql/sqlite/common.go b/stores/sql/sqlite/common.go index fd46688b8..6a061e84f 100644 --- a/stores/sql/sqlite/common.go +++ b/stores/sql/sqlite/common.go @@ -22,7 +22,7 @@ var deadlockMsgs = []string{ var migrationsFs embed.FS func Open(path string) (*dsql.DB, error) { - return dsql.Open("sqlite3", fmt.Sprintf("file:%s?_busy_timeout=30000&_foreign_keys=1&_journal_mode=WAL&_secure_delete=false&_cache_size=65536", path)) + return dsql.Open("sqlite3", fmt.Sprintf("file:%s?_busy_timeout=30000&_foreign_keys=1&_journal_mode=WAL&_secure_delete=false&_auto_vacuum=INCREMENTAL&_cache_size=65536", path)) } func OpenEphemeral(name string) (*dsql.DB, error) { From 5a5c3802f603b8f76e473099bd4898da7e9afb16 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 25 Sep 2024 16:14:00 +0200 Subject: [PATCH 2/7] cmd: add cmd to cli for backing up a sqlite database --- cmd/renterd/commands.go | 11 +++++++++++ cmd/renterd/main.go | 8 +++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/cmd/renterd/commands.go b/cmd/renterd/commands.go index 819eb9de5..554dd0adb 100644 --- a/cmd/renterd/commands.go +++ b/cmd/renterd/commands.go @@ -1,16 +1,27 @@ package main import ( + "context" + "flag" "fmt" + "log" "os" "go.sia.tech/core/types" "go.sia.tech/coreutils/wallet" "go.sia.tech/renterd/build" "go.sia.tech/renterd/config" + "go.sia.tech/renterd/stores/sql/sqlite" "gopkg.in/yaml.v3" ) +func cmdBackup() { + err := sqlite.Backup(context.Background(), flag.Arg(2), flag.Arg(3)) + if err != nil { + log.Fatal("failed to create backup", err) + } +} + func cmdBuildConfig(cfg *config.Config) { if _, err := os.Stat("renterd.yml"); err == nil { if !promptYesNo("renterd.yml already exists. Would you like to overwrite it?") { diff --git a/cmd/renterd/main.go b/cmd/renterd/main.go index a32ecceed..b557512cb 100644 --- a/cmd/renterd/main.go +++ b/cmd/renterd/main.go @@ -23,10 +23,13 @@ Usage: ` // usageFooter is the footer for the CLI usage text. usageFooter = ` -There are 3 commands: +There are 4 commands: - version: prints the network as well as build information - config: builds a YAML config file through a series of prompts - seed: generates a new seed and prints the recovery phrase + - sqlite backup : backs up the sqlite database at a + specified source path to the specified destination path + (safe to use while renterd is running) See the documentation (https://docs.sia.tech/) for more information and examples on how to configure and use renterd. @@ -52,6 +55,9 @@ func main() { } else if flag.Arg(0) == "config" { cmdBuildConfig(&cfg) return + } else if flag.Arg(0) == "sqlite" && flag.Arg(1) == "backup" && + flag.Arg(2) != "" && flag.Arg(3) != "" { + cmdBackup() } else if flag.Arg(0) != "" { flag.Usage() return From dd500bf39a642ed07ea8abfc4f4b60d468d6ce88 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 25 Sep 2024 16:30:13 +0200 Subject: [PATCH 3/7] e2e: add TestBackup --- cmd/renterd/main.go | 1 + internal/test/e2e/cluster_test.go | 41 +++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/cmd/renterd/main.go b/cmd/renterd/main.go index b557512cb..4c172143a 100644 --- a/cmd/renterd/main.go +++ b/cmd/renterd/main.go @@ -58,6 +58,7 @@ func main() { } else if flag.Arg(0) == "sqlite" && flag.Arg(1) == "backup" && flag.Arg(2) != "" && flag.Arg(3) != "" { cmdBackup() + return } else if flag.Arg(0) != "" { flag.Usage() return diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index ef81e8d39..f13ec8149 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -30,6 +30,7 @@ import ( "go.sia.tech/renterd/internal/test" "go.sia.tech/renterd/internal/utils" "go.sia.tech/renterd/object" + "go.sia.tech/renterd/stores/sql/sqlite" "go.uber.org/zap" "lukechampine.com/frand" ) @@ -2707,3 +2708,43 @@ func TestHostScan(t *testing.T) { t.Fatalf("expected 1 hosts, got %v", len(toScan)) } } + +func TestBackup(t *testing.T) { + cluster := newTestCluster(t, clusterOptsDefault) + defer cluster.Shutdown() + bus := cluster.Bus + + // test that backup fails for MySQL + isSqlite := config.MySQLConfigFromEnv().URI == "" + if !isSqlite { + err := bus.Backup(context.Background(), "", "") + cluster.tt.AssertIs(err, api.ErrBackupNotSupported) + return + } + + // test creating a backup + tmpDir := t.TempDir() + mainDst := filepath.Join(tmpDir, "main.sqlite") + metricsDst := filepath.Join(tmpDir, "metrics.sqlite") + cluster.tt.OK(bus.Backup(context.Background(), "main", mainDst)) + cluster.tt.OK(bus.Backup(context.Background(), "metrics", metricsDst)) + cluster.tt.OKAll(os.Stat(mainDst)) + cluster.tt.OKAll(os.Stat(metricsDst)) + + // test creating backing up an invalid db + invalidDst := filepath.Join(tmpDir, "invalid.sqlite") + err := bus.Backup(context.Background(), "invalid", invalidDst) + cluster.tt.AssertIs(err, api.ErrInvalidDatabase) + _, err = os.Stat(invalidDst) + if !os.IsNotExist(err) { + t.Fatal("expected file to not exist") + } + + // ping backups + dbMain, err := sqlite.Open(mainDst) + cluster.tt.OK(err) + cluster.tt.OK(dbMain.Ping()) + dbMetrics, err := sqlite.Open(metricsDst) + cluster.tt.OK(err) + cluster.tt.OK(dbMetrics.Ping()) +} From 8a6e877e1522c4f826659621ef9c8ba54125eb60 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 25 Sep 2024 17:26:35 +0200 Subject: [PATCH 4/7] e2e: close db conns --- internal/test/e2e/cluster_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index f13ec8149..56130769b 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -2743,8 +2743,11 @@ func TestBackup(t *testing.T) { // ping backups dbMain, err := sqlite.Open(mainDst) cluster.tt.OK(err) + defer dbMain.Close() cluster.tt.OK(dbMain.Ping()) + dbMetrics, err := sqlite.Open(metricsDst) cluster.tt.OK(err) + defer dbMetrics.Close() cluster.tt.OK(dbMetrics.Ping()) } From 567e8de18041adbda411ff1d422b0bc42fac68e2 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Mon, 21 Oct 2024 15:35:46 +0200 Subject: [PATCH 5/7] e2e: fix TestBackup --- internal/test/e2e/cluster_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index 56130769b..7a65b685e 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -2717,7 +2717,7 @@ func TestBackup(t *testing.T) { // test that backup fails for MySQL isSqlite := config.MySQLConfigFromEnv().URI == "" if !isSqlite { - err := bus.Backup(context.Background(), "", "") + err := bus.Backup(context.Background(), "main", "backup.sql") cluster.tt.AssertIs(err, api.ErrBackupNotSupported) return } From 8e8dbcba6d4fe5c8081c7b0a9c59600d57c345d3 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Tue, 22 Oct 2024 11:34:22 +0200 Subject: [PATCH 6/7] bus: rename route --- bus/bus.go | 4 ++-- bus/client/client.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/bus/bus.go b/bus/bus.go index 27c07f381..6d49a502e 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -189,7 +189,7 @@ type ( // BackupStore is the interface of a store that can be backed up. BackupStore interface { - Backup(ctx context.Context, dbID, destPath string) error + Backup(ctx context.Context, dbID, dst string) error } // A ChainStore stores information about the chain. @@ -492,7 +492,7 @@ func (b *Bus) Handler() http.Handler { "POST /syncer/connect": b.syncerConnectHandler, "GET /syncer/peers": b.syncerPeersHandler, - "POST /system/database/backup": b.postSystemSQLite3BackupHandler, + "POST /system/sqlite3/backup": b.postSystemSQLite3BackupHandler, "GET /txpool/recommendedfee": b.txpoolFeeHandler, "GET /txpool/transactions": b.txpoolTransactionsHandler, diff --git a/bus/client/client.go b/bus/client/client.go index ee3dacc6c..e5460c08f 100644 --- a/bus/client/client.go +++ b/bus/client/client.go @@ -23,7 +23,7 @@ func New(addr, password string) *Client { } func (c *Client) Backup(ctx context.Context, database, dstPath string) (err error) { - err = c.c.WithContext(ctx).POST("/system/database/backup", api.BackupRequest{ + err = c.c.WithContext(ctx).POST("/system/sqlite3/backup", api.BackupRequest{ Database: database, Path: dstPath, }, nil) From ac75808df313a8f200eb007001123821609515aa Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Tue, 22 Oct 2024 15:46:55 +0200 Subject: [PATCH 7/7] bus: add context to error --- bus/routes.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bus/routes.go b/bus/routes.go index 5c34025e4..0f52eefda 100644 --- a/bus/routes.go +++ b/bus/routes.go @@ -189,7 +189,7 @@ func (b *Bus) postSystemSQLite3BackupHandler(jc jape.Context) { switch req.Database { case "main", "metrics": default: - jc.Error(api.ErrInvalidDatabase, http.StatusBadRequest) + jc.Error(fmt.Errorf("%w: valid values are 'main' and 'metrics'", api.ErrInvalidDatabase), http.StatusBadRequest) return } err := b.store.Backup(jc.Request.Context(), req.Database, req.Path)