Skip to content

Commit

Permalink
summary: SocketsHttpHandler plus Deadlock Refactor
Browse files Browse the repository at this point in the history
feat: Use `SocketsHttpHandler` to configure `HttpClient` in .NET 6+. (#2931)
fix: Refactoring to reduce the likelihood of a deadlock in `HttpClient.SendAsync()`. (#2931)
chore: Minor refactoring and logging updates. (#2931)
  • Loading branch information
tippmar-nr authored Jan 7, 2025
1 parent a69ba47 commit eb3afda
Show file tree
Hide file tree
Showing 32 changed files with 499 additions and 178 deletions.
11 changes: 8 additions & 3 deletions src/Agent/NewRelic/Agent/Core/AgentShim.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ static void Initialize()
}

private static bool _initialized = false;
private static object _initLock = new object();
private static SemaphoreSlim _lockSemaphore = new SemaphoreSlim(1, 1);

static bool TryInitialize(string method)
{
if (Monitor.IsEntered(_initLock)) return false;
if (_lockSemaphore.CurrentCount == 0) return false;
if (DeferInitialization(method)) return false;

lock (_initLock)
_lockSemaphore.Wait();
try
{
if (!_initialized)
{
Expand All @@ -38,6 +39,10 @@ static bool TryInitialize(string method)

return true;
}
finally
{
_lockSemaphore.Release();
}
}

private static HashSet<string> _deferInitializationOnTheseMethods = new HashSet<string>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,16 @@ protected void InternalHarvest(string transactionId = null)
var customEvents = originalCustomEvents.Where(node => node != null).Select(node => node.Data).ToList();

// if we don't have any events to publish then don't
if (customEvents.Count <= 0)
return;
var eventCount = customEvents.Count;
if (eventCount > 0)
{
var responseStatus = DataTransportService.Send(customEvents, transactionId);

var responseStatus = DataTransportService.Send(customEvents, transactionId);
HandleResponse(responseStatus, customEvents);
}

HandleResponse(responseStatus, customEvents);
Log.Finest($"Custom Event harvest finished. {eventCount} event(s) sent.");

Log.Finest("Custom Event harvest finished.");
}

protected override void OnConfigurationUpdated(ConfigurationUpdateSource configurationUpdateSource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,15 @@ protected void InternalHarvest(string transactionId = null)
var eventHarvestData = new EventHarvestData(originalErrorEvents.Size, originalErrorEvents.GetAddAttemptsCount());

// if we don't have any events to publish then don't
if (aggregatedEvents.Count <= 0)
return;

var responseStatus = DataTransportService.Send(eventHarvestData, aggregatedEvents, transactionId);
var eventCount = aggregatedEvents.Count;
if (eventCount > 0)
{
var responseStatus = DataTransportService.Send(eventHarvestData, aggregatedEvents, transactionId);

HandleResponse(responseStatus, aggregatedEvents);
HandleResponse(responseStatus, aggregatedEvents);
}

Log.Finest("Error Event harvest finished.");
Log.Finest($"Error Event harvest finished. {eventCount} event(s) sent.");
}

protected override void OnConfigurationUpdated(ConfigurationUpdateSource configurationUpdateSource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,16 @@ protected void InternalHarvest(string transactionId = null)
_readerWriterLock.ExitWriteLock();
}

if (errorTraceWireModels.Count <= 0)
return;

var responseStatus = DataTransportService.Send(errorTraceWireModels, transactionId);
// if we don't have any events to publish then don't
var traceCount = errorTraceWireModels.Count;
if (traceCount > 0)
{
var responseStatus = DataTransportService.Send(errorTraceWireModels, transactionId);

HandleResponse(responseStatus, errorTraceWireModels);
HandleResponse(responseStatus, errorTraceWireModels);
}

Log.Finest("Error Trace harvest finished.");
Log.Finest($"Error Trace harvest finished. {traceCount} trace(s) sent.");
}

protected override void OnConfigurationUpdated(ConfigurationUpdateSource configurationUpdateSource)
Expand Down
33 changes: 16 additions & 17 deletions src/Agent/NewRelic/Agent/Core/Aggregators/LogEventAggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,28 +88,27 @@ protected void InternalHarvest(string transactionId = null)
Interlocked.Add(ref _logsDroppedCount, originalLogEvents.GetAndResetDroppedItemCount());

// if we don't have any events to publish then don't
if (aggregatedEvents.Count <= 0)
var eventCount = aggregatedEvents.Count;
if (eventCount > 0)
{
return;
}
// matches metadata so that utilization and this match
var hostname = !string.IsNullOrEmpty(_configuration.UtilizationFullHostName)
? _configuration.UtilizationFullHostName
: _configuration.UtilizationHostName;

// matches metadata so that utilization and this match
var hostname = !string.IsNullOrEmpty(_configuration.UtilizationFullHostName)
? _configuration.UtilizationFullHostName
: _configuration.UtilizationHostName;
var modelsCollection = new LogEventWireModelCollection(
_configuration.ApplicationNames.ElementAt(0),
_configuration.EntityGuid,
hostname,
_configuration.LabelsEnabled ? _labelsService.GetFilteredLabels(_configuration.LabelsExclude) : [],
aggregatedEvents);

var modelsCollection = new LogEventWireModelCollection(
_configuration.ApplicationNames.ElementAt(0),
_configuration.EntityGuid,
hostname,
_configuration.LabelsEnabled ? _labelsService.GetFilteredLabels(_configuration.LabelsExclude) : [],
aggregatedEvents);
var responseStatus = DataTransportService.Send(modelsCollection, transactionId);

var responseStatus = DataTransportService.Send(modelsCollection, transactionId);

HandleResponse(responseStatus, aggregatedEvents);
HandleResponse(responseStatus, aggregatedEvents);
}

