Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port scala akka PR #26816 to Akka.NET #4511

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
91adfb9
Port scala akka PR #26816 to Akka.NET
Arkatufus Jul 10, 2020
9a7a041
Remove SynchronizationContext usage
Arkatufus Jul 13, 2020
ff7d06a
Use internal dispatcher in other part of Akka.NET, update documentation
Arkatufus Jul 13, 2020
d9a0e9c
Port internal blocking dispatcher and default internal dispatcher for…
Arkatufus Jul 14, 2020
1d6e588
Add circular dispatcher alias reference checking, unroll recursion
Arkatufus Jul 14, 2020
9acb12a
Make ClusterSingletonManager and Proxy to use internal dispatchers
Arkatufus Jul 14, 2020
e76172b
Update API approver list
Arkatufus Jul 14, 2020
14ce58b
Merge branch 'dev' into Port_#26816_Internal_dispatcher
Arkatufus Jul 14, 2020
9d7ebb1
Merge branch 'dev' into Port_#26816_Internal_dispatcher
Arkatufus Jul 15, 2020
7af7a15
Merge branch 'dev' into Port_#26816_Internal_dispatcher
Arkatufus Jul 16, 2020
5fe55d4
Merge branch 'dev' into Port_#26816_Internal_dispatcher
Arkatufus Jul 17, 2020
cc99809
Merge branch 'dev' into Port_#26816_Internal_dispatcher
Aaronontheweb Jul 20, 2020
32a0736
Remove SynchronizationContext
Arkatufus Jul 21, 2020
8f8f6f0
Clean up Pigeon.conf config file
Arkatufus Jul 21, 2020
c7237bf
Merge branch 'dev' into Port_#26816_Internal_dispatcher
Arkatufus Jul 21, 2020
f6745fa
Merge branch 'dev' into Port_#26816_Internal_dispatcher
Aaronontheweb Jul 23, 2020
00884de
Merge branch 'dev' into Port_#26816_Internal_dispatcher
Aaronontheweb Oct 5, 2020
eea49ff
Merge branch 'dev' into Port_#26816_Internal_dispatcher
Aaronontheweb Oct 5, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/articles/actors/dispatchers.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ system.ActorOf(Props.Create<MyActor>().WithDispatcher("my-dispatcher"), "my-acto
Some dispatcher configurations are available out-of-the-box for convenience. You can use them during actor deployment, [as described above](#configuring-dispatchers).

* **default-dispatcher** - A configuration that uses the [ThreadPoolDispatcher](#threadpooldispatcher). As the name says, this is the default dispatcher configuration used by the global dispatcher, and you don't need to define anything during deployment to use it.
* **internal-dispatcher** - To protect the internal Actors that is spawned by the various Akka modules, a separate internal dispatcher is used by default.
* **task-dispatcher** - A configuration that uses the [TaskDispatcher](#taskdispatcher).
* **default-fork-join-dispatcher** - A configuration that uses the [ForkJoinDispatcher](#forkjoindispatcher).
* **synchronized-dispatcher** - A configuration that uses the [SynchronizedDispatcher](#synchronizeddispatcher).
Expand Down Expand Up @@ -174,3 +175,10 @@ The following configuration keys are available for any dispatcher configuration:

> [!NOTE]
> The throughput-deadline-time is used as a *best effort*, not as a *hard limit*. This means that if a message takes more time than the deadline allows, Akka.NET won't interrupt the process. Instead it will wait for it to finish before giving turn to the next actor.

## Dispatcher aliases

When a dispatcher is looked up, and the given setting contains a string rather than a dispatcher config block,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

the lookup will treat it as an alias, and follow that string to an alternate location for a dispatcher config.
If the dispatcher config is referenced both through an alias and through the absolute path only one dispatcher will
be used and shared among the two ids.
2 changes: 1 addition & 1 deletion src/common.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<Copyright>Copyright © 2013-2020 Akka.NET Team</Copyright>
<Authors>Akka.NET Team</Authors>
<VersionPrefix>1.4.8</VersionPrefix>
<VersionPrefix>1.4.9</VersionPrefix>
<PackageIconUrl>http://getakka.net/images/akkalogo.png</PackageIconUrl>
<PackageProjectUrl>https://github.com/akkadotnet/akka.net</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/akkadotnet/akka.net/blob/master/LICENSE</PackageLicenseUrl>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public ClusterSharding(ExtendedActorSystem system)
{
var guardianName = system.Settings.Config.GetString("akka.cluster.sharding.guardian-name");
var dispatcher = system.Settings.Config.GetString("akka.cluster.sharding.use-dispatcher");
if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.DefaultDispatcherId;
if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.InternalDispatcherId;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - looks like all of the sharding /system actors (the guardians and coordinators) are supposed to run on the internal dispatcher

return system.SystemActorOf(Props.Create(() => new ClusterShardingGuardian()).WithDispatcher(dispatcher), guardianName);
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/contrib/cluster/Akka.Cluster.Sharding/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ akka.cluster.sharding {
coordinator-singleton = "akka.cluster.singleton"

# The id of the dispatcher to use for ClusterSharding actors.
# If not specified default dispatcher is used.
# If not specified, the internal dispatcher is used.
# If specified you need to define the settings of the actual dispatcher.
# This dispatcher for the entity actors is defined by the user provided
# Props, i.e. this dispatcher is not used for the entity actors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ private IActorRef CreateReceptionist()
{
var name = _config.GetString("name");
var dispatcher = _config.GetString("use-dispatcher", null);
if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.DefaultDispatcherId;
if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.InternalDispatcherId;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM


// important to use var mediator here to activate it outside of ClusterReceptionist constructor
var mediator = PubSubMediator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ akka.cluster.client.receptionist {
response-tunnel-receive-timeout = 30s

# The id of the dispatcher to use for ClusterReceptionist actors.
# If not specified default dispatcher is used.
# If not specified, the internal dispatcher is used.
# If specified you need to define the settings of the actual dispatcher.
use-dispatcher = ""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ private IActorRef CreateMediator()
var name = _system.Settings.Config.GetString("akka.cluster.pub-sub.name");
var dispatcher = _system.Settings.Config.GetString("akka.cluster.pub-sub.use-dispatcher", null);
if (string.IsNullOrEmpty(dispatcher))
dispatcher = Dispatchers.DefaultDispatcherId;
dispatcher = Dispatchers.InternalDispatcherId;

return _system.SystemActorOf(
Props.Create(() => new DistributedPubSubMediator(_settings))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ akka.cluster.pub-sub {
max-delta-elements = 3000

# The id of the dispatcher to use for DistributedPubSubMediator actors.
# If not specified default dispatcher is used.
# If not specified, the internal dispatcher is used.
# If specified you need to define the settings of the actual dispatcher.
use-dispatcher = ""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Akka.Actor;
using Akka.Configuration;
using Akka.Coordination;
using Akka.Dispatch;
using Akka.Event;
using Akka.Pattern;
using Akka.Remote;
Expand Down Expand Up @@ -604,7 +605,9 @@ public static Props Props(Props singletonProps, ClusterSingletonManagerSettings
/// <returns>TBD</returns>
public static Props Props(Props singletonProps, object terminationMessage, ClusterSingletonManagerSettings settings)
{
return Actor.Props.Create(() => new ClusterSingletonManager(singletonProps, terminationMessage, settings)).WithDeploy(Deploy.Local);
return Actor.Props.Create(() => new ClusterSingletonManager(singletonProps, terminationMessage, settings))
.WithDispatcher(Dispatchers.InternalDispatcherId)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

.WithDeploy(Deploy.Local);
}

private readonly Props _singletonProps;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.Linq;
using Akka.Actor;
using Akka.Configuration;
using Akka.Dispatch;
using Akka.Event;

namespace Akka.Cluster.Tools.Singleton
Expand Down Expand Up @@ -70,7 +71,9 @@ public static Config DefaultConfig()
/// <returns>TBD</returns>
public static Props Props(string singletonManagerPath, ClusterSingletonProxySettings settings)
{
return Actor.Props.Create(() => new ClusterSingletonProxy(singletonManagerPath, settings)).WithDeploy(Deploy.Local);
return Actor.Props.Create(() => new ClusterSingletonProxy(singletonManagerPath, settings))
.WithDispatcher(Dispatchers.InternalDispatcherId)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

.WithDeploy(Deploy.Local);
}

private readonly ClusterSingletonProxySettings _settings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static ReplicatorSettings Create(Config config)
throw ConfigurationException.NullOrEmptyConfig<ReplicatorSettings>();

var dispatcher = config.GetString("use-dispatcher", null);
if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.DefaultDispatcherId;
if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.InternalDispatcherId;

var durableConfig = config.GetConfig("durable");
var durableKeys = durableConfig.GetStringList("keys");
Expand All @@ -63,7 +63,7 @@ public static ReplicatorSettings Create(Config config)
{
throw new ArgumentException($"`akka.cluster.distributed-data.durable.store-actor-class` is set to an invalid class {durableStoreType}.");
}
durableStoreProps = Props.Create(durableStoreType, durableConfig).WithDispatcher(durableConfig.GetString("use-dispatcher"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good change

durableStoreProps = Props.Create(durableStoreType, durableConfig).WithDispatcher(dispatcher);
}

// TODO: This constructor call fails when these fields are not populated inside the Config object:
Expand Down Expand Up @@ -212,7 +212,7 @@ private ReplicatorSettings Copy(string role = null,
public ReplicatorSettings WithGossipInterval(TimeSpan gossipInterval) => Copy(gossipInterval: gossipInterval);
public ReplicatorSettings WithNotifySubscribersInterval(TimeSpan notifySubscribersInterval) => Copy(notifySubscribersInterval: notifySubscribersInterval);
public ReplicatorSettings WithMaxDeltaElements(int maxDeltaElements) => Copy(maxDeltaElements: maxDeltaElements);
public ReplicatorSettings WithDispatcher(string dispatcher) => Copy(dispatcher: string.IsNullOrEmpty(dispatcher) ? Dispatchers.DefaultDispatcherId : dispatcher);
public ReplicatorSettings WithDispatcher(string dispatcher) => Copy(dispatcher: string.IsNullOrEmpty(dispatcher) ? Dispatchers.InternalDispatcherId : dispatcher);
public ReplicatorSettings WithPruning(TimeSpan pruningInterval, TimeSpan maxPruningDissemination) =>
Copy(pruningInterval: pruningInterval, maxPruningDissemination: maxPruningDissemination);
public ReplicatorSettings WithDurableKeys(IImmutableSet<string> durableKeys) => Copy(durableKeys: durableKeys);
Expand Down
4 changes: 2 additions & 2 deletions src/contrib/cluster/Akka.DistributedData/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ akka.cluster.distributed-data {
# the replicas. Next chunk will be transferred in next round of gossip.
max-delta-elements = 1000

# The id of the dispatcher to use for Replicator actors. If not specified
# default dispatcher is used.
# The id of the dispatcher to use for Replicator actors.
# If not specified, the internal dispatcher is used.
# If specified you need to define the settings of the actual dispatcher.
use-dispatcher = ""

Expand Down
8 changes: 6 additions & 2 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Benchmarks")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.TestKit")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Tests.MultiNode")]
Expand Down Expand Up @@ -1841,12 +1842,13 @@ namespace Akka.Actor.Internal
public class ActorSystemImpl : Akka.Actor.ExtendedActorSystem
{
public ActorSystemImpl(string name) { }
public ActorSystemImpl(string name, Akka.Configuration.Config config, Akka.Actor.Setup.ActorSystemSetup setup) { }
public ActorSystemImpl(string name, Akka.Configuration.Config config, Akka.Actor.Setup.ActorSystemSetup setup, System.Nullable<Akka.Util.Option<Akka.Actor.Props>> guardianProps = null) { }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change - need to add a separate overload. Can't add an optional parameter to an existing one. Otherwise this will break binary compatibility.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That being said, even though this API is "public" it's really only called by the ActorSystem.Create method - so probably not a big deal.

public override Akka.Actor.ActorProducerPipelineResolver ActorPipelineResolver { get; }
public override Akka.Actor.IActorRef DeadLetters { get; }
public override Akka.Dispatch.Dispatchers Dispatchers { get; }
public override Akka.Event.EventStream EventStream { get; }
public override Akka.Actor.IInternalActorRef Guardian { get; }
public Akka.Util.Option<Akka.Actor.Props> GuardianProps { get; }
public override Akka.Event.ILoggingAdapter Log { get; }
public override Akka.Actor.IInternalActorRef LookupRoot { get; }
public override Akka.Dispatch.Mailboxes Mailboxes { get; }
Expand Down Expand Up @@ -2452,9 +2454,10 @@ namespace Akka.Dispatch
}
public sealed class Dispatchers
{
public static readonly string DefaultBlockingDispatcherId;
public static readonly string DefaultDispatcherId;
public static readonly string SynchronizedDispatcherId;
public Dispatchers(Akka.Actor.ActorSystem system, Akka.Dispatch.IDispatcherPrerequisites prerequisites) { }
public Dispatchers(Akka.Actor.ActorSystem system, Akka.Dispatch.IDispatcherPrerequisites prerequisites, Akka.Event.ILoggingAdapter logger) { }
public Akka.Configuration.Config DefaultDispatcherConfig { get; }
public Akka.Dispatch.MessageDispatcher DefaultGlobalDispatcher { get; }
public Akka.Dispatch.IDispatcherPrerequisites Prerequisites { get; }
Expand Down Expand Up @@ -4823,6 +4826,7 @@ namespace Akka.Util
public static readonly Akka.Util.Option<T> None;
public Option(T value) { }
public bool HasValue { get; }
public bool IsEmpty { get; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this if we already have HasValue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its a convenience property, it saves from typing an awkward if(!HasValue) and it reads better.

public T Value { get; }
public bool Equals(Akka.Util.Option<T> other) { }
public override bool Equals(object obj) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace Akka.Streams
}
public class static ActorAttributes
{
public static Akka.Streams.ActorAttributes.Dispatcher IODispatcher { get; }
public static Akka.Streams.Attributes CreateDebugLogging(bool enabled) { }
public static Akka.Streams.Attributes CreateDispatcher(string dispatcherName) { }
public static Akka.Streams.Attributes CreateFuzzingMode(bool enabled) { }
Expand Down Expand Up @@ -138,12 +139,14 @@ namespace Akka.Streams
public readonly int SyncProcessingLimit;
public ActorMaterializerSettings(int initialInputBufferSize, int maxInputBufferSize, string dispatcher, Akka.Streams.Supervision.Decider supervisionDecider, Akka.Streams.StreamSubscriptionTimeoutSettings subscriptionTimeoutSettings, Akka.Streams.Dsl.StreamRefSettings streamRefSettings, bool isDebugLogging, int outputBurstLimit, bool isFuzzingMode, bool isAutoFusing, int maxFixedBufferSize, int syncProcessingLimit = 1000) { }
public static Akka.Streams.ActorMaterializerSettings Create(Akka.Actor.ActorSystem system) { }
public override bool Equals(object obj) { }
public Akka.Streams.ActorMaterializerSettings WithAutoFusing(bool isAutoFusing) { }
public Akka.Streams.ActorMaterializerSettings WithDebugLogging(bool isEnabled) { }
public Akka.Streams.ActorMaterializerSettings WithDispatcher(string dispatcher) { }
public Akka.Streams.ActorMaterializerSettings WithFuzzingMode(bool isFuzzingMode) { }
public Akka.Streams.ActorMaterializerSettings WithInputBuffer(int initialSize, int maxSize) { }
public Akka.Streams.ActorMaterializerSettings WithMaxFixedBufferSize(int maxFixedBufferSize) { }
public Akka.Streams.ActorMaterializerSettings WithOutputBurstLimit(int limit) { }
public Akka.Streams.ActorMaterializerSettings WithStreamRefSettings(Akka.Streams.Dsl.StreamRefSettings settings) { }
public Akka.Streams.ActorMaterializerSettings WithSubscriptionTimeoutSettings(Akka.Streams.StreamSubscriptionTimeoutSettings settings) { }
public Akka.Streams.ActorMaterializerSettings WithSupervisionStrategy(Akka.Streams.Supervision.Decider decider) { }
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster.Tests/ClusterConfigSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void Clustering_must_be_able_to_parse_generic_cluster_config_elements()
settings.MinNrOfMembers.Should().Be(1);
settings.MinNrOfMembersOfRole.Should().Equal(ImmutableDictionary<string, int>.Empty);
settings.Roles.Should().BeEquivalentTo(ImmutableHashSet<string>.Empty);
settings.UseDispatcher.Should().Be(Dispatchers.DefaultDispatcherId);
settings.UseDispatcher.Should().Be(Dispatchers.InternalDispatcherId);
settings.GossipDifferentViewProbability.Should().Be(0.8);
settings.ReduceGossipDifferentViewProbability.Should().Be(400);

Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster/ClusterSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public ClusterSettings(Config config, string systemName)
MinNrOfMembers = clusterConfig.GetInt("min-nr-of-members", 0);

_useDispatcher = clusterConfig.GetString("use-dispatcher", null);
if (String.IsNullOrEmpty(_useDispatcher)) _useDispatcher = Dispatchers.DefaultDispatcherId;
if (string.IsNullOrEmpty(_useDispatcher)) _useDispatcher = Dispatchers.InternalDispatcherId;
GossipDifferentViewProbability = clusterConfig.GetDouble("gossip-different-view-probability", 0);
ReduceGossipDifferentViewProbability = clusterConfig.GetInt("reduce-gossip-different-view-probability", 0);
SchedulerTickDuration = clusterConfig.GetTimeSpan("scheduler.tick-duration", null);
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Cluster/Configuration/Cluster.conf
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ akka {
# Disable with "off".
publish-stats-interval = off

# The id of the dispatcher to use for cluster actors. If not specified
# default dispatcher is used.
# The id of the dispatcher to use for cluster actors.
# If not specified, the internal dispatcher is used.
# If specified you need to define the settings of the actual dispatcher.
use-dispatcher = ""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
<TargetFrameworks>$(NetStandardLibVersion)</TargetFrameworks>
<EnableDefaultCompileItems>false</EnableDefaultCompileItems>
<GenerateAssemblyInfo>false</GenerateAssemblyInfo>
<!-- FIX for "<GenerateTargetFrameworkAttribute>false</GenerateTargetFrameworkAttribute>..." build error with the latest VS2019 update-->
<GenerateTargetFrameworkAttribute>false</GenerateTargetFrameworkAttribute>
<!-- END FIX-->
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public void A_UnfoldResourceAsyncSource_must_use_dedicated_blocking_io_dispatche
var actorRef = refs.First(@ref => @ref.Path.ToString().Contains("unfoldResourceSourceAsync"));
try
{
Utils.AssertDispatcher(actorRef, "akka.stream.default-blocking-io-dispatcher");
Utils.AssertDispatcher(actorRef, ActorAttributes.IODispatcher.Name);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void A_UnfoldResourceSource_must_use_dedicated_blocking_io_dispatcher_by_
var actorRef = refs.First(@ref => @ref.Path.ToString().Contains("unfoldResourceSource"));
try
{
Utils.AssertDispatcher(actorRef, "akka.stream.default-blocking-io-dispatcher");
Utils.AssertDispatcher(actorRef, ActorAttributes.IODispatcher.Name);
}
finally
{
Expand Down
Loading