From 5e3f9d966a7e5f2ce7f45f938be87bc60dc76b5e Mon Sep 17 00:00:00 2001 From: Vib-UX Date: Thu, 16 Jun 2022 18:11:23 +0530 Subject: [PATCH 1/8] chain: adds batched async calls support for bitcoind client This commit adds a batch client connection and async RPC calls to the bitcoind client which will enable batching multiple requests speeding up operations by reducing round trips. --- chain/bitcoind_client.go | 54 ++++++++++++++++++++++++++++++++++++++++ chain/bitcoind_conn.go | 12 +++++++++ 2 files changed, 66 insertions(+) diff --git a/chain/bitcoind_client.go b/chain/bitcoind_client.go index ac355b56db..5f829ed6f2 100644 --- a/chain/bitcoind_client.go +++ b/chain/bitcoind_client.go @@ -12,6 +12,7 @@ import ( "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/rpcclient" "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcwallet/waddrmgr" @@ -113,6 +114,12 @@ func (c *BitcoindClient) BackEnd() string { return "bitcoind" } +// SendAsyncQueue sends a bulk request to the server and waits for the receive +// call made by the client. +func (c *BitcoindClient) SendAsyncQueue() error { + return c.chainConn.batchClient.Send() +} + // GetBestBlock returns the highest block known to bitcoind. func (c *BitcoindClient) GetBestBlock() (*chainhash.Hash, int32, error) { bcinfo, err := c.chainConn.client.GetBlockChainInfo() @@ -139,11 +146,29 @@ func (c *BitcoindClient) GetBlockHeight(hash *chainhash.Hash) (int32, error) { return header.Height, nil } +// GetBlockAsync returns an instance of a type that can be used to get the +// result of the RPC at some future time by invoking the Receive function on the +// returned instance. +func (c *BitcoindClient) GetBlockAsync( + hash *chainhash.Hash) rpcclient.FutureGetBlockResult { + + return c.chainConn.batchClient.GetBlockAsync(hash) +} + // GetBlock returns a block from the hash. func (c *BitcoindClient) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) { return c.chainConn.GetBlock(hash) } +// GetBlockVerboseAsync returns an instance of a type that can be used to get +// the result of the RPC at some future time by invoking the Receive function on +// the returned instance. +func (c *BitcoindClient) GetBlockVerboseAsync( + hash *chainhash.Hash) rpcclient.FutureGetBlockVerboseResult { + + return c.chainConn.batchClient.GetBlockVerboseAsync(hash) +} + // GetBlockVerbose returns a verbose block from the hash. func (c *BitcoindClient) GetBlockVerbose( hash *chainhash.Hash) (*btcjson.GetBlockVerboseResult, error) { @@ -151,11 +176,29 @@ func (c *BitcoindClient) GetBlockVerbose( return c.chainConn.client.GetBlockVerbose(hash) } +// GetBestBlockHashAsync returns an instance of a type that can be used to get +// the result of the RPC at some future time by invoking the Receive function on +// the returned instance. +func (c *BitcoindClient) GetBlockHashAsync( + height int64) rpcclient.FutureGetBlockHashResult { + + return c.chainConn.batchClient.GetBlockHashAsync(height) +} + // GetBlockHash returns a block hash from the height. func (c *BitcoindClient) GetBlockHash(height int64) (*chainhash.Hash, error) { return c.chainConn.client.GetBlockHash(height) } +// GetBlockHeaderAsync returns an instance of a type that can be used to get the +// result of the RPC at some future time by invoking the Receive function on the +// returned instance. +func (c *BitcoindClient) GetBlockHeaderAsync( + hash *chainhash.Hash) rpcclient.FutureGetBlockHeaderResult { + + return c.chainConn.batchClient.GetBlockHeaderAsync(hash) +} + // GetBlockHeader returns a block header from the hash. func (c *BitcoindClient) GetBlockHeader( hash *chainhash.Hash) (*wire.BlockHeader, error) { @@ -163,6 +206,17 @@ func (c *BitcoindClient) GetBlockHeader( return c.chainConn.client.GetBlockHeader(hash) } +// GetBlockHeaderVerboseAsync returns an instance of a type that can be used to +// get the result of the RPC at some future time by invoking the Receive +// function on the returned instance. +// +// See GetBlockHeader for the blocking version and more details. +func (c *BitcoindClient) GetBlockHeaderVerboseAsync( + hash *chainhash.Hash) rpcclient.FutureGetBlockHeaderVerboseResult { + + return c.chainConn.batchClient.GetBlockHeaderVerboseAsync(hash) +} + // GetBlockHeaderVerbose returns a block header from the hash. func (c *BitcoindClient) GetBlockHeaderVerbose( hash *chainhash.Hash) (*btcjson.GetBlockHeaderVerboseResult, error) { diff --git a/chain/bitcoind_conn.go b/chain/bitcoind_conn.go index 47f9a566f7..623b35a61e 100644 --- a/chain/bitcoind_conn.go +++ b/chain/bitcoind_conn.go @@ -94,6 +94,9 @@ type BitcoindConn struct { // client is the RPC client to the bitcoind node. client *rpcclient.Client + // batchClient is the batched RPC client to the bitcoind node. + batchClient *rpcclient.Client + // prunedBlockDispatcher handles all of the pruned block requests. // // NOTE: This is nil when the bitcoind node is not pruned. @@ -135,6 +138,11 @@ func NewBitcoindConn(cfg *BitcoindConfig) (*BitcoindConn, error) { return nil, err } + batchClient, err := rpcclient.NewBatch(clientCfg) + if err != nil { + return nil, err + } + // Verify that the node is running on the expected network. net, err := getCurrentNet(client) if err != nil { @@ -177,6 +185,7 @@ func NewBitcoindConn(cfg *BitcoindConfig) (*BitcoindConn, error) { bc := &BitcoindConn{ cfg: *cfg, client: client, + batchClient: batchClient, prunedBlockDispatcher: prunedBlockDispatcher, rescanClients: make(map[uint64]*BitcoindClient), quit: make(chan struct{}), @@ -234,11 +243,14 @@ func (c *BitcoindConn) Stop() { log.Errorf("error shutting down bitcoind events: %w", err) } + c.batchClient.Shutdown() + if c.prunedBlockDispatcher != nil { c.prunedBlockDispatcher.Stop() } c.client.WaitForShutdown() + c.batchClient.WaitForShutdown() c.wg.Wait() } From 51c887fd00bb05b1e4553512ff62e0f12d96e7ed Mon Sep 17 00:00:00 2001 From: Vib-UX Date: Mon, 11 Jul 2022 22:04:51 +0530 Subject: [PATCH 2/8] chain: adds GetBlocksBatch() to the bitcoind client This will leverage the updated batch client to fetch multiple blocks in one go which can speed up rescan and filter blocks. --- chain/bitcoind_client.go | 7 +++++ chain/bitcoind_conn.go | 63 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) diff --git a/chain/bitcoind_client.go b/chain/bitcoind_client.go index 5f829ed6f2..7c394ff309 100644 --- a/chain/bitcoind_client.go +++ b/chain/bitcoind_client.go @@ -160,6 +160,13 @@ func (c *BitcoindClient) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) return c.chainConn.GetBlock(hash) } +// BitcoindConn.GetBlocksBatch() returns batchBlocks from batchHashes. +func (c *BitcoindClient) GetBlocksBatch( + hashes []*chainhash.Hash) ([]*wire.MsgBlock, error) { + + return c.chainConn.GetBlocksBatch(hashes) +} + // GetBlockVerboseAsync returns an instance of a type that can be used to get // the result of the RPC at some future time by invoking the Receive function on // the returned instance. diff --git a/chain/bitcoind_conn.go b/chain/bitcoind_conn.go index 623b35a61e..dba20f7be3 100644 --- a/chain/bitcoind_conn.go +++ b/chain/bitcoind_conn.go @@ -437,6 +437,69 @@ func (c *BitcoindConn) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) { } } +// GetBlocksBatch returns a batch of raw blocks from the server given their +// hashes. If the server has already pruned some of the blocks, the missing blocks +// will be retrieved from their peers. +func (c *BitcoindConn) GetBlocksBatch( + hashes []*chainhash.Hash) ([]*wire.MsgBlock, error) { + + batchRequests := make([]rpcclient.FutureGetBlockResult, len(hashes)) + + for i := range hashes { + batchRequests[i] = c.batchClient.GetBlockAsync(hashes[i]) + } + + err := c.batchClient.Send() + if err != nil { + return nil, err + } + + batchRawBlocks := make([]*wire.MsgBlock, 0, len(hashes)) + + for i := range hashes { + block, err := batchRequests[i].Receive() + // Got the block from the backend successfully, add to the response. + if err == nil { + batchRawBlocks = append(batchRawBlocks, block) + continue + } + + // We failed getting the block from the backend for whatever + // reason. If it wasn't due to the block being pruned, return + // the error immediately. + if !isBlockPrunedErr(err) || c.prunedBlockDispatcher == nil { + return nil, err + } + + // Now that we know the block has been pruned for sure, request + // it from our backend peers. + blockChan, errChan := c.prunedBlockDispatcher.Query( + []*chainhash.Hash{hashes[i]}, + ) + out: + for { + select { + case block := <-blockChan: + batchRawBlocks = append(batchRawBlocks, block) + break out + + case err := <-errChan: + if err != nil { + return nil, err + } + + // errChan fired before blockChan with a nil + // error, wait for the block now. + + case <-c.quit: + return nil, ErrBitcoindClientShuttingDown + } + } + } + + return batchRawBlocks, nil +} + // isASCII is a helper method that checks whether all bytes in `data` would be // printable ASCII characters if interpreted as a string. func isASCII(s string) bool { From 41ce8a8aa4a47c10c6fe4ece0a2be021e1d7fac3 Mon Sep 17 00:00:00 2001 From: Vib-UX Date: Mon, 11 Jul 2022 22:12:24 +0530 Subject: [PATCH 3/8] chain: adds RescanBlocksBatched() and FilterBlocksBatched() Both RescanBlocksBatched() and FilterBlocksBatched() leverages GetBlocksBatch() which works with the updated batch client for speed ups. --- chain/bitcoind_client.go | 157 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 157 insertions(+) diff --git a/chain/bitcoind_client.go b/chain/bitcoind_client.go index 7c394ff309..d393605585 100644 --- a/chain/bitcoind_client.go +++ b/chain/bitcoind_client.go @@ -469,6 +469,99 @@ func (c *BitcoindClient) RescanBlocks( return rescannedBlocks, nil } +// RescanBlocksBatched rescans any blocks passed in batched manner using async +// calls, returning only the blocks that matched as []btcjson.BlockDetails. +func (c *BitcoindClient) RescanBlocksBatched( + blockHashes []chainhash.Hash) ([]btcjson.RescannedBlock, error) { + + // requests slice will store rpcclient.FutureGetBlockHeaderVerboseResult + // for each blockHash index. + requests := make( + []rpcclient.FutureGetBlockHeaderVerboseResult, 0, len(blockHashes), + ) + + for i := range blockHashes { + hash := blockHashes[i] + headerRequest := c.GetBlockHeaderVerboseAsync(&hash) + requests = append(requests, headerRequest) + } + + err := c.SendAsyncQueue() + if err != nil { + return nil, fmt.Errorf("unable to send batchRequests to "+ + "batchClient: %v", err) + } + + blockHashPointers := make([]*chainhash.Hash, 0, len(blockHashes)) + filteredHeaders := make( + map[chainhash.Hash]*btcjson.GetBlockHeaderVerboseResult, len(requests), + ) + + for i := range blockHashes { + hash := blockHashes[i] + header, err := requests[i].Receive() + if err != nil { + return nil, fmt.Errorf("Unable to get header %s from "+ + "bitcoind: %s", hash, err) + } + + // Prevent fetching the block completely if we know we shouldn't + // filter it. + if !c.shouldFilterBlock(time.Unix(header.Time, 0)) { + continue + } + filteredHeaders[hash] = header + blockHashPointers = append(blockHashPointers, &hash) + } + + rescannedBlocks := make([]btcjson.RescannedBlock, 0, len(filteredHeaders)) + + // Now we fetch blocks in interval of 15 to avoid out of memory errors in + // case of fetching too many blocks with GetBlocksBatch(). + const batchSize = 15 + total := len(blockHashPointers) + + for i := 0; i < total; i += batchSize { + start := i + end := i + batchSize + if end > total { + end = total + } + + blocks, err := c.chainConn.GetBlocksBatch( + blockHashPointers[start:end], + ) + if err != nil { + return nil, err + } + batchedBlockHashPointers := blockHashPointers[start:end] + for j := range batchedBlockHashPointers { + hash := batchedBlockHashPointers[j] + relevantTxs := c.filterBlock( + blocks[j], filteredHeaders[*hash].Height, false, + ) + if len(relevantTxs) > 0 { + rescannedBlock := btcjson.RescannedBlock{ + Hash: hash.String(), + Transactions: make([]string, 0), + } + for _, tx := range relevantTxs { + rescannedBlock.Transactions = append( + rescannedBlock.Transactions, + hex.EncodeToString(tx.SerializedTx), + ) + } + + rescannedBlocks = append( + rescannedBlocks, rescannedBlock, + ) + } + } + } + + return rescannedBlocks, nil +} + // Rescan rescans from the block with the given hash until the current block, // after adding the passed addresses and outpoints to the client's watch list. func (c *BitcoindClient) Rescan(blockHash *chainhash.Hash, @@ -1017,6 +1110,70 @@ func (c *BitcoindClient) FilterBlocks( return nil, nil } +// FilterBlocksBatched scans the blocks in batched manner using async calls +// contained in the FilterBlocksRequest for any addresses of interest returning +// a FilterBlocksResponse for the first block containing a matching address. +// If no matches are found in the range of blocks requested, the returned +// response will be nil. +func (c *BitcoindClient) FilterBlocksBatched( + req *FilterBlocksRequest) (*FilterBlocksResponse, error) { + + blockFilterer := NewBlockFilterer(c.chainConn.cfg.ChainParams, req) + + blockHashes := make([]*chainhash.Hash, 0, len(req.Blocks)) + for i := range req.Blocks { + blockHashes = append(blockHashes, &req.Blocks[i].Hash) + } + + // Now we fetch blocks in interval of 15 to avoid out of memory errors + // in case of fetching too many blocks with GetBlocksBatch(). + const batchSize = 15 + total := len(blockHashes) + for i := 0; i < total; i += batchSize { + start := i + end := i + batchSize + if end > total { + end = total + } + + rawBlocks, err := c.chainConn.GetBlocksBatch( + blockHashes[start:end], + ) + if err != nil { + return nil, err + } + + // Iterate over the requested blocks, fetching each from the rpc + // client. Each block will scanned using the reverse addresses + // indexes generated above, breaking out early if any addresses + // are found. + for j := range rawBlocks { + if !blockFilterer.FilterBlock(rawBlocks[j]) { + continue + } + + // If any external or internal addresses were detected + // in this block, we return them to the caller so that + // the rescan windows can widened with subsequent addresses. + // The `BatchIndex` is returned so that the caller can + // compute the *next* block from which to begin again. + resp := &FilterBlocksResponse{ + BatchIndex: uint32(start + j), + BlockMeta: req.Blocks[start+j], + FoundExternalAddrs: blockFilterer.FoundExternal, + FoundInternalAddrs: blockFilterer.FoundInternal, + FoundOutPoints: blockFilterer.FoundOutPoints, + RelevantTxns: blockFilterer.RelevantTxns, + } + + return resp, nil + } + } + + // No addresses were found for this range. + return nil, nil +} + // rescan performs a rescan of the chain using a bitcoind backend, from the // specified hash to the best known hash, while watching out for reorgs that // happen during the rescan. It uses the addresses and outputs being tracked by From 418f510df9a986f8e5c94f517455771d46f2bf01 Mon Sep 17 00:00:00 2001 From: Vib-UX Date: Fri, 29 Jul 2022 15:42:21 +0530 Subject: [PATCH 4/8] chain: testcoverage for GetBlocksBatch() Testcoverage to ensure that GetBlocksBatch() which leverages batchAPI works as expected with enhanced performance. --- chain/bitcoind_client_test.go | 134 ++++++++++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 chain/bitcoind_client_test.go diff --git a/chain/bitcoind_client_test.go b/chain/bitcoind_client_test.go new file mode 100644 index 0000000000..cfda134d5c --- /dev/null +++ b/chain/bitcoind_client_test.go @@ -0,0 +1,134 @@ +package chain + +import ( + "bytes" + "fmt" + "testing" + "time" + + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/integration/rpctest" + "github.com/btcsuite/btcd/wire" + "github.com/stretchr/testify/require" +) + +var ( + // TrickleInterval is the interval at which the miner should trickle + // transactions to its peers. We'll set it small to ensure the miner + // propagates transactions quickly in the tests. + TrickleInterval = 10 * time.Millisecond + + NetParams = &chaincfg.RegressionNetParams +) + +// dummyHash is a helper function which creates 32 bytes valid hash as per the +// given hex value. +func dummyHash(value byte) *chainhash.Hash { + var hash chainhash.Hash + copy(hash[:], bytes.Repeat([]byte{value}, 32)) + return &hash +} + +// NewMiner spawns testing harness backed by a bitcoind node that can serve as a +// miner. +func NewMiner(t *testing.T, extraArgs []string, createChain bool, + spendableOutputs uint32) (*rpctest.Harness, func()) { + + t.Helper() + + // Add the trickle interval argument to the extra args. + trickle := fmt.Sprintf("--trickleinterval=%v", TrickleInterval) + extraArgs = append(extraArgs, trickle) + + node, err := rpctest.New(NetParams, nil, extraArgs, "") + require.NoError(t, err, "unable to create backend node") + if err := node.SetUp(createChain, spendableOutputs); err != nil { + node.TearDown() + t.Fatalf("unable to set up backend node: %v", err) + } + + return node, func() { node.TearDown() } +} + +// syncBitcoindWithMiner is a helper method that attempts to wait until the +// bitcoind is synced (in terms of the chain) with the miner. +func syncBitcoindWithMiner(t *testing.T, bitcoindClient *BitcoindClient, + miner *rpctest.Harness) uint32 { + + t.Helper() + + _, minerHeight, err := miner.Client.GetBestBlock() + require.NoError(t, err, "unable to retrieve miner's current height") + + timeout := time.After(10 * time.Second) + for { + _, bitcoindHeight, err := bitcoindClient.GetBestBlock() + require.NoError(t, err, "unable to retrieve bitcoind's current "+ + "height") + + if bitcoindHeight == minerHeight { + return uint32(bitcoindHeight) + } + + select { + case <-time.After(100 * time.Millisecond): + case <-timeout: + t.Fatalf("timed out waiting to sync bitcoind") + } + } +} + +// TestBitcoindGetBlocksBatch ensures that we correctly retrieve +// rawBlocks details using updated batchClient. +func TestBitcoindGetBlocksBatch(t *testing.T) { + testBitcoindGetBlocksBatch(t, true) + testBitcoindGetBlocksBatch(t, false) +} + +func testBitcoindGetBlocksBatch(t *testing.T, rpcpolling bool) { + miner, tearDown := NewMiner( + t, []string{"--txindex"}, true, 25, + ) + defer tearDown() + + bitcoindClient := setupBitcoind(t, miner.P2PAddress(), rpcpolling) + + // Blocks shouldn't be retrieved from bitcoind when passing unknown + // block hashes. + blockHashes := []*chainhash.Hash{ + dummyHash(0x10), + dummyHash(0x12), + dummyHash(0x14), + } + broadcastHeight := syncBitcoindWithMiner(t, bitcoindClient, miner) + _, err := bitcoindClient.GetBlocksBatch(blockHashes) + require.EqualError(t, err, "-5: Block not found") + + // Here, we'll generate multiple valid testblocks and retrieve it + // linearly with GetBlock() (store the result for each GetBlock() in a + // slice) further also try to retrieve blocks in batch with GetBlocksBatch(). + // At last will compare the results from both which should be equal. + if _, err := miner.Client.Generate(2*15 + 7); err != nil { + t.Fatalf("unable to generate blocks: %v", err) + } + currentHeight := syncBitcoindWithMiner(t, bitcoindClient, miner) + + blockHashes = make([]*chainhash.Hash, 0, currentHeight-broadcastHeight) + msgBlocksStandard := make([]*wire.MsgBlock, 0, currentHeight-broadcastHeight) + for i := currentHeight; i >= broadcastHeight; i-- { + blockHash, err := bitcoindClient.GetBlockHash(int64(i)) + require.NoError(t, err, "unable to retrieve blockhash") + blockHashes = append(blockHashes, blockHash) + + msgBlock, err := bitcoindClient.GetBlock(blockHash) + require.NoError(t, err) + msgBlocksStandard = append(msgBlocksStandard, msgBlock) + } + + msgBlocksBatchClient, err := bitcoindClient.GetBlocksBatch(blockHashes) + require.NoError(t, err) + require.Equal( + t, msgBlocksStandard, msgBlocksBatchClient, "blocks result mismatch", + ) +} From e9caa428a73e0209d4a49cb885e5c2686ca256b7 Mon Sep 17 00:00:00 2001 From: Vib-UX Date: Fri, 29 Jul 2022 23:39:22 +0530 Subject: [PATCH 5/8] chain: testcoverage for RescanBlocksBatched() Testcoverage to ensure that RescanBlocksBatched() which leverages updated batchAPI for scanning the chain works as expected with enhanced performance. --- chain/bitcoind_client_test.go | 50 +++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/chain/bitcoind_client_test.go b/chain/bitcoind_client_test.go index cfda134d5c..51d2f48ffa 100644 --- a/chain/bitcoind_client_test.go +++ b/chain/bitcoind_client_test.go @@ -132,3 +132,53 @@ func testBitcoindGetBlocksBatch(t *testing.T, rpcpolling bool) { t, msgBlocksStandard, msgBlocksBatchClient, "blocks result mismatch", ) } + +// TestBitcoindRescanBlocksBatched ensures that we correctly retrieve the required +// details for the watched txn. +func TestBitcoindRescanBlocksBatched(t *testing.T) { + testBitcoindRescanBlocksBatched(t, true) + testBitcoindRescanBlocksBatched(t, false) +} + +func testBitcoindRescanBlocksBatched(t *testing.T, rpcpolling bool) { + miner, tearDown := NewMiner( + t, []string{"--txindex"}, true, 25, + ) + defer tearDown() + + bitcoindClient := setupBitcoind(t, miner.P2PAddress(), rpcpolling) + + // Here, we'll generate multiple valid testblocks. Moving forward, using + // bitcoindClient assign a random txn for watchedTxs. Now, we hit + // RescanBlocksBatched (leveraging updated BatchAPI). Further using its + // result will try to retrieve blockHash and compare it with actual txn + // blockHash which should be equal. + broadcastHeight := syncBitcoindWithMiner(t, bitcoindClient, miner) + if _, err := miner.Client.Generate(50); err != nil { + t.Fatalf("unable to generate blocks: %v", err) + } + currentHeight := syncBitcoindWithMiner(t, bitcoindClient, miner) + blockHashes := make([]chainhash.Hash, 0, uint32(currentHeight)-broadcastHeight) + + for i := uint32(currentHeight); i >= broadcastHeight; i-- { + blockHash, err := bitcoindClient.GetBlockHash(int64(i)) + require.NoError(t, err, "unable to retrieve blockhash") + blockHashes = append(blockHashes, *blockHash) + } + + block, err := bitcoindClient.GetBlock(&blockHashes[25]) + require.NoError(t, err, "unable to fetch block") + + txn := block.Transactions[0] + bitcoindClient.watchedTxs[txn.TxHash()] = struct{}{} + + rescanBlocksBatchAPI, err := bitcoindClient.RescanBlocksBatched(blockHashes) + require.NoError(t, err) + require.Equal(t, blockHashes[25].String(), rescanBlocksBatchAPI[0].Hash) + + // We can also compare the results with former implemetation "RescanBlocks" + // which should be equal. + rescanBlocksSync, err := bitcoindClient.RescanBlocks(blockHashes) + require.NoError(t, err) + require.Equal(t, rescanBlocksSync, rescanBlocksBatchAPI) +} From 1aff3277eb66f7d7b69adea632daf1f3bd3b1e86 Mon Sep 17 00:00:00 2001 From: Vib-UX Date: Sat, 30 Jul 2022 14:21:53 +0530 Subject: [PATCH 6/8] chain: adds utility functions for testing --- chain/utils_test.go | 96 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/chain/utils_test.go b/chain/utils_test.go index f65a99cd53..8332dcf6dc 100644 --- a/chain/utils_test.go +++ b/chain/utils_test.go @@ -7,12 +7,23 @@ import ( "net" "runtime" "sync" + "testing" "time" "github.com/btcsuite/btcd/blockchain" + "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/integration/rpctest" + "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" + "github.com/stretchr/testify/require" +) + +const ( + testTimeout = 10 * time.Millisecond + pollInterval = 100 * time.Millisecond ) // setupConnPair initiates a tcp connection between two peers. @@ -246,3 +257,88 @@ func produceInvalidBlock(block *wire.MsgBlock) *wire.MsgBlock { blockCopy.AddTransaction(lastTx) return blockCopy } + +// WaitForMempoolTx waits for the txid to be seen in the miner's mempool. +func WaitForMempoolTx(miner *rpctest.Harness, txid *chainhash.Hash) error { + timeout := time.After(testTimeout) + trickle := time.After(2 * TrickleInterval) + for { + // Check for the harness' knowledge of the txid. + tx, err := miner.Client.GetRawTransaction(txid) + if err != nil { + jsonErr, ok := err.(*btcjson.RPCError) + if ok && jsonErr.Code == btcjson.ErrRPCNoTxInfo { + continue + } + return err + } + + if tx != nil && tx.Hash().IsEqual(txid) { + break + } + + select { + case <-time.After(pollInterval): + case <-timeout: + return errors.New("timed out waiting for tx") + } + } + + // To ensure any transactions propagate from the miner to the peers + // before returning, ensure we have waited for at least + // 2*trickleInterval before returning. + select { + case <-trickle: + case <-timeout: + return errors.New("timeout waiting for trickle interval. " + + "Trickle interval to large?") + } + + return nil +} + +// CreateSpendableOutput creates and returns an output that can be spent later +// on. +func CreateSpendableOutput(t *testing.T, + miner *rpctest.Harness) (*wire.OutPoint, *wire.TxOut, *btcec.PrivateKey) { + + t.Helper() + + // Create a transaction that only has one output, the one destined for + // the recipient. + pkScript, privKey, err := randPubKeyHashScript() + require.NoError(t, err, "unable to generate pkScript") + output := &wire.TxOut{Value: 2e8, PkScript: pkScript} + txid, err := miner.SendOutputsWithoutChange([]*wire.TxOut{output}, 10) + require.NoError(t, err, "unable to create tx") + + // Mine the transaction to mark the output as spendable. + if err := WaitForMempoolTx(miner, txid); err != nil { + t.Fatalf("tx not relayed to miner: %v", err) + } + if _, err := miner.Client.Generate(1); err != nil { + t.Fatalf("unable to generate single block: %v", err) + } + + return wire.NewOutPoint(txid, 0), output, privKey +} + +// CreateSpendTx creates a transaction spending the specified output. +func CreateSpendTx(t *testing.T, prevOutPoint *wire.OutPoint, + prevOutput *wire.TxOut, privKey *btcec.PrivateKey) *wire.MsgTx { + + t.Helper() + + spendingTx := wire.NewMsgTx(1) + spendingTx.AddTxIn(&wire.TxIn{PreviousOutPoint: *prevOutPoint}) + spendingTx.AddTxOut(&wire.TxOut{Value: 1e8, PkScript: prevOutput.PkScript}) + + sigScript, err := txscript.SignatureScript( + spendingTx, 0, prevOutput.PkScript, txscript.SigHashAll, + privKey, true, + ) + require.NoError(t, err, "unable to sign tx") + spendingTx.TxIn[0].SignatureScript = sigScript + + return spendingTx +} From ad849752ba66982d134276c23bef9801fdfa4de0 Mon Sep 17 00:00:00 2001 From: Vib-UX Date: Sat, 30 Jul 2022 14:25:22 +0530 Subject: [PATCH 7/8] chain: testcoverage for FilterBlocksBatched() Testcoverage to ensure that FilterBlocksBatched() which leverages batchAPI for scanning the chain works as expected with enhanced performance. --- chain/bitcoind_client_test.go | 85 +++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/chain/bitcoind_client_test.go b/chain/bitcoind_client_test.go index 51d2f48ffa..191b79c1f2 100644 --- a/chain/bitcoind_client_test.go +++ b/chain/bitcoind_client_test.go @@ -6,10 +6,12 @@ import ( "testing" "time" + "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/integration/rpctest" "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcwallet/wtxmgr" "github.com/stretchr/testify/require" ) @@ -182,3 +184,86 @@ func testBitcoindRescanBlocksBatched(t *testing.T, rpcpolling bool) { require.NoError(t, err) require.Equal(t, rescanBlocksSync, rescanBlocksBatchAPI) } + +// TestBitcoindFilterBlocksBatched ensures that we correctly retrieve the required +// FilterBlocksResponse for the requested watchPoints. +func TestBitcoindFilterBlocksBatched(t *testing.T) { + testBitcoindFilterBlocksBatched(t, true) + testBitcoindFilterBlocksBatched(t, false) +} + +func testBitcoindFilterBlocksBatched(t *testing.T, rpcpolling bool) { + miner, tearDown := NewMiner( + t, []string{"--txindex"}, true, 25, + ) + defer tearDown() + + bitcoindClient := setupBitcoind(t, miner.P2PAddress(), rpcpolling) + + // Now, we'll create a test transaction, confirm it, and attempt to + // retrieve its relevant txn details. The transaction must be in the + // chain and not contain any unspent outputs. To ensure this, we'll + // create a transaction with only one output, which we will manually + // spend. + outpoint, output, privKey := CreateSpendableOutput(t, miner) + heightHint := syncBitcoindWithMiner(t, bitcoindClient, miner) + spendTx := CreateSpendTx(t, outpoint, output, privKey) + spendTxHash, err := miner.Client.SendRawTransaction(spendTx, true) + require.NoError(t, err, "unable to broadcast tx") + err = WaitForMempoolTx(miner, spendTxHash) + require.NoError(t, err, "tx not relayed to miner") + _, err = miner.Client.Generate(50) + require.NoError(t, err, "unable to generate blocks") + currentHeight := syncBitcoindWithMiner(t, bitcoindClient, miner) + + blockStore := make([]wtxmgr.BlockMeta, 0, currentHeight-heightHint) + for i := currentHeight; i >= heightHint; i-- { + blockHash, err := bitcoindClient.GetBlockHash(int64(i)) + require.NoError(t, err, "unable to retrieve blockhash") + block, err := bitcoindClient.GetBlock(blockHash) + require.NoError(t, err, "unable to get raw block") + + blockWtxmgr := wtxmgr.Block{ + Hash: *blockHash, + Height: int32(i), + } + temp := wtxmgr.BlockMeta{ + Block: blockWtxmgr, + Time: block.Header.Timestamp, + } + blockStore = append(blockStore, temp) + } + + watchedOutPoint := make(map[wire.OutPoint]btcutil.Address) + watchedOutPoint[*outpoint] = &btcutil.AddressWitnessPubKeyHash{} + + // Construct a filter request, watching only for the outpoints above, + // and hit FilterBlocksBatched with the request created. + req := FilterBlocksRequest{ + Blocks: blockStore, + WatchedOutPoints: watchedOutPoint, + } + + // Now, we hit FilterBlocksBatched (leveraging updated BatchAPI). Further + // using its result will reteieve the height, blockHash, txnHash and + // compare it with actual spendTxn height, blockHash and spendTxHash. + filterBlockResponseBatchAPI, err := bitcoindClient.FilterBlocksBatched(&req) + require.NoError(t, err) + require.Equal(t, heightHint+1, uint32(filterBlockResponseBatchAPI.BlockMeta.Height)) + actualBlockHash, err := bitcoindClient.GetBlockHash(int64(heightHint + 1)) + require.NoError(t, err, "unable to get block hash") + require.Equal(t, actualBlockHash.String(), filterBlockResponseBatchAPI.BlockMeta.Hash.String()) + txnExist := false + for _, txn := range filterBlockResponseBatchAPI.RelevantTxns { + if txn.TxHash().String() == spendTx.TxHash().String() { + txnExist = true + } + } + require.Equal(t, txnExist, true, "txn mismatch") + + // We can also compare the results with former implemetation "FilterBlocks" + // which should be equal. + filterBlockResponseSync, err := bitcoindClient.FilterBlocks(&req) + require.NoError(t, err) + require.Equal(t, filterBlockResponseSync, filterBlockResponseBatchAPI) +} From 931e55dd015e5bb64d40b4d57975d34759849255 Mon Sep 17 00:00:00 2001 From: Vib-UX Date: Sun, 20 Nov 2022 23:07:00 +0530 Subject: [PATCH 8/8] chain: refactors and adds generic fnc() --- chain/bitcoind_client.go | 216 +++++++++++++++++++++++++++++++++++++++ chain/bitcoind_conn.go | 4 + 2 files changed, 220 insertions(+) diff --git a/chain/bitcoind_client.go b/chain/bitcoind_client.go index d393605585..cc3b5868c0 100644 --- a/chain/bitcoind_client.go +++ b/chain/bitcoind_client.go @@ -26,6 +26,31 @@ var ( ErrBitcoindClientShuttingDown = errors.New("client is shutting down") ) +// TxConfStatus denotes the status of a transaction's lookup. +type TxConfStatus uint8 + +const ( + // TxFoundMempool denotes that the transaction was found within the + // backend node's mempool. + TxFoundMempool TxConfStatus = iota + + // TxFoundIndex denotes that the transaction was found within the + // backend node's txindex. + TxFoundIndex + + // TxNotFoundIndex denotes that the transaction was not found within the + // backend node's txindex. + TxNotFoundIndex + + // TxFoundManually denotes that the transaction was found within the + // chain by scanning for it manually. + TxFoundManually + + // TxNotFoundManually denotes that the transaction was not found within + // the chain by scanning for it manually. + TxNotFoundManually +) + // BitcoindClient represents a persistent client connection to a bitcoind server // for information regarding the current best block chain. type BitcoindClient struct { @@ -114,6 +139,123 @@ func (c *BitcoindClient) BackEnd() string { return "bitcoind" } +// String returns the string representation of the TxConfStatus. +func (t TxConfStatus) String() string { + switch t { + case TxFoundMempool: + return "TxFoundMempool" + + case TxFoundIndex: + return "TxFoundIndex" + + case TxNotFoundIndex: + return "TxNotFoundIndex" + + case TxFoundManually: + return "TxFoundManually" + + case TxNotFoundManually: + return "TxNotFoundManually" + + default: + return "unknown" + } +} + +// FindTxInBlockRange looks for the mathcing txn over the requested block range +// using the cb and returns the required txn details if cb isMatch()==true +func (c *BitcoindClient) FindTxInBlockRange(startHeight, endHeight uint32, + isMatch func(tx *wire.MsgTx) bool) (uint32, uint32, *wire.MsgBlock, + TxConfStatus, error) { + + // batchRequests will store the scan requests at every height to + // determine if the output was spent. + batchRequests := make( + map[uint32]rpcclient.FutureGetBlockHashResult, endHeight-startHeight, + ) + + for height := endHeight; height >= startHeight && height > 0; height-- { + batchRequests[height] = c.GetBlockHashAsync(int64(height)) + } + + // Sending the bulk request using updated batchClient. + err := c.SendAsyncQueue() + if err != nil { + return 0, 0, nil, TxNotFoundManually, err + } + + blockHashes := make([]*chainhash.Hash, 0, len(batchRequests)) + for height := endHeight; height >= startHeight && height > 0; height-- { + // Ensure we haven't been requested to shut down before + // processing the next height. + select { + case <-c.quit: + return 0, 0, nil, TxNotFoundManually, + errors.New("bitcoind client shutting down") + default: + } + + // Receive the next block hash from the async queue. + blockHash, err := batchRequests[height].Receive() + if err != nil { + return 0, 0, nil, TxNotFoundManually, + fmt.Errorf("unable to retrieve hash for block "+ + "with height %d: %v", height, err) + } + + blockHashes = append(blockHashes, blockHash) + } + // Now we fetch blocks in interval of 15 to avoid out of memory + // errors in case of fetching too many blocks with GetBlocksBatch(). + const batchSize = 15 + total := len(batchRequests) + for i := 0; i < total; i += batchSize { + // Ensure we haven't been requested to shut down before + // processing next set of blocks. + select { + case <-c.quit: + return 0, 0, nil, TxNotFoundManually, + errors.New("bitcoind client shutting down") + default: + } + + start := i + end := i + batchSize + + if end > total { + end = total + } + + blocks, err := c.GetBlocksBatch(blockHashes[start:end]) + + if err != nil { + return 0, 0, nil, TxNotFoundManually, err + } + + // Note:- blockHashes are stored in reverse order + // currentHeight --> heightHint, so we maintain the same refs + // of currentHeight to return the correct BlockHeight. + height := int(endHeight) - start + + for j := range blocks { + // For every transaction in the block, check which one + // matches our request. If we find one that does, we can + // dispatch its confirmation details. + for txIndex, tx := range blocks[j].Transactions { + if isMatch(tx) { + return uint32(height - j), + uint32(txIndex), blocks[j], + TxFoundManually, nil + } + } + } + } + + // If we reach here, then we were not able to find the transaction + // within a block, so we avoid returning an error. + return 0, 0, nil, TxNotFoundManually, nil +} + // SendAsyncQueue sends a bulk request to the server and waits for the receive // call made by the client. func (c *BitcoindClient) SendAsyncQueue() error { @@ -1174,6 +1316,80 @@ func (c *BitcoindClient) FilterBlocksBatched( return nil, nil } +func (c *BitcoindClient) ForEachTxOfBlockRange(startHeight, endHeight uint32, + inspectTx func(absoluteHeight int, block *wire.MsgBlock, + txIndex int, tx *wire.MsgTx) bool) (TxConfStatus, + error) { + + // batchRequests will store the scan requests at every height to + // determine if the output was spent. + batchRequests := make( + map[uint32]rpcclient.FutureGetBlockHashResult, endHeight-startHeight, + ) + + for height := endHeight; height >= startHeight && height > 0; height-- { + batchRequests[height] = c.GetBlockHashAsync(int64(height)) + } + + // Sending the bulk request using updated batchClient. + err := c.SendAsyncQueue() + if err != nil { + return TxNotFoundManually, err + } + + blockHashes := make([]*chainhash.Hash, 0, len(batchRequests)) + + // Now we fetch blocks in interval of 15 to avoid out of memory + // errors in case of fetching too many blocks with GetBlocksBatch(). + const batchSize = 15 + total := len(blockHashes) + + for i := 0; i < total; i += batchSize { + // Ensure we haven't been requested to shut down before + // processing next set of blocks. + select { + case <-c.quit: + return TxNotFoundManually, + errors.New("bitcoind client shutting down") + default: + } + + start := i + end := i + batchSize + + if end > total { + end = total + } + + blocks, err := c.GetBlocksBatch(blockHashes[start:end]) + + if err != nil { + return TxNotFoundManually, err + } + + // Note:- blockHashes are stored in reverse order + // currentHeight --> heightHint, so we maintain the same refs + // of currentHeight to return the correct BlockHeight. + height := int(endHeight) - start + + for j := range blocks { + // For every transaction in the block, check which one + // matches our request. If we find one that does, we can + // dispatch its confirmation details. + for txIndex, tx := range blocks[j].Transactions { + abort := inspectTx(height, blocks[j], txIndex, tx) + if abort { + return TxFoundManually, nil + } + } + } + } + + // If we reach here, then we were not able to find the transaction + // within a block, so we avoid returning an error. + return TxNotFoundManually, nil +} + // rescan performs a rescan of the chain using a bitcoind backend, from the // specified hash to the best known hash, while watching out for reorgs that // happen during the rescan. It uses the addresses and outputs being tracked by diff --git a/chain/bitcoind_conn.go b/chain/bitcoind_conn.go index dba20f7be3..a60b37e4b8 100644 --- a/chain/bitcoind_conn.go +++ b/chain/bitcoind_conn.go @@ -76,6 +76,10 @@ type BitcoindConfig struct { // // NOTE: This only applies for pruned bitcoind nodes. PrunedModeMaxPeers int + + // BlocksBatchSize value will determine how many blocks in single batch + // needs to fetched from the batch client. + BlockBatchSize uint } // BitcoindConn represents a persistent client connection to a bitcoind node