Skip to content

Commit

Permalink
[core] Allow out-of-order incremental hashing
Browse files Browse the repository at this point in the history
The memory cache enables us to support some level of out-of-order
incremental hashing.

If a block is still in the cache then we can incrementally hash it!
Once we have read the block from the cache it is flushed to disk.

One potential race condition has been eliminated by keeping
blocks available in the memory cache while they are being
flushed to disk:

The scenario is:
--

Thread 1 writes a block to the cache.
The cache is full.
The thread pool chooses the oldest unused block, removes it from
the cache and then flushes it.

Thread 2 then tries to read the block which thread 1 evicted.
Thread 2 cannot find it in the memory cache.
Thread 2 tries to read it from disk, but due to the lack of
strong guarantees about the order in which async Tasks actually
execute, its Read will be handled before Thread 1 actually
writes the block to disk, which means Thread 2 unintentionally
reads junk from disk.

--

Now the blocks are guaranteed to be available in the cache until
the corresponding IPieceWriter.WriteAsync invocation has completed.

This makes out-of-order incremental hashing exceptionally effective
and it's possible to download gigs of data using a 5MB memory cache
and flush only a few kB of data to disk *before* it is incrementally
hashed.
  • Loading branch information
alanmcgovern committed Feb 25, 2021
1 parent 0cf0c4e commit a0de480
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 14 deletions.
10 changes: 8 additions & 2 deletions src/MonoTorrent.Tests/Client/DiskManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,9 @@ public async Task WriteBlock_SpanTwoFiles ()
}

[Test]
public async Task WritePiece_FirstTwoSwapped ()
public async Task WritePiece_FirstTwoSwapped ([Values (0, Piece.BlockSize, Piece.BlockSize * 3)] int cacheSize)
{
await diskManager.UpdateSettingsAsync (new EngineSettingsBuilder { DiskCacheBytes = cacheSize }.ToSettings ());
writer.Data = null;

var blocks = fileData.Data
Expand All @@ -419,7 +420,12 @@ public async Task WritePiece_FirstTwoSwapped ()
await diskManager.WriteAsync (fileData, new BlockInfo (0, Piece.BlockSize * 2, Piece.BlockSize), blocks[2]);

Assert.IsTrue (Toolbox.ByteMatch (fileData.Hashes[0], await diskManager.GetHashAsync (fileData, 0)), "#1");
Assert.AreEqual (Piece.BlockSize * 2, writer.ReadData.Sum (t => t.Item3), "#2");
// If we have at least Piece.BlockSize in the disk cache we'll need to read nothing from disk
if (cacheSize < Piece.BlockSize)
Assert.AreEqual (Piece.BlockSize * 2, writer.ReadData.Sum (t => t.Item3), "#2");
else
Assert.AreEqual (0, writer.ReadData.Sum (t => t.Item3), "#2");


writer.ReadData.Clear ();
Assert.IsTrue (Toolbox.ByteMatch (fileData.Hashes[0], await diskManager.GetHashAsync (fileData, 0)), "#3");
Expand Down
9 changes: 4 additions & 5 deletions src/MonoTorrent.Tests/Client/MemoryCacheTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,7 @@ public async Task ReadBlockWhileWriting ()
// verify the original block is still accessible. Note: Reading a block implicitly flushes it, but we skip the
// flush as the block was already marked as flushing by the 'Write' invocation.
var result = new byte[3];
var read = await cache.ReadAsync (torrent, new BlockInfo (0, 0, 3), result).WithTimeout ();
Assert.AreEqual (3, read);
Assert.IsTrue (await cache.ReadAsync (torrent, new BlockInfo (0, 0, 3), result).WithTimeout ());
Assert.AreEqual (1, writer.Writes.Count);
Assert.AreEqual (3, cache.CacheHits);

Expand Down Expand Up @@ -271,11 +270,11 @@ public async Task WriteSameBlockTwice ()
Assert.AreEqual (6, memory.CacheUsed);

var result = new byte[3];
Assert.AreEqual (3, await memory.ReadAsync (torrent, new BlockInfo (0, 0, 3), result));
Assert.IsTrue (await memory.ReadAsync (torrent, new BlockInfo (0, 0, 3), result));
CollectionAssert.AreEqual (data1, result);
Assert.AreEqual (3, memory.CacheUsed);

Assert.AreEqual (3, await memory.ReadAsync (torrent, new BlockInfo (0, 3, 3), result));
Assert.IsTrue (await memory.ReadAsync (torrent, new BlockInfo (0, 3, 3), result));
CollectionAssert.AreEqual (data3, result);
Assert.AreEqual (0, memory.CacheUsed);
}
Expand Down Expand Up @@ -332,7 +331,7 @@ public async Task MemoryWriter_ZeroCapacity_Write()
Assert.AreEqual (0, cache.CacheUsed);

