diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java index a9dbc1daec..249501501c 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java @@ -1,5 +1,5 @@ /* - * Copyright © 2018-2023 Apple Inc. and the ServiceTalk project authors + * Copyright © 2018-2024 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -386,13 +386,14 @@ public void onComplete() { // the final response, which may trigger continuation for the next request in pipeline. responseSent.set(true); } - Cancellable c = null; + Cancellable resetFlushStrategy = null; final FlushStrategy flushStrategy = determineFlushStrategyForApi(response); if (flushStrategy != null) { - c = updateFlushStrategy((prev, isOriginal) -> isOriginal ? flushStrategy : prev); + resetFlushStrategy = updateFlushStrategy( + (prev, isOriginal) -> isOriginal ? flushStrategy : prev); } Publisher pub = handleResponse(protocol(), requestMethod, response); - return (c == null ? pub : pub.beforeFinally(c::cancel)) + return (resetFlushStrategy == null ? pub : pub.beforeFinally(resetFlushStrategy::cancel)) // No need to make a copy of the context while consuming response message body. .shareContextOnSubscribe(); })); diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NonPipelinedStreamingHttpConnection.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NonPipelinedStreamingHttpConnection.java index 75e8413076..a6ddc76b8d 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NonPipelinedStreamingHttpConnection.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NonPipelinedStreamingHttpConnection.java @@ -43,8 +43,8 @@ protected Publisher writeAndRead(final Publisher requestStream, return Publisher.defer(() -> { final Cancellable resetFlushStrategy = connection.updateFlushStrategy( (prev, isOriginal) -> isOriginal ? flushStrategy : prev); - return connection.write(requestStream).mergeDelayError(connection.read()) - .afterFinally(resetFlushStrategy::cancel); + return connection.write(requestStream.beforeFinally(resetFlushStrategy::cancel)) + .mergeDelayError(connection.read()); }); } } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/PipelinedStreamingHttpConnection.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/PipelinedStreamingHttpConnection.java index 9602e31f2c..4548669314 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/PipelinedStreamingHttpConnection.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/PipelinedStreamingHttpConnection.java @@ -46,8 +46,8 @@ protected Publisher writeAndRead(Publisher requestStream, return Publisher.defer(() -> { final Cancellable resetFlushStrategy = connection.updateFlushStrategy( (prev, isOriginal) -> isOriginal ? flushStrategy : prev); - return connection.write(requestStream, connection::defaultFlushStrategy, - WriteDemandEstimators::newDefaultEstimator).afterFinally(resetFlushStrategy::cancel); + return connection.write(requestStream.beforeFinally(resetFlushStrategy::cancel), + connection::defaultFlushStrategy, WriteDemandEstimators::newDefaultEstimator); }); } } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/FlushStrategyOnClientTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/FlushStrategyOnClientTest.java new file mode 100644 index 0000000000..5ae22ebdc8 --- /dev/null +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/FlushStrategyOnClientTest.java @@ -0,0 +1,103 @@ +/* + * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.http.netty; + +import io.servicetalk.buffer.api.Buffer; +import io.servicetalk.buffer.api.BufferAllocator; +import io.servicetalk.client.api.TransportObserverConnectionFactoryFilter; +import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.http.api.HttpRequest; +import io.servicetalk.http.api.HttpServerContext; +import io.servicetalk.http.api.ReservedStreamingHttpConnection; +import io.servicetalk.http.api.StreamingHttpClient; +import io.servicetalk.http.api.StreamingHttpConnection; +import io.servicetalk.http.api.StreamingHttpRequest; +import io.servicetalk.http.api.StreamingHttpResponse; +import io.servicetalk.http.netty.FlushStrategyOnServerTest.OutboundWriteEventsInterceptor; +import io.servicetalk.transport.netty.internal.ExecutionContextExtension; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; + +import static io.servicetalk.http.api.HttpResponseStatus.OK; +import static io.servicetalk.http.netty.FlushStrategyOnServerTest.assertFlushOnEach; +import static io.servicetalk.http.netty.FlushStrategyOnServerTest.assertFlushOnEnd; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +class FlushStrategyOnClientTest { + + @RegisterExtension + static final ExecutionContextExtension SERVER_CTX = + ExecutionContextExtension.cached("server-io", "server-executor") + .setClassLevel(true); + @RegisterExtension + static final ExecutionContextExtension CLIENT_CTX = + ExecutionContextExtension.cached("client-io", "client-executor") + .setClassLevel(true); + + @Test + void sequentialPipelinedRequestsHaveTheirOwnStrategy() throws Exception { + OutboundWriteEventsInterceptor interceptor = new OutboundWriteEventsInterceptor(); + BlockingQueue receivedRequests = new LinkedBlockingQueue<>(); + CountDownLatch canReturnResponse = new CountDownLatch(1); + try (HttpServerContext serverContext = BuilderUtils.newServerBuilder(SERVER_CTX) + .listenBlockingAndAwait(((ctx, request, responseFactory) -> { + receivedRequests.add(request); + canReturnResponse.await(); + return responseFactory.ok(); + })); + StreamingHttpClient client = BuilderUtils.newClientBuilder(serverContext, CLIENT_CTX) + .appendConnectionFactoryFilter(new TransportObserverConnectionFactoryFilter<>(interceptor)) + .buildStreaming(); + ReservedStreamingHttpConnection connection = client.reserveConnection(client.get("/")).toFuture().get()) { + + // First request goes through aggregation to enforce "flushOnEnd()" + Future firstResponse = connection.request(newRequest(connection, "/first") + .toRequest().toFuture().get().toStreamingRequest()).toFuture(); + assertThat(receivedRequests.take().requestTarget(), is("/first")); + assertFlushOnEnd(interceptor); + + // Second request is sent as streaming and expected to use "flushOnEach()" strategy + Future secondResponse = connection.request(newRequest(connection, "/second")) + .toFuture(); + assertFlushOnEach(interceptor); + canReturnResponse.countDown(); + assertThat(receivedRequests.take().requestTarget(), is("/second")); + + assertResponse(firstResponse); + assertResponse(secondResponse); + } + } + + private static StreamingHttpRequest newRequest(StreamingHttpConnection connection, String path) { + BufferAllocator alloc = connection.executionContext().bufferAllocator(); + return connection.post(path) + .payloadBody(Publisher.from(alloc.fromAscii("foo"), alloc.fromAscii("bar"))); + } + + private static void assertResponse(Future responseFuture) throws Exception { + StreamingHttpResponse response = responseFuture.get(); + assertThat(response.status(), is(OK)); + Buffer payload = response.toResponse().toFuture().get().payloadBody(); + assertThat(payload.readableBytes(), is(0)); + } +} diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/FlushStrategyOnServerTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/FlushStrategyOnServerTest.java index 0ee77d0773..0b9d030861 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/FlushStrategyOnServerTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/FlushStrategyOnServerTest.java @@ -1,5 +1,5 @@ /* - * Copyright © 2019-2021 Apple Inc. and the ServiceTalk project authors + * Copyright © 2019-2024 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,26 +16,18 @@ package io.servicetalk.http.netty; import io.servicetalk.http.api.BlockingHttpClient; -import io.servicetalk.http.api.DefaultHttpExecutionContext; -import io.servicetalk.http.api.DefaultHttpHeadersFactory; import io.servicetalk.http.api.HttpExecutionStrategy; -import io.servicetalk.http.api.HttpHeaders; -import io.servicetalk.http.api.HttpHeadersFactory; +import io.servicetalk.http.api.HttpRequest; import io.servicetalk.http.api.HttpResponse; -import io.servicetalk.http.api.StreamingHttpRequest; import io.servicetalk.http.api.StreamingHttpResponse; import io.servicetalk.http.api.StreamingHttpService; -import io.servicetalk.http.netty.NettyHttpServer.NettyHttpServerContext; -import io.servicetalk.tcp.netty.internal.ReadOnlyTcpServerConfig; -import io.servicetalk.tcp.netty.internal.TcpServerBinder; -import io.servicetalk.tcp.netty.internal.TcpServerChannelInitializer; -import io.servicetalk.tcp.netty.internal.TcpServerConfig; +import io.servicetalk.transport.api.ConnectionInfo; +import io.servicetalk.transport.api.ConnectionObserver; import io.servicetalk.transport.api.ServerContext; +import io.servicetalk.transport.api.TransportObserver; import io.servicetalk.transport.netty.internal.ExecutionContextExtension; +import io.servicetalk.transport.netty.internal.NoopTransportObserver; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOutboundHandlerAdapter; -import io.netty.channel.ChannelPromise; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; @@ -43,8 +35,8 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; +import javax.annotation.Nullable; -import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR; import static io.servicetalk.concurrent.api.Publisher.from; import static io.servicetalk.concurrent.api.Single.succeeded; import static io.servicetalk.http.api.HttpExecutionStrategies.customStrategyBuilder; @@ -52,17 +44,11 @@ import static io.servicetalk.http.api.HttpExecutionStrategies.offloadNone; import static io.servicetalk.http.api.HttpHeaderNames.TRANSFER_ENCODING; import static io.servicetalk.http.api.HttpHeaderValues.CHUNKED; -import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1; -import static io.servicetalk.http.api.HttpRequestMethod.GET; import static io.servicetalk.http.api.HttpSerializers.appSerializerUtf8FixLen; -import static io.servicetalk.http.api.StreamingHttpRequests.newTransportRequest; import static io.servicetalk.http.netty.BuilderUtils.newClientBuilder; -import static io.servicetalk.http.netty.NettyHttpServer.initChannel; -import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; -import static org.junit.jupiter.api.Assertions.fail; class FlushStrategyOnServerTest { @@ -78,10 +64,10 @@ class FlushStrategyOnServerTest { private static final String USE_AGGREGATED_RESP = "aggregated-resp"; private static final String USE_EMPTY_RESP_BODY = "empty-resp-body"; - private OutboundWriteEventsInterceptor interceptor; - private HttpHeadersFactory headersFactory; - + private final OutboundWriteEventsInterceptor interceptor = new OutboundWriteEventsInterceptor(); + @Nullable private ServerContext serverContext; + @Nullable private BlockingHttpClient client; private enum Param { @@ -94,10 +80,7 @@ private enum Param { } } - private void setUp(final Param param) { - this.interceptor = new OutboundWriteEventsInterceptor(); - this.headersFactory = DefaultHttpHeadersFactory.INSTANCE; - + private void setUp(final Param param) throws Exception { final StreamingHttpService service = (ctx, request, responseFactory) -> { StreamingHttpResponse resp = responseFactory.ok(); if (request.headers().get(USE_EMPTY_RESP_BODY) == null) { @@ -109,38 +92,23 @@ private void setUp(final Param param) { return succeeded(resp); }; - final DefaultHttpExecutionContext httpExecutionContext = new DefaultHttpExecutionContext( - SERVER_CTX.bufferAllocator(), SERVER_CTX.ioExecutor(), SERVER_CTX.executor(), param.executionStrategy); - - final ReadOnlyHttpServerConfig config = new HttpServerConfig().asReadOnly(); - final ReadOnlyTcpServerConfig tcpReadOnly = new TcpServerConfig().asReadOnly(); - - try { - serverContext = TcpServerBinder.bind(localAddress(0), tcpReadOnly, - httpExecutionContext, null, - (channel, observer) -> { - channel.config().setAutoRead(true); - return initChannel(channel, httpExecutionContext, config, - new TcpServerChannelInitializer(tcpReadOnly, observer, httpExecutionContext) - .andThen(channel1 -> channel1.pipeline().addLast(interceptor)), service, - true, observer); - }, - connection -> connection.process(true), null, null) - .map(delegate -> new NettyHttpServerContext(delegate, service, httpExecutionContext)) - .toFuture().get(); - } catch (Exception e) { - fail(e); - } - + serverContext = BuilderUtils.newServerBuilder(SERVER_CTX) + .executionStrategy(param.executionStrategy) + .transportObserver(interceptor) + .listenStreamingAndAwait(service); client = newClientBuilder(serverContext, CLIENT_CTX).buildBlocking(); } @AfterEach void tearDown() throws Exception { try { - client.close(); + if (client != null) { + client.close(); + } } finally { - serverContext.close(); + if (serverContext != null) { + serverContext.close(); + } } } @@ -257,12 +225,20 @@ void aggregatedStreamingEmptyResponse(final Param param) throws Exception { } private void assertFlushOnEnd() throws Exception { + assertFlushOnEnd(interceptor); + } + + static void assertFlushOnEnd(OutboundWriteEventsInterceptor interceptor) throws Exception { // aggregated response: headers, single (or empty) payload, and empty buffer instead of trailers assertThat("Unexpected writes", interceptor.takeWritesTillFlush(), greaterThan(0)); assertThat("Unexpected writes", interceptor.pendingEvents(), is(0)); } private void assertFlushOnEach() throws Exception { + assertFlushOnEach(interceptor); + } + + static void assertFlushOnEach(OutboundWriteEventsInterceptor interceptor) throws Exception { // headers assertThat("Unexpected writes", interceptor.takeWritesTillFlush(), is(1)); // one chunk; chunk header payload and CRLF @@ -279,22 +255,20 @@ private void sendARequest(final boolean useAggregatedResp) throws Exception { } private void sendARequest(boolean useAggregatedResp, boolean useEmptyRespBody) throws Exception { - HttpHeaders headers = headersFactory.newHeaders(); - headers.set(TRANSFER_ENCODING, CHUNKED); + assert client != null; + HttpRequest request = client.get("/") + .setHeader(TRANSFER_ENCODING, CHUNKED) + .payloadBody(client.executionContext().bufferAllocator().fromAscii("Hello")); if (useAggregatedResp) { - headers.set(USE_AGGREGATED_RESP, "true"); + request.setHeader(USE_AGGREGATED_RESP, "true"); } if (useEmptyRespBody) { - headers.set(USE_EMPTY_RESP_BODY, "true"); + request.setHeader(USE_EMPTY_RESP_BODY, "true"); } - - StreamingHttpRequest req = newTransportRequest(GET, "/", HTTP_1_1, headers, DEFAULT_ALLOCATOR, - from(DEFAULT_ALLOCATOR.fromAscii("Hello"), headersFactory.newTrailers()), false, - headersFactory); - client.request(req.toRequest().toFuture().get()); + client.request(request); } - static class OutboundWriteEventsInterceptor extends ChannelOutboundHandlerAdapter { + static class OutboundWriteEventsInterceptor implements TransportObserver, ConnectionObserver { private static final Object MSG = new Object(); private static final Object FLUSH = new Object(); @@ -302,15 +276,13 @@ static class OutboundWriteEventsInterceptor extends ChannelOutboundHandlerAdapte private final BlockingQueue writeEvents = new LinkedBlockingDeque<>(); @Override - public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { + public void onDataWrite(int size) { writeEvents.add(MSG); - ctx.write(msg, promise); } @Override - public void flush(final ChannelHandlerContext ctx) { + public void onFlush() { writeEvents.add(FLUSH); - ctx.flush(); } int takeWritesTillFlush() throws Exception { @@ -328,5 +300,32 @@ int takeWritesTillFlush() throws Exception { int pendingEvents() { return writeEvents.size(); } + + @Override + public ConnectionObserver onNewConnection(@Nullable Object localAddress, Object remoteAddress) { + return this; + } + + @Override + public void onDataRead(int size) { + } + + @Override + public DataObserver connectionEstablished(final ConnectionInfo info) { + return NoopTransportObserver.NoopDataObserver.INSTANCE; + } + + @Override + public MultiplexedObserver multiplexedConnectionEstablished(final ConnectionInfo info) { + return NoopTransportObserver.NoopMultiplexedObserver.INSTANCE; + } + + @Override + public void connectionClosed(final Throwable error) { + } + + @Override + public void connectionClosed() { + } } } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/FlushStrategyOverrideTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/FlushStrategyOverrideTest.java index 36b807320b..54e5edd2e3 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/FlushStrategyOverrideTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/FlushStrategyOverrideTest.java @@ -121,7 +121,7 @@ void overrideFlush() throws Throwable { c.cancel(); // revert to flush on each. // No more custom strategies. - Collection secondReqChunks = conn.request(conn.get("")) + Collection secondReqChunks = conn.request(conn.get("/")) .flatMapPublisher(StreamingHttpResponse::messageBody).toFuture().get(); clientStrategy.verifyNoMoreInteractions(); service.getLastUsedStrategy();