Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce ChannelExecutor #4882

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions docs/articles/actors/dispatchers.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ Some dispatcher configurations are available out-of-the-box for convenience. You
* **task-dispatcher** - A configuration that uses the [TaskDispatcher](#taskdispatcher).
* **default-fork-join-dispatcher** - A configuration that uses the [ForkJoinDispatcher](#forkjoindispatcher).
* **synchronized-dispatcher** - A configuration that uses the [SynchronizedDispatcher](#synchronizeddispatcher).
* **channel-executor** - new as of v1.4.19, the [`ChannelExecutor`](#channelexecutor) is used to run on top of the .NET `ThreadPool` and allow Akka.NET to dynamically scale thread usage up and down with demand in exchange for better CPU and throughput performance.

## Built-in Dispatchers

Expand Down Expand Up @@ -165,6 +166,63 @@ private void Form1_Load(object sender, System.EventArgs e)
}
```

### `ChannelExecutor`
In Akka.NET v1.4.19 we will be introducing an opt-in feature, the `ChannelExecutor` - a new dispatcher type that re-uses the same configuration as a `ForkJoinDispatcher` but runs entirely on top of the .NET `ThreadPool` and is able to take advantage of dynamic thread pool scaling to size / resize workloads on the fly.

During its initial development and benchmarks, we observed the following:

1. The `ChannelExecutor` tremendously reduced idle CPU and max busy CPU even during peak message throughput, primarily as a result of dynamically shrinking the total `ThreadPool` to only the necessary size. This resolves one of the largest complaints large users of Akka.NET have today. However, **in order for this setting to be effective `ThreadPool.SetMin(0,0)` must also be set**. We are considering doing this inside the `ActorSystem.Create` method, those settings don't work for you you can easily override them by simply calling `ThreadPool.SetMin(yourValue, yourValue)` again after `ActorSystem.Create` has exited.
2. The `ChannelExecutor` actually beat the `ForkJoinDispatcher` and others on performance even in environments like Docker and bare metal on Windows.

> [!NOTE]
> We are in the process of gathering data from users on how well `ChannelExecutor` performs in the real world. If you are interested in trying out the `ChannelExecutor`, please read the directions in this document and then comment on [the "Akka.NET v1.4.19: ChannelExecutor performance data" discussion thread](https://github.com/akkadotnet/akka.net/discussions/4983).

The `ChannelExectuor` re-uses the same threading settings as the `ForkJoinExecutor` to determine its effective upper and lower parallelism limits, and you can configure the `ChannelExecutor` to run inside your `ActorSystem` via the following HOCON configuration:

```
akka.actor.default-dispatcher = {
executor = channel-executor
fork-join-executor { #channelexecutor will re-use these settings
parallelism-min = 2
parallelism-factor = 1
parallelism-max = 64
}
}

akka.actor.internal-dispatcher = {
executor = channel-executor
throughput = 5
fork-join-executor {
parallelism-min = 4
parallelism-factor = 1.0
parallelism-max = 64
}
}

akka.remote.default-remote-dispatcher {
type = Dispatcher
executor = channel-executor
fork-join-executor {
parallelism-min = 2
parallelism-factor = 0.5
parallelism-max = 16
}
}

akka.remote.backoff-remote-dispatcher {
executor = channel-executor
fork-join-executor {
parallelism-min = 2
parallelism-max = 2
}
}
```

This will enable the `ChannelExecutor` to run everywhere and all Akka.NET loads, with the exception of anything you manually allocate onto a `ForkJoinDispatcher` or `PinnedDispatcher`, will be managed by the `ThreadPool`.

> [!IMPORTANT]
> As of Akka.NET v1.4.19, we call `ThreadPool.SetMinThreads(0,0)` inside the `ActorSystem.Create` method as we've found that the default `ThreadPool` minimum values have a negative impact on performance. However, if this causes undesireable side effects for you inside your application you can always override those settings by calling `ThreadPool.SetMinThreads(yourValue, yourValue)` again after you've created your `ActorSystem`.

#### Common Dispatcher Configuration

The following configuration keys are available for any dispatcher configuration:
Expand Down
1 change: 1 addition & 0 deletions src/benchmark/Akka.Benchmarks/Actor/PingPongBenchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Benchmarks.Configurations;
using Akka.Configuration;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Engines;

Expand Down
3 changes: 2 additions & 1 deletion src/benchmark/RemotePingPong/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public static uint CpuSpeed()
public static Config CreateActorSystemConfig(string actorSystemName, string ipOrHostname, int port)
{
var baseConfig = ConfigurationFactory.ParseString(@"
akka {
akka {
actor.provider = remote
loglevel = ERROR
suppress-json-serializer-warning = on
Expand All @@ -57,6 +57,7 @@ public static Config CreateActorSystemConfig(string actorSystemName, string ipOr
port = 0
hostname = ""localhost""
}

}
}");

Expand Down
26 changes: 23 additions & 3 deletions src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public StressSpecConfig()
convergence-within-factor = 1.0
}
akka.actor.provider = cluster

akka.cluster {
failure-detector.acceptable-heartbeat-pause = 3s
downing-provider-class = ""Akka.Cluster.SplitBrainResolver, Akka.Cluster""
Expand All @@ -86,10 +87,29 @@ public StressSpecConfig()
akka.loggers = [""Akka.TestKit.TestEventListener, Akka.TestKit""]
akka.loglevel = INFO
akka.remote.log-remote-lifecycle-events = off
akka.actor.default-dispatcher.fork-join-executor {
parallelism - min = 8
parallelism - max = 8
akka.actor.default-dispatcher = {
executor = channel-executor
fork-join-executor {
parallelism-min = 2
parallelism-factor = 1
parallelism-max = 64
}
}
akka.actor.internal-dispatcher = {
executor = channel-executor
fork-join-executor {
parallelism-min = 2
parallelism-factor = 1
parallelism-max = 64
}
}
akka.remote.default-remote-dispatcher {
executor = channel-executor
fork-join-executor {
parallelism-min = 2
parallelism-factor = 0.5
parallelism-max = 16
}
");

TestTransport = true;
Expand Down
21 changes: 9 additions & 12 deletions src/core/Akka.Remote/Configuration/Remote.conf
Original file line number Diff line number Diff line change
Expand Up @@ -578,23 +578,20 @@ akka {

### Default dispatcher for the remoting subsystem

### Default dispatcher for the remoting subsystem

default-remote-dispatcher {
type = ForkJoinDispatcher
executor = fork-join-executor
dedicated-thread-pool {
# Fixed number of threads to have in this threadpool
thread-count = 4
executor = fork-join-executor
fork-join-executor {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really the right configuration, rather than a hard thread count of 4.

parallelism-min = 2
parallelism-factor = 0.5
parallelism-max = 16
}
}

backoff-remote-dispatcher {
type = ForkJoinDispatcher
executor = fork-join-executor
dedicated-thread-pool {
# Fixed number of threads to have in this threadpool
thread-count = 4
executor = fork-join-executor
fork-join-executor {
parallelism-min = 2
parallelism-max = 2
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,17 @@ protected override MessageDispatcherConfigurator Configurator()
return new DispatcherConfigurator(DispatcherConfiguration, Prereqs);
}
}

public class ChannelDispatcherExecutorThroughputSpec : WarmDispatcherThroughputSpecBase
{
public static Config DispatcherConfiguration => ConfigurationFactory.ParseString(@"
id = PerfTest
executor = channel-executor
");

protected override MessageDispatcherConfigurator Configurator()
{
return new DispatcherConfigurator(DispatcherConfiguration, Prereqs);
}
}
}
4 changes: 4 additions & 0 deletions src/core/Akka/Actor/ActorSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ public static ActorSystem Create(string name)

private static ActorSystem CreateAndStartSystem(string name, Config withFallback, ActorSystemSetup setup)
{
// allows the ThreadPool to scale up / down dynamically
// by removing minimum thread count, which in our benchmarks
// appears to negatively impact performance
ThreadPool.SetMinThreads(0, 0);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The differences in the benchmarks with this and without it are massive. However, if this causes problems for users they can easily reset it by calling ThreadPool.SetMinThreads(yourValue, yourValue) after the ActorSystem is created. I doubt many users will need to do that and changing this default likely works to the benefit of most.

var system = new ActorSystemImpl(name, withFallback, setup, Option<Props>.None);
system.Start();
return system;
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/Akka.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="$(NewtonsoftJsonVersion)" />
<PackageReference Include="System.Collections.Immutable" Version="5.0.0" />
<PackageReference Include="System.Collections.Immutable" Version="5.0.0" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == '$(NetStandardLibVersion)'">
Expand Down
22 changes: 22 additions & 0 deletions src/core/Akka/Dispatch/AbstractDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,26 @@ protected ExecutorServiceConfigurator(Config config, IDispatcherPrerequisites pr
public IDispatcherPrerequisites Prerequisites { get; private set; }
}

internal sealed class ChannelExecutorConfigurator : ExecutorServiceConfigurator
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used to configure the TaskSchedulerExecutor(id, new FixedConcurrencyTaskScheduler(MaxParallelism));

{
public ChannelExecutorConfigurator(Config config, IDispatcherPrerequisites prerequisites) : base(config, prerequisites)
{
var fje = config.GetConfig("fork-join-executor");
MaxParallelism = ThreadPoolConfig.ScaledPoolSize(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-use ForkJoinDispatcher configuration block, since it expresses what we need.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might actually need to copy HOCON lines over and make them part of the executor config in the future, because having one part of the system depending on a config of another part can be a point of confusion for the end user

fje.GetInt("parallelism-min"),
fje.GetDouble("parallelism-factor", 1.0D), // the scalar-based factor to scale the threadpool size to
fje.GetInt("parallelism-max"));
}

public int MaxParallelism {get;}

public override ExecutorService Produce(string id)
{
Prerequisites.EventStream.Publish(new Debug($"ChannelExecutor-[id]", typeof(FixedConcurrencyTaskScheduler), $"Launched Dispatcher [{id}] with MaxParallelism=[{MaxParallelism}]"));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug event you can listen for if you want to be certain that the ChannelExecutor is loaded.

return new TaskSchedulerExecutor(id, new FixedConcurrencyTaskScheduler(MaxParallelism));
}
}

/// <summary>
/// INTERNAL API
///
Expand Down Expand Up @@ -306,6 +326,8 @@ protected ExecutorServiceConfigurator ConfigureExecutor()
return new CurrentSynchronizationContextExecutorServiceFactory(Config, Prerequisites);
case "task-executor":
return new DefaultTaskSchedulerExecutorConfigurator(Config, Prerequisites);
case "channel-executor":
return new ChannelExecutorConfigurator(Config, Prerequisites);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is how we actually pass the new executor into dispatcher configurations.

default:
Type executorConfiguratorType = Type.GetType(executor);
if (executorConfiguratorType == null)
Expand Down
87 changes: 84 additions & 3 deletions src/core/Akka/Dispatch/Dispatchers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
Expand Down Expand Up @@ -91,6 +92,86 @@ public PartialTrustThreadPoolExecutorService(string id) : base(id)
}
}

/// <summary>
/// INTERNAL API
///
/// Used to power <see cref="ChannelExecutorConfigurator"/>
/// </summary>
internal sealed class FixedConcurrencyTaskScheduler : TaskScheduler
{

[ThreadStatic]
private static bool _threadRunning = false;
private ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>();

private int _readers = 0;

public FixedConcurrencyTaskScheduler(int degreeOfParallelism)
{
MaximumConcurrencyLevel = degreeOfParallelism;
}


public override int MaximumConcurrencyLevel { get; }

/// <summary>
/// ONLY USED IN DEBUGGER - NO PERF IMPACT.
/// </summary>
protected override IEnumerable<Task> GetScheduledTasks()
{
return _tasks;
}

protected override bool TryDequeue(Task task)
{
return false;
}

protected override void QueueTask(Task task)
{
_tasks.Enqueue(task);
if (_readers < MaximumConcurrencyLevel)
{
var initial = _readers;
var newVale = _readers + 1;
if (initial == Interlocked.CompareExchange(ref _readers, newVale, initial))
{
// try to start a new worker
ThreadPool.UnsafeQueueUserWorkItem(_ => ReadChannel(), null);
}
}
}

protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// If this thread isn't already processing a task, we don't support inlining
if (!_threadRunning) return false;
return TryExecuteTask(task);
}

public void ReadChannel()
{
_threadRunning = true;
try
{
while (_tasks.TryDequeue(out var runnable))
{
base.TryExecuteTask(runnable);
}
}
catch
{
// suppress exceptions
}
finally
{
Interlocked.Decrement(ref _readers);

_threadRunning = false;
}
}
}


/// <summary>
/// INTERNAL API
Expand Down Expand Up @@ -273,7 +354,7 @@ public MessageDispatcher DefaultGlobalDispatcher
internal MessageDispatcher InternalDispatcher { get; }

/// <summary>
/// The <see cref="Hocon.Config"/> for the default dispatcher.
/// The <see cref="Configuration.Config"/> for the default dispatcher.
/// </summary>
public Config DefaultDispatcherConfig
{
Expand Down Expand Up @@ -336,7 +417,7 @@ public bool HasDispatcher(string id)
private MessageDispatcherConfigurator LookupConfigurator(string id)
{
var depth = 0;
while(depth < MaxDispatcherAliasDepth)
while (depth < MaxDispatcherAliasDepth)
{
if (_dispatcherConfigurators.TryGetValue(id, out var configurator))
return configurator;
Expand Down Expand Up @@ -374,7 +455,7 @@ private MessageDispatcherConfigurator LookupConfigurator(string id)
/// <summary>
/// INTERNAL API
///
/// Creates a dispatcher from a <see cref="Hocon.Config"/>. Internal test purpose only.
/// Creates a dispatcher from a <see cref="Configuration.Config"/>. Internal test purpose only.
/// <code>
/// From(Config.GetConfig(id));
/// </code>
Expand Down