Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce memory allocationt. #2109

Open
wants to merge 54 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
a793a3a
Reduce memory allocation of DecodePublishPacket.
xljiulang Nov 18, 2024
6c7e0e5
Reduce memory allocation of MqttBufferReader.
xljiulang Nov 19, 2024
e7f4248
Change the type of the Packet field from ArraySegment<byte> to ReadOn…
xljiulang Nov 19, 2024
e578f0c
MqttBufferWriter.WriteBinary(byte[]) parameter type changed to ReadOn…
xljiulang Nov 19, 2024
9fa3337
Change the type of PayloadSegment to ReadOnlyMemory<byte>
xljiulang Nov 19, 2024
7edac8f
Change the type of PayloadSegment to ReadOnlyMemory<byte>
xljiulang Nov 19, 2024
0a31a61
Optimizing MqttBufferReader
xljiulang Nov 19, 2024
8b89a54
More byte[] parameters or properties are changed to ReadonlyMemory<byte>
xljiulang Nov 19, 2024
b26b4eb
MqttPacketInspector.FillReceiveBuffer: ReadOnlyMemory->ReadOnlySpan
xljiulang Nov 19, 2024
b1a1865
Add high performance payload to json deserialization extension
xljiulang Nov 19, 2024
53242ee
MqttBufferWriter: Rename WriteBinary(byte[],int,int) to Write(Readonl…
xljiulang Nov 20, 2024
42076f4
Using BinaryPrimitives to write length values.
xljiulang Nov 20, 2024
8f76c02
MqttBufferWriter: Use Span<byte> GetSpan(int) to get the buffer to wr…
xljiulang Nov 21, 2024
07772a1
MqttConnectPacket: Restore Password to byte[]? type.
xljiulang Nov 21, 2024
e2ee3f8
MqttConnectPacket: Restore Password to byte[]? type.
xljiulang Nov 21, 2024
f1593ff
MqttClientExtensions: Using ArrayPool makes PublishStringAsync reach …
xljiulang Nov 21, 2024
8f69184
Add 0 allocated MqttPayloadOwnerFactory to build mqtt Payload.
xljiulang Nov 21, 2024
b992a4f
MqttPayloadOwnerFactory: Improve implementation and add unit tests.
xljiulang Nov 21, 2024
7a61bc2
MqttServerExtensions: Add high-performance Inject related extension m…
xljiulang Nov 22, 2024
94f0f9e
Add support for payloadFactory extension methods
xljiulang Nov 22, 2024
41331c5
MqttPayloadOwnerFactory: CancellationToken support.
xljiulang Nov 22, 2024
f2367aa
Add PublishStreamAsync() and InjectStreamAsync()
xljiulang Nov 22, 2024
eb334f0
Improve ConvertPayloadToJson
xljiulang Nov 22, 2024
180b31b
Add CreateJsonReaderOptions
xljiulang Nov 22, 2024
58390e8
Reduce the use of unnecessary MqttApplicationMessageBuilder
xljiulang Nov 28, 2024
6664f15
Update Client_Publish_Samples.
xljiulang Nov 28, 2024
d1792a5
SingleSegmentPayloadOwner: use payloadFactory
xljiulang Nov 29, 2024
0ed0b48
Rpc.ExecuteAsync: Reduce memory allocation.
xljiulang Nov 29, 2024
c7ab618
Add ExecuteTimeOutAsync() extendsion methods.
xljiulang Nov 29, 2024
5a98f70
Merge branch 'dotnet:master' into buffer-reader
xljiulang Dec 1, 2024
a3ac26b
MqttRetainedMessageModel: copy the payload.
xljiulang Dec 1, 2024
0a25ad6
Merge branch 'main' into buffer-reader
xljiulang Dec 2, 2024
5b2a995
MqttBufferWriter: Add GetWrittenArraySegment method.
xljiulang Dec 3, 2024
6394733
Add the BufferSize property, which is used for the pool strategy.
xljiulang Dec 3, 2024
ee6b950
Do not use the PayloadSegment field in anticipation of its removal in…
xljiulang Dec 3, 2024
7ef0a2c
Merge branch 'buffer-reader' of https://github.com/xljiulang/MQTTnet …
xljiulang Dec 3, 2024
4025d18
ValidatingConnectionEventArgs: Restore RawPasswordto byte[]? type.
xljiulang Dec 3, 2024
8aea746
Some PublishAsync and InjectAsync methods remove await and return dir…
xljiulang Dec 3, 2024
c661764
Merge branch 'main' into buffer-reader
xljiulang Dec 4, 2024
a8b6fe9
ConfigureAwait(false)!
xljiulang Dec 4, 2024
22aae2d
MqttChannelAdapter: Use ArrayPool to reduce memory allocation.
xljiulang Dec 5, 2024
20d7866
Fixed the buffer size issue of PacketInspector.
xljiulang Dec 5, 2024
c7a16a9
Add more ExecuteTimeoutAsync overload methods;
xljiulang Dec 5, 2024
5ae8522
Update doc and unit test.
xljiulang Dec 5, 2024
d2c36ee
Enhance unit testing of MqttPayloadOwnerFactory.
xljiulang Dec 5, 2024
8df3a96
Update and fix MessageProcessingBenchmark.
xljiulang Dec 5, 2024
f2a81ea
MqttPacketInspector: add FillReceiveBuffer(ReadOnlySequence<byte>) me…
xljiulang Dec 6, 2024
759a816
Use Pipe to reduce the memory allocation of MqttPacketInspector.
xljiulang Dec 7, 2024
52daba1
InspectPacketAsync() is performed only in the Fill state.
xljiulang Dec 7, 2024
83138b9
Fixed the issue of writing 0-byte Binary.
xljiulang Dec 9, 2024
f82344a
MqttBufferWriter: Add WriteFourByteInteger(uint) method.
xljiulang Dec 9, 2024
cc09847
MqttBufferWriter: Added IBufferWriter<byte> AsLowLevelBufferWriter() …
xljiulang Dec 9, 2024
d0e1ce4
Fixed duplicate write of (MqttPropertyId id, ushort value).
xljiulang Dec 9, 2024
057365d
Merge branch 'dotnet:master' into buffer-reader
xljiulang Dec 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 19 additions & 27 deletions Samples/Client/Client_Publish_Samples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,10 @@ public static async Task Publish_Application_Message()

await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);

