Skip to content

Commit

Permalink
Improve grain directory cache consistency (#8696)
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond authored Nov 2, 2023
1 parent b4ef03e commit 2a0996c
Show file tree
Hide file tree
Showing 21 changed files with 358 additions and 57 deletions.
5 changes: 5 additions & 0 deletions src/Orleans.Core.Abstractions/IDs/GrainAddress.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public bool Matches(GrainAddress other)
&& (_activationId.IsDefault || other._activationId.IsDefault || _activationId.Equals(other._activationId));
}

internal static bool MatchesGrainIdAndSilo(GrainAddress address, GrainAddress other)
{
return other is not null && address.GrainId.Equals(other.GrainId) && (address.SiloAddress?.Equals(other.SiloAddress) ?? other.SiloAddress is null);
}

public override int GetHashCode() => HashCode.Combine(SiloAddress, _grainId, _activationId);

public override string ToString() => $"[{nameof(GrainAddress)} GrainId {_grainId}, ActivationId: {_activationId}, SiloAddress: {SiloAddress}]";
Expand Down
109 changes: 109 additions & 0 deletions src/Orleans.Core.Abstractions/IDs/GrainAddressCacheUpdate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
using System;
using System.Diagnostics.CodeAnalysis;

#nullable enable
namespace Orleans.Runtime;

/// <summary>
/// Represents a directive to update an invalid, cached <see cref="GrainAddress"/> to a valid <see cref="GrainAddress"/>.
/// </summary>
[GenerateSerializer, Immutable]
public sealed class GrainAddressCacheUpdate : ISpanFormattable
{
[Id(0)]
private readonly GrainId _grainId;

[Id(1)]
private readonly ActivationId _invalidActivationId;

[Id(2)]
private readonly SiloAddress? _invalidSiloAddress;

[Id(3)]
private readonly MembershipVersion _invalidMembershipVersion = MembershipVersion.MinValue;

[Id(4)]
private readonly ActivationId _validActivationId;

[Id(5)]
private readonly SiloAddress? _validSiloAddress;

[Id(6)]
private readonly MembershipVersion _validMembershipVersion = MembershipVersion.MinValue;

public GrainAddressCacheUpdate(GrainAddress invalidAddress, GrainAddress? validAddress)
{
ArgumentNullException.ThrowIfNull(invalidAddress);

_grainId = invalidAddress.GrainId;
_invalidActivationId = invalidAddress.ActivationId;
_invalidSiloAddress = invalidAddress.SiloAddress;
_invalidMembershipVersion = invalidAddress.MembershipVersion;

if (validAddress is not null)
{
if (invalidAddress.GrainId != validAddress.GrainId)
{
ThrowGrainIdDoesNotMatch(invalidAddress, validAddress);
return;
}

_validActivationId = validAddress.ActivationId;
_validSiloAddress = validAddress.SiloAddress;
_validMembershipVersion = validAddress.MembershipVersion;
}
}

/// <summary>
/// Identifier of the Grain.
/// </summary>
public GrainId GrainId => _grainId;

/// <summary>
/// Identifier of the invalid grain activation.
/// </summary>
public ActivationId InvalidActivationId => _invalidActivationId;

/// <summary>
/// Address of the silo indicated by the invalid grain activation cache entry.
/// </summary>
public SiloAddress? InvalidSiloAddress => _invalidSiloAddress;

/// <summary>
/// Gets the valid grain activation address.
/// </summary>
public GrainAddress? ValidGrainAddress => _validSiloAddress switch
{
null => null,
_ => new()
{
GrainId = _grainId,
ActivationId = _validActivationId,
SiloAddress = _validSiloAddress,
MembershipVersion = _validMembershipVersion,
}
};

/// <summary>
/// Gets the invalid grain activation address.
/// </summary>
public GrainAddress InvalidGrainAddress => new()
{
GrainId = _grainId,
ActivationId = _invalidActivationId,
SiloAddress = _invalidSiloAddress,
MembershipVersion = _invalidMembershipVersion,
};

public override string ToString() => $"[{nameof(GrainAddressCacheUpdate)} GrainId {_grainId}, InvalidActivationId: {_invalidActivationId}, InvalidSiloAddress: {_invalidSiloAddress}, ValidGrainAddress: {ValidGrainAddress}]";

string IFormattable.ToString(string? format, IFormatProvider? formatProvider) => ToString();

bool ISpanFormattable.TryFormat(Span<char> destination, out int charsWritten, ReadOnlySpan<char> format, IFormatProvider? provider)
=> destination.TryWrite($"[{nameof(GrainAddressCacheUpdate)} GrainId {_grainId}, InvalidActivationId: {_invalidActivationId}, InvalidSiloAddress: {_invalidSiloAddress}, ValidGrainAddress: {ValidGrainAddress}]", out charsWritten);

public string ToFullString() => $"[{nameof(GrainAddressCacheUpdate)} GrainId {_grainId}, InvalidActivationId: {_invalidActivationId}, InvalidSiloAddress: {_invalidSiloAddress}, ValidGrainAddress: {ValidGrainAddress}, MembershipVersion: {_invalidMembershipVersion}]";

[DoesNotReturn]
private static void ThrowGrainIdDoesNotMatch(GrainAddress invalidAddress, GrainAddress validAddress) => throw new ArgumentException($"Invalid grain address grain id {invalidAddress.GrainId} does not match valid grain address grain id {validAddress.GrainId}.", nameof(validAddress));
}
8 changes: 4 additions & 4 deletions src/Orleans.Core/GrainDirectory/IGrainLocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ public interface IGrainLocator
ValueTask<GrainAddress?> Lookup(GrainId grainId);

