Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Akka.IO / Akka.Streams: ByteString rewrite #7136

Draft
wants to merge 19 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 40 additions & 29 deletions src/core/Akka.Streams/Dsl/Framing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,39 +169,45 @@ public FramingException(string message) : base(message)
protected FramingException(SerializationInfo info, StreamingContext context) : base(info, context) { }
}

private static readonly Func<IEnumerator<byte>, int, int> BigEndianDecoder = (enumerator, length) =>
private static int BigEndianDecoder(ref ReadOnlySpan<byte> enumerator, int length)
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
{
var count = length;
var decoded = 0;
while (count > 0)
if (length > enumerator.Length)
{
throw new ArgumentException("Length exceeds the size of the enumerator.");
}

var result = 0;

// Assuming 'length' is 4 for a 32-bit integer.
// Adjust the loop and shift amounts if dealing with different sizes.
for (var i = 0; i < length; i++)
{
decoded <<= 8;
if (!enumerator.MoveNext()) throw new IndexOutOfRangeException("LittleEndianDecoder reached end of byte string");
decoded |= enumerator.Current & 0xFF;
count--;
result |= enumerator[i] << ((length - 1 - i) * 8);
}

return decoded;
};
return result;
}

private static readonly Func<IEnumerator<byte>, int, int> LittleEndianDecoder = (enumerator, length) =>
private static int LittleEndianDecoder(ref ReadOnlySpan<byte> span, int length)
{
var highestOcted = (length - 1) << 3;
var mask = (int) (1L << (length << 3)) - 1;
var count = length;
if (length > span.Length)
{
throw new IndexOutOfRangeException("LittleEndianDecoder reached end of byte span");
}

var decoded = 0;
var highestOctetShift = (length - 1) << 3;
var mask = (int)(1L << (length << 3)) - 1;

while (count > 0)
for (var i = 0; i < length; i++)
{
// decoded >>>= 8 on the jvm
decoded = (int) ((uint) decoded >> 8);
if (!enumerator.MoveNext()) throw new IndexOutOfRangeException("LittleEndianDecoder reached end of byte string");
decoded += (enumerator.Current & 0xFF) << highestOcted;
count--;
// Shift and add the ith byte to 'decoded'. No need for >>>= as in JVM; just shift appropriately.
var shiftAmount = highestOctetShift - (i << 3);
decoded |= (span[i] & 0xFF) << shiftAmount;
}

return decoded & mask;
};
}