var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic("samples/temperature/living_room")
.WithPayload("19.5")
.Build();

await mqttClient.PublishAsync(applicationMessage, CancellationToken.None);
await mqttClient.PublishStringAsync(
topic: "samples/temperature/living_room",
payload: "19.5",
cancellationToken: CancellationToken.None);

await mqttClient.DisconnectAsync();

Expand All @@ -61,27 +59,21 @@ public static async Task Publish_Multiple_Application_Messages()
.Build();

await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);

var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic("samples/temperature/living_room")
.WithPayload("19.5")
.Build();

await mqttClient.PublishAsync(applicationMessage, CancellationToken.None);

applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic("samples/temperature/living_room")
.WithPayload("20.0")
.Build();

await mqttClient.PublishAsync(applicationMessage, CancellationToken.None);

applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic("samples/temperature/living_room")
.WithPayload("21.0")
.Build();

await mqttClient.PublishAsync(applicationMessage, CancellationToken.None);

await mqttClient.PublishStringAsync(
topic: "samples/temperature/living_room",
payload: "19.5",
cancellationToken: CancellationToken.None);

await mqttClient.PublishStringAsync(
topic: "samples/temperature/living_room",
payload: "20.0",
cancellationToken: CancellationToken.None);

await mqttClient.PublishStringAsync(
topic: "samples/temperature/living_room",
payload: "21.0",
cancellationToken: CancellationToken.None);

await mqttClient.DisconnectAsync();

Expand Down
2 changes: 1 addition & 1 deletion Samples/MQTTnet.Samples.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<NuGetAuditMode>all</NuGetAuditMode>
<NuGetAudit>true</NuGetAudit>
<NuGetAuditLevel>low</NuGetAuditLevel>
<AnalysisLevel>latest-Recommended</AnalysisLevel>
<!--<AnalysisLevel>latest-Recommended</AnalysisLevel>-->
</PropertyGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion Samples/RpcClient/RpcClient_Samples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static async Task Send_Request()
{
// Access to a fully featured application message is not supported for RPC calls!
// The method will throw an exception when the response was not received in time.
await mqttRpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce);
await mqttRpcClient.ExecuteTimeOutAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce);
}

