Skip to content

Commit

Permalink
fix for close detection for tcp session remote channel
Browse files Browse the repository at this point in the history
  • Loading branch information
compscidr committed Nov 7, 2024
1 parent 89a16ad commit e5a177d
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ class AnonymousTcpSession(
val len = super.handleReturnTrafficLoop(maxRead)
if (len == 0 && tcpStateMachine.tcpState.value == TcpState.CLOSE_WAIT) {
logger.warn("We're in CLOSE_WAIT, and we have no more data to recv from remote side, sending FIN")
close()
val finPacket = teardown()
if (finPacket != null) {
returnQueue.add(finPacket)
}
}
return len
}
Expand All @@ -73,6 +76,7 @@ class AnonymousTcpSession(
Thread.currentThread().name = "Outgoing handler: ${getKey()}"
try {
logger.debug("TCP connecting to {}:{}", initialIpHeader.destinationAddress, initialTransportHeader.destinationPort)
channel.socket().keepAlive = false
channel.socket().connect(
InetSocketAddress(initialIpHeader.destinationAddress, initialTransportHeader.destinationPort.toInt()),
1000,
Expand Down Expand Up @@ -111,21 +115,29 @@ class AnonymousTcpSession(

try {
while (channel.isOpen) {
do {
val maxRead = tcpStateMachine.availableOutgoingBufferSpace()
val len =
if (maxRead > 0) {
handleReturnTrafficLoop(maxRead)
} else {
logger.warn("No more space in outgoing buffer, waiting for more space")
0
}
} while (channel.isOpen && len > -1)
val maxRead = tcpStateMachine.availableOutgoingBufferSpace()
val len =
if (maxRead > 0) {
handleReturnTrafficLoop(maxRead)
} else {
logger.warn("No more space in outgoing buffer, waiting for more space")
0
}
if (len < 0) {
break
}
}
} catch (e: Exception) {
logger.warn("Remote Tcp channel closed")
// incomingQueue.clear() // prevent us from handling any incoming packets because we can't send them anywhere
close()
val finPacket = teardown()
if (finPacket != null) {
returnQueue.add(finPacket)
}
} catch (e: Exception) {
logger.warn("Remote Tcp channel closed ${e.message}")
val finPacket = teardown()
if (finPacket != null) {
returnQueue.add(finPacket)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ abstract class TcpSession(
* else do nothing.
*/
fun teardown(swapSourceAndDestination: Boolean = true): Packet? {
logger.debug("Tcp session CLOSE function called in tcpState: ${tcpStateMachine.tcpState.value}")
logger.debug("Tcp session CLOSE function called in tcpState: ${tcpStateMachine.tcpState.value} swap?: $swapSourceAndDestination")
if (tcpStateMachine.transmissionControlBlock == null) {
logger.debug("No TCB, returning to CLOSED")
tcpStateMachine.tcpState.value = TcpState.CLOSED
Expand Down
12 changes: 6 additions & 6 deletions src/main/kotlin/com/jasonernst/kanonproxy/tcp/TcpStateMachine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ class TcpStateMachine(
session.channel.write(buffer)
}
} catch (e: Exception) {
val packet = session.teardown()
val packet = session.teardown(swapSourceDestination)
if (packet != null) {
return@runBlocking listOf(packet)
} else {
Expand Down Expand Up @@ -782,7 +782,7 @@ class TcpStateMachine(
ackNumber = transmissionControlBlock!!.rcv_nxt,
transmissionControlBlock = transmissionControlBlock,
)
val finPacket = session.teardown()
val finPacket = session.teardown(swapSourceDestination)
if (finPacket != null) {
return@runBlocking listOf(ackPacket, finPacket)
} else {
Expand Down Expand Up @@ -989,7 +989,7 @@ class TcpStateMachine(
session.channel.write(buffer)
}
} catch (e: Exception) {
val packet = session.teardown()
val packet = session.teardown(swapSourceDestination)
if (packet != null) {
return@runBlocking listOf(packet)
} else {
Expand Down Expand Up @@ -1043,7 +1043,7 @@ class TcpStateMachine(
transmissionControlBlock = transmissionControlBlock,
)
flushpackets.add(ackPacket)
val finPacket = session.teardown()
val finPacket = session.teardown(swapSourceDestination)
if (finPacket != null) {
flushpackets.add(finPacket)
}
Expand Down Expand Up @@ -1241,7 +1241,7 @@ class TcpStateMachine(
session.channel.write(buffer)
}
} catch (e: Exception) {
val packet = session.teardown()
val packet = session.teardown(swapSourceDestination)
if (packet != null) {
return@runBlocking listOf(packet)
} else {
Expand Down Expand Up @@ -1480,7 +1480,7 @@ class TcpStateMachine(
session.channel.write(buffer)
}
} catch (e: Exception) {
val packet = session.teardown()
val packet = session.teardown(swapSourceDestination)
if (packet != null) {
return@runBlocking listOf(packet)
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/test/kotlin/com/jasonernst/kanonproxy/tcp/TcpClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class TcpClient(
TcpOptionMaximumSegmentSize.defaultIpv6MSS
}

override val tcpStateMachine = TcpStateMachine(MutableStateFlow(TcpState.CLOSED), mtu, this, swapSourceDestination = true)
override val tcpStateMachine = TcpStateMachine(MutableStateFlow(TcpState.CLOSED), mtu, this, swapSourceDestination = false)

// this is where the state machine will write into for us to receive it here
override val channel: ByteChannel = BidirectionalByteChannel()
Expand Down

0 comments on commit e5a177d

Please sign in to comment.