/// <summary>
/// Records a grain placement decision.
/// Updates the cache with a grain placement decision or known activation address.
/// </summary>
/// <param name="grainId">The newly placed grain.</param>
/// <param name="siloAddress">The placement result.</param>
void CachePlacementDecision(GrainId grainId, SiloAddress siloAddress);
/// <param name="grainId">The grain identifier.</param>
/// <param name="siloAddress">The silo which may host the grain.</param>
void UpdateCache(GrainId grainId, SiloAddress siloAddress);

/// <summary>
/// Invalidates any lookup cache entry associated with the provided grain id.
Expand Down
10 changes: 5 additions & 5 deletions src/Orleans.Core/Messaging/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ internal sealed class Message : ISpanFormattable
public ushort _interfaceVersion;
public GrainInterfaceType _interfaceType;

public List<GrainAddress> _cacheInvalidationHeader;
public List<GrainAddressCacheUpdate> _cacheInvalidationHeader;

public PackedHeaders Headers { get => _headers; set => _headers = value; }

Expand Down Expand Up @@ -204,7 +204,7 @@ internal void SetInfiniteTimeToLive()
_timeToExpiry = default;
}

public List<GrainAddress> CacheInvalidationHeader
public List<GrainAddressCacheUpdate> CacheInvalidationHeader
{
get => _cacheInvalidationHeader;
set
Expand Down Expand Up @@ -245,15 +245,15 @@ public bool IsExpirableMessage(bool dropExpiredMessages)
return Direction != Directions.OneWay && !id.IsSystemTarget();
}

internal void AddToCacheInvalidationHeader(GrainAddress address)
internal void AddToCacheInvalidationHeader(GrainAddress invalidAddress, GrainAddress validAddress)
{
var list = new List<GrainAddress>();
var list = new List<GrainAddressCacheUpdate>();
if (CacheInvalidationHeader != null)
{
list.AddRange(CacheInvalidationHeader);
}

list.Add(address);
list.Add(new GrainAddressCacheUpdate(invalidAddress, validAddress));
CacheInvalidationHeader = list;
}