Console.WriteLine("The RPC call was successful.");
Expand Down
8 changes: 4 additions & 4 deletions Samples/Server/Server_Retained_Messages_Samples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ public static async Task Persist_Retained_Messages()
sealed class MqttRetainedMessageModel
{
public string? ContentType { get; set; }
public byte[]? CorrelationData { get; set; }
public byte[]? Payload { get; set; }
public ReadOnlyMemory<byte> CorrelationData { get; set; }
public ReadOnlySequence<byte> Payload { get; set; }
public MqttPayloadFormatIndicator PayloadFormatIndicator { get; set; }
public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; }
public string? ResponseTopic { get; set; }
Expand All @@ -110,7 +110,7 @@ public static MqttRetainedMessageModel Create(MqttApplicationMessage message)

// Create a copy of the buffer from the payload segment because
// it cannot be serialized and deserialized with the JSON serializer.
Payload = message.Payload.ToArray(),
Payload = new ReadOnlySequence<byte>(message.Payload.ToArray()),
UserProperties = message.UserProperties,
ResponseTopic = message.ResponseTopic,
CorrelationData = message.CorrelationData,
Expand All @@ -128,7 +128,7 @@ public MqttApplicationMessage ToApplicationMessage()
return new MqttApplicationMessage
{
Topic = Topic,
PayloadSegment = new ArraySegment<byte>(Payload ?? Array.Empty<byte>()),
Payload = Payload,
PayloadFormatIndicator = PayloadFormatIndicator,
ResponseTopic = ResponseTopic,
CorrelationData = CorrelationData,
Expand Down
17 changes: 6 additions & 11 deletions Samples/Server/Server_Simple_Samples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,13 @@ public static async Task Publish_Message_From_Broker()
* See _Run_Minimal_Server_ for more information.
*/

using (var mqttServer = await StartMqttServer())
{
// Create a new message using the builder as usual.
var message = new MqttApplicationMessageBuilder().WithTopic("HelloWorld").WithPayload("Test").Build();
using var mqttServer = await StartMqttServer();

// Now inject the new message at the broker.
await mqttServer.InjectApplicationMessage(
new InjectedMqttApplicationMessage(message)
{
SenderClientId = "SenderClientId"
});
}
// Now inject the new message at the broker.
await mqttServer.InjectStringAsync(
clientId: "SenderClientId",
topic: "HelloWorld",
payload: "Test");
}

public static async Task Run_Minimal_Server()
Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
<NuGetAudit>true</NuGetAudit>
<NuGetAuditLevel>low</NuGetAuditLevel>
<NuGetAuditLevel>low</NuGetAuditLevel>
<AnalysisLevel>latest-Recommended</AnalysisLevel>
<!--<AnalysisLevel>latest-Recommended</AnalysisLevel>-->
</PropertyGroup>

<ItemGroup>
Expand Down
4 changes: 2 additions & 2 deletions Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,8 @@ static void WritePacketBuffer(PipeWriter output, MqttPacketBuffer buffer)

var span = output.GetSpan(buffer.Length);

buffer.Packet.AsSpan().CopyTo(span);
int offset = buffer.Packet.Count;
buffer.Packet.Span.CopyTo(span);
int offset = buffer.Packet.Length;
buffer.Payload.CopyTo(destination: span.Slice(offset));
output.Advance(buffer.Length);
}
Expand Down
18 changes: 2 additions & 16 deletions Source/MQTTnet.AspnetCore/ReaderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ public static bool TryDecode(
}

var bodySlice = copy.Slice(0, bodyLength);
var bodySegment = GetArraySegment(ref bodySlice);

