diff --git a/src/Orleans.Core.Abstractions/IDs/GrainAddress.cs b/src/Orleans.Core.Abstractions/IDs/GrainAddress.cs index 939f12e617..94c1ab9a9d 100644 --- a/src/Orleans.Core.Abstractions/IDs/GrainAddress.cs +++ b/src/Orleans.Core.Abstractions/IDs/GrainAddress.cs @@ -62,6 +62,11 @@ public bool Matches(GrainAddress other) && (_activationId.IsDefault || other._activationId.IsDefault || _activationId.Equals(other._activationId)); } + internal static bool MatchesGrainIdAndSilo(GrainAddress address, GrainAddress other) + { + return other is not null && address.GrainId.Equals(other.GrainId) && (address.SiloAddress?.Equals(other.SiloAddress) ?? other.SiloAddress is null); + } + public override int GetHashCode() => HashCode.Combine(SiloAddress, _grainId, _activationId); public override string ToString() => $"[{nameof(GrainAddress)} GrainId {_grainId}, ActivationId: {_activationId}, SiloAddress: {SiloAddress}]"; diff --git a/src/Orleans.Core.Abstractions/IDs/GrainAddressCacheUpdate.cs b/src/Orleans.Core.Abstractions/IDs/GrainAddressCacheUpdate.cs new file mode 100644 index 0000000000..28b436e87f --- /dev/null +++ b/src/Orleans.Core.Abstractions/IDs/GrainAddressCacheUpdate.cs @@ -0,0 +1,109 @@ +using System; +using System.Diagnostics.CodeAnalysis; + +#nullable enable +namespace Orleans.Runtime; + +/// +/// Represents a directive to update an invalid, cached to a valid . +/// +[GenerateSerializer, Immutable] +public sealed class GrainAddressCacheUpdate : ISpanFormattable +{ + [Id(0)] + private readonly GrainId _grainId; + + [Id(1)] + private readonly ActivationId _invalidActivationId; + + [Id(2)] + private readonly SiloAddress? _invalidSiloAddress; + + [Id(3)] + private readonly MembershipVersion _invalidMembershipVersion = MembershipVersion.MinValue; + + [Id(4)] + private readonly ActivationId _validActivationId; + + [Id(5)] + private readonly SiloAddress? _validSiloAddress; + + [Id(6)] + private readonly MembershipVersion _validMembershipVersion = MembershipVersion.MinValue; + + public GrainAddressCacheUpdate(GrainAddress invalidAddress, GrainAddress? validAddress) + { + ArgumentNullException.ThrowIfNull(invalidAddress); + + _grainId = invalidAddress.GrainId; + _invalidActivationId = invalidAddress.ActivationId; + _invalidSiloAddress = invalidAddress.SiloAddress; + _invalidMembershipVersion = invalidAddress.MembershipVersion; + + if (validAddress is not null) + { + if (invalidAddress.GrainId != validAddress.GrainId) + { + ThrowGrainIdDoesNotMatch(invalidAddress, validAddress); + return; + } + + _validActivationId = validAddress.ActivationId; + _validSiloAddress = validAddress.SiloAddress; + _validMembershipVersion = validAddress.MembershipVersion; + } + } + + /// + /// Identifier of the Grain. + /// + public GrainId GrainId => _grainId; + + /// + /// Identifier of the invalid grain activation. + /// + public ActivationId InvalidActivationId => _invalidActivationId; + + /// + /// Address of the silo indicated by the invalid grain activation cache entry. + /// + public SiloAddress? InvalidSiloAddress => _invalidSiloAddress; + + /// + /// Gets the valid grain activation address. + /// + public GrainAddress? ValidGrainAddress => _validSiloAddress switch + { + null => null, + _ => new() + { + GrainId = _grainId, + ActivationId = _validActivationId, + SiloAddress = _validSiloAddress, + MembershipVersion = _validMembershipVersion, + } + }; + + /// + /// Gets the invalid grain activation address. + /// + public GrainAddress InvalidGrainAddress => new() + { + GrainId = _grainId, + ActivationId = _invalidActivationId, + SiloAddress = _invalidSiloAddress, + MembershipVersion = _invalidMembershipVersion, + }; + + public override string ToString() => $"[{nameof(GrainAddressCacheUpdate)} GrainId {_grainId}, InvalidActivationId: {_invalidActivationId}, InvalidSiloAddress: {_invalidSiloAddress}, ValidGrainAddress: {ValidGrainAddress}]"; + + string IFormattable.ToString(string? format, IFormatProvider? formatProvider) => ToString(); + + bool ISpanFormattable.TryFormat(Span destination, out int charsWritten, ReadOnlySpan format, IFormatProvider? provider) + => destination.TryWrite($"[{nameof(GrainAddressCacheUpdate)} GrainId {_grainId}, InvalidActivationId: {_invalidActivationId}, InvalidSiloAddress: {_invalidSiloAddress}, ValidGrainAddress: {ValidGrainAddress}]", out charsWritten); + + public string ToFullString() => $"[{nameof(GrainAddressCacheUpdate)} GrainId {_grainId}, InvalidActivationId: {_invalidActivationId}, InvalidSiloAddress: {_invalidSiloAddress}, ValidGrainAddress: {ValidGrainAddress}, MembershipVersion: {_invalidMembershipVersion}]"; + + [DoesNotReturn] + private static void ThrowGrainIdDoesNotMatch(GrainAddress invalidAddress, GrainAddress validAddress) => throw new ArgumentException($"Invalid grain address grain id {invalidAddress.GrainId} does not match valid grain address grain id {validAddress.GrainId}.", nameof(validAddress)); +} diff --git a/src/Orleans.Core/GrainDirectory/IGrainLocator.cs b/src/Orleans.Core/GrainDirectory/IGrainLocator.cs index 15f36a748f..473e898170 100644 --- a/src/Orleans.Core/GrainDirectory/IGrainLocator.cs +++ b/src/Orleans.Core/GrainDirectory/IGrainLocator.cs @@ -33,11 +33,11 @@ public interface IGrainLocator ValueTask Lookup(GrainId grainId); /// - /// Records a grain placement decision. + /// Updates the cache with a grain placement decision or known activation address. /// - /// The newly placed grain. - /// The placement result. - void CachePlacementDecision(GrainId grainId, SiloAddress siloAddress); + /// The grain identifier. + /// The silo which may host the grain. + void UpdateCache(GrainId grainId, SiloAddress siloAddress); /// /// Invalidates any lookup cache entry associated with the provided grain id. diff --git a/src/Orleans.Core/Messaging/Message.cs b/src/Orleans.Core/Messaging/Message.cs index 1480b02d18..6c82223d3b 100644 --- a/src/Orleans.Core/Messaging/Message.cs +++ b/src/Orleans.Core/Messaging/Message.cs @@ -32,7 +32,7 @@ internal sealed class Message : ISpanFormattable public ushort _interfaceVersion; public GrainInterfaceType _interfaceType; - public List _cacheInvalidationHeader; + public List _cacheInvalidationHeader; public PackedHeaders Headers { get => _headers; set => _headers = value; } @@ -204,7 +204,7 @@ internal void SetInfiniteTimeToLive() _timeToExpiry = default; } - public List CacheInvalidationHeader + public List CacheInvalidationHeader { get => _cacheInvalidationHeader; set @@ -245,15 +245,15 @@ public bool IsExpirableMessage(bool dropExpiredMessages) return Direction != Directions.OneWay && !id.IsSystemTarget(); } - internal void AddToCacheInvalidationHeader(GrainAddress address) + internal void AddToCacheInvalidationHeader(GrainAddress invalidAddress, GrainAddress validAddress) { - var list = new List(); + var list = new List(); if (CacheInvalidationHeader != null) { list.AddRange(CacheInvalidationHeader); } - list.Add(address); + list.Add(new GrainAddressCacheUpdate(invalidAddress, validAddress)); CacheInvalidationHeader = list; } diff --git a/src/Orleans.Core/Messaging/MessageSerializer.cs b/src/Orleans.Core/Messaging/MessageSerializer.cs index 7131d508ae..2879be7f35 100644 --- a/src/Orleans.Core/Messaging/MessageSerializer.cs +++ b/src/Orleans.Core/Messaging/MessageSerializer.cs @@ -27,7 +27,7 @@ internal sealed class MessageSerializer private const int MessageSizeHint = 4096; private readonly Dictionary _rawResponseCodecs = new(); private readonly CodecProvider _codecProvider; - private readonly IFieldCodec _activationAddressCodec; + private readonly IFieldCodec _activationAddressCodec; private readonly CachingSiloAddressCodec _readerSiloAddressCodec = new(); private readonly CachingSiloAddressCodec _writerSiloAddressCodec = new(); private readonly CachingIdSpanCodec _idSpanCodec = new(); @@ -49,7 +49,7 @@ public MessageSerializer( _maxBodyLength = options.MaxMessageBodySize; _codecProvider = sessionPool.CodecProvider; _requestContextCodec = OrleansGeneratedCodeHelper.GetService>(this, sessionPool.CodecProvider); - _activationAddressCodec = OrleansGeneratedCodeHelper.GetService>(this, sessionPool.CodecProvider); + _activationAddressCodec = OrleansGeneratedCodeHelper.GetService>(this, sessionPool.CodecProvider); _bufferWriter = new(FramingLength, MessageSizeHint, memoryPool.Pool); } @@ -300,12 +300,12 @@ private void Deserialize(ref Reader reader, Message result) } } - private List ReadCacheInvalidationHeaders(ref Reader reader) + internal List ReadCacheInvalidationHeaders(ref Reader reader) { var n = (int)reader.ReadVarUInt32(); if (n > 0) { - var list = new List(n); + var list = new List(n); for (int i = 0; i < n; i++) { list.Add(_activationAddressCodec.ReadValue(ref reader, reader.ReadFieldHeader())); @@ -314,15 +314,15 @@ private List ReadCacheInvalidationHeaders(ref Reader(); + return new List(); } - private void WriteCacheInvalidationHeaders(ref Writer writer, List value) where TBufferWriter : IBufferWriter + internal void WriteCacheInvalidationHeaders(ref Writer writer, List value) where TBufferWriter : IBufferWriter { writer.WriteVarUInt32((uint)value.Count); foreach (var entry in value) { - _activationAddressCodec.WriteField(ref writer, 0, typeof(GrainAddress), entry); + _activationAddressCodec.WriteField(ref writer, 0, typeof(GrainAddressCacheUpdate), entry); } } diff --git a/src/Orleans.Runtime/Catalog/ActivationData.cs b/src/Orleans.Runtime/Catalog/ActivationData.cs index 717f6b29ec..6da9ef23e8 100644 --- a/src/Orleans.Runtime/Catalog/ActivationData.cs +++ b/src/Orleans.Runtime/Catalog/ActivationData.cs @@ -894,8 +894,8 @@ void ProcessPendingRequests() if (!compatibilityDirector.IsCompatible(message.InterfaceVersion, currentVersion)) { // Add this activation to cache invalidation headers. - message.CacheInvalidationHeader ??= new(); - message.CacheInvalidationHeader.Add(new GrainAddress { GrainId = GrainId, SiloAddress = Address.SiloAddress }); + message.CacheInvalidationHeader ??= new List(); + message.CacheInvalidationHeader.Add(new GrainAddressCacheUpdate(new GrainAddress { GrainId = GrainId, SiloAddress = Address.SiloAddress }, validAddress: null)); var reason = new DeactivationReason( DeactivationReasonCode.IncompatibleRequest, @@ -1302,7 +1302,7 @@ private void RejectAllQueuedMessages() "RejectAllQueuedMessages: {Count} messages from invalid activation {Activation}.", msgs.Count, this); - _shared.InternalRuntime.LocalGrainDirectory.InvalidateCacheEntry(Address); + _shared.InternalRuntime.GrainLocator.InvalidateCache(Address); _shared.InternalRuntime.MessageCenter.ProcessRequestsToInvalidActivation( msgs, Address, @@ -1324,7 +1324,7 @@ private void RerouteAllQueuedMessages() } if (_shared.Logger.IsEnabled(LogLevel.Debug)) _shared.Logger.LogDebug((int)ErrorCode.Catalog_RerouteAllQueuedMessages, "Rerouting {NumMessages} messages from invalid grain activation {Grain}", msgs.Count, this); - _shared.InternalRuntime.LocalGrainDirectory.InvalidateCacheEntry(Address); + _shared.InternalRuntime.GrainLocator.InvalidateCache(Address); _shared.InternalRuntime.MessageCenter.ProcessRequestsToInvalidActivation(msgs, Address, ForwardingAddress, DeactivationReason.Description, DeactivationException); } } diff --git a/src/Orleans.Runtime/Catalog/Catalog.cs b/src/Orleans.Runtime/Catalog/Catalog.cs index 1e81283b44..f384e9c272 100644 --- a/src/Orleans.Runtime/Catalog/Catalog.cs +++ b/src/Orleans.Runtime/Catalog/Catalog.cs @@ -312,7 +312,7 @@ static IGrainContext UnableToCreateActivation(Catalog self, GrainId grainId) CatalogInstruments.NonExistentActivations.Add(1); - self.directory.InvalidateCacheEntry(grainId); + self.grainLocator.InvalidateCache(grainId); // Unregister the target activation so we don't keep getting spurious messages. // The time delay (one minute, as of this writing) is to handle the unlikely but possible race where diff --git a/src/Orleans.Runtime/Core/InsideRuntimeClient.cs b/src/Orleans.Runtime/Core/InsideRuntimeClient.cs index c618f07dbf..11d48f1ca2 100644 --- a/src/Orleans.Runtime/Core/InsideRuntimeClient.cs +++ b/src/Orleans.Runtime/Core/InsideRuntimeClient.cs @@ -200,9 +200,9 @@ public void SniffIncomingMessage(Message message) { if (message.CacheInvalidationHeader != null) { - foreach (GrainAddress address in message.CacheInvalidationHeader) + foreach (var update in message.CacheInvalidationHeader) { - GrainLocator.InvalidateCache(address); + GrainLocator.UpdateCache(update); } } @@ -392,7 +392,7 @@ public void ReceiveResponse(Message message) if (message.CacheInvalidationHeader is null) { // Remove from local directory cache. Note that SendingGrain is the original target, since message is the rejection response. - // If CacheInvalidationHeader is present, we already did this. Otherwise, we left this code for backward compatability. + // If CacheInvalidationHeader is present, we already did this. Otherwise, we left this code for backward compatibility. // It should be retired as we move to use CacheMgmtHeader in all relevant places. this.GrainLocator.InvalidateCache(message.SendingGrain); } diff --git a/src/Orleans.Runtime/GrainDirectory/AdaptiveGrainDirectoryCache.cs b/src/Orleans.Runtime/GrainDirectory/AdaptiveGrainDirectoryCache.cs index 7adf74dd1c..14f6b659f6 100644 --- a/src/Orleans.Runtime/GrainDirectory/AdaptiveGrainDirectoryCache.cs +++ b/src/Orleans.Runtime/GrainDirectory/AdaptiveGrainDirectoryCache.cs @@ -41,7 +41,7 @@ internal void Refresh(TimeSpan newExpirationTimer) } } - private static readonly Func ActivationAddressesMatches = (addr, entry) => addr.Matches(entry.Address); + private static readonly Func ActivationAddressesMatches = (addr, entry) => GrainAddress.MatchesGrainIdAndSilo(addr, entry.Address); private readonly LRU cache; /// controls the time the new entry is considered "fresh" (unit: ms) @@ -88,7 +88,7 @@ public bool LookUp(GrainId key, out GrainAddress result, out int version) // Here we do not check whether the found entry is expired. // It will be done by the thread managing the cache. - // This is to avoid situation where the entry was just expired, but the manager still have not run and have not refereshed it. + // This is to avoid situation where the entry was just expired, but the manager still have not run and have not refreshed it. if (!cache.TryGetValue(key, out var tmp)) { result = default; diff --git a/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs b/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs index 5d28a003b4..7ab2aa0ced 100644 --- a/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs +++ b/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs @@ -117,7 +117,7 @@ public async Task Unregister(GrainAddress address, UnregistrationCause cause) await GetGrainDirectory(address.GrainId.Type).Unregister(address); // There is the potential for a lookup to race with the Unregister and add the bad entry back to the cache. - if (this.cache.LookUp(address.GrainId, out var entry, out _) && entry == address) + if (this.cache.LookUp(address.GrainId, out var entry, out _) && entry.Equals(address)) { this.cache.Remove(address); } @@ -192,7 +192,7 @@ private bool IsKnownDeadSilo(SiloAddress siloAddress, MembershipVersion membersh private static void ThrowUnsupportedGrainType(GrainId grainId) => throw new InvalidOperationException($"Unsupported grain type for grain {grainId}"); - public void CachePlacementDecision(GrainId grainId, SiloAddress siloAddress) => cache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress }, 0); + public void UpdateCache(GrainId grainId, SiloAddress siloAddress) => cache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress }, 0); public void InvalidateCache(GrainId grainId) => cache.Remove(grainId); public void InvalidateCache(GrainAddress address) => cache.Remove(address); public bool TryLookupInCache(GrainId grainId, out GrainAddress address) diff --git a/src/Orleans.Runtime/GrainDirectory/ClientGrainLocator.cs b/src/Orleans.Runtime/GrainDirectory/ClientGrainLocator.cs index bde4f7a6c4..5eb980b3bf 100644 --- a/src/Orleans.Runtime/GrainDirectory/ClientGrainLocator.cs +++ b/src/Orleans.Runtime/GrainDirectory/ClientGrainLocator.cs @@ -61,7 +61,7 @@ private GrainAddress SelectAddress(List results, GrainId grainId) private static void ThrowNotClientGrainId(GrainId grainId) => throw new InvalidOperationException($"{grainId} is not a client id"); - public void CachePlacementDecision(GrainId grainId, SiloAddress siloAddress) { } + public void UpdateCache(GrainId grainId, SiloAddress siloAddress) { } public void InvalidateCache(GrainId grainId) { } diff --git a/src/Orleans.Runtime/GrainDirectory/DhtGrainLocator.cs b/src/Orleans.Runtime/GrainDirectory/DhtGrainLocator.cs index 7b87fc2a0c..1b24fcd47a 100644 --- a/src/Orleans.Runtime/GrainDirectory/DhtGrainLocator.cs +++ b/src/Orleans.Runtime/GrainDirectory/DhtGrainLocator.cs @@ -39,7 +39,7 @@ public Task Unregister(GrainAddress address, UnregistrationCause cause) { UnregistrationCause.Force => _forceWorker, UnregistrationCause.NonexistentActivation => _neaWorker, - _ => throw new ArgumentOutOfRangeException($"Unregistration cause {cause} is unknown and is not supported. This is a bug."), + _ => throw new ArgumentOutOfRangeException($"Deregistration cause {cause} is unknown and is not supported. This is a bug."), }; return worker.Unregister(address); @@ -70,7 +70,7 @@ void EnsureInitialized() public static DhtGrainLocator FromLocalGrainDirectory(LocalGrainDirectory localGrainDirectory) => new(localGrainDirectory, localGrainDirectory.RemoteGrainDirectory); - public void CachePlacementDecision(GrainId grainId, SiloAddress siloAddress) => _localGrainDirectory.CachePlacementDecision(grainId, siloAddress); + public void UpdateCache(GrainId grainId, SiloAddress siloAddress) => _localGrainDirectory.AddOrUpdateCacheEntry(grainId, siloAddress); public void InvalidateCache(GrainId grainId) => _localGrainDirectory.InvalidateCacheEntry(grainId); public void InvalidateCache(GrainAddress address) => _localGrainDirectory.InvalidateCacheEntry(address); public bool TryLookupInCache(GrainId grainId, out GrainAddress address) => _localGrainDirectory.TryCachedLookup(grainId, out address); diff --git a/src/Orleans.Runtime/GrainDirectory/GrainLocator.cs b/src/Orleans.Runtime/GrainDirectory/GrainLocator.cs index 96dab362e5..582a43cd10 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainLocator.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainLocator.cs @@ -1,4 +1,5 @@ #nullable enable +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Threading.Tasks; using Orleans.GrainDirectory; @@ -29,8 +30,21 @@ public GrainLocator(GrainLocatorResolver grainLocatorResolver) public void InvalidateCache(GrainAddress address) => GetGrainLocator(address.GrainId.Type).InvalidateCache(address); - public void CachePlacementDecision(GrainId grainId, SiloAddress siloAddress) => GetGrainLocator(grainId.Type).CachePlacementDecision(grainId, siloAddress); - private IGrainLocator GetGrainLocator(GrainType grainType) => _grainLocatorResolver.GetGrainLocator(grainType); + + public void UpdateCache(GrainId grainId, SiloAddress siloAddress) => GetGrainLocator(grainId.Type).UpdateCache(grainId, siloAddress); + + public void UpdateCache(GrainAddressCacheUpdate update) + { + if (update.ValidGrainAddress is { } validAddress) + { + Debug.Assert(validAddress.SiloAddress is not null); + UpdateCache(validAddress.GrainId, validAddress.SiloAddress); + } + else + { + InvalidateCache(update.InvalidGrainAddress); + } + } } } diff --git a/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs index cafc4debf5..831c860921 100644 --- a/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs @@ -61,10 +61,10 @@ internal interface ILocalGrainDirectory : IDhtGrainDirectory void InvalidateCacheEntry(GrainId grainId); /// - /// Adds a cache entry for the given activation addrss. + /// Adds or updates a cache entry for the given activation address. /// This method is intended to be called whenever a placement decision is made. /// - void CachePlacementDecision(GrainId grainId, SiloAddress siloAddress); + void AddOrUpdateCacheEntry(GrainId grainId, SiloAddress siloAddress); /// /// For testing purposes only. diff --git a/src/Orleans.Runtime/GrainDirectory/LRUBasedGrainDirectoryCache.cs b/src/Orleans.Runtime/GrainDirectory/LRUBasedGrainDirectoryCache.cs index 4938d5818f..84845a7882 100644 --- a/src/Orleans.Runtime/GrainDirectory/LRUBasedGrainDirectoryCache.cs +++ b/src/Orleans.Runtime/GrainDirectory/LRUBasedGrainDirectoryCache.cs @@ -6,7 +6,7 @@ namespace Orleans.Runtime.GrainDirectory { internal class LRUBasedGrainDirectoryCache : IGrainDirectoryCache { - private static readonly Func ActivationAddressesMatch = (a, b) => a.Matches(b.Address); + private static readonly Func ActivationAddressesMatch = (a, b) => GrainAddress.MatchesGrainIdAndSilo(a, b.Address); private readonly LRU cache; public LRUBasedGrainDirectoryCache(int maxCacheSize, TimeSpan maxEntryAge) => cache = new(maxCacheSize, maxEntryAge); diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index fc2bb41f29..efa073b15e 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -817,7 +817,7 @@ public bool IsSiloInCluster(SiloAddress silo) return this.directoryMembership.MembershipCache.Contains(silo); } - public void CachePlacementDecision(GrainId grainId, SiloAddress siloAddress) => this.DirectoryCache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress }, 0); + public void AddOrUpdateCacheEntry(GrainId grainId, SiloAddress siloAddress) => this.DirectoryCache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress }, 0); public bool TryCachedLookup(GrainId grainId, [NotNullWhen(true)] out GrainAddress? address) => (address = GetLocalCacheData(grainId)) is not null; private class DirectoryMembership diff --git a/src/Orleans.Runtime/Messaging/MessageCenter.cs b/src/Orleans.Runtime/Messaging/MessageCenter.cs index 621e6409fa..394702bf49 100644 --- a/src/Orleans.Runtime/Messaging/MessageCenter.cs +++ b/src/Orleans.Runtime/Messaging/MessageCenter.cs @@ -25,7 +25,7 @@ internal class MessageCenter : IMessageCenter, IAsyncDisposable private readonly SiloMessagingOptions messagingOptions; private readonly PlacementService placementService; private readonly ActivationDirectory activationDirectory; - private readonly ILocalGrainDirectory localGrainDirectory; + private readonly GrainLocator _grainLocator; private readonly ILogger log; private readonly Catalog catalog; private bool stopped; @@ -43,8 +43,8 @@ public MessageCenter( RuntimeMessagingTrace messagingTrace, IOptions messagingOptions, PlacementService placementService, - ILocalGrainDirectory localGrainDirectory, - ActivationDirectory activationDirectory) + ActivationDirectory activationDirectory, + GrainLocator grainLocator) { this.catalog = catalog; this.messagingOptions = messagingOptions.Value; @@ -52,8 +52,8 @@ public MessageCenter( this.connectionManager = senderManager; this.messagingTrace = messagingTrace; this.placementService = placementService; - this.localGrainDirectory = localGrainDirectory; this.activationDirectory = activationDirectory; + _grainLocator = grainLocator; this.log = logger; this.messageFactory = messageFactory; this._siloAddress = siloDetails.SiloAddress; @@ -273,11 +273,21 @@ internal void ProcessRequestsToInvalidActivation( { if (rejectMessages) { + GrainAddress validAddress = forwardingAddress switch + { + null => null, + _ => new() + { + GrainId = oldAddress.GrainId, + SiloAddress = forwardingAddress, + } + }; + foreach (var message in messages) { if (oldAddress != null) { - message.AddToCacheInvalidationHeader(oldAddress); + message.AddToCacheInvalidationHeader(oldAddress, validAddress: validAddress); } RejectMessage(message, Message.RejectionTypes.Transient, exc, failedOperation); @@ -286,9 +296,19 @@ internal void ProcessRequestsToInvalidActivation( else { this.messagingTrace.OnDispatcherForwardingMultiple(messages.Count, oldAddress, forwardingAddress, failedOperation, exc); + GrainAddress destination = forwardingAddress switch + { + null => null, + _ => new() + { + GrainId = oldAddress.GrainId, + SiloAddress = forwardingAddress, + } + }; + foreach (var message in messages) { - TryForwardRequest(message, oldAddress, forwardingAddress, failedOperation, exc); + TryForwardRequest(message, oldAddress, destination, failedOperation, exc); } } } @@ -304,7 +324,7 @@ internal void ProcessRequestToInvalidActivation( // Just use this opportunity to invalidate local Cache Entry as well. if (oldAddress != null) { - this.localGrainDirectory.InvalidateCacheEntry(oldAddress); + _grainLocator.InvalidateCache(oldAddress); } // IMPORTANT: do not do anything on activation context anymore, since this activation is invalid already. @@ -314,20 +334,30 @@ internal void ProcessRequestToInvalidActivation( } else { - this.TryForwardRequest(message, oldAddress, forwardingAddress, failedOperation, exc); + GrainAddress destination = forwardingAddress switch + { + null => null, + _ => new() + { + GrainId = oldAddress.GrainId, + SiloAddress = forwardingAddress, + } + }; + this.TryForwardRequest(message, oldAddress, destination, failedOperation, exc); } } - internal void TryForwardRequest(Message message, GrainAddress oldAddress, SiloAddress forwardingAddress, string failedOperation = null, Exception exc = null) + internal void TryForwardRequest(Message message, GrainAddress oldAddress, GrainAddress destination, string failedOperation = null, Exception exc = null) { bool forwardingSucceeded = false; + var forwardingAddress = destination?.SiloAddress; try { this.messagingTrace.OnDispatcherForwarding(message, oldAddress, forwardingAddress, failedOperation, exc); if (oldAddress != null) { - message.AddToCacheInvalidationHeader(oldAddress); + message.AddToCacheInvalidationHeader(oldAddress, validAddress: destination); } forwardingSucceeded = this.TryForwardMessage(message, forwardingAddress); diff --git a/src/Orleans.Runtime/Networking/SiloConnection.cs b/src/Orleans.Runtime/Networking/SiloConnection.cs index a8aa4c2433..7580107287 100644 --- a/src/Orleans.Runtime/Networking/SiloConnection.cs +++ b/src/Orleans.Runtime/Networking/SiloConnection.cs @@ -130,7 +130,7 @@ protected override void OnReceivedMessage(Message msg) // Invalidate the remote caller's activation cache entry. if (msg.TargetSilo != null) { - rejection.AddToCacheInvalidationHeader(new GrainAddress { GrainId = msg.TargetGrain, SiloAddress = msg.TargetSilo }); + rejection.AddToCacheInvalidationHeader(new GrainAddress { GrainId = msg.TargetGrain, SiloAddress = msg.TargetSilo }, validAddress: null); } this.Send(rejection); diff --git a/src/Orleans.Runtime/Placement/PlacementService.cs b/src/Orleans.Runtime/Placement/PlacementService.cs index f43eb7913c..1494a6580d 100644 --- a/src/Orleans.Runtime/Placement/PlacementService.cs +++ b/src/Orleans.Runtime/Placement/PlacementService.cs @@ -164,24 +164,31 @@ private bool CachedAddressIsValid(Message message, GrainAddress cachedAddress) // Verify that the result from the cache has not been invalidated by the message being addressed. return message.CacheInvalidationHeader switch { - { Count: > 0 } invalidAddresses => CachedAddressIsValidCore(message, cachedAddress, invalidAddresses), + { Count: > 0 } cacheUpdates => CachedAddressIsValidCore(message, cachedAddress, cacheUpdates), _ => true }; [MethodImpl(MethodImplOptions.NoInlining)] - bool CachedAddressIsValidCore(Message message, GrainAddress cachedAddress, List invalidAddresses) + bool CachedAddressIsValidCore(Message message, GrainAddress cachedAddress, List cacheUpdates) { var resultIsValid = true; if (_logger.IsEnabled(LogLevel.Debug)) { - _logger.LogDebug("Invalidating {Count} cached entries for message {Message}", invalidAddresses.Count, message); + _logger.LogDebug("Invalidating {Count} cached entries for message {Message}", cacheUpdates.Count, message); } - foreach (var address in invalidAddresses) + foreach (var update in cacheUpdates) { - // Invalidate the cache entries while we are examining them. - _grainLocator.InvalidateCache(address); - if (cachedAddress.Matches(address)) + // Invalidate/update cache entries while we are examining them. + var invalidAddress = update.InvalidGrainAddress; + var validAddress = update.ValidGrainAddress; + _grainLocator.UpdateCache(update); + + if (cachedAddress.Matches(validAddress)) + { + resultIsValid = true; + } + else if (cachedAddress.Matches(invalidAddress)) { resultIsValid = false; } @@ -371,7 +378,7 @@ private async Task GetOrPlaceActivationAsync(Message firstMessage) } _placementService._grainLocator.InvalidateCache(targetGrain); - _placementService._grainLocator.CachePlacementDecision(targetGrain, siloAddress); + _placementService._grainLocator.UpdateCache(targetGrain, siloAddress); return siloAddress; } diff --git a/test/NonSilo.Tests/Directory/MockLocalGrainDirectory.cs b/test/NonSilo.Tests/Directory/MockLocalGrainDirectory.cs index 90c369b5de..dbb214944e 100644 --- a/test/NonSilo.Tests/Directory/MockLocalGrainDirectory.cs +++ b/test/NonSilo.Tests/Directory/MockLocalGrainDirectory.cs @@ -118,7 +118,7 @@ public Task UnregisterAfterNonexistingActivation(GrainAddress address, SiloAddre throw new NotImplementedException(); } - public void CachePlacementDecision(GrainId grainId, SiloAddress siloAddress) => throw new NotImplementedException(); + public void AddOrUpdateCacheEntry(GrainId grainId, SiloAddress siloAddress) => throw new NotImplementedException(); public void InvalidateCacheEntry(GrainId grainId) => throw new NotImplementedException(); public bool TryCachedLookup(GrainId grainId, out GrainAddress address) => throw new NotImplementedException(); diff --git a/test/NonSilo.Tests/Serialization/MessageSerializerTests.cs b/test/NonSilo.Tests/Serialization/MessageSerializerTests.cs index af8caea53f..23656398f1 100644 --- a/test/NonSilo.Tests/Serialization/MessageSerializerTests.cs +++ b/test/NonSilo.Tests/Serialization/MessageSerializerTests.cs @@ -1,12 +1,16 @@ using System.Buffers; using System.Buffers.Binary; using System.IO.Pipelines; +using System.Net; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Orleans.CodeGeneration; using Orleans.Configuration; using Orleans.Runtime; using Orleans.Runtime.Messaging; +using Orleans.Serialization.Buffers; +using Orleans.Serialization.Codecs; +using Orleans.Serialization.Session; using TestExtensions; using Xunit; using Xunit.Abstractions; @@ -20,6 +24,8 @@ public class MessageSerializerTests private readonly TestEnvironmentFixture fixture; private readonly MessageFactory messageFactory; private readonly MessageSerializer messageSerializer; + private readonly SerializerSessionPool _serializerSessionPool; + private readonly IFieldCodec _grainAddressCodec; public MessageSerializerTests(ITestOutputHelper output, TestEnvironmentFixture fixture) { @@ -27,6 +33,8 @@ public MessageSerializerTests(ITestOutputHelper output, TestEnvironmentFixture f this.fixture = fixture; this.messageFactory = this.fixture.Services.GetRequiredService(); this.messageSerializer = this.fixture.Services.GetRequiredService(); + _serializerSessionPool = fixture.Services.GetRequiredService(); + _grainAddressCodec = fixture.Services.GetRequiredService>(); } [Fact, TestCategory("Functional")] @@ -135,5 +143,133 @@ private Message RoundTripMessage(Message message) Assert.Equal(0, requiredBytes); return deserializedMessage; } + + [Fact, TestCategory("BVT")] + public void MessageTest_CacheInvalidationHeader_RoundTripCompatibility() + { + var newSilo = SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 55555), 55555); + + var oldActivations = new List + { + GrainAddress.NewActivationAddress(SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 11111), 111111), GrainId.Create("test", "1")), + GrainAddress.NewActivationAddress(SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 22222), 222222), GrainId.Create("test", "2")), + new() { SiloAddress = SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 33333), 33333), GrainId = GrainId.Create("test", "3") }, + }; + + var newActivations = new List + { + GrainAddress.NewActivationAddress(newSilo, GrainId.Create("test", "1")), + GrainAddress.NewActivationAddress(newSilo, GrainId.Create("test", "2")), + new() { SiloAddress = newSilo, GrainId = GrainId.Create("test", "3") }, + }; + + var newUpdates = oldActivations.Zip(newActivations).Select(x => new GrainAddressCacheUpdate(x.First, x.Second)).ToList(); + + // Old to new + { + using var writer1Session = _serializerSessionPool.GetSession(); + var writer = Writer.CreatePooled(writer1Session); + var stub = new MessageSerializerBackwardsCompatibilityStub(_grainAddressCodec); + var fromOld = oldActivations.ToList(); + stub.WriteCacheInvalidationHeaders(ref writer, fromOld); + writer.Commit(); + + using var reader1Session = _serializerSessionPool.GetSession(); + var reader = Reader.Create(writer.Output.AsReadOnlySequence(), reader1Session); + var toNew = messageSerializer.ReadCacheInvalidationHeaders(ref reader); + Assert.NotNull(toNew); + Assert.Equal(fromOld.Count, toNew.Count); + for (var i = 0; i < fromOld.Count; i++) + { + // Only the invalid grain address can be represented. + Assert.Equal(fromOld[i], toNew[i].InvalidGrainAddress); + Assert.Null(toNew[i].ValidGrainAddress); + } + + writer.Dispose(); + } + + // New to new + { + using var writer1Session = _serializerSessionPool.GetSession(); + var writer = Writer.CreatePooled(writer1Session); + var fromNew = newUpdates.ToList(); + messageSerializer.WriteCacheInvalidationHeaders(ref writer, fromNew); + writer.Commit(); + + using var reader1Session = _serializerSessionPool.GetSession(); + var reader = Reader.Create(writer.Output.AsReadOnlySequence(), reader1Session); + var toNew = messageSerializer.ReadCacheInvalidationHeaders(ref reader); + Assert.NotNull(toNew); + Assert.Equal(fromNew.Count, toNew.Count); + for (var i = 0; i < fromNew.Count; i++) + { + // Full fidelity is expected + Assert.Equal(fromNew[i].InvalidGrainAddress, toNew[i].InvalidGrainAddress); + Assert.Equal(fromNew[i].ValidGrainAddress, toNew[i].ValidGrainAddress); + } + + writer.Dispose(); + } + + // New to old + { + using var writer1Session = _serializerSessionPool.GetSession(); + var writer = Writer.CreatePooled(writer1Session); + var fromNew = newUpdates.ToList(); + messageSerializer.WriteCacheInvalidationHeaders(ref writer, fromNew); + writer.Commit(); + + using var reader1Session = _serializerSessionPool.GetSession(); + var reader = Reader.Create(writer.Output.AsReadOnlySequence(), reader1Session); + var stub = new MessageSerializerBackwardsCompatibilityStub(_grainAddressCodec); + var toOld = stub.ReadCacheInvalidationHeaders(ref reader); + Assert.NotNull(toOld); + Assert.Equal(fromNew.Count, toOld.Count); + for (var i = 0; i < fromNew.Count; i++) + { + // Only the invalid grain address can be represented. + Assert.Equal(fromNew[i].InvalidGrainAddress, toOld[i]); + } + + writer.Dispose(); + } + } + + private class MessageSerializerBackwardsCompatibilityStub + { + private readonly IFieldCodec _grainAddressCodec; + + public MessageSerializerBackwardsCompatibilityStub(IFieldCodec grainAddressCodec) + { + _grainAddressCodec = grainAddressCodec; + } + + internal List ReadCacheInvalidationHeaders(ref Reader reader) + { + var n = (int)reader.ReadVarUInt32(); + if (n > 0) + { + var list = new List(n); + for (int i = 0; i < n; i++) + { + list.Add(_grainAddressCodec.ReadValue(ref reader, reader.ReadFieldHeader())); + } + + return list; + } + + return new List(); + } + + internal void WriteCacheInvalidationHeaders(ref Writer writer, List value) where TBufferWriter : IBufferWriter + { + writer.WriteVarUInt32((uint)value.Count); + foreach (var entry in value) + { + _grainAddressCodec.WriteField(ref writer, 0, typeof(GrainAddress), entry); + } + } + } } }