Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond committed Dec 5, 2023
1 parent 8f923fe commit 2237758
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 32 deletions.
11 changes: 5 additions & 6 deletions src/Orleans.Core.Abstractions/Manifest/ClusterManifest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ namespace Orleans.Metadata
[Serializable, GenerateSerializer, Immutable]
public sealed class ClusterManifest
{
[NonSerialized]
private ImmutableArray<GrainManifest>? _allGrainManifests;

/// <summary>
/// Initializes a new instance of the <see cref="ClusterManifest"/> class.
/// </summary>
Expand All @@ -26,8 +23,9 @@ public ClusterManifest(
MajorMinorVersion version,
ImmutableDictionary<SiloAddress, GrainManifest> silos)
{
this.Version = version;
this.Silos = silos;
Version = version;
Silos = silos;
AllGrainManifests = silos.Values.ToImmutableArray();
}

/// <summary>
Expand All @@ -45,6 +43,7 @@ public ClusterManifest(
/// <summary>
/// Gets all grain manifests.
/// </summary>
public ImmutableArray<GrainManifest> AllGrainManifests => _allGrainManifests ??= Silos.Values.ToImmutableArray();
[Id(2)]
public ImmutableArray<GrainManifest> AllGrainManifests { get; }
}
}
33 changes: 24 additions & 9 deletions src/Orleans.Core/Manifest/ClientClusterManifestProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -108,7 +109,7 @@ private async Task RunAsync()

try
{
var refreshTask = provider.GetClusterManifestIfNewer(gatewayVersion).AsTask();
var refreshTask = GetClusterManifestUpdate(provider, gatewayVersion);
var task = await Task.WhenAny(cancellationTask, refreshTask).ConfigureAwait(false);

if (ReferenceEquals(task, cancellationTask))
Expand All @@ -117,17 +118,15 @@ private async Task RunAsync()
}

var updateResult = await refreshTask;
var gatewayManifest = updateResult.Manifest;
if (gatewayManifest is null)
updateIncludesAllActiveServers = updateResult.IncludesAllActiveServers;
if (updateResult is null)
{
// There was no newer cluster manifest, so wait for the next refresh interval and try again.
await Task.WhenAny(cancellationTask, Task.Delay(_typeManagementOptions.TypeMapRefreshInterval));
continue;
}

// Do not receive further updates from this gateway until the manifest version has increased.
gatewayVersion = gatewayManifest.Version;
updateIncludesAllActiveServers = updateResult.IncludesAllActiveServers;
gatewayVersion = updateResult.Version;

// If the manifest does not contain all active servers, merge with the existing manifest until it does.
// This prevents reversed progress at the expense of including potentially defunct silos.
Expand All @@ -137,7 +136,7 @@ private async Task RunAsync()
// Merge manifests until the manifest contains all active servers.
var mergedSilos = _current.Silos.ToBuilder();
mergedSilos.Add(_localClientDetails.ClientAddress, LocalGrainManifest);
foreach (var kvp in gatewayManifest.Silos)
foreach (var kvp in updateResult.SiloManifests)
{
mergedSilos[kvp.Key] = kvp.Value;
}
Expand All @@ -146,10 +145,10 @@ private async Task RunAsync()
}
else
{
siloManifests = gatewayManifest.Silos.Add(_localClientDetails.ClientAddress, LocalGrainManifest);
siloManifests = updateResult.SiloManifests.Add(_localClientDetails.ClientAddress, LocalGrainManifest);
}

var updatedManifest = new ClusterManifest(new MajorMinorVersion(gatewayVersion.Major, ++minorVersion), gatewayManifest.Silos);
var updatedManifest = new ClusterManifest(new MajorMinorVersion(gatewayVersion.Major, ++minorVersion), siloManifests);
if (!_updates.TryPublish(updatedManifest))
{
await Task.Delay(StandardExtensions.Min(_typeManagementOptions.TypeMapRefreshInterval, TimeSpan.FromMilliseconds(500)));
Expand Down Expand Up @@ -186,6 +185,22 @@ private async Task RunAsync()
}
}

private async Task<ClusterManifestUpdate> GetClusterManifestUpdate(IClusterManifestSystemTarget provider, MajorMinorVersion previousVersion)
{
try
{
return await provider.GetClusterManifestUpdate(previousVersion);
}
catch (Exception exception)
{
_logger.LogWarning(exception, "Failed to fetch cluster manifest update from {Provider}.", provider);
}

var manifest = await provider.GetClusterManifest();
var result = new ClusterManifestUpdate(manifest.Version, manifest.Silos, includesAllActiveServers: true);
return result;
}

