Skip to content

Commit

Permalink
Added AllowPacketFragmentationSelector option.
Browse files Browse the repository at this point in the history
  • Loading branch information
xljiulang committed Dec 7, 2024
1 parent 8566fd5 commit 841334a
Show file tree
Hide file tree
Showing 13 changed files with 60 additions and 1 deletion.
9 changes: 9 additions & 0 deletions Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ public bool IsSecureConnection
}
}

public bool IsWebSocketConnection
{
get
{
var httpFeature = _connection.Features.Get<IHttpContextFeature>();
return httpFeature != null && httpFeature.HttpContext != null && httpFeature.HttpContext.WebSockets.IsWebSocketRequest;
}
}

public MqttPacketFormatterAdapter PacketFormatterAdapter { get; }

public async Task ConnectAsync(CancellationToken cancellationToken)
Expand Down
2 changes: 2 additions & 0 deletions Source/MQTTnet.Benchmarks/SerializerBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public BenchmarkMqttChannel(ArraySegment<byte> buffer)

public X509Certificate2 ClientCertificate { get; }

public bool IsWebSocketConnection => false;

public void Reset()
{
_position = _buffer.Offset;
Expand Down
2 changes: 2 additions & 0 deletions Source/MQTTnet.Server/Events/ValidatingConnectionEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public ValidatingConnectionEventArgs(MqttConnectPacket connectPacket, IMqttChann

public bool IsSecureConnection => ChannelAdapter.IsSecureConnection;

public bool IsWebSocketConnection => ChannelAdapter.IsWebSocketConnection;

/// <summary>
/// Gets or sets the keep alive period.
/// The connection is normally left open by the client so that is can send and receive data at any time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,14 @@ await sslStream.AuthenticateAsServerAsync(

using (var clientAdapter = new MqttChannelAdapter(tcpChannel, packetFormatterAdapter, _rootLogger))
{
clientAdapter.AllowPacketFragmentation = _options.AllowPacketFragmentation;
if (_options.AllowPacketFragmentationSelector == null)
{
clientAdapter.AllowPacketFragmentation = _options.AllowPacketFragmentation;
}
else
{
clientAdapter.AllowPacketFragmentation = _options.AllowPacketFragmentationSelector(clientAdapter);
}
await clientHandler(clientAdapter).ConfigureAwait(false);
}
}
Expand Down
8 changes: 8 additions & 0 deletions Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Security.Authentication;
using MQTTnet.Certificates;
using System.Security.Cryptography.X509Certificates;
using MQTTnet.Adapter;

// ReSharper disable UnusedMember.Global
namespace MQTTnet.Server
Expand Down Expand Up @@ -136,6 +137,13 @@ public MqttServerOptionsBuilder WithoutPacketFragmentation()
{
_options.DefaultEndpointOptions.AllowPacketFragmentation = false;
_options.TlsEndpointOptions.AllowPacketFragmentation = false;
return WithPacketFragmentationSelector(null);
}

public MqttServerOptionsBuilder WithPacketFragmentationSelector(Func<IMqttChannelAdapter, bool> selector)
{
_options.DefaultEndpointOptions.AllowPacketFragmentationSelector = selector;
_options.TlsEndpointOptions.AllowPacketFragmentationSelector = selector;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using MQTTnet.Adapter;
using System.Net;
using System.Net.Sockets;

Expand Down Expand Up @@ -31,6 +32,12 @@ public abstract class MqttServerTcpEndpointBaseOptions
/// </summary>
public bool AllowPacketFragmentation { get; set; } = true;

/// <summary>
/// Select whether to AllowPacketFragmentation for an <see cref="IMqttChannelAdapter"/>.
/// Its priority is higher than the <see cref="AllowPacketFragmentation"/>.
/// </summary>
public Func<IMqttChannelAdapter, bool> AllowPacketFragmentationSelector { get; set; }

/// <summary>
/// Gets or sets the TCP keep alive interval.
/// The value _null_ indicates that the OS and framework defaults should be used.
Expand Down
2 changes: 2 additions & 0 deletions Source/MQTTnet.Tests/Mockups/MemoryMqttChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public MemoryMqttChannel(byte[] buffer)

public X509Certificate2 ClientCertificate { get; }

public bool IsWebSocketConnection => false;

public Task ConnectAsync(CancellationToken cancellationToken)
{
return CompletedTask.Instance;
Expand Down
12 changes: 12 additions & 0 deletions Source/MQTTnet.Tests/Server/Events_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ public async Task Fire_Client_Connected_Event()
return CompletedTask.Instance;
};

ValidatingConnectionEventArgs validatingEventArgs = null;
server.ValidatingConnectionAsync += e =>
{
validatingEventArgs = e;
return CompletedTask.Instance;
};


await testEnvironment.ConnectClient(o => o.WithCredentials("TheUser"));

await LongTestDelay();
Expand All @@ -39,6 +47,10 @@ public async Task Fire_Client_Connected_Event()
Assert.IsTrue(eventArgs.RemoteEndPoint.ToString().Contains("127.0.0.1"));
Assert.AreEqual(MqttProtocolVersion.V311, eventArgs.ProtocolVersion);
Assert.AreEqual("TheUser", eventArgs.UserName);

Assert.AreEqual("TheUser", validatingEventArgs.UserName);
Assert.IsFalse(validatingEventArgs.IsSecureConnection);
Assert.IsFalse(validatingEventArgs.IsWebSocketConnection);
}
}

Expand Down
2 changes: 2 additions & 0 deletions Source/MQTTnet/Adapter/IMqttChannelAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public interface IMqttChannelAdapter : IDisposable

bool IsSecureConnection { get; }

bool IsWebSocketConnection { get; }

MqttPacketFormatterAdapter PacketFormatterAdapter { get; }

Task ConnectAsync(CancellationToken cancellationToken);
Expand Down
2 changes: 2 additions & 0 deletions Source/MQTTnet/Adapter/MqttChannelAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public MqttChannelAdapter(IMqttChannel channel, MqttPacketFormatterAdapter packe

public bool IsSecureConnection => _channel.IsSecureConnection;

public bool IsWebSocketConnection => _channel.IsWebSocketConnection;

public MqttPacketFormatterAdapter PacketFormatterAdapter { get; }

public MqttPacketInspector PacketInspector { get; set; }
Expand Down
2 changes: 2 additions & 0 deletions Source/MQTTnet/Channel/IMqttChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public interface IMqttChannel : IDisposable

bool IsSecureConnection { get; }

bool IsWebSocketConnection { get; }

Task ConnectAsync(CancellationToken cancellationToken);

Task DisconnectAsync(CancellationToken cancellationToken);
Expand Down
2 changes: 2 additions & 0 deletions Source/MQTTnet/Implementations/MqttTcpChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public MqttTcpChannel(Stream stream, EndPoint remoteEndPoint, X509Certificate2 c

public bool IsSecureConnection { get; }

public bool IsWebSocketConnection => false;

public async Task ConnectAsync(CancellationToken cancellationToken)
{
CrossPlatformSocket socket = null;
Expand Down
2 changes: 2 additions & 0 deletions Source/MQTTnet/Implementations/MqttWebSocketChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public MqttWebSocketChannel(WebSocket webSocket, EndPoint remoteEndPoint, bool i

public bool IsSecureConnection { get; private set; }

public bool IsWebSocketConnection => true;

public async Task ConnectAsync(CancellationToken cancellationToken)
{
var uri = _options.Uri;
Expand Down

0 comments on commit 841334a

Please sign in to comment.