From 6a52364b6171dd9ffa0e7e4a73739d34be5c6e82 Mon Sep 17 00:00:00 2001 From: Say Cheong Date: Mon, 2 Sep 2024 14:11:43 +0900 Subject: [PATCH 1/4] Removed the usage of BlockCompletion in PreloadAsync() --- src/Libplanet.Net/Swarm.BlockSync.cs | 47 ++++--------------- src/Libplanet.Net/Swarm.cs | 4 +- test/Libplanet.Net.Tests/SwarmTest.Preload.cs | 41 +--------------- 3 files changed, 13 insertions(+), 79 deletions(-) diff --git a/src/Libplanet.Net/Swarm.BlockSync.cs b/src/Libplanet.Net/Swarm.BlockSync.cs index fb5a3006ab1..9eee58bacfa 100644 --- a/src/Libplanet.Net/Swarm.BlockSync.cs +++ b/src/Libplanet.Net/Swarm.BlockSync.cs @@ -82,11 +82,7 @@ private async Task PullBlocksAsync( try { - var blockCompletion = new BlockCompletion( - completionPredicate: BlockChain.ContainsBlock, - window: InitialBlockDownloadWindow - ); - var demandBlockHashes = await GetDemandBlockHashes( + (var peer, var demandBlockHashes) = await GetDemandBlockHashes( BlockChain, peersWithBlockExcerpt, progress, @@ -122,10 +118,7 @@ private async Task PullBlocksAsync( index, hash ); - if (blockCompletion.Demand(hash)) - { - totalBlocksToDownload++; - } + totalBlocksToDownload++; } if (totalBlocksToDownload == 0) @@ -134,23 +127,20 @@ private async Task PullBlocksAsync( return; } - IAsyncEnumerable> completedBlocks = - blockCompletion.Complete( - peers: peersWithBlockExcerpt.Select(pair => pair.Item1).ToList(), - blockFetcher: GetBlocksAsync, - cancellationToken: cancellationToken - ); + var downloadedBlocks = GetBlocksAsync( + peer, + demandBlockHashes.Select(pair => pair.Item2), + cancellationToken); await foreach ( - (Block block, BlockCommit commit, BoundPeer sourcePeer) - in completedBlocks.WithCancellation(cancellationToken)) + (Block block, BlockCommit commit) + in downloadedBlocks.WithCancellation(cancellationToken)) { _logger.Verbose( "Got #{BlockIndex} {BlockHash} from {Peer}", block.Index, block.Hash, - sourcePeer - ); + peer); cancellationToken.ThrowIfCancellationRequested(); if (block.Index == 0 && !block.Hash.Equals(BlockChain.Genesis.Hash)) @@ -195,26 +185,9 @@ in completedBlocks.WithCancellation(cancellationToken)) ++receivedBlockCount), ReceivedBlockCount = receivedBlockCount, ReceivedBlockHash = block.Hash, - SourcePeer = sourcePeer, + SourcePeer = peer, }); } - - BlockHash? previousHash = blocks.First().Item1.PreviousHash; - Block branchpoint; - BlockCommit branchpointCommit; - if (previousHash != null) - { - branchpoint = BlockChain.Store.GetBlock( - (BlockHash)previousHash); - branchpointCommit = BlockChain.GetBlockCommit(branchpoint.Hash); - } - else - { - branchpoint = BlockChain.Genesis; - branchpointCommit = null; - } - - blocks.Insert(0, (branchpoint, branchpointCommit)); } catch (Exception e) { diff --git a/src/Libplanet.Net/Swarm.cs b/src/Libplanet.Net/Swarm.cs index 093cd11d6bf..01364c0dd0c 100644 --- a/src/Libplanet.Net/Swarm.cs +++ b/src/Libplanet.Net/Swarm.cs @@ -981,7 +981,7 @@ internal async IAsyncEnumerable GetTxsAsync( /// to download. /// /// - internal async Task> GetDemandBlockHashes( + internal async Task<(BoundPeer, List<(long, BlockHash)>)> GetDemandBlockHashes( BlockChain blockChain, IList<(BoundPeer, IBlockExcerpt)> peersWithExcerpts, IProgress progress = null, @@ -1008,7 +1008,7 @@ internal async IAsyncEnumerable GetTxsAsync( cancellationToken); if (downloadedHashes.Any()) { - return downloadedHashes; + return (peer, downloadedHashes); } else { diff --git a/test/Libplanet.Net.Tests/SwarmTest.Preload.cs b/test/Libplanet.Net.Tests/SwarmTest.Preload.cs index aefeed95f82..161daa85bc3 100644 --- a/test/Libplanet.Net.Tests/SwarmTest.Preload.cs +++ b/test/Libplanet.Net.Tests/SwarmTest.Preload.cs @@ -212,19 +212,6 @@ await receiverSwarm.PreloadAsync( expectedStates.Add(state); } - for (var i = 1; i < minerChain.Count; i++) - { - var b = minerChain[i]; - var state = new BlockDownloadState - { - ReceivedBlockHash = b.Hash, - TotalBlockCount = i == 10 || i == 11 ? 12 : 11, - ReceivedBlockCount = i, - SourcePeer = minerSwarm.AsPeer, - }; - expectedStates.Add(state); - } - for (var i = 1; i < minerChain.Count; i++) { var b = minerChain[i]; @@ -248,13 +235,6 @@ await receiverSwarm.PreloadAsync( _logger.Debug("Expected preload states: {@expectedStates}", expectedStates); _logger.Debug("Actual preload states: {@actualStates}", actualStates); - Assert.Equal( - expectedStates - .OfType() - .Select(state => state.ReceivedBlockHash), - actualStates - .OfType() - .Select(state => state.ReceivedBlockHash)); Assert.Equal( expectedStates .OfType() @@ -601,18 +581,6 @@ public async Task PreloadFromNominer() expectedStates.Add(state); } - for (var i = 1; i < minerChain.Count; i++) - { - var state = new BlockDownloadState - { - ReceivedBlockHash = minerChain[i].Hash, - TotalBlockCount = 10, - ReceivedBlockCount = i, - SourcePeer = nominerSwarm1.AsPeer, - }; - expectedStates.Add(state); - } - for (var i = 1; i < minerChain.Count; i++) { var state1 = new ActionExecutionState @@ -633,13 +601,6 @@ public async Task PreloadFromNominer() } // FIXME: this test does not ensures block download in order - Assert.Equal( - expectedStates - .OfType() - .Select(state => state.ReceivedBlockHash), - actualStates - .OfType() - .Select(state => state.ReceivedBlockHash)); Assert.Equal( expectedStates .OfType() @@ -855,7 +816,7 @@ public async Task GetDemandBlockHashes() (minerSwarm.AsPeer, minerChain.Tip.Header), }; - List<(long, BlockHash)> demands = await receiverSwarm.GetDemandBlockHashes( + (var _, List<(long, BlockHash)> demands) = await receiverSwarm.GetDemandBlockHashes( receiverChain, peersWithExcerpt, progress: null, From 77b1ddecf6d583356ce435e60170591238dcc868 Mon Sep 17 00:00:00 2001 From: Say Cheong Date: Mon, 2 Sep 2024 14:55:51 +0900 Subject: [PATCH 2/4] Removed reporting BlockHashDownloadState and BlockDownloadState --- src/Libplanet.Net/BlockDownloadState.cs | 1 + src/Libplanet.Net/BlockHashDownloadState.cs | 1 + src/Libplanet.Net/Swarm.BlockSync.cs | 11 -------- src/Libplanet.Net/Swarm.cs | 7 ----- test/Libplanet.Net.Tests/SwarmTest.Preload.cs | 26 ++----------------- 5 files changed, 4 insertions(+), 42 deletions(-) diff --git a/src/Libplanet.Net/BlockDownloadState.cs b/src/Libplanet.Net/BlockDownloadState.cs index 56617f80df9..3766b026117 100644 --- a/src/Libplanet.Net/BlockDownloadState.cs +++ b/src/Libplanet.Net/BlockDownloadState.cs @@ -6,6 +6,7 @@ namespace Libplanet.Net /// /// Indicates a progress of downloading blocks. /// + [Obsolete("This is no longer compatible with the current preloading scheme.")] public class BlockDownloadState : BlockSyncState, IEquatable { /// diff --git a/src/Libplanet.Net/BlockHashDownloadState.cs b/src/Libplanet.Net/BlockHashDownloadState.cs index a6cdca3f39f..63c2b6f69d9 100644 --- a/src/Libplanet.Net/BlockHashDownloadState.cs +++ b/src/Libplanet.Net/BlockHashDownloadState.cs @@ -5,6 +5,7 @@ namespace Libplanet.Net /// /// Indicates a progress of downloading block hashes. /// + [Obsolete("This is no longer compatible with the current preloading scheme.")] public class BlockHashDownloadState : BlockSyncState, IEquatable { /// diff --git a/src/Libplanet.Net/Swarm.BlockSync.cs b/src/Libplanet.Net/Swarm.BlockSync.cs index 9eee58bacfa..73ea9a76836 100644 --- a/src/Libplanet.Net/Swarm.BlockSync.cs +++ b/src/Libplanet.Net/Swarm.BlockSync.cs @@ -76,7 +76,6 @@ private async Task PullBlocksAsync( } long totalBlocksToDownload = 0L; - long receivedBlockCount = 0L; Block tempTip = BlockChain.Tip; var blocks = new List<(Block, BlockCommit)>(); @@ -177,16 +176,6 @@ in downloadedBlocks.WithCancellation(cancellationToken)) { tempTip = block; } - - progress?.Report(new BlockDownloadState - { - TotalBlockCount = Math.Max( - totalBlocksToDownload, - ++receivedBlockCount), - ReceivedBlockCount = receivedBlockCount, - ReceivedBlockHash = block.Hash, - SourcePeer = peer, - }); } } catch (Exception e) diff --git a/src/Libplanet.Net/Swarm.cs b/src/Libplanet.Net/Swarm.cs index 01364c0dd0c..48879c5a0a0 100644 --- a/src/Libplanet.Net/Swarm.cs +++ b/src/Libplanet.Net/Swarm.cs @@ -1070,13 +1070,6 @@ internal async IAsyncEnumerable GetTxsAsync( pair.Item1, pair.Item2); downloaded.Add(pair); - progress?.Report( - new BlockHashDownloadState - { - EstimatedTotalBlockHashCount = blockHashes.Count, - ReceivedBlockHashCount = downloaded.Count, - SourcePeer = peer, - }); } return downloaded; diff --git a/test/Libplanet.Net.Tests/SwarmTest.Preload.cs b/test/Libplanet.Net.Tests/SwarmTest.Preload.cs index 161daa85bc3..1c191426ed9 100644 --- a/test/Libplanet.Net.Tests/SwarmTest.Preload.cs +++ b/test/Libplanet.Net.Tests/SwarmTest.Preload.cs @@ -200,18 +200,6 @@ await receiverSwarm.PreloadAsync( var expectedStates = new List(); - for (var i = 1; i < minerChain.Count; i++) - { - var b = minerChain[i]; - var state = new BlockHashDownloadState - { - EstimatedTotalBlockHashCount = 10, - ReceivedBlockHashCount = 1, - SourcePeer = minerSwarm.AsPeer, - }; - expectedStates.Add(state); - } - for (var i = 1; i < minerChain.Count; i++) { var b = minerChain[i]; @@ -570,17 +558,6 @@ public async Task PreloadFromNominer() var expectedStates = new List(); - for (var i = 1; i < minerChain.Count; i++) - { - var state = new BlockHashDownloadState - { - EstimatedTotalBlockHashCount = 10, - ReceivedBlockHashCount = i, - SourcePeer = nominerSwarm1.AsPeer, - }; - expectedStates.Add(state); - } - for (var i = 1; i < minerChain.Count; i++) { var state1 = new ActionExecutionState @@ -669,9 +646,10 @@ public async Task PreloadRetryWithNextPeers(int blockCount) var shouldStopSwarm = swarm0.AsPeer.Equals(receiverSwarm.Peers.First()) ? swarm0 : swarm1; + // FIXME: This relies on progress report to artificially stop preloading. async void Action(BlockSyncState state) { - if (!startedStop && state is BlockDownloadState) + if (!startedStop) { startedStop = true; await shouldStopSwarm.StopAsync(TimeSpan.Zero); From f665d18533ac25a039aba4d822f231f9f4820a38 Mon Sep 17 00:00:00 2001 From: Say Cheong Date: Mon, 2 Sep 2024 14:56:59 +0900 Subject: [PATCH 3/4] Removed BlockCompletion --- src/Libplanet.Net/BlockCompletion.cs | 454 ------------------ .../BlockCompletionTest.cs | 423 ---------------- 2 files changed, 877 deletions(-) delete mode 100644 src/Libplanet.Net/BlockCompletion.cs delete mode 100644 test/Libplanet.Net.Tests/BlockCompletionTest.cs diff --git a/src/Libplanet.Net/BlockCompletion.cs b/src/Libplanet.Net/BlockCompletion.cs deleted file mode 100644 index 444d4d0dfb2..00000000000 --- a/src/Libplanet.Net/BlockCompletion.cs +++ /dev/null @@ -1,454 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Collections.Immutable; -using System.Diagnostics.Contracts; -using System.Linq; -using System.Runtime.CompilerServices; -using System.Threading; -using System.Threading.Tasks; -using Libplanet.Types.Blocks; -using Nito.AsyncEx; -using Serilog; -using Serilog.Events; - -namespace Libplanet.Net -{ - internal class BlockCompletion - where TPeer : notnull - { - private readonly ILogger _logger; - private readonly Func? _completionPredicate; - private readonly int _window; - private readonly ConcurrentDictionary _satisfiedBlocks; - private readonly ConcurrentQueue _demands; - private readonly SemaphoreSlim _demandEnqueued; - private bool _started; - - public BlockCompletion(Func? completionPredicate = null, int window = 100) - { - _logger = Log.ForContext>(); - _completionPredicate = completionPredicate; - _window = window; - _satisfiedBlocks = new ConcurrentDictionary(); - _started = false; - _demands = new ConcurrentQueue(); - _demandEnqueued = new SemaphoreSlim(0); - } - - public delegate IAsyncEnumerable<(Block, BlockCommit?)> BlockFetcher( - TPeer peer, - IEnumerable blockHashes, - CancellationToken cancellationToken - ); - - public bool Demand(BlockHash blockHash) => - Demand(blockHash, retry: false); - - public int Demand(IEnumerable blockHashes) => - Demand(blockHashes, retry: false); - - public async IAsyncEnumerable> EnumerateChunks( - [EnumeratorCancellation] CancellationToken cancellationToken = default - ) - { - var chunk = new List(capacity: _window); - while (!(cancellationToken.IsCancellationRequested || - QueuedDemandCompleted())) - { - cancellationToken.ThrowIfCancellationRequested(); - if (_logger.IsEnabled(LogEventLevel.Verbose)) - { - _logger.Verbose( - "Waiting a demand enqueued...\n" + - "Demands in the buffer: {DemandCount}.\n" + - "Incomplete downloads: {IncompleteDownloads}.", - chunk.Count, - _satisfiedBlocks.Count(kv => !kv.Value && !chunk.Contains(kv.Key)) - ); - } - - await _demandEnqueued.WaitAsync(100, cancellationToken); - if (_demands.TryDequeue(out BlockHash demand)) - { - chunk.Add(demand); - } - - if (chunk.Count >= _window || ( - _started && _demands.IsEmpty && _satisfiedBlocks.All(kv => - kv.Value || chunk.Contains(kv.Key)))) - { - yield return chunk.ToImmutableArray(); - _logger.Verbose("A chunk of {Window} demands have made", chunk.Count); - - chunk.Clear(); - } - else - { - if (_logger.IsEnabled(LogEventLevel.Verbose)) - { - _logger.Verbose( - "The number of buffered demands: {DemandCount}.\n" + - "The number of demands in the backlog: {BacklogCount}.\n" + - "The number of incomplete downloads: {IncompleteDownloads}.", - chunk.Count, - _demands.Count, - _satisfiedBlocks.Count(kv => !kv.Value && !chunk.Contains(kv.Key)) - ); - } - } - } - - if (!cancellationToken.IsCancellationRequested && chunk.Count > 0) - { - _logger.Verbose("The last chunk of {Window} demands have made.", chunk.Count); - yield return chunk; - } - - _logger.Verbose("The stream of demand chunks finished"); - - cancellationToken.ThrowIfCancellationRequested(); - } - - [Pure] - public bool Satisfies(BlockHash blockHash, bool ignoreTransientCompletions = false) - { - return (!ignoreTransientCompletions && _satisfiedBlocks.ContainsKey(blockHash)) || - (!(_completionPredicate is null) && _completionPredicate(blockHash)); - } - - public bool Satisfy(Block block) - { - if (block is null) - { - throw new ArgumentNullException(nameof(block)); - } - - if (block.PreviousHash is { } prevHash) - { - _logger.Verbose( - "Block #{BlockIndex} {BlockHash}'s previous block " + - "#{PreviousIndex} {PreviousHash} is needed; adding it to the queue...", - block.Index, - block.Hash, - block.Index - 1, - prevHash - ); - Demand(prevHash); - } - - if (_satisfiedBlocks.TryUpdate(block.Hash, true, false)) - { - _logger.Verbose( - "Completed block #{BlockIndex} {BlockHash}; " + - "remaining incomplete demands: {IncompleteDemands}", - block.Index, - block.Hash, - _demands.Count + _satisfiedBlocks.Count(kv => !kv.Value) - ); - return true; - } - - if (_satisfiedBlocks.ContainsKey(block.Hash)) - { - _logger.Verbose( - "Block #{BlockIndex} {BlockHash} is already complete; " + - "remaining incomplete demands: {IncompleteDemands}", - block.Index, - block.Hash, - _demands.Count + _satisfiedBlocks.Count(kv => !kv.Value) - ); - return false; - } - - _logger.Verbose( - "Block #{BlockIndex} {BlockHash} was never demanded; " + - "remaining incomplete demands: {IncompleteDemands}", - block.Index, - block.Hash, - _demands.Count + _satisfiedBlocks.Count(kv => !kv.Value) - ); - return false; - } - - /// - /// Downloads blocks from in parallel, - /// using the given function. - /// - /// A list of peers to download blocks. - /// A function to take demands and a peer, and then - /// download corresponding blocks. - /// A cancellation token to observe while waiting - /// for the task to complete. - /// An async enumerable that yields pairs of a fetched block and its source - /// peer. It terminates when all demands are satisfied. - public async IAsyncEnumerable> Complete( - IReadOnlyList peers, - BlockFetcher blockFetcher, - [EnumeratorCancellation] CancellationToken cancellationToken = default - ) - { - if (!peers.Any()) - { - throw new ArgumentException("The list of peers must not be empty.", nameof(peers)); - } - - var pool = new PeerPool(peers); - var queue = - new AsyncProducerConsumerQueue>(); - - Task producer = Task.Run(async () => - { - try - { - await foreach (var hashes in EnumerateChunks(cancellationToken)) - { - cancellationToken.ThrowIfCancellationRequested(); - IList blockHashes = - hashes is IList l ? l : hashes.ToList(); - - cancellationToken.ThrowIfCancellationRequested(); - await pool.SpawnAsync( - CreateEnqueuing( - blockHashes, - blockFetcher, - cancellationToken, - queue - ), - cancellationToken: cancellationToken - ); - } - - await pool.WaitAll(cancellationToken); - } - finally - { - queue.CompleteAdding(); - } - }); - - while (await queue.OutputAvailableAsync(cancellationToken)) - { - Tuple triple; - try - { - triple = await queue.DequeueAsync(cancellationToken); - } - catch (InvalidOperationException) - { - break; - } - - yield return triple; - _logger.Verbose( - "Completed block #{BlockIndex} {BlockHash} from {Peer}", - triple.Item1.Index, - triple.Item1.Hash, - triple.Item2 - ); - } - - await producer; - } - - private bool Demand(BlockHash blockHash, bool retry) - { - if (Satisfies(blockHash, ignoreTransientCompletions: retry)) - { - return false; - } - - if (_satisfiedBlocks.TryAdd(blockHash, false) || retry) - { - _demands.Enqueue(blockHash); - _started = true; - _demandEnqueued.Release(); - _logger.Verbose("A demand was enqueued: {BlockHash}", blockHash); - return true; - } - - return false; - } - - private int Demand(IEnumerable blockHashes, bool retry) - { - int sum = 0; - foreach (BlockHash hash in blockHashes) - { - if (Demand(hash, retry)) - { - sum++; - } - } - - return sum; - } - - private bool QueuedDemandCompleted() => - _started && _demands.IsEmpty && _satisfiedBlocks.All(kv => kv.Value); - - private Func CreateEnqueuing( - IList blockHashes, - BlockFetcher blockFetcher, - CancellationToken cancellationToken, - AsyncProducerConsumerQueue> queue - ) => - async (peer, ct) => - { - ct.ThrowIfCancellationRequested(); - var demands = new HashSet(blockHashes); - try - { - _logger.Debug( - "Requesting {BlockCount} blocks from {Peer}...", - blockHashes.Count, - peer - ); - - try - { - ConfiguredCancelableAsyncEnumerable<(Block, BlockCommit?)> blocks = - blockFetcher(peer, blockHashes, cancellationToken) - .WithCancellation(cancellationToken); - await foreach ((Block block, BlockCommit? commit) in blocks) - { - _logger.Debug( - "Downloaded block #{BlockIndex} {BlockHash} from {Peer}", - block.Index, - block.Hash, - peer - ); - - if (Satisfy(block)) - { - await queue.EnqueueAsync( - Tuple.Create(block, commit, peer), - cancellationToken - ); - } - - demands.Remove(block.Hash); - } - } - catch (OperationCanceledException e) - { - if (ct.IsCancellationRequested) - { - _logger.Error( - e, - "A blockFetcher job (peer: {Peer}) is cancelled", - peer - ); - throw; - } - - _logger.Debug( - e, - "Timed out while waiting for a response from {Peer}", - peer - ); - } - catch (Exception e) - { - _logger.Error(e, "A blockFetcher job (peer: {Peer}) has failed", peer); - } - } - finally - { - if (demands.Any()) - { - _logger.Verbose( - "Fetched blocks from {Peer}, but there are still " + - "unsatisfied demands ({UnsatisfiedDemandsNumber}) so " + - "enqueue them again: {UnsatisfiedDemands}", - peer, - demands.Count, - demands - ); - Demand(demands, retry: true); - } - else - { - _logger.Verbose("Fetched blocks from {Peer}", peer); - } - } - }; - - internal class PeerPool - { - private readonly ConcurrentQueue _completions; - private readonly ConcurrentDictionary _tasks; - private long _taken; - private long _finished; - - public PeerPool(IEnumerable peers) - { - _completions = new ConcurrentQueue(peers); - _tasks = new ConcurrentDictionary(); - _taken = 0; - _finished = 0; - } - - public async Task SpawnAsync( - Func action, - CancellationToken cancellationToken = default - ) - { - Interlocked.Increment(ref _taken); - TPeer? peer; - while (!_completions.TryDequeue(out peer)) - { - Task[] tasks = _tasks.Values - .Concat(new[] { Task.Delay(500, cancellationToken) }) - .ToArray(); - await Task.WhenAny(tasks); - cancellationToken.ThrowIfCancellationRequested(); - } - - if (_tasks.TryRemove(peer, out Task? completeTask)) - { - await completeTask; - } - - _tasks[peer] = Task.Run( - async () => - { - cancellationToken.ThrowIfCancellationRequested(); - try - { - await action(peer, cancellationToken); - } - finally - { - _completions.Enqueue(peer); - Interlocked.Increment(ref _finished); - } - }, - cancellationToken: cancellationToken - ); - } - - public async Task WaitAll(CancellationToken cancellationToken = default) - { - while (Interlocked.Read(ref _taken) > Interlocked.Read(ref _finished)) - { - cancellationToken.ThrowIfCancellationRequested(); - Task[] tasks = _tasks.Values.ToArray(); - if (tasks.Any()) - { - var tcs = new TaskCompletionSource(); - using CancellationTokenRegistration ctr = cancellationToken.Register( - () => tcs.TrySetCanceled(), - useSynchronizationContext: false - ); - await Task.WhenAny(Task.WhenAll(tasks), tcs.Task); - } - else - { - await Task.Delay(100, cancellationToken); - } - } - } - } - } -} diff --git a/test/Libplanet.Net.Tests/BlockCompletionTest.cs b/test/Libplanet.Net.Tests/BlockCompletionTest.cs deleted file mode 100644 index c6ee1f6f39f..00000000000 --- a/test/Libplanet.Net.Tests/BlockCompletionTest.cs +++ /dev/null @@ -1,423 +0,0 @@ -#nullable disable -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Collections.Immutable; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Dasync.Collections; -using Libplanet.Crypto; -using Libplanet.Types.Blocks; -using Nito.AsyncEx; -using Serilog; -using xRetry; -using Xunit; -using Xunit.Abstractions; -#if NETFRAMEWORK && (NET47 || NET471) -using static Libplanet.Tests.HashSetExtensions; -#endif -using static Libplanet.Tests.TestUtils; -using AsyncEnumerable = System.Linq.AsyncEnumerable; - -namespace Libplanet.Net.Tests -{ - public class BlockCompletionTest - { - private const int Timeout = 20000; - private readonly ITestOutputHelper _output; - private readonly ILogger _logger; - - public BlockCompletionTest(ITestOutputHelper output) - { - const string outputTemplate = - "{Timestamp:HH:mm:ss:ffffffZ} [thread/{ThreadId}] {Message}"; - Log.Logger = new LoggerConfiguration() - .MinimumLevel.Verbose() - .Enrich.WithThreadId() - .WriteTo.TestOutput(output, outputTemplate: outputTemplate) - .CreateLogger() - .ForContext(); - _logger = Log.ForContext(); - _output = output; - } - - [Fact(Timeout = Timeout * 3)] - public async Task PeerPool() - { - const int tasks = 50; - ConcurrentDictionary peers = new ConcurrentDictionary( - Enumerable.Range(0, 3).Select(peerId => new KeyValuePair(peerId, 0)) - ); - var concurrentWorkersLogs = new ConcurrentBag(); - long done = 0; - var pool = new BlockCompletion.PeerPool(peers.Keys); - var random = new System.Random(); - Task[] spawns = Enumerable.Range(0, tasks).Select(i => - { - int sleep = random.Next(5, 50); - return pool.SpawnAsync(async (peerId, cancellationToken) => - { - try - { - int counter; - do - { - counter = peers[peerId]; - } - while (!peers.TryUpdate(peerId, counter + 1, counter)); - - concurrentWorkersLogs.Add( - peers.Where(kv => kv.Key != peerId).Sum(kv => kv.Value) + - counter + 1 - ); - - await Task.Delay(sleep); - - do - { - counter = peers[peerId]; - } - while (!peers.TryUpdate(peerId, counter - 1, counter)); - } - catch (Exception e) - { - _output.WriteLine(e.ToString()); - } - finally - { - Interlocked.Increment(ref done); - _output.WriteLine($"Task {i} finished."); - } - }); - }).ToArray(); - - _output.WriteLine("Wait spawned tasks to be finished..."); - await Task.WhenAll(spawns); - _output.WriteLine("All spawned tasks finished; wait PeerPool to be finished..."); - await pool.WaitAll(); - _output.WriteLine("PeerPool finished."); - - Assert.Equal(tasks, Interlocked.Read(ref done)); - Assert.Equal(tasks, concurrentWorkersLogs.Count); - foreach (int log in concurrentWorkersLogs) - { - Assert.InRange(log, 0, 3); - } - } - - [RetryFact(Timeout = Timeout)] - public async void EnumerateChunks() - { - // 0, 1: Already existed blocks - // 2, 3, 4, 5, 6: first chunk - // 7, 8, 9, 10, 11: second chunk - // 12, 13, 14: last chunk - ImmutableArray fixture = GenerateBlocks(15).ToImmutableArray(); - const int initialHeight = 2; - const int window = 5; - var bc = new BlockCompletion( - fixture.Take(initialHeight).Select(b => b.Hash).ToImmutableHashSet().Contains, - window - ); - var logs = new ConcurrentBag<(int, ImmutableArray)>(); - var ev = new AsyncAutoResetEvent(false); - var bg = Task.Run(async () => - { - await Task.Delay(100); - int i = 0; - await AsyncEnumerable.ForEachAsync(bc.EnumerateChunks(), hashes => - { - ImmutableArray hashesArray = hashes.ToImmutableArray(); - logs.Add((i, hashesArray)); - i++; - - // To test dynamic demands - if (hashesArray.Contains(fixture[7].Hash)) - { - bc.Demand(fixture[14].Hash); - bc.Demand(fixture[0].Hash); // This should be ignored as it's existed. - bc.Demand(fixture[3].Hash); // This should be ignored as it's satisfied. - } - - ev.Set(); - _logger.Verbose("Got a chunk of hashes: {0}", string.Join(", ", hashesArray)); - }); - }); - - // Demand: 2, 3, 4, 5, 6 - bc.Demand(fixture.Skip(initialHeight).Take(5).Select(b => b.Hash)); - - // Chunk: 2, 3, 4, 5, 6 - _logger.Verbose("Waiting demand #2-6..."); - - // TODO change waiting condition - await Task.Delay(1000); - _logger.Verbose("Demand #2-6 processed"); - var actual = new List(); - while (logs.TryTake(out var log)) - { - actual.AddRange(log.Item2); - } - - Assert.Equal(fixture.Skip(initialHeight).Take(window).Select(b => b.Hash), actual); - - // Complete: 2, 3, 4, 5 (and no 6) - for (int i = initialHeight; i < initialHeight + window - 1; i++) - { - bc.Satisfy(fixture[i]); - } - - // Demand: 7, 8, 9, 10, 11, 12, 13 (and 14 <- 7 will be added soon) - bc.Demand(fixture.Skip(initialHeight + window).Select(b => b.Hash)); - - // Chunk: 7, 8, 9, 10, 11 - _logger.Verbose("Waiting demand #7-11..."); - // TODO change waiting condition - await Task.Delay(1000); - _logger.Verbose("Demand #7-11 processed"); - - actual = new List(); - while (logs.TryTake(out var log)) - { - actual.AddRange(log.Item2); - } - - Assert.Equal( - fixture.Skip(initialHeight + window).Take(window).Select(b => b.Hash), - actual - ); - - // Complete: 6, 7, 8, 9, 10, 11 - for (int i = initialHeight + window - 1; i < initialHeight + window * 2; i++) - { - bc.Satisfy(fixture[i]); - } - - // Chunk: 12, 13, 14 - _logger.Verbose("Waiting demand #12-14..."); - // TODO change waiting condition - await Task.Delay(1000); - _logger.Verbose("Demand #12-14 processed"); - actual = new List(); - while (logs.TryTake(out var log)) - { - actual.AddRange(log.Item2); - } - - Assert.Equal( - fixture.Skip(initialHeight + window * 2).Select(b => b.Hash).ToImmutableHashSet(), - actual.ToImmutableHashSet() - ); - - // Complete: 12, 13, 14 - for (int i = initialHeight + window * 2; i < fixture.Length; i++) - { - bc.Satisfy(fixture[i]); - } - - await bg; - } - - [Fact(Timeout = Timeout)] - public async Task Complete() - { - // 0, 1: Already existed blocks - // 2, 3, 4, 5, 6: first chunk - // 7, 8, 9, 10, 11: second chunk - // 12, 13, 14: last chunk - ImmutableArray fixture = GenerateBlocks(15).ToImmutableArray(); - - // Blocks each block has: - // A: 0, 4, 8, 12 - // B: 1, 5, 9, 13 - // C: 2, 6, 10, 14 - // D: 3, 7, 11 - char[] peers = { 'A', 'B', 'C', 'D' }; - ImmutableDictionary - > - peerBlocks = peers.ToImmutableDictionary( - p => p, - p => fixture - .Skip(p - 'A') - .Where((_, i) => i % 4 < 1) - .ToImmutableDictionary( - b => b.Hash, - b => (b, TestUtils.CreateBlockCommit(b)))); - - const int initialHeight = 2; - const int window = 5; - var bc = new BlockCompletion( - fixture.Take(initialHeight).Select(b => b.Hash).ToImmutableHashSet().Contains, - window - ); - ImmutableArray initialDemands = fixture - .Skip(initialHeight + 10) - .Select(b => b.Hash) - .ToImmutableArray(); - bc.Demand(initialDemands); - _logger.Verbose("Initial demands: {0}", initialDemands); - IAsyncEnumerable> rv = bc.Complete( - new[] { 'A', 'B', 'C', 'D' }, - (peer, hashes, token) => new AsyncEnumerable<(Block, BlockCommit)>( - async yield => - { - var blocksPeerHas = peerBlocks[peer]; - var sent = new HashSet(); - foreach (BlockHash hash in hashes) - { - if (blocksPeerHas.ContainsKey(hash)) - { - (Block block, BlockCommit commit) = blocksPeerHas[hash]; - await yield.ReturnAsync((block, commit)); - sent.Add(block.Hash); - } - } - - _logger.Verbose("Peer {Peer} sent blocks: {SentBlockHashes}", peer, sent); - }) - ); - - var downloadedBlocks = new HashSet>(); - var sourcePeers = new HashSet(); - await AsyncEnumerable.ForEachAsync(rv, triple => - { - downloadedBlocks.Add(Tuple.Create(triple.Item1, triple.Item2)); - sourcePeers.Add(triple.Item3); - }); - - Assert.Equal( - fixture.Skip(2).ToHashSet(), - downloadedBlocks.Select(pair => pair.Item1).ToHashSet()); - Assert.Subset(peers.ToHashSet(), sourcePeers); - } - - [Fact(Timeout = Timeout)] - public async Task CompleteWithBlockFetcherGivingWrongBlocks() - { - Block genesis = ProposeGenesisBlock(GenesisProposer); - Block demand = ProposeNextBlock(genesis, new PrivateKey()); - BlockCommit demandCommit = TestUtils.CreateBlockCommit(demand); - Block wrong = ProposeNextBlock(genesis, new PrivateKey()); - BlockCommit wrongCommit = TestUtils.CreateBlockCommit(wrong); - _logger.Debug("Genesis: #{Index} {Hash}", genesis.Index, genesis.Hash); - _logger.Debug("Demand: #{Index} {Hash}", demand.Index, demand.Hash); - _logger.Debug("Wrong: #{Index} {Hash}", wrong.Index, wrong.Hash); - var bc = new BlockCompletion( - ((IEquatable)genesis.Hash).Equals, - 5 - ); - bc.Demand(demand.Hash); - - long counter = 0; - BlockCompletion.BlockFetcher wrongBlockFetcher = - (peer, blockHashes, token) => - new AsyncEnumerable<(Block, BlockCommit)>(async yield => - { - // Provides a wrong block (i.e., not corresponding to the demand) - // at first call, and then provide a proper block later calls. - await yield.ReturnAsync(Interlocked.Read(ref counter) < 1 - ? (wrong, wrongCommit) - : (demand, demandCommit)); - Interlocked.Increment(ref counter); - }); - - Tuple[] expected = - new[] { Tuple.Create(demand, demandCommit, 'A') }; - Tuple[] result = - await AsyncEnumerable.ToArrayAsync(bc.Complete(new[] { 'A' }, wrongBlockFetcher)); - - Assert.Equal(expected, result); - } - - [Fact(Timeout = Timeout)] - public async Task CompleteWithNonRespondingPeers() - { - ImmutableArray fixture = GenerateBlocks(15).ToImmutableArray(); - var bc = new BlockCompletion(_ => false, 5); - bc.Demand(fixture.Select(b => b.Hash)); - - BlockCompletion.BlockFetcher blockFetcher = - (peer, blockHashes, token) => - new AsyncEnumerable<(Block, BlockCommit)>(async yield => - { - // Peer A does not respond and Peer B does respond. - if (peer == 'A') - { - const int transportTimeout = 3_000; - await Task.Delay(transportTimeout, yield.CancellationToken); - - // Technically this should throw CommunicationException, but that would - // require much more scaffolding. - throw new Exception("Peer failed to respond"); - } - - foreach (Block b in fixture) - { - if (blockHashes.Contains(b.Hash)) - { - await yield.ReturnAsync((b, TestUtils.CreateBlockCommit(b))); - } - } - }); - - Tuple[] result = - await AsyncEnumerable.ToArrayAsync( - bc.Complete(new[] { 'A', 'B' }, blockFetcher)); - Assert.Equal( - fixture.Select(b => (b, 'B')).ToHashSet(), - result.Select(triple => (triple.Item1, triple.Item3)).ToHashSet()); - } - - [Fact(Timeout = Timeout)] - public async Task CompleteWithCrashingPeers() - { - ImmutableArray fixture = GenerateBlocks(15).ToImmutableArray(); - var bc = new BlockCompletion(_ => false, 5); - bc.Demand(fixture.Select(b => b.Hash)); - - BlockCompletion.BlockFetcher blockFetcher = - (peer, blockHashes, token) => - new AsyncEnumerable<(Block, BlockCommit)>(async yield => - { - // Peer A does crash and Peer B does respond. - if (peer == 'A') - { - throw new Exception("Peer A can't respond."); - } - - foreach (Block b in fixture) - { - if (blockHashes.Contains(b.Hash)) - { - await yield.ReturnAsync((b, TestUtils.CreateBlockCommit(b))); - } - } - }); - - Tuple[] result = - await AsyncEnumerable.ToArrayAsync(bc.Complete(new[] { 'A', 'B' }, blockFetcher)); - Assert.Equal( - fixture.Select(b => (b, 'B')).ToHashSet(), - result.Select(triple => (triple.Item1, triple.Item3)).ToHashSet()); - } - - private IEnumerable GenerateBlocks(int count) - { - if (count >= 1) - { - Block block = ProposeGenesisBlock(GenesisProposer); - yield return block; - - for (int i = 1; i < count; i++) - { - block = ProposeNextBlock( - block, - GenesisProposer, - lastCommit: CreateBlockCommit(block)); - yield return block; - } - } - } - } -} From b022e4fab29b9e412cd10f2c583bf0f120f32608 Mon Sep 17 00:00:00 2001 From: Say Cheong Date: Mon, 2 Sep 2024 16:25:16 +0900 Subject: [PATCH 4/4] Changelog --- CHANGES.md | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 80e7f8c981d..27bf52c3fa3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -22,6 +22,8 @@ To be released. - Changed `BlockLocator()` to take a single `BlockHash`. [[#3942]] - Changed `BlockLocator()` to no longer implemnet `IEnumerable`. [[#3942]] + - (Libplanet.Net) Changed `BlockHashDownloadState` and `BlockDownloadState` + to be `Obsolete`. [[#3943]] ### Backward-incompatible network protocol changes @@ -36,10 +38,9 @@ To be released. - (Libplanet.Store) Optimized `HashNode.ToBencodex()` method. [[#3922], [#3924]] - (Libplanet.Store) Optimized internal conversions to `KeyBytes`. [[#3926]] - - (Libplanet.Net) Changed the reporting behavior for `BlockHashDownloadState` - and `BlockDownloadState` during preloading. These no longer report - meaningful data and it is strongly advised not to rely on these to track - the progress of preloading. [[#3931]] + - (Libplanet.Net) Changed to no longer report `BlockHashDownloadState` + and `BlockDownloadState` during preloading. It is strongly advised + not to rely on these to track the progress of preloading. [[#3943]] ### Bug fixes @@ -51,9 +52,9 @@ To be released. [#3922]: https://github.com/planetarium/libplanet/issues/3922 [#3924]: https://github.com/planetarium/libplanet/pull/3924 [#3926]: https://github.com/planetarium/libplanet/pull/3926 -[#3931]: https://github.com/planetarium/libplanet/pull/3931 [#3934]: https://github.com/planetarium/libplanet/pull/3934 [#3942]: https://github.com/planetarium/libplanet/pull/3942 +[#3943]: https://github.com/planetarium/libplanet/pull/3943 Version 5.2.2