Skip to content

Commit

Permalink
MqttBufferWriter: Use Span<byte> GetSpan(int) to get the buffer to wr…
Browse files Browse the repository at this point in the history
…ite.
  • Loading branch information
xljiulang committed Nov 21, 2024
1 parent 42076f4 commit 8f76c02
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 70 deletions.
21 changes: 11 additions & 10 deletions Source/MQTTnet.Benchmarks/MqttBufferReaderBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,43 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Text;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Jobs;
using MQTTnet.Formatter;
using System;
using System.Runtime.InteropServices;
using System.Text;

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[MemoryDiagnoser]
public class MqttBufferReaderBenchmark
{
byte[] _buffer;
int _bufferLength;
ArraySegment<byte> _buffer;

[GlobalSetup]
public void GlobalSetup()
{
var writer = new MqttBufferWriter(1024, 1024);
writer.WriteString("hgfjkdfkjlghfdjghdfljkdfhgdlkjfshgsldkfjghsdflkjghdsflkjhrstiuoghlkfjbhnfbutghjoiöjhklötnbhtroliöuhbjntluiobkjzbhtdrlskbhtruhjkfthgbkftgjhgfiklhotriuöhbjtrsioöbtrsötrhträhtrühjtriüoätrhjtsrölbktrbnhtrulöbionhströloubinströoliubhnsöotrunbtöroisntröointröioujhgötiohjgötorshjnbgtöorihbnjtröoihbjntröobntröoibntrjhötrohjbtröoihntröoibnrtoiöbtrjnboöitrhjtnriohötrhjtöroihjtroöihjtroösibntsroönbotöirsbntöoihjntröoihntroöbtrboöitrnhoöitrhjntröoishbnjtröosbhtröbntriohjtröoijtöoitbjöotibjnhöotirhbjntroöibhnjrtoöibnhtroöibnhtörsbnhtöoirbnhtöroibntoörhjnbträöbtrbträbtrbtirbätrsibohjntrsöiobthnjiohjsrtoib");

_buffer = writer.GetBuffer();
_bufferLength = writer.Length;
if (MemoryMarshal.TryGetArray(writer.GetWrittenMemory(), out var segment))
{
_buffer = segment;
}
}

[Benchmark]
public void Use_Span()
{
var span = _buffer.AsSpan(0, _bufferLength);
Encoding.UTF8.GetString(span);
Encoding.UTF8.GetString(_buffer.AsSpan());
}

[Benchmark]
public void Use_Encoding()
{
Encoding.UTF8.GetString(_buffer, 0, _bufferLength);
Encoding.UTF8.GetString(_buffer.Array, _buffer.Offset, _buffer.Count);
}
}
}
6 changes: 3 additions & 3 deletions Source/MQTTnet.Benchmarks/MqttPacketReaderWriterBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class MqttPacketReaderWriterBenchmark : BaseBenchmark
{
readonly byte[] _demoPayload = new byte[1024];

byte[] _readPayload;
ReadOnlyMemory<byte> _readPayload;

[GlobalCleanup]
public void GlobalCleanup()
Expand All @@ -42,7 +42,7 @@ public void GlobalSetup()
writer.WriteString("fjgffiogfhgfhoihgoireghreghreguhreguireoghreouighreouighreughreguiorehreuiohruiorehreuioghreug");
writer.WriteBinary(_demoPayload);

_readPayload = new ArraySegment<byte>(writer.GetBuffer(), 0, writer.Length).ToArray();
_readPayload = writer.GetWrittenMemory();
}

[Benchmark]
Expand All @@ -51,7 +51,7 @@ public void Read_100_000_Messages()
for (var i = 0; i < 100000; i++)
{
var reader = new MqttBufferReader();
reader.SetBuffer(_readPayload.AsMemory(0, _readPayload.Length));
reader.SetBuffer(_readPayload);

reader.ReadString();
reader.ReadBinaryData();
Expand Down
7 changes: 3 additions & 4 deletions Source/MQTTnet.Tests/Formatter/MqttBufferReader_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ public void Read_Various_Positions_and_Offsets()
writer.WriteVariableByteInteger(elementNumberValue);
elementSize = writer.Length;
elementBytes = new byte[elementSize];
var buffer = writer.GetBuffer();
Array.Copy(buffer, elementBytes, elementSize);
writer.GetWrittenSpan().CopyTo(elementBytes);

alreadyBigEndian = true; // nothing to swap
}
break;
Expand All @@ -146,8 +146,7 @@ public void Read_Various_Positions_and_Offsets()
writer.WriteString(elementStringValue);
elementSize = writer.Length;
elementBytes = new byte[elementSize];
var buffer = writer.GetBuffer();
Array.Copy(buffer, elementBytes, elementSize);
writer.GetWrittenSpan().CopyTo(elementBytes);
alreadyBigEndian = true; // nothing to swap
}
break;
Expand Down
12 changes: 6 additions & 6 deletions Source/MQTTnet.Tests/Formatter/MqttPacketWriter_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ public void Use_All_Data_Types()
writer.WriteVariableByteInteger(1234U);
writer.WriteVariableByteInteger(9876U);