private sealed class SimpleFramingProtocolEncoderStage : SimpleLinearGraphStage<ByteString>
{
Expand Down Expand Up @@ -342,8 +348,8 @@ private void DoParse()
else if (_buffer.HasSubstring(_stage._separatorBytes, possibleMatchPosition))
{
// Found a match
var parsedFrame = _buffer.Slice(0, possibleMatchPosition).Compact();
_buffer = _buffer.Slice(possibleMatchPosition + _stage._separatorBytes.Count).Compact();
var parsedFrame = _buffer.Slice(0, possibleMatchPosition);
_buffer = _buffer.Slice(possibleMatchPosition + _stage._separatorBytes.Count);
_nextPossibleMatch = 0;
Push(_stage.Outlet, parsedFrame);

Expand Down Expand Up @@ -422,7 +428,7 @@ public override void OnUpstreamFinish()
/// </summary>
private void PushFrame()
{
var emit = _buffer.Slice(0, _frameSize).Compact();
var emit = _buffer.Slice(0, _frameSize);
_buffer = _buffer.Slice(_frameSize);
_frameSize = int.MaxValue;
Push(_stage.Outlet, emit);
Expand All @@ -440,9 +446,14 @@ private void TryPushFrame()
PushFrame();
else if (bufferSize >= _stage._minimumChunkSize)
{
var iterator = _buffer.Slice(_stage._lengthFieldOffset).GetEnumerator();
var parsedLength = _stage._intDecoder(iterator, _stage._lengthFieldLength);

var iterator = _buffer.Memory.Span.Slice(_stage._lengthFieldOffset);
var parsedLength = _stage._byteOrder switch {
ByteOrder.BigEndian => BigEndianDecoder(ref iterator, _stage._lengthFieldLength),
ByteOrder.LittleEndian => LittleEndianDecoder(ref iterator, _stage._lengthFieldLength),
_ => throw new NotSupportedException($"ByteOrder {_stage._byteOrder} is not supported")
};

// TODO: AVOID ARRAY COPYING AGAIN HERE
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this going to be separate PR?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going to try to do that here but haven't gotten around to it yet

_frameSize = _stage._computeFrameSize.HasValue
? _stage._computeFrameSize.Value(_buffer.Slice(0, _stage._lengthFieldOffset).ToArray(), parsedLength)
: parsedLength + _stage._minimumChunkSize;
Expand Down Expand Up @@ -480,7 +491,7 @@ private void TryPull()
private readonly int _maximumFramelength;
private readonly int _lengthFieldOffset;
private readonly int _minimumChunkSize;
private readonly Func<IEnumerator<byte>, int, int> _intDecoder;
private readonly ByteOrder _byteOrder;
private readonly Option<Func<IReadOnlyList<byte>, int, int>> _computeFrameSize;

// For the sake of binary compatibility
Expand All @@ -500,7 +511,7 @@ public LengthFieldFramingStage(
_lengthFieldOffset = lengthFieldOffset;
_minimumChunkSize = lengthFieldOffset + lengthFieldLength;
_computeFrameSize = computeFrameSize;
_intDecoder = byteOrder == ByteOrder.BigEndian ? BigEndianDecoder : LittleEndianDecoder;
_byteOrder = byteOrder;
}

protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this);
Expand Down
63 changes: 23 additions & 40 deletions src/core/Akka.Tests/Util/ByteStringSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using System.Text;
using Akka.IO;
Expand All @@ -14,15 +15,10 @@

namespace Akka.Tests.Util
{

/// <summary>
/// TODO: Should we use the FsCheck.XUnit integration when they upgrade to xUnit 2
/// </summary>
public class ByteStringSpec
{
class Generators
{

// TODO: Align with JVM Akka Generator
public static Arbitrary<ByteString> ByteStrings()
{
Expand All @@ -45,7 +41,7 @@ public void A_ByteString_must_have_correct_size_when_concatenating()
[Fact]
public void A_ByteString_must_have_correct_size_when_slicing_from_index()
{
var a = ByteString.FromBytes(new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9} );
var a = ByteString.FromBytes(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 });
var b = ByteString.FromBytes(new byte[] { 10, 11, 12, 13, 14, 15, 16, 17, 18 });

(a + b).Slice(b.Count).Count.Should().Be(a.Count);
Expand All @@ -54,32 +50,24 @@ public void A_ByteString_must_have_correct_size_when_slicing_from_index()
[Fact]
public void A_ByteString_must_be_sequential_when_slicing_from_start()
{
Prop.ForAll((ByteString a, ByteString b) => (a + b).Slice(0, a.Count).SequenceEqual(a))
Prop.ForAll((ByteString a, ByteString b) =>
(a + b).Slice(0, a.Count).Memory.Span.SequenceEqual(a.Memory.Span))
.QuickCheckThrowOnFailure();
}

[Fact]
public void A_ByteString_must_be_sequential_when_slicing_from_index()
{
var a = ByteString.FromBytes(new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9} );
var a = ByteString.FromBytes(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 });
var b = ByteString.FromBytes(new byte[] { 10, 11, 12, 13, 14, 15, 16, 17, 18 });

(a + b).Slice(a.Count).Should().BeEquivalentTo(b);
}

[Fact]
public void A_ByteString_must_be_equal_to_the_original_when_compacting()
{
Prop.ForAll((ByteString xs) =>
{
var ys = xs.Compact();
return xs.SequenceEqual(ys) && ys.IsCompact;
}).QuickCheckThrowOnFailure();
}

[Fact]
public void A_ByteString_must_be_equal_to_the_original_when_recombining()
{
var xs = ByteString.FromBytes(new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9} );
var xs = ByteString.FromBytes(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 });
var tmp1 = xs.Slice(0, xs.Count / 2);
var tmp2 = xs.Slice(xs.Count / 2);
var tmp11 = tmp1.Slice(0, tmp1.Count / 2);
Expand All @@ -90,32 +78,24 @@ public void A_ByteString_must_be_equal_to_the_original_when_recombining()
[Fact]
public void A_ByteString_must_behave_as_expected_when_created_from_and_decoding_to_String()
{
Prop.ForAll((string s) => ByteString.FromString(s, Encoding.UTF8).ToString(Encoding.UTF8) == (s ?? "")) // TODO: What should we do with null string?
Prop.ForAll((string s) =>
ByteString.FromString(s, Encoding.UTF8).ToString(Encoding.UTF8) ==
(s ?? "")) // TODO: What should we do with null string?
.QuickCheckThrowOnFailure();
}

[Fact]
public void A_ByteString_must_behave_as_expected_when_created_from_and_decoding_to_unicode_String()
{
Prop.ForAll((string s) => ByteString.FromString(s, Encoding.Unicode).ToString(Encoding.Unicode) == (s ?? "")) // TODO: What should we do with null string?
Prop.ForAll(
(string s) =>
ByteString.FromString(s, Encoding.Unicode).ToString(Encoding.Unicode) ==
(s ?? "")) // TODO: What should we do with null string?
.QuickCheckThrowOnFailure();
}

[Fact]
public void A_ByteString_must_behave_as_expected_when_compacting()
{
Prop.ForAll((ByteString a) =>
{
var wasCompact = a.IsCompact;
var b = a.Compact();
return ((!wasCompact) || (b == a)) &&
b.SequenceEqual(a) &&
b.IsCompact &&
b.Compact() == b;
}).QuickCheckThrowOnFailure();
}

[Fact(DisplayName = @"A concatenated byte string should return the index of a byte in one the two byte strings.")]
[Fact(DisplayName =
@"A concatenated byte string should return the index of a byte in one the two byte strings.")]
public void A_concatenated_bytestring_must_return_correct_index_of_elements_in_string()
{
var b = ByteString.FromBytes(new byte[] { 1 }) + ByteString.FromBytes(new byte[] { 2 });
Expand All @@ -124,7 +104,8 @@ public void A_concatenated_bytestring_must_return_correct_index_of_elements_in_s
Assert.Equal(1, offset);
}

[Fact(DisplayName = @"A concatenated byte string should return -1 when it was not found in the concatenated byte strings")]
[Fact(DisplayName =
@"A concatenated byte string should return -1 when it was not found in the concatenated byte strings")]
public void A_concatenated_bytestring_must_return_negative_one_when_an_element_was_not_found()
{
var b = ByteString.FromBytes(new byte[] { 1 }) + ByteString.FromBytes(new byte[] { 2 });
Expand All @@ -133,7 +114,8 @@ public void A_concatenated_bytestring_must_return_negative_one_when_an_element_w
Assert.Equal(-1, offset);
}

[Fact(DisplayName = "A concatenated byte string composed of partial characters must return the correct string for ToString(Unicode)")]
[Fact(DisplayName =
"A concatenated byte string composed of partial characters must return the correct string for ToString(Unicode)")]
public void A_concatenated_ByteString_with_partial_characters_must_return_correct_string_for_ToString_Unicode()
{
// In Unicode encoding, characters present in the ASCII character set are 2 bytes long.
Expand All @@ -152,7 +134,8 @@ public void A_concatenated_ByteString_with_partial_characters_must_return_correc
Assert.Equal(expected, actual);
}

[Fact(DisplayName = "A concatenated byte string composed of partial characters must return the correct string for ToString(UTF8)")]
[Fact(DisplayName =
"A concatenated byte string composed of partial characters must return the correct string for ToString(UTF8)")]
public void A_concatenated_ByteString_with_partial_characters_must_return_correct_string_for_ToString_UTF8()
{
// In UTF-8 encoding, characters present in the ASCII character set are only 1 byte long.
Expand Down Expand Up @@ -213,4 +196,4 @@ public void A_sliced_ByteString_using_Range_must_return_correct_string_for_ToStr
}
#endif
}
}
}
36 changes: 3 additions & 33 deletions src/core/Akka/IO/SocketEventArgsPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,44 +82,13 @@ public void Release(SocketAsyncEventArgs e)

