Skip to content

Commit

Permalink
Enhance StreamResponse handling and update dependencies (#121)
Browse files Browse the repository at this point in the history
Updated `ResponseBuilderDefaults` to include `StreamResponse` in
`SpecialTypes`. Refactored `SetBodyCoreAsync` in
`DefaultResponseBuilder.cs` for readability and removed unnecessary
`using` statements. Modified `RequestCoreAsync` in
`HttpWebRequestInvoker.cs` and `BuildResponseAsync` in
`InMemoryRequestInvoker.cs` to handle `StreamResponse` types with proper
disposal. Updated `Elastic.Transport.csproj` to reference
`System.Text.Json` version `8.0.5`.
  • Loading branch information
stevejgordon authored Oct 30, 2024
1 parent 6f6f064 commit f4c42c8
Show file tree
Hide file tree
Showing 10 changed files with 635 additions and 229 deletions.
142 changes: 79 additions & 63 deletions src/Elastic.Transport/Components/Pipeline/DefaultResponseBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal static class ResponseBuilderDefaults

public static readonly Type[] SpecialTypes =
{
typeof(StringResponse), typeof(BytesResponse), typeof(VoidResponse), typeof(DynamicResponse)
typeof(StringResponse), typeof(BytesResponse), typeof(VoidResponse), typeof(DynamicResponse), typeof(StreamResponse)
};
}

Expand Down Expand Up @@ -66,11 +66,8 @@ IReadOnlyDictionary<TcpState, int> tcpStats
// Only attempt to set the body if the response may have content
if (MayHaveBody(statusCode, requestData.Method, contentLength))
response = SetBody<TResponse>(details, requestData, responseStream, mimeType);
else
responseStream.Dispose();

response ??= new TResponse();

response.ApiCallDetails = details;
return response;
}
Expand Down Expand Up @@ -101,11 +98,8 @@ public override async Task<TResponse> ToResponseAsync<TResponse>(
if (MayHaveBody(statusCode, requestData.Method, contentLength))
response = await SetBodyAsync<TResponse>(details, requestData, responseStream, mimeType,
cancellationToken).ConfigureAwait(false);
else
responseStream.Dispose();

response ??= new TResponse();

response.ApiCallDetails = details;
return response;
}
Expand Down Expand Up @@ -211,6 +205,8 @@ private async ValueTask<TResponse> SetBodyCoreAsync<TResponse>(bool isAsync,
var disableDirectStreaming = requestData.PostData?.DisableDirectStreaming ?? requestData.ConnectionSettings.DisableDirectStreaming;
var requiresErrorDeserialization = RequiresErrorDeserialization(details, requestData);

var ownsStream = false;

if (disableDirectStreaming || NeedsToEagerReadStream<TResponse>() || requiresErrorDeserialization)
{
var inMemoryStream = requestData.MemoryStreamFactory.Create();
Expand All @@ -221,90 +217,111 @@ private async ValueTask<TResponse> SetBodyCoreAsync<TResponse>(bool isAsync,
responseStream.CopyTo(inMemoryStream, BufferSize);

bytes = SwapStreams(ref responseStream, ref inMemoryStream);
ownsStream = true;
details.ResponseBodyInBytes = bytes;
}

using (responseStream)
if (TrySetSpecialType<TResponse>(mimeType, bytes, responseStream, requestData.MemoryStreamFactory, out var response))
{
if (SetSpecialTypes<TResponse>(mimeType, bytes, responseStream, requestData.MemoryStreamFactory, out var r)) return r;
ConditionalDisposal(responseStream, ownsStream, response);
return response;
}

if (details.HttpStatusCode.HasValue &&
requestData.SkipDeserializationForStatusCodes.Contains(details.HttpStatusCode.Value))
return null;
if (details.HttpStatusCode.HasValue &&
requestData.SkipDeserializationForStatusCodes.Contains(details.HttpStatusCode.Value))
{
ConditionalDisposal(responseStream, ownsStream, response);
return null;
}

var serializer = requestData.ConnectionSettings.RequestResponseSerializer;
var serializer = requestData.ConnectionSettings.RequestResponseSerializer;

TResponse response;
if (requestData.CustomResponseBuilder != null)
{
var beforeTicks = Stopwatch.GetTimestamp();
if (requestData.CustomResponseBuilder != null)
{
var beforeTicks = Stopwatch.GetTimestamp();

if (isAsync)
response = await requestData.CustomResponseBuilder
.DeserializeResponseAsync(serializer, details, responseStream, cancellationToken)
.ConfigureAwait(false) as TResponse;
else
response = requestData.CustomResponseBuilder
.DeserializeResponse(serializer, details, responseStream) as TResponse;
if (isAsync)
response = await requestData.CustomResponseBuilder
.DeserializeResponseAsync(serializer, details, responseStream, cancellationToken)
.ConfigureAwait(false) as TResponse;
else
response = requestData.CustomResponseBuilder
.DeserializeResponse(serializer, details, responseStream) as TResponse;

var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);
var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);

ConditionalDisposal(responseStream, ownsStream, response);
return response;
}

// TODO: Handle empty data in a nicer way as throwing exceptions has a cost we'd like to avoid!
// ie. check content-length (add to ApiCallDetails)? Content-length cannot be retrieved from a GZip content stream which is annoying.
try
{
if (requiresErrorDeserialization && TryGetError(details, requestData, responseStream, out var error) && error.HasError())
{
response = new TResponse();
SetErrorOnResponse(response, error);
ConditionalDisposal(responseStream, ownsStream, response);
return response;
}

// TODO: Handle empty data in a nicer way as throwing exceptions has a cost we'd like to avoid!
// ie. check content-length (add to ApiCallDetails)? Content-length cannot be retrieved from a GZip content stream which is annoying.
try
if (!requestData.ValidateResponseContentType(mimeType))
{
if (requiresErrorDeserialization && TryGetError(details, requestData, responseStream, out var error) && error.HasError())
{
response = new TResponse();
SetErrorOnResponse(response, error);
return response;
}
ConditionalDisposal(responseStream, ownsStream, response);
return default;
}

if (!requestData.ValidateResponseContentType(mimeType))
return default;
var beforeTicks = Stopwatch.GetTimestamp();

var beforeTicks = Stopwatch.GetTimestamp();
if (isAsync)
response = await serializer.DeserializeAsync<TResponse>(responseStream, cancellationToken).ConfigureAwait(false);
else
response = serializer.Deserialize<TResponse>(responseStream);

if (isAsync)
response = await serializer.DeserializeAsync<TResponse>(responseStream, cancellationToken).ConfigureAwait(false);
else
response = serializer.Deserialize<TResponse>(responseStream);
var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);

var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);