/// <inheritdoc />
public ValueTask DisposeAsync()
{
Expand Down
33 changes: 27 additions & 6 deletions src/Orleans.Core/Manifest/IClusterManifestSystemTarget.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading.Tasks;
using Orleans.Metadata;

Expand All @@ -15,25 +17,44 @@ internal interface IClusterManifestSystemTarget : ISystemTarget
ValueTask<ClusterManifest> GetClusterManifest();

/// <summary>
/// Gets the current cluster manifest if it is newer than the provided <paramref name="version"/>.
/// Gets an updated cluster manifest if newer than the provided <paramref name="previousVersion"/>.
/// </summary>
/// <returns>The current cluster manifest, or <see langword="null"/> if it is not newer than the provided version.</returns>
ValueTask<ClusterManifestUpdate> GetClusterManifestIfNewer(MajorMinorVersion version);
ValueTask<ClusterManifestUpdate> GetClusterManifestUpdate(MajorMinorVersion previousVersion);
}

/// <summary>
/// Represents an update to the cluster manifest.
/// </summary>
[GenerateSerializer, Immutable]
public readonly struct ClusterManifestUpdate
public class ClusterManifestUpdate
{
public ClusterManifestUpdate(ClusterManifest manifest, bool includesAllActiveServers)
public ClusterManifestUpdate(
MajorMinorVersion manifestVersion,
ImmutableDictionary<SiloAddress, GrainManifest> siloManifests,
bool includesAllActiveServers)
{
Manifest = manifest;
Version = manifestVersion;
SiloManifests = siloManifests;
IncludesAllActiveServers = includesAllActiveServers;
}

/// <summary>
/// Gets the version of this instance.
/// </summary>
[Id(0)]
public ClusterManifest Manifest { get; }
public MajorMinorVersion Version { get; }

/// <summary>
/// Gets the manifests for each silo in the cluster.
/// </summary>
[Id(1)]
public ImmutableDictionary<SiloAddress, GrainManifest> SiloManifests { get; }

/// <summary>
/// Gets a value indicating whether this update includes all active servers.
/// </summary>
[Id(2)]
public bool IncludesAllActiveServers { get; }
}
}
25 changes: 14 additions & 11 deletions src/Orleans.Runtime/GrainTypeManager/ClusterManifestSystemTarget.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ internal class ClusterManifestSystemTarget : SystemTarget, IClusterManifestSyste
private readonly IClusterMembershipService _clusterMembershipService;
private readonly IClusterManifestProvider _clusterManifestProvider;
private readonly ClusterManifestUpdate _noUpdate = default;
private MembershipVersion _activeServersMembershipVersion;
private bool _containsAllActiveServers;
private MembershipVersion _cachedMembershipVersion;
private ClusterManifestUpdate _cachedUpdate;

public ClusterManifestSystemTarget(
IClusterMembershipService clusterMembershipService,
Expand All @@ -30,37 +30,40 @@ public ClusterManifestSystemTarget(
}

public ValueTask<ClusterManifest> GetClusterManifest() => new(_clusterManifestProvider.Current);
public ValueTask<ClusterManifestUpdate> GetClusterManifestIfNewer(MajorMinorVersion version)
public ValueTask<ClusterManifestUpdate> GetClusterManifestUpdate(MajorMinorVersion version)
{
var result = _clusterManifestProvider.Current;
var manifest = _clusterManifestProvider.Current;

// Only return an updated manifest if it is newer than the provided version.
if (result.Version <= version)
if (manifest.Version <= version)
{
return new (_noUpdate);
}

// Maintain a cache of whether the current manifest contains all active servers so that it
// does not need to be recomputed each time.
var membershipSnapshot = _clusterMembershipService.CurrentSnapshot;
if (membershipSnapshot.Version > _activeServersMembershipVersion)
if (_cachedUpdate is null
|| membershipSnapshot.Version > _cachedMembershipVersion
|| manifest.Version > _cachedUpdate.Version)
{
_containsAllActiveServers = true;
var includesAllActiveServers = true;
foreach (var server in membershipSnapshot.Members)
{
if (server.Value.Status == SiloStatus.Active)
{
if (!result.Silos.ContainsKey(server.Key))
if (!manifest.Silos.ContainsKey(server.Key))
{
_containsAllActiveServers = false;
includesAllActiveServers = false;
}
}
}

_activeServersMembershipVersion = membershipSnapshot.Version;
_cachedUpdate = new ClusterManifestUpdate(manifest.Version, manifest.Silos, includesAllActiveServers);
_cachedMembershipVersion = membershipSnapshot.Version;
}

return new (new ClusterManifestUpdate(result, _containsAllActiveServers));
return new (_cachedUpdate);
}

public ValueTask<GrainManifest> GetSiloManifest() => new(_siloManifest);
Expand Down

0 comments on commit 2237758

Please sign in to comment.