Log.Finest("Log Event harvest finished.");
Log.Finest($"Log Event harvest finished. {eventCount} event(s) sent.");
}

protected override void OnConfigurationUpdated(ConfigurationUpdateSource configurationUpdateSource)
Expand Down
13 changes: 9 additions & 4 deletions src/Agent/NewRelic/Agent/Core/Aggregators/MetricAggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Linq;
using NewRelic.Agent.Core.DataTransport;
using NewRelic.Agent.Core.Events;
using NewRelic.Agent.Core.Metrics;
Expand Down Expand Up @@ -75,12 +76,16 @@ protected void InternalHarvest(string transactionId = null)
var oldMetrics = GetStatsCollectionForHarvest();

oldMetrics.MergeUnscopedStats(MetricNames.SupportabilityMetricHarvestTransmit, MetricDataWireModel.BuildCountData());
var metricsToSend = oldMetrics.ConvertToJsonForSending(_metricNameService);
var metricsToSend = oldMetrics.ConvertToJsonForSending(_metricNameService).ToList();

var responseStatus = DataTransportService.Send(metricsToSend, transactionId);
HandleResponse(responseStatus, metricsToSend);
var metricCount = metricsToSend.Count;
if (metricCount > 0)
{
var responseStatus = DataTransportService.Send(metricsToSend, transactionId);
HandleResponse(responseStatus, metricsToSend);
}

Log.Debug("Metric harvest finished.");
Log.Finest($"Metric harvest finished. {metricCount} metric(s) sent.");
}

protected override void OnConfigurationUpdated(ConfigurationUpdateSource configurationUpdateSource)
Expand Down
16 changes: 8 additions & 8 deletions src/Agent/NewRelic/Agent/Core/Aggregators/SpanEventAggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,16 @@ protected void InternalHarvest(string transactionId = null)

var eventHarvestData = new EventHarvestData(spanEventsPriorityQueue.Size, spanEventsPriorityQueue.GetAddAttemptsCount());
var wireModels = spanEventsPriorityQueue.Where(node => null != node).Select(node => node.Data).ToList();

// if we don't have any events to publish then don't
if (wireModels.Count <= 0)
return;

var responseStatus = DataTransportService.Send(eventHarvestData, wireModels, transactionId);

HandleResponse(responseStatus, wireModels);
// if we don't have any events to publish then don't
var eventCount = wireModels.Count;
if (eventCount > 0)
{
var responseStatus = DataTransportService.Send(eventHarvestData, wireModels, transactionId);
HandleResponse(responseStatus, wireModels);
}

Log.Finest("Span Event harvest finished.");
Log.Finest($"Span Event harvest finished. {eventCount} event(s) sent.");
}

private void ReduceReservoirSize(int newSize)
Expand Down
15 changes: 8 additions & 7 deletions src/Agent/NewRelic/Agent/Core/Aggregators/SqlTraceAggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,15 @@ protected void InternalHarvest(string transactionId = null)
.Take(_configuration.SqlTracesPerPeriod)
.ToList();

if (!slowestTraces.Any())
return;

var responseStatus = DataTransportService.Send(slowestTraces, transactionId);

HandleResponse(responseStatus, slowestTraces);
// if we don't have any traces to publish then don't
var traceCount = slowestTraces.Count;
if (traceCount > 0)
{
var responseStatus = DataTransportService.Send(slowestTraces, transactionId);
HandleResponse(responseStatus, slowestTraces);
}

Log.Finest("SQL Trace harvest finished.");
Log.Finest($"SQL Trace harvest finished. {traceCount} trace(s) sent.");
}

private void HandleResponse(DataTransportResponseStatus responseStatus, ICollection<SqlTraceWireModel> traces)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,16 @@ protected void InternalHarvest(string transactionId = null)
var eventHarvestData = new EventHarvestData(originalTransactionEvents.Size, originalTransactionEvents.GetAddAttemptsCount());

// if we don't have any events to publish then don't
if (aggregatedEvents.Count <= 0)
return;
var eventCount = aggregatedEvents.Count;
if (eventCount > 0)
{
var responseStatus = DataTransportService.Send(eventHarvestData, aggregatedEvents, transactionId);

var responseStatus = DataTransportService.Send(eventHarvestData, aggregatedEvents, transactionId);
HandleResponse(responseStatus, aggregatedEvents);
}

