Skip to content

Commit

Permalink
Add Service Bus emulator support (#6737)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastienros authored Jan 6, 2025
1 parent c934862 commit 4494825
Show file tree
Hide file tree
Showing 42 changed files with 2,306 additions and 120 deletions.
17 changes: 17 additions & 0 deletions Aspire.sln
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DaprServiceC", "playground\
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Dashboard", "Dashboard", "{830F7CA9-8E51-4D62-832F-91F53F85B7AE}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "AzureServiceBus", "AzureServiceBus", "{D2938171-1DBB-4E8D-AF16-97F75F1AE6DE}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ServiceBusWorker", "playground\AzureServiceBus\ServiceBusWorker\ServiceBusWorker.csproj", "{162F0B66-E88F-4735-8CE0-BE8950F74CC6}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ServiceBus.AppHost", "playground\AzureServiceBus\ServiceBus.AppHost\ServiceBus.AppHost.csproj", "{A7EC9111-F3CC-46E8-B95E-3768481D67B4}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -1653,6 +1659,14 @@ Global
{B26653B9-439E-4850-A7F8-43C6E5121952}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B26653B9-439E-4850-A7F8-43C6E5121952}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B26653B9-439E-4850-A7F8-43C6E5121952}.Release|Any CPU.Build.0 = Release|Any CPU
{162F0B66-E88F-4735-8CE0-BE8950F74CC6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{162F0B66-E88F-4735-8CE0-BE8950F74CC6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{162F0B66-E88F-4735-8CE0-BE8950F74CC6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{162F0B66-E88F-4735-8CE0-BE8950F74CC6}.Release|Any CPU.Build.0 = Release|Any CPU
{A7EC9111-F3CC-46E8-B95E-3768481D67B4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A7EC9111-F3CC-46E8-B95E-3768481D67B4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A7EC9111-F3CC-46E8-B95E-3768481D67B4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A7EC9111-F3CC-46E8-B95E-3768481D67B4}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1956,6 +1970,9 @@ Global
{042DD8C6-A26C-4B06-80A1-FE7F8659C5BC} = {B7345F72-712F-436C-AE18-CAF7CDD4A990}
{B26653B9-439E-4850-A7F8-43C6E5121952} = {57A42144-739E-49A7-BADB-BB8F5F20FA17}
{830F7CA9-8E51-4D62-832F-91F53F85B7AE} = {4981B3A5-4AFD-4191-BF7D-8692D9783D60}
{D2938171-1DBB-4E8D-AF16-97F75F1AE6DE} = {D173887B-AF42-4576-B9C1-96B9E9B3D9C0}
{162F0B66-E88F-4735-8CE0-BE8950F74CC6} = {D2938171-1DBB-4E8D-AF16-97F75F1AE6DE}
{A7EC9111-F3CC-46E8-B95E-3768481D67B4} = {D2938171-1DBB-4E8D-AF16-97F75F1AE6DE}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {6DCEDFEC-988E-4CB3-B45B-191EB5086E0C}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
using System.Text;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
#if !SKIP_PROVISIONED_AZURE_RESOURCE
using Azure.Messaging.ServiceBus;
#endif
using Azure.Storage.Blobs;
using Azure.Storage.Queues;

Expand All @@ -15,9 +13,7 @@
builder.AddAzureQueueClient("queue");
builder.AddAzureBlobClient("blob");
builder.AddAzureEventHubProducerClient("eventhubs", static settings => settings.EventHubName = "myhub");
#if !SKIP_PROVISIONED_AZURE_RESOURCE
builder.AddAzureServiceBusClient("messaging");
#endif

var app = builder.Build();

Expand Down Expand Up @@ -56,15 +52,13 @@ static string RandomString(int length)
return Results.Ok("Message sent to Azure EventHubs.");
});

#if !SKIP_PROVISIONED_AZURE_RESOURCE
app.MapGet("/publish/asb", async (ServiceBusClient client, CancellationToken cancellationToken, int length = 20) =>
{
var sender = client.CreateSender("myqueue");
var message = new ServiceBusMessage(Encoding.UTF8.GetBytes(RandomString(length)));
await sender.SendMessageAsync(message, cancellationToken);
return Results.Ok("Message sent to Azure Service Bus.");
});
#endif

app.MapGet("/", async (HttpClient client) =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,18 @@
var queue = storage.AddQueues("queue");
var blob = storage.AddBlobs("blob");
var eventHubs = builder.AddAzureEventHubs("eventhubs").RunAsEmulator().AddEventHub("myhub");

#if !SKIP_PROVISIONED_AZURE_RESOURCE
var serviceBus = builder.AddAzureServiceBus("messaging").AddQueue("myqueue");
#endif
var serviceBus = builder.AddAzureServiceBus("messaging").RunAsEmulator().WithQueue("myqueue");

var funcApp = builder.AddAzureFunctionsProject<Projects.AzureFunctionsEndToEnd_Functions>("funcapp")
.WithExternalHttpEndpoints()
.WithReference(eventHubs)
#if !SKIP_PROVISIONED_AZURE_RESOURCE
.WithReference(serviceBus)
#endif
.WithReference(eventHubs).WaitFor(eventHubs)
.WithReference(serviceBus).WaitFor(serviceBus)
.WithReference(blob)
.WithReference(queue);

builder.AddProject<Projects.AzureFunctionsEndToEnd_ApiService>("apiservice")
.WithReference(eventHubs)
#if !SKIP_PROVISIONED_AZURE_RESOURCE
.WithReference(serviceBus)
#endif
.WithReference(eventHubs).WaitFor(eventHubs)
.WithReference(serviceBus).WaitFor(serviceBus)
.WithReference(queue)
.WithReference(blob)
.WithReference(funcApp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
using System.Globalization;
using System.Text;
using Azure.Messaging.EventHubs.Producer;
#if !SKIP_PROVISIONED_AZURE_RESOURCE
using Azure.Messaging.ServiceBus;
#endif
using Azure.Storage.Blobs;
using Azure.Storage.Queues;
using Microsoft.AspNetCore.Http;
Expand All @@ -17,9 +15,7 @@ namespace AzureFunctionsEndToEnd.Functions;

public class MyHttpTrigger(
ILogger<MyHttpTrigger> logger,
#if !SKIP_PROVISIONED_AZURE_RESOURCE
ServiceBusClient serviceBusClient,
#endif
EventHubProducerClient eventHubProducerClient,
QueueServiceClient queueServiceClient,
BlobServiceClient blobServiceClient)
Expand All @@ -29,9 +25,7 @@ public IResult Run([HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] Ht
{
logger.LogInformation("C# HTTP trigger function processed a request.");
var stringBuilder = new StringBuilder();
#if !SKIP_PROVISIONED_AZURE_RESOURCE
stringBuilder.AppendLine(CultureInfo.InvariantCulture, $"Aspire-injected ServiceBusClient namespace: {serviceBusClient.FullyQualifiedNamespace}");
#endif
stringBuilder.AppendLine(CultureInfo.InvariantCulture, $"Aspire-injected EventHubProducerClient namespace: {eventHubProducerClient.FullyQualifiedNamespace}");
stringBuilder.AppendLine(CultureInfo.InvariantCulture, $"Aspire-injected QueueServiceClient URI: {queueServiceClient.Uri}");
stringBuilder.AppendLine(CultureInfo.InvariantCulture, $"Aspire-injected BlobServiceClient URI: {blobServiceClient.Uri}");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#if !SKIP_PROVISIONED_AZURE_RESOURCE
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

Expand All @@ -11,11 +10,10 @@ namespace AzureFunctionsEndToEnd.Functions;
public class MyServiceBusTrigger(ILogger<MyServiceBusTrigger> logger)
{
[Function(nameof(MyServiceBusTrigger))]
public void Run([ServiceBusTrigger("myqueue", Connection = "messaging" )] ServiceBusReceivedMessage message)
public void Run([ServiceBusTrigger("myqueue", Connection = "messaging")] ServiceBusReceivedMessage message)
{
logger.LogInformation("Message ID: {id}", message.MessageId);
logger.LogInformation("Message Body: {body}", message.Body);
logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
}
}
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
builder.AddAzureQueueClient("queue");
builder.AddAzureBlobClient("blob");
builder.AddAzureEventHubProducerClient("eventhubs", static settings => settings.EventHubName = "myhub");
#if !SKIP_PROVISIONED_AZURE_RESOURCE
builder.AddAzureServiceBusClient("messaging");
#endif

builder.ConfigureFunctionsWebApplication();

Expand Down
47 changes: 47 additions & 0 deletions playground/AzureServiceBus/ServiceBus.AppHost/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System.Text.Json.Nodes;
using Aspire.Hosting.Azure.ServiceBus;

var builder = DistributedApplication.CreateBuilder(args);

var serviceBus = builder.AddAzureServiceBus("sbemulator");

serviceBus
.WithQueue("queue1", queue =>
{
queue.DeadLetteringOnMessageExpiration = false;
})
.WithTopic("topic1", topic =>
{
var subscription = new ServiceBusSubscription("sub1")
{
MaxDeliveryCount = 10,
};
topic.Subscriptions.Add(subscription);

var rule = new ServiceBusRule("app-prop-filter-1")
{
CorrelationFilter = new()
{
ContentType = "application/text",
CorrelationId = "id1",
Subject = "subject1",
MessageId = "msgid1",
ReplyTo = "someQueue",
ReplyToSessionId = "sessionId",
SessionId = "session1",
SendTo = "xyz"
}
};
subscription.Rules.Add(rule);
})
;

serviceBus.RunAsEmulator(configure => configure.ConfigureEmulator(document =>
{
document["UserConfig"]!["Logging"] = new JsonObject { ["Type"] = "Console" };
}));

builder.AddProject<Projects.ServiceBusWorker>("worker")
.WithReference(serviceBus).WaitFor(serviceBus);

builder.Build().Run();
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"$schema": "http://json.schemastore.org/launchsettings.json",

"profiles": {
"https": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": true,
"applicationUrl": "https://localhost:15887;http://localhost:15888",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development",
"DOTNET_ENVIRONMENT": "Development",
"DOTNET_DASHBOARD_OTLP_ENDPOINT_URL": "https://localhost:16175",
"DOTNET_RESOURCE_SERVICE_ENDPOINT_URL": "https://localhost:17037",
"DOTNET_ASPIRE_SHOW_DASHBOARD_RESOURCES": "true"
}
},
"http": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": true,
"applicationUrl": "http://localhost:15888",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development",
"DOTNET_ENVIRONMENT": "Development",
"DOTNET_DASHBOARD_OTLP_ENDPOINT_URL": "http://localhost:16175",
"DOTNET_RESOURCE_SERVICE_ENDPOINT_URL": "http://localhost:17038",
"DOTNET_ASPIRE_SHOW_DASHBOARD_RESOURCES": "true",
"ASPIRE_ALLOW_UNSECURED_TRANSPORT": "true"
}
},
"generate-manifest": {
"commandName": "Project",
"launchBrowser": true,
"dotnetRunMessages": true,
"commandLineArgs": "--publisher manifest --output-path aspire-manifest.json",
"applicationUrl": "http://localhost:15888",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development",
"DOTNET_ENVIRONMENT": "Development",
"DOTNET_DASHBOARD_OTLP_ENDPOINT_URL": "http://localhost:16175"
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>$(DefaultTargetFramework)</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsAspireHost>true</IsAspireHost>
<UserSecretsId>c12f723f-2545-4f8f-8c3b-fb7bdeadbd55</UserSecretsId>
</PropertyGroup>

<ItemGroup>
<AspireProjectOrPackageReference Include="Aspire.Hosting.AppHost" />
<AspireProjectOrPackageReference Include="Aspire.Hosting.Azure.ServiceBus" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\ServiceBusWorker\ServiceBusWorker.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning",
"Aspire.Hosting.Dcp": "Warning"
}
}
}
43 changes: 43 additions & 0 deletions playground/AzureServiceBus/ServiceBusWorker/Consumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using System.Text;
using Azure.Messaging.ServiceBus;

