Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix response leak that can be caused by an exception during redirect #3095

Merged
merged 1 commit into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;
import static io.servicetalk.http.api.HttpHeaderNames.HOST;
import static io.servicetalk.http.api.HttpResponseStatus.StatusClass.REDIRECTION_3XX;
import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;

/**
* An operator, which implements redirect logic for {@link StreamingHttpClient}.
Expand Down Expand Up @@ -184,14 +185,19 @@ public void onSuccess(@Nullable final StreamingHttpResponse response) {
LOGGER.trace("Executing redirect to '{}' for request '{}'", location, request);
}

// Consume any payload of the redirect response
final Single<StreamingHttpResponse> nextResponse = response.messageBody().ignoreElements()
.concat(redirectSingle.requester.request(newRequest));
final RedirectSubscriber redirectSubscriber = new RedirectSubscriber(target, redirectSingle, newRequest,
redirectCount + 1, sequentialCancellable);
terminalDelivered = true; // Mark as "delivered" because we do not own `target` from this point
toSource(response.messageBody().ignoreElements() // Consume any payload of the redirect response
.concat(redirectSingle.requester.request(newRequest)))
.subscribe(new RedirectSubscriber(target, redirectSingle, newRequest, redirectCount + 1,
sequentialCancellable));
toSource(nextResponse).subscribe(redirectSubscriber);
} catch (Throwable cause) {
if (!terminalDelivered) {
safeOnError(target, cause);
// Drain response payload body before propagating the cause
Copy link
Member

@Scottmitch Scottmitch Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I recall asking this previously (sorry if dup-question), but what if the payload body is very large or doesn't complete (malicious client, networking broken, etc.)? Will our timeouts kick-in at this level?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It depends on how users configure timeouts for their use cases. If they have it at the response payload body level, then yes.
We can add more protection as a separate work item and unify all places, bcz this is not the only place that drains payload.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

followup PR sgtm

sequentialCancellable.nextCancellable(response.messageBody().ignoreElements()
.whenOnError(suppressed -> safeOnError(target, addSuppressed(cause, suppressed)))
.subscribe(() -> safeOnError(target, cause)));
} else {
LOGGER.info("Ignoring exception from onSuccess of Subscriber {}.", target, cause);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the terminalDelivered == true path we still don't drain and I see that we set that flag to true before we call many of the callbacks, eg

terminalDelivered = true;
target.onSuccess(response);

If it is target.onSuccess(..) that threw we will potentially still leak unless the onSuccess call did the draining. That does seem like it should be the responsibility of the onSucess method, but how defensive do we want to be?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic here is that we set the terminalDelivered = true and in the next line we subscribe to nextResponse single, which will first subscribe to response.messageBody().ignoreElements(), register its cancellable in sequentialCancellable, and only then will deal with target. This guarantees that we do not leak the response, it will either terminate or get canceled.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we may be talking about different cases as I'm talking about those higher up which you didn't modify. For example:

                final String location = redirectLocation(redirectCount, request, response);
                if (location == null) {
                    terminalDelivered = true;
                    target.onSuccess(response); // <- if I throw, is the response body drained or not?
                    return;
                }

There are a few others that follow the same pattern. Trying to determine if the body has been drained is perhaps too defensive but we also don't normally expect onSuccess(..) to throw so maybe it's better to aggressively drain?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change otherwise LGTM, but I'd also like to understand how the flow works in the case that @bryce-anderson outlined.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, if users have an unexpected exception inside an operator applied later (for example, Single.map(...)), there is a risk they can leak the response as well.
First, users code is outside of our responsibility. They should not throw from operators but propagate cancel/onError via reactive flow.
Second, as we discussed offline we can provide a general filter for all clients to handle this type of problem with the best effort. I will open a follow-up for that.

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
package io.servicetalk.http.utils;

import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.ExecutorExtension;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.TestPublisher;
import io.servicetalk.concurrent.api.TestSubscription;
import io.servicetalk.http.api.DefaultHttpHeadersFactory;
import io.servicetalk.http.api.DefaultStreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.HttpExecutionContext;
Expand Down Expand Up @@ -52,6 +54,7 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;

import static io.servicetalk.buffer.api.Matchers.contentEqualTo;
Expand Down Expand Up @@ -217,7 +220,7 @@ void connectRequestsDoesNotRedirectByDefault() throws Exception {
StreamingHttpRequest request = client.newRequest(CONNECT, "servicetalk.io")
.setHeader(HOST, "servicetalk.io");
verifyDoesNotRedirect(client, request, SEE_OTHER);
verifyRedirectResponsePayloadsDrained(false);
verifyRedirectResponsePayloadsDrained(false, false);
}

@Test
Expand Down Expand Up @@ -383,7 +386,7 @@ private void crossSchemeRedirects(String fromScheme, String toScheme,
verifyRedirected(client, request, false, true);
} else {
verifyDoesNotRedirect(client, request, MOVED_PERMANENTLY);
verifyRedirectResponsePayloadsDrained(false);
verifyRedirectResponsePayloadsDrained(false, false);
}
}

Expand All @@ -401,7 +404,7 @@ void redirectFromRelativeFormToAbsoluteFormNonRelativeLocation(boolean allowNonR
verifyRedirected(client, request, false, true);
} else {
verifyDoesNotRedirect(client, request, MOVED_PERMANENTLY);
verifyRedirectResponsePayloadsDrained(false);
verifyRedirectResponsePayloadsDrained(false, false);
}
}

Expand All @@ -419,7 +422,7 @@ void redirectFromAbsoluteFormToAbsoluteFormNonRelativeLocation(boolean allowNonR
verifyRedirected(client, request, false, true);
} else {
verifyDoesNotRedirect(client, request, MOVED_PERMANENTLY);
verifyRedirectResponsePayloadsDrained(false);
verifyRedirectResponsePayloadsDrained(false, false);
}
}

Expand Down Expand Up @@ -468,7 +471,7 @@ void customLocationMapper() throws Exception {
StreamingHttpRequest redirectedRequest = verifyResponse(client, request, OK, -1, 2, GET);
assertThat("Request didn't change", request, not(sameInstance(redirectedRequest)));
verifyHeadersAndMessageBodyRedirected(redirectedRequest);
verifyRedirectResponsePayloadsDrained(true);
verifyRedirectResponsePayloadsDrained(true, false);
assertThat("LocationMapper was not invoked", locationMapperInvoked.get(), is(true));
}

Expand Down Expand Up @@ -497,7 +500,7 @@ void changePostToGet(int statusCode) throws Exception {
StreamingHttpRequest redirectedRequest = verifyResponse(client, request, OK, -1, 2, GET);
assertThat("Request didn't change", request, not(sameInstance(redirectedRequest)));
verifyHeadersAndMessageBodyRedirected(redirectedRequest);
verifyRedirectResponsePayloadsDrained(true);
verifyRedirectResponsePayloadsDrained(true, false);
}

@ParameterizedTest(name = "{displayName} [{index}] manyHeaders={0}")
Expand Down Expand Up @@ -544,7 +547,7 @@ void configureRedirectOfPayloadBodyForNonRelativeRedirects() throws Exception {
assertThat("Unexpected payload body", redirectedRequest.payloadBody().collect(StringBuilder::new,
(sb, chunk) -> sb.append(chunk.toString(US_ASCII)))
.toFuture().get().toString(), contentEqualTo(REQUEST_PAYLOAD));
verifyRedirectResponsePayloadsDrained(true);
verifyRedirectResponsePayloadsDrained(true, false);
}

@Test
Expand Down Expand Up @@ -580,43 +583,61 @@ void manuallyRedirectHeadersAndMessageBodyForNonRelativeRedirects() throws Excep
verifyRedirected(client, newRequest(client, GET), true, true);
}

@Test
void redirectRequestTransformerThrows() {
@ParameterizedTest(name = "{displayName} [{index}] cancel={0}")
@ValueSource(booleans = {false, true})
void redirectRequestTransformerThrows(boolean cancel) {
AtomicReference<Cancellable> cancellable = new AtomicReference<>();
when(httpClient.request(any())).thenReturn(redirectResponse(MOVED_PERMANENTLY), okResponse());
StreamingHttpClient client = newClient(new RedirectConfigBuilder()
.redirectRequestTransformer((relative, original, response, redirect) -> {
if (cancel) {
cancellable.get().cancel();
}
throw DELIBERATE_EXCEPTION;
}).build());

ExecutionException e = assertThrows(ExecutionException.class,
() -> client.request(newRequest(client, GET)).toFuture().get());
() -> client.request(newRequest(client, GET)).whenOnSubscribe(cancellable::set).toFuture().get());
assertThat(e.getCause(), sameInstance(DELIBERATE_EXCEPTION));
verifyRedirectResponsePayloadsDrained(true, cancel);
}

@Test
void redirectPredicateThrows() {
@ParameterizedTest(name = "{displayName} [{index}] cancel={0}")
@ValueSource(booleans = {false, true})
void redirectPredicateThrows(boolean cancel) {
AtomicReference<Cancellable> cancellable = new AtomicReference<>();
when(httpClient.request(any())).thenReturn(redirectResponse(MOVED_PERMANENTLY), okResponse());
StreamingHttpClient client = newClient(new RedirectConfigBuilder()
.redirectPredicate((relative, count, request, response) -> {
if (cancel) {
cancellable.get().cancel();
}
throw DELIBERATE_EXCEPTION;
}).build());

ExecutionException e = assertThrows(ExecutionException.class,
() -> client.request(newRequest(client, GET)).toFuture().get());
() -> client.request(newRequest(client, GET)).whenOnSubscribe(cancellable::set).toFuture().get());
assertThat(e.getCause(), sameInstance(DELIBERATE_EXCEPTION));
verifyRedirectResponsePayloadsDrained(true, cancel);
}

@Test
void locationMapperThrows() {
@ParameterizedTest(name = "{displayName} [{index}] cancel={0}")
@ValueSource(booleans = {false, true})
void locationMapperThrows(boolean cancel) {
AtomicReference<Cancellable> cancellable = new AtomicReference<>();
when(httpClient.request(any())).thenReturn(redirectResponse(MOVED_PERMANENTLY), okResponse());
StreamingHttpClient client = newClient(new RedirectConfigBuilder()
.locationMapper((request, response) -> {
if (cancel) {
cancellable.get().cancel();
}
throw DELIBERATE_EXCEPTION;
}).build());

ExecutionException e = assertThrows(ExecutionException.class,
() -> client.request(newRequest(client, GET)).toFuture().get());
() -> client.request(newRequest(client, GET)).whenOnSubscribe(cancellable::set).toFuture().get());
assertThat(e.getCause(), sameInstance(DELIBERATE_EXCEPTION));
verifyRedirectResponsePayloadsDrained(true, cancel);
}

@Test
Expand Down Expand Up @@ -688,16 +709,24 @@ private StreamingHttpRequest verifyRedirected(StreamingHttpClient client,
assertThat("Unexpected request-target of redirected request",
redirectedRequest.requestTarget(), startsWith("/"));
}
verifyRedirectResponsePayloadsDrained(true);
verifyRedirectResponsePayloadsDrained(true, false);
return redirectedRequest;
}

private void verifyRedirectResponsePayloadsDrained(boolean drained) {
private void verifyRedirectResponsePayloadsDrained(boolean drained, boolean cancelled) {
int n = 0;
for (TestPublisher<Buffer> payload : redirectResponsePayloads) {
assertThat("Redirect response payload (/location-" + ++n +
(drained ? ") was not drained" : ") was unexpectedly drained"),
assertThat("Redirect (/location-" + ++n + ") response payload was " +
(drained ? "not" : "unexpectedly") + " drained",
payload.isSubscribed(), is(drained));

if (drained) {
TestSubscription subscription = new TestSubscription();
payload.onSubscribe(subscription);
assertThat("Redirect (/location-" + ++n + ") response payload subscription was " +
(cancelled ? "not" : "unexpectedly") + " cancelled",
subscription.isCancelled(), is(cancelled));
}
}
}

Expand Down
Loading