var data = new byte[1];
Assert.AreEqual (1, await cache.ReadAsync (torrent, new BlockInfo (0, 0, 1), data));
Assert.IsTrue (await cache.ReadAsync (torrent, new BlockInfo (0, 0, 1), data));
Assert.AreEqual (7, data[0]);
}
}
Expand Down
28 changes: 27 additions & 1 deletion src/MonoTorrent/MonoTorrent.Client.PieceWriters/IBlockCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,33 @@ namespace MonoTorrent.Client.PieceWriters
{
public interface IBlockCache : IDisposable
{
ReusableTask<int> ReadAsync (ITorrentData torrent, BlockInfo block, byte[] buffer);
/// <summary>
/// Reads data from the cache and flushes it to disk, or reads the data from disk if it is not available in the cache.
/// </summary>
/// <param name="torrent">The torrent which the requested block belongs to.</param>
/// <param name="block">Identifies the data to read: the piece index, the offset within the piece and the number of bytes requested.</param>
/// <param name="buffer">The buffer which receives the data.</param>
/// <returns>True if the block was read. The MemoryCache implementation returns true when the block is served from the cache, and otherwise
/// returns whether the number of bytes read from disk equals <c>block.RequestLength</c>.</returns>
ReusableTask<bool> ReadAsync (ITorrentData torrent, BlockInfo block, byte[] buffer);

/// <summary>
/// If the block of data is available in the cache, the data is read into the buffer and the method returns true.
/// If the block is unavailable, the buffer will not be modified and the method will return false.
/// </summary>
/// <param name="torrent">The torrent which the requested block belongs to.</param>
/// <param name="block">Identifies the data to read: the piece index, the offset within the piece and the number of bytes requested.</param>
/// <param name="buffer">The buffer which receives the data when the block is found in the cache.</param>
/// <returns>True if the block was available in the cache and was copied into the buffer, otherwise false.</returns>
ReusableTask<bool> ReadFromCacheAsync (ITorrentData torrent, BlockInfo block, byte[] buffer);

/// <summary>
/// Writes a block of data via the cache.
/// </summary>
/// <param name="torrent">The torrent which the block belongs to.</param>
/// <param name="block">Identifies where the data belongs: the piece index, the offset within the piece and the number of bytes.</param>
/// <param name="buffer">The buffer containing the data to write.</param>
/// <param name="preferSkipCache">A hint that the data should be written straight through rather than held in the cache.
/// NOTE(review): DiskManager appears to set this when the block can be incrementally hashed immediately; whether an
/// implementation must honour the hint is not visible here - confirm against the implementations.</param>
/// <returns>A task representing the asynchronous write.</returns>
ReusableTask WriteAsync (ITorrentData torrent, BlockInfo block, byte[] buffer, bool preferSkipCache);
}
}
16 changes: 12 additions & 4 deletions src/MonoTorrent/MonoTorrent.Client.PieceWriters/MemoryCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,16 @@ internal MemoryCache (int capacity, IPieceWriter writer)
WriteMonitor = new SpeedMonitor ();
}

public async ReusableTask<int> ReadAsync (ITorrentData torrent, BlockInfo block, byte[] buffer)
/// <summary>
/// Reads the requested block, preferring the memory cache and falling back to the underlying files.
/// </summary>
/// <param name="torrent">The torrent which the requested block belongs to.</param>
/// <param name="block">The block to read.</param>
/// <param name="buffer">The buffer which receives the data.</param>
/// <returns>True if the block was served from the cache, or if the fallback read returned exactly <c>block.RequestLength</c> bytes.</returns>
public async ReusableTask<bool> ReadAsync (ITorrentData torrent, BlockInfo block, byte[] buffer)
{
    bool servedFromCache = await ReadFromCacheAsync (torrent, block, buffer);
    if (!servedFromCache) {
        // The block was not cached: record the miss, then fall back to the files.
        Interlocked.Add (ref cacheMisses, block.RequestLength);
        int bytesRead = await ReadFromFilesAsync (torrent, block, buffer).ConfigureAwait (false);
        return bytesRead == block.RequestLength;
    }

    return true;
}