if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);
ConditionalDisposal(responseStream, ownsStream, response);
return response;
}
catch (JsonException ex) when (ex.Message.Contains("The input does not contain any JSON tokens"))
{
// Note the exception this handles is ONLY thrown after a check if the stream length is zero.
// When the length is zero, `default` is returned by Deserialize(Async) instead.
ConditionalDisposal(responseStream, ownsStream, response);
return default;
}

return response;
}
catch (JsonException ex) when (ex.Message.Contains("The input does not contain any JSON tokens"))
{
return default;
}
static void ConditionalDisposal(Stream responseStream, bool ownsStream, TResponse response)
{
// We only dispose of the responseStream if we created it (i.e. it is a MemoryStream) we
// created via MemoryStreamFactory.
if (ownsStream && (response is null || !response.LeaveOpen))
responseStream.Dispose();
}
}

private static bool SetSpecialTypes<TResponse>(string mimeType, byte[] bytes, Stream responseStream,
MemoryStreamFactory memoryStreamFactory, out TResponse cs)
private static bool TrySetSpecialType<TResponse>(string mimeType, byte[] bytes, Stream responseStream,
MemoryStreamFactory memoryStreamFactory, out TResponse response)
where TResponse : TransportResponse, new()
{
cs = null;
response = null;
var responseType = typeof(TResponse);
if (!SpecialTypes.Contains(responseType)) return false;

if (responseType == typeof(StringResponse))
cs = new StringResponse(bytes.Utf8String()) as TResponse;
response = new StringResponse(bytes.Utf8String()) as TResponse;
else if (responseType == typeof(StreamResponse))
cs = new StreamResponse(responseStream, mimeType) as TResponse;
response = new StreamResponse(responseStream, mimeType) as TResponse;
else if (responseType == typeof(BytesResponse))
cs = new BytesResponse(bytes) as TResponse;
response = new BytesResponse(bytes) as TResponse;
else if (responseType == typeof(VoidResponse))
cs = VoidResponse.Default as TResponse;
response = VoidResponse.Default as TResponse;
else if (responseType == typeof(DynamicResponse))
{
//if not json store the result under "body"
Expand All @@ -314,17 +331,17 @@ private static bool SetSpecialTypes<TResponse>(string mimeType, byte[] bytes, St
{
["body"] = new DynamicValue(bytes.Utf8String())
};
cs = new DynamicResponse(dictionary) as TResponse;
response = new DynamicResponse(dictionary) as TResponse;
}
else
{
using var ms = memoryStreamFactory.Create(bytes);
var body = LowLevelRequestResponseSerializer.Instance.Deserialize<DynamicDictionary>(ms);
cs = new DynamicResponse(body) as TResponse;
response = new DynamicResponse(body) as TResponse;
}
}

return cs != null;
return response != null;
}

