diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index 08d6294b218..3c3365b92b3 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -3653,14 +3653,17 @@ namespace Akka.IO BigEndian = 0, LittleEndian = 1, } - [System.Diagnostics.DebuggerDisplayAttribute("(Count = {_count}, Buffers = {_buffers})")] - public sealed class ByteString : System.Collections.Generic.IEnumerable, System.Collections.IEnumerable, System.IEquatable + [System.Diagnostics.DebuggerDisplayAttribute("(Count = {Count}, Buffer = {Memory})")] + public sealed class ByteString : System.IEquatable { public int Count { get; } public static Akka.IO.ByteString Empty { get; } + [System.ObsoleteAttribute("This property will be removed in future versions of Akka.NET.")] public bool IsCompact { get; } public bool IsEmpty { get; } public byte this[int index] { get; } + public System.ReadOnlyMemory Memory { get; } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public Akka.IO.ByteString Compact() { } public Akka.IO.ByteString Concat(Akka.IO.ByteString other) { } public static Akka.IO.ByteString CopyFrom(byte[] array) { } @@ -3672,9 +3675,13 @@ namespace Akka.IO public static Akka.IO.ByteString CopyFrom(System.Span span, int offset, int count) { } public static Akka.IO.ByteString CopyFrom(System.Collections.Generic.IEnumerable> buffers) { } public int CopyTo(byte[] buffer, int index, int count) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public int CopyTo(ref System.Memory buffer) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public int CopyTo(ref System.Memory buffer, int index, int count) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public int CopyTo(ref System.Span buffer) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public int CopyTo(ref System.Span buffer, int index, int count) { } public override bool Equals(object obj) { } public bool Equals(Akka.IO.ByteString other) { } @@ -3684,7 +3691,6 @@ namespace Akka.IO public static Akka.IO.ByteString FromBytes(System.Collections.Generic.IEnumerable> buffers) { } public static Akka.IO.ByteString FromString(string str) { } public static Akka.IO.ByteString FromString(string str, System.Text.Encoding encoding) { } - public System.Collections.Generic.IEnumerator GetEnumerator() { } public override int GetHashCode() { } public bool HasSubstring(Akka.IO.ByteString other, int index) { } public int IndexOf(byte b) { } @@ -3694,12 +3700,16 @@ namespace Akka.IO public byte[] ToArray() { } public override string ToString() { } public string ToString(System.Text.Encoding encoding) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public void WriteTo(System.IO.Stream stream) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public System.Threading.Tasks.Task WriteToAsync(System.IO.Stream stream) { } public static Akka.IO.ByteString +(Akka.IO.ByteString x, Akka.IO.ByteString y) { } public static bool ==(Akka.IO.ByteString x, Akka.IO.ByteString y) { } public static Akka.IO.ByteString op_Explicit(byte[] bytes) { } public static byte[] 
op_Explicit(Akka.IO.ByteString byteString) { } + public static Akka.IO.ByteString op_Explicit(System.Memory memory) { } + public static Akka.IO.ByteString op_Explicit(System.ReadOnlyMemory memory) { } public static bool !=(Akka.IO.ByteString x, Akka.IO.ByteString y) { } } [Akka.Annotations.InternalApiAttribute()] diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index f50634d8b95..9431d9a9175 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -3643,14 +3643,17 @@ namespace Akka.IO BigEndian = 0, LittleEndian = 1, } - [System.Diagnostics.DebuggerDisplayAttribute("(Count = {_count}, Buffers = {_buffers})")] - public sealed class ByteString : System.Collections.Generic.IEnumerable, System.Collections.IEnumerable, System.IEquatable + [System.Diagnostics.DebuggerDisplayAttribute("(Count = {Count}, Buffer = {Memory})")] + public sealed class ByteString : System.IEquatable { public int Count { get; } public static Akka.IO.ByteString Empty { get; } + [System.ObsoleteAttribute("This property will be removed in future versions of Akka.NET.")] public bool IsCompact { get; } public bool IsEmpty { get; } public byte this[int index] { get; } + public System.ReadOnlyMemory Memory { get; } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public Akka.IO.ByteString Compact() { } public Akka.IO.ByteString Concat(Akka.IO.ByteString other) { } public static Akka.IO.ByteString CopyFrom(byte[] array) { } @@ -3662,9 +3665,13 @@ namespace Akka.IO public static Akka.IO.ByteString CopyFrom(System.Span span, int offset, int count) { } public static Akka.IO.ByteString CopyFrom(System.Collections.Generic.IEnumerable> buffers) { } public int CopyTo(byte[] buffer, int index, int count) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public int CopyTo(ref System.Memory buffer) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public int CopyTo(ref System.Memory buffer, int index, int count) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public int CopyTo(ref System.Span buffer) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public int CopyTo(ref System.Span buffer, int index, int count) { } public override bool Equals(object obj) { } public bool Equals(Akka.IO.ByteString other) { } @@ -3674,7 +3681,6 @@ namespace Akka.IO public static Akka.IO.ByteString FromBytes(System.Collections.Generic.IEnumerable> buffers) { } public static Akka.IO.ByteString FromString(string str) { } public static Akka.IO.ByteString FromString(string str, System.Text.Encoding encoding) { } - public System.Collections.Generic.IEnumerator GetEnumerator() { } public override int GetHashCode() { } public bool HasSubstring(Akka.IO.ByteString other, int index) { } public int IndexOf(byte b) { } @@ -3684,12 +3690,16 @@ namespace Akka.IO public byte[] ToArray() { } public override string ToString() { } public string ToString(System.Text.Encoding encoding) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public void WriteTo(System.IO.Stream stream) { } + [System.ObsoleteAttribute("This method will be removed in future versions of Akka.NET.")] public 
System.Threading.Tasks.Task WriteToAsync(System.IO.Stream stream) { } public static Akka.IO.ByteString +(Akka.IO.ByteString x, Akka.IO.ByteString y) { } public static bool ==(Akka.IO.ByteString x, Akka.IO.ByteString y) { } public static Akka.IO.ByteString op_Explicit(byte[] bytes) { } public static byte[] op_Explicit(Akka.IO.ByteString byteString) { } + public static Akka.IO.ByteString op_Explicit(System.Memory memory) { } + public static Akka.IO.ByteString op_Explicit(System.ReadOnlyMemory memory) { } public static bool !=(Akka.IO.ByteString x, Akka.IO.ByteString y) { } } [Akka.Annotations.InternalApiAttribute()] diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs index e593ac3e18a..1addce95512 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs @@ -424,16 +424,18 @@ public async Task GroupBy_must_work_under_fuzzing_stress_test() await this.AssertAllStagesStoppedAsync(async () => { var publisherProbe = this.CreateManualPublisherProbe(); - var subscriber = this.CreateManualSubscriberProbe>(); + var subscriber = this.CreateManualSubscriberProbe(); var firstGroup = (Source, NotUsed>)Source.FromPublisher(publisherProbe) .GroupBy(256, element => element[0]) + .Select(b => b.ToArray()) // have to convert to an array .Select(b => b.Reverse()) .MergeSubstreams(); - var secondGroup = (Source, NotUsed>)firstGroup.GroupBy(256, bytes => bytes.First()) + var secondGroup = (Source)firstGroup.GroupBy(256, bytes => bytes.First()) .Select(b => b.Reverse()) + .Select(b => ByteString.FromBytes(b.ToArray())) .MergeSubstreams(); - var publisher = secondGroup.RunWith(Sink.AsPublisher>(false), Materializer); + var publisher = secondGroup.RunWith(Sink.AsPublisher(false), Materializer); publisher.Subscribe(subscriber); var upstreamSubscription = await publisherProbe.ExpectSubscriptionAsync(); diff --git a/src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs index 0162713889e..65b10923dab 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs @@ -82,8 +82,8 @@ private void Rechunk() var nextChunkSize = _buffer.IsEmpty ? 
0 : ThreadLocalRandom.Current.Next(0, _buffer.Count + 1); - var newChunk = _buffer.Slice(0, nextChunkSize).Compact(); - _buffer = _buffer.Slice(nextChunkSize).Compact(); + var newChunk = _buffer.Slice(0, nextChunkSize); + _buffer = _buffer.Slice(nextChunkSize); Push(_stage.Outlet, newChunk); @@ -97,7 +97,6 @@ private void Rechunk() public Rechunker() : base("Rechunker") { - } protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this); @@ -107,14 +106,15 @@ private Flow Rechunk => Flow.Create().Via(new Rechunker()).Named("rechunker"); private static readonly List DelimiterBytes = - new List {"\n", "\r\n", "FOO"}.Select(ByteString.FromString).ToList(); + new List { "\n", "\r\n", "FOO" }.Select(ByteString.FromString).ToList(); private static readonly List BaseTestSequences = new List { "", "foo", "hello world" }.Select(ByteString.FromString).ToList(); - private static Flow SimpleLines(string delimiter, int maximumBytes, bool allowTruncation = true) + private static Flow SimpleLines(string delimiter, int maximumBytes, + bool allowTruncation = true) { - return Framing.Delimiter(ByteString.FromString(delimiter), maximumBytes, allowTruncation) + return Framing.Delimiter(ByteString.FromString(delimiter), maximumBytes, allowTruncation) .Select(x => x.ToString(Encoding.UTF8)).Named("LineFraming"); } @@ -125,24 +125,24 @@ private static IEnumerable CompleteTestSequence(ByteString delimiter yield return delimiter.Slice(0, i) + sequence; } - [Fact] - public void Delimiter_bytes_based_framing_must_work_with_various_delimiters_and_test_sequences() + public static readonly TheoryData DelimiterBytesData = new() { - for (var i = 1; i <= 100; i++) - { - foreach (var delimiter in DelimiterBytes) - { - var testSequence = CompleteTestSequence(delimiter).ToList(); - var task = Source.From(testSequence) - .Select(x => x + delimiter) - .Via(Rechunk) - .Via(Framing.Delimiter(delimiter, 256)) - .RunWith(Sink.Seq(), Materializer); + ByteString.FromString("\n"), ByteString.FromString("\r\n"), ByteString.FromString("FOO"), ByteString.FromString("FOUR") + }; - task.Wait(TimeSpan.FromDays(3)).Should().BeTrue(); - task.Result.Should().BeEquivalentTo(testSequence); - } - } + [Theory] + [MemberData(nameof(DelimiterBytesData))] + public async Task Delimiter_bytes_based_framing_must_work_with_various_delimiters_and_test_sequences( + ByteString delimiter) + { + var testSequence = CompleteTestSequence(delimiter).ToList(); + var task = await Source.From(testSequence) + .Select(x => x + delimiter) + .Via(Rechunk) + .Via(Framing.Delimiter(delimiter, 256)) + .RunWith(Sink.Seq(), Materializer); + + task.Should().BeEquivalentTo(testSequence); } [Fact] @@ -154,7 +154,7 @@ public void Delimiter_bytes_based_framing_must_respect_maximum_line_settings() .RunWith(Sink.Seq(), Materializer); task1.Wait(TimeSpan.FromDays(3)).Should().BeTrue(); - task1.Result.Should().BeEquivalentTo(new[] {"a", "b", "c", "d"}); + task1.Result.Should().BeEquivalentTo(new[] { "a", "b", "c", "d" }); var task2 = Source.Single(ByteString.FromString("ab\n")) @@ -174,11 +174,12 @@ public void Delimiter_bytes_based_framing_must_respect_maximum_line_settings() [Fact] public void Delimiter_bytes_based_framing_must_work_with_empty_streams() { - var task = Source.Empty().Via(SimpleLines("\n", 256)).RunAggregate(new List(), (list, s) => - { - list.Add(s); - return list; - }, Materializer); + var task = Source.Empty().Via(SimpleLines("\n", 256)).RunAggregate(new List(), + (list, s) => + { + list.Add(s); + return list; + }, 
Materializer); task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); task.Result.Should().BeEmpty(); } @@ -213,16 +214,12 @@ private static string RandomString(int length) const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; var random = new Random(); return new string(Enumerable.Repeat(chars, length) - .Select(s => s[random.Next(s.Length)]).ToArray()); + .Select(s => s[random.Next(s.Length)]).ToArray()); } private static readonly ByteString ReferenceChunk = ByteString.FromString(RandomString(0x100001)); - private static readonly List ByteOrders = new() - { - ByteOrder.BigEndian, - ByteOrder.LittleEndian - }; + private static readonly List ByteOrders = new() { ByteOrder.BigEndian, ByteOrder.LittleEndian }; private static readonly List FrameLengths = new() { @@ -241,18 +238,31 @@ private static string RandomString(int length) 0x10001 }; - private static readonly List FieldLengths = new() {1, 2, 3, 4}; + private static readonly List FieldLengths = new() { 1, 2, 3, 4 }; - private static readonly List FieldOffsets = new() {0, 1, 2, 3, 15, 16, 31, 32, 44, 107}; + private static readonly List FieldOffsets = new() + { + 0, + 1, + 2, + 3, + 15, + 16, + 31, + 32, + 44, + 107 + }; private static ByteString Encode(ByteString payload, int fieldOffset, int fieldLength, ByteOrder byteOrder) => - EncodeComplexFrame(payload, fieldLength, byteOrder, ByteString.FromBytes(new byte[fieldOffset]), ByteString.Empty); + EncodeComplexFrame(payload, fieldLength, byteOrder, ByteString.FromBytes(new byte[fieldOffset]), + ByteString.Empty); private static ByteString EncodeComplexFrame( - ByteString payload, - int fieldLength, - ByteOrder byteOrder, - ByteString offset, + ByteString payload, + int fieldLength, + ByteOrder byteOrder, + ByteString offset, ByteString tail) { var h = ByteString.FromBytes(new byte[4].PutInt(payload.Count, order: byteOrder)); @@ -285,15 +295,17 @@ public void Length_field_based_framing_must_work_with_various_byte_orders_frame_ } } - Parallel.ForEach(GetFutureResults(), async futureResult => - { + Parallel.ForEach(GetFutureResults(), async futureResult => + { var (result, encodedFrames, (byteOrder, fieldOffset, fieldLength)) = await futureResult; - result.ShouldBeSame(encodedFrames, $"byteOrder: {byteOrder}, fieldOffset: {fieldOffset}, fieldLength: {fieldLength}"); + result.ShouldBeSame(encodedFrames, + $"byteOrder: {byteOrder}, fieldOffset: {fieldOffset}, fieldLength: {fieldLength}"); }); } [Fact] - public void Length_field_based_framing_must_work_with_various_byte_orders_frame_lengths_and_offsets_using_ComputeFrameSize() + public void + Length_field_based_framing_must_work_with_various_byte_orders_frame_lengths_and_offsets_using_ComputeFrameSize() { IEnumerable, List, (ByteOrder, int, int))>> GetFutureResults() { @@ -308,6 +320,7 @@ int ComputeFrameSize(IReadOnlyList offset, int length) } var random = new Random(); + byte[] Offset() { var arr = new byte[fieldOffset]; @@ -320,7 +333,8 @@ byte[] Offset() var payload = ReferenceChunk.Slice(0, length); var offsetBytes = Offset(); var tailBytes = offsetBytes.Length > 0 ? 
new byte[offsetBytes[0]] : Array.Empty(); - return EncodeComplexFrame(payload, fieldLength, byteOrder, ByteString.FromBytes(offsetBytes), ByteString.FromBytes(tailBytes)); + return EncodeComplexFrame(payload, fieldLength, byteOrder, ByteString.FromBytes(offsetBytes), + ByteString.FromBytes(tailBytes)); }).ToList(); yield return Source.From(encodedFrames) @@ -332,10 +346,11 @@ byte[] Offset() } } - Parallel.ForEach(GetFutureResults(), async futureResult => - { + Parallel.ForEach(GetFutureResults(), async futureResult => + { var (result, encodedFrames, (byteOrder, fieldOffset, fieldLength)) = await futureResult; - result.ShouldBeSame(encodedFrames, $"byteOrder: {byteOrder}, fieldOffset: {fieldOffset}, fieldLength: {fieldLength}"); + result.ShouldBeSame(encodedFrames, + $"byteOrder: {byteOrder}, fieldOffset: {fieldOffset}, fieldLength: {fieldLength}"); }); } @@ -387,18 +402,19 @@ public void Length_field_based_framing_must_report_truncated_frames() { foreach (var frameLength in FrameLengths.Where(f => f < 1 << (fieldLength * 8) && f != 0)) { - var fullFrame = Encode(ReferenceChunk.Slice(0, frameLength), fieldOffset, fieldLength, byteOrder); + var fullFrame = Encode(ReferenceChunk.Slice(0, frameLength), fieldOffset, fieldLength, + byteOrder); var partialFrame = fullFrame.Slice(0, fullFrame.Count - 1); // dropRight equivalent Action action = () => { - Source.From(new[] {fullFrame, partialFrame}) - .Via(Rechunk) - .Via(Framing.LengthField(fieldLength, int.MaxValue, fieldOffset, byteOrder)) - .Grouped(10000) - .RunWith(Sink.First>(), Materializer) - .Wait(TimeSpan.FromSeconds(5)) - .ShouldBeTrue("Stream should complete withing 5 seconds"); + Source.From(new[] { fullFrame, partialFrame }) + .Via(Rechunk) + .Via(Framing.LengthField(fieldLength, int.MaxValue, fieldOffset, byteOrder)) + .Grouped(10000) + .RunWith(Sink.First>(), Materializer) + .Wait(TimeSpan.FromSeconds(5)) + .ShouldBeTrue("Stream should complete withing 5 seconds"); }; action.Should().Throw(); } @@ -416,8 +432,9 @@ public void Length_field_based_framing_must_support_simple_framing_adapter() .Atop(Framing.SimpleFramingProtocol(1024).Reversed()) .Join(Flow.Create()); // Loopback - var random= new Random(); - var testMessages = Enumerable.Range(1, 100).Select(_ => ReferenceChunk.Slice(0, random.Next(1024))).ToList(); + var random = new Random(); + var testMessages = Enumerable.Range(1, 100).Select(_ => ReferenceChunk.Slice(0, random.Next(1024))) + .ToList(); var task = Source.From(testMessages) .Via(codecFlow) @@ -445,7 +462,7 @@ await Awaiting(async () => await result) .WithMessage("Decoded frame header reported negative size -4") .ShouldCompleteWithin(3.Seconds()); } - + [Fact] public async Task Length_field_based_framing_must_ignore_length_field_value_when_provided_computeFrameSize() { @@ -456,23 +473,26 @@ public async Task Length_field_based_framing_must_ignore_length_field_value_when var bs = ByteString.FromBytes(tempArray.PutInt(checked(0x04050607), order: ByteOrder.LittleEndian)); var result = Source.Single(bs) - .Via(Flow.Create().Via(Framing.LengthField(4, 0, 1000, ByteOrder.LittleEndian, ComputeFrameSize))) + .Via(Flow.Create() + .Via(Framing.LengthField(4, 0, 1000, ByteOrder.LittleEndian, ComputeFrameSize))) .RunWith(Sink.Seq(), Materializer); var complete = await result.ShouldCompleteWithin(3.Seconds()); complete.Should().BeEquivalentTo(ImmutableArray.Create(bs)); } - + [Fact] - public async Task Length_field_based_framing_must_fail_the_stage_on_computeFrameSize_values_less_than_minimum_chunk_size() + public async Task + 
Length_field_based_framing_must_fail_the_stage_on_computeFrameSize_values_less_than_minimum_chunk_size() { int ComputeFrameSize(IReadOnlyList offset, int length) => 3; // A 4-byte message containing only an Int specifying the length of the payload var bytes = ByteString.FromBytes(BitConverter.GetBytes(4)); - + var result = Source.Single(bytes) - .Via(Flow.Create().Via(Framing.LengthField(4, 0, 1000, ByteOrder.LittleEndian, ComputeFrameSize))) + .Via(Flow.Create() + .Via(Framing.LengthField(4, 0, 1000, ByteOrder.LittleEndian, ComputeFrameSize))) .RunWith(Sink.Seq(), Materializer); await Awaiting(async () => await result) @@ -498,4 +518,4 @@ public async Task Length_field_based_framing_must_let_zero_length_field_values_p complete.Should().BeEquivalentTo(bytes.ToImmutableList()); } } -} +} \ No newline at end of file diff --git a/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs b/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs index 4bab68295b5..1ca842618c5 100644 --- a/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs @@ -338,7 +338,7 @@ await TargetFileAsync(async f => var completion = Source.From(_testByteStrings) .Select(bytes => { - if (bytes.Contains(Convert.ToByte('b'))) throw new TestException("bees!"); + if (bytes.ToArray().Contains(Convert.ToByte('b'))) throw new TestException("bees!"); return bytes; }) .RunWith(FileIO.ToFile(f), _materializer); diff --git a/src/core/Akka.Streams.Tests/IO/InputStreamSinkSpec.cs b/src/core/Akka.Streams.Tests/IO/InputStreamSinkSpec.cs index ace20df0823..23958a34d7c 100644 --- a/src/core/Akka.Streams.Tests/IO/InputStreamSinkSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/InputStreamSinkSpec.cs @@ -66,7 +66,7 @@ await this.AssertAllStagesStoppedAsync(() => { result = ReadN(inputStream, 2); result.Item1.Should().Be(2); - result.Item2.Should().BeEquivalentTo(Enumerable.Concat(_byteString.Slice(2), byteString2.Slice(0, 1))); + result.Item2.Should().BeEquivalentTo((_byteString.Slice(2).Concat(byteString2.Slice(0, 1)))); result = ReadN(inputStream, 2); result.Item1.Should().Be(2); @@ -86,7 +86,7 @@ await this.AssertAllStagesStoppedAsync(() => { var arr = new byte[_byteString.Count + 1]; inputStream.Read(arr, 0, arr.Length).Should().Be(arr.Length - 1); inputStream.Dispose(); - ByteString.FromBytes(arr).Should().BeEquivalentTo(Enumerable.Concat(_byteString, ByteString.FromBytes(new byte[] { 0 }))); + ByteString.FromBytes(arr).Should().BeEquivalentTo((_byteString.Concat(ByteString.FromBytes(new byte[] { 0 })))); return Task.CompletedTask; }, _materializer); } @@ -214,7 +214,7 @@ await this.AssertAllStagesStoppedAsync(() => { { var r = ReadN(inputStream, 8); r.Item1.Should().Be(8); - r.Item2.Should().BeEquivalentTo(Enumerable.Concat(bytes[i * 2], bytes[i * 2 + 1])); + r.Item2.Should().BeEquivalentTo(bytes[i * 2].Concat(bytes[i * 2 + 1])); } inputStream.Dispose(); @@ -236,7 +236,7 @@ await this.AssertAllStagesStoppedAsync(() => { var r1 = ReadN(inputStream, 15); r1.Item1.Should().Be(15); - r1.Item2.Should().BeEquivalentTo(Enumerable.Concat(bytes1, bytes2.Slice(0, 5))); + r1.Item2.Should().BeEquivalentTo(bytes1.Concat(bytes2.Slice(0, 5))); var r2 = ReadN(inputStream, 15); r2.Item1.Should().Be(5); diff --git a/src/core/Akka.Streams/Dsl/Framing.cs b/src/core/Akka.Streams/Dsl/Framing.cs index 465b7ef14ac..29aaf5210ed 100644 --- a/src/core/Akka.Streams/Dsl/Framing.cs +++ b/src/core/Akka.Streams/Dsl/Framing.cs @@ -92,9 +92,9 @@ public static Flow LengthField(int fieldLength, /// /// TBD public static Flow LengthField( - int 
fieldLength, + int fieldLength, int fieldOffset, - int maximumFrameLength, + int maximumFrameLength, ByteOrder byteOrder, Func, int, int> computeFrameSize) { @@ -102,7 +102,8 @@ public static Flow LengthField( throw new ArgumentException("Length field length must be 1,2,3 or 4", nameof(fieldLength)); return Flow.Create() - .Via(new LengthFieldFramingStage(fieldLength, maximumFrameLength, fieldOffset, byteOrder, computeFrameSize)) + .Via(new LengthFieldFramingStage(fieldLength, maximumFrameLength, fieldOffset, byteOrder, + computeFrameSize)) .Named("LengthFieldFraming"); } @@ -122,7 +123,8 @@ public static Flow LengthField( /// /// Maximum length of allowed messages. If sent or received messages exceed the configured limit this BidiFlow will fail the stream. The header attached by this BidiFlow are not included in this limit. /// TBD - public static BidiFlow SimpleFramingProtocol(int maximumMessageLength) + public static BidiFlow SimpleFramingProtocol( + int maximumMessageLength) { return BidiFlow.FromFlowsMat(SimpleFramingProtocolEncoder(maximumMessageLength), SimpleFramingProtocolDecoder(maximumMessageLength), Keep.Left); @@ -166,43 +168,66 @@ public FramingException(string message) : base(message) /// /// The that holds the serialized object data about the exception being thrown. /// The that contains contextual information about the source or destination. - protected FramingException(SerializationInfo info, StreamingContext context) : base(info, context) { } + protected FramingException(SerializationInfo info, StreamingContext context) : base(info, context) + { + } } - private static readonly Func, int, int> BigEndianDecoder = (enumerator, length) => + private static readonly Func, int, int> BigEndianDecoder = (byteData, length) => { - var count = length; + if (length < 1 || length > 4) + { + throw new ArgumentOutOfRangeException(nameof(length), "Length must be between 1 and 4."); + } + + if (byteData.Length < length) + { + throw new IndexOutOfRangeException("BigEndianDecoder does not have enough bytes to decode."); + } + var decoded = 0; - while (count > 0) + for (int i = 0; i < length; i++) { + // Shift the previously accumulated value to the left, making space for the next byte decoded <<= 8; - if (!enumerator.MoveNext()) throw new IndexOutOfRangeException("LittleEndianDecoder reached end of byte string"); - decoded |= enumerator.Current & 0xFF; - count--; + + // Read the next byte and add it to the result + decoded |= byteData.Span[i] & 0xFF; } return decoded; }; - private static readonly Func, int, int> LittleEndianDecoder = (enumerator, length) => + + private static readonly Func, int, int> LittleEndianDecoder = (byteData, length) => { - var highestOcted = (length - 1) << 3; - var mask = (int) (1L << (length << 3)) - 1; - var count = length; + if (length < 1 || length > 4) + { + throw new ArgumentOutOfRangeException(nameof(length), "Length must be between 1 and 4."); + } + + var highestOctet = (length - 1) << 3; + var mask = (int)(1L << (length << 3)) - 1; var decoded = 0; - while (count > 0) + for (int i = 0; i < length; i++) { - // decoded >>>= 8 on the jvm - decoded = (int) ((uint) decoded >> 8); - if (!enumerator.MoveNext()) throw new IndexOutOfRangeException("LittleEndianDecoder reached end of byte string"); - decoded += (enumerator.Current & 0xFF) << highestOcted; - count--; + if (i >= byteData.Length) + { + throw new IndexOutOfRangeException("LittleEndianDecoder reached end of byte array."); + } + + // Shift the previously processed bytes to the right, making space for the 
next byte + decoded = (int)((uint)decoded >> 8); + + // Read the next byte and insert it into the correct position + decoded += (byteData.Span[i] & 0xFF) << highestOctet; } return decoded & mask; }; + private sealed class SimpleFramingProtocolEncoderStage : SimpleLinearGraphStage { #region Logic @@ -230,14 +255,11 @@ public override void OnPush() { var header = ByteString.CopyFrom(new[] { - Convert.ToByte((messageSize >> 24) & 0xFF), - Convert.ToByte((messageSize >> 16) & 0xFF), - Convert.ToByte((messageSize >> 8) & 0xFF), - Convert.ToByte(messageSize & 0xFF) + Convert.ToByte((messageSize >> 24) & 0xFF), Convert.ToByte((messageSize >> 16) & 0xFF), + Convert.ToByte((messageSize >> 8) & 0xFF), Convert.ToByte(messageSize & 0xFF) }); Push(_stage.Outlet, header + message); } - } public override void OnPull() => Pull(_stage.Inlet); @@ -266,7 +288,7 @@ private sealed class Logic : InAndOutGraphStageLogic private ByteString _buffer = ByteString.Empty; private int _nextPossibleMatch; - public Logic(DelimiterFramingStage stage) : base (stage.Shape) + public Logic(DelimiterFramingStage stage) : base(stage.Shape) { _stage = stage; _firstSeparatorByte = stage._separatorBytes[0]; @@ -304,60 +326,74 @@ private void TryPull() else FailStage( new FramingException( - "Stream finished but there was a truncated final frame in the buffer")); + $"Stream finished but there was a truncated final frame in the buffer + [{_buffer.ToString()}]")); } else - Pull(_stage.Inlet); + { + // check to see if stage is already pulled - if not, Pull it + if (!HasBeenPulled(_stage.Inlet)) + { + // if we haven't already pulled, then pull + Pull(_stage.Inlet); + } + } } private void DoParse() { - while (true) + // check to see if the buffer is big enough to contain the separator + if (_buffer.Count < _stage._separatorBytes.Count) { - var possibleMatchPosition = _buffer.IndexOf(_firstSeparatorByte, from: _nextPossibleMatch); + TryPull(); - if (possibleMatchPosition > _stage._maximumLineBytes) - { - FailStage(new FramingException($"Read {_buffer.Count} bytes which is more than {_stage._maximumLineBytes} without seeing a line terminator")); - } - else if (possibleMatchPosition == -1) - { - if (_buffer.Count > _stage._maximumLineBytes) - FailStage(new FramingException($"Read {_buffer.Count} bytes which is more than {_stage._maximumLineBytes} without seeing a line terminator")); - else - { - // No matching character, we need to accumulate more bytes into the buffer - _nextPossibleMatch = _buffer.Count; - TryPull(); - } - } - else if (possibleMatchPosition + _stage._separatorBytes.Count > _buffer.Count) - { - // We have found a possible match (we found the first character of the terminator - // sequence) but we don't have yet enough bytes. We remember the position to - // retry from next time. 
- _nextPossibleMatch = possibleMatchPosition; - TryPull(); - } - else if (_buffer.HasSubstring(_stage._separatorBytes, possibleMatchPosition)) + return; + } + + // search for the separator within the buffer + var definiteMatch = _buffer.IndexOf(_stage._separatorBytes, _nextPossibleMatch); + if (definiteMatch > _stage._maximumLineBytes) + { + FailStage( + new FramingException( + $"Read {_buffer.Count} bytes which is more than {_stage._maximumLineBytes} without seeing a line terminator")); + return; + } + + if (definiteMatch == -1) + { + if (_buffer.Count > _stage._maximumLineBytes) { - // Found a match - var parsedFrame = _buffer.Slice(0, possibleMatchPosition).Compact(); - _buffer = _buffer.Slice(possibleMatchPosition + _stage._separatorBytes.Count).Compact(); - _nextPossibleMatch = 0; - Push(_stage.Outlet, parsedFrame); - - if (IsClosed(_stage.Inlet) && _buffer.IsEmpty) - CompleteStage(); + FailStage( + new FramingException( + $"Read {_buffer.Count} bytes which is more than {_stage._maximumLineBytes} without seeing a line terminator")); } else { - // possibleMatchPos was not actually a match - _nextPossibleMatch++; - continue; + // No matching character, we need to accumulate more bytes into the buffer + /* + * NOTE: so this was a tad tricky to catch in the original code - this is a performance + * optimization designed to avoid re-searching through a buffer that has already been + * searched. + * + * However, if we don't find a match we need to remember the position we started searching + * MINUS the length of the delimiter - just in case we only received a _partial_ delimiter + * earlier. If we use just the _buffer.Count, we will end up missing and losing messages + * eventually. + */ + _nextPossibleMatch = _buffer.Count - _stage._separatorBytes.Count; + TryPull(); } + } + else + { + // Found a match + var parsedFrame = _buffer.Slice(0, definiteMatch); + _buffer = _buffer.Slice(definiteMatch + _stage._separatorBytes.Count); + _nextPossibleMatch = 0; + Push(_stage.Outlet, parsedFrame); - break; + if (IsClosed(_stage.Inlet) && _buffer.IsEmpty) + CompleteStage(); } } } @@ -368,7 +404,8 @@ private void DoParse() private readonly int _maximumLineBytes; private readonly bool _allowTruncation; - public DelimiterFramingStage(ByteString separatorBytes, int maximumLineBytes, bool allowTruncation) : base("DelimiterFraming") + public DelimiterFramingStage(ByteString separatorBytes, int maximumLineBytes, bool allowTruncation) : base( + "DelimiterFraming") { _separatorBytes = separatorBytes; _maximumLineBytes = maximumLineBytes; @@ -440,11 +477,12 @@ private void TryPushFrame() PushFrame(); else if (bufferSize >= _stage._minimumChunkSize) { - var iterator = _buffer.Slice(_stage._lengthFieldOffset).GetEnumerator(); + var iterator = _buffer.Memory.Slice(_stage._lengthFieldOffset); var parsedLength = _stage._intDecoder(iterator, _stage._lengthFieldLength); _frameSize = _stage._computeFrameSize.HasValue - ? _stage._computeFrameSize.Value(_buffer.Slice(0, _stage._lengthFieldOffset).ToArray(), parsedLength) + ? 
_stage._computeFrameSize.Value(_buffer.Slice(0, _stage._lengthFieldOffset).ToArray(), + parsedLength) : parsedLength + _stage._minimumChunkSize; if (_frameSize > _stage._maximumFramelength) @@ -468,7 +506,8 @@ private void TryPushFrame() private void TryPull() { if (IsClosed(_stage.Inlet)) - FailStage(new FramingException("Stream finished but there was a truncated final frame in the buffer")); + FailStage(new FramingException( + "Stream finished but there was a truncated final frame in the buffer")); else Pull(_stage.Inlet); } @@ -480,19 +519,22 @@ private void TryPull() private readonly int _maximumFramelength; private readonly int _lengthFieldOffset; private readonly int _minimumChunkSize; - private readonly Func, int, int> _intDecoder; + private readonly Func, int, int> _intDecoder; private readonly Option, int, int>> _computeFrameSize; // For the sake of binary compatibility - public LengthFieldFramingStage(int lengthFieldLength, int maximumFramelength, int lengthFieldOffset, ByteOrder byteOrder) - : this(lengthFieldLength, maximumFramelength, lengthFieldOffset, byteOrder, Option, int, int>>.None) - { } + public LengthFieldFramingStage(int lengthFieldLength, int maximumFramelength, int lengthFieldOffset, + ByteOrder byteOrder) + : this(lengthFieldLength, maximumFramelength, lengthFieldOffset, byteOrder, + Option, int, int>>.None) + { + } public LengthFieldFramingStage( int lengthFieldLength, int maximumFrameLength, int lengthFieldOffset, - ByteOrder byteOrder, + ByteOrder byteOrder, Option, int, int>> computeFrameSize) : base("LengthFieldFramingStage") { _lengthFieldLength = lengthFieldLength; @@ -506,4 +548,4 @@ public LengthFieldFramingStage( protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this); } } -} +} \ No newline at end of file diff --git a/src/core/Akka.Tests/Util/ByteStringSpec.cs b/src/core/Akka.Tests/Util/ByteStringSpec.cs index 9f4e85623a3..fbfda0328d4 100644 --- a/src/core/Akka.Tests/Util/ByteStringSpec.cs +++ b/src/core/Akka.Tests/Util/ByteStringSpec.cs @@ -5,8 +5,10 @@ // //----------------------------------------------------------------------- +using System; using System.Linq; using System.Text; +using Akka.Actor; using Akka.IO; using FluentAssertions; using FsCheck; @@ -14,15 +16,10 @@ namespace Akka.Tests.Util { - - /// - /// TODO: Should we use the FsCheck.XUnit integration when they upgrade to xUnit 2 - /// public class ByteStringSpec { class Generators { - // TODO: Align with JVM Akka Generator public static Arbitrary ByteStrings() { @@ -45,7 +42,7 @@ public void A_ByteString_must_have_correct_size_when_concatenating() [Fact] public void A_ByteString_must_have_correct_size_when_slicing_from_index() { - var a = ByteString.FromBytes(new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9} ); + var a = ByteString.FromBytes(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 }); var b = ByteString.FromBytes(new byte[] { 10, 11, 12, 13, 14, 15, 16, 17, 18 }); (a + b).Slice(b.Count).Count.Should().Be(a.Count); @@ -54,32 +51,24 @@ public void A_ByteString_must_have_correct_size_when_slicing_from_index() [Fact] public void A_ByteString_must_be_sequential_when_slicing_from_start() { - Prop.ForAll((ByteString a, ByteString b) => (a + b).Slice(0, a.Count).SequenceEqual(a)) + Prop.ForAll((ByteString a, ByteString b) => + (a + b).Slice(0, a.Count).Memory.Span.SequenceEqual(a.Memory.Span)) .QuickCheckThrowOnFailure(); } + [Fact] public void A_ByteString_must_be_sequential_when_slicing_from_index() { - var a = ByteString.FromBytes(new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 
9} ); + var a = ByteString.FromBytes(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 }); var b = ByteString.FromBytes(new byte[] { 10, 11, 12, 13, 14, 15, 16, 17, 18 }); (a + b).Slice(a.Count).Should().BeEquivalentTo(b); } - [Fact] - public void A_ByteString_must_be_equal_to_the_original_when_compacting() - { - Prop.ForAll((ByteString xs) => - { - var ys = xs.Compact(); - return xs.SequenceEqual(ys) && ys.IsCompact; - }).QuickCheckThrowOnFailure(); - } - [Fact] public void A_ByteString_must_be_equal_to_the_original_when_recombining() { - var xs = ByteString.FromBytes(new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9} ); + var xs = ByteString.FromBytes(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 }); var tmp1 = xs.Slice(0, xs.Count / 2); var tmp2 = xs.Slice(xs.Count / 2); var tmp11 = tmp1.Slice(0, tmp1.Count / 2); @@ -90,32 +79,24 @@ public void A_ByteString_must_be_equal_to_the_original_when_recombining() [Fact] public void A_ByteString_must_behave_as_expected_when_created_from_and_decoding_to_String() { - Prop.ForAll((string s) => ByteString.FromString(s, Encoding.UTF8).ToString(Encoding.UTF8) == (s ?? "")) // TODO: What should we do with null string? + Prop.ForAll((string s) => + ByteString.FromString(s, Encoding.UTF8).ToString(Encoding.UTF8) == + (s ?? "")) // TODO: What should we do with null string? .QuickCheckThrowOnFailure(); } [Fact] public void A_ByteString_must_behave_as_expected_when_created_from_and_decoding_to_unicode_String() { - Prop.ForAll((string s) => ByteString.FromString(s, Encoding.Unicode).ToString(Encoding.Unicode) == (s ?? "")) // TODO: What should we do with null string? + Prop.ForAll( + (string s) => + ByteString.FromString(s, Encoding.Unicode).ToString(Encoding.Unicode) == + (s ?? "")) // TODO: What should we do with null string? .QuickCheckThrowOnFailure(); } - [Fact] - public void A_ByteString_must_behave_as_expected_when_compacting() - { - Prop.ForAll((ByteString a) => - { - var wasCompact = a.IsCompact; - var b = a.Compact(); - return ((!wasCompact) || (b == a)) && - b.SequenceEqual(a) && - b.IsCompact && - b.Compact() == b; - }).QuickCheckThrowOnFailure(); - } - - [Fact(DisplayName = @"A concatenated byte string should return the index of a byte in one the two byte strings.")] + [Fact(DisplayName = + @"A concatenated byte string should return the index of a byte in one the two byte strings.")] public void A_concatenated_bytestring_must_return_correct_index_of_elements_in_string() { var b = ByteString.FromBytes(new byte[] { 1 }) + ByteString.FromBytes(new byte[] { 2 }); @@ -124,7 +105,8 @@ public void A_concatenated_bytestring_must_return_correct_index_of_elements_in_s Assert.Equal(1, offset); } - [Fact(DisplayName = @"A concatenated byte string should return -1 when it was not found in the concatenated byte strings")] + [Fact(DisplayName = + @"A concatenated byte string should return -1 when it was not found in the concatenated byte strings")] public void A_concatenated_bytestring_must_return_negative_one_when_an_element_was_not_found() { var b = ByteString.FromBytes(new byte[] { 1 }) + ByteString.FromBytes(new byte[] { 2 }); @@ -133,7 +115,8 @@ public void A_concatenated_bytestring_must_return_negative_one_when_an_element_w Assert.Equal(-1, offset); } - [Fact(DisplayName = "A concatenated byte string composed of partial characters must return the correct string for ToString(Unicode)")] + [Fact(DisplayName = + "A concatenated byte string composed of partial characters must return the correct string for ToString(Unicode)")] public void 
A_concatenated_ByteString_with_partial_characters_must_return_correct_string_for_ToString_Unicode() { // In Unicode encoding, characters present in the ASCII character set are 2 bytes long. @@ -152,7 +135,8 @@ public void A_concatenated_ByteString_with_partial_characters_must_return_correc Assert.Equal(expected, actual); } - [Fact(DisplayName = "A concatenated byte string composed of partial characters must return the correct string for ToString(UTF8)")] + [Fact(DisplayName = + "A concatenated byte string composed of partial characters must return the correct string for ToString(UTF8)")] public void A_concatenated_ByteString_with_partial_characters_must_return_correct_string_for_ToString_UTF8() { // In UTF-8 encoding, characters present in the ASCII character set are only 1 byte long. @@ -167,7 +151,7 @@ public void A_concatenated_ByteString_with_partial_characters_must_return_correc data += ByteString.CopyFrom(rawData, 1, 3); // One and a half characters Assert.Equal(rawData.Length, data.Count); - string actual = data.ToString(encoding); + var actual = data.ToString(encoding); Assert.Equal(expected, actual); } @@ -191,6 +175,103 @@ public void A_sliced_ByteString_must_return_correct_string_for_ToString() Assert.Equal(expectedRight, actualRight); } + // generate a test case for the ByteString.HasSubstring method, when one big ByteString contains another ByteString + [Fact] + public void A_ByteString_must_return_true_when_containing_another_ByteString() + { + Prop.ForAll((ByteString a, ByteString b) => + { + var big = a + b + a; + return ByteStringHasSubstringZeroIndex().Label($"big: {big}, b: {b}") + .And(ByteStringHasSubstringNonZeroIndex().Label($"big: {big}, b: {b}")); + + bool ByteStringHasSubstringZeroIndex() + { + return big.HasSubstring(b, 0); + } + + bool ByteStringHasSubstringNonZeroIndex() + { + return big.HasSubstring(b, a.Count); + } + }) + .QuickCheckThrowOnFailure(); + } + + // generate a test case for ByteString.IndexOf of another ByteString and ensure that the index is correct (should match the beginning of the other ByteString) + [Fact] + public void A_ByteString_must_return_correct_index_when_containing_another_ByteString() + { + Prop.ForAll((ByteString a, ByteString b) => + { + return Prop.When(b.Count > 0, () => + { + var big = a + b + a; + var i = ByteStringIndexOfZeroIndex(); + var g = ByteStringIndexOfNonZeroIndex(); + + return (i == a.Count).Label($"big: {big}, b: {b}, expected start: {a.Count}, actual start: {i}") + .And((g == a.Count).Label($"big: {big}, b: {b}, expected start: {a.Count}, actual start: {g}")); + + int ByteStringIndexOfZeroIndex() + { + return big.IndexOf(b, 0); + } + + int ByteStringIndexOfNonZeroIndex() + { + return big.IndexOf(b, a.Count); + } + }); + }) + .QuickCheckThrowOnFailure(); + } + + // write a unit test for the IndexOf method when we're looking for a single byte - do it without using FsCheck + [Theory] + [InlineData(0)] + [InlineData(1)] + public void A_ByteString_must_return_correct_index_when_containing_a_single_byte(int startingIndex) + { + var a = ByteString.FromBytes(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 }); + var b = ByteString.FromBytes(new byte[] { 5 }); + + var i = a.IndexOf(b, startingIndex); + Assert.Equal(4, i); + + // also do the comparison with the byte value + var j = a.IndexOf(5, startingIndex); + Assert.Equal(4, j); + } + + [Fact] + public void A_ByteString_must_return_correct_index_when_containing_a_single_byte_front() + { + var a = ByteString.FromBytes(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 }); + var b = 
ByteString.FromBytes(new byte[] { 1 }); + + var i = a.IndexOf(b); + Assert.Equal(0, i); + + // also do the comparison with the byte value + var j = a.IndexOf(1); + Assert.Equal(0, j); + } + + [Fact] + public void A_ByteString_must_return_correct_index_when_containing_a_single_byte_back() + { + var a = ByteString.FromBytes(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 }); + var b = ByteString.FromBytes(new byte[] { 9 }); + + var i = a.IndexOf(b); + Assert.Equal(8, i); + + // also do the comparison with the byte value + var j = a.IndexOf(9); + } + + #if !NETFRAMEWORK [Fact(DisplayName = "A sliced byte string using Range must return the correct string for ToString")] public void A_sliced_ByteString_using_Range_must_return_correct_string_for_ToString() @@ -213,4 +294,4 @@ public void A_sliced_ByteString_using_Range_must_return_correct_string_for_ToStr } #endif } -} +} \ No newline at end of file diff --git a/src/core/Akka/IO/SocketEventArgsPool.cs b/src/core/Akka/IO/SocketEventArgsPool.cs index bd8007f0875..bb75fb9024b 100644 --- a/src/core/Akka/IO/SocketEventArgsPool.cs +++ b/src/core/Akka/IO/SocketEventArgsPool.cs @@ -82,44 +82,13 @@ public void Release(SocketAsyncEventArgs e) internal static class SocketAsyncEventArgsExtensions { - public static void SetBuffer(this SocketAsyncEventArgs args, ByteString data) - { - if (data.IsCompact) - { - var buffer = data.Buffers[0]; - if (args.BufferList != null) - { - // BufferList property setter is not simple member association operation, - // but the getter is. Therefore we first check if we need to clear buffer list - // and only do so if necessary. - args.BufferList = null; - } - args.SetBuffer(buffer.Array, buffer.Offset, buffer.Count); - } - else - { - if (RuntimeDetector.IsMono) - { - // Mono doesn't support BufferList - falback to compacting ByteString - var compacted = data.Compact(); - var buffer = compacted.Buffers[0]; - args.SetBuffer(buffer.Array, buffer.Offset, buffer.Count); - } - else - { - args.SetBuffer(null, 0, 0); - args.BufferList = data.Buffers; - } - } - } - public static void SetBuffer(this SocketAsyncEventArgs args, IEnumerable dataCollection) { if (RuntimeDetector.IsMono) { // Mono doesn't support BufferList - falback to compacting ByteString var dataList = dataCollection.ToList(); - var totalSize = dataList.SelectMany(d => d.Buffers).Sum(d => d.Count); + var totalSize = dataList.Count; var bytes = new byte[totalSize]; var position = 0; foreach (var byteString in dataList) @@ -132,7 +101,8 @@ public static void SetBuffer(this SocketAsyncEventArgs args, IEnumerable d.Buffers).ToList(); + // TODO: fix this before we ship + args.BufferList = new List>(dataCollection.Select(bs => new ArraySegment(bs.ToArray()))); } } } diff --git a/src/core/Akka/IO/TcpConnection.cs b/src/core/Akka/IO/TcpConnection.cs index 7e12a58b8b4..cd1b4c83b38 100644 --- a/src/core/Akka/IO/TcpConnection.cs +++ b/src/core/Akka/IO/TcpConnection.cs @@ -240,14 +240,9 @@ private Receive PeerSentEOF(ConnectionInfo info) return message => { if (handleWrite(message)) return true; - var cmd = message as CloseCommand; - if (cmd != null) - { - HandleClose(info, Sender, cmd.Event); - return true; - } - if (message is ResumeReading) return true; - return false; + if (message is not CloseCommand cmd) return message is ResumeReading; + HandleClose(info, Sender, cmd.Event); + return true; }; } diff --git a/src/core/Akka/IO/UdpConnection.cs b/src/core/Akka/IO/UdpConnection.cs index 1e1b0aacc81..93de1525951 100644 --- a/src/core/Akka/IO/UdpConnection.cs +++ 
b/src/core/Akka/IO/UdpConnection.cs @@ -66,6 +66,7 @@ private Receive Resolving(DnsEndPoint remoteAddress) => message => DoConnect(new IPEndPoint(r.Addr, remoteAddress.Port)); return true; } + return false; }; @@ -106,62 +107,66 @@ private bool Connected(object message) { switch (message) { - case SuspendReading _: _readingSuspended = true; return true; + case SuspendReading _: + _readingSuspended = true; + return true; case ResumeReading _: + { + _readingSuspended = false; + if (_pendingRead != null) { - _readingSuspended = false; - if (_pendingRead != null) - { - _connect.Handler.Tell(_pendingRead); - _pendingRead = null; - ReceiveAsync(); - } - return true; + _connect.Handler.Tell(_pendingRead); + _pendingRead = null; + ReceiveAsync(); } - + + return true; + } + case SocketReceived socketReceived: _pendingReceive = null; - DoRead(socketReceived, _connect.Handler); + DoRead(socketReceived, _connect.Handler); return true; - + case Disconnect _: - { - Log.Debug("Closing UDP connection to [{0}]", _connect.RemoteAddress); + { + Log.Debug("Closing UDP connection to [{0}]", _connect.RemoteAddress); - _socket.Dispose(); + _socket.Dispose(); - Sender.Tell(Disconnected.Instance); - Log.Debug("Connection closed to [{0}], stopping listener", _connect.RemoteAddress); - Context.Stop(Self); - return true; - } + Sender.Tell(Disconnected.Instance); + Log.Debug("Connection closed to [{0}], stopping listener", _connect.RemoteAddress); + Context.Stop(Self); + return true; + } case Send send: + { + if (WritePending) + { + if (Udp.Settings.TraceLogging) Log.Debug("Dropping write because queue is full"); + Sender.Tell(new CommandFailed(send)); + } + else { - if (WritePending) + if (!send.Payload.IsEmpty) { - if (Udp.Settings.TraceLogging) Log.Debug("Dropping write because queue is full"); - Sender.Tell(new CommandFailed(send)); + _pendingSend = (send, Sender); + DoWrite(); } else { - if (!send.Payload.IsEmpty) - { - _pendingSend = (send, Sender); - DoWrite(); - } - else - { - if (send.WantsAck) - Sender.Tell(send.Ack); - } + if (send.WantsAck) + Sender.Tell(send.Ack); } - return true; } + + return true; + } case SocketSent sent: { if (_pendingSend == null) throw new Exception("There are no pending sent"); - + var (send, sender) = _pendingSend.Value; if (send.WantsAck) sender.Tell(send.Ack); @@ -193,7 +198,13 @@ private void DoWrite() var (send, sender) = _pendingSend.Value; var data = send.Payload; - var bytesWritten = _socket.Send(data.Buffers); +#if NET6_0_OR_GREATER + var bytesWritten = _socket.Send(data.Memory.Span); +#else + // pay the .NET Framework performance penalty + var array = data.ToArray(); + var bytesWritten = _socket.Send(array); +#endif if (Udp.Settings.TraceLogging) Log.Debug("Wrote [{0}] bytes to socket", bytesWritten); @@ -232,19 +243,20 @@ private void ReportConnectFailure(Action thunk) } catch (Exception e) { - Log.Error(e, "Failure while connecting UDP channel to remote address [{0}] local address [{1}]", _connect.RemoteAddress, _connect.LocalAddress); + Log.Error(e, "Failure while connecting UDP channel to remote address [{0}] local address [{1}]", + _connect.RemoteAddress, _connect.LocalAddress); _commander.Tell(new CommandFailed(_connect)); Context.Stop(Self); } } private SocketAsyncEventArgs _pendingReceive; - + private void ReceiveAsync() { if (_pendingReceive != null) return; - + var e = Udp.SocketEventArgsPool.Acquire(Self); _pendingReceive = e; if (!_socket.ReceiveAsync(e)) @@ -254,4 +266,4 @@ private void ReceiveAsync() } } } -} +} \ No newline at end of file diff --git 
a/src/core/Akka/Util/ByteString.cs b/src/core/Akka/Util/ByteString.cs index 8b59807ce4c..bf504bc29da 100644 --- a/src/core/Akka/Util/ByteString.cs +++ b/src/core/Akka/Util/ByteString.cs @@ -25,8 +25,8 @@ namespace Akka.IO /// when concatenating and slicing sequences of bytes, /// and also providing a thread safe way of working with bytes. /// - [DebuggerDisplay("(Count = {_count}, Buffers = {_buffers})")] - public sealed class ByteString : IEquatable, IEnumerable + [DebuggerDisplay("(Count = {Count}, Buffer = {Memory})")] + public sealed class ByteString : IEquatable { #region creation methods @@ -244,49 +244,69 @@ public static ByteString FromString(string str, Encoding encoding) #endregion - private readonly int _count; - private readonly ByteBuffer[] _buffers; + private readonly ReadOnlyMemory _memory; + + public ReadOnlyMemory Memory => _memory; - private ByteString(ByteBuffer[] buffers, int count) + private static ReadOnlyMemory ConvertToMemory(ByteBuffer[] buffers) { - _buffers = buffers; - _count = count; + return buffers.Length switch + { + 0 => ReadOnlyMemory.Empty, + 1 => new ReadOnlyMemory(buffers[0].Array, buffers[0].Offset, buffers[0].Count), + _ => new ReadOnlyMemory(ConcatBuffers(buffers)) + }; + + static byte[] ConcatBuffers(ByteBuffer[] buffers) + { + var count = buffers.Sum(x => x.Count); + var result = new byte[count]; + var offset = 0; + foreach (var buffer in buffers) + { + Array.Copy(buffer.Array ?? throw new InvalidOperationException(), buffer.Offset, result, offset, buffer.Count); + offset += buffer.Count; + } + + return result; + } } - private ByteString(ByteBuffer buffer) + private ByteString(ByteBuffer[] buffers, int count) : this(ConvertToMemory(buffers)) { - _buffers = new[] { buffer }; - _count = buffer.Count; } - private ByteString(byte[] array, int offset, int count) + private ByteString(ByteBuffer buffer) : + this(new ReadOnlyMemory(buffer.Array, buffer.Offset, buffer.Count)) { - _buffers = new[] { new ByteBuffer(array, offset, count) }; - _count = count; + } + + private ByteString(byte[] array, int offset, int count) : this(new ReadOnlyMemory(array, offset, count)) + { + } + + private ByteString(in ReadOnlyMemory memory) + { + _memory = memory; } /// /// Gets a total number of bytes stored inside this . /// - public int Count => _count; + public int Count => _memory.Length; /// /// Determines if current has compact representation. /// Compact byte strings represent bytes stored inside single, continuous /// block of memory. /// - /// TBD - public bool IsCompact => _buffers.Length == 1; + [Obsolete("This property will be removed in future versions of Akka.NET.")] + public bool IsCompact => true; /// /// Determines if current is empty. /// - public bool IsEmpty => _count == 0; - - /// - /// Gets sequence of the buffers used underneat. - /// - internal IList Buffers => _buffers; + public bool IsEmpty => Count == 0; /// /// Gets a byte stored under a provided . @@ -296,11 +316,7 @@ public byte this[int index] { get { - if (index >= _count) throw new IndexOutOfRangeException("Requested index is outside of the bounds of the ByteString"); - int j; - var i = GetBufferFittingIndex(index, out j); - var buffer = _buffers[i]; - return buffer.Array[buffer.Offset + j]; + return _memory.Span[index]; } } @@ -308,13 +324,10 @@ public byte this[int index] /// Compacts current , potentially copying its content underneat /// into new byte array. 
/// - /// TBD + [Obsolete("This method will be removed in future versions of Akka.NET.")] public ByteString Compact() { - if (IsCompact) return this; - - var copy = this.ToArray(); - return new ByteString(copy, 0, copy.Length); + return this; } /// @@ -323,7 +336,7 @@ public ByteString Compact() /// operation. /// /// index inside current , from which slicing should start - public ByteString Slice(int index) => Slice(index, _count - index); + public ByteString Slice(int index) => Slice(index, Count - index); /// /// Slices current , creating a new @@ -335,75 +348,7 @@ public ByteString Compact() /// If index or count result in an invalid . public ByteString Slice(int index, int count) { - if(index < 0) - throw new ArgumentOutOfRangeException(nameof(index), "Index must be positive number"); - if (count < 0) - throw new ArgumentOutOfRangeException(nameof(count), "Count must be positive number"); - if (count == 0) return Empty; - if(index > _count) - throw new ArgumentOutOfRangeException(nameof(index), "Index is outside of the bounds of the ByteString"); - if(index + count > _count) - throw new ArgumentOutOfRangeException(nameof(count), "Index + count is outside of the bounds of the ByteString"); - - if (index == 0 && count == _count) return this; - - var i = GetBufferFittingIndex(index, out var j); - var init = _buffers[i]; - - var copied = Math.Min(init.Count - j, count); - var newBuffers = new ByteBuffer[_buffers.Length - i]; - newBuffers[0] = new ByteBuffer(init.Array, init.Offset + j, copied); - - i++; - var k = 1; - for (; i < _buffers.Length; i++, k++) - { - if (copied >= count) break; - - var buffer = _buffers[i]; - var toCopy = Math.Min(buffer.Count, count - copied); - newBuffers[k] = new ByteBuffer(buffer.Array, buffer.Offset, toCopy); - copied += toCopy; - } - - if (k < newBuffers.Length) - newBuffers = newBuffers.Take(k).ToArray(); - - return new ByteString(newBuffers, count); - } - - /// - /// Given an in current tries to - /// find which buffer will be used to contain that index and return its range. - /// An offset within the buffer itself will be stored in . - /// - /// - /// - /// - private int GetBufferFittingIndex(int index, out int indexWithinBuffer) - { - if (index == 0) - { - indexWithinBuffer = 0; - return 0; - } - - var j = index; - for (var i = 0; i < _buffers.Length; i++) - { - var buffer = _buffers[i]; - if (j >= buffer.Count) - { - j -= buffer.Count; - } - else - { - indexWithinBuffer = j; - return i; - } - } - - throw new IndexOutOfRangeException($"Requested index [{index}] is outside of the bounds of current ByteString."); + return new ByteString(_memory.Slice(index, count)); } /// @@ -414,14 +359,7 @@ private int GetBufferFittingIndex(int index, out int indexWithinBuffer) /// public int IndexOf(byte b) { - var idx = 0; - foreach (var x in this) - { - if (x == b) return idx; - idx++; - } - - return -1; + return _memory.Span.IndexOf(b); } /// @@ -431,22 +369,21 @@ public int IndexOf(byte b) /// public int IndexOf(byte b, int from) { - if (from >= _count) return -1; - - int j; - var i = GetBufferFittingIndex(from, out j); - var idx = from; - for (; i < _buffers.Length; i++) - { - var buffer = _buffers[i]; - for (; j < buffer.Count; j++, idx++) - { - if (buffer.Array[buffer.Offset + j] == b) return idx; - } - j = 0; - } + var rValue = _memory.Span[from..].IndexOf(b); + return rValue == -1 ? -1 : rValue + from; + } + + public int IndexOf(ByteString other, int index = 0) + { + if (other.Count == 0) return index; // Empty spans are always "found". 
+            if (index < 0 || index > Count) throw new ArgumentOutOfRangeException(nameof(index), "Start index is out of range.");
+            if (Count - index < other.Count) return -1; // Can't find if `toFind` is larger considering the start index.
+
+            var span = _memory.Span;
+            var otherSpan = other._memory.Span;
-            return -1;
+            var indexOf = span.Slice(index).IndexOf(otherSpan);
+            return indexOf == -1 ? -1 : indexOf + index;
         }
         ///
@@ -459,53 +396,19 @@ public int IndexOf(byte b, int from)
         ///
         public bool HasSubstring(ByteString other, int index)
         {
-            // quick check: if subsequence is longer than remaining size, return false
-            if (other.Count > _count - index) return false;
-
-            int thisIdx = 0, otherIdx = 0;
-            var i = GetBufferFittingIndex(index, out thisIdx);
-            var j = 0;
-            while (j < other._buffers.Length)
-            {
-                var buffer = _buffers[i];
-                var otherBuffer = other._buffers[j];
-
-                while (thisIdx < buffer.Count && otherIdx < otherBuffer.Count)
-                {
-                    if (buffer.Array[buffer.Offset + thisIdx] != otherBuffer.Array[otherBuffer.Offset + otherIdx])
-                        return false;
-
-                    thisIdx++;
-                    otherIdx++;
-                }
-
-                if (thisIdx >= buffer.Count)
-                {
-                    i++;
-                    thisIdx = 0;
-                }
-                if (otherIdx >= otherBuffer.Count)
-                {
-                    j++;
-                    otherIdx = 0;
-                }
-            }
-
-            return true;
+            return IndexOf(other, index) > -1;
         }
         ///
         /// Copies content of a current into a single byte array.
         ///
-        /// TBD
+        ///
+        /// WARNING: this method allocates!
+        ///
+        /// A new array of data
         public byte[] ToArray()
         {
-            if (_count == 0)
-                return Array.Empty<byte>();
-
-            var copy = new byte[_count];
-            CopyTo(copy, 0, _count);
-            return copy;
+            return _memory.ToArray();
         }
         ///
@@ -519,16 +422,14 @@ public ByteString Concat(ByteString other)
             if (other == null) throw new ArgumentNullException(nameof(other), "Cannot append null to ByteString.");
             if (other.IsEmpty) return this;
-            if (this.IsEmpty) return other;
+            if (IsEmpty) return other;
-            var count = _count + other._count;
-            var len1 = _buffers.Length;
-            var len2 = other._buffers.Length;
-            var array = new ByteBuffer[len1 + len2];
-            Array.Copy(this._buffers, 0, array, 0, len1);
-            Array.Copy(other._buffers, 0, array, len1, len2);
-
-            return new ByteString(array, count);
+            // combine the two ReadOnlyMemory instances
+            var array = new byte[_memory.Length + other._memory.Length];
+            _memory.Span.CopyTo(array);
+            other._memory.Span.CopyTo(array.AsSpan(_memory.Length));
+
+            return new ByteString(array);
         }
         ///
@@ -544,20 +445,9 @@ public int CopyTo(byte[] buffer, int index, int count)
             if (index < 0 || index >= buffer.Length) throw new ArgumentOutOfRangeException(nameof(index), "Provided index is outside the bounds of the buffer to copy to.");
             if (count > buffer.Length - index) throw new ArgumentException("Provided number of bytes to copy won't fit into provided buffer", nameof(count));
-            count = Math.Min(count, _count);
-            var remaining = count;
-            var position = index;
-            foreach (var b in _buffers)
-            {
-                var toCopy = Math.Min(b.Count, remaining);
-                Array.Copy(b.Array, b.Offset, buffer, position, toCopy);
-                position += toCopy;
-                remaining -= toCopy;
-
-                if (remaining == 0) return count;
-            }
-
-            return 0;
+            count = Math.Min(count, Count);
+            _memory.Span.CopyTo(buffer.AsSpan(index, count));
+            return count;
         }
         ///
@@ -565,6 +455,7 @@ public int CopyTo(byte[] buffer, int index, int count)
         ///
         ///
         /// The number of bytes copied
+        [Obsolete("This method will be removed in future versions of Akka.NET.")]
         public int CopyTo(ref Memory<byte> buffer)
             => CopyTo(ref buffer, 0, buffer.Length);
@@ -574,29 +465,16 @@ public int CopyTo(ref Memory<byte> buffer)
         /// buffer and copying a number of bytes.
         ///
         /// The number of bytes copied
+        [Obsolete("This method will be removed in future versions of Akka.NET.")]
         public int CopyTo(ref Memory<byte> buffer, int index, int count)
         {
             if(buffer.Length == 0 && count == 0) return 0; // edge case for no-copy
             if (index < 0 || index >= buffer.Length) throw new ArgumentOutOfRangeException(nameof(index), "Provided index is outside the bounds of the buffer to copy to.");
             if (count > buffer.Length - index) throw new ArgumentException("Provided number of bytes to copy won't fit into provided buffer", nameof(count));
-            count = Math.Min(count, _count);
-            var remaining = count;
-            var position = index;
-            foreach (var b in _buffers)
-            {
-                var toCopy = Math.Min(b.Count, remaining);
-
-                var bufferSpan = buffer.Span.Slice(position, toCopy);
-                b.AsSpan().CopyTo(bufferSpan);
-
-                position += toCopy;
-                remaining -= toCopy;
-
-                if (remaining == 0) return count;
-            }
-
-            return 0;
+            count = Math.Min(count, Count);
+            _memory.Span.Slice(0, count).CopyTo(buffer.Span.Slice(index));
+            return count;
         }
         ///
@@ -604,6 +482,7 @@ public int CopyTo(ref Memory<byte> buffer, int index, int count)
         /// .
         ///
         /// The number of bytes copied
+        [Obsolete("This method will be removed in future versions of Akka.NET.")]
         public int CopyTo(ref Span<byte> buffer)
             => CopyTo(ref buffer, 0, buffer.Length);
@@ -613,29 +492,16 @@ public int CopyTo(ref Span<byte> buffer)
         /// buffer and copying a number of bytes.
         ///
         /// The number of bytes copied
+        [Obsolete("This method will be removed in future versions of Akka.NET.")]
         public int CopyTo(ref Span<byte> buffer, int index, int count)
         {
             if(buffer.Length == 0 && count == 0) return 0; // edge case for no-copy
             if (index < 0 || index >= buffer.Length) throw new ArgumentOutOfRangeException(nameof(index), "Provided index is outside the bounds of the buffer to copy to.");
             if (count > buffer.Length - index) throw new ArgumentException("Provided number of bytes to copy won't fit into provided buffer", nameof(count));
-            count = Math.Min(count, _count);
-            var remaining = count;
-            var position = index;
-            foreach (var b in _buffers)
-            {
-                var toCopy = Math.Min(b.Count, remaining);
-
-                var bufferSpan = buffer.Slice(position, toCopy);
-                b.AsSpan().CopyTo(bufferSpan);
-
-                position += toCopy;
-                remaining -= toCopy;
-
-                if (remaining == 0) return count;
-            }
-
-            return 0;
+            count = Math.Min(count, Count);
+            _memory.Span.Slice(0, count).CopyTo(buffer.Slice(index));
+            return count;
         }
         ///
@@ -643,14 +509,14 @@ public int CopyTo(ref Span<byte> buffer, int index, int count)
         /// writeable .
         ///
         ///
+        [Obsolete("This method will be removed in future versions of Akka.NET.")]
         public void WriteTo(Stream stream)
         {
             if (stream == null) throw new ArgumentNullException(nameof(stream));
-
-            foreach (var buffer in _buffers)
-            {
-                stream.Write(buffer.Array, buffer.Offset, buffer.Count);
-            }
+
+            // TODO: remove this method in future versions of Akka.NET and use System.Memory APIs
+            var array = _memory.ToArray();
+            stream.Write(array, 0, array.Length);
         }
         ///
@@ -658,79 +524,51 @@ public void WriteTo(Stream stream)
         /// to a provided writeable .
         ///
         ///
-        public async Task WriteToAsync(Stream stream)
+        [Obsolete("This method will be removed in future versions of Akka.NET.")]
+        public Task WriteToAsync(Stream stream)
         {
             if (stream == null) throw new ArgumentNullException(nameof(stream));
-            foreach (var buffer in _buffers)
-            {
-                await stream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count);
-            }
+            // TODO: remove this method in future versions of Akka.NET and use System.Memory APIs
+            var array = _memory.ToArray();
+            return stream.WriteAsync(array, 0, array.Length);
        }
         public override bool Equals(object obj) => Equals(obj as ByteString);
         public override int GetHashCode()
         {
-            var hashCode = 0;
-            foreach (var b in this)
-            {
-                hashCode = (hashCode * 397) ^ b.GetHashCode();
-            }
-            return hashCode;
+            return _memory.GetHashCode();
         }
         public bool Equals(ByteString other)
         {
             if (ReferenceEquals(other, this)) return true;
             if (ReferenceEquals(other, null)) return false;
-            if (_count != other._count) return false;
-
-            using (var thisEnum = this.GetEnumerator())
-            using (var otherEnum = other.GetEnumerator())
-            {
-                while (thisEnum.MoveNext() && otherEnum.MoveNext())
-                {
-                    if (thisEnum.Current != otherEnum.Current) return false;
-                }
-            }
-
-            return true;
-        }
-
-        public IEnumerator<byte> GetEnumerator()
-        {
-            foreach (var buffer in _buffers)
-            {
-                for (int i = buffer.Offset; i < buffer.Offset + buffer.Count; i++)
-                {
-                    yield return buffer.Array[i];
-                }
-            }
+
+            return _memory.Span.SequenceEqual(other._memory.Span);
         }
-        IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
-
         public override string ToString() => ToString(Encoding.UTF8);
         public string ToString(Encoding encoding)
         {
-            if (IsCompact)
-                return encoding.GetString(_buffers[0].Array, _buffers[0].Offset, _buffers[0].Count);
-
-            byte[] buffer = ToArray();
-
-            return encoding.GetString(buffer);
+            // get span as byte array
+            return encoding.GetString(_memory.Span.ToArray());
         }
         public static bool operator ==(ByteString x, ByteString y) => Equals(x, y);
         public static bool operator !=(ByteString x, ByteString y) => !Equals(x, y);
-        public static explicit operator ByteString(byte[] bytes) => ByteString.CopyFrom(bytes);
+        public static explicit operator ByteString(byte[] bytes) => CopyFrom(bytes);
         public static explicit operator byte[] (ByteString byteString) => byteString.ToArray();
+        public static explicit operator ByteString(Memory<byte> memory) => CopyFrom(memory);
+
+        public static explicit operator ByteString(ReadOnlyMemory<byte> memory) => new(memory);
+
         public static ByteString operator +(ByteString x, ByteString y) => x.Concat(y);
     }
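For reviewers, a minimal usage sketch (not part of the patch) of the reworked ByteString may help. It relies only on members visible in this diff; the sample class and variable names are illustrative, and the sharing/copying behaviour described in the comments follows from the new single-buffer implementation shown above.

using System;
using Akka.IO;

// Illustrative example only: how the single-buffer ByteString behaves after this change.
class ByteStringMemoryExample
{
    static void Main()
    {
        var full = ByteString.FromString("hello world");   // now backed by a single ReadOnlyMemory<byte>
        var hello = full.Slice(0, 5);                       // re-slices the same backing memory; no copy in the new Slice

        ReadOnlyMemory<byte> view = hello.Memory;            // zero-copy view of the backing buffer
        Console.WriteLine(hello.Count == view.Length);       // True

        // New explicit conversion wraps an existing ReadOnlyMemory<byte> without copying.
        var wrapped = (ByteString)new ReadOnlyMemory<byte>(new byte[] { 1, 2, 3 });

        byte[] copy = wrapped.ToArray();                      // explicit copy when a byte[] is required (allocates)
        Console.WriteLine(copy.Length);                       // 3

        // IsCompact and Compact() survive only as [Obsolete] no-ops:
        // every ByteString is now a single contiguous buffer.
    }
}

Because the ReadOnlyMemory<byte> conversion and the Memory property share the caller's buffer rather than copying it, callers should treat a wrapped buffer as immutable afterwards; the CopyFrom factory methods remain the copying path.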