var buffer = writer.GetBuffer();
var buffer = writer.GetWrittenMemory();

var reader = new MqttBufferReader();
reader.SetBuffer(buffer.AsMemory(0, writer.Length));
reader.SetBuffer(buffer);

Assert.AreEqual("AString", reader.ReadString());
Assert.IsTrue(reader.ReadByte() == 1);
Expand All @@ -48,13 +48,13 @@ public void Use_All_Data_Types()
Assert.AreEqual(1234U, reader.ReadVariableByteInteger());
Assert.AreEqual(9876U, reader.ReadVariableByteInteger());
}

[TestMethod]
[ExpectedException(typeof(MqttProtocolViolationException))]
public void Throw_If_String_Too_Long()
{
var writer = new MqttBufferWriter(4096, 65535);

writer.WriteString(string.Empty.PadLeft(65536));
}

Expand All @@ -75,12 +75,12 @@ public void Write_And_Read_Multiple_Times()
writer.WriteString("fjgffiogfhgfhoihgoireghreghreguhreguireoghreouighreouighreughreguiorehreuiohruiorehreuioghreug");
writer.WriteBinary(new byte[3]);

var readPayload = new ArraySegment<byte>(writer.GetBuffer(), 0, writer.Length).ToArray();
var readPayload = writer.GetWrittenMemory();

for (var i = 0; i < 100000; i++)
{
var reader = new MqttBufferReader();
reader.SetBuffer(readPayload.AsMemory(0, readPayload.Length));
reader.SetBuffer(readPayload);

reader.ReadString();
reader.ReadBinaryData();
Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet.Tests/Helpers/MqttPacketWriterExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public static byte[] AddMqttHeader(this MqttBufferWriter writer, MqttControlPack
writer.WriteByte(MqttBufferWriter.BuildFixedHeader(header));
writer.WriteVariableByteInteger((uint)body.Length);
writer.Write(body);
return writer.GetBuffer();
return writer.GetWrittenMemory().ToArray();
}
}
}
8 changes: 4 additions & 4 deletions Source/MQTTnet.Tests/Protocol_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ public void Encode_Four_Byte_Integer()
{
writer.WriteVariableByteInteger(value);

var buffer = writer.GetBuffer();
var buffer = writer.GetWrittenMemory();

var reader = new MqttBufferReader();
reader.SetBuffer(buffer.AsMemory(0, writer.Length));
reader.SetBuffer(buffer);
var checkValue = reader.ReadVariableByteInteger();

Assert.AreEqual(value, checkValue);
Expand All @@ -41,10 +41,10 @@ public void Encode_Two_Byte_Integer()
{
writer.WriteTwoByteInteger(value);

var buffer = writer.GetBuffer();
var buffer = writer.GetWrittenMemory();

var reader = new MqttBufferReader();
reader.SetBuffer(buffer.AsMemory(0, writer.Length));
reader.SetBuffer(buffer);
var checkValue = reader.ReadTwoByteInteger();

Assert.AreEqual(value, checkValue);
Expand Down
86 changes: 47 additions & 39 deletions Source/MQTTnet/Formatter/MqttBufferWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ public sealed class MqttBufferWriter
byte[] _buffer;
int _position;

public int Length { get; private set; }

public MqttBufferWriter(int bufferSize, int maxBufferSize)
{
_buffer = new byte[bufferSize];
_maxBufferSize = maxBufferSize;
}

public int Length { get; private set; }

public static byte BuildFixedHeader(MqttControlPacketType packetType, byte flags = 0)
{
Expand All @@ -60,9 +61,15 @@ public void Cleanup()
_buffer = new byte[_maxBufferSize];
}

public byte[] GetBuffer()

public ReadOnlySpan<byte> GetWrittenSpan()
{
return _buffer.AsSpan(0, Length);
}

public ReadOnlyMemory<byte> GetWrittenMemory()
{
return _buffer;
return _buffer.AsMemory(0, Length);
}

public static int GetVariableByteIntegerSize(uint value)
Expand Down Expand Up @@ -107,7 +114,7 @@ public void Write(MqttBufferWriter propertyWriter)
{
ArgumentNullException.ThrowIfNull(propertyWriter);

Write(propertyWriter._buffer.AsSpan(0, propertyWriter.Length));
Write(propertyWriter.GetWrittenSpan());
}

public void Write(ReadOnlySpan<byte> buffer)
Expand All @@ -117,42 +124,43 @@ public void Write(ReadOnlySpan<byte> buffer)
return;
}

EnsureAdditionalCapacity(buffer.Length);
var size = buffer.Length;
var span = GetSpan(size);

buffer.CopyTo(_buffer.AsSpan(_position));

IncreasePosition(buffer.Length);
buffer.CopyTo(span);
Advance(size);
}