var receivedMqttPacket = new ReceivedMqttPacket(fixedHeader, bodySegment, headerLength + bodyLength);
var receivedMqttPacket = new ReceivedMqttPacket(fixedHeader, bodySlice, headerLength + bodyLength);
if (formatter.ProtocolVersion == MqttProtocolVersion.Unknown)
{
formatter.DetectProtocolVersion(receivedMqttPacket);
Expand All @@ -62,19 +60,7 @@ public static bool TryDecode(
bytesRead = headerLength + bodyLength;
return true;
}

static ArraySegment<byte> GetArraySegment(ref ReadOnlySequence<byte> input)
{
if (input.IsSingleSegment && MemoryMarshal.TryGetArray(input.First, out var segment))
{
return segment;
}

// Should be rare
var array = input.ToArray();
return new ArraySegment<byte>(array);
}



static void ThrowProtocolViolationException(ReadOnlySpan<byte> valueSpan, int index)
{
Expand Down
4 changes: 2 additions & 2 deletions Source/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ public void Setup()
_channelAdapter = new MqttChannelAdapter(channel, serializer, new MqttNetEventLogger());
}

static byte[] Join(params ArraySegment<byte>[] chunks)
static byte[] Join(params ReadOnlyMemory<byte>[] chunks)
{
var buffer = new MemoryStream();
foreach (var chunk in chunks)
{
buffer.Write(chunk.Array, chunk.Offset, chunk.Count);
buffer.Write(chunk.Span);
}

return buffer.ToArray();
Expand Down
82 changes: 82 additions & 0 deletions Source/MQTTnet.Benchmarks/JsonPayloadBenchmark.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Jobs;
using MQTTnet.Internal;
using System.Text.Json;
using System.Threading.Tasks;

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net80)]
[MemoryDiagnoser]
public class JsonPayloadBenchmark : BaseBenchmark
{
[Params(1 * 1024, 4 * 1024, 8 * 1024)]
public int PayloadSize { get; set; }
private Model model;


[GlobalSetup]
public void Setup()
{
var stringValue = new char[PayloadSize];
model = new Model { StringValue = new string(stringValue) };
}

[Benchmark]
public ValueTask SerializeToString_Payload_SendAsync()
{
string payload = JsonSerializer.Serialize(model);
var message = new MqttApplicationMessageBuilder()
.WithTopic("t")
.WithPayload(payload)
.Build();

// send message async
return ValueTask.CompletedTask;
}

[Benchmark]
public ValueTask SerializeToUtf8Bytes_Payload_SendAsync()
{
byte[] payload = JsonSerializer.SerializeToUtf8Bytes(model);
var message = new MqttApplicationMessageBuilder()
.WithTopic("t")
.WithPayload(payload)
.Build();

// send message async
return ValueTask.CompletedTask;
}

[Benchmark(Baseline = true)]
public async ValueTask MqttPayloadOwnerFactory_Payload_SendAsync()
{
await using var payloadOwner = await MqttPayloadOwnerFactory.CreateMultipleSegmentAsync(async writer =>
await JsonSerializer.SerializeAsync(writer.AsStream(leaveOpen: true), model));

var message = new MqttApplicationMessageBuilder()
.WithTopic("t")
.WithPayload(payloadOwner.Payload)
.Build();

// send message async
}



public class Model
{
public string StringValue { get; set; }

public int IntValue { get; set; }

public bool BoolValue { get; set; }

public double DoubleValue { get; set; }
}
}
}
2 changes: 1 addition & 1 deletion Source/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<NuGetAuditMode>all</NuGetAuditMode>
<NuGetAudit>true</NuGetAudit>
<NuGetAuditLevel>low</NuGetAuditLevel>
<AnalysisLevel>latest-Recommended</AnalysisLevel>
<!--<AnalysisLevel>latest-Recommended</AnalysisLevel>-->
Copy link
Contributor

@mregen mregen Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try this on .NET 9, if you commented it out because of the 100s of warning messages

<PropertyGroup>
  <AnalysisLevel>latest</AnalysisLevel>
  <AnalysisMode>recommended</AnalysisMode>
  <AnalysisModeStyle>default</AnalysisModeStyle>
</PropertyGroup>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's 100s of error messages! We can unify this issue in #2120.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

</PropertyGroup>

