diff --git a/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs b/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs index 31d5f23..91600f4 100644 --- a/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs +++ b/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs @@ -12,13 +12,19 @@ public class ChunkDataReadStream(Stream chunkFileStream) : Stream { public sealed override bool CanRead => true; public sealed override bool CanSeek => true; public sealed override bool CanWrite => false; + public sealed override int Read(byte[] buffer, int offset, int count) => throw new InvalidOperationException("use ReadAsync"); public sealed override void Write(byte[] buffer, int offset, int count) => throw new InvalidOperationException(); + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) => + throw new InvalidOperationException(); public sealed override void Flush() => throw new InvalidOperationException(); + public sealed override Task FlushAsync(CancellationToken cancellationToken) => + throw new InvalidOperationException(); public sealed override void SetLength(long value) => throw new InvalidOperationException(); public override long Length => throw new NotSupportedException(); // reads must always return exactly `count` bytes as we never read past the (flushed) writer checkpoint - public override int Read(byte[] buffer, int offset, int count) => ChunkFileStream.Read(buffer, offset, count); + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) => + ChunkFileStream.ReadAsync(buffer, cancellationToken); // seeks need to support only `SeekOrigin.Begin` public override long Seek(long offset, SeekOrigin origin) { diff --git a/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs b/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs index 47ad52b..9fbeb0a 100644 --- a/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs +++ b/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs @@ -1,8 +1,7 @@ // Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements. // Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md). -using System; -using System.IO; +using System.Buffers; using System.Security.Cryptography; namespace EventStore.Plugins.Transforms; @@ -15,14 +14,21 @@ public class ChunkDataWriteStream(Stream chunkFileStream, HashAlgorithm checksum public sealed override bool CanSeek => false; public sealed override bool CanWrite => true; public sealed override int Read(byte[] buffer, int offset, int count) => throw new InvalidOperationException(); + public sealed override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) => + throw new InvalidOperationException(); + public sealed override void Write(byte[] buffer, int offset, int count) => + throw new InvalidOperationException("use WriteAsync"); + public sealed override void Flush() => + throw new InvalidOperationException("use FlushAsync"); + public sealed override long Seek(long offset, SeekOrigin origin) => throw new InvalidOperationException(); - public override void Write(byte[] buffer, int offset, int count) { - ChunkFileStream.Write(buffer, offset, count); - ChecksumAlgorithm.TransformBlock(buffer, 0, count, null, 0); + public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) { + await ChunkFileStream.WriteAsync(buffer, cancellationToken); + Checksum(buffer); } - public override void Flush() => ChunkFileStream.Flush(); + public override Task FlushAsync(CancellationToken ct) => ChunkFileStream.FlushAsync(ct); public override void SetLength(long value) => ChunkFileStream.SetLength(value); public override long Length => ChunkFileStream.Length; public override long Position { @@ -38,6 +44,18 @@ public override long Position { } } + public void Checksum(ReadOnlyMemory data) { + // HashAlgorithm.TransformBlock() doesn't support span/memory, so we need to rent a byte array from the pool + byte[] tmp = ArrayPool.Shared.Rent(data.Length); + try { + data.CopyTo(tmp.AsMemory()); + ChecksumAlgorithm.TransformBlock(tmp, 0, data.Length, null, 0); + Array.Clear(tmp, 0, data.Length); + } finally { + ArrayPool.Shared.Return(tmp); + } + } + private void ReadAndChecksum(long count) { var buffer = new byte[4096]; long toRead = count; diff --git a/src/EventStore.Plugins/Transforms/IChunkWriteTransform.cs b/src/EventStore.Plugins/Transforms/IChunkWriteTransform.cs index b354d47..f52169d 100644 --- a/src/EventStore.Plugins/Transforms/IChunkWriteTransform.cs +++ b/src/EventStore.Plugins/Transforms/IChunkWriteTransform.cs @@ -7,6 +7,6 @@ namespace EventStore.Plugins.Transforms; public interface IChunkWriteTransform { ChunkDataWriteStream TransformData(ChunkDataWriteStream stream); - void CompleteData(int footerSize, int alignmentSize); - void WriteFooter(ReadOnlySpan footer, out int fileSize); + ValueTask CompleteData(int footerSize, int alignmentSize, CancellationToken cancellationToken = default); + ValueTask WriteFooter(ReadOnlyMemory footer, CancellationToken cancellationToken = default); }