internal static class SocketAsyncEventArgsExtensions
{
public static void SetBuffer(this SocketAsyncEventArgs args, ByteString data)
{
if (data.IsCompact)
{
var buffer = data.Buffers[0];
if (args.BufferList != null)
{
// BufferList property setter is not simple member association operation,
// but the getter is. Therefore we first check if we need to clear buffer list
// and only do so if necessary.
args.BufferList = null;
}
args.SetBuffer(buffer.Array, buffer.Offset, buffer.Count);
}
else
{
if (RuntimeDetector.IsMono)
{
// Mono doesn't support BufferList - falback to compacting ByteString
var compacted = data.Compact();
var buffer = compacted.Buffers[0];
args.SetBuffer(buffer.Array, buffer.Offset, buffer.Count);
}
else
{
args.SetBuffer(null, 0, 0);
args.BufferList = data.Buffers;
}
}
}

public static void SetBuffer(this SocketAsyncEventArgs args, IEnumerable<ByteString> dataCollection)
{
if (RuntimeDetector.IsMono)
{
// Mono doesn't support BufferList - falback to compacting ByteString
var dataList = dataCollection.ToList();
var totalSize = dataList.SelectMany(d => d.Buffers).Sum(d => d.Count);
var totalSize = dataList.Count;
var bytes = new byte[totalSize];
var position = 0;
foreach (var byteString in dataList)
Expand All @@ -132,7 +101,8 @@ public static void SetBuffer(this SocketAsyncEventArgs args, IEnumerable<ByteStr
else
{
args.SetBuffer(null, 0, 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Food for thought:

  • If the 'buffer' is going to be typically null and we instead normally use .BufferList, it is better to hide this behind a if (args.Buffer != null) since the underlying call has a try/catch.

args.BufferList = dataCollection.SelectMany(d => d.Buffers).ToList();
// TODO: fix this before we ship
args.BufferList = new List<ArraySegment<byte>>(dataCollection.Select(bs => new ArraySegment<byte>(bs.ToArray())));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thing that feels very relevant,

Short term, This -screams- pooling scenario and/or IMemoryOwner even if we have to deal with semantics internally.

I remember hacking something together with one pool or another during my Covid lockdown 'Akka streams remote transport experiment' and it helped a LOT. I think it was somewhere during or just before the hand written protobuf stuff.

Where 'considerations' get important, is the pinning of the arrays underlying the bufferlist.

IDK the current pinning 'status' but last I dug into they all get pinned for a while, and that sucks for the GC.

Thought/suggestion would be in general to use a pooled buffer of some form, but also strongly to consider/measure a single final buffer per SetBuffer() call being set on SAEA.

Having a pooled buffer increases likelyhood that it will, long term, survive to an older segment and hopefully avoid pinning costs in Gen0/Gen1. (Also, longer term, we can hopefully #IFDEF said pooled buffers into POH and get more gains.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than go down that road, I'd prefer to rewrite Akka.IO to not depend on SocketAsyncEventArgs and all of the complicated junk inside the TCP and UDP Akka.IO actors. I think some major simplification is in-order there for .NET 6 at least.

}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/core/Akka/IO/TcpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,7 @@ public override void DoWrite(ConnectionInfo info)
{
try
{
// TODO: avoid use of SocketAsyncEventArgs on newer platforms
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stupid question, are newer APIs to the point where SAEA isn't worth the squeeze? Again it's been a while but last I knew SAEA was still the 'best' way.

_sendArgs.SetBuffer(_dataToSend);
if (!_connection.Socket.SendAsync(_sendArgs))
_self.Tell(SocketSent.Instance);
Expand Down
Loading
Loading