private static bool NeedsToEagerReadStream<TResponse>()
Expand All @@ -336,7 +353,6 @@ private static bool NeedsToEagerReadStream<TResponse>()
private static byte[] SwapStreams(ref Stream responseStream, ref MemoryStream ms)
{
var bytes = ms.ToArray();
responseStream.Dispose();
responseStream = ms;
responseStream.Position = 0;
return bytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
Exception ex = null;
string mimeType = null;
long contentLength = -1;
IDisposable receive = DiagnosticSources.SingletonDisposable;
IDisposable receivedResponse = DiagnosticSources.SingletonDisposable;
ReadOnlyDictionary<TcpState, int> tcpStats = null;
ReadOnlyDictionary<string, ThreadPoolStatistics> threadPoolStats = null;
Dictionary<string, IEnumerable<string>> responseHeaders = null;
Expand Down Expand Up @@ -118,7 +118,7 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
responseMessage = client.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken).GetAwaiter().GetResult();
#endif

receive = responseMessage;
receivedResponse = responseMessage;
statusCode = (int)responseMessage.StatusCode;
}

Expand Down Expand Up @@ -154,13 +154,10 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
ex = e;
}

var isStreamResponse = typeof(TResponse) == typeof(StreamResponse);
TResponse response;

using (isStreamResponse ? DiagnosticSources.SingletonDisposable : receive)
using (isStreamResponse ? Stream.Null : responseStream ??= Stream.Null)
try
{
TResponse response;

if (isAsync)
response = await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponseAsync<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats, cancellationToken)
Expand All @@ -169,9 +166,18 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);

// Defer disposal of the response message
if (response is StreamResponse sr)
sr.Finalizer = () => receive.Dispose();
// Unless indicated otherwise by the TransportResponse, we've now handled the response stream, so we can dispose of the HttpResponseMessage
// to release the connection. In cases, where the derived response works directly on the stream, it can be left open and additional IDisposable
// resources can be linked such that their disposal is deferred.
if (response.LeaveOpen)
{
response.LinkedDisposables = [receivedResponse, responseStream];
}
else
{
responseStream.Dispose();
receivedResponse.Dispose();
}

if (!OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners || (!(Activity.Current?.IsAllDataRequested ?? false)))
return response;
Expand All @@ -185,6 +191,13 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req

return response;
}
catch
{
// if there's an exception, ensure we always release the stream and response so that the connection is freed.
responseStream.Dispose();
receivedResponse.Dispose();
throw;
}
}

private static Dictionary<string, IEnumerable<string>>? ParseHeaders(RequestData requestData, HttpResponseMessage responseMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
Exception ex = null;
string mimeType = null;
long contentLength = -1;
IDisposable receivedResponse = DiagnosticSources.SingletonDisposable;
ReadOnlyDictionary<TcpState, int> tcpStats = null;
ReadOnlyDictionary<string, ThreadPoolStatistics> threadPoolStats = null;
Dictionary<string, IEnumerable<string>> responseHeaders = null;
Expand Down Expand Up @@ -146,6 +147,8 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
httpWebResponse = (HttpWebResponse)request.GetResponse();
}

receivedResponse = httpWebResponse;

HandleResponse(httpWebResponse, out statusCode, out responseStream, out mimeType);
responseHeaders = ParseHeaders(requestData, httpWebResponse, responseHeaders);
contentLength = httpWebResponse.ContentLength;
Expand All @@ -161,28 +164,50 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
{
unregisterWaitHandle?.Invoke();
}
responseStream ??= Stream.Null;

TResponse response;

if (isAsync)
response = await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponseAsync<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats, cancellationToken)
.ConfigureAwait(false);
else
response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);

if (OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners && (Activity.Current?.IsAllDataRequested ?? false))
try
{
var attributes = requestData.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails);
foreach (var attribute in attributes)
TResponse response;

if (isAsync)
response = await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponseAsync<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats, cancellationToken)
.ConfigureAwait(false);
else
response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);

// Unless indicated otherwise by the TransportResponse, we've now handled the response stream, so we can dispose of the HttpResponseMessage
// to release the connection. In cases, where the derived response works directly on the stream, it can be left open and additional IDisposable
// resources can be linked such that their disposal is deferred.
if (response.LeaveOpen)
{
Activity.Current?.SetTag(attribute.Key, attribute.Value);
response.LinkedDisposables = [receivedResponse, responseStream];
}
else
{
responseStream.Dispose();
receivedResponse.Dispose();
}
}

return response;
if (OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners && (Activity.Current?.IsAllDataRequested ?? false))
{
var attributes = requestData.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails);
foreach (var attribute in attributes)
{
Activity.Current?.SetTag(attribute.Key, attribute.Value);
}
}

return response;
}
catch
{
// if there's an exception, ensure we always release the stream and response so that the connection is freed.
responseStream.Dispose();
receivedResponse.Dispose();
throw;
}
}

private static Dictionary<string, IEnumerable<string>> ParseHeaders(RequestData requestData, HttpWebResponse responseMessage, Dictionary<string, IEnumerable<string>> responseHeaders)
Expand Down
Loading

0 comments on commit f4c42c8

Please sign in to comment.