diff --git a/src/MonoTorrent.Tests/Client/DiskManagerTests.cs b/src/MonoTorrent.Tests/Client/DiskManagerTests.cs index cae757c0b..f1f1df27e 100644 --- a/src/MonoTorrent.Tests/Client/DiskManagerTests.cs +++ b/src/MonoTorrent.Tests/Client/DiskManagerTests.cs @@ -404,8 +404,9 @@ public async Task WriteBlock_SpanTwoFiles () } [Test] - public async Task WritePiece_FirstTwoSwapped () + public async Task WritePiece_FirstTwoSwapped ([Values (0, Piece.BlockSize, Piece.BlockSize * 3)] int cacheSize) { + await diskManager.UpdateSettingsAsync (new EngineSettingsBuilder { DiskCacheBytes = cacheSize }.ToSettings ()); writer.Data = null; var blocks = fileData.Data @@ -419,7 +420,12 @@ public async Task WritePiece_FirstTwoSwapped () await diskManager.WriteAsync (fileData, new BlockInfo (0, Piece.BlockSize * 2, Piece.BlockSize), blocks[2]); Assert.IsTrue (Toolbox.ByteMatch (fileData.Hashes[0], await diskManager.GetHashAsync (fileData, 0)), "#1"); - Assert.AreEqual (Piece.BlockSize * 2, writer.ReadData.Sum (t => t.Item3), "#2"); + // If we have at least Piece.BlockSize in the disk cache we'll need to read nothing from disk + if (cacheSize < Piece.BlockSize) + Assert.AreEqual (Piece.BlockSize * 2, writer.ReadData.Sum (t => t.Item3), "#2"); + else + Assert.AreEqual (0, writer.ReadData.Sum (t => t.Item3), "#2"); + writer.ReadData.Clear (); Assert.IsTrue (Toolbox.ByteMatch (fileData.Hashes[0], await diskManager.GetHashAsync (fileData, 0)), "#3"); diff --git a/src/MonoTorrent.Tests/Client/MemoryCacheTests.cs b/src/MonoTorrent.Tests/Client/MemoryCacheTests.cs index 035cf8ea4..7f0cf8311 100644 --- a/src/MonoTorrent.Tests/Client/MemoryCacheTests.cs +++ b/src/MonoTorrent.Tests/Client/MemoryCacheTests.cs @@ -219,8 +219,7 @@ public async Task ReadBlockWhileWriting () // verify the original block is still accessible. Note: Reading a block implicitly flushes it, but we skip the // flush as the block was already marked as flushing by the 'Write' invocation. var result = new byte[3]; - var read = await cache.ReadAsync (torrent, new BlockInfo (0, 0, 3), result).WithTimeout (); - Assert.AreEqual (3, read); + Assert.IsTrue (await cache.ReadAsync (torrent, new BlockInfo (0, 0, 3), result).WithTimeout ()); Assert.AreEqual (1, writer.Writes.Count); Assert.AreEqual (3, cache.CacheHits); @@ -271,11 +270,11 @@ public async Task WriteSameBlockTwice () Assert.AreEqual (6, memory.CacheUsed); var result = new byte[3]; - Assert.AreEqual (3, await memory.ReadAsync (torrent, new BlockInfo (0, 0, 3), result)); + Assert.IsTrue (await memory.ReadAsync (torrent, new BlockInfo (0, 0, 3), result)); CollectionAssert.AreEqual (data1, result); Assert.AreEqual (3, memory.CacheUsed); - Assert.AreEqual (3, await memory.ReadAsync (torrent, new BlockInfo (0, 3, 3), result)); + Assert.IsTrue (await memory.ReadAsync (torrent, new BlockInfo (0, 3, 3), result)); CollectionAssert.AreEqual (data3, result); Assert.AreEqual (0, memory.CacheUsed); } @@ -332,7 +331,7 @@ public async Task MemoryWriter_ZeroCapacity_Write() Assert.AreEqual (0, cache.CacheUsed); var data = new byte[1]; - Assert.AreEqual (1, await cache.ReadAsync (torrent, new BlockInfo (0, 0, 1), data)); + Assert.IsTrue (await cache.ReadAsync (torrent, new BlockInfo (0, 0, 1), data)); Assert.AreEqual (7, data[0]); } } diff --git a/src/MonoTorrent/MonoTorrent.Client.PieceWriters/IBlockCache.cs b/src/MonoTorrent/MonoTorrent.Client.PieceWriters/IBlockCache.cs index e16dad1b8..35d80c841 100644 --- a/src/MonoTorrent/MonoTorrent.Client.PieceWriters/IBlockCache.cs +++ b/src/MonoTorrent/MonoTorrent.Client.PieceWriters/IBlockCache.cs @@ -35,7 +35,33 @@ namespace MonoTorrent.Client.PieceWriters { public interface IBlockCache : IDisposable { - ReusableTask ReadAsync (ITorrentData torrent, BlockInfo block, byte[] buffer); + /// + /// Reads data from the cache and flushes it to disk, or reads the data from disk if it is not available in the cache. + /// + /// + /// + /// + /// + ReusableTask ReadAsync (ITorrentData torrent, BlockInfo block, byte[] buffer); + + /// + /// If the block of data is available in the cache, the data is read into the buffer and the method returns true. + /// If the block is unavailable, the buffer will not be modified and the method will return false. + /// + /// + /// + /// + /// + ReusableTask ReadFromCacheAsync (ITorrentData torrent, BlockInfo block, byte[] buffer); + + /// + /// + /// + /// + /// + /// + /// + /// ReusableTask WriteAsync (ITorrentData torrent, BlockInfo block, byte[] buffer, bool preferSkipCache); } } diff --git a/src/MonoTorrent/MonoTorrent.Client.PieceWriters/MemoryCache.cs b/src/MonoTorrent/MonoTorrent.Client.PieceWriters/MemoryCache.cs index cfbf684e4..b20de55fc 100644 --- a/src/MonoTorrent/MonoTorrent.Client.PieceWriters/MemoryCache.cs +++ b/src/MonoTorrent/MonoTorrent.Client.PieceWriters/MemoryCache.cs @@ -124,7 +124,16 @@ internal MemoryCache (int capacity, IPieceWriter writer) WriteMonitor = new SpeedMonitor (); } - public async ReusableTask ReadAsync (ITorrentData torrent, BlockInfo block, byte[] buffer) + public async ReusableTask ReadAsync (ITorrentData torrent, BlockInfo block, byte[] buffer) + { + if (await ReadFromCacheAsync (torrent, block, buffer)) + return true; + + Interlocked.Add (ref cacheMisses, block.RequestLength); + return await ReadFromFilesAsync (torrent, block, buffer).ConfigureAwait (false) == block.RequestLength; + } + + public async ReusableTask ReadFromCacheAsync (ITorrentData torrent, BlockInfo block, byte[] buffer) { if (torrent == null) throw new ArgumentNullException (nameof (torrent)); @@ -150,12 +159,11 @@ public async ReusableTask ReadAsync (ITorrentData torrent, BlockInfo block, } } Interlocked.Add (ref cacheHits, block.RequestLength); - return block.RequestLength; + return true; } } - Interlocked.Add (ref cacheMisses, block.RequestLength); - return await ReadFromFilesAsync (torrent, block, buffer).ConfigureAwait (false); + return false; } public async ReusableTask WriteAsync (ITorrentData torrent, BlockInfo block, byte[] buffer, bool preferSkipCache) diff --git a/src/MonoTorrent/MonoTorrent.Client/Managers/DiskManager.cs b/src/MonoTorrent/MonoTorrent.Client/Managers/DiskManager.cs index 680b3f0f1..b45119e4f 100644 --- a/src/MonoTorrent/MonoTorrent.Client/Managers/DiskManager.cs +++ b/src/MonoTorrent/MonoTorrent.Client/Managers/DiskManager.cs @@ -372,7 +372,7 @@ internal async ReusableTask ReadAsync (ITorrentData manager, BlockInfo req try { if (ReadLimiter.TryProcess (request.RequestLength)) { - return await Cache.ReadAsync (manager, request, buffer).ConfigureAwait (false) == request.RequestLength; + return await Cache.ReadAsync (manager, request, buffer).ConfigureAwait (false); } else { var tcs = new ReusableTaskCompletionSource (); ReadQueue.Enqueue (new BufferedIO (manager, request, buffer, false, tcs)); @@ -408,6 +408,9 @@ internal async ReusableTask WriteAsync (ITorrentData manager, BlockInfo request, ReusableTask writeTask = default; using (incrementalHash == null ? default : await incrementalHash.Locker.EnterAsync ()) { + if (incrementalHash != null && incrementalHash.NextOffsetToHash < request.StartOffset) + await TryIncrementallyHashFromMemory (manager, pieceIndex, incrementalHash); + bool canIncrementallyHash = incrementalHash != null && request.StartOffset == incrementalHash.NextOffsetToHash; // If we can incrementally hash the data, instruct the cache to write the block straight through @@ -442,6 +445,24 @@ internal async ReusableTask WriteAsync (ITorrentData manager, BlockInfo request, } } + async ReusableTask TryIncrementallyHashFromMemory (ITorrentData torrent, int pieceIndex, IncrementalHashData incrementalHash) + { + var sizeOfPiece = torrent.BytesPerPiece (pieceIndex); + using var releaser = BufferPool.Rent (Piece.BlockSize, out byte[] buffer); + while (incrementalHash.NextOffsetToHash < sizeOfPiece) { + var remaining = Math.Min (Piece.BlockSize, sizeOfPiece - incrementalHash.NextOffsetToHash); + if (await Cache.ReadFromCacheAsync (torrent, new BlockInfo (pieceIndex, incrementalHash.NextOffsetToHash, remaining), buffer)) { + incrementalHash.Hasher.TransformBlock (buffer, 0, remaining, buffer, 0); + incrementalHash.NextOffsetToHash += remaining; + } else { + break; + } + } + + if (incrementalHash.NextOffsetToHash == sizeOfPiece) + incrementalHash.Hasher.TransformFinalBlock (Array.Empty (), 0, 0); + } + async ReusableTask ProcessBufferedIOAsync (bool force = false) { await IOLoop; @@ -478,7 +499,7 @@ async ReusableTask ProcessBufferedIOAsync (bool force = false) io = ReadQueue.Dequeue (); try { - bool result = await Cache.ReadAsync (io.manager, io.request, io.buffer) == io.request.RequestLength; + bool result = await Cache.ReadAsync (io.manager, io.request, io.buffer); io.tcs.SetResult (result); } catch (Exception ex) { io.tcs.SetException (ex);