From d5ccea21c80c14de59d0db95be42e9c460098791 Mon Sep 17 00:00:00 2001 From: riyafa Date: Mon, 18 Jun 2018 14:40:42 +0530 Subject: [PATCH] Allow terminating a connection with a close frame Adds a method that sends a close frame and terminates the connection without waiting for a response. --- .../websocket/WebSocketConnection.java | 5 ++++ .../websocket/DefaultWebSocketConnection.java | 29 +++++++++++++++++-- .../WebSocketInboundFrameHandler.java | 21 +++++++++----- .../WebSocketClientFunctionalityTestCase.java | 16 +++++++++- 4 files changed, 61 insertions(+), 10 deletions(-) diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketConnection.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketConnection.java index 7b41c6d43..63801f8ae 100644 --- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketConnection.java +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketConnection.java @@ -133,4 +133,9 @@ public interface WebSocketConnection { * @return Future to represent the completion of closure asynchronously. */ ChannelFuture terminateConnection(); + + /** + * Send a close frame and close the connection without waiting for a response. + */ + ChannelFuture terminateConnection(int statusCode, String reason); } diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketConnection.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketConnection.java index 281cbcdfa..19ec58a84 100644 --- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketConnection.java +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketConnection.java @@ -167,10 +167,35 @@ public ChannelFuture finishConnectionClosure(int statusCode, String reason) { @Override public ChannelFuture terminateConnection() { + frameHandler.setCloseInitialized(true); return ctx.close(); } - public int getCloseInitiatedStatusCode() { + @Override + public ChannelFuture terminateConnection(int statusCode, String reason) { + ChannelPromise closePromise = ctx.newPromise(); + ctx.writeAndFlush(new CloseWebSocketFrame(statusCode, reason)).addListener(writeFuture -> { + frameHandler.setCloseInitialized(true); + Throwable writeCause = writeFuture.cause(); + if (writeFuture.isSuccess() && writeCause != null) { + closePromise.setFailure(writeCause); + ctx.close(); + return; + } + ctx.close().addListener(closeFuture -> { + Throwable closeCause = closeFuture.cause(); + if (!closeFuture.isSuccess() && closeCause != null) { + closePromise.setFailure(closeCause); + } else { + closePromise.setSuccess(); + } + }); + + }); + return closePromise; + } + + int getCloseInitiatedStatusCode() { return this.closeInitiatedStatusCode; } @@ -179,7 +204,7 @@ public DefaultWebSocketSession getDefaultWebSocketSession() { return session; } - public ByteBuf getNettyBuf(ByteBuffer buffer) { + private ByteBuf getNettyBuf(ByteBuffer buffer) { return Unpooled.wrappedBuffer(buffer); } } diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/WebSocketInboundFrameHandler.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/WebSocketInboundFrameHandler.java index ee98cbbb5..a275f84ac 100644 --- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/WebSocketInboundFrameHandler.java +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/WebSocketInboundFrameHandler.java @@ -63,13 +63,14 @@ public class WebSocketInboundFrameHandler extends ChannelInboundHandlerAdapter { private final boolean securedConnection; private final String target; private final String interfaceId; + private final WebSocketFramesBlockingHandler blockingHandler; private DefaultWebSocketConnection webSocketConnection; private ChannelHandlerContext ctx; - private boolean caughtException; private ChannelPromise closePromise; - private boolean closeFrameReceived; private WebSocketFrameType continuationFrameType; - private final WebSocketFramesBlockingHandler blockingHandler; + private boolean caughtException; + private boolean closeFrameReceived; + private boolean closeInitialized; public WebSocketInboundFrameHandler(WebSocketConnectorFuture connectorFuture, WebSocketFramesBlockingHandler blockingHandler, boolean isServer, @@ -80,13 +81,14 @@ public WebSocketInboundFrameHandler(WebSocketConnectorFuture connectorFuture, this.securedConnection = securedConnection; this.target = target; this.interfaceId = interfaceId; + closeInitialized = false; } /** * Set channel promise for WebSocket connection close. * * @param closePromise {@link ChannelPromise} to indicate the receiving of close frame echo - * back from the remote endpoint. + * back from the remote endpoint. */ public void setClosePromise(ChannelPromise closePromise) { this.closePromise = closePromise; @@ -112,6 +114,10 @@ public boolean isCloseFrameReceived() { return closeFrameReceived; } + public void setCloseInitialized(boolean closeInitialized) { + this.closeInitialized = closeInitialized; + } + @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; @@ -131,7 +137,8 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Web @Override public void channelInactive(ChannelHandlerContext ctx) throws WebSocketConnectorException { - if (!caughtException && webSocketConnection != null && !this.isCloseFrameReceived() && closePromise == null) { + if (!caughtException && webSocketConnection != null && !this.isCloseFrameReceived() && closePromise == null && + !closeInitialized) { // Notify abnormal closure. DefaultWebSocketMessage webSocketCloseMessage = new DefaultWebSocketCloseMessage(Constants.WEBSOCKET_STATUS_CODE_ABNORMAL_CLOSURE); @@ -209,7 +216,7 @@ private void notifyTextMessage(WebSocketFrame frame, String text, boolean finalF private void notifyBinaryMessage(WebSocketFrame frame, ByteBuf content, boolean finalFragment) throws WebSocketConnectorException { DefaultWebSocketMessage webSocketBinaryMessage = WebSocketUtil.getWebSocketMessage(frame, content, - finalFragment); + finalFragment); setupCommonProperties(webSocketBinaryMessage); connectorFuture.notifyWebSocketListener((WebSocketBinaryMessage) webSocketBinaryMessage); } @@ -265,7 +272,7 @@ private void setupCommonProperties(DefaultWebSocketMessage webSocketMessage) { webSocketMessage.setSessionlID(webSocketConnection.getId()); webSocketMessage.setIsServerMessage(isServer); webSocketMessage.setProperty(Constants.LISTENER_PORT, - ((InetSocketAddress) ctx.channel().localAddress()).getPort()); + ((InetSocketAddress) ctx.channel().localAddress()).getPort()); webSocketMessage.setProperty(Constants.LOCAL_ADDRESS, ctx.channel().localAddress()); webSocketMessage.setProperty( Constants.LOCAL_NAME, ((InetSocketAddress) ctx.channel().localAddress()).getHostName()); diff --git a/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/client/WebSocketClientFunctionalityTestCase.java b/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/client/WebSocketClientFunctionalityTestCase.java index dfd4726d0..f1fed1201 100644 --- a/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/client/WebSocketClientFunctionalityTestCase.java +++ b/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/client/WebSocketClientFunctionalityTestCase.java @@ -185,7 +185,7 @@ public void onError(Throwable t, HttpCarbonResponse response) { } @Test(description = "Test connection termination using WebSocketConnection without sending a close frame.") - public void testConnectionTermination() throws Throwable { + public void testConnectionTerminationWithoutCloseFrame() throws Throwable { WebSocketConnection webSocketConnection = getWebSocketConnectionSync(new WebSocketTestClientConnectorListener()); CountDownLatch countDownLatch = new CountDownLatch(1); @@ -198,6 +198,20 @@ public void testConnectionTermination() throws Throwable { Assert.assertTrue(closeFuture.isSuccess()); } + @Test(description = "Test connection termination using WebSocketConnection with a close frame.") + public void testConnectionTerminationWithCloseFrame() throws Throwable { + WebSocketConnection webSocketConnection = + getWebSocketConnectionSync(new WebSocketTestClientConnectorListener()); + CountDownLatch countDownLatch = new CountDownLatch(1); + ChannelFuture closeFuture = webSocketConnection.terminateConnection(1011, "Unexpected failure").addListener( + future -> countDownLatch.countDown()); + countDownLatch.await(WEBSOCKET_TEST_IDLE_TIMEOUT, SECONDS); + + Assert.assertNull(closeFuture.cause()); + Assert.assertTrue(closeFuture.isDone()); + Assert.assertTrue(closeFuture.isSuccess()); + } + @Test public void testClientInitiatedClosure() throws Throwable { WebSocketConnection webSocketConnection =