Expand Down
14 changes: 7 additions & 7 deletions src/Orleans.Core/Messaging/MessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ internal sealed class MessageSerializer
private const int MessageSizeHint = 4096;
private readonly Dictionary<Type, ResponseCodec> _rawResponseCodecs = new();
private readonly CodecProvider _codecProvider;
private readonly IFieldCodec<GrainAddress> _activationAddressCodec;
private readonly IFieldCodec<GrainAddressCacheUpdate> _activationAddressCodec;
private readonly CachingSiloAddressCodec _readerSiloAddressCodec = new();
private readonly CachingSiloAddressCodec _writerSiloAddressCodec = new();
private readonly CachingIdSpanCodec _idSpanCodec = new();
Expand All @@ -49,7 +49,7 @@ public MessageSerializer(
_maxBodyLength = options.MaxMessageBodySize;
_codecProvider = sessionPool.CodecProvider;
_requestContextCodec = OrleansGeneratedCodeHelper.GetService<DictionaryCodec<string, object>>(this, sessionPool.CodecProvider);
_activationAddressCodec = OrleansGeneratedCodeHelper.GetService<IFieldCodec<GrainAddress>>(this, sessionPool.CodecProvider);
_activationAddressCodec = OrleansGeneratedCodeHelper.GetService<IFieldCodec<GrainAddressCacheUpdate>>(this, sessionPool.CodecProvider);
_bufferWriter = new(FramingLength, MessageSizeHint, memoryPool.Pool);
}

Expand Down Expand Up @@ -300,12 +300,12 @@ private void Deserialize<TInput>(ref Reader<TInput> reader, Message result)
}
}

