From 8f76c02af78bc8bf50545ed88a8a38f3783fc40f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E5=9B=BD=E4=BC=9F?= <366193849@qq.com> Date: Thu, 21 Nov 2024 10:27:25 +0800 Subject: [PATCH] MqttBufferWriter: Use Span GetSpan(int) to get the buffer to write. --- .../MqttBufferReaderBenchmark.cs | 21 ++--- .../MqttPacketReaderWriterBenchmark.cs | 6 +- .../Formatter/MqttBufferReader_Tests.cs | 7 +- .../Formatter/MqttPacketWriter_Tests.cs | 12 +-- .../Helpers/MqttPacketWriterExtensions.cs | 2 +- Source/MQTTnet.Tests/Protocol_Tests.cs | 8 +- Source/MQTTnet/Formatter/MqttBufferWriter.cs | 86 ++++++++++--------- .../Formatter/V3/MqttV3PacketFormatter.cs | 2 +- .../Formatter/V5/MqttV5PacketEncoder.cs | 3 +- 9 files changed, 77 insertions(+), 70 deletions(-) diff --git a/Source/MQTTnet.Benchmarks/MqttBufferReaderBenchmark.cs b/Source/MQTTnet.Benchmarks/MqttBufferReaderBenchmark.cs index bfa3d209c..e49dd79f0 100644 --- a/Source/MQTTnet.Benchmarks/MqttBufferReaderBenchmark.cs +++ b/Source/MQTTnet.Benchmarks/MqttBufferReaderBenchmark.cs @@ -2,11 +2,12 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -using System; -using System.Text; using BenchmarkDotNet.Attributes; using BenchmarkDotNet.Jobs; using MQTTnet.Formatter; +using System; +using System.Runtime.InteropServices; +using System.Text; namespace MQTTnet.Benchmarks { @@ -14,8 +15,7 @@ namespace MQTTnet.Benchmarks [MemoryDiagnoser] public class MqttBufferReaderBenchmark { - byte[] _buffer; - int _bufferLength; + ArraySegment _buffer; [GlobalSetup] public void GlobalSetup() @@ -23,21 +23,22 @@ public void GlobalSetup() var writer = new MqttBufferWriter(1024, 1024); writer.WriteString("hgfjkdfkjlghfdjghdfljkdfhgdlkjfshgsldkfjghsdflkjghdsflkjhrstiuoghlkfjbhnfbutghjoiöjhklötnbhtroliöuhbjntluiobkjzbhtdrlskbhtruhjkfthgbkftgjhgfiklhotriuöhbjtrsioöbtrsötrhträhtrühjtriüoätrhjtsrölbktrbnhtrulöbionhströloubinströoliubhnsöotrunbtöroisntröointröioujhgötiohjgötorshjnbgtöorihbnjtröoihbjntröobntröoibntrjhötrohjbtröoihntröoibnrtoiöbtrjnboöitrhjtnriohötrhjtöroihjtroöihjtroösibntsroönbotöirsbntöoihjntröoihntroöbtrboöitrnhoöitrhjntröoishbnjtröosbhtröbntriohjtröoijtöoitbjöotibjnhöotirhbjntroöibhnjrtoöibnhtroöibnhtörsbnhtöoirbnhtöroibntoörhjnbträöbtrbträbtrbtirbätrsibohjntrsöiobthnjiohjsrtoib"); - _buffer = writer.GetBuffer(); - _bufferLength = writer.Length; + if (MemoryMarshal.TryGetArray(writer.GetWrittenMemory(), out var segment)) + { + _buffer = segment; + } } [Benchmark] public void Use_Span() { - var span = _buffer.AsSpan(0, _bufferLength); - Encoding.UTF8.GetString(span); + Encoding.UTF8.GetString(_buffer.AsSpan()); } - + [Benchmark] public void Use_Encoding() { - Encoding.UTF8.GetString(_buffer, 0, _bufferLength); + Encoding.UTF8.GetString(_buffer.Array, _buffer.Offset, _buffer.Count); } } } \ No newline at end of file diff --git a/Source/MQTTnet.Benchmarks/MqttPacketReaderWriterBenchmark.cs b/Source/MQTTnet.Benchmarks/MqttPacketReaderWriterBenchmark.cs index aecd8f814..fca65aaab 100644 --- a/Source/MQTTnet.Benchmarks/MqttPacketReaderWriterBenchmark.cs +++ b/Source/MQTTnet.Benchmarks/MqttPacketReaderWriterBenchmark.cs @@ -16,7 +16,7 @@ public class MqttPacketReaderWriterBenchmark : BaseBenchmark { readonly byte[] _demoPayload = new byte[1024]; - byte[] _readPayload; + ReadOnlyMemory _readPayload; [GlobalCleanup] public void GlobalCleanup() @@ -42,7 +42,7 @@ public void GlobalSetup() writer.WriteString("fjgffiogfhgfhoihgoireghreghreguhreguireoghreouighreouighreughreguiorehreuiohruiorehreuioghreug"); writer.WriteBinary(_demoPayload); - _readPayload = new ArraySegment(writer.GetBuffer(), 0, writer.Length).ToArray(); + _readPayload = writer.GetWrittenMemory(); } [Benchmark] @@ -51,7 +51,7 @@ public void Read_100_000_Messages() for (var i = 0; i < 100000; i++) { var reader = new MqttBufferReader(); - reader.SetBuffer(_readPayload.AsMemory(0, _readPayload.Length)); + reader.SetBuffer(_readPayload); reader.ReadString(); reader.ReadBinaryData(); diff --git a/Source/MQTTnet.Tests/Formatter/MqttBufferReader_Tests.cs b/Source/MQTTnet.Tests/Formatter/MqttBufferReader_Tests.cs index fc62d65b0..2c8b6665e 100644 --- a/Source/MQTTnet.Tests/Formatter/MqttBufferReader_Tests.cs +++ b/Source/MQTTnet.Tests/Formatter/MqttBufferReader_Tests.cs @@ -133,8 +133,8 @@ public void Read_Various_Positions_and_Offsets() writer.WriteVariableByteInteger(elementNumberValue); elementSize = writer.Length; elementBytes = new byte[elementSize]; - var buffer = writer.GetBuffer(); - Array.Copy(buffer, elementBytes, elementSize); + writer.GetWrittenSpan().CopyTo(elementBytes); + alreadyBigEndian = true; // nothing to swap } break; @@ -146,8 +146,7 @@ public void Read_Various_Positions_and_Offsets() writer.WriteString(elementStringValue); elementSize = writer.Length; elementBytes = new byte[elementSize]; - var buffer = writer.GetBuffer(); - Array.Copy(buffer, elementBytes, elementSize); + writer.GetWrittenSpan().CopyTo(elementBytes); alreadyBigEndian = true; // nothing to swap } break; diff --git a/Source/MQTTnet.Tests/Formatter/MqttPacketWriter_Tests.cs b/Source/MQTTnet.Tests/Formatter/MqttPacketWriter_Tests.cs index eafee2528..68d673f5c 100644 --- a/Source/MQTTnet.Tests/Formatter/MqttPacketWriter_Tests.cs +++ b/Source/MQTTnet.Tests/Formatter/MqttPacketWriter_Tests.cs @@ -36,10 +36,10 @@ public void Use_All_Data_Types() writer.WriteVariableByteInteger(1234U); writer.WriteVariableByteInteger(9876U); - var buffer = writer.GetBuffer(); + var buffer = writer.GetWrittenMemory(); var reader = new MqttBufferReader(); - reader.SetBuffer(buffer.AsMemory(0, writer.Length)); + reader.SetBuffer(buffer); Assert.AreEqual("AString", reader.ReadString()); Assert.IsTrue(reader.ReadByte() == 1); @@ -48,13 +48,13 @@ public void Use_All_Data_Types() Assert.AreEqual(1234U, reader.ReadVariableByteInteger()); Assert.AreEqual(9876U, reader.ReadVariableByteInteger()); } - + [TestMethod] [ExpectedException(typeof(MqttProtocolViolationException))] public void Throw_If_String_Too_Long() { var writer = new MqttBufferWriter(4096, 65535); - + writer.WriteString(string.Empty.PadLeft(65536)); } @@ -75,12 +75,12 @@ public void Write_And_Read_Multiple_Times() writer.WriteString("fjgffiogfhgfhoihgoireghreghreguhreguireoghreouighreouighreughreguiorehreuiohruiorehreuioghreug"); writer.WriteBinary(new byte[3]); - var readPayload = new ArraySegment(writer.GetBuffer(), 0, writer.Length).ToArray(); + var readPayload = writer.GetWrittenMemory(); for (var i = 0; i < 100000; i++) { var reader = new MqttBufferReader(); - reader.SetBuffer(readPayload.AsMemory(0, readPayload.Length)); + reader.SetBuffer(readPayload); reader.ReadString(); reader.ReadBinaryData(); diff --git a/Source/MQTTnet.Tests/Helpers/MqttPacketWriterExtensions.cs b/Source/MQTTnet.Tests/Helpers/MqttPacketWriterExtensions.cs index d65d96a47..7153d1aef 100644 --- a/Source/MQTTnet.Tests/Helpers/MqttPacketWriterExtensions.cs +++ b/Source/MQTTnet.Tests/Helpers/MqttPacketWriterExtensions.cs @@ -15,7 +15,7 @@ public static byte[] AddMqttHeader(this MqttBufferWriter writer, MqttControlPack writer.WriteByte(MqttBufferWriter.BuildFixedHeader(header)); writer.WriteVariableByteInteger((uint)body.Length); writer.Write(body); - return writer.GetBuffer(); + return writer.GetWrittenMemory().ToArray(); } } } diff --git a/Source/MQTTnet.Tests/Protocol_Tests.cs b/Source/MQTTnet.Tests/Protocol_Tests.cs index 6d5eee809..988c306fd 100644 --- a/Source/MQTTnet.Tests/Protocol_Tests.cs +++ b/Source/MQTTnet.Tests/Protocol_Tests.cs @@ -20,10 +20,10 @@ public void Encode_Four_Byte_Integer() { writer.WriteVariableByteInteger(value); - var buffer = writer.GetBuffer(); + var buffer = writer.GetWrittenMemory(); var reader = new MqttBufferReader(); - reader.SetBuffer(buffer.AsMemory(0, writer.Length)); + reader.SetBuffer(buffer); var checkValue = reader.ReadVariableByteInteger(); Assert.AreEqual(value, checkValue); @@ -41,10 +41,10 @@ public void Encode_Two_Byte_Integer() { writer.WriteTwoByteInteger(value); - var buffer = writer.GetBuffer(); + var buffer = writer.GetWrittenMemory(); var reader = new MqttBufferReader(); - reader.SetBuffer(buffer.AsMemory(0, writer.Length)); + reader.SetBuffer(buffer); var checkValue = reader.ReadTwoByteInteger(); Assert.AreEqual(value, checkValue); diff --git a/Source/MQTTnet/Formatter/MqttBufferWriter.cs b/Source/MQTTnet/Formatter/MqttBufferWriter.cs index 83c14a062..482aa4abc 100644 --- a/Source/MQTTnet/Formatter/MqttBufferWriter.cs +++ b/Source/MQTTnet/Formatter/MqttBufferWriter.cs @@ -27,13 +27,14 @@ public sealed class MqttBufferWriter byte[] _buffer; int _position; + public int Length { get; private set; } + public MqttBufferWriter(int bufferSize, int maxBufferSize) { _buffer = new byte[bufferSize]; _maxBufferSize = maxBufferSize; } - public int Length { get; private set; } public static byte BuildFixedHeader(MqttControlPacketType packetType, byte flags = 0) { @@ -60,9 +61,15 @@ public void Cleanup() _buffer = new byte[_maxBufferSize]; } - public byte[] GetBuffer() + + public ReadOnlySpan GetWrittenSpan() + { + return _buffer.AsSpan(0, Length); + } + + public ReadOnlyMemory GetWrittenMemory() { - return _buffer; + return _buffer.AsMemory(0, Length); } public static int GetVariableByteIntegerSize(uint value) @@ -107,7 +114,7 @@ public void Write(MqttBufferWriter propertyWriter) { ArgumentNullException.ThrowIfNull(propertyWriter); - Write(propertyWriter._buffer.AsSpan(0, propertyWriter.Length)); + Write(propertyWriter.GetWrittenSpan()); } public void Write(ReadOnlySpan buffer) @@ -117,42 +124,43 @@ public void Write(ReadOnlySpan buffer) return; } - EnsureAdditionalCapacity(buffer.Length); + var size = buffer.Length; + var span = GetSpan(size); - buffer.CopyTo(_buffer.AsSpan(_position)); - - IncreasePosition(buffer.Length); + buffer.CopyTo(span); + Advance(size); } public void WriteBinary(ReadOnlySpan value) { - var valueLength = value.Length; - EnsureAdditionalCapacity(valueLength + 2); + var size = value.Length + 2; + var span = GetSpan(size); - BinaryPrimitives.WriteUInt16BigEndian(_buffer.AsSpan(_position), (ushort)valueLength); - value.CopyTo(_buffer.AsSpan(_position + 2)); + BinaryPrimitives.WriteUInt16BigEndian(span, (ushort)value.Length); + value.CopyTo(span[2..]); - IncreasePosition(valueLength + 2); + Advance(size); } public void WriteByte(byte @byte) { - EnsureAdditionalCapacity(1); + const int size = sizeof(byte); + var span = GetSpan(size); - _buffer[_position] = @byte; - IncreasePosition(1); + span[0] = @byte; + Advance(size); } public void WriteString(string value) { if (string.IsNullOrEmpty(value)) { - EnsureAdditionalCapacity(2); - - _buffer.AsSpan(_position, 2).Fill(default); + const int size = 2; + var span = GetSpan(size); - IncreasePosition(2); + span.Fill(default); + Advance(size); } else { @@ -161,10 +169,9 @@ public void WriteString(string value) // So the buffer should always have much more capacity left so that a correct value // here is only waste of CPU cycles. var byteCount = value.Length * 4; + var span = GetSpan(byteCount + 2); - EnsureAdditionalCapacity(byteCount + 2); - - var writtenBytes = Encoding.UTF8.GetBytes(value, 0, value.Length, _buffer, _position + 2); + var writtenBytes = Encoding.UTF8.GetBytes(value, span[2..]); // From RFC: 1.5.4 UTF-8 Encoded String // Unless stated otherwise all UTF-8 encoded strings can have any length in the range 0 to 65,535 bytes. @@ -173,27 +180,30 @@ public void WriteString(string value) throw new MqttProtocolViolationException($"The maximum string length is 65535. The current string has a length of {writtenBytes}."); } - BinaryPrimitives.WriteUInt16BigEndian(_buffer.AsSpan(_position), (ushort)writtenBytes); + BinaryPrimitives.WriteUInt16BigEndian(span, (ushort)writtenBytes); - IncreasePosition(writtenBytes + 2); + Advance(writtenBytes + 2); } } public void WriteTwoByteInteger(ushort value) { - EnsureAdditionalCapacity(2); + const int size = sizeof(ushort); + var span = GetSpan(size); - BinaryPrimitives.WriteUInt16BigEndian(_buffer.AsSpan(_position), value); + BinaryPrimitives.WriteUInt16BigEndian(span, value); - IncreasePosition(2); + Advance(size); } public void WriteVariableByteInteger(uint value) { + EnsureCapacity(sizeof(uint)); + if (value <= 127) { _buffer[_position] = (byte)value; - IncreasePosition(1); + Advance(1); return; } @@ -218,21 +228,19 @@ public void WriteVariableByteInteger(uint value) size++; } while (x > 0); - IncreasePosition(size); + Advance(size); } [MethodImpl(MethodImplOptions.AggressiveInlining)] - void EnsureAdditionalCapacity(int additionalCapacity) + Span GetSpan(int size) { - var bufferLength = _buffer.Length; - - var freeSpace = bufferLength - _position; - if (freeSpace >= additionalCapacity) + var freeSpace = _buffer.Length - _position; + if (freeSpace < size) { - return; + EnsureCapacity(_buffer.Length + size - freeSpace); } - EnsureCapacity(bufferLength + additionalCapacity - freeSpace); + return _buffer.AsSpan(_position, size); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -255,9 +263,9 @@ void EnsureCapacity(int capacity) } [MethodImpl(MethodImplOptions.AggressiveInlining)] - void IncreasePosition(int length) + void Advance(int count) { - _position += length; + _position += count; if (_position > Length) { diff --git a/Source/MQTTnet/Formatter/V3/MqttV3PacketFormatter.cs b/Source/MQTTnet/Formatter/V3/MqttV3PacketFormatter.cs index 43ace2738..c6e0ee81f 100644 --- a/Source/MQTTnet/Formatter/V3/MqttV3PacketFormatter.cs +++ b/Source/MQTTnet/Formatter/V3/MqttV3PacketFormatter.cs @@ -122,7 +122,7 @@ public MqttPacketBuffer Encode(MqttPacket packet) _bufferWriter.WriteByte(fixedHeader); _bufferWriter.WriteVariableByteInteger(remainingLength); - var firstSegment = _bufferWriter.GetBuffer().AsMemory(headerOffset, _bufferWriter.Length - headerOffset); + var firstSegment = _bufferWriter.GetWrittenMemory()[headerOffset..]; return payload.Length == 0 ? new MqttPacketBuffer(firstSegment) diff --git a/Source/MQTTnet/Formatter/V5/MqttV5PacketEncoder.cs b/Source/MQTTnet/Formatter/V5/MqttV5PacketEncoder.cs index 41b9a9ac4..2c30f560f 100644 --- a/Source/MQTTnet/Formatter/V5/MqttV5PacketEncoder.cs +++ b/Source/MQTTnet/Formatter/V5/MqttV5PacketEncoder.cs @@ -56,8 +56,7 @@ public MqttPacketBuffer Encode(MqttPacket packet) _bufferWriter.WriteByte(fixedHeader); _bufferWriter.WriteVariableByteInteger(remainingLength); - var buffer = _bufferWriter.GetBuffer(); - var firstSegment = buffer.AsMemory(headerOffset, _bufferWriter.Length - headerOffset); + var firstSegment = _bufferWriter.GetWrittenMemory()[headerOffset..]; return publishPacket == null ? new MqttPacketBuffer(firstSegment)