public async ReusableTask<bool> ReadFromCacheAsync (ITorrentData torrent, BlockInfo block, byte[] buffer)
{
if (torrent == null)
throw new ArgumentNullException (nameof (torrent));
Expand All @@ -150,12 +159,11 @@ public async ReusableTask<int> ReadAsync (ITorrentData torrent, BlockInfo block,
}
}
Interlocked.Add (ref cacheHits, block.RequestLength);
return block.RequestLength;
return true;
}
}

Interlocked.Add (ref cacheMisses, block.RequestLength);
return await ReadFromFilesAsync (torrent, block, buffer).ConfigureAwait (false);
return false;
}

public async ReusableTask WriteAsync (ITorrentData torrent, BlockInfo block, byte[] buffer, bool preferSkipCache)
Expand Down
25 changes: 23 additions & 2 deletions src/MonoTorrent/MonoTorrent.Client/Managers/DiskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ internal async ReusableTask<bool> ReadAsync (ITorrentData manager, BlockInfo req

try {
if (ReadLimiter.TryProcess (request.RequestLength)) {
return await Cache.ReadAsync (manager, request, buffer).ConfigureAwait (false) == request.RequestLength;
return await Cache.ReadAsync (manager, request, buffer).ConfigureAwait (false);
} else {
var tcs = new ReusableTaskCompletionSource<bool> ();
ReadQueue.Enqueue (new BufferedIO (manager, request, buffer, false, tcs));
Expand Down Expand Up @@ -408,6 +408,9 @@ internal async ReusableTask WriteAsync (ITorrentData manager, BlockInfo request,
ReusableTask writeTask = default;

using (incrementalHash == null ? default : await incrementalHash.Locker.EnterAsync ()) {
if (incrementalHash != null && incrementalHash.NextOffsetToHash < request.StartOffset)
await TryIncrementallyHashFromMemory (manager, pieceIndex, incrementalHash);

bool canIncrementallyHash = incrementalHash != null && request.StartOffset == incrementalHash.NextOffsetToHash;

// If we can incrementally hash the data, instruct the cache to write the block straight through
Expand Down Expand Up @@ -442,6 +445,24 @@ internal async ReusableTask WriteAsync (ITorrentData manager, BlockInfo request,
}
}

/// <summary>
/// Advances the incremental hash of a piece using only blocks which are currently held in the cache.
/// Starting at <c>incrementalHash.NextOffsetToHash</c>, consecutive blocks are read via
/// <c>Cache.ReadFromCacheAsync</c> and fed to the hasher until a block is missing from the cache or
/// the end of the piece is reached. If the whole piece has been hashed, the hash is finalised.
/// </summary>
/// <param name="torrent">The torrent which the piece belongs to.</param>
/// <param name="pieceIndex">The index of the piece being hashed.</param>
/// <param name="incrementalHash">The incremental hashing state for the piece; its hasher and next offset are updated in place.</param>
async ReusableTask TryIncrementallyHashFromMemory (ITorrentData torrent, int pieceIndex, IncrementalHashData incrementalHash)
{
    var pieceLength = torrent.BytesPerPiece (pieceIndex);
    using var releaser = BufferPool.Rent (Piece.BlockSize, out byte[] buffer);

    while (incrementalHash.NextOffsetToHash < pieceLength) {
        // The final block of a piece may be shorter than a full block.
        var count = Math.Min (Piece.BlockSize, pieceLength - incrementalHash.NextOffsetToHash);
        var nextBlock = new BlockInfo (pieceIndex, incrementalHash.NextOffsetToHash, count);

        bool cached = await Cache.ReadFromCacheAsync (torrent, nextBlock, buffer);
        if (!cached)
            break;

        incrementalHash.Hasher.TransformBlock (buffer, 0, count, buffer, 0);
        incrementalHash.NextOffsetToHash += count;
    }

    // Only finalise once every byte of the piece has been fed to the hasher.
    if (incrementalHash.NextOffsetToHash != pieceLength)
        return;

    incrementalHash.Hasher.TransformFinalBlock (Array.Empty<byte> (), 0, 0);
}

async ReusableTask ProcessBufferedIOAsync (bool force = false)
{
await IOLoop;
Expand Down Expand Up @@ -478,7 +499,7 @@ async ReusableTask ProcessBufferedIOAsync (bool force = false)
io = ReadQueue.Dequeue ();

try {
bool result = await Cache.ReadAsync (io.manager, io.request, io.buffer) == io.request.RequestLength;
bool result = await Cache.ReadAsync (io.manager, io.request, io.buffer);
io.tcs.SetResult (result);
} catch (Exception ex) {
io.tcs.SetException (ex);
Expand Down

0 comments on commit a0de480

Please sign in to comment.