From cabf92251e86d49248489ab2122de882290942ee Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Wed, 21 Aug 2024 18:20:44 +0300 Subject: [PATCH] services: add new service for fetching blocks from NeoFS Close #3496 Signed-off-by: Ekaterina Pavlova --- cli/server/dump_bin.go | 92 +++++ cli/server/server.go | 7 + config/protocol.testnet.yml | 8 + go.mod | 2 +- internal/fakechain/fakechain.go | 6 + pkg/config/application_config.go | 20 +- pkg/config/blockfetcher_config.go | 33 ++ pkg/network/server.go | 27 +- pkg/network/server_config.go | 4 + pkg/services/blockfetcher/blockfetcher.go | 353 ++++++++++++++++++ .../blockfetcher/blockfetcher_test.go | 74 ++++ pkg/services/oracle/neofs/neofs.go | 79 +++- 12 files changed, 683 insertions(+), 22 deletions(-) create mode 100644 cli/server/dump_bin.go create mode 100644 pkg/config/blockfetcher_config.go create mode 100644 pkg/services/blockfetcher/blockfetcher.go create mode 100644 pkg/services/blockfetcher/blockfetcher_test.go diff --git a/cli/server/dump_bin.go b/cli/server/dump_bin.go new file mode 100644 index 0000000000..59d713c956 --- /dev/null +++ b/cli/server/dump_bin.go @@ -0,0 +1,92 @@ +package server + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/nspcc-dev/neo-go/cli/cmdargs" + "github.com/nspcc-dev/neo-go/cli/options" + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/urfave/cli/v2" +) + +func fetchBin(ctx *cli.Context) error { + var err error + if err := cmdargs.EnsureNone(ctx); err != nil { + return err + } + cfg, err := options.GetConfigFromContext(ctx) + if err != nil { + return cli.Exit(err, 1) + } + log, _, logCloser, err := options.HandleLoggingParams(ctx.Bool("debug"), cfg.ApplicationConfiguration) + if err != nil { + return cli.Exit(err, 1) + } + if logCloser != nil { + defer func() { _ = logCloser() }() + } + count := uint32(ctx.Uint("count")) + start := uint32(ctx.Uint("start")) + + chain, prometheus, pprof, err := initBCWithMetrics(cfg, log) + if err != nil { + return err + } + defer func() { + pprof.ShutDown() + prometheus.ShutDown() + chain.Close() + }() + + chainCount := chain.BlockHeight() + 1 + if start+count > chainCount { + return cli.Exit(fmt.Errorf("chain is not that high (%d) to dump %d blocks starting from %d", chainCount-1, count, start), 1) + } + if count == 0 { + count = chainCount - start + } + + testDir := ctx.String("out") + if _, err = os.Stat(testDir); os.IsNotExist(err) { + if err = os.MkdirAll(testDir, 0755); err != nil { + return cli.Exit(fmt.Sprintf("failed to create directory %s: %v", testDir, err), 1) + } + } + + for i := start; i < start+count; i++ { + bh := chain.GetHeaderHash(i) + blk, err2 := chain.GetBlock(bh) + if err2 != nil { + return cli.Exit(fmt.Sprintf("failed to get block %d: %v", i, err), 1) + } + filePath := filepath.Join(testDir, fmt.Sprintf("block-%d.bin", i)) + if err = saveBlockToFile(blk, filePath); err != nil { + return cli.Exit(fmt.Sprintf("failed to save block %d to file: %v", i, err), 1) + } + } + return nil +} + +func saveBlockToFile(blk *block.Block, filePath string) error { + file, err := os.Create(filePath) + if err != nil { + return err + } + defer file.Close() + + writer := io.NewBinWriterFromIO(file) + + var buf = io.NewBufBinWriter() + blk.EncodeBinary(buf.BinWriter) + bytes := buf.Bytes() + + writer.WriteU32LE(uint32(len(bytes))) + blk.EncodeBinary(writer) + if writer.Err != nil { + return writer.Err + } + return nil +} diff --git a/cli/server/server.go b/cli/server/server.go index 67787ced9b..c1daa41c4b 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -105,6 +105,13 @@ func NewCommands() []*cli.Command { Action: dumpDB, Flags: cfgCountOutFlags, }, + { + Name: "fetch-bin", + Usage: "Dump blocks (starting with the genesis or specified block) to the directory in binary format", + UsageText: "neo-go db fetch-bin -o directory [-s start] [-c count] [--config-path path] [-p/-m/-t] [--config-file file]", + Action: fetchBin, + Flags: cfgCountOutFlags, + }, { Name: "restore", Usage: "Restore blocks from the file", diff --git a/config/protocol.testnet.yml b/config/protocol.testnet.yml index bdab3cd20d..4b429bc51c 100644 --- a/config/protocol.testnet.yml +++ b/config/protocol.testnet.yml @@ -100,3 +100,11 @@ ApplicationConfiguration: Enabled: false Addresses: - ":2113" + NeoFSBlockFetcher: + Enabled: true + Addresses: + - st1.t5.fs.neo.org:8080 + Timeout: 10s + ContainerID: "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG" + Mode: "indexSearch" + diff --git a/go.mod b/go.mod index 73d0953100..a6d73c5ee3 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( golang.org/x/term v0.18.0 golang.org/x/text v0.14.0 golang.org/x/tools v0.19.0 + google.golang.org/grpc v1.62.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -67,7 +68,6 @@ require ( golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect - google.golang.org/grpc v1.62.0 // indirect google.golang.org/protobuf v1.33.0 // indirect rsc.io/tmplfunc v0.0.3 // indirect ) diff --git a/internal/fakechain/fakechain.go b/internal/fakechain/fakechain.go index a2553b61a1..5420bef155 100644 --- a/internal/fakechain/fakechain.go +++ b/internal/fakechain/fakechain.go @@ -13,6 +13,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/core/native" "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" @@ -405,6 +406,11 @@ func (chain *FakeChain) UnsubscribeFromTransactions(ch chan *transaction.Transac panic("TODO") } +// LastBatch returns last persisted storage batch. +func (chain *FakeChain) LastBatch() *storage.MemBatch { + panic("TODO") +} + // AddBlock implements the StateSync interface. func (s *FakeStateSync) AddBlock(block *block.Block) error { panic("TODO") diff --git a/pkg/config/application_config.go b/pkg/config/application_config.go index 2e94961d7b..510a0060b0 100644 --- a/pkg/config/application_config.go +++ b/pkg/config/application_config.go @@ -23,12 +23,13 @@ type ApplicationConfiguration struct { Pprof BasicService `yaml:"Pprof"` Prometheus BasicService `yaml:"Prometheus"` - Relay bool `yaml:"Relay"` - Consensus Consensus `yaml:"Consensus"` - RPC RPC `yaml:"RPC"` - Oracle OracleConfiguration `yaml:"Oracle"` - P2PNotary P2PNotary `yaml:"P2PNotary"` - StateRoot StateRoot `yaml:"StateRoot"` + Relay bool `yaml:"Relay"` + Consensus Consensus `yaml:"Consensus"` + RPC RPC `yaml:"RPC"` + Oracle OracleConfiguration `yaml:"Oracle"` + P2PNotary P2PNotary `yaml:"P2PNotary"` + StateRoot StateRoot `yaml:"StateRoot"` + NeoFSBlockFetcher NeoFSBlockFetcher `yaml:"NeoFSBlockFetcher"` } // EqualsButServices returns true when the o is the same as a except for services @@ -145,3 +146,10 @@ func (a *ApplicationConfiguration) GetAddresses() ([]AnnounceableAddress, error) } return addrs, nil } + +func (a *ApplicationConfiguration) Validate() error { + if err := a.NeoFSBlockFetcher.Validate(); err != nil { + return err + } + return nil +} diff --git a/pkg/config/blockfetcher_config.go b/pkg/config/blockfetcher_config.go new file mode 100644 index 0000000000..e507a1392a --- /dev/null +++ b/pkg/config/blockfetcher_config.go @@ -0,0 +1,33 @@ +package config + +import ( + "errors" + "time" + + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" +) + +// NeoFSBlockFetcher represents the configuration for the blockfetcher service. +type ( + NeoFSBlockFetcher struct { + Timeout time.Duration `yaml:"Timeout"` + ContainerID string `yaml:"ContainerID"` + Mode string `yaml:"Mode"` + BasicService `yaml:",inline"` + } +) + +func (f NeoFSBlockFetcher) Validate() error { + if f.Timeout == 0 { + return errors.New("timeout is not set") + } + if f.ContainerID == "" { + return errors.New("container ID is not set") + } + var containerID cid.ID + err := containerID.DecodeString(f.ContainerID) + if err != nil { + return errors.New("invalid container ID") + } + return nil +} diff --git a/pkg/network/server.go b/pkg/network/server.go index 34da851b52..c6f5c23116 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -28,6 +28,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/extpool" "github.com/nspcc-dev/neo-go/pkg/network/payload" + "github.com/nspcc-dev/neo-go/pkg/services/blockfetcher" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/zap" ) @@ -103,10 +104,12 @@ type ( chain Ledger bQueue *bqueue.Queue bSyncQueue *bqueue.Queue + blockFetcherQueue *bqueue.Queue mempool *mempool.Pool notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool notaryFeer NotaryFeer + blockFetcher *blockfetcher.Service serviceLock sync.RWMutex services map[string]Service @@ -220,6 +223,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy s.bSyncQueue = bqueue.New(s.stateSync, log, nil, updateBlockQueueLenMetric) + s.blockFetcherQueue = bqueue.New(chain, log, nil, updateBlockQueueLenMetric) + s.blockFetcher = blockfetcher.New(chain, s.NeoFSCfg, s.blockFetcherQueue, log) + if s.MinPeers < 0 { s.log.Info("bad MinPeers configured, using the default value", zap.Int("configured", s.MinPeers), @@ -295,6 +301,13 @@ func (s *Server) Start() { go s.relayBlocksLoop() go s.bQueue.Run() go s.bSyncQueue.Run() + if s.ServerConfig.NeoFSCfg.Enabled { + go s.blockFetcherQueue.Run() + go func() { + s.blockFetcher.Start() + s.log.Info("BlockFetcher service finished") + }() + } for _, tr := range s.transports { go tr.Accept() } @@ -319,6 +332,7 @@ func (s *Server) Shutdown() { } s.bQueue.Discard() s.bSyncQueue.Discard() + s.blockFetcherQueue.Discard() s.serviceLock.RLock() for _, svc := range s.services { svc.Shutdown() @@ -706,7 +720,7 @@ func (s *Server) IsInSync() bool { var peersNumber int var notHigher int - if s.stateSync.IsActive() { + if s.stateSync.IsActive() || s.blockFetcher.IsActive() { return false } @@ -769,6 +783,10 @@ func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { if s.stateSync.IsActive() { return s.bSyncQueue.PutBlock(block) } + if s.blockFetcher.IsActive() { + return nil + //return s.blockFetcherQueue.PutBlock(block) + } return s.bQueue.PutBlock(block) } @@ -788,6 +806,9 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error { func (s *Server) requestBlocksOrHeaders(p Peer) error { if s.stateSync.NeedHeaders() { if s.chain.HeaderHeight() < p.LastBlockIndex() { + if s.blockFetcher.IsActive() { + return s.blockFetcher.GetHeaders() + } return s.requestHeaders(p) } return nil @@ -1434,6 +1455,10 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error { } func (s *Server) tryInitStateSync() { + if s.blockFetcher.IsActive() { + s.log.Info("Postponing StateSync until BlockFetcher completes") + return + } if !s.stateSync.IsActive() { s.bSyncQueue.Discard() return diff --git a/pkg/network/server_config.go b/pkg/network/server_config.go index c0f1e727f6..83f0bda9e6 100644 --- a/pkg/network/server_config.go +++ b/pkg/network/server_config.go @@ -76,6 +76,9 @@ type ( // BroadcastFactor is the factor (0-100) for fan-out optimization. BroadcastFactor int + + // NeoFSCfg is NeoFS configuration. + NeoFSCfg config.NeoFSBlockFetcher } ) @@ -107,6 +110,7 @@ func NewServerConfig(cfg config.Config) (ServerConfig, error) { StateRootCfg: appConfig.StateRoot, ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize, BroadcastFactor: appConfig.P2P.BroadcastFactor, + NeoFSCfg: appConfig.NeoFSBlockFetcher, } return c, nil } diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go new file mode 100644 index 0000000000..b50a45dcb6 --- /dev/null +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -0,0 +1,353 @@ +package blockfetcher + +import ( + "context" + "errors" + "fmt" + "io" + "net/url" + + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + gio "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/network/bqueue" + "github.com/nspcc-dev/neo-go/pkg/network/payload" + "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neofs-sdk-go/client" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.uber.org/zap" +) + +const ( + ModeIndexSearch = "indexSearch" + ModeOidSearch = "oidSearch" +) + +// Ledger is an interface to Blockchain sufficient for Service. +type Ledger interface { + AddBlock(block *block.Block) error + GetConfig() config.Blockchain + BlockHeight() uint32 + AddHeaders(...*block.Header) error + HeaderHeight() uint32 +} + +// Service is a service that fetches blocks from NeoFS. +type Service struct { + chain Ledger + client *client.Client + log *zap.Logger + quit chan bool + cfg config.NeoFSBlockFetcher + queue *bqueue.Queue + privateKey *keys.PrivateKey +} + +// New creates a new BlockFetcherService. +func New(chain Ledger, cfg config.NeoFSBlockFetcher, queue *bqueue.Queue, logger *zap.Logger) *Service { + privateKey, err := keys.NewPrivateKey() + if err != nil { + return nil + } + return &Service{ + chain: chain, + log: logger, + quit: make(chan bool), + cfg: cfg, + queue: queue, + privateKey: privateKey, + } +} + +// Name implements the core.Service interface. +func (bfs *Service) Name() string { + return "BlockFetcherService" +} + +// Start implements the core.Service interface. +func (bfs *Service) Start() { + bfs.log.Info("Starting Block Fetcher Service") + c, err := neofs.GetSDKClient(context.Background(), bfs.cfg.Addresses[0]) + defer func(c *client.Client) { + err := c.Close() + if err != nil { + bfs.log.Error("Failed to close SDK client", zap.Error(err)) + } + }(c) + + if err != nil { + bfs.log.Error("Failed to create SDK client", zap.Error(err)) + close(bfs.quit) + return + } + bfs.client = c + switch bfs.cfg.Mode { + case ModeIndexSearch: + err = bfs.fetchData() + case ModeOidSearch: + err = bfs.fetchDataWithOid() + default: + bfs.log.Error("Invalid mode specified", zap.String("mode", bfs.cfg.Mode)) + close(bfs.quit) + return + } + + if err != nil { + bfs.log.Error("Fetch operation failed", zap.Error(err)) + close(bfs.quit) + return + } +} + +func (bfs *Service) GetHeaders() error { + startIndex := bfs.chain.HeaderHeight() / 2000 + + for { + select { + case <-bfs.quit: + return nil + default: + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter("index_header", fmt.Sprintf("%d", startIndex), object.MatchNumGE) + filters.AddFilter("index_header", fmt.Sprintf("%d", startIndex), object.MatchNumLE) + prm.SetFilters(filters) + + headerOids, err := bfs.search(prm) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to find 'index_header' object with index %d", startIndex), zap.Error(err)) + return err + } + + if len(headerOids) == 0 { + bfs.log.Info(fmt.Sprintf("No 'index_header' object found with index %d, stopping.", startIndex)) + return errors.New("no 'index_header' object found") + } + + data, err := bfs.get(headerOids[0].String()) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to fetch 'index_header' object with index %d", startIndex), zap.Error(err)) + return err + } + + err = bfs.processHeaders(data) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to process headers for index %d", startIndex), zap.Error(err)) + return err + } + startIndex++ + } + } +} + +// Shutdown implements the core.Service interface. +func (bfs *Service) Shutdown() { + bfs.log.Info("Shutting down Block Fetcher Service") + close(bfs.quit) +} + +// IsActive implements the Service interface. +func (bfs *Service) IsActive() bool { + if !bfs.cfg.Enabled { + return false + } + select { + case <-bfs.quit: + return false + default: + return true + } +} + +func (bfs *Service) fetchData() error { + startIndex := bfs.chain.BlockHeight() + batchSize := uint32(30) + + for { + select { + case <-bfs.quit: + bfs.log.Info("Stopping fetchData operation.") + return nil + default: + bfs.log.Info(fmt.Sprintf("Current BlockHeight %d", bfs.chain.BlockHeight())) + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter("index_block_2", fmt.Sprintf("%d", startIndex), object.MatchNumGE) + filters.AddFilter("index_block_2", fmt.Sprintf("%d", startIndex+batchSize-1), object.MatchNumLE) + prm.SetFilters(filters) + + blockOids, err := bfs.search(prm) + bfs.log.Info(fmt.Sprintf("Found %d blocks from index %d to %d", len(blockOids), startIndex, startIndex+batchSize-1)) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to find block with index %d", startIndex), zap.Error(err)) + return err + } + + if len(blockOids) == 0 { + bfs.log.Info(fmt.Sprintf("No block found with index %d, stopping.", startIndex)) + return errors.New("no block found") + } + for _, oidBlock := range blockOids { + go func(oidBlock oid.ID) { + data, err := bfs.get(oidBlock.String()) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to fetch block %s", oidBlock.String()), zap.Error(err)) + } + err = bfs.processBlock(data) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to process block with index %d", startIndex), zap.Error(err)) + } + }(oidBlock) + } + + startIndex += batchSize + } + } +} + +func (bfs *Service) fetchDataWithOid() error { + startIndex := bfs.chain.BlockHeight() / 2000 + skip := bfs.chain.BlockHeight() % 2000 + + for { + select { + case <-bfs.quit: + return nil + default: + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter("block_oids", fmt.Sprintf("%d", startIndex), object.MatchNumGE) + filters.AddFilter("block_oids", fmt.Sprintf("%d", startIndex), object.MatchNumLE) + prm.SetFilters(filters) + + blockOidsObject, err := bfs.search(prm) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to find 'block_oids' object with index %d", startIndex), zap.Error(err)) + return err + } + if len(blockOidsObject) == 0 { + bfs.log.Info(fmt.Sprintf("No 'block_oids' object found with index %d, stopping.", startIndex)) + return errors.New("no 'block_oids' object found") + } + + blockOidsData, err := bfs.get(blockOidsObject[0].String()) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to fetch 'block_oids' object %d", startIndex), zap.Error(err)) + return err + } + blockOIDs, err := parseBlockOIDs(blockOidsData) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to parse 'block_oids' object with index %d", startIndex), zap.Error(err)) + return err + } + + for j, oidBlock := range blockOIDs { + if j < int(skip) { + continue + } + data, err := bfs.get(oidBlock.String()) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to fetch block with OID %s", oidBlock.String()), zap.Error(err)) + return err + } + bfs.log.Info(fmt.Sprintf("Fetched block with OID %s", oidBlock.String())) + err = bfs.processBlock(data) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to process blocks with index %d", startIndex), zap.Error(err)) + return err + } + } + + startIndex++ + } + } +} + +func (bfs *Service) processBlock(data []byte) error { + var buf []byte + stateRootInHeader := bfs.chain.GetConfig().StateRootInHeader + br := gio.NewBinReaderFromBuf(data) + + readBlock := func(r *gio.BinReader) ([]byte, error) { + var size = r.ReadU32LE() + if uint32(cap(buf)) < size { + buf = make([]byte, size) + } else { + buf = buf[:size] + } + r.ReadBytes(buf) + return buf, r.Err + } + + buf, err := readBlock(br) + if err != nil { + return err + } + b := block.New(stateRootInHeader) + r := gio.NewBinReaderFromBuf(buf) + b.DecodeBinary(r) + err = bfs.queue.PutBlock(b) + if err != nil { + bfs.log.Error("failed to queue block", zap.Error(err)) + return err + } + return nil +} + +func (bfs *Service) processHeaders(data []byte) error { + var resHeader payload.Headers + br := gio.NewBinReaderFromBuf(data) + resHeader.DecodeBinary(br) + err := bfs.chain.AddHeaders(resHeader.Hdrs...) + if err != nil { + return err + } + return nil +} + +func (bfs *Service) get(oid string) ([]byte, error) { + ctx, cancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout) + defer cancel() + u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", bfs.cfg.ContainerID, oid)) + if err != nil { + return nil, err + } + rc, err := neofs.GetWithClient(ctx, bfs.client, bfs.privateKey, u, bfs.cfg.Addresses[0]) + if err != nil { + return nil, err + } + data, err := io.ReadAll(rc) + if err != nil { + return nil, err + } + return data, nil +} + +func (bfs *Service) search(prm client.PrmObjectSearch) ([]oid.ID, error) { + ctx, cancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout) + defer cancel() + return neofs.ObjectSearch(ctx, bfs.client, bfs.privateKey, bfs.cfg.ContainerID, prm) +} + +func parseBlockOIDs(data []byte) ([]oid.ID, error) { + var oids []oid.ID + oidSize := 32 + + if len(data)%oidSize != 0 { + return nil, fmt.Errorf("invalid data length: not a multiple of oid size") + } + + for i := 0; i < len(data); i += oidSize { + oidBytes := data[i : i+oidSize] + var oid oid.ID + err := oid.Decode(oidBytes) + if err != nil { + return nil, fmt.Errorf("failed to decode OID: %w", err) + } + oids = append(oids, oid) + } + + return oids, nil +} diff --git a/pkg/services/blockfetcher/blockfetcher_test.go b/pkg/services/blockfetcher/blockfetcher_test.go new file mode 100644 index 0000000000..179de1f4d4 --- /dev/null +++ b/pkg/services/blockfetcher/blockfetcher_test.go @@ -0,0 +1,74 @@ +package blockfetcher_test + +import ( + "context" + "fmt" + "io" + "net/url" + "testing" + "time" + + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neofs-sdk-go/client" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/stretchr/testify/require" +) + +func TestService(t *testing.T) { + cfg := config.NeoFSBlockFetcher{ + ContainerID: "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG", + BasicService: config.BasicService{ + Addresses: []string{"st1.t5.fs.neo.org:8080"}, + Enabled: true, + }, + Timeout: 15 * time.Second, + } + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + + filters.AddFilter("index_block", fmt.Sprintf("%d", 0), object.MatchStringEqual) + filters.AddFilter("index_block", fmt.Sprintf("%d", 0), object.MatchNumLE) + prm.SetFilters(filters) + + privateKey, err := keys.NewPrivateKey() + require.NoError(t, err) + + var containerID cid.ID + err = containerID.DecodeString(cfg.ContainerID) + require.NoError(t, err) + var ( + s = user.NewAutoIDSignerRFC6979(privateKey.PrivateKey) + objectIDs []oid.ID + ) + ctx, cancel := context.WithTimeout(context.Background(), cfg.Timeout) + defer cancel() + + neofsClient, err := neofs.GetSDKClient(ctx, + cfg.Addresses[0]) + require.NoError(t, err) + + reader, err := neofsClient.ObjectSearchInit(ctx, containerID, s, prm) + require.NoError(t, err) + defer reader.Close() + + err = reader.Iterate(func(oid oid.ID) bool { + objectIDs = append(objectIDs, oid) + return false + }) + require.NoError(t, err) + fmt.Println(objectIDs) + + oid := "3uHQb3SYPEhoxJigTtRALwhiha3nCzL7GsN6PGYMjwhT" + u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", containerID, oid)) + require.NoError(t, err) + rc, err := neofs.GetWithClient(ctx, neofsClient, privateKey, u, "") + require.NoError(t, err) + data, err := io.ReadAll(rc) + require.NoError(t, err) + fmt.Println(data) +} diff --git a/pkg/services/oracle/neofs/neofs.go b/pkg/services/oracle/neofs/neofs.go index 27351ba8b2..6abe4dabf1 100644 --- a/pkg/services/oracle/neofs/neofs.go +++ b/pkg/services/oracle/neofs/neofs.go @@ -17,6 +17,8 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/user" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -45,28 +47,26 @@ var ( // URI scheme is "neofs://". // If Command is not provided, full object is requested. func Get(ctx context.Context, priv *keys.PrivateKey, u *url.URL, addr string) (io.ReadCloser, error) { - objectAddr, ps, err := parseNeoFSURL(u) + c, err := GetSDKClient(ctx, addr) if err != nil { - return nil, err + return clientCloseWrapper{c: c}, fmt.Errorf("failed to create client: %w", err) } + return GetWithClient(ctx, c, priv, u, addr) +} - c, err := client.New(client.PrmInit{}) +// GetWithClient returns a neofs object from the provided url using the provided client. +// URI scheme is "neofs://". +// If Command is not provided, full object is requested. +func GetWithClient(ctx context.Context, c *client.Client, priv *keys.PrivateKey, u *url.URL, addr string) (io.ReadCloser, error) { + objectAddr, ps, err := parseNeoFSURL(u) if err != nil { - return nil, fmt.Errorf("failed to create client: %w", err) + return nil, err } - var ( - res = clientCloseWrapper{c: c} - prmd client.PrmDial + s = user.NewAutoIDSignerRFC6979(priv.PrivateKey) + res = clientCloseWrapper{c: c} ) - prmd.SetServerURI(addr) - prmd.SetContext(ctx) - err = c.Dial(prmd) //nolint:contextcheck // contextcheck: Function `Dial->Balance->SendUnary->Init->setNeoFSAPIServer` should pass the context parameter - if err != nil { - return res, err - } - var s = user.NewAutoIDSignerRFC6979(priv.PrivateKey) switch { case len(ps) == 0 || ps[0] == "": // Get request res.ReadCloser, err = getPayload(ctx, s, c, objectAddr) @@ -220,3 +220,54 @@ func parseRange(s string) (*object.Range, error) { r.SetLength(length) return r, nil } + +// ObjectSearch returns a list of object IDs from the provided container. +func ObjectSearch(ctx context.Context, c *client.Client, priv *keys.PrivateKey, containerIDStr string, prm client.PrmObjectSearch) ([]oid.ID, error) { + var ( + s = user.NewAutoIDSignerRFC6979(priv.PrivateKey) + objectIDs []oid.ID + containerID cid.ID + ) + err := containerID.DecodeString(containerIDStr) + if err != nil { + return nil, fmt.Errorf("%w: %w", ErrInvalidContainer, err) + } + reader, err := c.ObjectSearchInit(ctx, containerID, s, prm) + if err != nil { + return nil, fmt.Errorf("failed to initiate object search: %w", err) + } + defer reader.Close() + + err = reader.Iterate(func(oid oid.ID) bool { + objectIDs = append(objectIDs, oid) + return false + }) + if err != nil { + return nil, fmt.Errorf("error during object ID iteration: %w", err) + } + return objectIDs, nil +} + +// GetSDKClient returns a NeoFS SDK client configured with the specified address and context. +func GetSDKClient(ctx context.Context, addr string) (*client.Client, error) { + var ( + prmDial client.PrmDial + ) + + prmDial.SetServerURI(addr) + prmDial.SetContext(ctx) + + c, err := client.New(client.PrmInit{}) + if err != nil { + return nil, fmt.Errorf("can't create SDK client: %w", err) + } + + if err := c.Dial(prmDial); err != nil { + if status.Code(err) == codes.Unimplemented { + return c, nil + } + return nil, fmt.Errorf("can't init SDK client: %w", err) + } + + return c, nil +}