Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT Demo integration #160

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/fiskaltrust.Launcher/ProcessHost/ProcessHostPlebeian.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,15 @@ private async Task StartHosting(string[] uris)
await _hosting.HostService(url, hostingType.Value, (IPOS)instance, addEndpoints);
break;
case PackageType.Helper:
await _hosting.HostService(url, hostingType.Value, (IHelper)instance, addEndpoints);
if (_packageConfiguration.Package == "fiskaltrust.Middleware.Helper.PosApi")
{
(object instancePOS, Action<WebApplication> addPosEndpoints, Type instancePosInterface) = GetHelperIPOS((IPOS)instance);
await _hosting.HostService(url, hostingType.Value, (IPOS)instancePOS, addPosEndpoints);
}
else
{
await _hosting.HostService(url, hostingType.Value, (IHelper)instance, addEndpoints);
}
break;
default:
throw new NotImplementedException();
Expand All @@ -180,6 +188,11 @@ private async Task StartHosting(string[] uris)
}
}

private static (object, Action<WebApplication>, Type) GetHelperIPOS(IPOS instance)
{
return (instance, (WebApplication app) => app.AddQueueEndpoints(instance), typeof(IPOS));
}

private static (object, Action<WebApplication>, Type) GetQueue(IServiceProvider services)
{
var queue = services.GetRequiredService<IPOS>();
Expand Down
55 changes: 55 additions & 0 deletions src/fiskaltrust.Launcher/Services/HostingService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
using System.Runtime.Versioning;
using Microsoft.AspNetCore.Server.HttpSys;
using System.Text.Json;
using MQTTnet;
using MQTTnet.Client;
using System.Text;
using Newtonsoft.Json;
using fiskaltrust.ifPOS.v1;
using MQTTnet.Server;

namespace fiskaltrust.Launcher.Services
{
Expand All @@ -32,6 +38,7 @@ public class HostingService
private readonly long _messageSize = 16 * 1024 * 1024;
private readonly TimeSpan _sendTimeout = TimeSpan.FromSeconds(15);
private readonly TimeSpan _receiveTimeout = TimeSpan.FromDays(20);
private IMqttClient _mqttClient;

public HostingService(ILogger<HostingService> logger, PackageConfiguration packageConfiguration, LauncherConfiguration launcherConfiguration, IProcessHostService? processHostService = null)
{
Expand Down Expand Up @@ -100,6 +107,11 @@ public async Task<WebApplication> HostService<T>(Uri uri, HostingType hostingTyp
_logger.LogWarning($"{nameof(_launcherConfiguration.UseHttpSysBinding)} is only supported on Windows.");
}
}
//if (instance is IPOS pos)
//{
// _logger.LogInformation("Setup MQTT");
// await HandleMQTTConnection(pos);
//}

// Create the appropriate host based on the hosting type
switch (hostingType)
Expand Down Expand Up @@ -242,6 +254,49 @@ private BasicHttpBinding CreateBasicHttpBinding(BasicHttpSecurityMode securityMo
return binding;
}

private async Task HandleMQTTConnection(IPOS pos)
{
var mqttFactory = new MqttFactory();

var clientId = _packageConfiguration.Id;

_mqttClient = mqttFactory.CreateMqttClient();
var mqttClientOptions = new MqttClientOptionsBuilder()
.WithClientId(clientId.ToString())
.WithCleanSession(false)
.WithoutThrowOnNonSuccessfulConnectResponse()
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500)
.WithWebSocketServer(o => o.WithUri("gateway-sandbox.fiskaltrust.eu:80/mqtt"))
.Build();

_mqttClient.ApplicationMessageReceivedAsync += async e =>
{
await e.AcknowledgeAsync(CancellationToken.None);
await _mqttClient.PublishStringAsync(e.ApplicationMessage.ResponseTopic, JsonConvert.SerializeObject(new SignRequestAccepted()), MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);

_logger.LogInformation("New mqtt message arrived {message}", Encoding.UTF8.GetString(e.ApplicationMessage.Payload));
var result = await pos.SignAsync(JsonConvert.DeserializeObject<ReceiptRequest>(Encoding.UTF8.GetString(e.ApplicationMessage.Payload)));
await _mqttClient.PublishStringAsync(e.ApplicationMessage.ResponseTopic + "/done", JsonConvert.SerializeObject(new SignRequestDoneMessage(result)), MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);
};

var result = await _mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
_logger.LogInformation("Connectionresult: {result}", JsonConvert.SerializeObject(result));

var mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
.WithTopicFilter(
f =>
{
f.WithTopic($"{_launcherConfiguration.CashboxId}/signrequest");
})
.Build();
_logger.LogInformation("MQTT subscribed to topic {topic}", string.Join(",", mqttSubscribeOptions.TopicFilters));
await _mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);
}

record SignRequestAccepted();

record SignRequestDoneMessage(ReceiptResponse response);

private WebApplication CreateGrpcHost<T>(WebApplicationBuilder builder, Uri uri, T instance) where T : class
{
if (OperatingSystem.IsWindows() && _launcherConfiguration.UseHttpSysBinding!.Value)
Expand Down
15 changes: 8 additions & 7 deletions src/fiskaltrust.Launcher/fiskaltrust.Launcher.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,21 @@
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

<PropertyGroup Condition="'$(SelfUpdate)'=='true' or '$(SelfUpdate)'==''">
<DefineConstants>$(DefineConstants);EnableSelfUpdate</DefineConstants>
</PropertyGroup>
<PropertyGroup Condition="'$(SelfUpdate)'=='true' or '$(SelfUpdate)'==''">
<DefineConstants>$(DefineConstants);EnableSelfUpdate</DefineConstants>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="CoreWCF.Http" Version="1.4.1" />
<PackageReference Include="CoreWCF.NetTcp" Version="1.4.1" />
<PackageReference Include="fiskaltrust.interface" Version="1.3.50-rc1" />
<PackageReference Include="fiskaltrust.Middleware.Abstractions" Version="1.3.3" />

<PackageReference Include="MQTTnet" Version="4.3.3.952" />
<PackageReference Include="fiskaltrust.Middleware.Interface.Client.Soap" Version="1.3.50-rc2" />
<PackageReference Include="fiskaltrust.storage.serialization" Version="1.3.47" />
<PackageReference Include="fiskaltrust.Middleware.Interface.Client.Grpc" Version="1.3.50-rc2" />
<PackageReference Include="fiskaltrust.Middleware.Interface.Client.Http" Version="1.3.50-rc2" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="4.3.3.952" />
<PackageReference Include="Polly" Version="7.2.4" />
<PackageReference Include="Polly.Extensions.Http" Version="3.0.0" />
<PackageReference Include="Serilog.AspNetCore" Version="7.0.0" />
Expand All @@ -37,7 +38,7 @@
<PackageReference Include="DiffPlex" Version="1.7.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="../fiskaltrust.Launcher.Common/fiskaltrust.Launcher.Common.csproj" PrivateAssets="all" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="../fiskaltrust.Launcher.Common/fiskaltrust.Launcher.Common.csproj" PrivateAssets="all" />
</ItemGroup>
</Project>
Loading