Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port Full cluster shutdown (akka/akka#29838) #7293

Draft
wants to merge 3 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,7 @@ public override string ToString()
private readonly MessageBufferMap<EntityId> _messageBuffers = new();

private IActorRef? _handOffStopper;
private bool _preparingForShutdown = false;
private readonly ICancelable? _passivateIdleTask;
private readonly Lease? _lease;
private readonly TimeSpan _leaseRetryInterval = TimeSpan.FromSeconds(5); // won't be used
Expand Down Expand Up @@ -1030,6 +1031,14 @@ protected override bool Receive(object message)

/// <summary>
/// Subscribes this actor to the cluster's shutdown-preparation member events and then
/// starts lease acquisition via <c>AcquireLeaseIfNeeded</c>.
/// </summary>
protected override void PreStart()
{
    // Only the two shutdown-preparation events are requested; with InitialStateAsEvents
    // the current cluster state is replayed to this actor as events of these types.
    var shutdownEventTypes = new[]
    {
        typeof(ClusterEvent.MemberPreparingForShutdown),
        typeof(ClusterEvent.MemberReadyForShutdown)
    };

    var cluster = Cluster.Get(Context.System);
    cluster.Subscribe(
        Self,
        ClusterEvent.SubscriptionInitialStateMode.InitialStateAsEvents,
        shutdownEventTypes);

    AcquireLeaseIfNeeded();
}

Expand Down Expand Up @@ -1110,6 +1119,10 @@ private bool AwaitingLease(object message)
case LeaseLost ll:
ReceiveLeaseLost(ll);
return true;

case ClusterEvent.IMemberEvent evt:
ReceiveMemberEvent(evt);
return true;
}

if (_verboseDebug)
Expand All @@ -1121,6 +1134,18 @@ private bool AwaitingLease(object message)
return true;
}

/// <summary>
/// Records that the cluster is preparing for shutdown the first time a
/// <c>MemberPreparingForShutdown</c> or <c>MemberReadyForShutdown</c> event is seen.
/// Any other member event, and any repeat of these events, is ignored.
/// </summary>
/// <param name="evt">The cluster member event delivered to this actor.</param>
private void ReceiveMemberEvent(ClusterEvent.IMemberEvent evt)
{
    var isShutdownEvent =
        evt is ClusterEvent.MemberPreparingForShutdown or ClusterEvent.MemberReadyForShutdown;

    // Nothing to do for unrelated events or when the flag has already been set.
    if (!isShutdownEvent || _preparingForShutdown)
    {
        return;
    }

    Log.Info("{0}: Preparing for shutdown", _typeName);
    _preparingForShutdown = true;
}

private void TryGetLease(Lease lease)
{
Log.Info("{0}: Acquiring lease {1}", _typeName, lease.Settings);
Expand Down Expand Up @@ -1164,6 +1189,9 @@ private bool AwaitingRememberedEntities(object message)
case RememberEntityTimeout _:
LoadingEntityIdsFailed();
return true;
case ClusterEvent.IMemberEvent me:
ReceiveMemberEvent(me);
return true;
}

if (_verboseDebug)
Expand Down Expand Up @@ -1218,6 +1246,9 @@ private bool Idle(object message)
case Terminated t:
ReceiveTerminated(t.ActorRef);
return true;
case ClusterEvent.IMemberEvent me:
ReceiveMemberEvent(me);
return true;
case EntityTerminated t:
ReceiveEntityTerminated(t.Ref);
return true;
Expand Down Expand Up @@ -1328,6 +1359,9 @@ bool WaitingForRememberEntitiesStore(object message)
case ShardRegion.StartEntity se:
StartEntity(se.EntityId, Sender);
return true;
case ClusterEvent.IMemberEvent me:
ReceiveMemberEvent(me);
return true;
case Terminated t:
ReceiveTerminated(t.ActorRef);
return true;
Expand Down Expand Up @@ -1593,7 +1627,16 @@ private void HandOff(IActorRef replyTo)

