From f88defa62b5f91136c477f80bee3e9dfada9732b Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Fri, 27 Sep 2024 18:45:03 -0500 Subject: [PATCH] Updated Custom Field Definition repo to throw if it cannot acquire lock --- .../CustomFieldDefinitionRepository.cs | 60 ++++++++++--------- .../CustomFieldTests.cs | 10 ++-- 2 files changed, 36 insertions(+), 34 deletions(-) diff --git a/src/Foundatio.Repositories.Elasticsearch/CustomFields/CustomFieldDefinitionRepository.cs b/src/Foundatio.Repositories.Elasticsearch/CustomFields/CustomFieldDefinitionRepository.cs index 90055455..7c0dc502 100644 --- a/src/Foundatio.Repositories.Elasticsearch/CustomFields/CustomFieldDefinitionRepository.cs +++ b/src/Foundatio.Repositories.Elasticsearch/CustomFields/CustomFieldDefinitionRepository.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; @@ -46,7 +46,7 @@ public CustomFieldDefinitionRepository(CustomFieldDefinitionIndex index, ILockPr public async Task> GetFieldMappingAsync(string entityType, string tenantKey) { string cacheKey = GetMappingCacheKey(entityType, tenantKey); - var cachedMapping = await _cache.GetAsync>(cacheKey); + var cachedMapping = await _cache.GetAsync>(cacheKey).AnyContext(); if (cachedMapping.HasValue) return cachedMapping.Value; @@ -55,16 +55,16 @@ public async Task> GetFieldMappingAsy var fields = await FindAsync(q => q .FieldEquals(cf => cf.EntityType, entityType) .FieldEquals(cf => cf.TenantKey, tenantKey), - o => o.PageLimit(1000)); + o => o.PageLimit(1000)).AnyContext(); do { foreach (var customField in fields.Documents) fieldMapping[customField.Name] = customField; - } while (await fields.NextPageAsync()); + } while (await fields.NextPageAsync().AnyContext()); if (fieldMapping.Count > 0) - await _cache.AddAsync(cacheKey, fieldMapping, TimeSpan.FromMinutes(15)); + await _cache.AddAsync(cacheKey, fieldMapping, TimeSpan.FromMinutes(15)).AnyContext(); return fieldMapping; } @@ -99,9 +99,11 @@ public Task AddFieldAsync(string entityType, string tenan public override async Task AddAsync(IEnumerable documents, ICommandOptions options = null) { - var fieldScopes = documents.GroupBy(d => (d.EntityType, d.TenantKey, d.IndexType)); - var lockKeys = fieldScopes.Select(f => GetLockName(f.Key.EntityType, f.Key.TenantKey, f.Key.IndexType)); - await using var _ = await _lockProvider.AcquireAsync(lockKeys, TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1)); + var fieldScopes = documents.GroupBy(d => (d.EntityType, d.TenantKey, d.IndexType)).ToArray(); + string[] lockKeys = fieldScopes.Select(f => GetLockName(f.Key.EntityType, f.Key.TenantKey, f.Key.IndexType)).ToArray(); + await using var lck = await _lockProvider.AcquireAsync(lockKeys, TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1)).AnyContext(); + if (lck is null) + throw new Exception($"Failed to acquire lock: {String.Join(", ", lockKeys)}"); foreach (var fieldScope in fieldScopes) { @@ -110,15 +112,15 @@ public override async Task AddAsync(IEnumerable documents var usedNames = new HashSet(StringComparer.OrdinalIgnoreCase); var availableSlots = new Queue(); - var availableSlotsCache = await _cache.GetListAsync(slotFieldScopeKey); - var usedNamesCache = await _cache.GetListAsync(namesFieldScopeKey); + var availableSlotsCache = await _cache.GetListAsync(slotFieldScopeKey).AnyContext(); + var usedNamesCache = await _cache.GetListAsync(namesFieldScopeKey).AnyContext(); if (availableSlotsCache.HasValue && usedNamesCache.HasValue && availableSlotsCache.Value.Count > 0) { - foreach (var availableSlot in availableSlotsCache.Value.OrderBy(s => s)) + foreach (int availableSlot in availableSlotsCache.Value.OrderBy(s => s)) availableSlots.Enqueue(availableSlot); - foreach (var usedName in usedNamesCache.Value.ToArray()) + foreach (string usedName in usedNamesCache.Value.ToArray()) usedNames.Add(usedName); _logger.LogTrace("Got cached list of {SlotCount} available slots for {FieldScope}", availableSlots.Count, slotFieldScopeKey); @@ -133,13 +135,13 @@ public override async Task AddAsync(IEnumerable documents .Include(cf => cf.IndexSlot) .Include(cf => cf.Name) .Include(cf => cf.IsDeleted), - o => o.IncludeSoftDeletes().PageLimit(1000).QueryLogLevel(Microsoft.Extensions.Logging.LogLevel.Information)); + o => o.IncludeSoftDeletes().PageLimit(1000).QueryLogLevel(Microsoft.Extensions.Logging.LogLevel.Information)).AnyContext(); do { usedSlots.AddRange(existingFields.Documents.Select(d => d.IndexSlot)); usedNames.AddRange(existingFields.Documents.Where(d => !d.IsDeleted).Select(d => d.Name)); - } while (await existingFields.NextPageAsync()); + } while (await existingFields.NextPageAsync().AnyContext()); int slotBatchSize = fieldScope.Count() + 25; int slot = 1; @@ -152,8 +154,8 @@ public override async Task AddAsync(IEnumerable documents } _logger.LogTrace("Found {FieldCount} fields with {SlotCount} used slots for {FieldScope}", existingFields.Total, usedSlots.Count, slotFieldScopeKey); - await _cache.ListAddAsync(slotFieldScopeKey, availableSlots.ToArray(), TimeSpan.FromMinutes(5)); - await _cache.ListAddAsync(namesFieldScopeKey, usedNames.ToArray(), TimeSpan.FromMinutes(5)); + await _cache.ListAddAsync(slotFieldScopeKey, availableSlots.ToArray(), TimeSpan.FromMinutes(5)).AnyContext(); + await _cache.ListAddAsync(namesFieldScopeKey, usedNames.ToArray(), TimeSpan.FromMinutes(5)).AnyContext(); } foreach (var doc in fieldScope) @@ -167,13 +169,13 @@ public override async Task AddAsync(IEnumerable documents int availableSlot = availableSlots.Dequeue(); doc.IndexSlot = availableSlot; - await _cache.ListRemoveAsync(slotFieldScopeKey, new[] { availableSlot }); - await _cache.ListAddAsync(namesFieldScopeKey, new[] { doc.Name }); + await _cache.ListRemoveAsync(slotFieldScopeKey, [availableSlot]).AnyContext(); + await _cache.ListAddAsync(namesFieldScopeKey, [doc.Name]).AnyContext(); _logger.LogTrace("New field {FieldName} using slot {IndexSlot} for {FieldScope}", doc.Name, doc.IndexSlot, slotFieldScopeKey); } } - await base.AddAsync(documents, options); + await base.AddAsync(documents, options).AnyContext(); } protected override Task ValidateAndThrowAsync(CustomFieldDefinition document) @@ -209,7 +211,7 @@ private async Task OnDocumentsChanged(object source, DocumentsChangeEventArgs query) { - await base.InvalidateCacheByQueryAsync(query); + await base.InvalidateCacheByQueryAsync(query).AnyContext(); var conditions = query.GetFieldConditions(); var entityTypeCondition = conditions.FirstOrDefault(c => c.Field == InferField(d => d.EntityType) && c.Operator == ComparisonOperator.Equals); if (entityTypeCondition == null || String.IsNullOrEmpty(entityTypeCondition.Value?.ToString())) return; - await _cache.RemoveAsync(GetMappingCacheKey(entityTypeCondition.Value.ToString(), GetTenantKey(query))); + await _cache.RemoveAsync(GetMappingCacheKey(entityTypeCondition.Value.ToString(), GetTenantKey(query))).AnyContext(); } protected override async Task InvalidateCacheAsync(IReadOnlyCollection> documents, ChangeType? changeType = null) { - await base.InvalidateCacheAsync(documents, changeType); + await base.InvalidateCacheAsync(documents, changeType).AnyContext(); if (documents.Count == 0) { - await _cache.RemoveByPrefixAsync("customfield"); - _logger.LogInformation("Cleared all custom field mappings from cache due to change {ChangeType}.", changeType); + await _cache.RemoveByPrefixAsync("customfield").AnyContext(); + _logger.LogInformation("Cleared all custom field mappings from cache due to change {ChangeType}", changeType); } var cacheKeys = documents.Select(d => GetMappingCacheKey(d.Value.EntityType, d.Value.TenantKey)).Distinct().ToList(); - foreach (var cacheKey in cacheKeys) - await _cache.RemoveByPrefixAsync(cacheKey); + foreach (string cacheKey in cacheKeys) + await _cache.RemoveByPrefixAsync(cacheKey).AnyContext(); } } diff --git a/tests/Foundatio.Repositories.Elasticsearch.Tests/CustomFieldTests.cs b/tests/Foundatio.Repositories.Elasticsearch.Tests/CustomFieldTests.cs index b9b0ea3b..f52ed3d5 100644 --- a/tests/Foundatio.Repositories.Elasticsearch.Tests/CustomFieldTests.cs +++ b/tests/Foundatio.Repositories.Elasticsearch.Tests/CustomFieldTests.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; @@ -226,7 +226,7 @@ public async Task CanAddNewFieldsAndReserveSlotsConcurrentlyAcrossTenantsAndFiel Log.SetLogLevel(LogLevel.Information); const int COUNT = 1000; - await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), new ParallelOptions { MaxDegreeOfParallelism = 2 }, async (index, ct) => + await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), new ParallelOptions { MaxDegreeOfParallelism = 2 }, async (index, _) => { var customField = await _customFieldDefinitionRepository.AddAsync(new CustomFieldDefinition { @@ -330,7 +330,7 @@ public async Task CanHandleWrongFieldValueType() [Fact] public async Task CanUseCalculatedFieldType() { - await _customFieldDefinitionRepository.AddAsync(new[] { + await _customFieldDefinitionRepository.AddAsync([ new CustomFieldDefinition { EntityType = nameof(EmployeeWithCustomFields), TenantKey = "1", @@ -347,11 +347,11 @@ await _customFieldDefinitionRepository.AddAsync(new[] { EntityType = nameof(EmployeeWithCustomFields), TenantKey = "1", Name = "Calculated", - IndexType = CalculatedIntegerFieldType.IndexType, + IndexType = IntegerFieldType.IndexType, ProcessMode = CustomFieldProcessMode.AlwaysProcess, Data = new Dictionary { { "Expression", "source.Data.Field1 + source.Data.Field2" } } } - }); + ]); var employee = EmployeeWithCustomFieldsGenerator.Generate(age: 19); employee.CompanyId = "1";