Skip to content

Commit

Permalink
Allow terminating a connection with a close frame
Browse files Browse the repository at this point in the history
Adds a method that sends a close frame and terminates the connection without waiting for a response.
  • Loading branch information
riyafa committed Jun 19, 2018
1 parent b833561 commit d5ccea2
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,9 @@ public interface WebSocketConnection {
* @return Future to represent the completion of closure asynchronously.
*/
ChannelFuture terminateConnection();

/**
* Send a close frame and close the connection without waiting for a response.
*/
ChannelFuture terminateConnection(int statusCode, String reason);
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,35 @@ public ChannelFuture finishConnectionClosure(int statusCode, String reason) {

@Override
public ChannelFuture terminateConnection() {
frameHandler.setCloseInitialized(true);
return ctx.close();
}

public int getCloseInitiatedStatusCode() {
@Override
public ChannelFuture terminateConnection(int statusCode, String reason) {
ChannelPromise closePromise = ctx.newPromise();
ctx.writeAndFlush(new CloseWebSocketFrame(statusCode, reason)).addListener(writeFuture -> {
frameHandler.setCloseInitialized(true);
Throwable writeCause = writeFuture.cause();
if (writeFuture.isSuccess() && writeCause != null) {
closePromise.setFailure(writeCause);
ctx.close();
return;
}
ctx.close().addListener(closeFuture -> {
Throwable closeCause = closeFuture.cause();
if (!closeFuture.isSuccess() && closeCause != null) {
closePromise.setFailure(closeCause);
} else {
closePromise.setSuccess();
}
});

});
return closePromise;
}

int getCloseInitiatedStatusCode() {
return this.closeInitiatedStatusCode;
}

Expand All @@ -179,7 +204,7 @@ public DefaultWebSocketSession getDefaultWebSocketSession() {
return session;
}

public ByteBuf getNettyBuf(ByteBuffer buffer) {
private ByteBuf getNettyBuf(ByteBuffer buffer) {
return Unpooled.wrappedBuffer(buffer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,14 @@ public class WebSocketInboundFrameHandler extends ChannelInboundHandlerAdapter {
private final boolean securedConnection;
private final String target;
private final String interfaceId;
private final WebSocketFramesBlockingHandler blockingHandler;
private DefaultWebSocketConnection webSocketConnection;
private ChannelHandlerContext ctx;
private boolean caughtException;
private ChannelPromise closePromise;
private boolean closeFrameReceived;
private WebSocketFrameType continuationFrameType;
private final WebSocketFramesBlockingHandler blockingHandler;
private boolean caughtException;
private boolean closeFrameReceived;
private boolean closeInitialized;

public WebSocketInboundFrameHandler(WebSocketConnectorFuture connectorFuture,
WebSocketFramesBlockingHandler blockingHandler, boolean isServer,
Expand All @@ -80,13 +81,14 @@ public WebSocketInboundFrameHandler(WebSocketConnectorFuture connectorFuture,
this.securedConnection = securedConnection;
this.target = target;
this.interfaceId = interfaceId;
closeInitialized = false;
}

/**
* Set channel promise for WebSocket connection close.
*
* @param closePromise {@link ChannelPromise} to indicate the receiving of close frame echo
* back from the remote endpoint.
* back from the remote endpoint.
*/
public void setClosePromise(ChannelPromise closePromise) {
this.closePromise = closePromise;
Expand All @@ -112,6 +114,10 @@ public boolean isCloseFrameReceived() {
return closeFrameReceived;
}

public void setCloseInitialized(boolean closeInitialized) {
this.closeInitialized = closeInitialized;
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
Expand All @@ -131,7 +137,8 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Web

@Override
public void channelInactive(ChannelHandlerContext ctx) throws WebSocketConnectorException {
if (!caughtException && webSocketConnection != null && !this.isCloseFrameReceived() && closePromise == null) {
if (!caughtException && webSocketConnection != null && !this.isCloseFrameReceived() && closePromise == null &&
!closeInitialized) {
// Notify abnormal closure.
DefaultWebSocketMessage webSocketCloseMessage =
new DefaultWebSocketCloseMessage(Constants.WEBSOCKET_STATUS_CODE_ABNORMAL_CLOSURE);
Expand Down Expand Up @@ -209,7 +216,7 @@ private void notifyTextMessage(WebSocketFrame frame, String text, boolean finalF
private void notifyBinaryMessage(WebSocketFrame frame, ByteBuf content, boolean finalFragment)
throws WebSocketConnectorException {
DefaultWebSocketMessage webSocketBinaryMessage = WebSocketUtil.getWebSocketMessage(frame, content,
finalFragment);
finalFragment);
setupCommonProperties(webSocketBinaryMessage);
connectorFuture.notifyWebSocketListener((WebSocketBinaryMessage) webSocketBinaryMessage);
}
Expand Down Expand Up @@ -265,7 +272,7 @@ private void setupCommonProperties(DefaultWebSocketMessage webSocketMessage) {
webSocketMessage.setSessionlID(webSocketConnection.getId());
webSocketMessage.setIsServerMessage(isServer);
webSocketMessage.setProperty(Constants.LISTENER_PORT,
((InetSocketAddress) ctx.channel().localAddress()).getPort());
((InetSocketAddress) ctx.channel().localAddress()).getPort());
webSocketMessage.setProperty(Constants.LOCAL_ADDRESS, ctx.channel().localAddress());
webSocketMessage.setProperty(
Constants.LOCAL_NAME, ((InetSocketAddress) ctx.channel().localAddress()).getHostName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void onError(Throwable t, HttpCarbonResponse response) {
}

@Test(description = "Test connection termination using WebSocketConnection without sending a close frame.")
public void testConnectionTermination() throws Throwable {
public void testConnectionTerminationWithoutCloseFrame() throws Throwable {
WebSocketConnection webSocketConnection =
getWebSocketConnectionSync(new WebSocketTestClientConnectorListener());
CountDownLatch countDownLatch = new CountDownLatch(1);
Expand All @@ -198,6 +198,20 @@ public void testConnectionTermination() throws Throwable {
Assert.assertTrue(closeFuture.isSuccess());
}

@Test(description = "Test connection termination using WebSocketConnection with a close frame.")
public void testConnectionTerminationWithCloseFrame() throws Throwable {
WebSocketConnection webSocketConnection =
getWebSocketConnectionSync(new WebSocketTestClientConnectorListener());
CountDownLatch countDownLatch = new CountDownLatch(1);
ChannelFuture closeFuture = webSocketConnection.terminateConnection(1011, "Unexpected failure").addListener(
future -> countDownLatch.countDown());
countDownLatch.await(WEBSOCKET_TEST_IDLE_TIMEOUT, SECONDS);

Assert.assertNull(closeFuture.cause());
Assert.assertTrue(closeFuture.isDone());
Assert.assertTrue(closeFuture.isSuccess());
}

@Test
public void testClientInitiatedClosure() throws Throwable {
WebSocketConnection webSocketConnection =
Expand Down

0 comments on commit d5ccea2

Please sign in to comment.