Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ESDB-169-13] Support async reads & writes in chunks #66

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ public class ChunkDataReadStream(Stream chunkFileStream) : Stream {
public sealed override bool CanRead => true;
public sealed override bool CanSeek => true;
public sealed override bool CanWrite => false;
public sealed override int Read(byte[] buffer, int offset, int count) => throw new InvalidOperationException("use ReadAsync");
public sealed override void Write(byte[] buffer, int offset, int count) => throw new InvalidOperationException();
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) =>
throw new InvalidOperationException();
public sealed override void Flush() => throw new InvalidOperationException();
public sealed override Task FlushAsync(CancellationToken cancellationToken) =>
throw new InvalidOperationException();
public sealed override void SetLength(long value) => throw new InvalidOperationException();
public override long Length => throw new NotSupportedException();

// reads must always return exactly `count` bytes as we never read past the (flushed) writer checkpoint
public override int Read(byte[] buffer, int offset, int count) => ChunkFileStream.Read(buffer, offset, count);
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) =>
ChunkFileStream.ReadAsync(buffer, cancellationToken);

// seeks need to support only `SeekOrigin.Begin`
public override long Seek(long offset, SeekOrigin origin) {
Expand Down
30 changes: 24 additions & 6 deletions src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
// Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements.
// Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md).

using System;
using System.IO;
using System.Buffers;
using System.Security.Cryptography;

namespace EventStore.Plugins.Transforms;
Expand All @@ -15,14 +14,21 @@ public class ChunkDataWriteStream(Stream chunkFileStream, HashAlgorithm checksum
public sealed override bool CanSeek => false;
public sealed override bool CanWrite => true;
public sealed override int Read(byte[] buffer, int offset, int count) => throw new InvalidOperationException();
public sealed override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) =>
throw new InvalidOperationException();
public sealed override void Write(byte[] buffer, int offset, int count) =>
throw new InvalidOperationException("use WriteAsync");
public sealed override void Flush() =>
throw new InvalidOperationException("use FlushAsync");

public sealed override long Seek(long offset, SeekOrigin origin) => throw new InvalidOperationException();

public override void Write(byte[] buffer, int offset, int count) {
ChunkFileStream.Write(buffer, offset, count);
ChecksumAlgorithm.TransformBlock(buffer, 0, count, null, 0);
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) {
await ChunkFileStream.WriteAsync(buffer, cancellationToken);
Checksum(buffer);
}

public override void Flush() => ChunkFileStream.Flush();
public override Task FlushAsync(CancellationToken ct) => ChunkFileStream.FlushAsync(ct);
public override void SetLength(long value) => ChunkFileStream.SetLength(value);
public override long Length => ChunkFileStream.Length;
public override long Position {
Expand All @@ -38,6 +44,18 @@ public override long Position {
}
}

public void Checksum(ReadOnlyMemory<byte> data) {
// HashAlgorithm.TransformBlock() doesn't support span/memory, so we need to rent a byte array from the pool
byte[] tmp = ArrayPool<byte>.Shared.Rent(data.Length);
try {
data.CopyTo(tmp.AsMemory());
ChecksumAlgorithm.TransformBlock(tmp, 0, data.Length, null, 0);
Array.Clear(tmp, 0, data.Length);
} finally {
ArrayPool<byte>.Shared.Return(tmp);
}
}

private void ReadAndChecksum(long count) {
var buffer = new byte[4096];
long toRead = count;
Expand Down
4 changes: 2 additions & 2 deletions src/EventStore.Plugins/Transforms/IChunkWriteTransform.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ namespace EventStore.Plugins.Transforms;

public interface IChunkWriteTransform {
ChunkDataWriteStream TransformData(ChunkDataWriteStream stream);
void CompleteData(int footerSize, int alignmentSize);
void WriteFooter(ReadOnlySpan<byte> footer, out int fileSize);
ValueTask CompleteData(int footerSize, int alignmentSize, CancellationToken cancellationToken = default);
ValueTask<int> WriteFooter(ReadOnlyMemory<byte> footer, CancellationToken cancellationToken = default);
}
Loading