namespace ServiceBusWorker;

internal sealed class Consumer(ServiceBusClient client, ILogger<Consumer> logger) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
var processor = client.CreateProcessor("queue1", new ServiceBusProcessorOptions
{
AutoCompleteMessages = true,
MaxConcurrentCalls = 1, // Process one message at a time
});

processor.ProcessMessageAsync += MessageHandler;

processor.ProcessErrorAsync += ErrorHandler;

await processor.StartProcessingAsync(cancellationToken);
}

private Task MessageHandler(ProcessMessageEventArgs args)
{
// Process the message
logger.LogInformation("Received message: {Message}", Encoding.UTF8.GetString(args.Message.Body));

return Task.CompletedTask;
}

private Task ErrorHandler(ProcessErrorEventArgs args)
{
logger.LogError(args.Exception, "Error processing message");

return Task.CompletedTask;
}

public override Task StopAsync(CancellationToken cancellationToken)
{
logger.LogInformation("Stopping consumer...");
return Task.CompletedTask;
}
}
30 changes: 30 additions & 0 deletions playground/AzureServiceBus/ServiceBusWorker/Producer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using Azure.Messaging.ServiceBus;

namespace ServiceBusWorker;

internal sealed class Producer(ServiceBusClient client, ILogger<Producer> logger) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
logger.LogInformation("Starting producer...");

await using var sender = client.CreateSender("queue1");

var periodicTimer = new PeriodicTimer(TimeSpan.FromSeconds(5));

while (!cancellationToken.IsCancellationRequested)
{
await periodicTimer.WaitForNextTickAsync(cancellationToken);

await sender.SendMessageAsync(new ServiceBusMessage($"Hello, World! It's {DateTime.Now} here."), cancellationToken);
}
}

public override Task StopAsync(CancellationToken cancellationToken)
{
logger.LogInformation("Stopping producer...");
return Task.CompletedTask;
}
}
14 changes: 14 additions & 0 deletions playground/AzureServiceBus/ServiceBusWorker/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using ServiceBusWorker;

var builder = Host.CreateApplicationBuilder(args);

builder.AddServiceDefaults();

builder.AddAzureServiceBusClient("sbemulator");

builder.Services.AddHostedService<Consumer>();
builder.Services.AddHostedService<Producer>();

var host = builder.Build();

await host.RunAsync();
Loading

0 comments on commit 4494825

Please sign in to comment.