private List<GrainAddress> ReadCacheInvalidationHeaders<TInput>(ref Reader<TInput> reader)
internal List<GrainAddressCacheUpdate> ReadCacheInvalidationHeaders<TInput>(ref Reader<TInput> reader)
{
var n = (int)reader.ReadVarUInt32();
if (n > 0)
{
var list = new List<GrainAddress>(n);
var list = new List<GrainAddressCacheUpdate>(n);
for (int i = 0; i < n; i++)
{
list.Add(_activationAddressCodec.ReadValue(ref reader, reader.ReadFieldHeader()));
Expand All @@ -314,15 +314,15 @@ private List<GrainAddress> ReadCacheInvalidationHeaders<TInput>(ref Reader<TInpu
return list;
}

return new List<GrainAddress>();
return new List<GrainAddressCacheUpdate>();
}

private void WriteCacheInvalidationHeaders<TBufferWriter>(ref Writer<TBufferWriter> writer, List<GrainAddress> value) where TBufferWriter : IBufferWriter<byte>
internal void WriteCacheInvalidationHeaders<TBufferWriter>(ref Writer<TBufferWriter> writer, List<GrainAddressCacheUpdate> value) where TBufferWriter : IBufferWriter<byte>
{
writer.WriteVarUInt32((uint)value.Count);
foreach (var entry in value)
{
_activationAddressCodec.WriteField(ref writer, 0, typeof(GrainAddress), entry);
_activationAddressCodec.WriteField(ref writer, 0, typeof(GrainAddressCacheUpdate), entry);
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/Orleans.Runtime/Catalog/ActivationData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -894,8 +894,8 @@ void ProcessPendingRequests()
if (!compatibilityDirector.IsCompatible(message.InterfaceVersion, currentVersion))
{
// Add this activation to cache invalidation headers.
message.CacheInvalidationHeader ??= new();
message.CacheInvalidationHeader.Add(new GrainAddress { GrainId = GrainId, SiloAddress = Address.SiloAddress });
message.CacheInvalidationHeader ??= new List<GrainAddressCacheUpdate>();
message.CacheInvalidationHeader.Add(new GrainAddressCacheUpdate(new GrainAddress { GrainId = GrainId, SiloAddress = Address.SiloAddress }, validAddress: null));

var reason = new DeactivationReason(
DeactivationReasonCode.IncompatibleRequest,
Expand Down Expand Up @@ -1302,7 +1302,7 @@ private void RejectAllQueuedMessages()
"RejectAllQueuedMessages: {Count} messages from invalid activation {Activation}.",
msgs.Count,
this);
_shared.InternalRuntime.LocalGrainDirectory.InvalidateCacheEntry(Address);
_shared.InternalRuntime.GrainLocator.InvalidateCache(Address);
_shared.InternalRuntime.MessageCenter.ProcessRequestsToInvalidActivation(
msgs,
Address,
Expand All @@ -1324,7 +1324,7 @@ private void RerouteAllQueuedMessages()
}

if (_shared.Logger.IsEnabled(LogLevel.Debug)) _shared.Logger.LogDebug((int)ErrorCode.Catalog_RerouteAllQueuedMessages, "Rerouting {NumMessages} messages from invalid grain activation {Grain}", msgs.Count, this);
_shared.InternalRuntime.LocalGrainDirectory.InvalidateCacheEntry(Address);
_shared.InternalRuntime.GrainLocator.InvalidateCache(Address);
_shared.InternalRuntime.MessageCenter.ProcessRequestsToInvalidActivation(msgs, Address, ForwardingAddress, DeactivationReason.Description, DeactivationException);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/Catalog/Catalog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ static IGrainContext UnableToCreateActivation(Catalog self, GrainId grainId)

CatalogInstruments.NonExistentActivations.Add(1);

self.directory.InvalidateCacheEntry(grainId);
self.grainLocator.InvalidateCache(grainId);

// Unregister the target activation so we don't keep getting spurious messages.
// The time delay (one minute, as of this writing) is to handle the unlikely but possible race where
Expand Down
6 changes: 3 additions & 3 deletions src/Orleans.Runtime/Core/InsideRuntimeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,9 @@ public void SniffIncomingMessage(Message message)
{
if (message.CacheInvalidationHeader != null)
{
foreach (GrainAddress address in message.CacheInvalidationHeader)
foreach (var update in message.CacheInvalidationHeader)
{
GrainLocator.InvalidateCache(address);
GrainLocator.UpdateCache(update);
}
}

Expand Down Expand Up @@ -392,7 +392,7 @@ public void ReceiveResponse(Message message)
if (message.CacheInvalidationHeader is null)
{
// Remove from local directory cache. Note that SendingGrain is the original target, since message is the rejection response.
// If CacheInvalidationHeader is present, we already did this. Otherwise, we left this code for backward compatability.
// If CacheInvalidationHeader is present, we already did this. Otherwise, we left this code for backward compatibility.
// It should be retired as we move to use CacheMgmtHeader in all relevant places.
this.GrainLocator.InvalidateCache(message.SendingGrain);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ internal void Refresh(TimeSpan newExpirationTimer)
}
}

private static readonly Func<GrainAddress, GrainDirectoryCacheEntry, bool> ActivationAddressesMatches = (addr, entry) => addr.Matches(entry.Address);
private static readonly Func<GrainAddress, GrainDirectoryCacheEntry, bool> ActivationAddressesMatches = (addr, entry) => GrainAddress.MatchesGrainIdAndSilo(addr, entry.Address);

private readonly LRU<GrainId, GrainDirectoryCacheEntry> cache;
/// controls the time the new entry is considered "fresh" (unit: ms)
Expand Down Expand Up @@ -88,7 +88,7 @@ public bool LookUp(GrainId key, out GrainAddress result, out int version)

// Here we do not check whether the found entry is expired.
// It will be done by the thread managing the cache.
// This is to avoid situation where the entry was just expired, but the manager still have not run and have not refereshed it.
// This is to avoid situation where the entry was just expired, but the manager still have not run and have not refreshed it.
if (!cache.TryGetValue(key, out var tmp))
{
result = default;
Expand Down
4 changes: 2 additions & 2 deletions src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public async Task Unregister(GrainAddress address, UnregistrationCause cause)
await GetGrainDirectory(address.GrainId.Type).Unregister(address);

// There is the potential for a lookup to race with the Unregister and add the bad entry back to the cache.
if (this.cache.LookUp(address.GrainId, out var entry, out _) && entry == address)
if (this.cache.LookUp(address.GrainId, out var entry, out _) && entry.Equals(address))
{
this.cache.Remove(address);
}
Expand Down Expand Up @@ -192,7 +192,7 @@ private bool IsKnownDeadSilo(SiloAddress siloAddress, MembershipVersion membersh

private static void ThrowUnsupportedGrainType(GrainId grainId) => throw new InvalidOperationException($"Unsupported grain type for grain {grainId}");

public void CachePlacementDecision(GrainId grainId, SiloAddress siloAddress) => cache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress }, 0);
public void UpdateCache(GrainId grainId, SiloAddress siloAddress) => cache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress }, 0);
public void InvalidateCache(GrainId grainId) => cache.Remove(grainId);
public void InvalidateCache(GrainAddress address) => cache.Remove(address);
public bool TryLookupInCache(GrainId grainId, out GrainAddress address)
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/GrainDirectory/ClientGrainLocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private GrainAddress SelectAddress(List<GrainAddress> results, GrainId grainId)

private static void ThrowNotClientGrainId(GrainId grainId) => throw new InvalidOperationException($"{grainId} is not a client id");

public void CachePlacementDecision(GrainId grainId, SiloAddress siloAddress) { }
public void UpdateCache(GrainId grainId, SiloAddress siloAddress) { }

public void InvalidateCache(GrainId grainId) { }

Expand Down
4 changes: 2 additions & 2 deletions src/Orleans.Runtime/GrainDirectory/DhtGrainLocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public Task Unregister(GrainAddress address, UnregistrationCause cause)
{
UnregistrationCause.Force => _forceWorker,
UnregistrationCause.NonexistentActivation => _neaWorker,
_ => throw new ArgumentOutOfRangeException($"Unregistration cause {cause} is unknown and is not supported. This is a bug."),
_ => throw new ArgumentOutOfRangeException($"Deregistration cause {cause} is unknown and is not supported. This is a bug."),
};

return worker.Unregister(address);
Expand Down Expand Up @@ -70,7 +70,7 @@ void EnsureInitialized()
public static DhtGrainLocator FromLocalGrainDirectory(LocalGrainDirectory localGrainDirectory)
=> new(localGrainDirectory, localGrainDirectory.RemoteGrainDirectory);

public void CachePlacementDecision(GrainId grainId, SiloAddress siloAddress) => _localGrainDirectory.CachePlacementDecision(grainId, siloAddress);
public void UpdateCache(GrainId grainId, SiloAddress siloAddress) => _localGrainDirectory.AddOrUpdateCacheEntry(grainId, siloAddress);
public void InvalidateCache(GrainId grainId) => _localGrainDirectory.InvalidateCacheEntry(grainId);
public void InvalidateCache(GrainAddress address) => _localGrainDirectory.InvalidateCacheEntry(address);
public bool TryLookupInCache(GrainId grainId, out GrainAddress address) => _localGrainDirectory.TryCachedLookup(grainId, out address);
Expand Down
18 changes: 16 additions & 2 deletions src/Orleans.Runtime/GrainDirectory/GrainLocator.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#nullable enable
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;
using Orleans.GrainDirectory;
Expand Down Expand Up @@ -29,8 +30,21 @@ public GrainLocator(GrainLocatorResolver grainLocatorResolver)

public void InvalidateCache(GrainAddress address) => GetGrainLocator(address.GrainId.Type).InvalidateCache(address);

public void CachePlacementDecision(GrainId grainId, SiloAddress siloAddress) => GetGrainLocator(grainId.Type).CachePlacementDecision(grainId, siloAddress);

private IGrainLocator GetGrainLocator(GrainType grainType) => _grainLocatorResolver.GetGrainLocator(grainType);

public void UpdateCache(GrainId grainId, SiloAddress siloAddress) => GetGrainLocator(grainId.Type).UpdateCache(grainId, siloAddress);

public void UpdateCache(GrainAddressCacheUpdate update)
{
if (update.ValidGrainAddress is { } validAddress)
{
Debug.Assert(validAddress.SiloAddress is not null);
UpdateCache(validAddress.GrainId, validAddress.SiloAddress);
}
else
{
InvalidateCache(update.InvalidGrainAddress);
}
}
}
}
Loading

0 comments on commit 2a0996c

Please sign in to comment.