<ItemGroup>
Expand Down
21 changes: 11 additions & 10 deletions Source/MQTTnet.Benchmarks/MqttBufferReaderBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,43 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Text;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Jobs;
using MQTTnet.Formatter;
using System;
using System.Runtime.InteropServices;
using System.Text;

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[MemoryDiagnoser]
public class MqttBufferReaderBenchmark
{
byte[] _buffer;
int _bufferLength;
ArraySegment<byte> _buffer;

[GlobalSetup]
public void GlobalSetup()
{
var writer = new MqttBufferWriter(1024, 1024);
writer.WriteString("hgfjkdfkjlghfdjghdfljkdfhgdlkjfshgsldkfjghsdflkjghdsflkjhrstiuoghlkfjbhnfbutghjoiöjhklötnbhtroliöuhbjntluiobkjzbhtdrlskbhtruhjkfthgbkftgjhgfiklhotriuöhbjtrsioöbtrsötrhträhtrühjtriüoätrhjtsrölbktrbnhtrulöbionhströloubinströoliubhnsöotrunbtöroisntröointröioujhgötiohjgötorshjnbgtöorihbnjtröoihbjntröobntröoibntrjhötrohjbtröoihntröoibnrtoiöbtrjnboöitrhjtnriohötrhjtöroihjtroöihjtroösibntsroönbotöirsbntöoihjntröoihntroöbtrboöitrnhoöitrhjntröoishbnjtröosbhtröbntriohjtröoijtöoitbjöotibjnhöotirhbjntroöibhnjrtoöibnhtroöibnhtörsbnhtöoirbnhtöroibntoörhjnbträöbtrbträbtrbtirbätrsibohjntrsöiobthnjiohjsrtoib");

_buffer = writer.GetBuffer();
_bufferLength = writer.Length;
if (MemoryMarshal.TryGetArray(writer.GetWrittenMemory(), out var segment))
{
_buffer = segment;
}
}

[Benchmark]
public void Use_Span()
{
var span = _buffer.AsSpan(0, _bufferLength);
Encoding.UTF8.GetString(span);
Encoding.UTF8.GetString(_buffer.AsSpan());
}

[Benchmark]
public void Use_Encoding()
{
Encoding.UTF8.GetString(_buffer, 0, _bufferLength);
Encoding.UTF8.GetString(_buffer.Array, _buffer.Offset, _buffer.Count);
}
}
}
16 changes: 7 additions & 9 deletions Source/MQTTnet.Benchmarks/MqttPacketReaderWriterBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ namespace MQTTnet.Benchmarks
public class MqttPacketReaderWriterBenchmark : BaseBenchmark
{
readonly byte[] _demoPayload = new byte[1024];
byte[] _readPayload;

ReadOnlyMemory<byte> _readPayload;

[GlobalCleanup]
public void GlobalCleanup()
Expand All @@ -27,7 +27,7 @@ public void GlobalCleanup()
public void GlobalSetup()
{
TestEnvironment.EnableLogger = false;

var writer = new MqttBufferWriter(4096, 65535);
writer.WriteString("A relative short string.");
writer.WriteBinary(_demoPayload);
Expand All @@ -42,18 +42,16 @@ public void GlobalSetup()
writer.WriteString("fjgffiogfhgfhoihgoireghreghreguhreguireoghreouighreouighreughreguiorehreuiohruiorehreuioghreug");
writer.WriteBinary(_demoPayload);

_readPayload = new ArraySegment<byte>(writer.GetBuffer(), 0, writer.Length).ToArray();
_readPayload = writer.GetWrittenMemory();
}

[Benchmark]
public void Read_100_000_Messages()
{
var reader = new MqttBufferReader();
reader.SetBuffer(_readPayload, 0, _readPayload.Length);

for (var i = 0; i < 100000; i++)
{
reader.Seek(0);
var reader = new MqttBufferReader();
reader.SetBuffer(_readPayload);

reader.ReadString();
reader.ReadBinaryData();
Expand All @@ -69,7 +67,7 @@ public void Read_100_000_Messages()
reader.ReadBinaryData();
}
}

[Benchmark]
public void Write_100_000_Messages()
{
Expand Down
Loading
Loading