Skip to content

Commit

Permalink
Merge pull request #3408 from typelevel/topic/udp-suite-retry
Browse files Browse the repository at this point in the history
Add retries to UdpSuite
  • Loading branch information
mpilquist authored Mar 18, 2024
2 parents ee70b2a + ce52e21 commit 1279244
Showing 1 changed file with 15 additions and 19 deletions.
34 changes: 15 additions & 19 deletions io/js-jvm/src/test/scala/fs2/io/net/udp/UdpSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@ import cats.syntax.all._

import com.comcast.ip4s._

import scala.concurrent.duration._

class UdpSuite extends Fs2Suite with UdpSuitePlatform {
def sendAndReceive(socket: DatagramSocket[IO], toSend: Datagram): IO[Datagram] =
socket
.write(toSend) >> socket.read.timeoutTo(1.second, IO.defer(sendAndReceive(socket, toSend)))

group("udp") {
test("echo one") {
val msg = Chunk.array("Hello, world!".getBytes)
Expand All @@ -38,15 +44,11 @@ class UdpSuite extends Fs2Suite with UdpSuitePlatform {
.flatMap { serverSocket =>
Stream.eval(serverSocket.localAddress).map(_.port).flatMap { serverPort =>
val serverAddress = SocketAddress(ip"127.0.0.1", serverPort)
val server = serverSocket.reads
.evalMap(packet => serverSocket.write(packet))
.drain
val client = Stream.resource(Network[IO].openDatagramSocket()).flatMap { clientSocket =>
Stream(Datagram(serverAddress, msg))
.through(clientSocket.writes)
.drain ++ Stream.eval(clientSocket.read)
val server = serverSocket.reads.foreach(packet => serverSocket.write(packet))
val client = Stream.resource(Network[IO].openDatagramSocket()).evalMap { clientSocket =>
sendAndReceive(clientSocket, Datagram(serverAddress, msg))
}
server.mergeHaltBoth(client)
client.concurrently(server)
}
}
.compile
Expand All @@ -69,21 +71,17 @@ class UdpSuite extends Fs2Suite with UdpSuitePlatform {
.flatMap { serverSocket =>
Stream.eval(serverSocket.localAddress).map(_.port).flatMap { serverPort =>
val serverAddress = SocketAddress(ip"127.0.0.1", serverPort)
val server = serverSocket.reads
.evalMap(packet => serverSocket.write(packet))
.drain
val server = serverSocket.reads.foreach(packet => serverSocket.write(packet))
val client = Stream.resource(Network[IO].openDatagramSocket()).flatMap { clientSocket =>
Stream
.emits(msgs.map(msg => Datagram(serverAddress, msg)))
.flatMap { msg =>
Stream.exec(clientSocket.write(msg)) ++ Stream.eval(clientSocket.read)
}
.evalMap(msg => sendAndReceive(clientSocket, msg))
}
val clients = Stream
.constant(client)
.take(numClients.toLong)
.parJoin(numParallelClients)
server.mergeHaltBoth(clients)
clients.concurrently(server)
}
}
.compile
Expand All @@ -110,15 +108,13 @@ class UdpSuite extends Fs2Suite with UdpSuitePlatform {
.exec(
v4Interfaces.traverse_(interface => serverSocket.join(groupJoin, interface))
) ++
serverSocket.reads
.evalMap(packet => serverSocket.write(packet))
.drain
serverSocket.reads.foreach(packet => serverSocket.write(packet))
val client = Stream.resource(Network[IO].openDatagramSocket()).flatMap { clientSocket =>
Stream(Datagram(SocketAddress(group.address, serverPort), msg))
.through(clientSocket.writes)
.drain ++ Stream.eval(clientSocket.read)
}
server.mergeHaltBoth(client)
client.concurrently(server)
}
}
.compile
Expand Down

0 comments on commit 1279244

Please sign in to comment.