diff --git a/docs/articles/actors/dispatchers.md b/docs/articles/actors/dispatchers.md index c6963215c52..73ee10f96b9 100644 --- a/docs/articles/actors/dispatchers.md +++ b/docs/articles/actors/dispatchers.md @@ -70,6 +70,7 @@ system.ActorOf(Props.Create().WithDispatcher("my-dispatcher"), "my-acto Some dispatcher configurations are available out-of-the-box for convenience. You can use them during actor deployment, [as described above](#configuring-dispatchers). * **default-dispatcher** - A configuration that uses the [ThreadPoolDispatcher](#threadpooldispatcher). As the name says, this is the default dispatcher configuration used by the global dispatcher, and you don't need to define anything during deployment to use it. +* **internal-dispatcher** - To protect the internal Actors that is spawned by the various Akka modules, a separate internal dispatcher is used by default. * **task-dispatcher** - A configuration that uses the [TaskDispatcher](#taskdispatcher). * **default-fork-join-dispatcher** - A configuration that uses the [ForkJoinDispatcher](#forkjoindispatcher). * **synchronized-dispatcher** - A configuration that uses the [SynchronizedDispatcher](#synchronizeddispatcher). @@ -174,3 +175,10 @@ The following configuration keys are available for any dispatcher configuration: > [!NOTE] > The throughput-deadline-time is used as a *best effort*, not as a *hard limit*. This means that if a message takes more time than the deadline allows, Akka.NET won't interrupt the process. Instead it will wait for it to finish before giving turn to the next actor. + +## Dispatcher aliases + +When a dispatcher is looked up, and the given setting contains a string rather than a dispatcher config block, +the lookup will treat it as an alias, and follow that string to an alternate location for a dispatcher config. +If the dispatcher config is referenced both through an alias and through the absolute path only one dispatcher will +be used and shared among the two ids. diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs index 8cbd9485670..c7df481a36a 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs @@ -262,7 +262,7 @@ public ClusterSharding(ExtendedActorSystem system) { var guardianName = system.Settings.Config.GetString("akka.cluster.sharding.guardian-name"); var dispatcher = system.Settings.Config.GetString("akka.cluster.sharding.use-dispatcher"); - if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.DefaultDispatcherId; + if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.InternalDispatcherId; return system.SystemActorOf(Props.Create(() => new ClusterShardingGuardian()).WithDispatcher(dispatcher), guardianName); }); } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf b/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf index 58f9e4bf15a..2a8cc13329d 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf +++ b/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf @@ -137,7 +137,7 @@ akka.cluster.sharding { coordinator-singleton = "akka.cluster.singleton" # The id of the dispatcher to use for ClusterSharding actors. - # If not specified default dispatcher is used. + # If not specified, the internal dispatcher is used. # If specified you need to define the settings of the actual dispatcher. # This dispatcher for the entity actors is defined by the user provided # Props, i.e. this dispatcher is not used for the entity actors. diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientReceptionist.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientReceptionist.cs index a2909ea966f..99ed47f5489 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientReceptionist.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientReceptionist.cs @@ -133,7 +133,7 @@ private IActorRef CreateReceptionist() { var name = _config.GetString("name"); var dispatcher = _config.GetString("use-dispatcher", null); - if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.DefaultDispatcherId; + if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.InternalDispatcherId; // important to use var mediator here to activate it outside of ClusterReceptionist constructor var mediator = PubSubMediator; diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf b/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf index dc0c97d7e61..c71820b6282 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf +++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf @@ -24,7 +24,7 @@ akka.cluster.client.receptionist { response-tunnel-receive-timeout = 30s # The id of the dispatcher to use for ClusterReceptionist actors. - # If not specified default dispatcher is used. + # If not specified, the internal dispatcher is used. # If specified you need to define the settings of the actual dispatcher. use-dispatcher = "" diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSub.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSub.cs index ac260f426dd..39829f71ced 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSub.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSub.cs @@ -102,7 +102,7 @@ private IActorRef CreateMediator() var name = _system.Settings.Config.GetString("akka.cluster.pub-sub.name"); var dispatcher = _system.Settings.Config.GetString("akka.cluster.pub-sub.use-dispatcher", null); if (string.IsNullOrEmpty(dispatcher)) - dispatcher = Dispatchers.DefaultDispatcherId; + dispatcher = Dispatchers.InternalDispatcherId; return _system.SystemActorOf( Props.Create(() => new DistributedPubSubMediator(_settings)) diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/reference.conf b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/reference.conf index f3b73ed2b32..9590dcd8841 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/reference.conf +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/reference.conf @@ -30,7 +30,7 @@ akka.cluster.pub-sub { max-delta-elements = 3000 # The id of the dispatcher to use for DistributedPubSubMediator actors. - # If not specified default dispatcher is used. + # If not specified, the internal dispatcher is used. # If specified you need to define the settings of the actual dispatcher. use-dispatcher = "" diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs index 027f71c3647..9fd06898419 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs @@ -14,6 +14,7 @@ using Akka.Actor; using Akka.Configuration; using Akka.Coordination; +using Akka.Dispatch; using Akka.Event; using Akka.Pattern; using Akka.Remote; @@ -604,7 +605,9 @@ public static Props Props(Props singletonProps, ClusterSingletonManagerSettings /// TBD public static Props Props(Props singletonProps, object terminationMessage, ClusterSingletonManagerSettings settings) { - return Actor.Props.Create(() => new ClusterSingletonManager(singletonProps, terminationMessage, settings)).WithDeploy(Deploy.Local); + return Actor.Props.Create(() => new ClusterSingletonManager(singletonProps, terminationMessage, settings)) + .WithDispatcher(Dispatchers.InternalDispatcherId) + .WithDeploy(Deploy.Local); } private readonly Props _singletonProps; diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs index 4706669ee8d..f2e38827f2a 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs @@ -11,6 +11,7 @@ using System.Linq; using Akka.Actor; using Akka.Configuration; +using Akka.Dispatch; using Akka.Event; namespace Akka.Cluster.Tools.Singleton @@ -70,7 +71,9 @@ public static Config DefaultConfig() /// TBD public static Props Props(string singletonManagerPath, ClusterSingletonProxySettings settings) { - return Actor.Props.Create(() => new ClusterSingletonProxy(singletonManagerPath, settings)).WithDeploy(Deploy.Local); + return Actor.Props.Create(() => new ClusterSingletonProxy(singletonManagerPath, settings)) + .WithDispatcher(Dispatchers.InternalDispatcherId) + .WithDeploy(Deploy.Local); } private readonly ClusterSingletonProxySettings _settings; diff --git a/src/contrib/cluster/Akka.DistributedData/ReplicatorSettings.cs b/src/contrib/cluster/Akka.DistributedData/ReplicatorSettings.cs index 149f8b098cd..c6f9d188d3e 100644 --- a/src/contrib/cluster/Akka.DistributedData/ReplicatorSettings.cs +++ b/src/contrib/cluster/Akka.DistributedData/ReplicatorSettings.cs @@ -43,7 +43,7 @@ public static ReplicatorSettings Create(Config config) throw ConfigurationException.NullOrEmptyConfig(); var dispatcher = config.GetString("use-dispatcher", null); - if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.DefaultDispatcherId; + if (string.IsNullOrEmpty(dispatcher)) dispatcher = Dispatchers.InternalDispatcherId; var durableConfig = config.GetConfig("durable"); var durableKeys = durableConfig.GetStringList("keys"); @@ -63,7 +63,7 @@ public static ReplicatorSettings Create(Config config) { throw new ArgumentException($"`akka.cluster.distributed-data.durable.store-actor-class` is set to an invalid class {durableStoreType}."); } - durableStoreProps = Props.Create(durableStoreType, durableConfig).WithDispatcher(durableConfig.GetString("use-dispatcher")); + durableStoreProps = Props.Create(durableStoreType, durableConfig).WithDispatcher(dispatcher); } // TODO: This constructor call fails when these fields are not populated inside the Config object: @@ -212,7 +212,7 @@ private ReplicatorSettings Copy(string role = null, public ReplicatorSettings WithGossipInterval(TimeSpan gossipInterval) => Copy(gossipInterval: gossipInterval); public ReplicatorSettings WithNotifySubscribersInterval(TimeSpan notifySubscribersInterval) => Copy(notifySubscribersInterval: notifySubscribersInterval); public ReplicatorSettings WithMaxDeltaElements(int maxDeltaElements) => Copy(maxDeltaElements: maxDeltaElements); - public ReplicatorSettings WithDispatcher(string dispatcher) => Copy(dispatcher: string.IsNullOrEmpty(dispatcher) ? Dispatchers.DefaultDispatcherId : dispatcher); + public ReplicatorSettings WithDispatcher(string dispatcher) => Copy(dispatcher: string.IsNullOrEmpty(dispatcher) ? Dispatchers.InternalDispatcherId : dispatcher); public ReplicatorSettings WithPruning(TimeSpan pruningInterval, TimeSpan maxPruningDissemination) => Copy(pruningInterval: pruningInterval, maxPruningDissemination: maxPruningDissemination); public ReplicatorSettings WithDurableKeys(IImmutableSet durableKeys) => Copy(durableKeys: durableKeys); diff --git a/src/contrib/cluster/Akka.DistributedData/reference.conf b/src/contrib/cluster/Akka.DistributedData/reference.conf index e154cbf9cc0..4652b28bd42 100644 --- a/src/contrib/cluster/Akka.DistributedData/reference.conf +++ b/src/contrib/cluster/Akka.DistributedData/reference.conf @@ -26,8 +26,8 @@ akka.cluster.distributed-data { # the replicas. Next chunk will be transferred in next round of gossip. max-delta-elements = 1000 - # The id of the dispatcher to use for Replicator actors. If not specified - # default dispatcher is used. + # The id of the dispatcher to use for Replicator actors. + # If not specified, the internal dispatcher is used. # If specified you need to define the settings of the actual dispatcher. use-dispatcher = "" diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index 45dd7865683..47d636f79ce 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -1,5 +1,6 @@ [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Benchmarks")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster")] +[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.TestKit")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Tests")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Tests.MultiNode")] @@ -1841,12 +1842,13 @@ namespace Akka.Actor.Internal public class ActorSystemImpl : Akka.Actor.ExtendedActorSystem { public ActorSystemImpl(string name) { } - public ActorSystemImpl(string name, Akka.Configuration.Config config, Akka.Actor.Setup.ActorSystemSetup setup) { } + public ActorSystemImpl(string name, Akka.Configuration.Config config, Akka.Actor.Setup.ActorSystemSetup setup, System.Nullable> guardianProps = null) { } public override Akka.Actor.ActorProducerPipelineResolver ActorPipelineResolver { get; } public override Akka.Actor.IActorRef DeadLetters { get; } public override Akka.Dispatch.Dispatchers Dispatchers { get; } public override Akka.Event.EventStream EventStream { get; } public override Akka.Actor.IInternalActorRef Guardian { get; } + public Akka.Util.Option GuardianProps { get; } public override Akka.Event.ILoggingAdapter Log { get; } public override Akka.Actor.IInternalActorRef LookupRoot { get; } public override Akka.Dispatch.Mailboxes Mailboxes { get; } @@ -2452,9 +2454,10 @@ namespace Akka.Dispatch } public sealed class Dispatchers { + public static readonly string DefaultBlockingDispatcherId; public static readonly string DefaultDispatcherId; public static readonly string SynchronizedDispatcherId; - public Dispatchers(Akka.Actor.ActorSystem system, Akka.Dispatch.IDispatcherPrerequisites prerequisites) { } + public Dispatchers(Akka.Actor.ActorSystem system, Akka.Dispatch.IDispatcherPrerequisites prerequisites, Akka.Event.ILoggingAdapter logger) { } public Akka.Configuration.Config DefaultDispatcherConfig { get; } public Akka.Dispatch.MessageDispatcher DefaultGlobalDispatcher { get; } public Akka.Dispatch.IDispatcherPrerequisites Prerequisites { get; } @@ -4838,6 +4841,7 @@ namespace Akka.Util public static readonly Akka.Util.Option None; public Option(T value) { } public bool HasValue { get; } + public bool IsEmpty { get; } public T Value { get; } public bool Equals(Akka.Util.Option other) { } public override bool Equals(object obj) { } diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt index f446d828a43..ac12fd53bd6 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt @@ -19,6 +19,7 @@ namespace Akka.Streams } public class static ActorAttributes { + public static Akka.Streams.ActorAttributes.Dispatcher IODispatcher { get; } public static Akka.Streams.Attributes CreateDebugLogging(bool enabled) { } public static Akka.Streams.Attributes CreateDispatcher(string dispatcherName) { } public static Akka.Streams.Attributes CreateFuzzingMode(bool enabled) { } @@ -140,12 +141,14 @@ namespace Akka.Streams public readonly int SyncProcessingLimit; public ActorMaterializerSettings(int initialInputBufferSize, int maxInputBufferSize, string dispatcher, Akka.Streams.Supervision.Decider supervisionDecider, Akka.Streams.StreamSubscriptionTimeoutSettings subscriptionTimeoutSettings, Akka.Streams.Dsl.StreamRefSettings streamRefSettings, bool isDebugLogging, int outputBurstLimit, bool isFuzzingMode, bool isAutoFusing, int maxFixedBufferSize, int syncProcessingLimit = 1000) { } public static Akka.Streams.ActorMaterializerSettings Create(Akka.Actor.ActorSystem system) { } + public override bool Equals(object obj) { } public Akka.Streams.ActorMaterializerSettings WithAutoFusing(bool isAutoFusing) { } public Akka.Streams.ActorMaterializerSettings WithDebugLogging(bool isEnabled) { } public Akka.Streams.ActorMaterializerSettings WithDispatcher(string dispatcher) { } public Akka.Streams.ActorMaterializerSettings WithFuzzingMode(bool isFuzzingMode) { } public Akka.Streams.ActorMaterializerSettings WithInputBuffer(int initialSize, int maxSize) { } public Akka.Streams.ActorMaterializerSettings WithMaxFixedBufferSize(int maxFixedBufferSize) { } + public Akka.Streams.ActorMaterializerSettings WithOutputBurstLimit(int limit) { } public Akka.Streams.ActorMaterializerSettings WithStreamRefSettings(Akka.Streams.Dsl.StreamRefSettings settings) { } public Akka.Streams.ActorMaterializerSettings WithSubscriptionTimeoutSettings(Akka.Streams.StreamSubscriptionTimeoutSettings settings) { } public Akka.Streams.ActorMaterializerSettings WithSupervisionStrategy(Akka.Streams.Supervision.Decider decider) { } diff --git a/src/core/Akka.Cluster.Tests/ClusterConfigSpec.cs b/src/core/Akka.Cluster.Tests/ClusterConfigSpec.cs index 091d7ef6469..1b2e96aae91 100644 --- a/src/core/Akka.Cluster.Tests/ClusterConfigSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterConfigSpec.cs @@ -45,7 +45,7 @@ public void Clustering_must_be_able_to_parse_generic_cluster_config_elements() settings.MinNrOfMembers.Should().Be(1); settings.MinNrOfMembersOfRole.Should().Equal(ImmutableDictionary.Empty); settings.Roles.Should().BeEquivalentTo(ImmutableHashSet.Empty); - settings.UseDispatcher.Should().Be(Dispatchers.DefaultDispatcherId); + settings.UseDispatcher.Should().Be(Dispatchers.InternalDispatcherId); settings.GossipDifferentViewProbability.Should().Be(0.8); settings.ReduceGossipDifferentViewProbability.Should().Be(400); diff --git a/src/core/Akka.Cluster/ClusterSettings.cs b/src/core/Akka.Cluster/ClusterSettings.cs index 87001228652..e829e508ff3 100644 --- a/src/core/Akka.Cluster/ClusterSettings.cs +++ b/src/core/Akka.Cluster/ClusterSettings.cs @@ -70,7 +70,7 @@ public ClusterSettings(Config config, string systemName) MinNrOfMembers = clusterConfig.GetInt("min-nr-of-members", 0); _useDispatcher = clusterConfig.GetString("use-dispatcher", null); - if (String.IsNullOrEmpty(_useDispatcher)) _useDispatcher = Dispatchers.DefaultDispatcherId; + if (string.IsNullOrEmpty(_useDispatcher)) _useDispatcher = Dispatchers.InternalDispatcherId; GossipDifferentViewProbability = clusterConfig.GetDouble("gossip-different-view-probability", 0); ReduceGossipDifferentViewProbability = clusterConfig.GetInt("reduce-gossip-different-view-probability", 0); SchedulerTickDuration = clusterConfig.GetTimeSpan("scheduler.tick-duration", null); diff --git a/src/core/Akka.Cluster/Configuration/Cluster.conf b/src/core/Akka.Cluster/Configuration/Cluster.conf index 8f24cef4f6e..2c486d23f1b 100644 --- a/src/core/Akka.Cluster/Configuration/Cluster.conf +++ b/src/core/Akka.Cluster/Configuration/Cluster.conf @@ -128,8 +128,8 @@ akka { # Disable with "off". publish-stats-interval = off - # The id of the dispatcher to use for cluster actors. If not specified - # default dispatcher is used. + # The id of the dispatcher to use for cluster actors. + # If not specified, the internal dispatcher is used. # If specified you need to define the settings of the actual dispatcher. use-dispatcher = "" diff --git a/src/core/Akka.MultiNodeTestRunner.Shared/Akka.MultiNodeTestRunner.Shared.csproj b/src/core/Akka.MultiNodeTestRunner.Shared/Akka.MultiNodeTestRunner.Shared.csproj index 5a99724c6c7..bff83b1d9b2 100644 --- a/src/core/Akka.MultiNodeTestRunner.Shared/Akka.MultiNodeTestRunner.Shared.csproj +++ b/src/core/Akka.MultiNodeTestRunner.Shared/Akka.MultiNodeTestRunner.Shared.csproj @@ -5,6 +5,9 @@ $(NetStandardLibVersion) false false + + false + diff --git a/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceAsyncSourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceAsyncSourceSpec.cs index 8e58384f08a..923a30e94c3 100644 --- a/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceAsyncSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceAsyncSourceSpec.cs @@ -293,7 +293,7 @@ public void A_UnfoldResourceAsyncSource_must_use_dedicated_blocking_io_dispatche var actorRef = refs.First(@ref => @ref.Path.ToString().Contains("unfoldResourceSourceAsync")); try { - Utils.AssertDispatcher(actorRef, "akka.stream.default-blocking-io-dispatcher"); + Utils.AssertDispatcher(actorRef, ActorAttributes.IODispatcher.Name); } finally { diff --git a/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs index 3a80cbee941..3600bec0fe1 100644 --- a/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs @@ -211,7 +211,7 @@ public void A_UnfoldResourceSource_must_use_dedicated_blocking_io_dispatcher_by_ var actorRef = refs.First(@ref => @ref.Path.ToString().Contains("unfoldResourceSource")); try { - Utils.AssertDispatcher(actorRef, "akka.stream.default-blocking-io-dispatcher"); + Utils.AssertDispatcher(actorRef, ActorAttributes.IODispatcher.Name); } finally { diff --git a/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs b/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs index 0a975dd3557..b0e2c4a986b 100644 --- a/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs @@ -12,6 +12,7 @@ using System.Threading; using System.Threading.Tasks; using Akka.Actor; +using Akka.Dispatch; using Akka.IO; using Akka.Streams.Dsl; using Akka.Streams.Implementation; @@ -217,20 +218,24 @@ public void SynchronousFileSink_should_use_dedicated_blocking_io_dispatcher_by_d { TargetFile(f => { - var sys = ActorSystem.Create("dispatcher-testing", Utils.UnboundedMailboxConfig); + var sys = ActorSystem.Create("FileSinkSpec-dispatcher-testing-1", Utils.UnboundedMailboxConfig); var materializer = ActorMaterializer.Create(sys); try { //hack for Iterator.continually - Source.FromEnumerator(() => Enumerable.Repeat(_testByteStrings.Head(), Int32.MaxValue).GetEnumerator()) + Source + .FromEnumerator(() => Enumerable.Repeat(_testByteStrings.Head(), int.MaxValue).GetEnumerator()) .RunWith(FileIO.ToFile(f), materializer); - ((ActorMaterializerImpl)materializer).Supervisor.Tell(StreamSupervisor.GetChildren.Instance, TestActor); + ((ActorMaterializerImpl)materializer) + .Supervisor + .Tell(StreamSupervisor.GetChildren.Instance, TestActor); var refs = ExpectMsg().Refs; - //NOTE: Akka uses "fileSource" as name for DefaultAttributes.FileSink - I think it's mistake on the JVM implementation side var actorRef = refs.First(@ref => @ref.Path.ToString().Contains("fileSink")); - Utils.AssertDispatcher(actorRef, "akka.stream.default-blocking-io-dispatcher"); + + // haven't figured out why this returns the aliased id rather than the id, but the stage is going away so whatever + Utils.AssertDispatcher(actorRef, ActorAttributes.IODispatcher.Name); } finally { @@ -248,7 +253,7 @@ public void SynchronousFileSink_should_allow_overriding_the_dispatcher_using_Att { TargetFile(f => { - var sys = ActorSystem.Create("dispatcher_testing", Utils.UnboundedMailboxConfig); + var sys = ActorSystem.Create("FileSinkSpec-dispatcher-testing-2", Utils.UnboundedMailboxConfig); var materializer = ActorMaterializer.Create(sys); try diff --git a/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs b/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs index b484e2037b1..029a7e53033 100644 --- a/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs @@ -280,7 +280,7 @@ public void FileSource_should_use_dedicated_blocking_io_dispatcher_by_default() var actorRef = ExpectMsg().Refs.First(r => r.Path.ToString().Contains("fileSource")); try { - Utils.AssertDispatcher(actorRef, "akka.stream.default-blocking-io-dispatcher"); + Utils.AssertDispatcher(actorRef, ActorAttributes.IODispatcher.Name); } finally { diff --git a/src/core/Akka.Streams.Tests/IO/InputStreamSinkSpec.cs b/src/core/Akka.Streams.Tests/IO/InputStreamSinkSpec.cs index 5f4fe22c7f1..2df6ac577e8 100644 --- a/src/core/Akka.Streams.Tests/IO/InputStreamSinkSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/InputStreamSinkSpec.cs @@ -302,7 +302,7 @@ public void InputStreamSink_should_use_dedicated_default_blocking_io_dispatcher_ { this.AssertAllStagesStopped(() => { - var sys = ActorSystem.Create("dispatcher-testing", Utils.UnboundedMailboxConfig); + var sys = ActorSystem.Create("InputStreamSink-testing", Utils.UnboundedMailboxConfig); var materializer = ActorMaterializer.Create(sys); try { @@ -310,7 +310,7 @@ public void InputStreamSink_should_use_dedicated_default_blocking_io_dispatcher_ (materializer as ActorMaterializerImpl).Supervisor.Tell(StreamSupervisor.GetChildren.Instance, TestActor); var children = ExpectMsg().Refs; var actorRef = children.First(c => c.Path.ToString().Contains("inputStreamSink")); - Utils.AssertDispatcher(actorRef, "akka.stream.default-blocking-io-dispatcher"); + Utils.AssertDispatcher(actorRef, ActorAttributes.IODispatcher.Name); } finally { diff --git a/src/core/Akka.Streams.Tests/IO/OutputStreamSourceSpec.cs b/src/core/Akka.Streams.Tests/IO/OutputStreamSourceSpec.cs index 0c251fe0118..19e14c16869 100644 --- a/src/core/Akka.Streams.Tests/IO/OutputStreamSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/OutputStreamSourceSpec.cs @@ -213,7 +213,7 @@ public void OutputStreamSource_must_use_dedicated_default_blocking_io_dispatcher TestActor); var actorRef = ExpectMsg() .Refs.First(c => c.Path.ToString().Contains("outputStreamSource")); - Utils.AssertDispatcher(actorRef, "akka.stream.default-blocking-io-dispatcher"); + Utils.AssertDispatcher(actorRef, ActorAttributes.IODispatcher.Name); } finally { diff --git a/src/core/Akka.Streams.Tests/StreamDispatcherSpec.cs b/src/core/Akka.Streams.Tests/StreamDispatcherSpec.cs new file mode 100644 index 00000000000..5cb1ee4ddff --- /dev/null +++ b/src/core/Akka.Streams.Tests/StreamDispatcherSpec.cs @@ -0,0 +1,37 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Akka.Dispatch; +using Akka.TestKit; +using Xunit; +using FluentAssertions; + +namespace Akka.Streams.Tests +{ + public class StreamDispatcherSpec : AkkaSpec + { + [Fact] + public void The_default_blocking_io_dispatcher_for_streams_must_be_the_same_as_the_default_blocking_io_dispatcher_for_actors() + { + var materializer = ActorMaterializer.Create(Sys); + + var streamIoDispatcher = Sys.Dispatchers.Lookup(ActorAttributes.IODispatcher.Name); + var actorIoDispatcher = Sys.Dispatchers.Lookup(Dispatchers.DefaultBlockingDispatcherId); + + streamIoDispatcher.Should().Be(actorIoDispatcher); + } + + [Fact] + public void The_deprecated_default_stream_io_dispatcher_must_be_the_same_as_the_default_blocking_io_dispatcher_for_actors() + { + var materializer = ActorMaterializer.Create(Sys); + + var streamIoDispatcher = Sys.Dispatchers.Lookup("akka.stream.default-blocking-io-dispatcher"); + var actorIoDispatcher = Sys.Dispatchers.Lookup(Dispatchers.DefaultBlockingDispatcherId); + + streamIoDispatcher.Should().Be(actorIoDispatcher); + } + } +} diff --git a/src/core/Akka.Streams/ActorMaterializer.cs b/src/core/Akka.Streams/ActorMaterializer.cs index fc8866314f8..aa0c8c11fd3 100644 --- a/src/core/Akka.Streams/ActorMaterializer.cs +++ b/src/core/Akka.Streams/ActorMaterializer.cs @@ -336,7 +336,7 @@ private static ActorMaterializerSettings Create(Config config) streamRefSettings: StreamRefSettings.Create(config.GetConfig("stream-ref"))); } - private const int DefaultlMaxFixedbufferSize = 1000; + private const int DefaultMaxFixedBufferSize = 1000; /// /// TBD /// @@ -401,9 +401,35 @@ private static ActorMaterializerSettings Create(Config config) /// TBD /// TBD /// TBD + /// TBD /// TBD - public ActorMaterializerSettings(int initialInputBufferSize, int maxInputBufferSize, string dispatcher, Decider supervisionDecider, StreamSubscriptionTimeoutSettings subscriptionTimeoutSettings, StreamRefSettings streamRefSettings, bool isDebugLogging, int outputBurstLimit, bool isFuzzingMode, bool isAutoFusing, int maxFixedBufferSize, int syncProcessingLimit = DefaultlMaxFixedbufferSize) + public ActorMaterializerSettings( + int initialInputBufferSize, + int maxInputBufferSize, + string dispatcher, + Decider supervisionDecider, + StreamSubscriptionTimeoutSettings subscriptionTimeoutSettings, + StreamRefSettings streamRefSettings, + bool isDebugLogging, + int outputBurstLimit, + bool isFuzzingMode, + bool isAutoFusing, + int maxFixedBufferSize, + int syncProcessingLimit = DefaultMaxFixedBufferSize) { + if(initialInputBufferSize <= 0) + throw new ArgumentException($"{nameof(initialInputBufferSize)} must be > 0", nameof(initialInputBufferSize)); + if(syncProcessingLimit <= 0) + throw new ArgumentException($"{nameof(syncProcessingLimit)} must be > 0", nameof(syncProcessingLimit)); + + if(maxInputBufferSize <= 0) + throw new ArgumentException($"{nameof(maxInputBufferSize)} must be > 0", nameof(maxInputBufferSize)); + if((maxInputBufferSize & (maxInputBufferSize - 1)) != 0) + throw new ArgumentException($"{nameof(maxInputBufferSize)} must be a power of two", nameof(maxInputBufferSize)); + + if(initialInputBufferSize > maxInputBufferSize) + throw new ArgumentException($"initialInputBufferSize({initialInputBufferSize}) must be <= maxInputBufferSize({maxInputBufferSize})"); + InitialInputBufferSize = initialInputBufferSize; MaxInputBufferSize = maxInputBufferSize; Dispatcher = dispatcher; @@ -418,55 +444,110 @@ public ActorMaterializerSettings(int initialInputBufferSize, int maxInputBufferS StreamRefSettings = streamRefSettings; } + private ActorMaterializerSettings Copy( + int? initialInputBufferSize = null, + int? maxInputBufferSize = null, + string dispatcher = null, + Decider supervisionDecider = null, + StreamSubscriptionTimeoutSettings subscriptionTimeoutSettings = null, + StreamRefSettings streamRefSettings = null, + bool? isDebugLogging = null, + int? outputBurstLimit = null, + bool? isFuzzingMode = null, + bool? isAutoFusing = null, + int? maxFixedBufferSize = null, + int? syncProcessingLimit = null) + { + return new ActorMaterializerSettings( + initialInputBufferSize??InitialInputBufferSize, + maxInputBufferSize??MaxInputBufferSize, + dispatcher??Dispatcher, + supervisionDecider??SupervisionDecider, + subscriptionTimeoutSettings??SubscriptionTimeoutSettings, + streamRefSettings ?? StreamRefSettings, + isDebugLogging ?? IsDebugLogging, + outputBurstLimit??OutputBurstLimit, + isFuzzingMode??IsFuzzingMode, + isAutoFusing??IsAutoFusing, + maxFixedBufferSize??MaxFixedBufferSize, + syncProcessingLimit??SyncProcessingLimit); + } + /// - /// TBD + /// Each asynchronous piece of a materialized stream topology is executed by one Actor + /// that manages an input buffer for all inlets of its shape. This setting configures + /// the default for initial and maximal input buffer in number of elements for each inlet. + /// This can be overridden for individual parts of the + /// stream topology by using . /// /// TBD /// TBD /// TBD public ActorMaterializerSettings WithInputBuffer(int initialSize, int maxSize) { - return new ActorMaterializerSettings(initialSize, maxSize, Dispatcher, SupervisionDecider, SubscriptionTimeoutSettings, StreamRefSettings, IsDebugLogging, OutputBurstLimit, IsFuzzingMode, IsAutoFusing, MaxFixedBufferSize, SyncProcessingLimit); + if (initialSize == InitialInputBufferSize && maxSize == MaxInputBufferSize) + return this; + return Copy(initialInputBufferSize: initialSize, maxInputBufferSize: maxSize); } /// - /// TBD + /// This setting configures the default dispatcher to be used by streams materialized + /// with the . This can be overridden for individual parts of the + /// stream topology by using . /// /// TBD /// TBD public ActorMaterializerSettings WithDispatcher(string dispatcher) { - return new ActorMaterializerSettings(InitialInputBufferSize, MaxInputBufferSize, dispatcher, SupervisionDecider, SubscriptionTimeoutSettings, StreamRefSettings, IsDebugLogging, OutputBurstLimit, IsFuzzingMode, IsAutoFusing, MaxFixedBufferSize, SyncProcessingLimit); + if (dispatcher == Dispatcher) return this; + return Copy(dispatcher: dispatcher); } /// - /// TBD + /// Decides how exceptions from application code are to be handled, unless + /// overridden for specific flows of the stream operations with + /// /// /// TBD /// TBD public ActorMaterializerSettings WithSupervisionStrategy(Decider decider) { - return new ActorMaterializerSettings(InitialInputBufferSize, MaxInputBufferSize, Dispatcher, decider, SubscriptionTimeoutSettings, StreamRefSettings, IsDebugLogging, OutputBurstLimit, IsFuzzingMode, IsAutoFusing, MaxFixedBufferSize, SyncProcessingLimit); + if (decider.Equals(SupervisionDecider)) return this; + return Copy(supervisionDecider: decider); } /// - /// TBD + /// Enable to log all elements that are dropped due to failures (at DEBUG level). /// /// TBD /// TBD public ActorMaterializerSettings WithDebugLogging(bool isEnabled) { - return new ActorMaterializerSettings(InitialInputBufferSize, MaxInputBufferSize, Dispatcher, SupervisionDecider, SubscriptionTimeoutSettings, StreamRefSettings, isEnabled, OutputBurstLimit, IsFuzzingMode, IsAutoFusing, MaxFixedBufferSize, SyncProcessingLimit); + if (IsDebugLogging == isEnabled) return this; + return Copy(isDebugLogging: isEnabled); } /// - /// TBD + /// Test utility: fuzzing mode means that GraphStage events are not processed + /// in FIFO order within a fused subgraph, but randomized. /// /// TBD /// TBD public ActorMaterializerSettings WithFuzzingMode(bool isFuzzingMode) { - return new ActorMaterializerSettings(InitialInputBufferSize, MaxInputBufferSize, Dispatcher, SupervisionDecider, SubscriptionTimeoutSettings, StreamRefSettings, IsDebugLogging, OutputBurstLimit, isFuzzingMode, IsAutoFusing, MaxFixedBufferSize, SyncProcessingLimit); + if (IsFuzzingMode == isFuzzingMode) return this; + return Copy(isFuzzingMode: isFuzzingMode); + } + + /// + /// Maximum number of elements emitted in batch if downstream signals large demand. + /// + /// + /// + public ActorMaterializerSettings WithOutputBurstLimit(int limit) + { + if (limit == OutputBurstLimit) return this; + return Copy(outputBurstLimit: limit); } /// @@ -476,31 +557,37 @@ public ActorMaterializerSettings WithFuzzingMode(bool isFuzzingMode) /// TBD public ActorMaterializerSettings WithAutoFusing(bool isAutoFusing) { - return new ActorMaterializerSettings(InitialInputBufferSize, MaxInputBufferSize, Dispatcher, SupervisionDecider, SubscriptionTimeoutSettings, StreamRefSettings, IsDebugLogging, OutputBurstLimit, IsFuzzingMode, isAutoFusing, MaxFixedBufferSize, SyncProcessingLimit); + if (IsAutoFusing == isAutoFusing) return this; + return Copy(isAutoFusing: isAutoFusing); } /// - /// TBD + /// Configure the maximum buffer size for which a FixedSizeBuffer will be preallocated. + /// This defaults to a large value because it is usually better to fail early when + /// system memory is not sufficient to hold the buffer. /// /// TBD /// TBD public ActorMaterializerSettings WithMaxFixedBufferSize(int maxFixedBufferSize) { - return new ActorMaterializerSettings(InitialInputBufferSize, MaxInputBufferSize, Dispatcher, SupervisionDecider, SubscriptionTimeoutSettings, StreamRefSettings, IsDebugLogging, OutputBurstLimit, IsFuzzingMode, IsAutoFusing, maxFixedBufferSize, SyncProcessingLimit); + if (MaxFixedBufferSize == maxFixedBufferSize) return this; + return Copy(maxFixedBufferSize: maxFixedBufferSize); } /// - /// TBD + /// Limit for number of messages that can be processed synchronously in stream to substream communication /// /// TBD /// TBD public ActorMaterializerSettings WithSyncProcessingLimit(int limit) { - return new ActorMaterializerSettings(InitialInputBufferSize, MaxInputBufferSize, Dispatcher, SupervisionDecider, SubscriptionTimeoutSettings, StreamRefSettings, IsDebugLogging, OutputBurstLimit, IsFuzzingMode, IsAutoFusing, MaxFixedBufferSize, limit); + if (SyncProcessingLimit == limit) return this; + return Copy(syncProcessingLimit: limit); } /// - /// TBD + /// Leaked publishers and subscribers are cleaned up when they are not used within a given + /// deadline, configured by . /// /// TBD /// TBD @@ -508,17 +595,49 @@ public ActorMaterializerSettings WithSubscriptionTimeoutSettings(StreamSubscript { if (Equals(settings, SubscriptionTimeoutSettings)) return this; - - return new ActorMaterializerSettings(InitialInputBufferSize, MaxInputBufferSize, Dispatcher, SupervisionDecider, settings, StreamRefSettings, IsDebugLogging, OutputBurstLimit, IsFuzzingMode, IsAutoFusing, MaxFixedBufferSize, SyncProcessingLimit); + return Copy(subscriptionTimeoutSettings: settings); } public ActorMaterializerSettings WithStreamRefSettings(StreamRefSettings settings) { if (settings == null) throw new ArgumentNullException(nameof(settings)); if (ReferenceEquals(settings, this.StreamRefSettings)) return this; - return new ActorMaterializerSettings(InitialInputBufferSize, MaxInputBufferSize, Dispatcher, - SupervisionDecider, SubscriptionTimeoutSettings, settings, IsDebugLogging, OutputBurstLimit, - IsFuzzingMode, IsAutoFusing, MaxFixedBufferSize, SyncProcessingLimit); + return Copy(streamRefSettings: settings); + } + + public override bool Equals(object obj) + { + if (!(obj is ActorMaterializerSettings s)) return false; + return + s.InitialInputBufferSize == InitialInputBufferSize && + s.MaxInputBufferSize == MaxInputBufferSize && + s.Dispatcher == Dispatcher && + s.SupervisionDecider == SupervisionDecider && + s.SubscriptionTimeoutSettings == SubscriptionTimeoutSettings && + s.IsDebugLogging == IsDebugLogging && + s.OutputBurstLimit == OutputBurstLimit && + s.SyncProcessingLimit == SyncProcessingLimit && + s.IsFuzzingMode == IsFuzzingMode && + s.IsAutoFusing == IsAutoFusing && + s.SubscriptionTimeoutSettings == SubscriptionTimeoutSettings && + s.StreamRefSettings == StreamRefSettings; + } + + internal Attributes ToAttributes() + { + return new Attributes(new Attributes.IAttribute[] + { + new Attributes.InputBuffer(InitialInputBufferSize, MaxInputBufferSize), + Attributes.CancellationStrategy.Default, + new ActorAttributes.Dispatcher(Dispatcher), + new ActorAttributes.SupervisionStrategy(SupervisionDecider), + new ActorAttributes.DebugLogging(IsDebugLogging), + new ActorAttributes.StreamSubscriptionTimeout(SubscriptionTimeoutSettings.Timeout, SubscriptionTimeoutSettings.Mode), + new ActorAttributes.OutputBurstLimit(OutputBurstLimit), + new ActorAttributes.FuzzingMode(IsFuzzingMode), + new ActorAttributes.MaxFixedBufferSize(MaxFixedBufferSize), + new ActorAttributes.SyncProcessingLimit(SyncProcessingLimit), + }); } } diff --git a/src/core/Akka.Streams/Attributes.cs b/src/core/Akka.Streams/Attributes.cs index d4b79abf32d..da57c7ab2c6 100644 --- a/src/core/Akka.Streams/Attributes.cs +++ b/src/core/Akka.Streams/Attributes.cs @@ -228,7 +228,7 @@ private AsyncBoundary() { } /// public sealed class CancellationStrategy:IMandatoryAttribute { - internal CancellationStrategy Default { get; } = new CancellationStrategy(new PropagateFailure()); + internal static CancellationStrategy Default { get; } = new CancellationStrategy(new PropagateFailure()); public IStrategy Strategy { get; } @@ -596,6 +596,8 @@ public SupervisionStrategy(Decider decider) public override string ToString() => "SupervisionStrategy"; } + public static Dispatcher IODispatcher { get; } = new Dispatcher("akka.stream.materializer.blocking-io-dispatcher"); + /// /// Enables additional low level troubleshooting logging at DEBUG log level /// diff --git a/src/core/Akka.Streams/Implementation/IO/IOSinks.cs b/src/core/Akka.Streams/Implementation/IO/IOSinks.cs index b9cd2830aaa..b4779e7ef22 100644 --- a/src/core/Akka.Streams/Implementation/IO/IOSinks.cs +++ b/src/core/Akka.Streams/Implementation/IO/IOSinks.cs @@ -87,7 +87,12 @@ public override object Create(MaterializationContext context, out Task var props = FileSubscriber.Props(_f, ioResultPromise, settings.MaxInputBufferSize, _startPosition, _fileMode); var dispatcher = context.EffectiveAttributes.GetAttribute(DefaultAttributes.IODispatcher.AttributeList.First()) as ActorAttributes.Dispatcher; - var actorRef = mat.ActorOf(context, props.WithDispatcher(dispatcher.Name)); + var actorRef = mat.ActorOf( + context, + props.WithDispatcher(context + .EffectiveAttributes + .GetMandatoryAttribute() + .Name)); materializer = ioResultPromise.Task; return new ActorSubscriberImpl(actorRef); } @@ -151,7 +156,16 @@ public override object Create(MaterializationContext context, out Task var ioResultPromise = new TaskCompletionSource(); var os = _createOutput(); - var props = OutputStreamSubscriber.Props(os, ioResultPromise, settings.MaxInputBufferSize, _autoFlush); + var maxInputBufferSize = context + .EffectiveAttributes + .GetMandatoryAttribute() + .Max; + var props = OutputStreamSubscriber + .Props(os, ioResultPromise, maxInputBufferSize, _autoFlush) + .WithDispatcher(context + .EffectiveAttributes + .GetMandatoryAttribute() + .Name); var actorRef = mat.ActorOf(context, props); materializer = ioResultPromise.Task; diff --git a/src/core/Akka.Streams/Implementation/IO/IOSources.cs b/src/core/Akka.Streams/Implementation/IO/IOSources.cs index 3e7445ffdb6..c8a22407372 100644 --- a/src/core/Akka.Streams/Implementation/IO/IOSources.cs +++ b/src/core/Akka.Streams/Implementation/IO/IOSources.cs @@ -160,7 +160,12 @@ public override IPublisher Create(MaterializationContext context, ou { // can throw, i.e. FileNotFound var inputStream = _createInputStream(); - var props = InputStreamPublisher.Props(inputStream, ioResultPromise, _chunkSize); + var props = InputStreamPublisher + .Props(inputStream, ioResultPromise, _chunkSize) + .WithDispatcher(context + .EffectiveAttributes + .GetMandatoryAttribute() + .Name); var actorRef = materializer.ActorOf(context, props); pub = new ActorPublisherImpl(actorRef); } diff --git a/src/core/Akka.Streams/Implementation/Stages/Stages.cs b/src/core/Akka.Streams/Implementation/Stages/Stages.cs index d58e3497e0a..1e8a632931c 100644 --- a/src/core/Akka.Streams/Implementation/Stages/Stages.cs +++ b/src/core/Akka.Streams/Implementation/Stages/Stages.cs @@ -7,6 +7,7 @@ using System; using System.Threading.Tasks; +using Akka.Dispatch; using Akka.Streams.Stage; using Akka.Streams.Supervision; @@ -20,7 +21,7 @@ public static class DefaultAttributes /// /// TBD /// - public static readonly Attributes IODispatcher = ActorAttributes.CreateDispatcher("akka.stream.default-blocking-io-dispatcher"); + public static readonly Attributes IODispatcher = ActorAttributes.CreateDispatcher(ActorAttributes.IODispatcher.Name); /// /// TBD diff --git a/src/core/Akka.Streams/reference.conf b/src/core/Akka.Streams/reference.conf index 75332c14097..7c69c485a3d 100644 --- a/src/core/Akka.Streams/reference.conf +++ b/src/core/Akka.Streams/reference.conf @@ -22,6 +22,8 @@ akka { # Note: If you change this value also change the fallback value in ActorMaterializerSettings dispatcher = "" + blocking-io-dispatcher = "akka.actor.default-blocking-io-dispatcher" + # Cleanup leaked publishers and subscribers when they are not used within a given # deadline subscription-timeout { @@ -118,22 +120,12 @@ akka { } } - # Fully qualified config path which holds the dispatcher configuration - # to be used by FlowMaterialiser when creating Actors for IO operations, - # such as FileSource, FileSink and others. - blocking-io-dispatcher = "akka.stream.default-blocking-io-dispatcher" - - default-blocking-io-dispatcher { - type = "Dispatcher" - executor = "thread-pool-executor" - throughput = 1 + # Deprecated, left here to not break Akka HTTP which refers to it + blocking-io-dispatcher = "akka.actor.default-blocking-io-dispatcher" - thread-pool-executor { - core-pool-size-min = 2 - core-pool-size-factor = 2.0 - core-pool-size-max = 16 - } - } + # Deprecated, will not be used unless user code refer to it, use 'akka.stream.materializer.blocking-io-dispatcher' + # instead, or if from code, prefer the 'ActorAttributes.IODispatcher' attribute + default-blocking-io-dispatcher = "akka.actor.default-blocking-io-dispatcher" } # configure overrides to ssl-configuration here (to be used by akka-streams, and akka-http – i.e. when serving https connections) diff --git a/src/core/Akka.Tests/Actor/ActorSystemDispatcherSpec.cs b/src/core/Akka.Tests/Actor/ActorSystemDispatcherSpec.cs new file mode 100644 index 00000000000..a4fc4de9d3e --- /dev/null +++ b/src/core/Akka.Tests/Actor/ActorSystemDispatcherSpec.cs @@ -0,0 +1,106 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Actor.Internal; +using Akka.Actor.Setup; +using Akka.Configuration; +using Akka.Dispatch; +using Akka.TestKit; +using Akka.Util; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Tests.Actor +{ + public class ActorSystemDispatcherSpec : AkkaSpec + { + private static Config Config => ConfigurationFactory.ParseString(@" + dispatcher-loop-1 = dispatcher-loop-2 + dispatcher-loop-2 = dispatcher-loop-1 +"); + + public ActorSystemDispatcherSpec(ITestOutputHelper output):base(output, Config) + { } + + [Fact] + public void The_ActorSystem_must_not_use_passed_in_SynchronizationContext_if_executor_is_configured_in() + { + var config = + ConfigurationFactory.ParseString("akka.actor.default-dispatcher.executor = fork-join-executor") + .WithFallback(Sys.Settings.Config); + var system2 = ActorSystem.Create("ActorSystemDispatchersSpec-ec-configured", config); + + try + { + var actor = system2.ActorOf(); + var probe = CreateTestProbe(system2); + + actor.Tell("ping", probe); + + probe.ExpectMsg("ping", TimeSpan.FromSeconds(1)); + } + finally + { + Shutdown(system2); + } + } + + [Fact] + public void The_ActorSystem_must_provide_a_single_place_to_override_the_internal_dispatcher() + { + var config = + ConfigurationFactory.ParseString("akka.actor.internal-dispatcher = akka.actor.default-dispatcher") + .WithFallback(Sys.Settings.Config); + var sys = ActorSystem.Create("ActorSystemDispatchersSpec-override-internal-disp", config); + try + { + // that the user guardian runs on the overriden dispatcher instead of internal + // isn't really a guarantee any internal actor has been made running on the right one + // but it's better than no test coverage at all + UserGuardianDispatcher(sys).Should().Be("akka.actor.default-dispatcher"); + } + finally + { + Shutdown(sys); + } + } + + [Fact] + public void The_ActorSystem_must_provide_a_good_error_on_a_dispatcher_alias_loop_in_config() + { + Sys.Dispatchers.Invoking(d => d.Lookup("dispatcher-loop-1")) + .ShouldThrow() + .And.Message + .StartsWith("Could not find a concrete dispatcher config after following").ShouldBeTrue(); + } + + private string UserGuardianDispatcher(ActorSystem system) + { + var impl = (ActorSystemImpl)system; + return ((ActorCell)((ActorRefWithCell)impl.Guardian).Underlying).Dispatcher.Id; + } + + private class PingPongActor : UntypedActor + { + protected override void OnReceive(object message) + { + if((string)message == "ping") + Sender.Tell("pong"); + } + } + + private class EchoActor : UntypedActor + { + protected override void OnReceive(object message) + { + Sender.Tell(message); + } + } + } +} + diff --git a/src/core/Akka/Actor/ActorRefProvider.cs b/src/core/Akka/Actor/ActorRefProvider.cs index 8b877d91c84..4e51f2991ee 100644 --- a/src/core/Akka/Actor/ActorRefProvider.cs +++ b/src/core/Akka/Actor/ActorRefProvider.cs @@ -256,7 +256,7 @@ public LocalActorRefProvider(string systemName, Settings settings, EventStream e /// public EventStream EventStream { get { return _eventStream; } } - private MessageDispatcher DefaultDispatcher { get { return _system.Dispatchers.DefaultGlobalDispatcher; } } + private MessageDispatcher InternalDispatcher => _system.Dispatchers.InternalDispatcher; private SupervisorStrategy UserGuardianSupervisorStrategy { get { return _userGuardianStrategyConfigurator.Create(); } } @@ -296,7 +296,7 @@ private RootGuardianActorRef CreateRootGuardian(ActorSystemImpl system) return Directive.Stop; }); var props = Props.Create(rootGuardianStrategy); - var rootGuardian = new RootGuardianActorRef(system, props, DefaultDispatcher, _defaultMailbox, supervisor, _rootPath, _deadLetters, _extraNames); + var rootGuardian = new RootGuardianActorRef(system, props, InternalDispatcher, _defaultMailbox, supervisor, _rootPath, _deadLetters, _extraNames); return rootGuardian; } @@ -314,9 +314,30 @@ private LocalActorRef CreateUserGuardian(LocalActorRef rootGuardian, string name { var cell = rootGuardian.Cell; cell.ReserveChild(name); - var props = Props.Create(UserGuardianSupervisorStrategy); + // make user provided guardians not run on internal dispatcher + MessageDispatcher dispatcher; + if (_system.GuardianProps.IsEmpty) + { + dispatcher = InternalDispatcher; + } + else + { + var props = _system.GuardianProps.Value; + var dispatcherId = + props.Deploy.Dispatcher == Deploy.DispatcherSameAsParent + ? Dispatchers.DefaultDispatcherId + : props.Dispatcher; + dispatcher = _system.Dispatchers.Lookup(dispatcherId); + } + + var userGuardian = new LocalActorRef( + _system, + _system.GuardianProps.GetOrElse(Props.Create(UserGuardianSupervisorStrategy)), + dispatcher, + _defaultMailbox, + rootGuardian, + RootPath / name); - var userGuardian = new LocalActorRef(_system, props, DefaultDispatcher, _defaultMailbox, rootGuardian, RootPath / name); cell.InitChild(userGuardian); userGuardian.Start(); return userGuardian; @@ -328,7 +349,7 @@ private LocalActorRef CreateSystemGuardian(LocalActorRef rootGuardian, string na cell.ReserveChild(name); var props = Props.Create(() => new SystemGuardianActor(userGuardian), _systemGuardianStrategy); - var systemGuardian = new LocalActorRef(_system, props, DefaultDispatcher, _defaultMailbox, rootGuardian, RootPath / name); + var systemGuardian = new LocalActorRef(_system, props, InternalDispatcher, _defaultMailbox, rootGuardian, RootPath / name); cell.InitChild(systemGuardian); systemGuardian.Start(); return systemGuardian; diff --git a/src/core/Akka/Actor/ActorSystem.cs b/src/core/Akka/Actor/ActorSystem.cs index 8d8658ad3d3..a4be985f742 100644 --- a/src/core/Akka/Actor/ActorSystem.cs +++ b/src/core/Akka/Actor/ActorSystem.cs @@ -6,6 +6,7 @@ //----------------------------------------------------------------------- using System; +using System.Threading; using System.Threading.Tasks; using Akka.Actor.Internal; using Akka.Actor.Setup; @@ -96,11 +97,16 @@ internal static ProviderSelection GetProvider(string providerClass) /// public sealed class BootstrapSetup : Setup.Setup { - internal BootstrapSetup() : this(Option.None, Option.None) + internal BootstrapSetup() + : this( + Option.None, + Option.None) { } - internal BootstrapSetup(Option config, Option actorRefProvider) + internal BootstrapSetup( + Option config, + Option actorRefProvider) { Config = config; ActorRefProvider = actorRefProvider; @@ -264,7 +270,7 @@ public static ActorSystem Create(string name) private static ActorSystem CreateAndStartSystem(string name, Config withFallback, ActorSystemSetup setup) { - var system = new ActorSystemImpl(name, withFallback, setup); + var system = new ActorSystemImpl(name, withFallback, setup, Option.None); system.Start(); return system; } diff --git a/src/core/Akka/Actor/Deploy.cs b/src/core/Akka/Actor/Deploy.cs index 9bba09db75f..becdf5e35df 100644 --- a/src/core/Akka/Actor/Deploy.cs +++ b/src/core/Akka/Actor/Deploy.cs @@ -29,6 +29,9 @@ public class Deploy : IEquatable, ISurrogated /// This deployment does not have a mailbox associated with it. /// public static readonly string NoMailboxGiven = string.Empty; + + internal const string DispatcherSameAsParent = ".."; + /// /// This deployment has an unspecified scope associated with it. /// diff --git a/src/core/Akka/Actor/Internal/ActorSystemImpl.cs b/src/core/Akka/Actor/Internal/ActorSystemImpl.cs index ca91ab05ce4..c8c2418086e 100644 --- a/src/core/Akka/Actor/Internal/ActorSystemImpl.cs +++ b/src/core/Akka/Actor/Internal/ActorSystemImpl.cs @@ -54,7 +54,11 @@ public class ActorSystemImpl : ExtendedActorSystem, ISupportSerializationConfigR /// /// The name given to the actor system. public ActorSystemImpl(string name) - : this(name, ConfigurationFactory.Default(), ActorSystemSetup.Empty) + : this( + name, + ConfigurationFactory.Default(), + ActorSystemSetup.Empty, + Option.None) { } @@ -69,7 +73,11 @@ public ActorSystemImpl(string name) /// Note that the name must contain only word characters (i.e. [a-zA-Z0-9] plus non-leading '-'). /// /// This exception is thrown if the given is undefined. - public ActorSystemImpl(string name, Config config, ActorSystemSetup setup) + public ActorSystemImpl( + string name, + Config config, + ActorSystemSetup setup, + Option? guardianProps = null) { if(!Regex.Match(name, "^[a-zA-Z0-9][a-zA-Z0-9-]*$").Success) throw new ArgumentException( @@ -79,7 +87,10 @@ public ActorSystemImpl(string name, Config config, ActorSystemSetup setup) if(config is null) throw new ArgumentNullException(nameof(config), $"Cannot create {typeof(ActorSystemImpl)}: Configuration must not be null."); - _name = name; + _name = name; + + GuardianProps = guardianProps ?? Option.None; + ConfigureSettings(config, setup); ConfigureEventStream(); ConfigureLoggers(); @@ -134,6 +145,8 @@ public ActorSystemImpl(string name, Config config, ActorSystemSetup setup) /// public override IInternalActorRef SystemGuardian { get { return _provider.SystemGuardian; } } + public Option GuardianProps { get; } + /// /// Creates a new system actor that lives under the "/system" guardian. /// @@ -246,7 +259,9 @@ private void WarnIfJsonIsDefaultSerializer() /// public override IActorRef ActorOf(Props props, string name = null) { - return _provider.Guardian.Cell.AttachChild(props, false, name); + if(GuardianProps.IsEmpty) + return _provider.Guardian.Cell.AttachChild(props, false, name); + throw new InvalidOperationException($"cannot create top-level actor { (string.IsNullOrEmpty(name) ? "" : $"[{name} ]")}from the outside on ActorSystem with custom user guardian"); } /// @@ -396,6 +411,7 @@ public override bool HasExtension() private void ConfigureSettings(Config config, ActorSystemSetup setup) { + // TODO: on this line, in scala, the config is validated with `Dispatchers.InternalDispatcherId` path removed. _settings = new Settings(this, config, setup); } @@ -448,7 +464,14 @@ private void ConfigureLoggers() private void ConfigureDispatchers() { - _dispatchers = new Dispatchers(this, new DefaultDispatcherPrerequisites(EventStream, Scheduler, Settings, Mailboxes)); + _dispatchers = new Dispatchers( + this, + new DefaultDispatcherPrerequisites( + EventStream, + Scheduler, + Settings, + Mailboxes), + _log); } private void ConfigureActorProducerPipeline() diff --git a/src/core/Akka/Configuration/Pigeon.conf b/src/core/Akka/Configuration/Pigeon.conf index ec802355412..6237292e601 100644 --- a/src/core/Akka/Configuration/Pigeon.conf +++ b/src/core/Akka/Configuration/Pigeon.conf @@ -305,7 +305,6 @@ akka { # This will be used if you have set "executor = "default-executor"". # Uses the default .NET threadpool default-executor { - } # Same as default executor @@ -315,11 +314,20 @@ akka { # This will be used if you have set "executor = "fork-join-executor"" # Underlying thread pool implementation is scala.concurrent.forkjoin.ForkJoinPool fork-join-executor { - dedicated-thread-pool{ #settings for Helios.DedicatedThreadPool - thread-count = 3 #number of threads - #deadlock-timeout = 3s #optional timeout for deadlock detection - threadtype = background #values can be "background" or "foreground" - } + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 8 + + # The parallelism factor is used to determine thread pool size using the + # following formula: ceil(available processors * factor). Resulting size + # is then bounded by the parallelism-min and parallelism-max values. + parallelism-factor = 1.0 + + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 64 + + # Setting to "FIFO" to use queue like peeking mode which "poll" or "LIFO" to use stack + # like peeking mode which "pop". + task-peeking-mode = "FIFO" } # For running in current synchronization contexts @@ -345,7 +353,35 @@ akka { # be a subtype of this type. The empty string signifies no requirement. mailbox-requirement = "" } - + + # Default separate internal dispatcher to run Akka internal tasks and actors on + # protecting them against starvation because of accidental blocking in user actors (which run on the + # default dispatcher) + internal-dispatcher { + type = "Dispatcher" + executor = "fork-join-executor" + throughput = 5 + + fork-join-executor { + parallelism-min = 4 + parallelism-factor = 1.0 + parallelism-max = 64 + } + } + + default-blocking-io-dispatcher { + type = "Dispatcher" + executor = "thread-pool-executor" + throughput = 1 + + # Akka.NET does not have a fine grained control over thread pool executor + # thread-pool-executor { + # core-pool-size-min = 2 + # core-pool-size-factor = 2.0 + # core-pool-size-max = 16 + # } + } + default-mailbox { # FQCN of the MailboxType. The Class of the FQCN must have a public # constructor with @@ -626,15 +662,15 @@ akka { # Fully qualified config path which holds the dispatcher configuration # for the read/write worker actors - worker-dispatcher = "akka.actor.default-dispatcher" + worker-dispatcher = "akka.actor.internal-dispatcher" # Fully qualified config path which holds the dispatcher configuration # for the selector management actors - management-dispatcher = "akka.actor.default-dispatcher" + management-dispatcher = "akka.actor.internal-dispatcher" # Fully qualified config path which holds the dispatcher configuration # on which file IO tasks are scheduled - file-io-dispatcher = "akka.actor.default-dispatcher" + file-io-dispatcher = "akka.actor.default-blocking-io-dispatcher" # The maximum number of bytes (or "unlimited") to transfer in one batch # when using `WriteFile` command which uses `FileChannel.transferTo` to @@ -753,11 +789,11 @@ akka { # Fully qualified config path which holds the dispatcher configuration # for the read/write worker actors - worker-dispatcher = "akka.actor.default-dispatcher" + worker-dispatcher = "akka.actor.internal-dispatcher" # Fully qualified config path which holds the dispatcher configuration # for the selector management actors - management-dispatcher = "akka.actor.default-dispatcher" + management-dispatcher = "akka.actor.internal-dispatcher" } udp-connected { @@ -848,18 +884,18 @@ akka { # Fully qualified config path which holds the dispatcher configuration # for the read/write worker actors - worker-dispatcher = "akka.actor.default-dispatcher" + worker-dispatcher = "akka.actor.internal-dispatcher" # Fully qualified config path which holds the dispatcher configuration # for the selector management actors - management-dispatcher = "akka.actor.default-dispatcher" + management-dispatcher = "akka.actor.internal-dispatcher" } dns { # Fully qualified config path which holds the dispatcher configuration # for the manager and resolver router actors. # For actual router configuration see akka.actor.deployment./IO-DNS/* - dispatcher = "akka.actor.default-dispatcher" + dispatcher = "akka.actor.internal-dispatcher" # Name of the subconfig at path akka.io.dns, see inet-address below resolver = "inet-address" diff --git a/src/core/Akka/Dispatch/AbstractDispatcher.cs b/src/core/Akka/Dispatch/AbstractDispatcher.cs index 0b52c829763..fe0214fead4 100644 --- a/src/core/Akka/Dispatch/AbstractDispatcher.cs +++ b/src/core/Akka/Dispatch/AbstractDispatcher.cs @@ -58,7 +58,11 @@ public sealed class DefaultDispatcherPrerequisites : IDispatcherPrerequisites /// TBD /// TBD /// TBD - public DefaultDispatcherPrerequisites(EventStream eventStream, IScheduler scheduler, Settings settings, Mailboxes mailboxes) + public DefaultDispatcherPrerequisites( + EventStream eventStream, + IScheduler scheduler, + Settings settings, + Mailboxes mailboxes) { Mailboxes = mailboxes; Settings = settings; diff --git a/src/core/Akka/Dispatch/Dispatchers.cs b/src/core/Akka/Dispatch/Dispatchers.cs index 1b630aa3031..ee924e08ec2 100644 --- a/src/core/Akka/Dispatch/Dispatchers.cs +++ b/src/core/Akka/Dispatch/Dispatchers.cs @@ -11,6 +11,7 @@ using System.Threading.Tasks; using Akka.Actor; using Akka.Configuration; +using Akka.Event; using Helios.Concurrency; using ConfigurationFactory = Akka.Configuration.ConfigurationFactory; @@ -198,6 +199,18 @@ public override void Shutdown() /// /// The registry of all instances available to this . + /// + /// Dispatchers are to be defined in configuration to allow for tuning + /// for different environments. Use the method to create + /// a dispatcher as specified in configuration. + /// + /// A dispatcher config can also be an alias, in that case it is a config string value pointing + /// to the actual dispatcher config. + /// + /// Look in `akka.actor.default-dispatcher` section of the reference.conf + /// for documentation of dispatcher options. + /// + /// Not for user instantiation or extension /// public sealed class Dispatchers { @@ -206,6 +219,20 @@ public sealed class Dispatchers /// public static readonly string DefaultDispatcherId = "akka.actor.default-dispatcher"; + /// + /// The id of a default dispatcher to use for operations known to be blocking. Note that + /// for optimal performance you will want to isolate different blocking resources + /// on different thread pools. + /// + public static readonly string DefaultBlockingDispatcherId = "akka.actor.default-blocking-io-dispatcher"; + + /// + /// INTERNAL API + /// + internal static readonly string InternalDispatcherId = "akka.actor.internal-dispatcher"; + + private const int MaxDispatcherAliasDepth = 20; + /// /// The identifier for synchronized dispatchers. /// @@ -214,6 +241,7 @@ public sealed class Dispatchers private readonly ActorSystem _system; private Config _cachingConfig; private readonly MessageDispatcher _defaultGlobalDispatcher; + private readonly ILoggingAdapter _logger; /// /// The list of all configurators used to create instances. @@ -225,12 +253,15 @@ public sealed class Dispatchers /// Initializes a new instance of the class. /// The system. /// The prerequisites required for some instances. - public Dispatchers(ActorSystem system, IDispatcherPrerequisites prerequisites) + public Dispatchers(ActorSystem system, IDispatcherPrerequisites prerequisites, ILoggingAdapter logger) { _system = system; Prerequisites = prerequisites; _cachingConfig = new CachingConfig(prerequisites.Settings.Config); _defaultGlobalDispatcher = Lookup(DefaultDispatcherId); + _logger = logger; + + InternalDispatcher = Lookup(InternalDispatcherId); } /// Gets the one and only default dispatcher. @@ -239,6 +270,8 @@ public MessageDispatcher DefaultGlobalDispatcher get { return _defaultGlobalDispatcher; } } + internal MessageDispatcher InternalDispatcher { get; } + /// /// The for the default dispatcher. /// @@ -270,8 +303,13 @@ internal void ReloadPrerequisites(IDispatcherPrerequisites prerequisites) public IDispatcherPrerequisites Prerequisites { get; private set; } /// - /// Returns a dispatcher as specified in configuration. Please note that this method _MAY_ - /// create and return a new dispatcher on _EVERY_ call. + /// Returns a dispatcher as specified in configuration. Please note that this + /// method _may_ create and return a NEW dispatcher, _every_ call (depending on the `MessageDispatcherConfigurator` + /// dispatcher config the id points to). + /// + /// A dispatcher id can also be an alias. In the case it is a string value in the config it is treated as the id + /// of the actual dispatcher config to use. If several ids leading to the same actual dispatcher config is used only one + /// instance is created. This means that for dispatchers you expect to be shared they will be. /// /// TBD /// @@ -297,21 +335,40 @@ public bool HasDispatcher(string id) private MessageDispatcherConfigurator LookupConfigurator(string id) { - if (!_dispatcherConfigurators.TryGetValue(id, out var configurator)) + var depth = 0; + while(depth < MaxDispatcherAliasDepth) { + if (_dispatcherConfigurators.TryGetValue(id, out var configurator)) + return configurator; + // It doesn't matter if we create a dispatcher configurator that isn't used due to concurrent lookup. // That shouldn't happen often and in case it does the actual ExecutorService isn't // created until used, i.e. cheap. - MessageDispatcherConfigurator newConfigurator; if (_cachingConfig.HasPath(id)) - newConfigurator = ConfiguratorFrom(Config(id)); - else - throw new ConfigurationException($"Dispatcher {id} not configured."); + { + var valueAtPath = _cachingConfig.GetValue(id); + if (valueAtPath.IsString()) + { + // a dispatcher key can be an alias of another dispatcher, if it is a string + // we treat that string value as the id of a dispatcher to lookup, it will be stored + // both under the actual id and the alias id in the 'dispatcherConfigurators' cache + var actualId = valueAtPath.GetString(); + _logger.Debug($"Dispatcher id [{id}] is an alias, actual dispatcher will be [{actualId}]"); + id = actualId; + depth++; + continue; + } - return _dispatcherConfigurators.TryAdd(id, newConfigurator) ? newConfigurator : _dispatcherConfigurators[id]; + if (valueAtPath.IsObject()) + { + var newConfigurator = ConfiguratorFrom(Config(id)); + return _dispatcherConfigurators.TryAdd(id, newConfigurator) ? newConfigurator : _dispatcherConfigurators[id]; + } + throw new ConfigurationException($"Expected either a dispatcher config or an alias at [{id}] but found [{valueAtPath}]"); + } + throw new ConfigurationException($"Dispatcher {id} not configured."); } - - return configurator; + throw new ConfigurationException($"Could not find a concrete dispatcher config after following {MaxDispatcherAliasDepth} deep. Is there a circular reference in your config? Last followed Id was [{id}]"); } /// diff --git a/src/core/Akka/Helios.Concurrency.DedicatedThreadPool.cs b/src/core/Akka/Helios.Concurrency.DedicatedThreadPool.cs index 366ca54ecc4..5abfea33545 100644 --- a/src/core/Akka/Helios.Concurrency.DedicatedThreadPool.cs +++ b/src/core/Akka/Helios.Concurrency.DedicatedThreadPool.cs @@ -43,7 +43,11 @@ public DedicatedThreadPoolSettings(int numThreads, string name = null, TimeSpan? : this(numThreads, DefaultThreadType, name, deadlockTimeout) { } - public DedicatedThreadPoolSettings(int numThreads, ThreadType threadType, string name = null, TimeSpan? deadlockTimeout = null) + public DedicatedThreadPoolSettings( + int numThreads, + ThreadType threadType, + string name = null, + TimeSpan? deadlockTimeout = null) { Name = name ?? ("DedicatedThreadPool-" + Guid.NewGuid()); ThreadType = threadType; diff --git a/src/core/Akka/Properties/AssemblyInfo.cs b/src/core/Akka/Properties/AssemblyInfo.cs index 9ae4b1cc883..1cad13e7549 100644 --- a/src/core/Akka/Properties/AssemblyInfo.cs +++ b/src/core/Akka/Properties/AssemblyInfo.cs @@ -31,6 +31,8 @@ [assembly: InternalsVisibleTo("Akka.Remote.Tests.MultiNode")] [assembly: InternalsVisibleTo("Akka.Remote.TestKit.Tests")] [assembly: InternalsVisibleTo("Akka.Cluster")] +[assembly: InternalsVisibleTo("Akka.Cluster.Sharding")] +[assembly: InternalsVisibleTo("Akka.Cluster.Tools")] [assembly: InternalsVisibleTo("Akka.Cluster.Tests")] [assembly: InternalsVisibleTo("Akka.MultiNodeTestRunner.Shared.Tests")] [assembly: InternalsVisibleTo("Akka.Cluster.Tests.MultiNode")] diff --git a/src/core/Akka/Util/Option.cs b/src/core/Akka/Util/Option.cs index bf16d1008fb..b412f376953 100644 --- a/src/core/Akka/Util/Option.cs +++ b/src/core/Akka/Util/Option.cs @@ -38,6 +38,8 @@ public Option(T value) /// public bool HasValue { get; } + public bool IsEmpty => !HasValue; + /// /// TBD ///