From 91adfb95eb8a005d5fab72de8fa5784bbe6cd0ea Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Sat, 11 Jul 2020 06:25:26 +0700 Subject: [PATCH 1/9] Port scala akka PR #26816 to Akka.NET --- src/common.props | 2 +- .../Akka.MultiNodeTestRunner.Shared.csproj | 3 + .../Actor/ActorSystemDispatcherSpec.cs | 180 ++++++++++++++++++ src/core/Akka/Actor/ActorRefProvider.cs | 32 +++- src/core/Akka/Actor/ActorSystem.cs | 44 +++-- src/core/Akka/Actor/Deploy.cs | 3 + .../Akka/Actor/Internal/ActorSystemImpl.cs | 53 +++++- src/core/Akka/Actor/Settings.cs | 2 +- src/core/Akka/Configuration/Pigeon.conf | 48 ++++- src/core/Akka/Dispatch/AbstractDispatcher.cs | 13 +- src/core/Akka/Dispatch/Dispatchers.cs | 52 ++++- .../Helios.Concurrency.DedicatedThreadPool.cs | 14 +- src/core/Akka/Util/Option.cs | 2 + 13 files changed, 401 insertions(+), 47 deletions(-) create mode 100644 src/core/Akka.Tests/Actor/ActorSystemDispatcherSpec.cs diff --git a/src/common.props b/src/common.props index 12cf95ec503..c630df9486b 100644 --- a/src/common.props +++ b/src/common.props @@ -2,7 +2,7 @@ Copyright © 2013-2020 Akka.NET Team Akka.NET Team - 1.4.8 + 1.4.9 http://getakka.net/images/akkalogo.png https://github.com/akkadotnet/akka.net https://github.com/akkadotnet/akka.net/blob/master/LICENSE diff --git a/src/core/Akka.MultiNodeTestRunner.Shared/Akka.MultiNodeTestRunner.Shared.csproj b/src/core/Akka.MultiNodeTestRunner.Shared/Akka.MultiNodeTestRunner.Shared.csproj index 5a99724c6c7..bff83b1d9b2 100644 --- a/src/core/Akka.MultiNodeTestRunner.Shared/Akka.MultiNodeTestRunner.Shared.csproj +++ b/src/core/Akka.MultiNodeTestRunner.Shared/Akka.MultiNodeTestRunner.Shared.csproj @@ -5,6 +5,9 @@ $(NetStandardLibVersion) false false + + false + diff --git a/src/core/Akka.Tests/Actor/ActorSystemDispatcherSpec.cs b/src/core/Akka.Tests/Actor/ActorSystemDispatcherSpec.cs new file mode 100644 index 00000000000..4f59a0b6407 --- /dev/null +++ b/src/core/Akka.Tests/Actor/ActorSystemDispatcherSpec.cs @@ -0,0 +1,180 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Actor.Internal; +using Akka.Actor.Setup; +using Akka.Configuration; +using Akka.Dispatch; +using Akka.TestKit; +using Akka.Util; +using FluentAssertions; +using Xunit; + +namespace Akka.Tests.Actor +{ + public class ActorSystemDispatcherSpec : AkkaSpec + { + private class SnitchingSynchonizationContext : SynchronizationContext + { + private readonly IActorRef _testActor; + + public SnitchingSynchonizationContext(IActorRef testActor) + { + _testActor = testActor; + } + + public override void OperationStarted() + { + _testActor.Tell("called"); + } + } + + [Fact] + public void The_ActorSystem_must_work_with_a_passed_in_SynchronizationContext() + { + var ecProbe = CreateTestProbe(); + var ec = new SnitchingSynchonizationContext(ecProbe); + var system2 = ActorSystem.Create("ActorSystemDispatchersSpec-passed-in-ec", defaultSynchronizationContext: ec); + + try + { + var actor = system2.ActorOf(); + var probe = CreateTestProbe(system2); + + actor.Tell("ping", probe); + + ecProbe.ExpectMsg("called", TimeSpan.FromSeconds(1)); + probe.ExpectMsg("pong", TimeSpan.FromSeconds(1)); + } + finally + { + Shutdown(system2); + } + } + + [Fact] + public void The_ActorSystem_must_not_use_passed_in_SynchronizationContext_if_executor_is_configured_in() + { + var ecProbe = CreateTestProbe(); + var ec = new SnitchingSynchonizationContext(ecProbe); + + var config = + ConfigurationFactory.ParseString("akka.actor.default-dispatcher.executor = fork-join-executor") + .WithFallback(Sys.Settings.Config); + var system2 = ActorSystem.Create("ActorSystemDispatchersSpec-ec-configured", config, ec); + + try + { + var actor = system2.ActorOf(); + var probe = CreateTestProbe(system2); + + actor.Tell("ping", probe); + + ecProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); + probe.ExpectMsg("ping", TimeSpan.FromSeconds(1)); + } + finally + { + Shutdown(system2); + } + } + + [Fact] + public void The_ActorSystem_must_provide_a_single_place_to_override_the_internal_dispatcher() + { + var config = + ConfigurationFactory.ParseString("akka.actor.internal-dispatcher = akka.actor.default-dispatcher") + .WithFallback(Sys.Settings.Config); + var sys = ActorSystem.Create("ActorSystemDispatchersSpec-override-internal-disp", config); + try + { + // that the user guardian runs on the overriden dispatcher instead of internal + // isn't really a guarantee any internal actor has been made running on the right one + // but it's better than no test coverage at all + UserGuardianDispatcher(sys).Should().Be("akka.actor.default-dispatcher"); + } + finally + { + Shutdown(sys); + } + } + + [Fact] + public void The_ActorSystem_must_provide_internal_execution_context_instance_through_BootstrapSetup() + { + var ecProbe = CreateTestProbe(); + var ec = new SnitchingSynchonizationContext(ecProbe); + + // using the default for internal dispatcher and passing a pre-existing execution context + var config = + ConfigurationFactory.ParseString("akka.actor.internal-dispatcher = akka.actor.default-dispatcher") + .WithFallback(Sys.Settings.Config); + var system2 = ActorSystem.Create("ActorSystemDispatchersSpec-passed-in-ec-for-internal", config, ec); + + try + { + var actor = system2.ActorOf(Props.Create() + .WithDispatcher(Dispatchers.InternalDispatcherId)); + + var probe = CreateTestProbe(system2); + + actor.Tell("ping", probe); + + ecProbe.ExpectMsg("called", TimeSpan.FromSeconds(1)); + probe.ExpectMsg("pong", TimeSpan.FromSeconds(1)); + } + finally + { + Shutdown(system2); + } + } + + [Fact] + public void The_ActorSystem_must_use_the_default_dispatcher_by_a_user_provided_user_guardian() + { + var sys = new ActorSystemImpl( + "ActorSystemDispatchersSpec-custom-user-guardian", + ConfigurationFactory.Default(), + ActorSystemSetup.Empty, + Option.None, + Option.None); + sys.Start(); + try + { + UserGuardianDispatcher(sys).Should().Be("akka.actor.default-dispatcher"); + } + finally + { + Shutdown(sys); + } + } + + private string UserGuardianDispatcher(ActorSystem system) + { + var impl = (ActorSystemImpl)system; + return ((ActorCell)((ActorRefWithCell)impl.Guardian).Underlying).Dispatcher.Id; + } + + private class PingPongActor : UntypedActor + { + protected override void OnReceive(object message) + { + if((string)message == "ping") + Sender.Tell("pong"); + } + } + + private class EchoActor : UntypedActor + { + protected override void OnReceive(object message) + { + Sender.Tell(message); + } + } + } +} + diff --git a/src/core/Akka/Actor/ActorRefProvider.cs b/src/core/Akka/Actor/ActorRefProvider.cs index 8b877d91c84..ba028369b57 100644 --- a/src/core/Akka/Actor/ActorRefProvider.cs +++ b/src/core/Akka/Actor/ActorRefProvider.cs @@ -256,7 +256,8 @@ public LocalActorRefProvider(string systemName, Settings settings, EventStream e /// public EventStream EventStream { get { return _eventStream; } } - private MessageDispatcher DefaultDispatcher { get { return _system.Dispatchers.DefaultGlobalDispatcher; } } + //private MessageDispatcher DefaultDispatcher { get { return _system.Dispatchers.DefaultGlobalDispatcher; } } + private MessageDispatcher InternalDispatcher => _system.Dispatchers.Lookup(Dispatchers.InternalDispatcherId); private SupervisorStrategy UserGuardianSupervisorStrategy { get { return _userGuardianStrategyConfigurator.Create(); } } @@ -296,7 +297,7 @@ private RootGuardianActorRef CreateRootGuardian(ActorSystemImpl system) return Directive.Stop; }); var props = Props.Create(rootGuardianStrategy); - var rootGuardian = new RootGuardianActorRef(system, props, DefaultDispatcher, _defaultMailbox, supervisor, _rootPath, _deadLetters, _extraNames); + var rootGuardian = new RootGuardianActorRef(system, props, InternalDispatcher, _defaultMailbox, supervisor, _rootPath, _deadLetters, _extraNames); return rootGuardian; } @@ -314,9 +315,30 @@ private LocalActorRef CreateUserGuardian(LocalActorRef rootGuardian, string name { var cell = rootGuardian.Cell; cell.ReserveChild(name); - var props = Props.Create(UserGuardianSupervisorStrategy); + // make user provided guardians not run on internal dispatcher + MessageDispatcher dispatcher; + if (_system.GuardianProps.IsEmpty) + { + dispatcher = InternalDispatcher; + } + else + { + var props = _system.GuardianProps.Value; + var dispatcherId = + props.Deploy.Dispatcher == Deploy.DispatcherSameAsParent + ? Dispatchers.DefaultDispatcherId + : props.Dispatcher; + dispatcher = _system.Dispatchers.Lookup(dispatcherId); + } + + var userGuardian = new LocalActorRef( + _system, + _system.GuardianProps.GetOrElse(Props.Create(UserGuardianSupervisorStrategy)), + dispatcher, + _defaultMailbox, + rootGuardian, + RootPath / name); - var userGuardian = new LocalActorRef(_system, props, DefaultDispatcher, _defaultMailbox, rootGuardian, RootPath / name); cell.InitChild(userGuardian); userGuardian.Start(); return userGuardian; @@ -328,7 +350,7 @@ private LocalActorRef CreateSystemGuardian(LocalActorRef rootGuardian, string na cell.ReserveChild(name); var props = Props.Create(() => new SystemGuardianActor(userGuardian), _systemGuardianStrategy); - var systemGuardian = new LocalActorRef(_system, props, DefaultDispatcher, _defaultMailbox, rootGuardian, RootPath / name); + var systemGuardian = new LocalActorRef(_system, props, InternalDispatcher, _defaultMailbox, rootGuardian, RootPath / name); cell.InitChild(systemGuardian); systemGuardian.Start(); return systemGuardian; diff --git a/src/core/Akka/Actor/ActorSystem.cs b/src/core/Akka/Actor/ActorSystem.cs index 8d8658ad3d3..f2072fc9139 100644 --- a/src/core/Akka/Actor/ActorSystem.cs +++ b/src/core/Akka/Actor/ActorSystem.cs @@ -6,6 +6,7 @@ //----------------------------------------------------------------------- using System; +using System.Threading; using System.Threading.Tasks; using Akka.Actor.Internal; using Akka.Actor.Setup; @@ -96,14 +97,22 @@ internal static ProviderSelection GetProvider(string providerClass) /// public sealed class BootstrapSetup : Setup.Setup { - internal BootstrapSetup() : this(Option.None, Option.None) + internal BootstrapSetup() + : this( + Option.None, + Option.None, + Option.None) { } - internal BootstrapSetup(Option config, Option actorRefProvider) + internal BootstrapSetup( + Option config, + Option actorRefProvider, + Option defaultSynchronizationContext) { Config = config; ActorRefProvider = actorRefProvider; + DefaultSynchronizationContext = defaultSynchronizationContext; } /// @@ -119,6 +128,8 @@ internal BootstrapSetup(Option config, Option actorRe /// public Option ActorRefProvider { get; } + public Option DefaultSynchronizationContext { get; } + /// /// Create a new instance. /// @@ -129,13 +140,16 @@ public static BootstrapSetup Create() public BootstrapSetup WithActorRefProvider(ProviderSelection name) { - return new BootstrapSetup(Config, name); + return new BootstrapSetup(Config, name, DefaultSynchronizationContext); } public BootstrapSetup WithConfig(Config config) { - return new BootstrapSetup(config, ActorRefProvider); + return new BootstrapSetup(config, ActorRefProvider, DefaultSynchronizationContext); } + + public BootstrapSetup WithdefaultSynchronizationContext(SynchronizationContext SynchronizationContext) + => new BootstrapSetup(Config, ActorRefProvider, SynchronizationContext); } /// @@ -196,6 +210,8 @@ public abstract class ActorSystem : IActorRefFactory, IDisposable /// Gets the log public abstract ILoggingAdapter Log { get; } + internal abstract Option DefaultSynchronizationContext { get; } + /// /// Start-up time since the epoch. /// @@ -216,9 +232,9 @@ public abstract class ActorSystem : IActorRefFactory, IDisposable /// /// The configuration used to create the actor system /// A newly created actor system with the given name and configuration. - public static ActorSystem Create(string name, Config config) + public static ActorSystem Create(string name, Config config, SynchronizationContext defaultSynchronizationContext = null) { - return CreateAndStartSystem(name, config, ActorSystemSetup.Empty); + return CreateAndStartSystem(name, config, ActorSystemSetup.Empty, defaultSynchronizationContext); } /// @@ -229,9 +245,9 @@ public static ActorSystem Create(string name, Config config) /// /// The bootstrap setup used to help programmatically initialize the . /// A newly created actor system with the given name and configuration. - public static ActorSystem Create(string name, BootstrapSetup setup) + public static ActorSystem Create(string name, BootstrapSetup setup, SynchronizationContext defaultSynchronizationContext = null) { - return Create(name, ActorSystemSetup.Create(setup)); + return Create(name, ActorSystemSetup.Create(setup), defaultSynchronizationContext); } /// @@ -242,12 +258,12 @@ public static ActorSystem Create(string name, BootstrapSetup setup) /// /// The bootstrap setup used to help programmatically initialize the . /// A newly created actor system with the given name and configuration. - public static ActorSystem Create(string name, ActorSystemSetup setup) + public static ActorSystem Create(string name, ActorSystemSetup setup, SynchronizationContext defaultSynchronizationContext = null) { var bootstrapSetup = setup.Get(); var appConfig = bootstrapSetup.FlatSelect(_ => _.Config).GetOrElse(ConfigurationFactory.Load()); - return CreateAndStartSystem(name, appConfig, setup); + return CreateAndStartSystem(name, appConfig, setup, defaultSynchronizationContext); } /// @@ -257,14 +273,14 @@ public static ActorSystem Create(string name, ActorSystemSetup setup) /// Must contain only word characters (i.e. [a-zA-Z0-9] plus non-leading '-' /// /// A newly created actor system with the given name. - public static ActorSystem Create(string name) + public static ActorSystem Create(string name, SynchronizationContext defaultSynchronizationContext = null) { - return Create(name, ActorSystemSetup.Empty); + return Create(name, ActorSystemSetup.Empty, defaultSynchronizationContext); } - private static ActorSystem CreateAndStartSystem(string name, Config withFallback, ActorSystemSetup setup) + private static ActorSystem CreateAndStartSystem(string name, Config withFallback, ActorSystemSetup setup, SynchronizationContext defaultSynchronizationContext) { - var system = new ActorSystemImpl(name, withFallback, setup); + var system = new ActorSystemImpl(name, withFallback, setup, defaultSynchronizationContext, Option.None); system.Start(); return system; } diff --git a/src/core/Akka/Actor/Deploy.cs b/src/core/Akka/Actor/Deploy.cs index 9bba09db75f..becdf5e35df 100644 --- a/src/core/Akka/Actor/Deploy.cs +++ b/src/core/Akka/Actor/Deploy.cs @@ -29,6 +29,9 @@ public class Deploy : IEquatable, ISurrogated /// This deployment does not have a mailbox associated with it. /// public static readonly string NoMailboxGiven = string.Empty; + + internal const string DispatcherSameAsParent = ".."; + /// /// This deployment has an unspecified scope associated with it. /// diff --git a/src/core/Akka/Actor/Internal/ActorSystemImpl.cs b/src/core/Akka/Actor/Internal/ActorSystemImpl.cs index ca91ab05ce4..122fe6f3d91 100644 --- a/src/core/Akka/Actor/Internal/ActorSystemImpl.cs +++ b/src/core/Akka/Actor/Internal/ActorSystemImpl.cs @@ -54,7 +54,12 @@ public class ActorSystemImpl : ExtendedActorSystem, ISupportSerializationConfigR /// /// The name given to the actor system. public ActorSystemImpl(string name) - : this(name, ConfigurationFactory.Default(), ActorSystemSetup.Empty) + : this( + name, + ConfigurationFactory.Default(), + ActorSystemSetup.Empty, + Option.None, + Option.None) { } @@ -69,7 +74,12 @@ public ActorSystemImpl(string name) /// Note that the name must contain only word characters (i.e. [a-zA-Z0-9] plus non-leading '-'). /// /// This exception is thrown if the given is undefined. - public ActorSystemImpl(string name, Config config, ActorSystemSetup setup) + public ActorSystemImpl( + string name, + Config config, + ActorSystemSetup setup, + Option? defaultSynchronizationContext = null, + Option? guardianProps = null) { if(!Regex.Match(name, "^[a-zA-Z0-9][a-zA-Z0-9-]*$").Success) throw new ArgumentException( @@ -79,7 +89,10 @@ public ActorSystemImpl(string name, Config config, ActorSystemSetup setup) if(config is null) throw new ArgumentNullException(nameof(config), $"Cannot create {typeof(ActorSystemImpl)}: Configuration must not be null."); - _name = name; + _name = name; + + GuardianProps = guardianProps ?? Option.None; + ConfigureSettings(config, setup); ConfigureEventStream(); ConfigureLoggers(); @@ -88,7 +101,9 @@ public ActorSystemImpl(string name, Config config, ActorSystemSetup setup) ConfigureTerminationCallbacks(); ConfigureSerialization(); ConfigureMailboxes(); - ConfigureDispatchers(); + ConfigureDispatchers( + defaultSynchronizationContext ?? Option.None, + setup.Get().Value); ConfigureActorProducerPipeline(); } @@ -134,6 +149,11 @@ public ActorSystemImpl(string name, Config config, ActorSystemSetup setup) /// public override IInternalActorRef SystemGuardian { get { return _provider.SystemGuardian; } } + public Option GuardianProps { get; } + + private Option _defaultSynchronizationContext; + internal override Option DefaultSynchronizationContext => _defaultSynchronizationContext; + /// /// Creates a new system actor that lives under the "/system" guardian. /// @@ -246,7 +266,9 @@ private void WarnIfJsonIsDefaultSerializer() /// public override IActorRef ActorOf(Props props, string name = null) { - return _provider.Guardian.Cell.AttachChild(props, false, name); + if(GuardianProps.IsEmpty) + return _provider.Guardian.Cell.AttachChild(props, false, name); + throw new InvalidOperationException($"cannot create top-level actor { (string.IsNullOrEmpty(name) ? "" : $"[{name} ]")}from the outside on ActorSystem with custom user guardian"); } /// @@ -396,6 +418,7 @@ public override bool HasExtension() private void ConfigureSettings(Config config, ActorSystemSetup setup) { + // TODO: on this line, in scala, the config is validated with `Dispatchers.InternalDispatcherId` path removed. _settings = new Settings(this, config, setup); } @@ -446,9 +469,25 @@ private void ConfigureLoggers() _log = new BusLogging(_eventStream, "ActorSystem(" + _name + ")", GetType(), new DefaultLogMessageFormatter()); } - private void ConfigureDispatchers() + private void ConfigureDispatchers(Option defaultSynchronizationContext, BootstrapSetup setup) { - _dispatchers = new Dispatchers(this, new DefaultDispatcherPrerequisites(EventStream, Scheduler, Settings, Mailboxes)); + Option synchronizationContext; + if (setup != null && setup.DefaultSynchronizationContext.HasValue) + synchronizationContext = setup.DefaultSynchronizationContext.Value; + else + synchronizationContext = defaultSynchronizationContext; + + _defaultSynchronizationContext = synchronizationContext; + + _dispatchers = new Dispatchers( + this, + new DefaultDispatcherPrerequisites( + EventStream, + Scheduler, + Settings, + Mailboxes, + synchronizationContext), + _log); } private void ConfigureActorProducerPipeline() diff --git a/src/core/Akka/Actor/Settings.cs b/src/core/Akka/Actor/Settings.cs index e9169ad40ba..2ccc8e5d361 100644 --- a/src/core/Akka/Actor/Settings.cs +++ b/src/core/Akka/Actor/Settings.cs @@ -33,7 +33,7 @@ private void RebuildConfig() Config = _userConfig.SafeWithFallback(_fallbackConfig); //if we get a new config definition loaded after all ActorRefProviders have been started, such as Akka.Persistence... - System?.Dispatchers?.ReloadPrerequisites(new DefaultDispatcherPrerequisites(System.EventStream, System.Scheduler, this, System.Mailboxes)); + System?.Dispatchers?.ReloadPrerequisites(new DefaultDispatcherPrerequisites(System.EventStream, System.Scheduler, this, System.Mailboxes, System.DefaultSynchronizationContext)); if (System is Internal.ISupportSerializationConfigReload rs) rs.ReloadSerialization(); } diff --git a/src/core/Akka/Configuration/Pigeon.conf b/src/core/Akka/Configuration/Pigeon.conf index 0a63485ef51..361985082ec 100644 --- a/src/core/Akka/Configuration/Pigeon.conf +++ b/src/core/Akka/Configuration/Pigeon.conf @@ -310,7 +310,23 @@ akka { # This will be used if you have set "executor = "fork-join-executor"" # Underlying thread pool implementation is scala.concurrent.forkjoin.ForkJoinPool fork-join-executor { - dedicated-thread-pool{ #settings for Helios.DedicatedThreadPool + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 8 + + # The parallelism factor is used to determine thread pool size using the + # following formula: ceil(available processors * factor). Resulting size + # is then bounded by the parallelism-min and parallelism-max values. + parallelism-factor = 1.0 + + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 64 + + # Setting to "FIFO" to use queue like peeking mode which "poll" or "LIFO" to use stack + # like peeking mode which "pop". + task-peeking-mode = "FIFO" + + + dedicated-thread-pool{ #settings for Helios.DedicatedThreadPool thread-count = 3 #number of threads #deadlock-timeout = 3s #optional timeout for deadlock detection threadtype = background #values can be "background" or "foreground" @@ -340,6 +356,20 @@ akka { # be a subtype of this type. The empty string signifies no requirement. mailbox-requirement = "" } + + # Default separate internal dispatcher to run Akka internal tasks and actors on + # protecting them against starvation because of accidental blocking in user actors (which run on the + # default dispatcher) + internal-dispatcher { + type = "Dispatcher" + executor = "fork-join-executor" + throughput = 5 + fork-join-executor { + parallelism-min = 4 + parallelism-factor = 1.0 + parallelism-max = 64 + } + } default-mailbox { # FQCN of the MailboxType. The Class of the FQCN must have a public @@ -621,15 +651,15 @@ akka { # Fully qualified config path which holds the dispatcher configuration # for the read/write worker actors - worker-dispatcher = "akka.actor.default-dispatcher" + worker-dispatcher = "akka.actor.internal-dispatcher" # Fully qualified config path which holds the dispatcher configuration # for the selector management actors - management-dispatcher = "akka.actor.default-dispatcher" + management-dispatcher = "akka.actor.internal-dispatcher" # Fully qualified config path which holds the dispatcher configuration # on which file IO tasks are scheduled - file-io-dispatcher = "akka.actor.default-dispatcher" + file-io-dispatcher = "akka.actor.internal-dispatcher" # The maximum number of bytes (or "unlimited") to transfer in one batch # when using `WriteFile` command which uses `FileChannel.transferTo` to @@ -748,11 +778,11 @@ akka { # Fully qualified config path which holds the dispatcher configuration # for the read/write worker actors - worker-dispatcher = "akka.actor.default-dispatcher" + worker-dispatcher = "akka.actor.internal-dispatcher" # Fully qualified config path which holds the dispatcher configuration # for the selector management actors - management-dispatcher = "akka.actor.default-dispatcher" + management-dispatcher = "akka.actor.internal-dispatcher" } udp-connected { @@ -843,18 +873,18 @@ akka { # Fully qualified config path which holds the dispatcher configuration # for the read/write worker actors - worker-dispatcher = "akka.actor.default-dispatcher" + worker-dispatcher = "akka.actor.internal-dispatcher" # Fully qualified config path which holds the dispatcher configuration # for the selector management actors - management-dispatcher = "akka.actor.default-dispatcher" + management-dispatcher = "akka.actor.internal-dispatcher" } dns { # Fully qualified config path which holds the dispatcher configuration # for the manager and resolver router actors. # For actual router configuration see akka.actor.deployment./IO-DNS/* - dispatcher = "akka.actor.default-dispatcher" + dispatcher = "akka.actor.internal-dispatcher" # Name of the subconfig at path akka.io.dns, see inet-address below resolver = "inet-address" diff --git a/src/core/Akka/Dispatch/AbstractDispatcher.cs b/src/core/Akka/Dispatch/AbstractDispatcher.cs index 0b52c829763..711f9c55c31 100644 --- a/src/core/Akka/Dispatch/AbstractDispatcher.cs +++ b/src/core/Akka/Dispatch/AbstractDispatcher.cs @@ -44,6 +44,8 @@ public interface IDispatcherPrerequisites /// The list of registered for the current . /// Mailboxes Mailboxes { get; } + + Option DefaultSynchronizationContext { get; } } /// @@ -58,12 +60,19 @@ public sealed class DefaultDispatcherPrerequisites : IDispatcherPrerequisites /// TBD /// TBD /// TBD - public DefaultDispatcherPrerequisites(EventStream eventStream, IScheduler scheduler, Settings settings, Mailboxes mailboxes) + /// TBD + public DefaultDispatcherPrerequisites( + EventStream eventStream, + IScheduler scheduler, + Settings settings, + Mailboxes mailboxes, + Option defaultSynchronizationContext) { Mailboxes = mailboxes; Settings = settings; Scheduler = scheduler; EventStream = eventStream; + DefaultSynchronizationContext = defaultSynchronizationContext; } /// @@ -82,6 +91,8 @@ public DefaultDispatcherPrerequisites(EventStream eventStream, IScheduler schedu /// TBD /// public Mailboxes Mailboxes { get; private set; } + + public Option DefaultSynchronizationContext { get; } } /// diff --git a/src/core/Akka/Dispatch/Dispatchers.cs b/src/core/Akka/Dispatch/Dispatchers.cs index 1b630aa3031..870cfa1c641 100644 --- a/src/core/Akka/Dispatch/Dispatchers.cs +++ b/src/core/Akka/Dispatch/Dispatchers.cs @@ -11,6 +11,7 @@ using System.Threading.Tasks; using Akka.Actor; using Akka.Configuration; +using Akka.Event; using Helios.Concurrency; using ConfigurationFactory = Akka.Configuration.ConfigurationFactory; @@ -198,6 +199,18 @@ public override void Shutdown() /// /// The registry of all instances available to this . + /// + /// Dispatchers are to be defined in configuration to allow for tuning + /// for different environments. Use the method to create + /// a dispatcher as specified in configuration. + /// + /// A dispatcher config can also be an alias, in that case it is a config string value pointing + /// to the actual dispatcher config. + /// + /// Look in `akka.actor.default-dispatcher` section of the reference.conf + /// for documentation of dispatcher options. + /// + /// Not for user instantiation or extension /// public sealed class Dispatchers { @@ -206,6 +219,8 @@ public sealed class Dispatchers /// public static readonly string DefaultDispatcherId = "akka.actor.default-dispatcher"; + public const string InternalDispatcherId = "akka.actor.internal-dispatcher"; + /// /// The identifier for synchronized dispatchers. /// @@ -214,6 +229,7 @@ public sealed class Dispatchers private readonly ActorSystem _system; private Config _cachingConfig; private readonly MessageDispatcher _defaultGlobalDispatcher; + private readonly ILoggingAdapter _logger; /// /// The list of all configurators used to create instances. @@ -225,12 +241,15 @@ public sealed class Dispatchers /// Initializes a new instance of the class. /// The system. /// The prerequisites required for some instances. - public Dispatchers(ActorSystem system, IDispatcherPrerequisites prerequisites) + public Dispatchers(ActorSystem system, IDispatcherPrerequisites prerequisites, ILoggingAdapter logger) { _system = system; Prerequisites = prerequisites; _cachingConfig = new CachingConfig(prerequisites.Settings.Config); _defaultGlobalDispatcher = Lookup(DefaultDispatcherId); + _logger = logger; + + InternalDispatcher = Lookup(InternalDispatcherId); } /// Gets the one and only default dispatcher. @@ -239,6 +258,8 @@ public MessageDispatcher DefaultGlobalDispatcher get { return _defaultGlobalDispatcher; } } + internal MessageDispatcher InternalDispatcher { get; } + /// /// The for the default dispatcher. /// @@ -270,8 +291,13 @@ internal void ReloadPrerequisites(IDispatcherPrerequisites prerequisites) public IDispatcherPrerequisites Prerequisites { get; private set; } /// - /// Returns a dispatcher as specified in configuration. Please note that this method _MAY_ - /// create and return a new dispatcher on _EVERY_ call. + /// Returns a dispatcher as specified in configuration. Please note that this + /// method _may_ create and return a NEW dispatcher, _every_ call (depending on the `MessageDispatcherConfigurator` + /// dispatcher config the id points to). + /// + /// A dispatcher id can also be an alias. In the case it is a string value in the config it is treated as the id + /// of the actual dispatcher config to use. If several ids leading to the same actual dispatcher config is used only one + /// instance is created. This means that for dispatchers you expect to be shared they will be. /// /// TBD /// @@ -280,7 +306,7 @@ internal void ReloadPrerequisites(IDispatcherPrerequisites prerequisites) /// TBD public MessageDispatcher Lookup(string dispatcherName) { - return LookupConfigurator(dispatcherName).Dispatcher(); + return LookupConfigurator(dispatcherName, 0).Dispatcher(); } /// @@ -295,7 +321,7 @@ public bool HasDispatcher(string id) return _dispatcherConfigurators.ContainsKey(id) || _cachingConfig.HasPath(id); } - private MessageDispatcherConfigurator LookupConfigurator(string id) + private MessageDispatcherConfigurator LookupConfigurator(string id, int depth) { if (!_dispatcherConfigurators.TryGetValue(id, out var configurator)) { @@ -304,7 +330,21 @@ private MessageDispatcherConfigurator LookupConfigurator(string id) // created until used, i.e. cheap. MessageDispatcherConfigurator newConfigurator; if (_cachingConfig.HasPath(id)) - newConfigurator = ConfiguratorFrom(Config(id)); + { + var valueAtPath = _cachingConfig.GetValue(id); + if (valueAtPath.IsString()) + { + // a dispatcher key can be an alias of another dispatcher, if it is a string + // we treat that string value as the id of a dispatcher to lookup, it will be stored + // both under the actual id and the alias id in the 'dispatcherConfigurators' cache + var actualId = valueAtPath.GetString(); + _logger.Debug($"Dispatcher id [{id}] is an alias, actual dispatcher will be [{actualId}]"); + newConfigurator = LookupConfigurator(actualId, depth + 1); + } else if (valueAtPath.IsObject()) + newConfigurator = ConfiguratorFrom(Config(id)); + else + throw new ConfigurationException($"Expected either a dispatcher config or an alias at [{id}] but found [{valueAtPath}]"); + } else throw new ConfigurationException($"Dispatcher {id} not configured."); diff --git a/src/core/Akka/Helios.Concurrency.DedicatedThreadPool.cs b/src/core/Akka/Helios.Concurrency.DedicatedThreadPool.cs index 2a0250b6d51..9ceadcf2b4c 100644 --- a/src/core/Akka/Helios.Concurrency.DedicatedThreadPool.cs +++ b/src/core/Akka/Helios.Concurrency.DedicatedThreadPool.cs @@ -94,11 +94,16 @@ public DedicatedThreadPoolSettings(int numThreads, throw new ArgumentOutOfRangeException(nameof(numThreads), $"numThreads must be at least 1. Was {numThreads}"); } #else - public DedicatedThreadPoolSettings(int numThreads, string name = null, TimeSpan? deadlockTimeout = null) - : this(numThreads, DefaultThreadType, name, deadlockTimeout) + public DedicatedThreadPoolSettings(int numThreads, string name = null, TimeSpan? deadlockTimeout = null, SynchronizationContext synchronizationContext = null) + : this(numThreads, DefaultThreadType, name, deadlockTimeout, synchronizationContext) { } - public DedicatedThreadPoolSettings(int numThreads, ThreadType threadType, string name = null, TimeSpan? deadlockTimeout = null) + public DedicatedThreadPoolSettings( + int numThreads, + ThreadType threadType, + string name = null, + TimeSpan? deadlockTimeout = null, + SynchronizationContext synchronizationContext = null) { Name = name ?? ("DedicatedThreadPool-" + Guid.NewGuid()); ThreadType = threadType; @@ -108,6 +113,7 @@ public DedicatedThreadPoolSettings(int numThreads, ThreadType threadType, string throw new ArgumentOutOfRangeException("deadlockTimeout", string.Format("deadlockTimeout must be null or at least 1ms. Was {0}.", deadlockTimeout)); if (numThreads <= 0) throw new ArgumentOutOfRangeException("numThreads", string.Format("numThreads must be at least 1. Was {0}", numThreads)); + SynchronizationContext = synchronizationContext ?? SynchronizationContext.Current; } #endif @@ -150,6 +156,8 @@ public DedicatedThreadPoolSettings(int numThreads, ThreadType threadType, string /// Gets the thread stack size, 0 represents the default stack size. /// public int ThreadMaxStackSize { get; private set; } + + public SynchronizationContext SynchronizationContext { get; } } /// diff --git a/src/core/Akka/Util/Option.cs b/src/core/Akka/Util/Option.cs index bf16d1008fb..b412f376953 100644 --- a/src/core/Akka/Util/Option.cs +++ b/src/core/Akka/Util/Option.cs @@ -38,6 +38,8 @@ public Option(T value) /// public bool HasValue { get; } + public bool IsEmpty => !HasValue; + /// /// TBD /// From 9a7a041c519caa006237e3a57320c6bf30deb5a4 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 14 Jul 2020 01:21:03 +0700 Subject: [PATCH 2/9] Remove SynchronizationContext usage --- .../Actor/ActorSystemDispatcherSpec.cs | 79 +------------------ src/core/Akka/Actor/ActorSystem.cs | 38 ++++----- .../Akka/Actor/Internal/ActorSystemImpl.cs | 22 +----- src/core/Akka/Actor/Settings.cs | 2 +- src/core/Akka/Dispatch/AbstractDispatcher.cs | 9 +-- 5 files changed, 20 insertions(+), 130 deletions(-) diff --git a/src/core/Akka.Tests/Actor/ActorSystemDispatcherSpec.cs b/src/core/Akka.Tests/Actor/ActorSystemDispatcherSpec.cs index 4f59a0b6407..101dbf8d996 100644 --- a/src/core/Akka.Tests/Actor/ActorSystemDispatcherSpec.cs +++ b/src/core/Akka.Tests/Actor/ActorSystemDispatcherSpec.cs @@ -33,39 +33,13 @@ public override void OperationStarted() } } - [Fact] - public void The_ActorSystem_must_work_with_a_passed_in_SynchronizationContext() - { - var ecProbe = CreateTestProbe(); - var ec = new SnitchingSynchonizationContext(ecProbe); - var system2 = ActorSystem.Create("ActorSystemDispatchersSpec-passed-in-ec", defaultSynchronizationContext: ec); - - try - { - var actor = system2.ActorOf(); - var probe = CreateTestProbe(system2); - - actor.Tell("ping", probe); - - ecProbe.ExpectMsg("called", TimeSpan.FromSeconds(1)); - probe.ExpectMsg("pong", TimeSpan.FromSeconds(1)); - } - finally - { - Shutdown(system2); - } - } - [Fact] public void The_ActorSystem_must_not_use_passed_in_SynchronizationContext_if_executor_is_configured_in() { - var ecProbe = CreateTestProbe(); - var ec = new SnitchingSynchonizationContext(ecProbe); - var config = ConfigurationFactory.ParseString("akka.actor.default-dispatcher.executor = fork-join-executor") .WithFallback(Sys.Settings.Config); - var system2 = ActorSystem.Create("ActorSystemDispatchersSpec-ec-configured", config, ec); + var system2 = ActorSystem.Create("ActorSystemDispatchersSpec-ec-configured", config); try { @@ -74,7 +48,6 @@ public void The_ActorSystem_must_not_use_passed_in_SynchronizationContext_if_exe actor.Tell("ping", probe); - ecProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); probe.ExpectMsg("ping", TimeSpan.FromSeconds(1)); } finally @@ -103,56 +76,6 @@ public void The_ActorSystem_must_provide_a_single_place_to_override_the_internal } } - [Fact] - public void The_ActorSystem_must_provide_internal_execution_context_instance_through_BootstrapSetup() - { - var ecProbe = CreateTestProbe(); - var ec = new SnitchingSynchonizationContext(ecProbe); - - // using the default for internal dispatcher and passing a pre-existing execution context - var config = - ConfigurationFactory.ParseString("akka.actor.internal-dispatcher = akka.actor.default-dispatcher") - .WithFallback(Sys.Settings.Config); - var system2 = ActorSystem.Create("ActorSystemDispatchersSpec-passed-in-ec-for-internal", config, ec); - - try - { - var actor = system2.ActorOf(Props.Create() - .WithDispatcher(Dispatchers.InternalDispatcherId)); - - var probe = CreateTestProbe(system2); - - actor.Tell("ping", probe); - - ecProbe.ExpectMsg("called", TimeSpan.FromSeconds(1)); - probe.ExpectMsg("pong", TimeSpan.FromSeconds(1)); - } - finally - { - Shutdown(system2); - } - } - - [Fact] - public void The_ActorSystem_must_use_the_default_dispatcher_by_a_user_provided_user_guardian() - { - var sys = new ActorSystemImpl( - "ActorSystemDispatchersSpec-custom-user-guardian", - ConfigurationFactory.Default(), - ActorSystemSetup.Empty, - Option.None, - Option.None); - sys.Start(); - try - { - UserGuardianDispatcher(sys).Should().Be("akka.actor.default-dispatcher"); - } - finally - { - Shutdown(sys); - } - } - private string UserGuardianDispatcher(ActorSystem system) { var impl = (ActorSystemImpl)system; diff --git a/src/core/Akka/Actor/ActorSystem.cs b/src/core/Akka/Actor/ActorSystem.cs index f2072fc9139..a4be985f742 100644 --- a/src/core/Akka/Actor/ActorSystem.cs +++ b/src/core/Akka/Actor/ActorSystem.cs @@ -100,19 +100,16 @@ public sealed class BootstrapSetup : Setup.Setup internal BootstrapSetup() : this( Option.None, - Option.None, - Option.None) + Option.None) { } internal BootstrapSetup( Option config, - Option actorRefProvider, - Option defaultSynchronizationContext) + Option actorRefProvider) { Config = config; ActorRefProvider = actorRefProvider; - DefaultSynchronizationContext = defaultSynchronizationContext; } /// @@ -128,8 +125,6 @@ internal BootstrapSetup( /// public Option ActorRefProvider { get; } - public Option DefaultSynchronizationContext { get; } - /// /// Create a new instance. /// @@ -140,16 +135,13 @@ public static BootstrapSetup Create() public BootstrapSetup WithActorRefProvider(ProviderSelection name) { - return new BootstrapSetup(Config, name, DefaultSynchronizationContext); + return new BootstrapSetup(Config, name); } public BootstrapSetup WithConfig(Config config) { - return new BootstrapSetup(config, ActorRefProvider, DefaultSynchronizationContext); + return new BootstrapSetup(config, ActorRefProvider); } - - public BootstrapSetup WithdefaultSynchronizationContext(SynchronizationContext SynchronizationContext) - => new BootstrapSetup(Config, ActorRefProvider, SynchronizationContext); } /// @@ -210,8 +202,6 @@ public abstract class ActorSystem : IActorRefFactory, IDisposable /// Gets the log public abstract ILoggingAdapter Log { get; } - internal abstract Option DefaultSynchronizationContext { get; } - /// /// Start-up time since the epoch. /// @@ -232,9 +222,9 @@ public abstract class ActorSystem : IActorRefFactory, IDisposable /// /// The configuration used to create the actor system /// A newly created actor system with the given name and configuration. - public static ActorSystem Create(string name, Config config, SynchronizationContext defaultSynchronizationContext = null) + public static ActorSystem Create(string name, Config config) { - return CreateAndStartSystem(name, config, ActorSystemSetup.Empty, defaultSynchronizationContext); + return CreateAndStartSystem(name, config, ActorSystemSetup.Empty); } /// @@ -245,9 +235,9 @@ public static ActorSystem Create(string name, Config config, SynchronizationCont /// /// The bootstrap setup used to help programmatically initialize the . /// A newly created actor system with the given name and configuration. - public static ActorSystem Create(string name, BootstrapSetup setup, SynchronizationContext defaultSynchronizationContext = null) + public static ActorSystem Create(string name, BootstrapSetup setup) { - return Create(name, ActorSystemSetup.Create(setup), defaultSynchronizationContext); + return Create(name, ActorSystemSetup.Create(setup)); } /// @@ -258,12 +248,12 @@ public static ActorSystem Create(string name, BootstrapSetup setup, Synchronizat /// /// The bootstrap setup used to help programmatically initialize the . /// A newly created actor system with the given name and configuration. - public static ActorSystem Create(string name, ActorSystemSetup setup, SynchronizationContext defaultSynchronizationContext = null) + public static ActorSystem Create(string name, ActorSystemSetup setup) { var bootstrapSetup = setup.Get(); var appConfig = bootstrapSetup.FlatSelect(_ => _.Config).GetOrElse(ConfigurationFactory.Load()); - return CreateAndStartSystem(name, appConfig, setup, defaultSynchronizationContext); + return CreateAndStartSystem(name, appConfig, setup); } /// @@ -273,14 +263,14 @@ public static ActorSystem Create(string name, ActorSystemSetup setup, Synchroniz /// Must contain only word characters (i.e. [a-zA-Z0-9] plus non-leading '-' /// /// A newly created actor system with the given name. - public static ActorSystem Create(string name, SynchronizationContext defaultSynchronizationContext = null) + public static ActorSystem Create(string name) { - return Create(name, ActorSystemSetup.Empty, defaultSynchronizationContext); + return Create(name, ActorSystemSetup.Empty); } - private static ActorSystem CreateAndStartSystem(string name, Config withFallback, ActorSystemSetup setup, SynchronizationContext defaultSynchronizationContext) + private static ActorSystem CreateAndStartSystem(string name, Config withFallback, ActorSystemSetup setup) { - var system = new ActorSystemImpl(name, withFallback, setup, defaultSynchronizationContext, Option.None); + var system = new ActorSystemImpl(name, withFallback, setup, Option.None); system.Start(); return system; } diff --git a/src/core/Akka/Actor/Internal/ActorSystemImpl.cs b/src/core/Akka/Actor/Internal/ActorSystemImpl.cs index 122fe6f3d91..c8c2418086e 100644 --- a/src/core/Akka/Actor/Internal/ActorSystemImpl.cs +++ b/src/core/Akka/Actor/Internal/ActorSystemImpl.cs @@ -58,7 +58,6 @@ public ActorSystemImpl(string name) name, ConfigurationFactory.Default(), ActorSystemSetup.Empty, - Option.None, Option.None) { } @@ -78,7 +77,6 @@ public ActorSystemImpl( string name, Config config, ActorSystemSetup setup, - Option? defaultSynchronizationContext = null, Option? guardianProps = null) { if(!Regex.Match(name, "^[a-zA-Z0-9][a-zA-Z0-9-]*$").Success) @@ -101,9 +99,7 @@ public ActorSystemImpl( ConfigureTerminationCallbacks(); ConfigureSerialization(); ConfigureMailboxes(); - ConfigureDispatchers( - defaultSynchronizationContext ?? Option.None, - setup.Get().Value); + ConfigureDispatchers(); ConfigureActorProducerPipeline(); } @@ -151,9 +147,6 @@ public ActorSystemImpl( public Option GuardianProps { get; } - private Option _defaultSynchronizationContext; - internal override Option DefaultSynchronizationContext => _defaultSynchronizationContext; - /// /// Creates a new system actor that lives under the "/system" guardian. /// @@ -469,24 +462,15 @@ private void ConfigureLoggers() _log = new BusLogging(_eventStream, "ActorSystem(" + _name + ")", GetType(), new DefaultLogMessageFormatter()); } - private void ConfigureDispatchers(Option defaultSynchronizationContext, BootstrapSetup setup) + private void ConfigureDispatchers() { - Option synchronizationContext; - if (setup != null && setup.DefaultSynchronizationContext.HasValue) - synchronizationContext = setup.DefaultSynchronizationContext.Value; - else - synchronizationContext = defaultSynchronizationContext; - - _defaultSynchronizationContext = synchronizationContext; - _dispatchers = new Dispatchers( this, new DefaultDispatcherPrerequisites( EventStream, Scheduler, Settings, - Mailboxes, - synchronizationContext), + Mailboxes), _log); } diff --git a/src/core/Akka/Actor/Settings.cs b/src/core/Akka/Actor/Settings.cs index 2ccc8e5d361..e9169ad40ba 100644 --- a/src/core/Akka/Actor/Settings.cs +++ b/src/core/Akka/Actor/Settings.cs @@ -33,7 +33,7 @@ private void RebuildConfig() Config = _userConfig.SafeWithFallback(_fallbackConfig); //if we get a new config definition loaded after all ActorRefProviders have been started, such as Akka.Persistence... - System?.Dispatchers?.ReloadPrerequisites(new DefaultDispatcherPrerequisites(System.EventStream, System.Scheduler, this, System.Mailboxes, System.DefaultSynchronizationContext)); + System?.Dispatchers?.ReloadPrerequisites(new DefaultDispatcherPrerequisites(System.EventStream, System.Scheduler, this, System.Mailboxes)); if (System is Internal.ISupportSerializationConfigReload rs) rs.ReloadSerialization(); } diff --git a/src/core/Akka/Dispatch/AbstractDispatcher.cs b/src/core/Akka/Dispatch/AbstractDispatcher.cs index 711f9c55c31..fe0214fead4 100644 --- a/src/core/Akka/Dispatch/AbstractDispatcher.cs +++ b/src/core/Akka/Dispatch/AbstractDispatcher.cs @@ -44,8 +44,6 @@ public interface IDispatcherPrerequisites /// The list of registered for the current . /// Mailboxes Mailboxes { get; } - - Option DefaultSynchronizationContext { get; } } /// @@ -60,19 +58,16 @@ public sealed class DefaultDispatcherPrerequisites : IDispatcherPrerequisites /// TBD /// TBD /// TBD - /// TBD public DefaultDispatcherPrerequisites( EventStream eventStream, IScheduler scheduler, Settings settings, - Mailboxes mailboxes, - Option defaultSynchronizationContext) + Mailboxes mailboxes) { Mailboxes = mailboxes; Settings = settings; Scheduler = scheduler; EventStream = eventStream; - DefaultSynchronizationContext = defaultSynchronizationContext; } /// @@ -91,8 +86,6 @@ public DefaultDispatcherPrerequisites( /// TBD /// public Mailboxes Mailboxes { get; private set; } - - public Option DefaultSynchronizationContext { get; } } /// From ff7d06ac9dc5d85f04219ddcec0fbc418ac793d4 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 14 Jul 2020 03:10:13 +0700 Subject: [PATCH 3/9] Use internal dispatcher in other part of Akka.NET, update documentation --- docs/articles/actors/dispatchers.md | 8 ++++++++ .../cluster/Akka.Cluster.Sharding/ClusterSharding.cs | 2 +- src/contrib/cluster/Akka.Cluster.Sharding/reference.conf | 2 +- .../Client/ClusterClientReceptionist.cs | 2 +- .../cluster/Akka.Cluster.Tools/Client/reference.conf | 2 +- .../PublishSubscribe/DistributedPubSub.cs | 2 +- .../Akka.Cluster.Tools/PublishSubscribe/reference.conf | 2 +- .../cluster/Akka.DistributedData/ReplicatorSettings.cs | 6 +++--- src/contrib/cluster/Akka.DistributedData/reference.conf | 4 ++-- src/core/Akka.Cluster.Tests/ClusterConfigSpec.cs | 2 +- src/core/Akka.Cluster/ClusterSettings.cs | 2 +- src/core/Akka.Cluster/Configuration/Cluster.conf | 4 ++-- src/core/Akka/Dispatch/Dispatchers.cs | 2 +- src/core/Akka/Properties/AssemblyInfo.cs | 2 ++ 14 files changed, 26 insertions(+), 16 deletions(-) diff --git a/docs/articles/actors/dispatchers.md b/docs/articles/actors/dispatchers.md index c6963215c52..73ee10f96b9 100644 --- a/docs/articles/actors/dispatchers.md +++ b/docs/articles/actors/dispatchers.md @@ -70,6 +70,7 @@ system.ActorOf(Props.Create().WithDispatcher("my-dispatcher"), "my-acto Some dispatcher configurations are available out-of-the-box for convenience. You can use them during actor deployment, [as described above](#configuring-dispatchers). * **default-dispatcher** - A configuration that uses the [ThreadPoolDispatcher](#threadpooldispatcher). As the name says, this is the default dispatcher configuration used by the global dispatcher, and you don't need to define anything during deployment to use it. +* **internal-dispatcher** - To protect the internal Actors that is spawned by the various Akka modules, a separate internal dispatcher is used by default. * **task-dispatcher** - A configuration that uses the [TaskDispatcher](#taskdispatcher). * **default-fork-join-dispatcher** - A configuration that uses the [ForkJoinDispatcher](#forkjoindispatcher). * **synchronized-dispatcher** - A configuration that uses the [SynchronizedDispatcher](#synchronizeddispatcher). @@ -174,3 +175,10 @@ The following configuration keys are available for any dispatcher configuration: > [!NOTE] > The throughput-deadline-time is used as a *best effort*, not as a *hard limit*. This means that if a message takes more time than the deadline allows, Akka.NET won't interrupt the process. Instead it will wait for it to finish before giving turn to the next actor. + +## Dispatcher aliases + +When a dispatcher is looked up, and the given setting contains a string rather than a dispatcher config block, +the lookup will treat it as an alias, and follow that string to an alternate location for a dispatcher config. +If the dispatcher config is referenced both through an alias and through the absolute path only one dispatcher will +be used and shared among the two ids. diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs index e9c240ead89..c65a29bc904 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs @@ -262,7 +262,7 @@ public ClusterSharding(ExtendedActorSystem system) { var guardianName = system.Settings.Config.GetString("akka.cluster.sharding.guardian-name"); var dispatcher = system.Settings.Config.GetString("akka.cluster.sharding.use-dispatcher"); - if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.DefaultDispatcherId; + if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.InternalDispatcherId; return system.SystemActorOf(Props.Create(() => new ClusterShardingGuardian()).WithDispatcher(dispatcher), guardianName); }); } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf b/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf index 36f571ec725..979eaa2df76 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf +++ b/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf @@ -136,7 +136,7 @@ akka.cluster.sharding { coordinator-singleton = "akka.cluster.singleton" # The id of the dispatcher to use for ClusterSharding actors. - # If not specified default dispatcher is used. + # If not specified, the internal dispatcher is used. # If specified you need to define the settings of the actual dispatcher. # This dispatcher for the entity actors is defined by the user provided # Props, i.e. this dispatcher is not used for the entity actors. diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientReceptionist.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientReceptionist.cs index a2909ea966f..99ed47f5489 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientReceptionist.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientReceptionist.cs @@ -133,7 +133,7 @@ private IActorRef CreateReceptionist() { var name = _config.GetString("name"); var dispatcher = _config.GetString("use-dispatcher", null); - if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.DefaultDispatcherId; + if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.InternalDispatcherId; // important to use var mediator here to activate it outside of ClusterReceptionist constructor var mediator = PubSubMediator; diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf b/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf index dc0c97d7e61..c71820b6282 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf +++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf @@ -24,7 +24,7 @@ akka.cluster.client.receptionist { response-tunnel-receive-timeout = 30s # The id of the dispatcher to use for ClusterReceptionist actors. - # If not specified default dispatcher is used. + # If not specified, the internal dispatcher is used. # If specified you need to define the settings of the actual dispatcher. use-dispatcher = "" diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSub.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSub.cs index ac260f426dd..39829f71ced 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSub.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSub.cs @@ -102,7 +102,7 @@ private IActorRef CreateMediator() var name = _system.Settings.Config.GetString("akka.cluster.pub-sub.name"); var dispatcher = _system.Settings.Config.GetString("akka.cluster.pub-sub.use-dispatcher", null); if (string.IsNullOrEmpty(dispatcher)) - dispatcher = Dispatchers.DefaultDispatcherId; + dispatcher = Dispatchers.InternalDispatcherId; return _system.SystemActorOf( Props.Create(() => new DistributedPubSubMediator(_settings)) diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/reference.conf b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/reference.conf index f3b73ed2b32..9590dcd8841 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/reference.conf +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/reference.conf @@ -30,7 +30,7 @@ akka.cluster.pub-sub { max-delta-elements = 3000 # The id of the dispatcher to use for DistributedPubSubMediator actors. - # If not specified default dispatcher is used. + # If not specified, the internal dispatcher is used. # If specified you need to define the settings of the actual dispatcher. use-dispatcher = "" diff --git a/src/contrib/cluster/Akka.DistributedData/ReplicatorSettings.cs b/src/contrib/cluster/Akka.DistributedData/ReplicatorSettings.cs index 149f8b098cd..c6f9d188d3e 100644 --- a/src/contrib/cluster/Akka.DistributedData/ReplicatorSettings.cs +++ b/src/contrib/cluster/Akka.DistributedData/ReplicatorSettings.cs @@ -43,7 +43,7 @@ public static ReplicatorSettings Create(Config config) throw ConfigurationException.NullOrEmptyConfig(); var dispatcher = config.GetString("use-dispatcher", null); - if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.DefaultDispatcherId; + if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.InternalDispatcherId; var durableConfig = config.GetConfig("durable"); var durableKeys = durableConfig.GetStringList("keys"); @@ -63,7 +63,7 @@ public static ReplicatorSettings Create(Config config) { throw new ArgumentException($"`akka.cluster.distributed-data.durable.store-actor-class` is set to an invalid class {durableStoreType}."); } - durableStoreProps = Props.Create(durableStoreType, durableConfig).WithDispatcher(durableConfig.GetString("use-dispatcher")); + durableStoreProps = Props.Create(durableStoreType, durableConfig).WithDispatcher(dispatcher); } // TODO: This constructor call fails when these fields are not populated inside the Config object: @@ -212,7 +212,7 @@ private ReplicatorSettings Copy(string role = null, public ReplicatorSettings WithGossipInterval(TimeSpan gossipInterval) => Copy(gossipInterval: gossipInterval); public ReplicatorSettings WithNotifySubscribersInterval(TimeSpan notifySubscribersInterval) => Copy(notifySubscribersInterval: notifySubscribersInterval); public ReplicatorSettings WithMaxDeltaElements(int maxDeltaElements) => Copy(maxDeltaElements: maxDeltaElements); - public ReplicatorSettings WithDispatcher(string dispatcher) => Copy(dispatcher: string.IsNullOrEmpty(dispatcher) ? Dispatchers.DefaultDispatcherId : dispatcher); + public ReplicatorSettings WithDispatcher(string dispatcher) => Copy(dispatcher: string.IsNullOrEmpty(dispatcher) ? Dispatchers.InternalDispatcherId : dispatcher); public ReplicatorSettings WithPruning(TimeSpan pruningInterval, TimeSpan maxPruningDissemination) => Copy(pruningInterval: pruningInterval, maxPruningDissemination: maxPruningDissemination); public ReplicatorSettings WithDurableKeys(IImmutableSet durableKeys) => Copy(durableKeys: durableKeys); diff --git a/src/contrib/cluster/Akka.DistributedData/reference.conf b/src/contrib/cluster/Akka.DistributedData/reference.conf index e154cbf9cc0..4652b28bd42 100644 --- a/src/contrib/cluster/Akka.DistributedData/reference.conf +++ b/src/contrib/cluster/Akka.DistributedData/reference.conf @@ -26,8 +26,8 @@ akka.cluster.distributed-data { # the replicas. Next chunk will be transferred in next round of gossip. max-delta-elements = 1000 - # The id of the dispatcher to use for Replicator actors. If not specified - # default dispatcher is used. + # The id of the dispatcher to use for Replicator actors. + # If not specified, the internal dispatcher is used. # If specified you need to define the settings of the actual dispatcher. use-dispatcher = "" diff --git a/src/core/Akka.Cluster.Tests/ClusterConfigSpec.cs b/src/core/Akka.Cluster.Tests/ClusterConfigSpec.cs index 091d7ef6469..1b2e96aae91 100644 --- a/src/core/Akka.Cluster.Tests/ClusterConfigSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterConfigSpec.cs @@ -45,7 +45,7 @@ public void Clustering_must_be_able_to_parse_generic_cluster_config_elements() settings.MinNrOfMembers.Should().Be(1); settings.MinNrOfMembersOfRole.Should().Equal(ImmutableDictionary.Empty); settings.Roles.Should().BeEquivalentTo(ImmutableHashSet.Empty); - settings.UseDispatcher.Should().Be(Dispatchers.DefaultDispatcherId); + settings.UseDispatcher.Should().Be(Dispatchers.InternalDispatcherId); settings.GossipDifferentViewProbability.Should().Be(0.8); settings.ReduceGossipDifferentViewProbability.Should().Be(400); diff --git a/src/core/Akka.Cluster/ClusterSettings.cs b/src/core/Akka.Cluster/ClusterSettings.cs index 87001228652..e829e508ff3 100644 --- a/src/core/Akka.Cluster/ClusterSettings.cs +++ b/src/core/Akka.Cluster/ClusterSettings.cs @@ -70,7 +70,7 @@ public ClusterSettings(Config config, string systemName) MinNrOfMembers = clusterConfig.GetInt("min-nr-of-members", 0); _useDispatcher = clusterConfig.GetString("use-dispatcher", null); - if (String.IsNullOrEmpty(_useDispatcher)) _useDispatcher = Dispatchers.DefaultDispatcherId; + if (string.IsNullOrEmpty(_useDispatcher)) _useDispatcher = Dispatchers.InternalDispatcherId; GossipDifferentViewProbability = clusterConfig.GetDouble("gossip-different-view-probability", 0); ReduceGossipDifferentViewProbability = clusterConfig.GetInt("reduce-gossip-different-view-probability", 0); SchedulerTickDuration = clusterConfig.GetTimeSpan("scheduler.tick-duration", null); diff --git a/src/core/Akka.Cluster/Configuration/Cluster.conf b/src/core/Akka.Cluster/Configuration/Cluster.conf index 8f24cef4f6e..2c486d23f1b 100644 --- a/src/core/Akka.Cluster/Configuration/Cluster.conf +++ b/src/core/Akka.Cluster/Configuration/Cluster.conf @@ -128,8 +128,8 @@ akka { # Disable with "off". publish-stats-interval = off - # The id of the dispatcher to use for cluster actors. If not specified - # default dispatcher is used. + # The id of the dispatcher to use for cluster actors. + # If not specified, the internal dispatcher is used. # If specified you need to define the settings of the actual dispatcher. use-dispatcher = "" diff --git a/src/core/Akka/Dispatch/Dispatchers.cs b/src/core/Akka/Dispatch/Dispatchers.cs index 870cfa1c641..b84de30cfec 100644 --- a/src/core/Akka/Dispatch/Dispatchers.cs +++ b/src/core/Akka/Dispatch/Dispatchers.cs @@ -219,7 +219,7 @@ public sealed class Dispatchers /// public static readonly string DefaultDispatcherId = "akka.actor.default-dispatcher"; - public const string InternalDispatcherId = "akka.actor.internal-dispatcher"; + internal const string InternalDispatcherId = "akka.actor.internal-dispatcher"; /// /// The identifier for synchronized dispatchers. diff --git a/src/core/Akka/Properties/AssemblyInfo.cs b/src/core/Akka/Properties/AssemblyInfo.cs index 9ae4b1cc883..1cad13e7549 100644 --- a/src/core/Akka/Properties/AssemblyInfo.cs +++ b/src/core/Akka/Properties/AssemblyInfo.cs @@ -31,6 +31,8 @@ [assembly: InternalsVisibleTo("Akka.Remote.Tests.MultiNode")] [assembly: InternalsVisibleTo("Akka.Remote.TestKit.Tests")] [assembly: InternalsVisibleTo("Akka.Cluster")] +[assembly: InternalsVisibleTo("Akka.Cluster.Sharding")] +[assembly: InternalsVisibleTo("Akka.Cluster.Tools")] [assembly: InternalsVisibleTo("Akka.Cluster.Tests")] [assembly: InternalsVisibleTo("Akka.MultiNodeTestRunner.Shared.Tests")] [assembly: InternalsVisibleTo("Akka.Cluster.Tests.MultiNode")] From d9a0e9c62b0cd3c9e9f48abfcb9bc4bedaa72ddb Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Wed, 15 Jul 2020 01:08:53 +0700 Subject: [PATCH 4/9] Port internal blocking dispatcher and default internal dispatcher for Akka.Stream --- .../Dsl/UnfoldResourceAsyncSourceSpec.cs | 2 +- .../Dsl/UnfoldResourceSourceSpec.cs | 2 +- .../Akka.Streams.Tests/IO/FileSinkSpec.cs | 17 +- .../Akka.Streams.Tests/IO/FileSourceSpec.cs | 2 +- .../IO/InputStreamSinkSpec.cs | 4 +- .../IO/OutputStreamSourceSpec.cs | 2 +- .../StreamDispatcherSpec.cs | 37 ++++ src/core/Akka.Streams/ActorMaterializer.cs | 165 +++++++++++++++--- src/core/Akka.Streams/Attributes.cs | 4 +- .../Akka.Streams/Implementation/IO/IOSinks.cs | 18 +- .../Implementation/IO/IOSources.cs | 7 +- .../Implementation/Stages/Stages.cs | 3 +- src/core/Akka.Streams/reference.conf | 22 +-- src/core/Akka/Configuration/Pigeon.conf | 17 +- src/core/Akka/Dispatch/Dispatchers.cs | 12 +- 15 files changed, 256 insertions(+), 58 deletions(-) create mode 100644 src/core/Akka.Streams.Tests/StreamDispatcherSpec.cs diff --git a/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceAsyncSourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceAsyncSourceSpec.cs index 8e58384f08a..923a30e94c3 100644 --- a/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceAsyncSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceAsyncSourceSpec.cs @@ -293,7 +293,7 @@ public void A_UnfoldResourceAsyncSource_must_use_dedicated_blocking_io_dispatche var actorRef = refs.First(@ref => @ref.Path.ToString().Contains("unfoldResourceSourceAsync")); try { - Utils.AssertDispatcher(actorRef, "akka.stream.default-blocking-io-dispatcher"); + Utils.AssertDispatcher(actorRef, ActorAttributes.IODispatcher.Name); } finally { diff --git a/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs index 3a80cbee941..3600bec0fe1 100644 --- a/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs @@ -211,7 +211,7 @@ public void A_UnfoldResourceSource_must_use_dedicated_blocking_io_dispatcher_by_ var actorRef = refs.First(@ref => @ref.Path.ToString().Contains("unfoldResourceSource")); try { - Utils.AssertDispatcher(actorRef, "akka.stream.default-blocking-io-dispatcher"); + Utils.AssertDispatcher(actorRef, ActorAttributes.IODispatcher.Name); } finally { diff --git a/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs b/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs index 0a975dd3557..b0e2c4a986b 100644 --- a/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs @@ -12,6 +12,7 @@ using System.Threading; using System.Threading.Tasks; using Akka.Actor; +using Akka.Dispatch; using Akka.IO; using Akka.Streams.Dsl; using Akka.Streams.Implementation; @@ -217,20 +218,24 @@ public void SynchronousFileSink_should_use_dedicated_blocking_io_dispatcher_by_d { TargetFile(f => { - var sys = ActorSystem.Create("dispatcher-testing", Utils.UnboundedMailboxConfig); + var sys = ActorSystem.Create("FileSinkSpec-dispatcher-testing-1", Utils.UnboundedMailboxConfig); var materializer = ActorMaterializer.Create(sys); try { //hack for Iterator.continually - Source.FromEnumerator(() => Enumerable.Repeat(_testByteStrings.Head(), Int32.MaxValue).GetEnumerator()) + Source + .FromEnumerator(() => Enumerable.Repeat(_testByteStrings.Head(), int.MaxValue).GetEnumerator()) .RunWith(FileIO.ToFile(f), materializer); - ((ActorMaterializerImpl)materializer).Supervisor.Tell(StreamSupervisor.GetChildren.Instance, TestActor); + ((ActorMaterializerImpl)materializer) + .Supervisor + .Tell(StreamSupervisor.GetChildren.Instance, TestActor); var refs = ExpectMsg().Refs; - //NOTE: Akka uses "fileSource" as name for DefaultAttributes.FileSink - I think it's mistake on the JVM implementation side var actorRef = refs.First(@ref => @ref.Path.ToString().Contains("fileSink")); - Utils.AssertDispatcher(actorRef, "akka.stream.default-blocking-io-dispatcher"); + + // haven't figured out why this returns the aliased id rather than the id, but the stage is going away so whatever + Utils.AssertDispatcher(actorRef, ActorAttributes.IODispatcher.Name); } finally { @@ -248,7 +253,7 @@ public void SynchronousFileSink_should_allow_overriding_the_dispatcher_using_Att { TargetFile(f => { - var sys = ActorSystem.Create("dispatcher_testing", Utils.UnboundedMailboxConfig); + var sys = ActorSystem.Create("FileSinkSpec-dispatcher-testing-2", Utils.UnboundedMailboxConfig); var materializer = ActorMaterializer.Create(sys); try diff --git a/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs b/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs index b484e2037b1..029a7e53033 100644 --- a/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs @@ -280,7 +280,7 @@ public void FileSource_should_use_dedicated_blocking_io_dispatcher_by_default() var actorRef = ExpectMsg().Refs.First(r => r.Path.ToString().Contains("fileSource")); try { - Utils.AssertDispatcher(actorRef, "akka.stream.default-blocking-io-dispatcher"); + Utils.AssertDispatcher(actorRef, ActorAttributes.IODispatcher.Name); } finally { diff --git a/src/core/Akka.Streams.Tests/IO/InputStreamSinkSpec.cs b/src/core/Akka.Streams.Tests/IO/InputStreamSinkSpec.cs index 5f4fe22c7f1..2df6ac577e8 100644 --- a/src/core/Akka.Streams.Tests/IO/InputStreamSinkSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/InputStreamSinkSpec.cs @@ -302,7 +302,7 @@ public void InputStreamSink_should_use_dedicated_default_blocking_io_dispatcher_ { this.AssertAllStagesStopped(() => { - var sys = ActorSystem.Create("dispatcher-testing", Utils.UnboundedMailboxConfig); + var sys = ActorSystem.Create("InputStreamSink-testing", Utils.UnboundedMailboxConfig); var materializer = ActorMaterializer.Create(sys); try { @@ -310,7 +310,7 @@ public void InputStreamSink_should_use_dedicated_default_blocking_io_dispatcher_ (materializer as ActorMaterializerImpl).Supervisor.Tell(StreamSupervisor.GetChildren.Instance, TestActor); var children = ExpectMsg().Refs; var actorRef = children.First(c => c.Path.ToString().Contains("inputStreamSink")); - Utils.AssertDispatcher(actorRef, "akka.stream.default-blocking-io-dispatcher"); + Utils.AssertDispatcher(actorRef, ActorAttributes.IODispatcher.Name); } finally { diff --git a/src/core/Akka.Streams.Tests/IO/OutputStreamSourceSpec.cs b/src/core/Akka.Streams.Tests/IO/OutputStreamSourceSpec.cs index 0c251fe0118..19e14c16869 100644 --- a/src/core/Akka.Streams.Tests/IO/OutputStreamSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/OutputStreamSourceSpec.cs @@ -213,7 +213,7 @@ public void OutputStreamSource_must_use_dedicated_default_blocking_io_dispatcher TestActor); var actorRef = ExpectMsg() .Refs.First(c => c.Path.ToString().Contains("outputStreamSource")); - Utils.AssertDispatcher(actorRef, "akka.stream.default-blocking-io-dispatcher"); + Utils.AssertDispatcher(actorRef, ActorAttributes.IODispatcher.Name); } finally { diff --git a/src/core/Akka.Streams.Tests/StreamDispatcherSpec.cs b/src/core/Akka.Streams.Tests/StreamDispatcherSpec.cs new file mode 100644 index 00000000000..5cb1ee4ddff --- /dev/null +++ b/src/core/Akka.Streams.Tests/StreamDispatcherSpec.cs @@ -0,0 +1,37 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Akka.Dispatch; +using Akka.TestKit; +using Xunit; +using FluentAssertions; + +namespace Akka.Streams.Tests +{ + public class StreamDispatcherSpec : AkkaSpec + { + [Fact] + public void The_default_blocking_io_dispatcher_for_streams_must_be_the_same_as_the_default_blocking_io_dispatcher_for_actors() + { + var materializer = ActorMaterializer.Create(Sys); + + var streamIoDispatcher = Sys.Dispatchers.Lookup(ActorAttributes.IODispatcher.Name); + var actorIoDispatcher = Sys.Dispatchers.Lookup(Dispatchers.DefaultBlockingDispatcherId); + + streamIoDispatcher.Should().Be(actorIoDispatcher); + } + + [Fact] + public void The_deprecated_default_stream_io_dispatcher_must_be_the_same_as_the_default_blocking_io_dispatcher_for_actors() + { + var materializer = ActorMaterializer.Create(Sys); + + var streamIoDispatcher = Sys.Dispatchers.Lookup("akka.stream.default-blocking-io-dispatcher"); + var actorIoDispatcher = Sys.Dispatchers.Lookup(Dispatchers.DefaultBlockingDispatcherId); + + streamIoDispatcher.Should().Be(actorIoDispatcher); + } + } +} diff --git a/src/core/Akka.Streams/ActorMaterializer.cs b/src/core/Akka.Streams/ActorMaterializer.cs index fc8866314f8..aa0c8c11fd3 100644 --- a/src/core/Akka.Streams/ActorMaterializer.cs +++ b/src/core/Akka.Streams/ActorMaterializer.cs @@ -336,7 +336,7 @@ private static ActorMaterializerSettings Create(Config config) streamRefSettings: StreamRefSettings.Create(config.GetConfig("stream-ref"))); } - private const int DefaultlMaxFixedbufferSize = 1000; + private const int DefaultMaxFixedBufferSize = 1000; /// /// TBD /// @@ -401,9 +401,35 @@ private static ActorMaterializerSettings Create(Config config) /// TBD /// TBD /// TBD + /// TBD /// TBD - public ActorMaterializerSettings(int initialInputBufferSize, int maxInputBufferSize, string dispatcher, Decider supervisionDecider, StreamSubscriptionTimeoutSettings subscriptionTimeoutSettings, StreamRefSettings streamRefSettings, bool isDebugLogging, int outputBurstLimit, bool isFuzzingMode, bool isAutoFusing, int maxFixedBufferSize, int syncProcessingLimit = DefaultlMaxFixedbufferSize) + public ActorMaterializerSettings( + int initialInputBufferSize, + int maxInputBufferSize, + string dispatcher, + Decider supervisionDecider, + StreamSubscriptionTimeoutSettings subscriptionTimeoutSettings, + StreamRefSettings streamRefSettings, + bool isDebugLogging, + int outputBurstLimit, + bool isFuzzingMode, + bool isAutoFusing, + int maxFixedBufferSize, + int syncProcessingLimit = DefaultMaxFixedBufferSize) { + if(initialInputBufferSize <= 0) + throw new ArgumentException($"{nameof(initialInputBufferSize)} must be > 0", nameof(initialInputBufferSize)); + if(syncProcessingLimit <= 0) + throw new ArgumentException($"{nameof(syncProcessingLimit)} must be > 0", nameof(syncProcessingLimit)); + + if(maxInputBufferSize <= 0) + throw new ArgumentException($"{nameof(maxInputBufferSize)} must be > 0", nameof(maxInputBufferSize)); + if((maxInputBufferSize & (maxInputBufferSize - 1)) != 0) + throw new ArgumentException($"{nameof(maxInputBufferSize)} must be a power of two", nameof(maxInputBufferSize)); + + if(initialInputBufferSize > maxInputBufferSize) + throw new ArgumentException($"initialInputBufferSize({initialInputBufferSize}) must be <= maxInputBufferSize({maxInputBufferSize})"); + InitialInputBufferSize = initialInputBufferSize; MaxInputBufferSize = maxInputBufferSize; Dispatcher = dispatcher; @@ -418,55 +444,110 @@ public ActorMaterializerSettings(int initialInputBufferSize, int maxInputBufferS StreamRefSettings = streamRefSettings; } + private ActorMaterializerSettings Copy( + int? initialInputBufferSize = null, + int? maxInputBufferSize = null, + string dispatcher = null, + Decider supervisionDecider = null, + StreamSubscriptionTimeoutSettings subscriptionTimeoutSettings = null, + StreamRefSettings streamRefSettings = null, + bool? isDebugLogging = null, + int? outputBurstLimit = null, + bool? isFuzzingMode = null, + bool? isAutoFusing = null, + int? maxFixedBufferSize = null, + int? syncProcessingLimit = null) + { + return new ActorMaterializerSettings( + initialInputBufferSize??InitialInputBufferSize, + maxInputBufferSize??MaxInputBufferSize, + dispatcher??Dispatcher, + supervisionDecider??SupervisionDecider, + subscriptionTimeoutSettings??SubscriptionTimeoutSettings, + streamRefSettings ?? StreamRefSettings, + isDebugLogging ?? IsDebugLogging, + outputBurstLimit??OutputBurstLimit, + isFuzzingMode??IsFuzzingMode, + isAutoFusing??IsAutoFusing, + maxFixedBufferSize??MaxFixedBufferSize, + syncProcessingLimit??SyncProcessingLimit); + } + /// - /// TBD + /// Each asynchronous piece of a materialized stream topology is executed by one Actor + /// that manages an input buffer for all inlets of its shape. This setting configures + /// the default for initial and maximal input buffer in number of elements for each inlet. + /// This can be overridden for individual parts of the + /// stream topology by using . /// /// TBD /// TBD /// TBD public ActorMaterializerSettings WithInputBuffer(int initialSize, int maxSize) { - return new ActorMaterializerSettings(initialSize, maxSize, Dispatcher, SupervisionDecider, SubscriptionTimeoutSettings, StreamRefSettings, IsDebugLogging, OutputBurstLimit, IsFuzzingMode, IsAutoFusing, MaxFixedBufferSize, SyncProcessingLimit); + if (initialSize == InitialInputBufferSize && maxSize == MaxInputBufferSize) + return this; + return Copy(initialInputBufferSize: initialSize, maxInputBufferSize: maxSize); } /// - /// TBD + /// This setting configures the default dispatcher to be used by streams materialized + /// with the . This can be overridden for individual parts of the + /// stream topology by using . /// /// TBD /// TBD public ActorMaterializerSettings WithDispatcher(string dispatcher) { - return new ActorMaterializerSettings(InitialInputBufferSize, MaxInputBufferSize, dispatcher, SupervisionDecider, SubscriptionTimeoutSettings, StreamRefSettings, IsDebugLogging, OutputBurstLimit, IsFuzzingMode, IsAutoFusing, MaxFixedBufferSize, SyncProcessingLimit); + if (dispatcher == Dispatcher) return this; + return Copy(dispatcher: dispatcher); } /// - /// TBD + /// Decides how exceptions from application code are to be handled, unless + /// overridden for specific flows of the stream operations with + /// /// /// TBD /// TBD public ActorMaterializerSettings WithSupervisionStrategy(Decider decider) { - return new ActorMaterializerSettings(InitialInputBufferSize, MaxInputBufferSize, Dispatcher, decider, SubscriptionTimeoutSettings, StreamRefSettings, IsDebugLogging, OutputBurstLimit, IsFuzzingMode, IsAutoFusing, MaxFixedBufferSize, SyncProcessingLimit); + if (decider.Equals(SupervisionDecider)) return this; + return Copy(supervisionDecider: decider); } /// - /// TBD + /// Enable to log all elements that are dropped due to failures (at DEBUG level). /// /// TBD /// TBD public ActorMaterializerSettings WithDebugLogging(bool isEnabled) { - return new ActorMaterializerSettings(InitialInputBufferSize, MaxInputBufferSize, Dispatcher, SupervisionDecider, SubscriptionTimeoutSettings, StreamRefSettings, isEnabled, OutputBurstLimit, IsFuzzingMode, IsAutoFusing, MaxFixedBufferSize, SyncProcessingLimit); + if (IsDebugLogging == isEnabled) return this; + return Copy(isDebugLogging: isEnabled); } /// - /// TBD + /// Test utility: fuzzing mode means that GraphStage events are not processed + /// in FIFO order within a fused subgraph, but randomized. /// /// TBD /// TBD public ActorMaterializerSettings WithFuzzingMode(bool isFuzzingMode) { - return new ActorMaterializerSettings(InitialInputBufferSize, MaxInputBufferSize, Dispatcher, SupervisionDecider, SubscriptionTimeoutSettings, StreamRefSettings, IsDebugLogging, OutputBurstLimit, isFuzzingMode, IsAutoFusing, MaxFixedBufferSize, SyncProcessingLimit); + if (IsFuzzingMode == isFuzzingMode) return this; + return Copy(isFuzzingMode: isFuzzingMode); + } + + /// + /// Maximum number of elements emitted in batch if downstream signals large demand. + /// + /// + /// + public ActorMaterializerSettings WithOutputBurstLimit(int limit) + { + if (limit == OutputBurstLimit) return this; + return Copy(outputBurstLimit: limit); } /// @@ -476,31 +557,37 @@ public ActorMaterializerSettings WithFuzzingMode(bool isFuzzingMode) /// TBD public ActorMaterializerSettings WithAutoFusing(bool isAutoFusing) { - return new ActorMaterializerSettings(InitialInputBufferSize, MaxInputBufferSize, Dispatcher, SupervisionDecider, SubscriptionTimeoutSettings, StreamRefSettings, IsDebugLogging, OutputBurstLimit, IsFuzzingMode, isAutoFusing, MaxFixedBufferSize, SyncProcessingLimit); + if (IsAutoFusing == isAutoFusing) return this; + return Copy(isAutoFusing: isAutoFusing); } /// - /// TBD + /// Configure the maximum buffer size for which a FixedSizeBuffer will be preallocated. + /// This defaults to a large value because it is usually better to fail early when + /// system memory is not sufficient to hold the buffer. /// /// TBD /// TBD public ActorMaterializerSettings WithMaxFixedBufferSize(int maxFixedBufferSize) { - return new ActorMaterializerSettings(InitialInputBufferSize, MaxInputBufferSize, Dispatcher, SupervisionDecider, SubscriptionTimeoutSettings, StreamRefSettings, IsDebugLogging, OutputBurstLimit, IsFuzzingMode, IsAutoFusing, maxFixedBufferSize, SyncProcessingLimit); + if (MaxFixedBufferSize == maxFixedBufferSize) return this; + return Copy(maxFixedBufferSize: maxFixedBufferSize); } /// - /// TBD + /// Limit for number of messages that can be processed synchronously in stream to substream communication /// /// TBD /// TBD public ActorMaterializerSettings WithSyncProcessingLimit(int limit) { - return new ActorMaterializerSettings(InitialInputBufferSize, MaxInputBufferSize, Dispatcher, SupervisionDecider, SubscriptionTimeoutSettings, StreamRefSettings, IsDebugLogging, OutputBurstLimit, IsFuzzingMode, IsAutoFusing, MaxFixedBufferSize, limit); + if (SyncProcessingLimit == limit) return this; + return Copy(syncProcessingLimit: limit); } /// - /// TBD + /// Leaked publishers and subscribers are cleaned up when they are not used within a given + /// deadline, configured by . /// /// TBD /// TBD @@ -508,17 +595,49 @@ public ActorMaterializerSettings WithSubscriptionTimeoutSettings(StreamSubscript { if (Equals(settings, SubscriptionTimeoutSettings)) return this; - - return new ActorMaterializerSettings(InitialInputBufferSize, MaxInputBufferSize, Dispatcher, SupervisionDecider, settings, StreamRefSettings, IsDebugLogging, OutputBurstLimit, IsFuzzingMode, IsAutoFusing, MaxFixedBufferSize, SyncProcessingLimit); + return Copy(subscriptionTimeoutSettings: settings); } public ActorMaterializerSettings WithStreamRefSettings(StreamRefSettings settings) { if (settings == null) throw new ArgumentNullException(nameof(settings)); if (ReferenceEquals(settings, this.StreamRefSettings)) return this; - return new ActorMaterializerSettings(InitialInputBufferSize, MaxInputBufferSize, Dispatcher, - SupervisionDecider, SubscriptionTimeoutSettings, settings, IsDebugLogging, OutputBurstLimit, - IsFuzzingMode, IsAutoFusing, MaxFixedBufferSize, SyncProcessingLimit); + return Copy(streamRefSettings: settings); + } + + public override bool Equals(object obj) + { + if (!(obj is ActorMaterializerSettings s)) return false; + return + s.InitialInputBufferSize == InitialInputBufferSize && + s.MaxInputBufferSize == MaxInputBufferSize && + s.Dispatcher == Dispatcher && + s.SupervisionDecider == SupervisionDecider && + s.SubscriptionTimeoutSettings == SubscriptionTimeoutSettings && + s.IsDebugLogging == IsDebugLogging && + s.OutputBurstLimit == OutputBurstLimit && + s.SyncProcessingLimit == SyncProcessingLimit && + s.IsFuzzingMode == IsFuzzingMode && + s.IsAutoFusing == IsAutoFusing && + s.SubscriptionTimeoutSettings == SubscriptionTimeoutSettings && + s.StreamRefSettings == StreamRefSettings; + } + + internal Attributes ToAttributes() + { + return new Attributes(new Attributes.IAttribute[] + { + new Attributes.InputBuffer(InitialInputBufferSize, MaxInputBufferSize), + Attributes.CancellationStrategy.Default, + new ActorAttributes.Dispatcher(Dispatcher), + new ActorAttributes.SupervisionStrategy(SupervisionDecider), + new ActorAttributes.DebugLogging(IsDebugLogging), + new ActorAttributes.StreamSubscriptionTimeout(SubscriptionTimeoutSettings.Timeout, SubscriptionTimeoutSettings.Mode), + new ActorAttributes.OutputBurstLimit(OutputBurstLimit), + new ActorAttributes.FuzzingMode(IsFuzzingMode), + new ActorAttributes.MaxFixedBufferSize(MaxFixedBufferSize), + new ActorAttributes.SyncProcessingLimit(SyncProcessingLimit), + }); } } diff --git a/src/core/Akka.Streams/Attributes.cs b/src/core/Akka.Streams/Attributes.cs index d4b79abf32d..da57c7ab2c6 100644 --- a/src/core/Akka.Streams/Attributes.cs +++ b/src/core/Akka.Streams/Attributes.cs @@ -228,7 +228,7 @@ private AsyncBoundary() { } /// public sealed class CancellationStrategy:IMandatoryAttribute { - internal CancellationStrategy Default { get; } = new CancellationStrategy(new PropagateFailure()); + internal static CancellationStrategy Default { get; } = new CancellationStrategy(new PropagateFailure()); public IStrategy Strategy { get; } @@ -596,6 +596,8 @@ public SupervisionStrategy(Decider decider) public override string ToString() => "SupervisionStrategy"; } + public static Dispatcher IODispatcher { get; } = new Dispatcher("akka.stream.materializer.blocking-io-dispatcher"); + /// /// Enables additional low level troubleshooting logging at DEBUG log level /// diff --git a/src/core/Akka.Streams/Implementation/IO/IOSinks.cs b/src/core/Akka.Streams/Implementation/IO/IOSinks.cs index b9cd2830aaa..b4779e7ef22 100644 --- a/src/core/Akka.Streams/Implementation/IO/IOSinks.cs +++ b/src/core/Akka.Streams/Implementation/IO/IOSinks.cs @@ -87,7 +87,12 @@ public override object Create(MaterializationContext context, out Task var props = FileSubscriber.Props(_f, ioResultPromise, settings.MaxInputBufferSize, _startPosition, _fileMode); var dispatcher = context.EffectiveAttributes.GetAttribute(DefaultAttributes.IODispatcher.AttributeList.First()) as ActorAttributes.Dispatcher; - var actorRef = mat.ActorOf(context, props.WithDispatcher(dispatcher.Name)); + var actorRef = mat.ActorOf( + context, + props.WithDispatcher(context + .EffectiveAttributes + .GetMandatoryAttribute() + .Name)); materializer = ioResultPromise.Task; return new ActorSubscriberImpl(actorRef); } @@ -151,7 +156,16 @@ public override object Create(MaterializationContext context, out Task var ioResultPromise = new TaskCompletionSource(); var os = _createOutput(); - var props = OutputStreamSubscriber.Props(os, ioResultPromise, settings.MaxInputBufferSize, _autoFlush); + var maxInputBufferSize = context + .EffectiveAttributes + .GetMandatoryAttribute() + .Max; + var props = OutputStreamSubscriber + .Props(os, ioResultPromise, maxInputBufferSize, _autoFlush) + .WithDispatcher(context + .EffectiveAttributes + .GetMandatoryAttribute() + .Name); var actorRef = mat.ActorOf(context, props); materializer = ioResultPromise.Task; diff --git a/src/core/Akka.Streams/Implementation/IO/IOSources.cs b/src/core/Akka.Streams/Implementation/IO/IOSources.cs index 3e7445ffdb6..c8a22407372 100644 --- a/src/core/Akka.Streams/Implementation/IO/IOSources.cs +++ b/src/core/Akka.Streams/Implementation/IO/IOSources.cs @@ -160,7 +160,12 @@ public override IPublisher Create(MaterializationContext context, ou { // can throw, i.e. FileNotFound var inputStream = _createInputStream(); - var props = InputStreamPublisher.Props(inputStream, ioResultPromise, _chunkSize); + var props = InputStreamPublisher + .Props(inputStream, ioResultPromise, _chunkSize) + .WithDispatcher(context + .EffectiveAttributes + .GetMandatoryAttribute() + .Name); var actorRef = materializer.ActorOf(context, props); pub = new ActorPublisherImpl(actorRef); } diff --git a/src/core/Akka.Streams/Implementation/Stages/Stages.cs b/src/core/Akka.Streams/Implementation/Stages/Stages.cs index d58e3497e0a..1e8a632931c 100644 --- a/src/core/Akka.Streams/Implementation/Stages/Stages.cs +++ b/src/core/Akka.Streams/Implementation/Stages/Stages.cs @@ -7,6 +7,7 @@ using System; using System.Threading.Tasks; +using Akka.Dispatch; using Akka.Streams.Stage; using Akka.Streams.Supervision; @@ -20,7 +21,7 @@ public static class DefaultAttributes /// /// TBD /// - public static readonly Attributes IODispatcher = ActorAttributes.CreateDispatcher("akka.stream.default-blocking-io-dispatcher"); + public static readonly Attributes IODispatcher = ActorAttributes.CreateDispatcher(ActorAttributes.IODispatcher.Name); /// /// TBD diff --git a/src/core/Akka.Streams/reference.conf b/src/core/Akka.Streams/reference.conf index 75332c14097..7c69c485a3d 100644 --- a/src/core/Akka.Streams/reference.conf +++ b/src/core/Akka.Streams/reference.conf @@ -22,6 +22,8 @@ akka { # Note: If you change this value also change the fallback value in ActorMaterializerSettings dispatcher = "" + blocking-io-dispatcher = "akka.actor.default-blocking-io-dispatcher" + # Cleanup leaked publishers and subscribers when they are not used within a given # deadline subscription-timeout { @@ -118,22 +120,12 @@ akka { } } - # Fully qualified config path which holds the dispatcher configuration - # to be used by FlowMaterialiser when creating Actors for IO operations, - # such as FileSource, FileSink and others. - blocking-io-dispatcher = "akka.stream.default-blocking-io-dispatcher" - - default-blocking-io-dispatcher { - type = "Dispatcher" - executor = "thread-pool-executor" - throughput = 1 + # Deprecated, left here to not break Akka HTTP which refers to it + blocking-io-dispatcher = "akka.actor.default-blocking-io-dispatcher" - thread-pool-executor { - core-pool-size-min = 2 - core-pool-size-factor = 2.0 - core-pool-size-max = 16 - } - } + # Deprecated, will not be used unless user code refer to it, use 'akka.stream.materializer.blocking-io-dispatcher' + # instead, or if from code, prefer the 'ActorAttributes.IODispatcher' attribute + default-blocking-io-dispatcher = "akka.actor.default-blocking-io-dispatcher" } # configure overrides to ssl-configuration here (to be used by akka-streams, and akka-http – i.e. when serving https connections) diff --git a/src/core/Akka/Configuration/Pigeon.conf b/src/core/Akka/Configuration/Pigeon.conf index 361985082ec..2b8373d3829 100644 --- a/src/core/Akka/Configuration/Pigeon.conf +++ b/src/core/Akka/Configuration/Pigeon.conf @@ -364,13 +364,26 @@ akka { type = "Dispatcher" executor = "fork-join-executor" throughput = 5 + fork-join-executor { parallelism-min = 4 parallelism-factor = 1.0 parallelism-max = 64 } } - + + default-blocking-io-dispatcher { + type = "Dispatcher" + executor = "thread-pool-executor" + throughput = 1 + + thread-pool-executor { + core-pool-size-min = 2 + core-pool-size-factor = 2.0 + core-pool-size-max = 16 + } + } + default-mailbox { # FQCN of the MailboxType. The Class of the FQCN must have a public # constructor with @@ -659,7 +672,7 @@ akka { # Fully qualified config path which holds the dispatcher configuration # on which file IO tasks are scheduled - file-io-dispatcher = "akka.actor.internal-dispatcher" + file-io-dispatcher = "akka.actor.default-blocking-io-dispatcher" # The maximum number of bytes (or "unlimited") to transfer in one batch # when using `WriteFile` command which uses `FileChannel.transferTo` to diff --git a/src/core/Akka/Dispatch/Dispatchers.cs b/src/core/Akka/Dispatch/Dispatchers.cs index b84de30cfec..2f4dc401345 100644 --- a/src/core/Akka/Dispatch/Dispatchers.cs +++ b/src/core/Akka/Dispatch/Dispatchers.cs @@ -219,7 +219,17 @@ public sealed class Dispatchers /// public static readonly string DefaultDispatcherId = "akka.actor.default-dispatcher"; - internal const string InternalDispatcherId = "akka.actor.internal-dispatcher"; + /// + /// The id of a default dispatcher to use for operations known to be blocking. Note that + /// for optimal performance you will want to isolate different blocking resources + /// on different thread pools. + /// + public static readonly string DefaultBlockingDispatcherId = "akka.actor.default-blocking-io-dispatcher"; + + /// + /// INTERNAL API + /// + internal static readonly string InternalDispatcherId = "akka.actor.internal-dispatcher"; /// /// The identifier for synchronized dispatchers. From 1d6e58864e73cce60afa0f298b06dc02d19fe527 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Wed, 15 Jul 2020 03:11:09 +0700 Subject: [PATCH 5/9] Add circular dispatcher alias reference checking, unroll recursion --- .../Actor/ActorSystemDispatcherSpec.cs | 29 ++++++++------- src/core/Akka/Dispatch/Dispatchers.cs | 37 +++++++++++-------- 2 files changed, 38 insertions(+), 28 deletions(-) diff --git a/src/core/Akka.Tests/Actor/ActorSystemDispatcherSpec.cs b/src/core/Akka.Tests/Actor/ActorSystemDispatcherSpec.cs index 101dbf8d996..a4fc4de9d3e 100644 --- a/src/core/Akka.Tests/Actor/ActorSystemDispatcherSpec.cs +++ b/src/core/Akka.Tests/Actor/ActorSystemDispatcherSpec.cs @@ -13,25 +13,19 @@ using Akka.Util; using FluentAssertions; using Xunit; +using Xunit.Abstractions; namespace Akka.Tests.Actor { public class ActorSystemDispatcherSpec : AkkaSpec { - private class SnitchingSynchonizationContext : SynchronizationContext - { - private readonly IActorRef _testActor; + private static Config Config => ConfigurationFactory.ParseString(@" + dispatcher-loop-1 = dispatcher-loop-2 + dispatcher-loop-2 = dispatcher-loop-1 +"); - public SnitchingSynchonizationContext(IActorRef testActor) - { - _testActor = testActor; - } - - public override void OperationStarted() - { - _testActor.Tell("called"); - } - } + public ActorSystemDispatcherSpec(ITestOutputHelper output):base(output, Config) + { } [Fact] public void The_ActorSystem_must_not_use_passed_in_SynchronizationContext_if_executor_is_configured_in() @@ -76,6 +70,15 @@ public void The_ActorSystem_must_provide_a_single_place_to_override_the_internal } } + [Fact] + public void The_ActorSystem_must_provide_a_good_error_on_a_dispatcher_alias_loop_in_config() + { + Sys.Dispatchers.Invoking(d => d.Lookup("dispatcher-loop-1")) + .ShouldThrow() + .And.Message + .StartsWith("Could not find a concrete dispatcher config after following").ShouldBeTrue(); + } + private string UserGuardianDispatcher(ActorSystem system) { var impl = (ActorSystemImpl)system; diff --git a/src/core/Akka/Dispatch/Dispatchers.cs b/src/core/Akka/Dispatch/Dispatchers.cs index 2f4dc401345..ee924e08ec2 100644 --- a/src/core/Akka/Dispatch/Dispatchers.cs +++ b/src/core/Akka/Dispatch/Dispatchers.cs @@ -231,6 +231,8 @@ public sealed class Dispatchers /// internal static readonly string InternalDispatcherId = "akka.actor.internal-dispatcher"; + private const int MaxDispatcherAliasDepth = 20; + /// /// The identifier for synchronized dispatchers. /// @@ -316,7 +318,7 @@ internal void ReloadPrerequisites(IDispatcherPrerequisites prerequisites) /// TBD public MessageDispatcher Lookup(string dispatcherName) { - return LookupConfigurator(dispatcherName, 0).Dispatcher(); + return LookupConfigurator(dispatcherName).Dispatcher(); } /// @@ -331,14 +333,17 @@ public bool HasDispatcher(string id) return _dispatcherConfigurators.ContainsKey(id) || _cachingConfig.HasPath(id); } - private MessageDispatcherConfigurator LookupConfigurator(string id, int depth) + private MessageDispatcherConfigurator LookupConfigurator(string id) { - if (!_dispatcherConfigurators.TryGetValue(id, out var configurator)) + var depth = 0; + while(depth < MaxDispatcherAliasDepth) { + if (_dispatcherConfigurators.TryGetValue(id, out var configurator)) + return configurator; + // It doesn't matter if we create a dispatcher configurator that isn't used due to concurrent lookup. // That shouldn't happen often and in case it does the actual ExecutorService isn't // created until used, i.e. cheap. - MessageDispatcherConfigurator newConfigurator; if (_cachingConfig.HasPath(id)) { var valueAtPath = _cachingConfig.GetValue(id); @@ -349,19 +354,21 @@ private MessageDispatcherConfigurator LookupConfigurator(string id, int depth) // both under the actual id and the alias id in the 'dispatcherConfigurators' cache var actualId = valueAtPath.GetString(); _logger.Debug($"Dispatcher id [{id}] is an alias, actual dispatcher will be [{actualId}]"); - newConfigurator = LookupConfigurator(actualId, depth + 1); - } else if (valueAtPath.IsObject()) - newConfigurator = ConfiguratorFrom(Config(id)); - else - throw new ConfigurationException($"Expected either a dispatcher config or an alias at [{id}] but found [{valueAtPath}]"); - } - else - throw new ConfigurationException($"Dispatcher {id} not configured."); + id = actualId; + depth++; + continue; + } - return _dispatcherConfigurators.TryAdd(id, newConfigurator) ? newConfigurator : _dispatcherConfigurators[id]; + if (valueAtPath.IsObject()) + { + var newConfigurator = ConfiguratorFrom(Config(id)); + return _dispatcherConfigurators.TryAdd(id, newConfigurator) ? newConfigurator : _dispatcherConfigurators[id]; + } + throw new ConfigurationException($"Expected either a dispatcher config or an alias at [{id}] but found [{valueAtPath}]"); + } + throw new ConfigurationException($"Dispatcher {id} not configured."); } - - return configurator; + throw new ConfigurationException($"Could not find a concrete dispatcher config after following {MaxDispatcherAliasDepth} deep. Is there a circular reference in your config? Last followed Id was [{id}]"); } /// From 9acb12a918383dfe55e28c6ca4f144c1cd5f6a76 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Wed, 15 Jul 2020 03:49:34 +0700 Subject: [PATCH 6/9] Make ClusterSingletonManager and Proxy to use internal dispatchers --- .../Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs | 5 ++++- .../Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs | 5 ++++- src/core/Akka/Actor/ActorRefProvider.cs | 3 +-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs index 027f71c3647..9fd06898419 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs @@ -14,6 +14,7 @@ using Akka.Actor; using Akka.Configuration; using Akka.Coordination; +using Akka.Dispatch; using Akka.Event; using Akka.Pattern; using Akka.Remote; @@ -604,7 +605,9 @@ public static Props Props(Props singletonProps, ClusterSingletonManagerSettings /// TBD public static Props Props(Props singletonProps, object terminationMessage, ClusterSingletonManagerSettings settings) { - return Actor.Props.Create(() => new ClusterSingletonManager(singletonProps, terminationMessage, settings)).WithDeploy(Deploy.Local); + return Actor.Props.Create(() => new ClusterSingletonManager(singletonProps, terminationMessage, settings)) + .WithDispatcher(Dispatchers.InternalDispatcherId) + .WithDeploy(Deploy.Local); } private readonly Props _singletonProps; diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs index 4706669ee8d..f2e38827f2a 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs @@ -11,6 +11,7 @@ using System.Linq; using Akka.Actor; using Akka.Configuration; +using Akka.Dispatch; using Akka.Event; namespace Akka.Cluster.Tools.Singleton @@ -70,7 +71,9 @@ public static Config DefaultConfig() /// TBD public static Props Props(string singletonManagerPath, ClusterSingletonProxySettings settings) { - return Actor.Props.Create(() => new ClusterSingletonProxy(singletonManagerPath, settings)).WithDeploy(Deploy.Local); + return Actor.Props.Create(() => new ClusterSingletonProxy(singletonManagerPath, settings)) + .WithDispatcher(Dispatchers.InternalDispatcherId) + .WithDeploy(Deploy.Local); } private readonly ClusterSingletonProxySettings _settings; diff --git a/src/core/Akka/Actor/ActorRefProvider.cs b/src/core/Akka/Actor/ActorRefProvider.cs index ba028369b57..4e51f2991ee 100644 --- a/src/core/Akka/Actor/ActorRefProvider.cs +++ b/src/core/Akka/Actor/ActorRefProvider.cs @@ -256,8 +256,7 @@ public LocalActorRefProvider(string systemName, Settings settings, EventStream e /// public EventStream EventStream { get { return _eventStream; } } - //private MessageDispatcher DefaultDispatcher { get { return _system.Dispatchers.DefaultGlobalDispatcher; } } - private MessageDispatcher InternalDispatcher => _system.Dispatchers.Lookup(Dispatchers.InternalDispatcherId); + private MessageDispatcher InternalDispatcher => _system.Dispatchers.InternalDispatcher; private SupervisorStrategy UserGuardianSupervisorStrategy { get { return _userGuardianStrategyConfigurator.Create(); } } From e76172b07d6ff93fac83c20a0c9cd588718875d4 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Wed, 15 Jul 2020 04:22:33 +0700 Subject: [PATCH 7/9] Update API approver list --- .../Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt | 8 ++++++-- .../CoreAPISpec.ApproveStreams.approved.txt | 3 +++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index 2f501b48a23..71b2bde2c88 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -1,5 +1,6 @@ [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Benchmarks")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster")] +[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.TestKit")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Tests")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Tests.MultiNode")] @@ -1840,12 +1841,13 @@ namespace Akka.Actor.Internal public class ActorSystemImpl : Akka.Actor.ExtendedActorSystem { public ActorSystemImpl(string name) { } - public ActorSystemImpl(string name, Akka.Configuration.Config config, Akka.Actor.Setup.ActorSystemSetup setup) { } + public ActorSystemImpl(string name, Akka.Configuration.Config config, Akka.Actor.Setup.ActorSystemSetup setup, System.Nullable> guardianProps = null) { } public override Akka.Actor.ActorProducerPipelineResolver ActorPipelineResolver { get; } public override Akka.Actor.IActorRef DeadLetters { get; } public override Akka.Dispatch.Dispatchers Dispatchers { get; } public override Akka.Event.EventStream EventStream { get; } public override Akka.Actor.IInternalActorRef Guardian { get; } + public Akka.Util.Option GuardianProps { get; } public override Akka.Event.ILoggingAdapter Log { get; } public override Akka.Actor.IInternalActorRef LookupRoot { get; } public override Akka.Dispatch.Mailboxes Mailboxes { get; } @@ -2451,9 +2453,10 @@ namespace Akka.Dispatch } public sealed class Dispatchers { + public static readonly string DefaultBlockingDispatcherId; public static readonly string DefaultDispatcherId; public static readonly string SynchronizedDispatcherId; - public Dispatchers(Akka.Actor.ActorSystem system, Akka.Dispatch.IDispatcherPrerequisites prerequisites) { } + public Dispatchers(Akka.Actor.ActorSystem system, Akka.Dispatch.IDispatcherPrerequisites prerequisites, Akka.Event.ILoggingAdapter logger) { } public Akka.Configuration.Config DefaultDispatcherConfig { get; } public Akka.Dispatch.MessageDispatcher DefaultGlobalDispatcher { get; } public Akka.Dispatch.IDispatcherPrerequisites Prerequisites { get; } @@ -4822,6 +4825,7 @@ namespace Akka.Util public static readonly Akka.Util.Option None; public Option(T value) { } public bool HasValue { get; } + public bool IsEmpty { get; } public T Value { get; } public bool Equals(Akka.Util.Option other) { } public override bool Equals(object obj) { } diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt index 1400a378611..e57c72fe097 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt @@ -17,6 +17,7 @@ namespace Akka.Streams } public class static ActorAttributes { + public static Akka.Streams.ActorAttributes.Dispatcher IODispatcher { get; } public static Akka.Streams.Attributes CreateDebugLogging(bool enabled) { } public static Akka.Streams.Attributes CreateDispatcher(string dispatcherName) { } public static Akka.Streams.Attributes CreateFuzzingMode(bool enabled) { } @@ -138,12 +139,14 @@ namespace Akka.Streams public readonly int SyncProcessingLimit; public ActorMaterializerSettings(int initialInputBufferSize, int maxInputBufferSize, string dispatcher, Akka.Streams.Supervision.Decider supervisionDecider, Akka.Streams.StreamSubscriptionTimeoutSettings subscriptionTimeoutSettings, Akka.Streams.Dsl.StreamRefSettings streamRefSettings, bool isDebugLogging, int outputBurstLimit, bool isFuzzingMode, bool isAutoFusing, int maxFixedBufferSize, int syncProcessingLimit = 1000) { } public static Akka.Streams.ActorMaterializerSettings Create(Akka.Actor.ActorSystem system) { } + public override bool Equals(object obj) { } public Akka.Streams.ActorMaterializerSettings WithAutoFusing(bool isAutoFusing) { } public Akka.Streams.ActorMaterializerSettings WithDebugLogging(bool isEnabled) { } public Akka.Streams.ActorMaterializerSettings WithDispatcher(string dispatcher) { } public Akka.Streams.ActorMaterializerSettings WithFuzzingMode(bool isFuzzingMode) { } public Akka.Streams.ActorMaterializerSettings WithInputBuffer(int initialSize, int maxSize) { } public Akka.Streams.ActorMaterializerSettings WithMaxFixedBufferSize(int maxFixedBufferSize) { } + public Akka.Streams.ActorMaterializerSettings WithOutputBurstLimit(int limit) { } public Akka.Streams.ActorMaterializerSettings WithStreamRefSettings(Akka.Streams.Dsl.StreamRefSettings settings) { } public Akka.Streams.ActorMaterializerSettings WithSubscriptionTimeoutSettings(Akka.Streams.StreamSubscriptionTimeoutSettings settings) { } public Akka.Streams.ActorMaterializerSettings WithSupervisionStrategy(Akka.Streams.Supervision.Decider decider) { } From 32a07365700b8833d86d468385387aad8a1b9a60 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Wed, 22 Jul 2020 00:05:22 +0700 Subject: [PATCH 8/9] Remove SynchronizationContext --- .../Akka/Helios.Concurrency.DedicatedThreadPool.cs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/core/Akka/Helios.Concurrency.DedicatedThreadPool.cs b/src/core/Akka/Helios.Concurrency.DedicatedThreadPool.cs index 9ceadcf2b4c..788efed9f5e 100644 --- a/src/core/Akka/Helios.Concurrency.DedicatedThreadPool.cs +++ b/src/core/Akka/Helios.Concurrency.DedicatedThreadPool.cs @@ -94,16 +94,15 @@ public DedicatedThreadPoolSettings(int numThreads, throw new ArgumentOutOfRangeException(nameof(numThreads), $"numThreads must be at least 1. Was {numThreads}"); } #else - public DedicatedThreadPoolSettings(int numThreads, string name = null, TimeSpan? deadlockTimeout = null, SynchronizationContext synchronizationContext = null) - : this(numThreads, DefaultThreadType, name, deadlockTimeout, synchronizationContext) + public DedicatedThreadPoolSettings(int numThreads, string name = null, TimeSpan? deadlockTimeout = null) + : this(numThreads, DefaultThreadType, name, deadlockTimeout) { } public DedicatedThreadPoolSettings( int numThreads, ThreadType threadType, string name = null, - TimeSpan? deadlockTimeout = null, - SynchronizationContext synchronizationContext = null) + TimeSpan? deadlockTimeout = null) { Name = name ?? ("DedicatedThreadPool-" + Guid.NewGuid()); ThreadType = threadType; @@ -113,7 +112,6 @@ public DedicatedThreadPoolSettings( throw new ArgumentOutOfRangeException("deadlockTimeout", string.Format("deadlockTimeout must be null or at least 1ms. Was {0}.", deadlockTimeout)); if (numThreads <= 0) throw new ArgumentOutOfRangeException("numThreads", string.Format("numThreads must be at least 1. Was {0}", numThreads)); - SynchronizationContext = synchronizationContext ?? SynchronizationContext.Current; } #endif @@ -156,8 +154,6 @@ public DedicatedThreadPoolSettings( /// Gets the thread stack size, 0 represents the default stack size. /// public int ThreadMaxStackSize { get; private set; } - - public SynchronizationContext SynchronizationContext { get; } } /// From 8f8f6f0016c8c16044d32e2d62b07bb479b62fe1 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Wed, 22 Jul 2020 00:44:46 +0700 Subject: [PATCH 9/9] Clean up Pigeon.conf config file --- src/core/Akka/Configuration/Pigeon.conf | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/src/core/Akka/Configuration/Pigeon.conf b/src/core/Akka/Configuration/Pigeon.conf index ee655e40f37..6237292e601 100644 --- a/src/core/Akka/Configuration/Pigeon.conf +++ b/src/core/Akka/Configuration/Pigeon.conf @@ -305,7 +305,6 @@ akka { # This will be used if you have set "executor = "default-executor"". # Uses the default .NET threadpool default-executor { - } # Same as default executor @@ -329,13 +328,6 @@ akka { # Setting to "FIFO" to use queue like peeking mode which "poll" or "LIFO" to use stack # like peeking mode which "pop". task-peeking-mode = "FIFO" - - - dedicated-thread-pool{ #settings for Helios.DedicatedThreadPool - thread-count = 3 #number of threads - #deadlock-timeout = 3s #optional timeout for deadlock detection - threadtype = background #values can be "background" or "foreground" - } } # For running in current synchronization contexts @@ -382,11 +374,12 @@ akka { executor = "thread-pool-executor" throughput = 1 - thread-pool-executor { - core-pool-size-min = 2 - core-pool-size-factor = 2.0 - core-pool-size-max = 16 - } + # Akka.NET does not have a fine grained control over thread pool executor + # thread-pool-executor { + # core-pool-size-min = 2 + # core-pool-size-factor = 2.0 + # core-pool-size-max = 16 + # } } default-mailbox {