HandleResponse(responseStatus, aggregatedEvents);
Log.Finest($"Transaction Event harvest finished. {eventCount} event(s) sent.");

Log.Finest("Transaction Event harvest finished.");
}

protected override void OnConfigurationUpdated(ConfigurationUpdateSource configurationUpdateSource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public override void Collect(TransactionTraceWireModelComponents transactionTrac

protected void InternalHarvest(string transactionId = null)
{
Log.Finest("Transaction Trace harvest starting.");

var traceSamples = _transactionCollectors
.Where(t => t != null)
.SelectMany(t => t.GetCollectedSamples())
Expand All @@ -59,13 +61,17 @@ protected void InternalHarvest(string transactionId = null)
.Select(t => t.CreateWireModel())
.ToList();

if (!traceWireModels.Any())
return;
// if we don't have any traces to publish then don't
var traceCount = traceWireModels.Count;
if (traceCount > 0)
{
LogUnencodedTraceData(traceWireModels);

LogUnencodedTraceData(traceWireModels);
var responseStatus = DataTransportService.Send(traceWireModels, transactionId);
HandleResponse(responseStatus, traceSamples);
}

var responseStatus = DataTransportService.Send(traceWireModels, transactionId);
HandleResponse(responseStatus, traceSamples);
Log.Finest($"Transaction Trace harvest finished. {traceCount} trace(s) sent.");
}

private void HandleResponse(DataTransportResponseStatus responseStatus, ICollection<TransactionTraceWireModelComponents> traceSamples)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected void TestConnection()
wc.DownloadString(testAddress);
}
#else
_lazyHttpClient.Value.GetAsync(testAddress).GetAwaiter().GetResult();
_lazyHttpClient.Value.GetAsync(testAddress).ConfigureAwait(false).GetAwaiter().GetResult();
#endif
Log.Info("Connection test to \"{0}\" succeeded", testAddress);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
// Copyright 2020 New Relic, Inc. All rights reserved.
// Copyright 2020 New Relic, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

#if !NETFRAMEWORK
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using NewRelic.Agent.Configuration;
using NewRelic.Agent.Core.Config;
using NewRelic.Agent.Core.DataTransport.Client.Interfaces;
using NewRelic.Agent.Extensions.Logging;

namespace NewRelic.Agent.Core.DataTransport.Client
{
Expand All @@ -31,19 +30,20 @@ public void Dispose()

public async Task<IHttpResponseMessageWrapper> SendAsync(HttpRequestMessage message)
{
var cts = new CancellationTokenSource(_timeoutMilliseconds);
return new HttpResponseMessageWrapper(await _httpClient.SendAsync(message, cts.Token));
}

public TimeSpan Timeout
{
get
using var cts = new CancellationTokenSource(_timeoutMilliseconds);
try
{
return _httpClient.Timeout;
// .ConfigureAwait(false) is used to avoid deadlocks.
var httpResponseMessage = await _httpClient.SendAsync(message, cts.Token).ConfigureAwait(false);
return new HttpResponseMessageWrapper(httpResponseMessage);
}
set
catch (Exception e)
{
_httpClient.Timeout = value;
Log.Debug(cts.IsCancellationRequested
? $"HttpClient.SendAsync() timed out after {_timeoutMilliseconds}ms."
: $"HttpClient.SendAsync() threw an unexpected exception: {e}");

throw;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public HttpContentWrapper(HttpContent httpContent)

public Stream ReadAsStream()
{
return _httpContent.ReadAsStreamAsync().GetAwaiter().GetResult();
return _httpContent.ReadAsStreamAsync().ConfigureAwait(false).GetAwaiter().GetResult();
}

public IHttpContentHeadersWrapper Headers => new HttpContentHeadersWrapper(_httpContent.Headers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public string GetContent()
using (responseStream)
using (var reader = new StreamReader(responseStream, Encoding.UTF8))
{
var responseBody = reader.ReadLineAsync().GetAwaiter().GetResult();
var responseBody = reader.ReadLineAsync().ConfigureAwait(false).GetAwaiter().GetResult();

if (responseBody != null)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020 New Relic, Inc. All rights reserved.
// Copyright 2020 New Relic, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

using System.Net;
Expand All @@ -8,6 +8,6 @@ namespace NewRelic.Agent.Core.DataTransport.Client.Interfaces
{
public interface IHttpClientFactory
{
public IHttpClient CreateClient(IWebProxy proxy, IConfiguration configuration);
public IHttpClient GetOrCreateClient(IWebProxy proxy, IConfiguration configuration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ namespace NewRelic.Agent.Core.DataTransport.Client.Interfaces
public interface IHttpClientWrapper : IDisposable
{
Task<IHttpResponseMessageWrapper> SendAsync(HttpRequestMessage message);

TimeSpan Timeout { get; set; }
}
}
#endif
Loading

0 comments on commit eb3afda

Please sign in to comment.