// does conversion so only do once
var activeEntities = _entities.ActiveEntities;
if (activeEntities.Count > 0)
if (_preparingForShutdown)
{
    // The cluster is preparing for shutdown: tell every active entity to stop and
    // terminate this shard immediately instead of taking the timed hand-off branch below.
    Log.Info("{0}: HandOff shard [{1}] while preparing for shutdown. Stopping right away.", _typeName, _shardId);
    foreach (var entity in activeEntities)
    {
        entity.Tell(_handOffStopMessage);
    }

    // Reply and stop exactly once, after the loop. Doing this inside the loop would
    // send ShardStopped once per entity and, when there are no active entities at all,
    // would never reply or stop the shard.
    replyTo.Tell(new ShardStopped(_shardId));
    Context.Stop(Self);
} else if (activeEntities.Count > 0 && !_preparingForShutdown)
{
var entityHandOffTimeout =
(_settings.TuningParameters.HandOffTimeout - TimeSpan.FromSeconds(5)).Max(
Expand Down
22 changes: 20 additions & 2 deletions src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1630,7 +1630,7 @@ private void Done(bool ok)
internal ILoggingAdapter Log { get; }
internal bool VerboseDebug { get; }
internal CoordinatorState State { get; set; }

private bool _preparingForShutdown = false;

public ShardCoordinator(
string typeName,
Expand Down Expand Up @@ -1668,7 +1668,15 @@ Action unstashOneGetShardHomeRequest

_rebalanceTask = context.System.Scheduler.ScheduleTellRepeatedlyCancelable(Settings.TuningParameters.RebalanceInterval, Settings.TuningParameters.RebalanceInterval, context.Self, RebalanceTick.Instance, ActorRefs.NoSender);

_cluster.Subscribe(context.Self, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsEvents, new[] { typeof(ClusterEvent.ClusterShuttingDown) });
_cluster.Subscribe(
context.Self,
ClusterEvent.SubscriptionInitialStateMode.InitialStateAsEvents,
new[]
{
typeof(ClusterEvent.ClusterShuttingDown),
typeof(ClusterEvent.MemberReadyForShutdown),
typeof(ClusterEvent.MemberPreparingForShutdown)
});
}


Expand Down Expand Up @@ -1916,6 +1924,16 @@ internal bool Active(object message)
_context.Become(ShuttingDown);
return true;

case ClusterEvent.MemberPreparingForShutdown:
case ClusterEvent.MemberReadyForShutdown:
if (!_preparingForShutdown)
{
_preparingForShutdown = true;
Log.Info("{0}: Shard coordinator detected prepare for full cluster shutdown. No new rebalances will take place.", TypeName);
_rebalanceTask.Cancel();
}
return true;

case GetCurrentRegions _:
var reply = new CurrentRegions(State.Regions.Keys.Select(r =>
string.IsNullOrEmpty(r.Path.Address.Host) ? _cluster.SelfAddress : r.Path.Address).ToImmutableHashSet());
Expand Down
26 changes: 25 additions & 1 deletion src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ internal static Props ProxyProps(
private bool _loggedFullBufferWarning;
private const int RetryCountThreshold = 5;
private bool _gracefulShutdownInProgress;
private bool _preparingForShutdown = false;

private readonly CoordinatedShutdown _coordShutdown = CoordinatedShutdown.Get(Context.System);
private readonly TaskCompletionSource<Done> _gracefulShutdownProgress = new();
Expand Down Expand Up @@ -868,6 +869,15 @@ private void HandleShardRegionCommand(IShardRegionCommand command)
break;

case GracefulShutdown _:
if (_preparingForShutdown)
{
_log.Debug("{0}: Skipping graceful shutdown of region and all its shards as cluster is preparing for shutdown",
_typeName);
_gracefulShutdownProgress.TrySetResult(Done.Instance);
Context.Stop(Self);
return;
}

_log.Debug("{0}: Starting graceful shutdown of region and all its shards", _typeName);

var coordShutdown = CoordinatedShutdown.Get(Context.System);
Expand Down Expand Up @@ -1162,6 +1172,12 @@ private void HandleCoordinatorMessage(ShardCoordinator.ICoordinatorMessage messa
break;
case ShardCoordinator.BeginHandOff bho:
{
if (_preparingForShutdown)
{
_log.Debug("{0}: Ignoring begin handoff as preparing to shutdown", _typeName);
break;
}

var shard = bho.Shard;
_log.Debug("{0}: BeginHandOff shard [{1}]", _typeName, shard);
if (_regionByShard.TryGetValue(shard, out var regionRef))
Expand All @@ -1179,8 +1195,8 @@ private void HandleCoordinatorMessage(ShardCoordinator.ICoordinatorMessage messa
}

Sender.Tell(new ShardCoordinator.BeginHandOffAck(shard));
}
break;
}
case ShardCoordinator.HandOff ho:
{
var shard = ho.Shard;
Expand Down Expand Up @@ -1354,6 +1370,14 @@ private void HandleClusterEvent(ClusterEvent.IClusterDomainEvent e)
Context.Stop(Self);
}

break;
case ClusterEvent.MemberReadyForShutdown:
case ClusterEvent.MemberPreparingForShutdown:
if (!_preparingForShutdown)
{
_preparingForShutdown = true;
_log.Info("{0}: Preparing for shutdown", _typeName);
}
break;
case ClusterEvent.IMemberEvent _:
// these are expected, no need to warn about them
Expand Down
Loading
Loading