Skip to content

Commit

Permalink
ChunkDataWriteStream / IChunkWriteTransform: Support async writes & r…
Browse files Browse the repository at this point in the history
…emove support for sync writes
  • Loading branch information
shaan1337 committed Nov 6, 2024
1 parent 93d0971 commit 4920714
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 8 deletions.
4 changes: 4 additions & 0 deletions src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ public class ChunkDataReadStream(Stream chunkFileStream) : Stream {
public sealed override bool CanWrite => false;
public sealed override int Read(byte[] buffer, int offset, int count) => throw new InvalidOperationException("use ReadAsync");
public sealed override void Write(byte[] buffer, int offset, int count) => throw new InvalidOperationException();
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) =>
throw new InvalidOperationException();
public sealed override void Flush() => throw new InvalidOperationException();
public sealed override Task FlushAsync(CancellationToken cancellationToken) =>
throw new InvalidOperationException();
public sealed override void SetLength(long value) => throw new InvalidOperationException();
public override long Length => throw new NotSupportedException();

Expand Down
28 changes: 22 additions & 6 deletions src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
// Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements.
// Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md).

using System;
using System.IO;
using System.Buffers;
using System.Security.Cryptography;

namespace EventStore.Plugins.Transforms;
Expand All @@ -17,14 +16,19 @@ public class ChunkDataWriteStream(Stream chunkFileStream, HashAlgorithm checksum
public sealed override int Read(byte[] buffer, int offset, int count) => throw new InvalidOperationException();
public sealed override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) =>
throw new InvalidOperationException();
public sealed override void Write(byte[] buffer, int offset, int count) =>
throw new InvalidOperationException("use WriteAsync");
public sealed override void Flush() =>
throw new InvalidOperationException("use FlushAsync");

public sealed override long Seek(long offset, SeekOrigin origin) => throw new InvalidOperationException();

public override void Write(byte[] buffer, int offset, int count) {
ChunkFileStream.Write(buffer, offset, count);
ChecksumAlgorithm.TransformBlock(buffer, 0, count, null, 0);
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) {
await ChunkFileStream.WriteAsync(buffer, cancellationToken);
Checksum(buffer);
}

public override void Flush() => ChunkFileStream.Flush();
public override Task FlushAsync(CancellationToken ct) => ChunkFileStream.FlushAsync(ct);
public override void SetLength(long value) => ChunkFileStream.SetLength(value);
public override long Length => ChunkFileStream.Length;
public override long Position {
Expand All @@ -40,6 +44,18 @@ public override long Position {
}
}

public void Checksum(ReadOnlyMemory<byte> data) {
// HashAlgorithm.TransformBlock() doesn't support span/memory, so we need to rent a byte array from the pool
byte[] tmp = ArrayPool<byte>.Shared.Rent(data.Length);
try {
data.CopyTo(tmp.AsMemory());
ChecksumAlgorithm.TransformBlock(tmp, 0, data.Length, null, 0);
Array.Clear(tmp, 0, data.Length);
} finally {
ArrayPool<byte>.Shared.Return(tmp);
}
}

private void ReadAndChecksum(long count) {
var buffer = new byte[4096];
long toRead = count;
Expand Down
4 changes: 2 additions & 2 deletions src/EventStore.Plugins/Transforms/IChunkWriteTransform.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ namespace EventStore.Plugins.Transforms;

public interface IChunkWriteTransform {
ChunkDataWriteStream TransformData(ChunkDataWriteStream stream);
void CompleteData(int footerSize, int alignmentSize);
void WriteFooter(ReadOnlySpan<byte> footer, out int fileSize);
ValueTask CompleteData(int footerSize, int alignmentSize, CancellationToken cancellationToken = default);
ValueTask<int> WriteFooter(ReadOnlyMemory<byte> footer, CancellationToken cancellationToken = default);
}

0 comments on commit 4920714

Please sign in to comment.