From c751f7cb714a430186389027e78ca686f91af099 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Tue, 8 Oct 2024 14:28:49 -0600 Subject: [PATCH] Remove the second wrapper --- .../netty/RetryingHttpRequesterFilter.java | 41 +++++-------------- 1 file changed, 10 insertions(+), 31 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java index 71b640490b..2021d0da8e 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java @@ -21,8 +21,6 @@ import io.servicetalk.client.api.LoadBalancerReadyEvent; import io.servicetalk.client.api.NoAvailableHostException; import io.servicetalk.client.api.ServiceDiscoverer; -import io.servicetalk.concurrent.Cancellable; -import io.servicetalk.concurrent.CompletableSource; import io.servicetalk.concurrent.api.AsyncContext; import io.servicetalk.concurrent.api.BiIntFunction; import io.servicetalk.concurrent.api.Completable; @@ -31,6 +29,7 @@ import io.servicetalk.concurrent.api.RetryStrategies; import io.servicetalk.concurrent.api.Single; import io.servicetalk.context.api.ContextMap; +import io.servicetalk.http.api.DefaultHttpHeadersFactory; import io.servicetalk.http.api.FilterableReservedStreamingHttpConnection; import io.servicetalk.http.api.FilterableStreamingHttpClient; import io.servicetalk.http.api.HttpExecutionStrategies; @@ -45,6 +44,7 @@ import io.servicetalk.http.api.StreamingHttpRequest; import io.servicetalk.http.api.StreamingHttpRequester; import io.servicetalk.http.api.StreamingHttpResponse; +import io.servicetalk.http.api.StreamingHttpResponses; import io.servicetalk.transport.api.ExecutionContext; import io.servicetalk.transport.api.ExecutionStrategyInfluencer; import io.servicetalk.transport.api.RetryableException; @@ -57,6 +57,7 @@ import java.util.function.UnaryOperator; import javax.annotation.Nullable; +import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR; import static io.servicetalk.concurrent.api.Completable.completed; import static io.servicetalk.concurrent.api.Completable.failed; import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffDeltaJitter; @@ -199,9 +200,7 @@ public Completable apply(final int count, final Throwable t) { } // Unwrap a WrappedResponseException before asking the policy for a policy. - final BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, - returnFailedResponses && t instanceof WrappedResponseException ? - ((WrappedResponseException) t).exception : t); + final BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, t); if (backOffPolicy != NO_RETRIES) { final int offsetCount = count - lbNotReadyCount; Completable retryWhen = backOffPolicy.newStrategy(executor).apply(offsetCount, t); @@ -272,8 +271,7 @@ protected Single request(final StreamingHttpRequester del } else { // Drain response payload body before packaging it respSingle = resp.payloadBody().ignoreElements().onErrorComplete() - .concat(Single.failed( - returnFailedResponses ? new WrappedResponseException(resp, exception) : exception)); + .concat(Single.failed(exception)); } return respSingle.shareContextOnSubscribe(); }); @@ -284,11 +282,11 @@ protected Single request(final StreamingHttpRequester del // duplicatedRequest isn't used below because retryWhen must be applied outside the defer operator for (2). single = single.retryWhen(retryStrategy(request, executionContext(), true)); if (returnFailedResponses) { - single = single.onErrorResume(WrappedResponseException.class, t -> Single.succeeded( - // The previous message was already drained but we can't just 'set' it because it then - // does a weird flow control thing. Therefore, we cheat by transforming in a way that - // simply discards the original. - t.response.transformMessageBody(ignored -> Publisher.empty()))); + single = single.onErrorResume(HttpResponseException.class, t -> { + HttpResponseMetaData metaData = t.metaData(); + return Single.succeeded(StreamingHttpResponses.newResponse(metaData.status(), metaData.version(), + metaData.headers(), DEFAULT_ALLOCATOR, DefaultHttpHeadersFactory.INSTANCE)); + }); } return single; } @@ -1083,23 +1081,4 @@ public RetryingHttpRequesterFilter build() { returnFailedResponses, maxTotalRetries, responseMapper, allPredicate, onRequestRetry); } } - - private static final class WrappedResponseException extends Exception { - - private static final long serialVersionUID = 3905983622734400759L; - - final StreamingHttpResponse response; - final HttpResponseException exception; - - WrappedResponseException(final StreamingHttpResponse response, final HttpResponseException exception) { - this.response = response; - this.exception = exception; - } - - @Override - public synchronized Throwable fillInStackTrace() { - // just a carrier, the stack traces are not important. - return this; - } - } }