From 96a6d1192ec20e78c9eebface06800a0a418e377 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 28 Mar 2024 15:36:58 -0500 Subject: [PATCH 01/16] replace `ByteString` internals with `ReadOnlyMemory` --- src/core/Akka/IO/SocketEventArgsPool.cs | 36 +-- src/core/Akka/IO/TcpConnection.cs | 1 + src/core/Akka/IO/UdpConnection.cs | 92 +++--- src/core/Akka/Util/ByteString.cs | 371 +++++++----------------- 4 files changed, 162 insertions(+), 338 deletions(-) diff --git a/src/core/Akka/IO/SocketEventArgsPool.cs b/src/core/Akka/IO/SocketEventArgsPool.cs index bd8007f0875..bb75fb9024b 100644 --- a/src/core/Akka/IO/SocketEventArgsPool.cs +++ b/src/core/Akka/IO/SocketEventArgsPool.cs @@ -82,44 +82,13 @@ public void Release(SocketAsyncEventArgs e) internal static class SocketAsyncEventArgsExtensions { - public static void SetBuffer(this SocketAsyncEventArgs args, ByteString data) - { - if (data.IsCompact) - { - var buffer = data.Buffers[0]; - if (args.BufferList != null) - { - // BufferList property setter is not simple member association operation, - // but the getter is. Therefore we first check if we need to clear buffer list - // and only do so if necessary. - args.BufferList = null; - } - args.SetBuffer(buffer.Array, buffer.Offset, buffer.Count); - } - else - { - if (RuntimeDetector.IsMono) - { - // Mono doesn't support BufferList - falback to compacting ByteString - var compacted = data.Compact(); - var buffer = compacted.Buffers[0]; - args.SetBuffer(buffer.Array, buffer.Offset, buffer.Count); - } - else - { - args.SetBuffer(null, 0, 0); - args.BufferList = data.Buffers; - } - } - } - public static void SetBuffer(this SocketAsyncEventArgs args, IEnumerable dataCollection) { if (RuntimeDetector.IsMono) { // Mono doesn't support BufferList - falback to compacting ByteString var dataList = dataCollection.ToList(); - var totalSize = dataList.SelectMany(d => d.Buffers).Sum(d => d.Count); + var totalSize = dataList.Count; var bytes = new byte[totalSize]; var position = 0; foreach (var byteString in dataList) @@ -132,7 +101,8 @@ public static void SetBuffer(this SocketAsyncEventArgs args, IEnumerable d.Buffers).ToList(); + // TODO: fix this before we ship + args.BufferList = new List>(dataCollection.Select(bs => new ArraySegment(bs.ToArray()))); } } } diff --git a/src/core/Akka/IO/TcpConnection.cs b/src/core/Akka/IO/TcpConnection.cs index 7e12a58b8b4..049a737f755 100644 --- a/src/core/Akka/IO/TcpConnection.cs +++ b/src/core/Akka/IO/TcpConnection.cs @@ -945,6 +945,7 @@ public override void DoWrite(ConnectionInfo info) { try { + // TODO: avoid use of SocketAsyncEventArgs on newer platforms _sendArgs.SetBuffer(_dataToSend); if (!_connection.Socket.SendAsync(_sendArgs)) _self.Tell(SocketSent.Instance); diff --git a/src/core/Akka/IO/UdpConnection.cs b/src/core/Akka/IO/UdpConnection.cs index 1e1b0aacc81..93de1525951 100644 --- a/src/core/Akka/IO/UdpConnection.cs +++ b/src/core/Akka/IO/UdpConnection.cs @@ -66,6 +66,7 @@ private Receive Resolving(DnsEndPoint remoteAddress) => message => DoConnect(new IPEndPoint(r.Addr, remoteAddress.Port)); return true; } + return false; }; @@ -106,62 +107,66 @@ private bool Connected(object message) { switch (message) { - case SuspendReading _: _readingSuspended = true; return true; + case SuspendReading _: + _readingSuspended = true; + return true; case ResumeReading _: + { + _readingSuspended = false; + if (_pendingRead != null) { - _readingSuspended = false; - if (_pendingRead != null) - { - _connect.Handler.Tell(_pendingRead); - _pendingRead = null; - ReceiveAsync(); - } - return true; + _connect.Handler.Tell(_pendingRead); + _pendingRead = null; + ReceiveAsync(); } - + + return true; + } + case SocketReceived socketReceived: _pendingReceive = null; - DoRead(socketReceived, _connect.Handler); + DoRead(socketReceived, _connect.Handler); return true; - + case Disconnect _: - { - Log.Debug("Closing UDP connection to [{0}]", _connect.RemoteAddress); + { + Log.Debug("Closing UDP connection to [{0}]", _connect.RemoteAddress); - _socket.Dispose(); + _socket.Dispose(); - Sender.Tell(Disconnected.Instance); - Log.Debug("Connection closed to [{0}], stopping listener", _connect.RemoteAddress); - Context.Stop(Self); - return true; - } + Sender.Tell(Disconnected.Instance); + Log.Debug("Connection closed to [{0}], stopping listener", _connect.RemoteAddress); + Context.Stop(Self); + return true; + } case Send send: + { + if (WritePending) + { + if (Udp.Settings.TraceLogging) Log.Debug("Dropping write because queue is full"); + Sender.Tell(new CommandFailed(send)); + } + else { - if (WritePending) + if (!send.Payload.IsEmpty) { - if (Udp.Settings.TraceLogging) Log.Debug("Dropping write because queue is full"); - Sender.Tell(new CommandFailed(send)); + _pendingSend = (send, Sender); + DoWrite(); } else { - if (!send.Payload.IsEmpty) - { - _pendingSend = (send, Sender); - DoWrite(); - } - else - { - if (send.WantsAck) - Sender.Tell(send.Ack); - } + if (send.WantsAck) + Sender.Tell(send.Ack); } - return true; } + + return true; + } case SocketSent sent: { if (_pendingSend == null) throw new Exception("There are no pending sent"); - + var (send, sender) = _pendingSend.Value; if (send.WantsAck) sender.Tell(send.Ack); @@ -193,7 +198,13 @@ private void DoWrite() var (send, sender) = _pendingSend.Value; var data = send.Payload; - var bytesWritten = _socket.Send(data.Buffers); +#if NET6_0_OR_GREATER + var bytesWritten = _socket.Send(data.Memory.Span); +#else + // pay the .NET Framework performance penalty + var array = data.ToArray(); + var bytesWritten = _socket.Send(array); +#endif if (Udp.Settings.TraceLogging) Log.Debug("Wrote [{0}] bytes to socket", bytesWritten); @@ -232,19 +243,20 @@ private void ReportConnectFailure(Action thunk) } catch (Exception e) { - Log.Error(e, "Failure while connecting UDP channel to remote address [{0}] local address [{1}]", _connect.RemoteAddress, _connect.LocalAddress); + Log.Error(e, "Failure while connecting UDP channel to remote address [{0}] local address [{1}]", + _connect.RemoteAddress, _connect.LocalAddress); _commander.Tell(new CommandFailed(_connect)); Context.Stop(Self); } } private SocketAsyncEventArgs _pendingReceive; - + private void ReceiveAsync() { if (_pendingReceive != null) return; - + var e = Udp.SocketEventArgsPool.Acquire(Self); _pendingReceive = e; if (!_socket.ReceiveAsync(e)) @@ -254,4 +266,4 @@ private void ReceiveAsync() } } } -} +} \ No newline at end of file diff --git a/src/core/Akka/Util/ByteString.cs b/src/core/Akka/Util/ByteString.cs index 8b59807ce4c..24112a341cd 100644 --- a/src/core/Akka/Util/ByteString.cs +++ b/src/core/Akka/Util/ByteString.cs @@ -25,8 +25,8 @@ namespace Akka.IO /// when concatenating and slicing sequences of bytes, /// and also providing a thread safe way of working with bytes. /// - [DebuggerDisplay("(Count = {_count}, Buffers = {_buffers})")] - public sealed class ByteString : IEquatable, IEnumerable + [DebuggerDisplay("(Count = {Count}, Buffer = {Memory})")] + public sealed class ByteString : IEquatable { #region creation methods @@ -244,49 +244,69 @@ public static ByteString FromString(string str, Encoding encoding) #endregion - private readonly int _count; - private readonly ByteBuffer[] _buffers; + private readonly ReadOnlyMemory _memory; + + public ReadOnlyMemory Memory => _memory; + + private static ReadOnlyMemory ConvertToMemory(ByteBuffer[] buffers) + { + return buffers.Length switch + { + 0 => ReadOnlyMemory.Empty, + 1 => new ReadOnlyMemory(buffers[0].Array, buffers[0].Offset, buffers[0].Count), + _ => new ReadOnlyMemory(ConcatBuffers(buffers)) + }; + + static byte[] ConcatBuffers(ByteBuffer[] buffers) + { + var count = buffers.Sum(x => x.Count); + var result = new byte[count]; + var offset = 0; + foreach (var buffer in buffers) + { + Array.Copy(buffer.Array ?? throw new InvalidOperationException(), buffer.Offset, result, offset, buffer.Count); + offset += buffer.Count; + } + + return result; + } + } - private ByteString(ByteBuffer[] buffers, int count) + private ByteString(ByteBuffer[] buffers, int count) : this(ConvertToMemory(buffers)) { - _buffers = buffers; - _count = count; } - private ByteString(ByteBuffer buffer) + private ByteString(ByteBuffer buffer) : + this(new ReadOnlyMemory(buffer.Array, buffer.Offset, buffer.Count)) { - _buffers = new[] { buffer }; - _count = buffer.Count; } - private ByteString(byte[] array, int offset, int count) + private ByteString(byte[] array, int offset, int count) : this(new ReadOnlyMemory(array, offset, count)) + { + } + + private ByteString(in ReadOnlyMemory memory) { - _buffers = new[] { new ByteBuffer(array, offset, count) }; - _count = count; + _memory = memory; } /// /// Gets a total number of bytes stored inside this . /// - public int Count => _count; + public int Count => _memory.Length; /// /// Determines if current has compact representation. /// Compact byte strings represent bytes stored inside single, continuous /// block of memory. /// - /// TBD - public bool IsCompact => _buffers.Length == 1; + [Obsolete("This property will be removed in future versions of Akka.NET.")] + public bool IsCompact => true; /// /// Determines if current is empty. /// - public bool IsEmpty => _count == 0; - - /// - /// Gets sequence of the buffers used underneat. - /// - internal IList Buffers => _buffers; + public bool IsEmpty => Count == 0; /// /// Gets a byte stored under a provided . @@ -296,11 +316,7 @@ public byte this[int index] { get { - if (index >= _count) throw new IndexOutOfRangeException("Requested index is outside of the bounds of the ByteString"); - int j; - var i = GetBufferFittingIndex(index, out j); - var buffer = _buffers[i]; - return buffer.Array[buffer.Offset + j]; + return _memory.Span[index]; } } @@ -308,13 +324,10 @@ public byte this[int index] /// Compacts current , potentially copying its content underneat /// into new byte array. /// - /// TBD + [Obsolete("This method will be removed in future versions of Akka.NET.")] public ByteString Compact() { - if (IsCompact) return this; - - var copy = this.ToArray(); - return new ByteString(copy, 0, copy.Length); + return this; } /// @@ -323,7 +336,7 @@ public ByteString Compact() /// operation. /// /// index inside current , from which slicing should start - public ByteString Slice(int index) => Slice(index, _count - index); + public ByteString Slice(int index) => Slice(index, Count - index); /// /// Slices current , creating a new @@ -335,75 +348,7 @@ public ByteString Compact() /// If index or count result in an invalid . public ByteString Slice(int index, int count) { - if(index < 0) - throw new ArgumentOutOfRangeException(nameof(index), "Index must be positive number"); - if (count < 0) - throw new ArgumentOutOfRangeException(nameof(count), "Count must be positive number"); - if (count == 0) return Empty; - if(index > _count) - throw new ArgumentOutOfRangeException(nameof(index), "Index is outside of the bounds of the ByteString"); - if(index + count > _count) - throw new ArgumentOutOfRangeException(nameof(count), "Index + count is outside of the bounds of the ByteString"); - - if (index == 0 && count == _count) return this; - - var i = GetBufferFittingIndex(index, out var j); - var init = _buffers[i]; - - var copied = Math.Min(init.Count - j, count); - var newBuffers = new ByteBuffer[_buffers.Length - i]; - newBuffers[0] = new ByteBuffer(init.Array, init.Offset + j, copied); - - i++; - var k = 1; - for (; i < _buffers.Length; i++, k++) - { - if (copied >= count) break; - - var buffer = _buffers[i]; - var toCopy = Math.Min(buffer.Count, count - copied); - newBuffers[k] = new ByteBuffer(buffer.Array, buffer.Offset, toCopy); - copied += toCopy; - } - - if (k < newBuffers.Length) - newBuffers = newBuffers.Take(k).ToArray(); - - return new ByteString(newBuffers, count); - } - - /// - /// Given an in current tries to - /// find which buffer will be used to contain that index and return its range. - /// An offset within the buffer itself will be stored in . - /// - /// - /// - /// - private int GetBufferFittingIndex(int index, out int indexWithinBuffer) - { - if (index == 0) - { - indexWithinBuffer = 0; - return 0; - } - - var j = index; - for (var i = 0; i < _buffers.Length; i++) - { - var buffer = _buffers[i]; - if (j >= buffer.Count) - { - j -= buffer.Count; - } - else - { - indexWithinBuffer = j; - return i; - } - } - - throw new IndexOutOfRangeException($"Requested index [{index}] is outside of the bounds of current ByteString."); + return new ByteString(_memory.Slice(index, count)); } /// @@ -414,14 +359,7 @@ private int GetBufferFittingIndex(int index, out int indexWithinBuffer) /// public int IndexOf(byte b) { - var idx = 0; - foreach (var x in this) - { - if (x == b) return idx; - idx++; - } - - return -1; + return _memory.Span.IndexOf(b); } /// @@ -431,22 +369,7 @@ public int IndexOf(byte b) /// public int IndexOf(byte b, int from) { - if (from >= _count) return -1; - - int j; - var i = GetBufferFittingIndex(from, out j); - var idx = from; - for (; i < _buffers.Length; i++) - { - var buffer = _buffers[i]; - for (; j < buffer.Count; j++, idx++) - { - if (buffer.Array[buffer.Offset + j] == b) return idx; - } - j = 0; - } - - return -1; + return _memory.Span[from..].IndexOf(b); } /// @@ -459,53 +382,38 @@ public int IndexOf(byte b, int from) /// public bool HasSubstring(ByteString other, int index) { - // quick check: if subsequence is longer than remaining size, return false - if (other.Count > _count - index) return false; + if (other.Count == 0) return true; // Empty spans are always "found". + if (index < 0 || index > Count) throw new ArgumentOutOfRangeException(nameof(index), "Start index is out of range."); + if (Count - index < other.Count) return false; // Can't find if `toFind` is larger considering the start index. - int thisIdx = 0, otherIdx = 0; - var i = GetBufferFittingIndex(index, out thisIdx); - var j = 0; - while (j < other._buffers.Length) + for (var i = index; i <= Count - other.Count; i++) { - var buffer = _buffers[i]; - var otherBuffer = other._buffers[j]; - - while (thisIdx < buffer.Count && otherIdx < otherBuffer.Count) - { - if (buffer.Array[buffer.Offset + thisIdx] != otherBuffer.Array[otherBuffer.Offset + otherIdx]) - return false; - - thisIdx++; - otherIdx++; - } - - if (thisIdx >= buffer.Count) + // Check if `toFind` starts at position `i` in `container`. + var found = true; + for (var j = 0; j < other.Count; j++) { - i++; - thisIdx = 0; - } - if (otherIdx >= otherBuffer.Count) - { - j++; - otherIdx = 0; + if (this[i + j] != other[j]) + { + found = false; + break; + } } + if (found) return true; } - return true; + return false; } /// /// Copies content of a current into a single byte array. /// - /// TBD + /// + /// WARNING: this method allocates! + /// + /// A new array of data public byte[] ToArray() { - if (_count == 0) - return Array.Empty(); - - var copy = new byte[_count]; - CopyTo(copy, 0, _count); - return copy; + return _memory.ToArray(); } /// @@ -519,16 +427,14 @@ public ByteString Concat(ByteString other) if (other == null) throw new ArgumentNullException(nameof(other), "Cannot append null to ByteString."); if (other.IsEmpty) return this; - if (this.IsEmpty) return other; + if (IsEmpty) return other; - var count = _count + other._count; - var len1 = _buffers.Length; - var len2 = other._buffers.Length; - var array = new ByteBuffer[len1 + len2]; - Array.Copy(this._buffers, 0, array, 0, len1); - Array.Copy(other._buffers, 0, array, len1, len2); - - return new ByteString(array, count); + // combine the two ReadOnlyMemory instances + var array = new byte[_memory.Length + other._memory.Length]; + _memory.Span.CopyTo(array); + other._memory.Span.CopyTo(array.AsSpan(_memory.Length)); + + return new ByteString(array); } /// @@ -544,20 +450,9 @@ public int CopyTo(byte[] buffer, int index, int count) if (index < 0 || index >= buffer.Length) throw new ArgumentOutOfRangeException(nameof(index), "Provided index is outside the bounds of the buffer to copy to."); if (count > buffer.Length - index) throw new ArgumentException("Provided number of bytes to copy won't fit into provided buffer", nameof(count)); - count = Math.Min(count, _count); - var remaining = count; - var position = index; - foreach (var b in _buffers) - { - var toCopy = Math.Min(b.Count, remaining); - Array.Copy(b.Array, b.Offset, buffer, position, toCopy); - position += toCopy; - remaining -= toCopy; - - if (remaining == 0) return count; - } - - return 0; + count = Math.Min(count, Count); + _memory.Span.CopyTo(buffer.AsSpan(index, count)); + return count; } /// @@ -565,6 +460,7 @@ public int CopyTo(byte[] buffer, int index, int count) /// /// /// The number of bytes copied + [Obsolete("This method will be removed in future versions of Akka.NET.")] public int CopyTo(ref Memory buffer) => CopyTo(ref buffer, 0, buffer.Length); @@ -574,29 +470,16 @@ public int CopyTo(ref Memory buffer) /// buffer and copying a number of bytes. /// /// The number of bytes copied + [Obsolete("This method will be removed in future versions of Akka.NET.")] public int CopyTo(ref Memory buffer, int index, int count) { if(buffer.Length == 0 && count == 0) return 0; // edge case for no-copy if (index < 0 || index >= buffer.Length) throw new ArgumentOutOfRangeException(nameof(index), "Provided index is outside the bounds of the buffer to copy to."); if (count > buffer.Length - index) throw new ArgumentException("Provided number of bytes to copy won't fit into provided buffer", nameof(count)); - count = Math.Min(count, _count); - var remaining = count; - var position = index; - foreach (var b in _buffers) - { - var toCopy = Math.Min(b.Count, remaining); - - var bufferSpan = buffer.Span.Slice(position, toCopy); - b.AsSpan().CopyTo(bufferSpan); - - position += toCopy; - remaining -= toCopy; - - if (remaining == 0) return count; - } - - return 0; + count = Math.Min(count, Count); + _memory.Span.Slice(0, count).CopyTo(buffer.Span.Slice(index)); + return count; } /// @@ -604,6 +487,7 @@ public int CopyTo(ref Memory buffer, int index, int count) /// . /// /// The number of bytes copied + [Obsolete("This method will be removed in future versions of Akka.NET.")] public int CopyTo(ref Span buffer) => CopyTo(ref buffer, 0, buffer.Length); @@ -613,29 +497,16 @@ public int CopyTo(ref Span buffer) /// buffer and copying a number of bytes. /// /// The number of bytes copied + [Obsolete("This method will be removed in future versions of Akka.NET.")] public int CopyTo(ref Span buffer, int index, int count) { if(buffer.Length == 0 && count == 0) return 0; // edge case for no-copy if (index < 0 || index >= buffer.Length) throw new ArgumentOutOfRangeException(nameof(index), "Provided index is outside the bounds of the buffer to copy to."); if (count > buffer.Length - index) throw new ArgumentException("Provided number of bytes to copy won't fit into provided buffer", nameof(count)); - count = Math.Min(count, _count); - var remaining = count; - var position = index; - foreach (var b in _buffers) - { - var toCopy = Math.Min(b.Count, remaining); - - var bufferSpan = buffer.Slice(position, toCopy); - b.AsSpan().CopyTo(bufferSpan); - - position += toCopy; - remaining -= toCopy; - - if (remaining == 0) return count; - } - - return 0; + count = Math.Min(count, Count); + _memory.Span.Slice(0, count).CopyTo(buffer.Slice(index)); + return count; } /// @@ -646,11 +517,10 @@ public int CopyTo(ref Span buffer, int index, int count) public void WriteTo(Stream stream) { if (stream == null) throw new ArgumentNullException(nameof(stream)); - - foreach (var buffer in _buffers) - { - stream.Write(buffer.Array, buffer.Offset, buffer.Count); - } + + // TODO: remove this method in future versions of Akka.NET and use System.Memory APIs + var array = _memory.ToArray(); + stream.Write(array, 0, array.Length); } /// @@ -658,79 +528,50 @@ public void WriteTo(Stream stream) /// to a provided writeable . /// /// - public async Task WriteToAsync(Stream stream) + public Task WriteToAsync(Stream stream) { if (stream == null) throw new ArgumentNullException(nameof(stream)); - foreach (var buffer in _buffers) - { - await stream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count); - } + // TODO: remove this method in future versions of Akka.NET and use System.Memory APIs + var array = _memory.ToArray(); + return stream.WriteAsync(array, 0, array.Length); } public override bool Equals(object obj) => Equals(obj as ByteString); public override int GetHashCode() { - var hashCode = 0; - foreach (var b in this) - { - hashCode = (hashCode * 397) ^ b.GetHashCode(); - } - return hashCode; + return _memory.GetHashCode(); } public bool Equals(ByteString other) { if (ReferenceEquals(other, this)) return true; if (ReferenceEquals(other, null)) return false; - if (_count != other._count) return false; - - using (var thisEnum = this.GetEnumerator()) - using (var otherEnum = other.GetEnumerator()) - { - while (thisEnum.MoveNext() && otherEnum.MoveNext()) - { - if (thisEnum.Current != otherEnum.Current) return false; - } - } - - return true; - } - - public IEnumerator GetEnumerator() - { - foreach (var buffer in _buffers) - { - for (int i = buffer.Offset; i < buffer.Offset + buffer.Count; i++) - { - yield return buffer.Array[i]; - } - } + + return _memory.Span.SequenceEqual(other._memory.Span); } - IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); - public override string ToString() => ToString(Encoding.UTF8); public string ToString(Encoding encoding) { - if (IsCompact) - return encoding.GetString(_buffers[0].Array, _buffers[0].Offset, _buffers[0].Count); - - byte[] buffer = ToArray(); - - return encoding.GetString(buffer); + // get span as byte array + return encoding.GetString(_memory.Span.ToArray()); } public static bool operator ==(ByteString x, ByteString y) => Equals(x, y); public static bool operator !=(ByteString x, ByteString y) => !Equals(x, y); - public static explicit operator ByteString(byte[] bytes) => ByteString.CopyFrom(bytes); + public static explicit operator ByteString(byte[] bytes) => CopyFrom(bytes); public static explicit operator byte[] (ByteString byteString) => byteString.ToArray(); + public static explicit operator ByteString(Memory memory) => CopyFrom(memory); + + public static explicit operator ByteString(ReadOnlyMemory memory) => new(memory); + public static ByteString operator +(ByteString x, ByteString y) => x.Concat(y); } From 486f5a3078abb2ded5ff5822dd0fbbc667aba06b Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 28 Mar 2024 16:22:34 -0500 Subject: [PATCH 02/16] fixed `ByteStringSpec`s --- src/core/Akka.Tests/Util/ByteStringSpec.cs | 63 ++++++++-------------- 1 file changed, 23 insertions(+), 40 deletions(-) diff --git a/src/core/Akka.Tests/Util/ByteStringSpec.cs b/src/core/Akka.Tests/Util/ByteStringSpec.cs index 9f4e85623a3..949f8ca3afb 100644 --- a/src/core/Akka.Tests/Util/ByteStringSpec.cs +++ b/src/core/Akka.Tests/Util/ByteStringSpec.cs @@ -5,6 +5,7 @@ // //----------------------------------------------------------------------- +using System; using System.Linq; using System.Text; using Akka.IO; @@ -14,15 +15,10 @@ namespace Akka.Tests.Util { - - /// - /// TODO: Should we use the FsCheck.XUnit integration when they upgrade to xUnit 2 - /// public class ByteStringSpec { class Generators { - // TODO: Align with JVM Akka Generator public static Arbitrary ByteStrings() { @@ -45,7 +41,7 @@ public void A_ByteString_must_have_correct_size_when_concatenating() [Fact] public void A_ByteString_must_have_correct_size_when_slicing_from_index() { - var a = ByteString.FromBytes(new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9} ); + var a = ByteString.FromBytes(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 }); var b = ByteString.FromBytes(new byte[] { 10, 11, 12, 13, 14, 15, 16, 17, 18 }); (a + b).Slice(b.Count).Count.Should().Be(a.Count); @@ -54,32 +50,24 @@ public void A_ByteString_must_have_correct_size_when_slicing_from_index() [Fact] public void A_ByteString_must_be_sequential_when_slicing_from_start() { - Prop.ForAll((ByteString a, ByteString b) => (a + b).Slice(0, a.Count).SequenceEqual(a)) + Prop.ForAll((ByteString a, ByteString b) => + (a + b).Slice(0, a.Count).Memory.Span.SequenceEqual(a.Memory.Span)) .QuickCheckThrowOnFailure(); } + [Fact] public void A_ByteString_must_be_sequential_when_slicing_from_index() { - var a = ByteString.FromBytes(new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9} ); + var a = ByteString.FromBytes(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 }); var b = ByteString.FromBytes(new byte[] { 10, 11, 12, 13, 14, 15, 16, 17, 18 }); (a + b).Slice(a.Count).Should().BeEquivalentTo(b); } - [Fact] - public void A_ByteString_must_be_equal_to_the_original_when_compacting() - { - Prop.ForAll((ByteString xs) => - { - var ys = xs.Compact(); - return xs.SequenceEqual(ys) && ys.IsCompact; - }).QuickCheckThrowOnFailure(); - } - [Fact] public void A_ByteString_must_be_equal_to_the_original_when_recombining() { - var xs = ByteString.FromBytes(new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9} ); + var xs = ByteString.FromBytes(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 }); var tmp1 = xs.Slice(0, xs.Count / 2); var tmp2 = xs.Slice(xs.Count / 2); var tmp11 = tmp1.Slice(0, tmp1.Count / 2); @@ -90,32 +78,24 @@ public void A_ByteString_must_be_equal_to_the_original_when_recombining() [Fact] public void A_ByteString_must_behave_as_expected_when_created_from_and_decoding_to_String() { - Prop.ForAll((string s) => ByteString.FromString(s, Encoding.UTF8).ToString(Encoding.UTF8) == (s ?? "")) // TODO: What should we do with null string? + Prop.ForAll((string s) => + ByteString.FromString(s, Encoding.UTF8).ToString(Encoding.UTF8) == + (s ?? "")) // TODO: What should we do with null string? .QuickCheckThrowOnFailure(); } [Fact] public void A_ByteString_must_behave_as_expected_when_created_from_and_decoding_to_unicode_String() { - Prop.ForAll((string s) => ByteString.FromString(s, Encoding.Unicode).ToString(Encoding.Unicode) == (s ?? "")) // TODO: What should we do with null string? + Prop.ForAll( + (string s) => + ByteString.FromString(s, Encoding.Unicode).ToString(Encoding.Unicode) == + (s ?? "")) // TODO: What should we do with null string? .QuickCheckThrowOnFailure(); } - [Fact] - public void A_ByteString_must_behave_as_expected_when_compacting() - { - Prop.ForAll((ByteString a) => - { - var wasCompact = a.IsCompact; - var b = a.Compact(); - return ((!wasCompact) || (b == a)) && - b.SequenceEqual(a) && - b.IsCompact && - b.Compact() == b; - }).QuickCheckThrowOnFailure(); - } - - [Fact(DisplayName = @"A concatenated byte string should return the index of a byte in one the two byte strings.")] + [Fact(DisplayName = + @"A concatenated byte string should return the index of a byte in one the two byte strings.")] public void A_concatenated_bytestring_must_return_correct_index_of_elements_in_string() { var b = ByteString.FromBytes(new byte[] { 1 }) + ByteString.FromBytes(new byte[] { 2 }); @@ -124,7 +104,8 @@ public void A_concatenated_bytestring_must_return_correct_index_of_elements_in_s Assert.Equal(1, offset); } - [Fact(DisplayName = @"A concatenated byte string should return -1 when it was not found in the concatenated byte strings")] + [Fact(DisplayName = + @"A concatenated byte string should return -1 when it was not found in the concatenated byte strings")] public void A_concatenated_bytestring_must_return_negative_one_when_an_element_was_not_found() { var b = ByteString.FromBytes(new byte[] { 1 }) + ByteString.FromBytes(new byte[] { 2 }); @@ -133,7 +114,8 @@ public void A_concatenated_bytestring_must_return_negative_one_when_an_element_w Assert.Equal(-1, offset); } - [Fact(DisplayName = "A concatenated byte string composed of partial characters must return the correct string for ToString(Unicode)")] + [Fact(DisplayName = + "A concatenated byte string composed of partial characters must return the correct string for ToString(Unicode)")] public void A_concatenated_ByteString_with_partial_characters_must_return_correct_string_for_ToString_Unicode() { // In Unicode encoding, characters present in the ASCII character set are 2 bytes long. @@ -152,7 +134,8 @@ public void A_concatenated_ByteString_with_partial_characters_must_return_correc Assert.Equal(expected, actual); } - [Fact(DisplayName = "A concatenated byte string composed of partial characters must return the correct string for ToString(UTF8)")] + [Fact(DisplayName = + "A concatenated byte string composed of partial characters must return the correct string for ToString(UTF8)")] public void A_concatenated_ByteString_with_partial_characters_must_return_correct_string_for_ToString_UTF8() { // In UTF-8 encoding, characters present in the ASCII character set are only 1 byte long. @@ -213,4 +196,4 @@ public void A_sliced_ByteString_using_Range_must_return_correct_string_for_ToStr } #endif } -} +} \ No newline at end of file From d9c3d9821185f858522d885764ddecb32d5eda8d Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 28 Mar 2024 16:36:46 -0500 Subject: [PATCH 03/16] fixed framing issues --- src/core/Akka.Streams/Dsl/Framing.cs | 69 ++++++++++++++++------------ 1 file changed, 40 insertions(+), 29 deletions(-) diff --git a/src/core/Akka.Streams/Dsl/Framing.cs b/src/core/Akka.Streams/Dsl/Framing.cs index 465b7ef14ac..0dee51bb9f6 100644 --- a/src/core/Akka.Streams/Dsl/Framing.cs +++ b/src/core/Akka.Streams/Dsl/Framing.cs @@ -169,39 +169,45 @@ public FramingException(string message) : base(message) protected FramingException(SerializationInfo info, StreamingContext context) : base(info, context) { } } - private static readonly Func, int, int> BigEndianDecoder = (enumerator, length) => + private static int BigEndianDecoder(ref ReadOnlySpan enumerator, int length) { - var count = length; - var decoded = 0; - while (count > 0) + if (length > enumerator.Length) + { + throw new ArgumentException("Length exceeds the size of the enumerator."); + } + + var result = 0; + + // Assuming 'length' is 4 for a 32-bit integer. + // Adjust the loop and shift amounts if dealing with different sizes. + for (var i = 0; i < length; i++) { - decoded <<= 8; - if (!enumerator.MoveNext()) throw new IndexOutOfRangeException("LittleEndianDecoder reached end of byte string"); - decoded |= enumerator.Current & 0xFF; - count--; + result |= enumerator[i] << ((length - 1 - i) * 8); } - return decoded; - }; + return result; + } - private static readonly Func, int, int> LittleEndianDecoder = (enumerator, length) => + private static int LittleEndianDecoder(ref ReadOnlySpan span, int length) { - var highestOcted = (length - 1) << 3; - var mask = (int) (1L << (length << 3)) - 1; - var count = length; + if (length > span.Length) + { + throw new IndexOutOfRangeException("LittleEndianDecoder reached end of byte span"); + } + var decoded = 0; + var highestOctetShift = (length - 1) << 3; + var mask = (int)(1L << (length << 3)) - 1; - while (count > 0) + for (var i = 0; i < length; i++) { - // decoded >>>= 8 on the jvm - decoded = (int) ((uint) decoded >> 8); - if (!enumerator.MoveNext()) throw new IndexOutOfRangeException("LittleEndianDecoder reached end of byte string"); - decoded += (enumerator.Current & 0xFF) << highestOcted; - count--; + // Shift and add the ith byte to 'decoded'. No need for >>>= as in JVM; just shift appropriately. + var shiftAmount = highestOctetShift - (i << 3); + decoded |= (span[i] & 0xFF) << shiftAmount; } return decoded & mask; - }; + } private sealed class SimpleFramingProtocolEncoderStage : SimpleLinearGraphStage { @@ -342,8 +348,8 @@ private void DoParse() else if (_buffer.HasSubstring(_stage._separatorBytes, possibleMatchPosition)) { // Found a match - var parsedFrame = _buffer.Slice(0, possibleMatchPosition).Compact(); - _buffer = _buffer.Slice(possibleMatchPosition + _stage._separatorBytes.Count).Compact(); + var parsedFrame = _buffer.Slice(0, possibleMatchPosition); + _buffer = _buffer.Slice(possibleMatchPosition + _stage._separatorBytes.Count); _nextPossibleMatch = 0; Push(_stage.Outlet, parsedFrame); @@ -422,7 +428,7 @@ public override void OnUpstreamFinish() /// private void PushFrame() { - var emit = _buffer.Slice(0, _frameSize).Compact(); + var emit = _buffer.Slice(0, _frameSize); _buffer = _buffer.Slice(_frameSize); _frameSize = int.MaxValue; Push(_stage.Outlet, emit); @@ -440,9 +446,14 @@ private void TryPushFrame() PushFrame(); else if (bufferSize >= _stage._minimumChunkSize) { - var iterator = _buffer.Slice(_stage._lengthFieldOffset).GetEnumerator(); - var parsedLength = _stage._intDecoder(iterator, _stage._lengthFieldLength); - + var iterator = _buffer.Memory.Span.Slice(_stage._lengthFieldOffset); + var parsedLength = _stage._byteOrder switch { + ByteOrder.BigEndian => BigEndianDecoder(ref iterator, _stage._lengthFieldLength), + ByteOrder.LittleEndian => LittleEndianDecoder(ref iterator, _stage._lengthFieldLength), + _ => throw new NotSupportedException($"ByteOrder {_stage._byteOrder} is not supported") + }; + + // TODO: AVOID ARRAY COPYING AGAIN HERE _frameSize = _stage._computeFrameSize.HasValue ? _stage._computeFrameSize.Value(_buffer.Slice(0, _stage._lengthFieldOffset).ToArray(), parsedLength) : parsedLength + _stage._minimumChunkSize; @@ -480,7 +491,7 @@ private void TryPull() private readonly int _maximumFramelength; private readonly int _lengthFieldOffset; private readonly int _minimumChunkSize; - private readonly Func, int, int> _intDecoder; + private readonly ByteOrder _byteOrder; private readonly Option, int, int>> _computeFrameSize; // For the sake of binary compatibility @@ -500,7 +511,7 @@ public LengthFieldFramingStage( _lengthFieldOffset = lengthFieldOffset; _minimumChunkSize = lengthFieldOffset + lengthFieldLength; _computeFrameSize = computeFrameSize; - _intDecoder = byteOrder == ByteOrder.BigEndian ? BigEndianDecoder : LittleEndianDecoder; + _byteOrder = byteOrder; } protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this); From cd47df36eba9c9e4189d167431a6bd7e933f9227 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 1 Apr 2024 14:39:59 -0500 Subject: [PATCH 04/16] fixed compilation errors --- src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs | 1 + src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs | 2 +- src/core/Akka.Streams.Tests/IO/InputStreamSinkSpec.cs | 8 ++++---- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs index e593ac3e18a..f0a143931bf 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs @@ -428,6 +428,7 @@ await this.AssertAllStagesStoppedAsync(async () => var firstGroup = (Source, NotUsed>)Source.FromPublisher(publisherProbe) .GroupBy(256, element => element[0]) + .Select(b => b.ToArray()) // have to convert to an array .Select(b => b.Reverse()) .MergeSubstreams(); var secondGroup = (Source, NotUsed>)firstGroup.GroupBy(256, bytes => bytes.First()) diff --git a/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs b/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs index 4bab68295b5..1ca842618c5 100644 --- a/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs @@ -338,7 +338,7 @@ await TargetFileAsync(async f => var completion = Source.From(_testByteStrings) .Select(bytes => { - if (bytes.Contains(Convert.ToByte('b'))) throw new TestException("bees!"); + if (bytes.ToArray().Contains(Convert.ToByte('b'))) throw new TestException("bees!"); return bytes; }) .RunWith(FileIO.ToFile(f), _materializer); diff --git a/src/core/Akka.Streams.Tests/IO/InputStreamSinkSpec.cs b/src/core/Akka.Streams.Tests/IO/InputStreamSinkSpec.cs index ace20df0823..23958a34d7c 100644 --- a/src/core/Akka.Streams.Tests/IO/InputStreamSinkSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/InputStreamSinkSpec.cs @@ -66,7 +66,7 @@ await this.AssertAllStagesStoppedAsync(() => { result = ReadN(inputStream, 2); result.Item1.Should().Be(2); - result.Item2.Should().BeEquivalentTo(Enumerable.Concat(_byteString.Slice(2), byteString2.Slice(0, 1))); + result.Item2.Should().BeEquivalentTo((_byteString.Slice(2).Concat(byteString2.Slice(0, 1)))); result = ReadN(inputStream, 2); result.Item1.Should().Be(2); @@ -86,7 +86,7 @@ await this.AssertAllStagesStoppedAsync(() => { var arr = new byte[_byteString.Count + 1]; inputStream.Read(arr, 0, arr.Length).Should().Be(arr.Length - 1); inputStream.Dispose(); - ByteString.FromBytes(arr).Should().BeEquivalentTo(Enumerable.Concat(_byteString, ByteString.FromBytes(new byte[] { 0 }))); + ByteString.FromBytes(arr).Should().BeEquivalentTo((_byteString.Concat(ByteString.FromBytes(new byte[] { 0 })))); return Task.CompletedTask; }, _materializer); } @@ -214,7 +214,7 @@ await this.AssertAllStagesStoppedAsync(() => { { var r = ReadN(inputStream, 8); r.Item1.Should().Be(8); - r.Item2.Should().BeEquivalentTo(Enumerable.Concat(bytes[i * 2], bytes[i * 2 + 1])); + r.Item2.Should().BeEquivalentTo(bytes[i * 2].Concat(bytes[i * 2 + 1])); } inputStream.Dispose(); @@ -236,7 +236,7 @@ await this.AssertAllStagesStoppedAsync(() => { var r1 = ReadN(inputStream, 15); r1.Item1.Should().Be(15); - r1.Item2.Should().BeEquivalentTo(Enumerable.Concat(bytes1, bytes2.Slice(0, 5))); + r1.Item2.Should().BeEquivalentTo(bytes1.Concat(bytes2.Slice(0, 5))); var r2 = ReadN(inputStream, 15); r2.Item1.Should().Be(5); From 31122d7a74e24b7a980a3ca89b862d8486709707 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 1 Apr 2024 14:44:35 -0500 Subject: [PATCH 05/16] fixed issue with fuzzing spec --- src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs index f0a143931bf..1addce95512 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs @@ -424,17 +424,18 @@ public async Task GroupBy_must_work_under_fuzzing_stress_test() await this.AssertAllStagesStoppedAsync(async () => { var publisherProbe = this.CreateManualPublisherProbe(); - var subscriber = this.CreateManualSubscriberProbe>(); + var subscriber = this.CreateManualSubscriberProbe(); var firstGroup = (Source, NotUsed>)Source.FromPublisher(publisherProbe) .GroupBy(256, element => element[0]) .Select(b => b.ToArray()) // have to convert to an array .Select(b => b.Reverse()) .MergeSubstreams(); - var secondGroup = (Source, NotUsed>)firstGroup.GroupBy(256, bytes => bytes.First()) + var secondGroup = (Source)firstGroup.GroupBy(256, bytes => bytes.First()) .Select(b => b.Reverse()) + .Select(b => ByteString.FromBytes(b.ToArray())) .MergeSubstreams(); - var publisher = secondGroup.RunWith(Sink.AsPublisher>(false), Materializer); + var publisher = secondGroup.RunWith(Sink.AsPublisher(false), Materializer); publisher.Subscribe(subscriber); var upstreamSubscription = await publisherProbe.ExpectSubscriptionAsync(); From 8cbb00b19d4ba986cd5a8a1eb4167b379846547e Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 1 Apr 2024 15:30:34 -0500 Subject: [PATCH 06/16] added API approvals --- .../CoreAPISpec.ApproveCore.DotNet.verified.txt | 16 +++++++++++++--- .../CoreAPISpec.ApproveCore.Net.verified.txt | 16 +++++++++++++--- src/core/Akka/IO/TcpConnection.cs | 11 +++-------- src/core/Akka/Util/ByteString.cs | 2 ++ 4 files changed, 31 insertions(+), 14 deletions(-) diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index 08d6294b218..3c3365b92b3 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -3653,14 +3653,17 @@ namespace Akka.IO BigEndian = 0, LittleEndian = 1, } - [System.Diagnostics.DebuggerDisplayAttribute("(Count = {_count}, Buffers = {_buffers})")] - public sealed class ByteString : System.Collections.Generic.IEnumerable, System.Collections.IEnumerable, System.IEquatable + [System.Diagnostics.DebuggerDisplayAttribute("(Count = {Count}, Buffer = {Memory})")] + public sealed class ByteString : System.IEquatable { public int Count { get; } public static Akka.IO.ByteString Empty { get; } + [System.ObsoleteAttribute("This property will be removed in future versions of Akka.NET.")] public bool IsCompact { get; } public bool IsEmpty { get; } public byte this[int index] { get; } + public System.ReadOnlyMemory Memory { get; } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public Akka.IO.ByteString Compact() { } public Akka.IO.ByteString Concat(Akka.IO.ByteString other) { } public static Akka.IO.ByteString CopyFrom(byte[] array) { } @@ -3672,9 +3675,13 @@ namespace Akka.IO public static Akka.IO.ByteString CopyFrom(System.Span span, int offset, int count) { } public static Akka.IO.ByteString CopyFrom(System.Collections.Generic.IEnumerable> buffers) { } public int CopyTo(byte[] buffer, int index, int count) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public int CopyTo(ref System.Memory buffer) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public int CopyTo(ref System.Memory buffer, int index, int count) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public int CopyTo(ref System.Span buffer) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public int CopyTo(ref System.Span buffer, int index, int count) { } public override bool Equals(object obj) { } public bool Equals(Akka.IO.ByteString other) { } @@ -3684,7 +3691,6 @@ namespace Akka.IO public static Akka.IO.ByteString FromBytes(System.Collections.Generic.IEnumerable> buffers) { } public static Akka.IO.ByteString FromString(string str) { } public static Akka.IO.ByteString FromString(string str, System.Text.Encoding encoding) { } - public System.Collections.Generic.IEnumerator GetEnumerator() { } public override int GetHashCode() { } public bool HasSubstring(Akka.IO.ByteString other, int index) { } public int IndexOf(byte b) { } @@ -3694,12 +3700,16 @@ namespace Akka.IO public byte[] ToArray() { } public override string ToString() { } public string ToString(System.Text.Encoding encoding) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public void WriteTo(System.IO.Stream stream) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public System.Threading.Tasks.Task WriteToAsync(System.IO.Stream stream) { } public static Akka.IO.ByteString +(Akka.IO.ByteString x, Akka.IO.ByteString y) { } public static bool ==(Akka.IO.ByteString x, Akka.IO.ByteString y) { } public static Akka.IO.ByteString op_Explicit(byte[] bytes) { } public static byte[] op_Explicit(Akka.IO.ByteString byteString) { } + public static Akka.IO.ByteString op_Explicit(System.Memory memory) { } + public static Akka.IO.ByteString op_Explicit(System.ReadOnlyMemory memory) { } public static bool !=(Akka.IO.ByteString x, Akka.IO.ByteString y) { } } [Akka.Annotations.InternalApiAttribute()] diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index f50634d8b95..9431d9a9175 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -3643,14 +3643,17 @@ namespace Akka.IO BigEndian = 0, LittleEndian = 1, } - [System.Diagnostics.DebuggerDisplayAttribute("(Count = {_count}, Buffers = {_buffers})")] - public sealed class ByteString : System.Collections.Generic.IEnumerable, System.Collections.IEnumerable, System.IEquatable + [System.Diagnostics.DebuggerDisplayAttribute("(Count = {Count}, Buffer = {Memory})")] + public sealed class ByteString : System.IEquatable { public int Count { get; } public static Akka.IO.ByteString Empty { get; } + [System.ObsoleteAttribute("This property will be removed in future versions of Akka.NET.")] public bool IsCompact { get; } public bool IsEmpty { get; } public byte this[int index] { get; } + public System.ReadOnlyMemory Memory { get; } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public Akka.IO.ByteString Compact() { } public Akka.IO.ByteString Concat(Akka.IO.ByteString other) { } public static Akka.IO.ByteString CopyFrom(byte[] array) { } @@ -3662,9 +3665,13 @@ namespace Akka.IO public static Akka.IO.ByteString CopyFrom(System.Span span, int offset, int count) { } public static Akka.IO.ByteString CopyFrom(System.Collections.Generic.IEnumerable> buffers) { } public int CopyTo(byte[] buffer, int index, int count) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public int CopyTo(ref System.Memory buffer) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public int CopyTo(ref System.Memory buffer, int index, int count) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public int CopyTo(ref System.Span buffer) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public int CopyTo(ref System.Span buffer, int index, int count) { } public override bool Equals(object obj) { } public bool Equals(Akka.IO.ByteString other) { } @@ -3674,7 +3681,6 @@ namespace Akka.IO public static Akka.IO.ByteString FromBytes(System.Collections.Generic.IEnumerable> buffers) { } public static Akka.IO.ByteString FromString(string str) { } public static Akka.IO.ByteString FromString(string str, System.Text.Encoding encoding) { } - public System.Collections.Generic.IEnumerator GetEnumerator() { } public override int GetHashCode() { } public bool HasSubstring(Akka.IO.ByteString other, int index) { } public int IndexOf(byte b) { } @@ -3684,12 +3690,16 @@ namespace Akka.IO public byte[] ToArray() { } public override string ToString() { } public string ToString(System.Text.Encoding encoding) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public void WriteTo(System.IO.Stream stream) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public System.Threading.Tasks.Task WriteToAsync(System.IO.Stream stream) { } public static Akka.IO.ByteString +(Akka.IO.ByteString x, Akka.IO.ByteString y) { } public static bool ==(Akka.IO.ByteString x, Akka.IO.ByteString y) { } public static Akka.IO.ByteString op_Explicit(byte[] bytes) { } public static byte[] op_Explicit(Akka.IO.ByteString byteString) { } + public static Akka.IO.ByteString op_Explicit(System.Memory memory) { } + public static Akka.IO.ByteString op_Explicit(System.ReadOnlyMemory memory) { } public static bool !=(Akka.IO.ByteString x, Akka.IO.ByteString y) { } } [Akka.Annotations.InternalApiAttribute()] diff --git a/src/core/Akka/IO/TcpConnection.cs b/src/core/Akka/IO/TcpConnection.cs index 049a737f755..0adcb8567cb 100644 --- a/src/core/Akka/IO/TcpConnection.cs +++ b/src/core/Akka/IO/TcpConnection.cs @@ -240,14 +240,9 @@ private Receive PeerSentEOF(ConnectionInfo info) return message => { if (handleWrite(message)) return true; - var cmd = message as CloseCommand; - if (cmd != null) - { - HandleClose(info, Sender, cmd.Event); - return true; - } - if (message is ResumeReading) return true; - return false; + if (message is not CloseCommand cmd) return message is ResumeReading; + HandleClose(info, Sender, cmd.Event); + return true; }; } diff --git a/src/core/Akka/Util/ByteString.cs b/src/core/Akka/Util/ByteString.cs index 24112a341cd..5c3c17d1da7 100644 --- a/src/core/Akka/Util/ByteString.cs +++ b/src/core/Akka/Util/ByteString.cs @@ -514,6 +514,7 @@ public int CopyTo(ref Span buffer, int index, int count) /// writeable . /// /// + [Obsolete("This method will be removed in future versions of Akka.NET.")] public void WriteTo(Stream stream) { if (stream == null) throw new ArgumentNullException(nameof(stream)); @@ -528,6 +529,7 @@ public void WriteTo(Stream stream) /// to a provided writeable . /// /// + [Obsolete("This method will be removed in future versions of Akka.NET.")] public Task WriteToAsync(Stream stream) { if (stream == null) throw new ArgumentNullException(nameof(stream)); From 53a7224ef49dea2415f8c2d3cb205f3972f3f36e Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 3 Apr 2024 16:55:29 -0500 Subject: [PATCH 07/16] Revert "fixed framing issues" This reverts commit d9c3d9821185f858522d885764ddecb32d5eda8d. --- src/core/Akka.Streams/Dsl/Framing.cs | 69 ++++++++++++---------------- 1 file changed, 29 insertions(+), 40 deletions(-) diff --git a/src/core/Akka.Streams/Dsl/Framing.cs b/src/core/Akka.Streams/Dsl/Framing.cs index 0dee51bb9f6..465b7ef14ac 100644 --- a/src/core/Akka.Streams/Dsl/Framing.cs +++ b/src/core/Akka.Streams/Dsl/Framing.cs @@ -169,45 +169,39 @@ public FramingException(string message) : base(message) protected FramingException(SerializationInfo info, StreamingContext context) : base(info, context) { } } - private static int BigEndianDecoder(ref ReadOnlySpan enumerator, int length) + private static readonly Func, int, int> BigEndianDecoder = (enumerator, length) => { - if (length > enumerator.Length) - { - throw new ArgumentException("Length exceeds the size of the enumerator."); - } - - var result = 0; - - // Assuming 'length' is 4 for a 32-bit integer. - // Adjust the loop and shift amounts if dealing with different sizes. - for (var i = 0; i < length; i++) + var count = length; + var decoded = 0; + while (count > 0) { - result |= enumerator[i] << ((length - 1 - i) * 8); + decoded <<= 8; + if (!enumerator.MoveNext()) throw new IndexOutOfRangeException("LittleEndianDecoder reached end of byte string"); + decoded |= enumerator.Current & 0xFF; + count--; } - return result; - } + return decoded; + }; - private static int LittleEndianDecoder(ref ReadOnlySpan span, int length) + private static readonly Func, int, int> LittleEndianDecoder = (enumerator, length) => { - if (length > span.Length) - { - throw new IndexOutOfRangeException("LittleEndianDecoder reached end of byte span"); - } - + var highestOcted = (length - 1) << 3; + var mask = (int) (1L << (length << 3)) - 1; + var count = length; var decoded = 0; - var highestOctetShift = (length - 1) << 3; - var mask = (int)(1L << (length << 3)) - 1; - for (var i = 0; i < length; i++) + while (count > 0) { - // Shift and add the ith byte to 'decoded'. No need for >>>= as in JVM; just shift appropriately. - var shiftAmount = highestOctetShift - (i << 3); - decoded |= (span[i] & 0xFF) << shiftAmount; + // decoded >>>= 8 on the jvm + decoded = (int) ((uint) decoded >> 8); + if (!enumerator.MoveNext()) throw new IndexOutOfRangeException("LittleEndianDecoder reached end of byte string"); + decoded += (enumerator.Current & 0xFF) << highestOcted; + count--; } return decoded & mask; - } + }; private sealed class SimpleFramingProtocolEncoderStage : SimpleLinearGraphStage { @@ -348,8 +342,8 @@ private void DoParse() else if (_buffer.HasSubstring(_stage._separatorBytes, possibleMatchPosition)) { // Found a match - var parsedFrame = _buffer.Slice(0, possibleMatchPosition); - _buffer = _buffer.Slice(possibleMatchPosition + _stage._separatorBytes.Count); + var parsedFrame = _buffer.Slice(0, possibleMatchPosition).Compact(); + _buffer = _buffer.Slice(possibleMatchPosition + _stage._separatorBytes.Count).Compact(); _nextPossibleMatch = 0; Push(_stage.Outlet, parsedFrame); @@ -428,7 +422,7 @@ public override void OnUpstreamFinish() /// private void PushFrame() { - var emit = _buffer.Slice(0, _frameSize); + var emit = _buffer.Slice(0, _frameSize).Compact(); _buffer = _buffer.Slice(_frameSize); _frameSize = int.MaxValue; Push(_stage.Outlet, emit); @@ -446,14 +440,9 @@ private void TryPushFrame() PushFrame(); else if (bufferSize >= _stage._minimumChunkSize) { - var iterator = _buffer.Memory.Span.Slice(_stage._lengthFieldOffset); - var parsedLength = _stage._byteOrder switch { - ByteOrder.BigEndian => BigEndianDecoder(ref iterator, _stage._lengthFieldLength), - ByteOrder.LittleEndian => LittleEndianDecoder(ref iterator, _stage._lengthFieldLength), - _ => throw new NotSupportedException($"ByteOrder {_stage._byteOrder} is not supported") - }; - - // TODO: AVOID ARRAY COPYING AGAIN HERE + var iterator = _buffer.Slice(_stage._lengthFieldOffset).GetEnumerator(); + var parsedLength = _stage._intDecoder(iterator, _stage._lengthFieldLength); + _frameSize = _stage._computeFrameSize.HasValue ? _stage._computeFrameSize.Value(_buffer.Slice(0, _stage._lengthFieldOffset).ToArray(), parsedLength) : parsedLength + _stage._minimumChunkSize; @@ -491,7 +480,7 @@ private void TryPull() private readonly int _maximumFramelength; private readonly int _lengthFieldOffset; private readonly int _minimumChunkSize; - private readonly ByteOrder _byteOrder; + private readonly Func, int, int> _intDecoder; private readonly Option, int, int>> _computeFrameSize; // For the sake of binary compatibility @@ -511,7 +500,7 @@ public LengthFieldFramingStage( _lengthFieldOffset = lengthFieldOffset; _minimumChunkSize = lengthFieldOffset + lengthFieldLength; _computeFrameSize = computeFrameSize; - _byteOrder = byteOrder; + _intDecoder = byteOrder == ByteOrder.BigEndian ? BigEndianDecoder : LittleEndianDecoder; } protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this); From 1e378989e18a120dbb8ca9a997734a681610c9f3 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 3 Apr 2024 17:01:09 -0500 Subject: [PATCH 08/16] fixed frame-length encoding --- src/core/Akka.Streams/Dsl/Framing.cs | 57 +++++++++++++++++++--------- 1 file changed, 39 insertions(+), 18 deletions(-) diff --git a/src/core/Akka.Streams/Dsl/Framing.cs b/src/core/Akka.Streams/Dsl/Framing.cs index 465b7ef14ac..8e7a0f667c7 100644 --- a/src/core/Akka.Streams/Dsl/Framing.cs +++ b/src/core/Akka.Streams/Dsl/Framing.cs @@ -169,40 +169,61 @@ public FramingException(string message) : base(message) protected FramingException(SerializationInfo info, StreamingContext context) : base(info, context) { } } - private static readonly Func, int, int> BigEndianDecoder = (enumerator, length) => + private static readonly Func, int, int> BigEndianDecoder = (byteData, length) => { - var count = length; + if (length < 1 || length > 4) + { + throw new ArgumentOutOfRangeException(nameof(length), "Length must be between 1 and 4."); + } + + if (byteData.Length < length) + { + throw new IndexOutOfRangeException("BigEndianDecoder does not have enough bytes to decode."); + } + var decoded = 0; - while (count > 0) + for (int i = 0; i < length; i++) { + // Shift the previously accumulated value to the left, making space for the next byte decoded <<= 8; - if (!enumerator.MoveNext()) throw new IndexOutOfRangeException("LittleEndianDecoder reached end of byte string"); - decoded |= enumerator.Current & 0xFF; - count--; + + // Read the next byte and add it to the result + decoded |= byteData.Span[i] & 0xFF; } return decoded; }; - private static readonly Func, int, int> LittleEndianDecoder = (enumerator, length) => + + private static readonly Func, int, int> LittleEndianDecoder = (byteData, length) => { - var highestOcted = (length - 1) << 3; - var mask = (int) (1L << (length << 3)) - 1; - var count = length; + if (length < 1 || length > 4) + { + throw new ArgumentOutOfRangeException(nameof(length), "Length must be between 1 and 4."); + } + + var highestOctet = (length - 1) << 3; + var mask = (int)(1L << (length << 3)) - 1; var decoded = 0; - while (count > 0) + for (int i = 0; i < length; i++) { - // decoded >>>= 8 on the jvm - decoded = (int) ((uint) decoded >> 8); - if (!enumerator.MoveNext()) throw new IndexOutOfRangeException("LittleEndianDecoder reached end of byte string"); - decoded += (enumerator.Current & 0xFF) << highestOcted; - count--; + if (i >= byteData.Length) + { + throw new IndexOutOfRangeException("LittleEndianDecoder reached end of byte array."); + } + + // Shift the previously processed bytes to the right, making space for the next byte + decoded = (int)((uint)decoded >> 8); + + // Read the next byte and insert it into the correct position + decoded += (byteData.Span[i] & 0xFF) << highestOctet; } return decoded & mask; }; + private sealed class SimpleFramingProtocolEncoderStage : SimpleLinearGraphStage { #region Logic @@ -440,7 +461,7 @@ private void TryPushFrame() PushFrame(); else if (bufferSize >= _stage._minimumChunkSize) { - var iterator = _buffer.Slice(_stage._lengthFieldOffset).GetEnumerator(); + var iterator = _buffer.Memory.Slice(_stage._lengthFieldOffset); var parsedLength = _stage._intDecoder(iterator, _stage._lengthFieldLength); _frameSize = _stage._computeFrameSize.HasValue @@ -480,7 +501,7 @@ private void TryPull() private readonly int _maximumFramelength; private readonly int _lengthFieldOffset; private readonly int _minimumChunkSize; - private readonly Func, int, int> _intDecoder; + private readonly Func, int, int> _intDecoder; private readonly Option, int, int>> _computeFrameSize; // For the sake of binary compatibility From 0726e2d8b3c8ad9dd49abc0d23fec007f7fbb875 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 3 Apr 2024 17:30:00 -0500 Subject: [PATCH 09/16] added substring spec --- src/core/Akka.Tests/Util/ByteStringSpec.cs | 27 +++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/src/core/Akka.Tests/Util/ByteStringSpec.cs b/src/core/Akka.Tests/Util/ByteStringSpec.cs index 949f8ca3afb..0984097649d 100644 --- a/src/core/Akka.Tests/Util/ByteStringSpec.cs +++ b/src/core/Akka.Tests/Util/ByteStringSpec.cs @@ -150,7 +150,7 @@ public void A_concatenated_ByteString_with_partial_characters_must_return_correc data += ByteString.CopyFrom(rawData, 1, 3); // One and a half characters Assert.Equal(rawData.Length, data.Count); - string actual = data.ToString(encoding); + var actual = data.ToString(encoding); Assert.Equal(expected, actual); } @@ -173,6 +173,31 @@ public void A_sliced_ByteString_must_return_correct_string_for_ToString() Assert.Equal(expectedLeft, actualLeft); Assert.Equal(expectedRight, actualRight); } + + // generate a test case for the ByteString.HasSubstring method, when one big ByteString contains another ByteString + [Fact] + public void A_ByteString_must_return_true_when_containing_another_ByteString() + { + Prop.ForAll((ByteString a, ByteString b) => + { + var big = a + b + a; + return ByteStringHasSubstringZeroIndex().Label($"big: {big}, b: {b}") + .And(ByteStringHasSubstringNonZeroIndex().Label($"big: {big}, b: {b}")); + + bool ByteStringHasSubstringZeroIndex() + { + return big.HasSubstring(b, 0); + } + + bool ByteStringHasSubstringNonZeroIndex() + { + return big.HasSubstring(b, a.Count); + } + + }) + .QuickCheckThrowOnFailure(); + } + #if !NETFRAMEWORK [Fact(DisplayName = "A sliced byte string using Range must return the correct string for ToString")] From abc6aa4f421483406494f7abe954a1896d946be7 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 3 Apr 2024 17:47:05 -0500 Subject: [PATCH 10/16] validated IndexOf operation --- src/core/Akka.Tests/Util/ByteStringSpec.cs | 37 +++++++++++++++++++--- src/core/Akka/Util/ByteString.cs | 33 +++++++++++-------- 2 files changed, 52 insertions(+), 18 deletions(-) diff --git a/src/core/Akka.Tests/Util/ByteStringSpec.cs b/src/core/Akka.Tests/Util/ByteStringSpec.cs index 0984097649d..dd75eb3bc88 100644 --- a/src/core/Akka.Tests/Util/ByteStringSpec.cs +++ b/src/core/Akka.Tests/Util/ByteStringSpec.cs @@ -8,6 +8,7 @@ using System; using System.Linq; using System.Text; +using Akka.Actor; using Akka.IO; using FluentAssertions; using FsCheck; @@ -173,7 +174,7 @@ public void A_sliced_ByteString_must_return_correct_string_for_ToString() Assert.Equal(expectedLeft, actualLeft); Assert.Equal(expectedRight, actualRight); } - + // generate a test case for the ByteString.HasSubstring method, when one big ByteString contains another ByteString [Fact] public void A_ByteString_must_return_true_when_containing_another_ByteString() @@ -188,16 +189,44 @@ bool ByteStringHasSubstringZeroIndex() { return big.HasSubstring(b, 0); } - + bool ByteStringHasSubstringNonZeroIndex() { return big.HasSubstring(b, a.Count); } - }) .QuickCheckThrowOnFailure(); } - + + // generate a test case for ByteString.IndexOf of another ByteString and ensure that the index is correct (should match the beginning of the other ByteString) + [Fact] + public void A_ByteString_must_return_correct_index_when_containing_another_ByteString() + { + Prop.ForAll((ByteString a, ByteString b) => + { + return Prop.When(b.Count > 0, () => + { + var big = a + b + a; + var i = ByteStringIndexOfZeroIndex(); + var g = ByteStringIndexOfNonZeroIndex(); + + return (i == a.Count).Label($"big: {big}, b: {b}, expected start: {a.Count}, actual start: {i}") + .And((g == a.Count).Label($"big: {big}, b: {b}, expected start: {a.Count}, actual start: {g}")); + + int ByteStringIndexOfZeroIndex() + { + return big.IndexOf(b, 0); + } + + int ByteStringIndexOfNonZeroIndex() + { + return big.IndexOf(b, a.Count); + } + }); + }) + .QuickCheckThrowOnFailure(); + } + #if !NETFRAMEWORK [Fact(DisplayName = "A sliced byte string using Range must return the correct string for ToString")] diff --git a/src/core/Akka/Util/ByteString.cs b/src/core/Akka/Util/ByteString.cs index 5c3c17d1da7..3317a6fb73f 100644 --- a/src/core/Akka/Util/ByteString.cs +++ b/src/core/Akka/Util/ByteString.cs @@ -371,20 +371,12 @@ public int IndexOf(byte b, int from) { return _memory.Span[from..].IndexOf(b); } - - /// - /// Checks if a subsequence determined by the - /// byte string is can be found in current one, starting from provided - /// . - /// - /// - /// - /// - public bool HasSubstring(ByteString other, int index) + + public int IndexOf(ByteString other, int index = 0) { - if (other.Count == 0) return true; // Empty spans are always "found". + if (other.Count == 0) return index; // Empty spans are always "found". if (index < 0 || index > Count) throw new ArgumentOutOfRangeException(nameof(index), "Start index is out of range."); - if (Count - index < other.Count) return false; // Can't find if `toFind` is larger considering the start index. + if (Count - index < other.Count) return -1; // Can't find if `toFind` is larger considering the start index. for (var i = index; i <= Count - other.Count; i++) { @@ -398,10 +390,23 @@ public bool HasSubstring(ByteString other, int index) break; } } - if (found) return true; + if (found) return i; } - return false; + return -1; + } + + /// + /// Checks if a subsequence determined by the + /// byte string is can be found in current one, starting from provided + /// . + /// + /// + /// + /// + public bool HasSubstring(ByteString other, int index) + { + return IndexOf(other, index) > -1; } /// From aca4c724cf0207b7053a13df9d5ba9fb72ef2d12 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 3 Apr 2024 17:58:17 -0500 Subject: [PATCH 11/16] fixing up specs --- src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs | 9 ++++----- src/core/Akka.Tests/Util/ByteStringSpec.cs | 13 +++++++++++++ 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs index 0162713889e..7446a25dc1d 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs @@ -126,21 +126,20 @@ private static IEnumerable CompleteTestSequence(ByteString delimiter } [Fact] - public void Delimiter_bytes_based_framing_must_work_with_various_delimiters_and_test_sequences() + public async Task Delimiter_bytes_based_framing_must_work_with_various_delimiters_and_test_sequences() { for (var i = 1; i <= 100; i++) { foreach (var delimiter in DelimiterBytes) { var testSequence = CompleteTestSequence(delimiter).ToList(); - var task = Source.From(testSequence) + var task = await Source.From(testSequence) .Select(x => x + delimiter) .Via(Rechunk) .Via(Framing.Delimiter(delimiter, 256)) .RunWith(Sink.Seq(), Materializer); - - task.Wait(TimeSpan.FromDays(3)).Should().BeTrue(); - task.Result.Should().BeEquivalentTo(testSequence); + + task.Should().BeEquivalentTo(testSequence); } } } diff --git a/src/core/Akka.Tests/Util/ByteStringSpec.cs b/src/core/Akka.Tests/Util/ByteStringSpec.cs index dd75eb3bc88..0653be0b9eb 100644 --- a/src/core/Akka.Tests/Util/ByteStringSpec.cs +++ b/src/core/Akka.Tests/Util/ByteStringSpec.cs @@ -226,6 +226,19 @@ int ByteStringIndexOfNonZeroIndex() }) .QuickCheckThrowOnFailure(); } + + // write a unit test for the IndexOf method when we're looking for a single byte - do it without using FsCheck + [Theory] + [InlineData(0)] + [InlineData(1)] + public void A_ByteString_must_return_correct_index_when_containing_a_single_byte(int startingIndex) + { + var a = ByteString.FromBytes(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 }); + var b = ByteString.FromBytes(new byte[] { 5 }); + + var i = a.IndexOf(b, startingIndex); + Assert.Equal(4, i); + } #if !NETFRAMEWORK From ded7e5cdad70b9f3ca280209f455b68e53168be6 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 3 Apr 2024 18:00:55 -0500 Subject: [PATCH 12/16] added ByteString.IndexOf method for single bytes --- src/core/Akka.Tests/Util/ByteStringSpec.cs | 31 ++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/src/core/Akka.Tests/Util/ByteStringSpec.cs b/src/core/Akka.Tests/Util/ByteStringSpec.cs index 0653be0b9eb..fbfda0328d4 100644 --- a/src/core/Akka.Tests/Util/ByteStringSpec.cs +++ b/src/core/Akka.Tests/Util/ByteStringSpec.cs @@ -238,6 +238,37 @@ public void A_ByteString_must_return_correct_index_when_containing_a_single_byte var i = a.IndexOf(b, startingIndex); Assert.Equal(4, i); + + // also do the comparison with the byte value + var j = a.IndexOf(5, startingIndex); + Assert.Equal(4, j); + } + + [Fact] + public void A_ByteString_must_return_correct_index_when_containing_a_single_byte_front() + { + var a = ByteString.FromBytes(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 }); + var b = ByteString.FromBytes(new byte[] { 1 }); + + var i = a.IndexOf(b); + Assert.Equal(0, i); + + // also do the comparison with the byte value + var j = a.IndexOf(1); + Assert.Equal(0, j); + } + + [Fact] + public void A_ByteString_must_return_correct_index_when_containing_a_single_byte_back() + { + var a = ByteString.FromBytes(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 }); + var b = ByteString.FromBytes(new byte[] { 9 }); + + var i = a.IndexOf(b); + Assert.Equal(8, i); + + // also do the comparison with the byte value + var j = a.IndexOf(9); } From 1a9389677ba1be473389f134a89d1a47db82e5eb Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 3 Apr 2024 18:50:35 -0500 Subject: [PATCH 13/16] fixing up framing --- .../Akka.Streams.Tests/Dsl/FramingSpec.cs | 149 ++++++++------ src/core/Akka.Streams/Dsl/Framing.cs | 182 ++++++++++++------ src/core/Akka/Util/ByteString.cs | 3 +- 3 files changed, 212 insertions(+), 122 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs index 7446a25dc1d..48acb9fd6db 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs @@ -97,7 +97,6 @@ private void Rechunk() public Rechunker() : base("Rechunker") { - } protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this); @@ -107,14 +106,15 @@ private Flow Rechunk => Flow.Create().Via(new Rechunker()).Named("rechunker"); private static readonly List DelimiterBytes = - new List {"\n", "\r\n", "FOO"}.Select(ByteString.FromString).ToList(); + new List { "\n", "\r\n", "FOO" }.Select(ByteString.FromString).ToList(); private static readonly List BaseTestSequences = new List { "", "foo", "hello world" }.Select(ByteString.FromString).ToList(); - private static Flow SimpleLines(string delimiter, int maximumBytes, bool allowTruncation = true) + private static Flow SimpleLines(string delimiter, int maximumBytes, + bool allowTruncation = true) { - return Framing.Delimiter(ByteString.FromString(delimiter), maximumBytes, allowTruncation) + return Framing.Delimiter(ByteString.FromString(delimiter), maximumBytes, allowTruncation) .Select(x => x.ToString(Encoding.UTF8)).Named("LineFraming"); } @@ -125,23 +125,24 @@ private static IEnumerable CompleteTestSequence(ByteString delimiter yield return delimiter.Slice(0, i) + sequence; } - [Fact] - public async Task Delimiter_bytes_based_framing_must_work_with_various_delimiters_and_test_sequences() + public static readonly TheoryData DelimiterBytesData = new() { - for (var i = 1; i <= 100; i++) - { - foreach (var delimiter in DelimiterBytes) - { - var testSequence = CompleteTestSequence(delimiter).ToList(); - var task = await Source.From(testSequence) - .Select(x => x + delimiter) - .Via(Rechunk) - .Via(Framing.Delimiter(delimiter, 256)) - .RunWith(Sink.Seq(), Materializer); - - task.Should().BeEquivalentTo(testSequence); - } - } + ByteString.FromString("\n"), ByteString.FromString("\r\n"), ByteString.FromString("FOO") + }; + + [Theory] + [MemberData(nameof(DelimiterBytesData))] + public async Task Delimiter_bytes_based_framing_must_work_with_various_delimiters_and_test_sequences( + ByteString delimiter) + { + var testSequence = CompleteTestSequence(delimiter).ToList(); + var task = await Source.From(testSequence) + .Select(x => x + delimiter) + .Via(Rechunk) + .Via(Framing.Delimiter(delimiter, 256)) + .RunWith(Sink.Seq(), Materializer); + + task.Should().BeEquivalentTo(testSequence); } [Fact] @@ -153,7 +154,7 @@ public void Delimiter_bytes_based_framing_must_respect_maximum_line_settings() .RunWith(Sink.Seq(), Materializer); task1.Wait(TimeSpan.FromDays(3)).Should().BeTrue(); - task1.Result.Should().BeEquivalentTo(new[] {"a", "b", "c", "d"}); + task1.Result.Should().BeEquivalentTo(new[] { "a", "b", "c", "d" }); var task2 = Source.Single(ByteString.FromString("ab\n")) @@ -173,11 +174,12 @@ public void Delimiter_bytes_based_framing_must_respect_maximum_line_settings() [Fact] public void Delimiter_bytes_based_framing_must_work_with_empty_streams() { - var task = Source.Empty().Via(SimpleLines("\n", 256)).RunAggregate(new List(), (list, s) => - { - list.Add(s); - return list; - }, Materializer); + var task = Source.Empty().Via(SimpleLines("\n", 256)).RunAggregate(new List(), + (list, s) => + { + list.Add(s); + return list; + }, Materializer); task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); task.Result.Should().BeEmpty(); } @@ -212,16 +214,12 @@ private static string RandomString(int length) const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; var random = new Random(); return new string(Enumerable.Repeat(chars, length) - .Select(s => s[random.Next(s.Length)]).ToArray()); + .Select(s => s[random.Next(s.Length)]).ToArray()); } private static readonly ByteString ReferenceChunk = ByteString.FromString(RandomString(0x100001)); - private static readonly List ByteOrders = new() - { - ByteOrder.BigEndian, - ByteOrder.LittleEndian - }; + private static readonly List ByteOrders = new() { ByteOrder.BigEndian, ByteOrder.LittleEndian }; private static readonly List FrameLengths = new() { @@ -240,18 +238,31 @@ private static string RandomString(int length) 0x10001 }; - private static readonly List FieldLengths = new() {1, 2, 3, 4}; + private static readonly List FieldLengths = new() { 1, 2, 3, 4 }; - private static readonly List FieldOffsets = new() {0, 1, 2, 3, 15, 16, 31, 32, 44, 107}; + private static readonly List FieldOffsets = new() + { + 0, + 1, + 2, + 3, + 15, + 16, + 31, + 32, + 44, + 107 + }; private static ByteString Encode(ByteString payload, int fieldOffset, int fieldLength, ByteOrder byteOrder) => - EncodeComplexFrame(payload, fieldLength, byteOrder, ByteString.FromBytes(new byte[fieldOffset]), ByteString.Empty); + EncodeComplexFrame(payload, fieldLength, byteOrder, ByteString.FromBytes(new byte[fieldOffset]), + ByteString.Empty); private static ByteString EncodeComplexFrame( - ByteString payload, - int fieldLength, - ByteOrder byteOrder, - ByteString offset, + ByteString payload, + int fieldLength, + ByteOrder byteOrder, + ByteString offset, ByteString tail) { var h = ByteString.FromBytes(new byte[4].PutInt(payload.Count, order: byteOrder)); @@ -284,15 +295,17 @@ public void Length_field_based_framing_must_work_with_various_byte_orders_frame_ } } - Parallel.ForEach(GetFutureResults(), async futureResult => - { + Parallel.ForEach(GetFutureResults(), async futureResult => + { var (result, encodedFrames, (byteOrder, fieldOffset, fieldLength)) = await futureResult; - result.ShouldBeSame(encodedFrames, $"byteOrder: {byteOrder}, fieldOffset: {fieldOffset}, fieldLength: {fieldLength}"); + result.ShouldBeSame(encodedFrames, + $"byteOrder: {byteOrder}, fieldOffset: {fieldOffset}, fieldLength: {fieldLength}"); }); } [Fact] - public void Length_field_based_framing_must_work_with_various_byte_orders_frame_lengths_and_offsets_using_ComputeFrameSize() + public void + Length_field_based_framing_must_work_with_various_byte_orders_frame_lengths_and_offsets_using_ComputeFrameSize() { IEnumerable, List, (ByteOrder, int, int))>> GetFutureResults() { @@ -307,6 +320,7 @@ int ComputeFrameSize(IReadOnlyList offset, int length) } var random = new Random(); + byte[] Offset() { var arr = new byte[fieldOffset]; @@ -319,7 +333,8 @@ byte[] Offset() var payload = ReferenceChunk.Slice(0, length); var offsetBytes = Offset(); var tailBytes = offsetBytes.Length > 0 ? new byte[offsetBytes[0]] : Array.Empty(); - return EncodeComplexFrame(payload, fieldLength, byteOrder, ByteString.FromBytes(offsetBytes), ByteString.FromBytes(tailBytes)); + return EncodeComplexFrame(payload, fieldLength, byteOrder, ByteString.FromBytes(offsetBytes), + ByteString.FromBytes(tailBytes)); }).ToList(); yield return Source.From(encodedFrames) @@ -331,10 +346,11 @@ byte[] Offset() } } - Parallel.ForEach(GetFutureResults(), async futureResult => - { + Parallel.ForEach(GetFutureResults(), async futureResult => + { var (result, encodedFrames, (byteOrder, fieldOffset, fieldLength)) = await futureResult; - result.ShouldBeSame(encodedFrames, $"byteOrder: {byteOrder}, fieldOffset: {fieldOffset}, fieldLength: {fieldLength}"); + result.ShouldBeSame(encodedFrames, + $"byteOrder: {byteOrder}, fieldOffset: {fieldOffset}, fieldLength: {fieldLength}"); }); } @@ -386,18 +402,19 @@ public void Length_field_based_framing_must_report_truncated_frames() { foreach (var frameLength in FrameLengths.Where(f => f < 1 << (fieldLength * 8) && f != 0)) { - var fullFrame = Encode(ReferenceChunk.Slice(0, frameLength), fieldOffset, fieldLength, byteOrder); + var fullFrame = Encode(ReferenceChunk.Slice(0, frameLength), fieldOffset, fieldLength, + byteOrder); var partialFrame = fullFrame.Slice(0, fullFrame.Count - 1); // dropRight equivalent Action action = () => { - Source.From(new[] {fullFrame, partialFrame}) - .Via(Rechunk) - .Via(Framing.LengthField(fieldLength, int.MaxValue, fieldOffset, byteOrder)) - .Grouped(10000) - .RunWith(Sink.First>(), Materializer) - .Wait(TimeSpan.FromSeconds(5)) - .ShouldBeTrue("Stream should complete withing 5 seconds"); + Source.From(new[] { fullFrame, partialFrame }) + .Via(Rechunk) + .Via(Framing.LengthField(fieldLength, int.MaxValue, fieldOffset, byteOrder)) + .Grouped(10000) + .RunWith(Sink.First>(), Materializer) + .Wait(TimeSpan.FromSeconds(5)) + .ShouldBeTrue("Stream should complete withing 5 seconds"); }; action.Should().Throw(); } @@ -415,8 +432,9 @@ public void Length_field_based_framing_must_support_simple_framing_adapter() .Atop(Framing.SimpleFramingProtocol(1024).Reversed()) .Join(Flow.Create()); // Loopback - var random= new Random(); - var testMessages = Enumerable.Range(1, 100).Select(_ => ReferenceChunk.Slice(0, random.Next(1024))).ToList(); + var random = new Random(); + var testMessages = Enumerable.Range(1, 100).Select(_ => ReferenceChunk.Slice(0, random.Next(1024))) + .ToList(); var task = Source.From(testMessages) .Via(codecFlow) @@ -444,7 +462,7 @@ await Awaiting(async () => await result) .WithMessage("Decoded frame header reported negative size -4") .ShouldCompleteWithin(3.Seconds()); } - + [Fact] public async Task Length_field_based_framing_must_ignore_length_field_value_when_provided_computeFrameSize() { @@ -455,23 +473,26 @@ public async Task Length_field_based_framing_must_ignore_length_field_value_when var bs = ByteString.FromBytes(tempArray.PutInt(checked(0x04050607), order: ByteOrder.LittleEndian)); var result = Source.Single(bs) - .Via(Flow.Create().Via(Framing.LengthField(4, 0, 1000, ByteOrder.LittleEndian, ComputeFrameSize))) + .Via(Flow.Create() + .Via(Framing.LengthField(4, 0, 1000, ByteOrder.LittleEndian, ComputeFrameSize))) .RunWith(Sink.Seq(), Materializer); var complete = await result.ShouldCompleteWithin(3.Seconds()); complete.Should().BeEquivalentTo(ImmutableArray.Create(bs)); } - + [Fact] - public async Task Length_field_based_framing_must_fail_the_stage_on_computeFrameSize_values_less_than_minimum_chunk_size() + public async Task + Length_field_based_framing_must_fail_the_stage_on_computeFrameSize_values_less_than_minimum_chunk_size() { int ComputeFrameSize(IReadOnlyList offset, int length) => 3; // A 4-byte message containing only an Int specifying the length of the payload var bytes = ByteString.FromBytes(BitConverter.GetBytes(4)); - + var result = Source.Single(bytes) - .Via(Flow.Create().Via(Framing.LengthField(4, 0, 1000, ByteOrder.LittleEndian, ComputeFrameSize))) + .Via(Flow.Create() + .Via(Framing.LengthField(4, 0, 1000, ByteOrder.LittleEndian, ComputeFrameSize))) .RunWith(Sink.Seq(), Materializer); await Awaiting(async () => await result) @@ -497,4 +518,4 @@ public async Task Length_field_based_framing_must_let_zero_length_field_values_p complete.Should().BeEquivalentTo(bytes.ToImmutableList()); } } -} +} \ No newline at end of file diff --git a/src/core/Akka.Streams/Dsl/Framing.cs b/src/core/Akka.Streams/Dsl/Framing.cs index 8e7a0f667c7..e6e742ad932 100644 --- a/src/core/Akka.Streams/Dsl/Framing.cs +++ b/src/core/Akka.Streams/Dsl/Framing.cs @@ -92,9 +92,9 @@ public static Flow LengthField(int fieldLength, /// /// TBD public static Flow LengthField( - int fieldLength, + int fieldLength, int fieldOffset, - int maximumFrameLength, + int maximumFrameLength, ByteOrder byteOrder, Func, int, int> computeFrameSize) { @@ -102,7 +102,8 @@ public static Flow LengthField( throw new ArgumentException("Length field length must be 1,2,3 or 4", nameof(fieldLength)); return Flow.Create() - .Via(new LengthFieldFramingStage(fieldLength, maximumFrameLength, fieldOffset, byteOrder, computeFrameSize)) + .Via(new LengthFieldFramingStage(fieldLength, maximumFrameLength, fieldOffset, byteOrder, + computeFrameSize)) .Named("LengthFieldFraming"); } @@ -122,7 +123,8 @@ public static Flow LengthField( /// /// Maximum length of allowed messages. If sent or received messages exceed the configured limit this BidiFlow will fail the stream. The header attached by this BidiFlow are not included in this limit. /// TBD - public static BidiFlow SimpleFramingProtocol(int maximumMessageLength) + public static BidiFlow SimpleFramingProtocol( + int maximumMessageLength) { return BidiFlow.FromFlowsMat(SimpleFramingProtocolEncoder(maximumMessageLength), SimpleFramingProtocolDecoder(maximumMessageLength), Keep.Left); @@ -166,7 +168,9 @@ public FramingException(string message) : base(message) /// /// The that holds the serialized object data about the exception being thrown. /// The that contains contextual information about the source or destination. - protected FramingException(SerializationInfo info, StreamingContext context) : base(info, context) { } + protected FramingException(SerializationInfo info, StreamingContext context) : base(info, context) + { + } } private static readonly Func, int, int> BigEndianDecoder = (byteData, length) => @@ -186,7 +190,7 @@ protected FramingException(SerializationInfo info, StreamingContext context) : b { // Shift the previously accumulated value to the left, making space for the next byte decoded <<= 8; - + // Read the next byte and add it to the result decoded |= byteData.Span[i] & 0xFF; } @@ -212,10 +216,10 @@ protected FramingException(SerializationInfo info, StreamingContext context) : b { throw new IndexOutOfRangeException("LittleEndianDecoder reached end of byte array."); } - + // Shift the previously processed bytes to the right, making space for the next byte decoded = (int)((uint)decoded >> 8); - + // Read the next byte and insert it into the correct position decoded += (byteData.Span[i] & 0xFF) << highestOctet; } @@ -251,14 +255,11 @@ public override void OnPush() { var header = ByteString.CopyFrom(new[] { - Convert.ToByte((messageSize >> 24) & 0xFF), - Convert.ToByte((messageSize >> 16) & 0xFF), - Convert.ToByte((messageSize >> 8) & 0xFF), - Convert.ToByte(messageSize & 0xFF) + Convert.ToByte((messageSize >> 24) & 0xFF), Convert.ToByte((messageSize >> 16) & 0xFF), + Convert.ToByte((messageSize >> 8) & 0xFF), Convert.ToByte(messageSize & 0xFF) }); Push(_stage.Outlet, header + message); } - } public override void OnPull() => Pull(_stage.Inlet); @@ -287,7 +288,7 @@ private sealed class Logic : InAndOutGraphStageLogic private ByteString _buffer = ByteString.Empty; private int _nextPossibleMatch; - public Logic(DelimiterFramingStage stage) : base (stage.Shape) + public Logic(DelimiterFramingStage stage) : base(stage.Shape) { _stage = stage; _firstSeparatorByte = stage._separatorBytes[0]; @@ -328,58 +329,119 @@ private void TryPull() "Stream finished but there was a truncated final frame in the buffer")); } else - Pull(_stage.Inlet); + { + // check to see if stage is already pulled - if not, Pull it + if (!HasBeenPulled(_stage.Inlet)) + { + // if we haven't already pulled, then pull + Pull(_stage.Inlet); + } + } } private void DoParse() { - while (true) + // check to see if the buffer is big enough to contain the separator + if (_buffer.Count < _stage._separatorBytes.Count) { - var possibleMatchPosition = _buffer.IndexOf(_firstSeparatorByte, from: _nextPossibleMatch); - - if (possibleMatchPosition > _stage._maximumLineBytes) + if (IsClosed(_stage.Inlet)) { - FailStage(new FramingException($"Read {_buffer.Count} bytes which is more than {_stage._maximumLineBytes} without seeing a line terminator")); - } - else if (possibleMatchPosition == -1) - { - if (_buffer.Count > _stage._maximumLineBytes) - FailStage(new FramingException($"Read {_buffer.Count} bytes which is more than {_stage._maximumLineBytes} without seeing a line terminator")); - else + if (_stage._allowTruncation) { - // No matching character, we need to accumulate more bytes into the buffer - _nextPossibleMatch = _buffer.Count; - TryPull(); + Push(_stage.Outlet, _buffer); + CompleteStage(); } + else + FailStage( + new FramingException( + "Stream finished but there was a truncated final frame in the buffer")); } - else if (possibleMatchPosition + _stage._separatorBytes.Count > _buffer.Count) - { - // We have found a possible match (we found the first character of the terminator - // sequence) but we don't have yet enough bytes. We remember the position to - // retry from next time. - _nextPossibleMatch = possibleMatchPosition; + else TryPull(); - } - else if (_buffer.HasSubstring(_stage._separatorBytes, possibleMatchPosition)) - { - // Found a match - var parsedFrame = _buffer.Slice(0, possibleMatchPosition).Compact(); - _buffer = _buffer.Slice(possibleMatchPosition + _stage._separatorBytes.Count).Compact(); - _nextPossibleMatch = 0; - Push(_stage.Outlet, parsedFrame); - if (IsClosed(_stage.Inlet) && _buffer.IsEmpty) - CompleteStage(); + return; + } + + // search for the separator within the buffer + var definiteMatch = _buffer.IndexOf(_stage._separatorBytes, index: _nextPossibleMatch); + if (definiteMatch > _stage._maximumLineBytes) + { + FailStage( + new FramingException( + $"Read {_buffer.Count} bytes which is more than {_stage._maximumLineBytes} without seeing a line terminator")); + return; + } + if (definiteMatch == -1) + { + if (_buffer.Count > _stage._maximumLineBytes) + { + FailStage( + new FramingException( + $"Read {_buffer.Count} bytes which is more than {_stage._maximumLineBytes} without seeing a line terminator")); } else { - // possibleMatchPos was not actually a match - _nextPossibleMatch++; - continue; + // No matching character, we need to accumulate more bytes into the buffer + _nextPossibleMatch = _buffer.Count; + TryPull(); } + } + else + { + // Found a match + var parsedFrame = _buffer.Slice(0, definiteMatch).Compact(); + _buffer = _buffer.Slice(definiteMatch + _stage._separatorBytes.Count).Compact(); + _nextPossibleMatch = 0; + Push(_stage.Outlet, parsedFrame); - break; + if (IsClosed(_stage.Inlet) && _buffer.IsEmpty) + CompleteStage(); } + + // var possibleMatchPosition = _buffer.IndexOf(_firstSeparatorByte, from: _nextPossibleMatch); + // + // if (possibleMatchPosition > _stage._maximumLineBytes) + // { + // FailStage(new FramingException($"Read {_buffer.Count} bytes which is more than {_stage._maximumLineBytes} without seeing a line terminator")); + // } + // else if (possibleMatchPosition == -1) + // { + // if (_buffer.Count > _stage._maximumLineBytes) + // FailStage(new FramingException($"Read {_buffer.Count} bytes which is more than {_stage._maximumLineBytes} without seeing a line terminator")); + // else + // { + // // No matching character, we need to accumulate more bytes into the buffer + // _nextPossibleMatch = _buffer.Count; + // TryPull(); + // } + // } + // else if (possibleMatchPosition + _stage._separatorBytes.Count > _buffer.Count) + // { + // // We have found a possible match (we found the first character of the terminator + // // sequence) but we don't have yet enough bytes. We remember the position to + // // retry from next time. + // _nextPossibleMatch = possibleMatchPosition; + // TryPull(); + // } + // else if (_buffer.HasSubstring(_stage._separatorBytes, possibleMatchPosition)) + // { + // // Found a match + // var parsedFrame = _buffer.Slice(0, possibleMatchPosition).Compact(); + // _buffer = _buffer.Slice(possibleMatchPosition + _stage._separatorBytes.Count).Compact(); + // _nextPossibleMatch = 0; + // Push(_stage.Outlet, parsedFrame); + // + // if (IsClosed(_stage.Inlet) && _buffer.IsEmpty) + // CompleteStage(); + // } + // else + // { + // // possibleMatchPos was not actually a match + // _nextPossibleMatch++; + // continue; + // } + // + // break; } } @@ -389,7 +451,8 @@ private void DoParse() private readonly int _maximumLineBytes; private readonly bool _allowTruncation; - public DelimiterFramingStage(ByteString separatorBytes, int maximumLineBytes, bool allowTruncation) : base("DelimiterFraming") + public DelimiterFramingStage(ByteString separatorBytes, int maximumLineBytes, bool allowTruncation) : base( + "DelimiterFraming") { _separatorBytes = separatorBytes; _maximumLineBytes = maximumLineBytes; @@ -465,7 +528,8 @@ private void TryPushFrame() var parsedLength = _stage._intDecoder(iterator, _stage._lengthFieldLength); _frameSize = _stage._computeFrameSize.HasValue - ? _stage._computeFrameSize.Value(_buffer.Slice(0, _stage._lengthFieldOffset).ToArray(), parsedLength) + ? _stage._computeFrameSize.Value(_buffer.Slice(0, _stage._lengthFieldOffset).ToArray(), + parsedLength) : parsedLength + _stage._minimumChunkSize; if (_frameSize > _stage._maximumFramelength) @@ -489,7 +553,8 @@ private void TryPushFrame() private void TryPull() { if (IsClosed(_stage.Inlet)) - FailStage(new FramingException("Stream finished but there was a truncated final frame in the buffer")); + FailStage(new FramingException( + "Stream finished but there was a truncated final frame in the buffer")); else Pull(_stage.Inlet); } @@ -505,15 +570,18 @@ private void TryPull() private readonly Option, int, int>> _computeFrameSize; // For the sake of binary compatibility - public LengthFieldFramingStage(int lengthFieldLength, int maximumFramelength, int lengthFieldOffset, ByteOrder byteOrder) - : this(lengthFieldLength, maximumFramelength, lengthFieldOffset, byteOrder, Option, int, int>>.None) - { } + public LengthFieldFramingStage(int lengthFieldLength, int maximumFramelength, int lengthFieldOffset, + ByteOrder byteOrder) + : this(lengthFieldLength, maximumFramelength, lengthFieldOffset, byteOrder, + Option, int, int>>.None) + { + } public LengthFieldFramingStage( int lengthFieldLength, int maximumFrameLength, int lengthFieldOffset, - ByteOrder byteOrder, + ByteOrder byteOrder, Option, int, int>> computeFrameSize) : base("LengthFieldFramingStage") { _lengthFieldLength = lengthFieldLength; @@ -527,4 +595,4 @@ public LengthFieldFramingStage( protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this); } } -} +} \ No newline at end of file diff --git a/src/core/Akka/Util/ByteString.cs b/src/core/Akka/Util/ByteString.cs index 3317a6fb73f..9241ad7069d 100644 --- a/src/core/Akka/Util/ByteString.cs +++ b/src/core/Akka/Util/ByteString.cs @@ -369,7 +369,8 @@ public int IndexOf(byte b) /// public int IndexOf(byte b, int from) { - return _memory.Span[from..].IndexOf(b); + var rValue = _memory.Span[from..].IndexOf(b); + return rValue == -1 ? -1 : rValue + from; } public int IndexOf(ByteString other, int index = 0) From 27ae16d0fd08ffe9feba4a169675fe24f6df7950 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 3 Apr 2024 19:07:50 -0500 Subject: [PATCH 14/16] fixed some of the N+1 issues --- src/core/Akka.Streams/Dsl/Framing.cs | 25 +++++++------------------ src/core/Akka/Util/ByteString.cs | 21 +++++---------------- 2 files changed, 12 insertions(+), 34 deletions(-) diff --git a/src/core/Akka.Streams/Dsl/Framing.cs b/src/core/Akka.Streams/Dsl/Framing.cs index e6e742ad932..fa9155e30e5 100644 --- a/src/core/Akka.Streams/Dsl/Framing.cs +++ b/src/core/Akka.Streams/Dsl/Framing.cs @@ -326,7 +326,7 @@ private void TryPull() else FailStage( new FramingException( - "Stream finished but there was a truncated final frame in the buffer")); + $"Stream finished but there was a truncated final frame in the buffer + [{_buffer.ToString()}]")); } else { @@ -344,26 +344,13 @@ private void DoParse() // check to see if the buffer is big enough to contain the separator if (_buffer.Count < _stage._separatorBytes.Count) { - if (IsClosed(_stage.Inlet)) - { - if (_stage._allowTruncation) - { - Push(_stage.Outlet, _buffer); - CompleteStage(); - } - else - FailStage( - new FramingException( - "Stream finished but there was a truncated final frame in the buffer")); - } - else - TryPull(); + TryPull(); return; } // search for the separator within the buffer - var definiteMatch = _buffer.IndexOf(_stage._separatorBytes, index: _nextPossibleMatch); + var definiteMatch = _buffer.IndexOf(_stage._separatorBytes, _nextPossibleMatch); if (definiteMatch > _stage._maximumLineBytes) { FailStage( @@ -371,6 +358,7 @@ private void DoParse() $"Read {_buffer.Count} bytes which is more than {_stage._maximumLineBytes} without seeing a line terminator")); return; } + if (definiteMatch == -1) { if (_buffer.Count > _stage._maximumLineBytes) @@ -389,8 +377,8 @@ private void DoParse() else { // Found a match - var parsedFrame = _buffer.Slice(0, definiteMatch).Compact(); - _buffer = _buffer.Slice(definiteMatch + _stage._separatorBytes.Count).Compact(); + var parsedFrame = _buffer.Slice(0, definiteMatch); + _buffer = _buffer.Slice(definiteMatch + _stage._separatorBytes.Count); _nextPossibleMatch = 0; Push(_stage.Outlet, parsedFrame); @@ -398,6 +386,7 @@ private void DoParse() CompleteStage(); } + // var possibleMatchPosition = _buffer.IndexOf(_firstSeparatorByte, from: _nextPossibleMatch); // // if (possibleMatchPosition > _stage._maximumLineBytes) diff --git a/src/core/Akka/Util/ByteString.cs b/src/core/Akka/Util/ByteString.cs index 9241ad7069d..bf504bc29da 100644 --- a/src/core/Akka/Util/ByteString.cs +++ b/src/core/Akka/Util/ByteString.cs @@ -378,23 +378,12 @@ public int IndexOf(ByteString other, int index = 0) if (other.Count == 0) return index; // Empty spans are always "found". if (index < 0 || index > Count) throw new ArgumentOutOfRangeException(nameof(index), "Start index is out of range."); if (Count - index < other.Count) return -1; // Can't find if `toFind` is larger considering the start index. + + var span = _memory.Span; + var otherSpan = other._memory.Span; - for (var i = index; i <= Count - other.Count; i++) - { - // Check if `toFind` starts at position `i` in `container`. - var found = true; - for (var j = 0; j < other.Count; j++) - { - if (this[i + j] != other[j]) - { - found = false; - break; - } - } - if (found) return i; - } - - return -1; + var indexOf = span.Slice(index).IndexOf(otherSpan); + return indexOf == -1 ? -1 : indexOf + index; } /// From 79b01bd6a64e5433abefccc4d51f17c5f504ec8f Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 3 Apr 2024 20:05:05 -0500 Subject: [PATCH 15/16] fixed delimiter based encoding --- src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs | 6 +++--- src/core/Akka.Streams/Dsl/Framing.cs | 12 +++++++++++- src/core/Akka/IO/TcpConnection.cs | 1 - 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs index 48acb9fd6db..65b10923dab 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs @@ -82,8 +82,8 @@ private void Rechunk() var nextChunkSize = _buffer.IsEmpty ? 0 : ThreadLocalRandom.Current.Next(0, _buffer.Count + 1); - var newChunk = _buffer.Slice(0, nextChunkSize).Compact(); - _buffer = _buffer.Slice(nextChunkSize).Compact(); + var newChunk = _buffer.Slice(0, nextChunkSize); + _buffer = _buffer.Slice(nextChunkSize); Push(_stage.Outlet, newChunk); @@ -127,7 +127,7 @@ private static IEnumerable CompleteTestSequence(ByteString delimiter public static readonly TheoryData DelimiterBytesData = new() { - ByteString.FromString("\n"), ByteString.FromString("\r\n"), ByteString.FromString("FOO") + ByteString.FromString("\n"), ByteString.FromString("\r\n"), ByteString.FromString("FOO"), ByteString.FromString("FOUR") }; [Theory] diff --git a/src/core/Akka.Streams/Dsl/Framing.cs b/src/core/Akka.Streams/Dsl/Framing.cs index fa9155e30e5..f5fd4cb677b 100644 --- a/src/core/Akka.Streams/Dsl/Framing.cs +++ b/src/core/Akka.Streams/Dsl/Framing.cs @@ -370,7 +370,17 @@ private void DoParse() else { // No matching character, we need to accumulate more bytes into the buffer - _nextPossibleMatch = _buffer.Count; + /* + * NOTE: so this was a tad tricky to catch in the original code - this is a performance + * optimization designed to avoid re-searching through a buffer that has already been + * searched. + * + * However, if we don't find a match we need to remember the position we started searching + * MINUS the length of the delimiter - just in case we only received a _partial_ delimiter + * earlier. If we use just the _buffer.Count, we will end up missing and losing messages + * eventually. + */ + _nextPossibleMatch = _buffer.Count - _stage._separatorBytes.Count; TryPull(); } } diff --git a/src/core/Akka/IO/TcpConnection.cs b/src/core/Akka/IO/TcpConnection.cs index 0adcb8567cb..cd1b4c83b38 100644 --- a/src/core/Akka/IO/TcpConnection.cs +++ b/src/core/Akka/IO/TcpConnection.cs @@ -940,7 +940,6 @@ public override void DoWrite(ConnectionInfo info) { try { - // TODO: avoid use of SocketAsyncEventArgs on newer platforms _sendArgs.SetBuffer(_dataToSend); if (!_connection.Socket.SendAsync(_sendArgs)) _self.Tell(SocketSent.Instance); From f63896ff757b8b8c636202bbc021090659a54d7a Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 3 Apr 2024 20:08:44 -0500 Subject: [PATCH 16/16] delete unused code --- src/core/Akka.Streams/Dsl/Framing.cs | 46 ---------------------------- 1 file changed, 46 deletions(-) diff --git a/src/core/Akka.Streams/Dsl/Framing.cs b/src/core/Akka.Streams/Dsl/Framing.cs index f5fd4cb677b..29aaf5210ed 100644 --- a/src/core/Akka.Streams/Dsl/Framing.cs +++ b/src/core/Akka.Streams/Dsl/Framing.cs @@ -395,52 +395,6 @@ private void DoParse() if (IsClosed(_stage.Inlet) && _buffer.IsEmpty) CompleteStage(); } - - - // var possibleMatchPosition = _buffer.IndexOf(_firstSeparatorByte, from: _nextPossibleMatch); - // - // if (possibleMatchPosition > _stage._maximumLineBytes) - // { - // FailStage(new FramingException($"Read {_buffer.Count} bytes which is more than {_stage._maximumLineBytes} without seeing a line terminator")); - // } - // else if (possibleMatchPosition == -1) - // { - // if (_buffer.Count > _stage._maximumLineBytes) - // FailStage(new FramingException($"Read {_buffer.Count} bytes which is more than {_stage._maximumLineBytes} without seeing a line terminator")); - // else - // { - // // No matching character, we need to accumulate more bytes into the buffer - // _nextPossibleMatch = _buffer.Count; - // TryPull(); - // } - // } - // else if (possibleMatchPosition + _stage._separatorBytes.Count > _buffer.Count) - // { - // // We have found a possible match (we found the first character of the terminator - // // sequence) but we don't have yet enough bytes. We remember the position to - // // retry from next time. - // _nextPossibleMatch = possibleMatchPosition; - // TryPull(); - // } - // else if (_buffer.HasSubstring(_stage._separatorBytes, possibleMatchPosition)) - // { - // // Found a match - // var parsedFrame = _buffer.Slice(0, possibleMatchPosition).Compact(); - // _buffer = _buffer.Slice(possibleMatchPosition + _stage._separatorBytes.Count).Compact(); - // _nextPossibleMatch = 0; - // Push(_stage.Outlet, parsedFrame); - // - // if (IsClosed(_stage.Inlet) && _buffer.IsEmpty) - // CompleteStage(); - // } - // else - // { - // // possibleMatchPos was not actually a match - // _nextPossibleMatch++; - // continue; - // } - // - // break; } }