Skip to content

Commit

Permalink
Merge pull request #162 from gigya/develop
Browse files Browse the repository at this point in the history
* fix service dispose
* Status endpoint during drain will return status code 521 (Web server is down)
* Additional logs for AsyncCache (configurable)
  • Loading branch information
bronsh authored Apr 17, 2018
2 parents 5c3d1f9 + 3488398 commit 529c00e
Show file tree
Hide file tree
Showing 13 changed files with 184 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class HealthEndpoint : ICustomEndpoint
private IServiceEndPointDefinition ServiceEndPointDefinition { get; }
private IServiceInterfaceMapper ServiceInterfaceMapper { get; }
private IActivator Activator { get; }
private const int WebServerIsDown = 521;

public HealthEndpoint(IServiceEndPointDefinition serviceEndPointDefinition, IServiceInterfaceMapper serviceInterfaceMapper, IActivator activator,IServiceDrainListener drainListener)
{
Expand All @@ -54,7 +55,7 @@ public async Task<bool> TryHandle(HttpListenerContext context, WriteResponseDele

if (_drainListener.Token.IsCancellationRequested)
{
await writeResponse($"Begin service drain before shutdown.", HttpStatusCode.ServiceUnavailable).ConfigureAwait(false);
await writeResponse($"Begin service drain before shutdown.",(HttpStatusCode)WebServerIsDown ).ConfigureAwait(false);
}

if (serviceType == null)
Expand Down
20 changes: 15 additions & 5 deletions Gigya.Microdot.Orleans.Hosting/GigyaSiloHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,26 @@ public void Stop()
{
HttpServiceListener.Dispose();

if (Silo != null && Silo.IsStarted)
Silo.StopOrleansSilo();

try
{
GrainClient.Uninitialize();
if (Silo != null && Silo.IsStarted)
Silo.StopOrleansSilo();
}
catch (Exception exc)
catch (System.Net.Sockets.SocketException)
{
Log.Warn("Exception Uninitializing grain client", exception: exc);
//Orleans 1.3.1 thorws this exception most of the time
}
finally
{
try
{
GrainClient.Uninitialize();
}
catch (Exception exc)
{
Log.Warn("Exception Uninitializing grain client", exception: exc);
}
}

}
Expand Down
135 changes: 120 additions & 15 deletions Gigya.Microdot.ServiceProxy/Caching/AsyncCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Linq;
using System.Reflection;
using System.Runtime.Caching;
using System.Runtime.Caching.Hosting;
using System.Threading;
Expand All @@ -41,7 +42,7 @@ namespace Gigya.Microdot.ServiceProxy.Caching
public sealed class AsyncCache : IMemoryCacheManager, IServiceProvider, IDisposable
{
private IDateTime DateTime { get; }
private Func<RevokeConfig> GetRevokeConfig { get; }
private Func<CacheConfig> GetRevokeConfig { get; }
private ILog Log { get; }
private MemoryCache MemoryCache { get; set; }
private long LastCacheSizeBytes { get; set; }
Expand Down Expand Up @@ -72,7 +73,7 @@ public sealed class AsyncCache : IMemoryCacheManager, IServiceProvider, IDisposa
internal int CacheKeyCount => RevokeKeyToCacheKeysIndex.Sum(item => item.Value.Count);


public AsyncCache(ILog log, MetricsContext metrics, IDateTime dateTime, IRevokeListener revokeListener, Func<RevokeConfig> getRevokeConfig)
public AsyncCache(ILog log, MetricsContext metrics, IDateTime dateTime, IRevokeListener revokeListener, Func<CacheConfig> getRevokeConfig)
{
DateTime = dateTime;
GetRevokeConfig = getRevokeConfig;
Expand All @@ -94,7 +95,7 @@ public AsyncCache(ILog log, MetricsContext metrics, IDateTime dateTime, IRevokeL

private Task OnRevoke(string revokeKey)
{
var shouldLog = GetRevokeConfig().LogRequests;
var shouldLog = GetRevokeConfig().LogRevokes;

if (string.IsNullOrEmpty(revokeKey))
{
Expand Down Expand Up @@ -136,7 +137,7 @@ private Task OnRevoke(string revokeKey)
catch (Exception ex)
{
Revokes.Meter("Failed", Unit.Events).Mark();
Log.Warn("error while revoking cache", exception: ex, unencryptedTags: new {revokeKey});
Log.Warn("Error while revoking cache", exception: ex, unencryptedTags: new {revokeKey});
}
return Task.FromResult(true);
}
Expand Down Expand Up @@ -165,20 +166,40 @@ private void InitMetrics()
}


public Task GetOrAdd(string key, Func<Task> factory, Type taskResultType, CacheItemPolicyEx policy, params string[] metricsKeys)
public Task GetOrAdd(string key, Func<Task> factory, Type taskResultType, CacheItemPolicyEx policy, string groupName, string logData, string[] metricsKeys)
{
var getValueTask = GetOrAdd(key, () => TaskConverter.ToWeaklyTypedTask(factory(), taskResultType), policy, metricsKeys, taskResultType);
var getValueTask = GetOrAdd(key, () => TaskConverter.ToWeaklyTypedTask(factory(), taskResultType), policy, groupName, logData, metricsKeys, taskResultType);
return TaskConverter.ToStronglyTypedTask(getValueTask, taskResultType);
}


private Task<object> GetOrAdd(string key, Func<Task<object>> factory, CacheItemPolicyEx policy, string[] metricsKeys, Type taskResultType)
private Task<object> GetOrAdd(string key, Func<Task<object>> factory, CacheItemPolicyEx policy, string groupName, string logData, string[] metricsKeys, Type taskResultType)
{
var shouldLog = ShouldLog(groupName);

async Task<object> WrappedFactory(bool removeOnException)
{
try
{
if (shouldLog)
Log.Info(x => x("Cache item is waiting for value to be resolved", unencryptedTags: new
{
cacheKey = key,
cacheGroup = groupName,
cacheData = logData
}));

var result = await factory().ConfigureAwait(false);
if (shouldLog)
{
Log.Info(x => x("Cache item value is resolved", unencryptedTags: new
{
cacheKey = key,
cacheGroup = groupName,
cacheData = logData,
value = GetValueForLogging(result)
}));
}
//Can happen if item removed before task is completed
if(MemoryCache.Contains(key))
{
Expand All @@ -193,14 +214,30 @@ async Task<object> WrappedFactory(bool removeOnException)
{
cacheKeys.Add(key);
}
Log.Info(x=>x("RevokeKey added to reverse index", unencryptedTags: new
{
revokeKey = revokeKey,
cacheKey = key,
cacheGroup = groupName,
cacheData = logData
}));
}
}
}
AwaitingResult.Decrement(metricsKeys);
return result;
}
catch
catch(Exception exception)
{
Log.Info(x=>x("Error resolving value for cache item", unencryptedTags: new
{
cacheKey = key,
cacheGroup = groupName,
cacheData = logData,
removeOnException,
errorMessage = exception.Message
}));

if(removeOnException)
MemoryCache.Remove(key); // Do not cache exceptions.

Expand All @@ -210,7 +247,10 @@ async Task<object> WrappedFactory(bool removeOnException)
}
}

var newItem = new AsyncCacheItem();

var newItem = shouldLog ?
new AsyncCacheItem {GroupName = string.Intern(groupName), LogData = logData} :
new AsyncCacheItem (); // if log is not needed, then do not cache unnecessary details which will blow up the memory

Task<object> resultTask;

Expand All @@ -233,6 +273,13 @@ async Task<object> WrappedFactory(bool removeOnException)
newItem.CurrentValueTask = WrappedFactory(true);
newItem.NextRefreshTime = DateTime.UtcNow + policy.RefreshTime;
resultTask = newItem.CurrentValueTask;
if (shouldLog)
Log.Info(x => x("Item added to cache", unencryptedTags: new
{
cacheKey = key,
cacheGroup = groupName,
cacheData = logData
}));
}
else
{
Expand Down Expand Up @@ -272,25 +319,70 @@ async Task<object> WrappedFactory(bool removeOnException)

return resultTask;
}


private ConcurrentDictionary<Type, FieldInfo> _revocableValueFieldPerType = new ConcurrentDictionary<Type, FieldInfo>();
private string GetValueForLogging(object value)
{
if (value is IRevocable)
{
var revocableValueField = _revocableValueFieldPerType.GetOrAdd(value.GetType(), t=>t.GetField(nameof(Revocable<int>.Value)));
if (revocableValueField != null)
value = revocableValueField.GetValue(value);
}

if (value is ValueType || value is string)
return value.ToString();
else
return null;
}

/// <summary>
/// For revocable items , move over all revoke ids in cache index and remove them.
/// </summary>
private void ItemRemovedCallback(CacheEntryRemovedArguments arguments)
{
{
var cacheItem = arguments.CacheItem.Value as AsyncCacheItem;
var shouldLog = ShouldLog(cacheItem?.GroupName);

if (shouldLog)
Log.Info(x=>x("Item removed from cache", unencryptedTags: new
{
cacheKey = arguments.CacheItem.Key,
removeReason = arguments.RemovedReason.ToString(),
cacheGroup = cacheItem?.GroupName,
cacheData = cacheItem?.LogData
}));

var cachedItem = ((AsyncCacheItem)arguments.CacheItem.Value).CurrentValueTask;
if(cachedItem.Status==TaskStatus.RanToCompletion && (cachedItem.Result as IRevocable)?.RevokeKeys!=null)
if(cachedItem.Status == TaskStatus.RanToCompletion && (cachedItem.Result as IRevocable)?.RevokeKeys!=null)
{
foreach(var revocationKey in ((IRevocable)cachedItem.Result).RevokeKeys)
foreach(var revokeKey in ((IRevocable)cachedItem.Result).RevokeKeys)
{
if (RevokeKeyToCacheKeysIndex.TryGetValue(revocationKey, out HashSet<string> cacheKeys))
if (RevokeKeyToCacheKeysIndex.TryGetValue(revokeKey, out HashSet<string> cacheKeys))
{
lock (cacheKeys)
{
cacheKeys.Remove(arguments.CacheItem.Key);
Log.Info(x => x("RevokeKey removed from reverse index", unencryptedTags: new
{
cacheKey = arguments.CacheItem.Key,
revokeKey = revokeKey,
removeReason = arguments.RemovedReason.ToString(),
cacheGroup = cacheItem?.GroupName,
cacheData = cacheItem?.LogData
}));

if (!cacheKeys.Any())
{
RevokeKeyToCacheKeysIndex.TryRemove(revocationKey, out _);
if (RevokeKeyToCacheKeysIndex.TryRemove(revokeKey, out _) && shouldLog)
Log.Info(x => x("Reverse index for cache item was removed", unencryptedTags: new
{
cacheKey = arguments.CacheItem.Key,
removeReason = arguments.RemovedReason.ToString(),
cacheGroup = cacheItem?.GroupName,
cacheData = cacheItem?.LogData
}));

}
}
}
Expand All @@ -300,6 +392,18 @@ private void ItemRemovedCallback(CacheEntryRemovedArguments arguments)
Items.Meter(arguments.RemovedReason.ToString(), Unit.Items).Mark();
}

private bool ShouldLog(string groupName)
{
if (groupName == null)
return false;

var config = GetRevokeConfig();
if (config.Groups.TryGetValue(groupName, out var groupConfig))
return groupConfig.WriteExtraLogs;

return false;
}


public void Clear()
{
Expand Down Expand Up @@ -339,4 +443,5 @@ public void Dispose()
RevokeDisposable?.Dispose();
}
}

}
11 changes: 11 additions & 0 deletions Gigya.Microdot.ServiceProxy/Caching/AsyncCacheItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,16 @@ public class AsyncCacheItem
public DateTime NextRefreshTime { get; set; }
public Task<object> CurrentValueTask { get; set; }
public Task RefreshTask { get; set; }

/// <summary>
/// Group name of this cache item (e.g. method name).
/// The group name is used to configure whether extra logData should be written for items of this group.
/// </summary>
public string GroupName { get; set; }

/// <summary>
/// Extra data for log purposes (e.g. arguments list)
/// </summary>
public string LogData { get; set; }
}
}
4 changes: 2 additions & 2 deletions Gigya.Microdot.ServiceProxy/Caching/AsyncMemoizer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public object Memoize(object dataSource, MethodInfo method, object[] args, Cache
var target = new InvocationTarget(method);
string cacheKey = $"{target}#{GetArgumentHash(args)}";

return Cache.GetOrAdd(cacheKey, () => (Task)method.Invoke(dataSource, args), taskResultType, policy, target.TypeName, target.MethodName);
return Cache.GetOrAdd(cacheKey, () => (Task)method.Invoke(dataSource, args), taskResultType, policy, target.MethodName, string.Join(",", args), new []{target.TypeName, target.MethodName});
}


Expand All @@ -76,7 +76,7 @@ private string GetArgumentHash(object[] args)
using (var writer = new StreamWriter(stream) { AutoFlush = true })
using (SHA1 sha = new SHA1CryptoServiceProvider())
{
JsonSerializer.Create().Serialize(writer, args);
JsonSerializer.Create().Serialize(writer, args);
stream.Seek(0, SeekOrigin.Begin);
return Convert.ToBase64String(sha.ComputeHash(stream));
}
Expand Down
18 changes: 18 additions & 0 deletions Gigya.Microdot.ServiceProxy/Caching/CacheConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using Gigya.Microdot.Interfaces.Configuration;

namespace Gigya.Microdot.ServiceProxy.Caching
{
[ConfigurationRoot("Cache", RootStrategy.ReplaceClassNameWithPath)]
public class CacheConfig: IConfigObject
{
public bool LogRevokes { get; set; } = false;
public Dictionary<string, CacheGroupConfig> Groups { get; } = new Dictionary<string, CacheGroupConfig>(StringComparer.InvariantCultureIgnoreCase);
}

public class CacheGroupConfig
{
public bool WriteExtraLogs { get; set; } = false;
}
}
10 changes: 0 additions & 10 deletions Gigya.Microdot.ServiceProxy/Caching/RevokeConfig.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
<Compile Include="Caching\MetadataProvider.cs" />
<Compile Include="Caching\AsyncCache.cs" />
<Compile Include="Caching\MetricsExtensions.cs" />
<Compile Include="Caching\RevokeConfig.cs" />
<Compile Include="Caching\CacheConfig.cs" />
<Compile Include="CoreFX\DispatchProxy\DispatchProxy.cs" />
<Compile Include="CoreFX\DispatchProxy\DispatchProxyGenerator.cs" />
<Compile Include="DelegatingDispatchProxy.cs" />
Expand Down
2 changes: 1 addition & 1 deletion Gigya.Microdot.SharedLogic/ServiceArguments.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class ServiceArguments
public int? ShutdownWhenPidExits { get; }

/// <summary>
/// Specifies drain time in this time the servcie status will be 503.
/// Specifies drain time in this time the servcie status will be 521.
/// </summary>
public TimeSpan? ServiceDrainTime { get; }
/// <summary>
Expand Down
6 changes: 3 additions & 3 deletions SolutionVersion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
[assembly: AssemblyCopyright("© 2018 Gigya Inc.")]
[assembly: AssemblyDescription("Microdot Framework")]

[assembly: AssemblyVersion("1.9.5.0")]
[assembly: AssemblyFileVersion("1.9.5.0")]
[assembly: AssemblyInformationalVersion("1.9.5.0")]
[assembly: AssemblyVersion("1.9.6.0")]
[assembly: AssemblyFileVersion("1.9.6.0")]
[assembly: AssemblyInformationalVersion("1.9.6.0")]


// Setting ComVisible to false makes the types in this assembly not visible
Expand Down
Loading

0 comments on commit 529c00e

Please sign in to comment.