public void WriteBinary(ReadOnlySpan<byte> value)
{
var valueLength = value.Length;
EnsureAdditionalCapacity(valueLength + 2);
var size = value.Length + 2;
var span = GetSpan(size);

BinaryPrimitives.WriteUInt16BigEndian(_buffer.AsSpan(_position), (ushort)valueLength);
value.CopyTo(_buffer.AsSpan(_position + 2));
BinaryPrimitives.WriteUInt16BigEndian(span, (ushort)value.Length);
value.CopyTo(span[2..]);

IncreasePosition(valueLength + 2);
Advance(size);
}


public void WriteByte(byte @byte)
{
EnsureAdditionalCapacity(1);
const int size = sizeof(byte);
var span = GetSpan(size);

_buffer[_position] = @byte;
IncreasePosition(1);
span[0] = @byte;
Advance(size);
}

public void WriteString(string value)
{
if (string.IsNullOrEmpty(value))
{
EnsureAdditionalCapacity(2);

_buffer.AsSpan(_position, 2).Fill(default);
const int size = 2;
var span = GetSpan(size);

IncreasePosition(2);
span.Fill(default);
Advance(size);
}
else
{
Expand All @@ -161,10 +169,9 @@ public void WriteString(string value)
// So the buffer should always have much more capacity left so that a correct value
// here is only waste of CPU cycles.
var byteCount = value.Length * 4;
var span = GetSpan(byteCount + 2);

EnsureAdditionalCapacity(byteCount + 2);

var writtenBytes = Encoding.UTF8.GetBytes(value, 0, value.Length, _buffer, _position + 2);
var writtenBytes = Encoding.UTF8.GetBytes(value, span[2..]);

// From RFC: 1.5.4 UTF-8 Encoded String
// Unless stated otherwise all UTF-8 encoded strings can have any length in the range 0 to 65,535 bytes.
Expand All @@ -173,27 +180,30 @@ public void WriteString(string value)
throw new MqttProtocolViolationException($"The maximum string length is 65535. The current string has a length of {writtenBytes}.");
}

BinaryPrimitives.WriteUInt16BigEndian(_buffer.AsSpan(_position), (ushort)writtenBytes);
BinaryPrimitives.WriteUInt16BigEndian(span, (ushort)writtenBytes);

IncreasePosition(writtenBytes + 2);
Advance(writtenBytes + 2);
}
}

public void WriteTwoByteInteger(ushort value)
{
EnsureAdditionalCapacity(2);
const int size = sizeof(ushort);
var span = GetSpan(size);

BinaryPrimitives.WriteUInt16BigEndian(_buffer.AsSpan(_position), value);
BinaryPrimitives.WriteUInt16BigEndian(span, value);

IncreasePosition(2);
Advance(size);
}

public void WriteVariableByteInteger(uint value)
{
EnsureCapacity(sizeof(uint));

if (value <= 127)
{
_buffer[_position] = (byte)value;
IncreasePosition(1);
Advance(1);

return;
}
Expand All @@ -218,21 +228,19 @@ public void WriteVariableByteInteger(uint value)
size++;
} while (x > 0);

IncreasePosition(size);
Advance(size);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
void EnsureAdditionalCapacity(int additionalCapacity)
Span<byte> GetSpan(int size)
{
var bufferLength = _buffer.Length;

var freeSpace = bufferLength - _position;
if (freeSpace >= additionalCapacity)
var freeSpace = _buffer.Length - _position;
if (freeSpace < size)
{
return;
EnsureCapacity(_buffer.Length + size - freeSpace);
}

EnsureCapacity(bufferLength + additionalCapacity - freeSpace);
return _buffer.AsSpan(_position, size);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand All @@ -255,9 +263,9 @@ void EnsureCapacity(int capacity)
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
void IncreasePosition(int length)
void Advance(int count)
{
_position += length;
_position += count;

if (_position > Length)
{
Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet/Formatter/V3/MqttV3PacketFormatter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public MqttPacketBuffer Encode(MqttPacket packet)
_bufferWriter.WriteByte(fixedHeader);
_bufferWriter.WriteVariableByteInteger(remainingLength);

var firstSegment = _bufferWriter.GetBuffer().AsMemory(headerOffset, _bufferWriter.Length - headerOffset);
var firstSegment = _bufferWriter.GetWrittenMemory()[headerOffset..];

return payload.Length == 0
? new MqttPacketBuffer(firstSegment)
Expand Down
3 changes: 1 addition & 2 deletions Source/MQTTnet/Formatter/V5/MqttV5PacketEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ public MqttPacketBuffer Encode(MqttPacket packet)
_bufferWriter.WriteByte(fixedHeader);
_bufferWriter.WriteVariableByteInteger(remainingLength);

var buffer = _bufferWriter.GetBuffer();
var firstSegment = buffer.AsMemory(headerOffset, _bufferWriter.Length - headerOffset);
var firstSegment = _bufferWriter.GetWrittenMemory()[headerOffset..];

return publishPacket == null
? new MqttPacketBuffer(firstSegment)
Expand Down

0 comments on commit 8f76c02

Please sign in to comment.