Skip to content

Commit

Permalink
Merge pull request #2554 from armanbilge/topic/node-server-tweaks
Browse files Browse the repository at this point in the history
Improve Node.js server shutdown and error handling
  • Loading branch information
mpilquist authored Oct 23, 2021
2 parents 18e0eb9 + 5f700d3 commit 0f20c99
Showing 1 changed file with 21 additions and 23 deletions.
44 changes: 21 additions & 23 deletions io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.comcast.ip4s.Host
import com.comcast.ip4s.IpAddress
import com.comcast.ip4s.Port
import com.comcast.ip4s.SocketAddress
import fs2.internal.jsdeps.node.eventsMod
import fs2.internal.jsdeps.node.netMod
import fs2.internal.jsdeps.node.nodeStrings
import fs2.internal.jsdeps.std
Expand Down Expand Up @@ -81,40 +82,38 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type =>
): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] =
(for {
dispatcher <- Dispatcher[F]
queue <- Queue.unbounded[F, netMod.Socket].toResource
error <- F.deferred[Throwable].toResource
queue <- Queue.unbounded[F, Option[netMod.Socket]].toResource
server <- Resource.make(
F
.delay(
netMod.createServer(
netMod.ServerOpts().setPauseOnConnect(true).setAllowHalfOpen(true),
sock => dispatcher.unsafeRunAndForget(queue.offer(sock))
sock => dispatcher.unsafeRunAndForget(queue.offer(Some(sock)))
)
)
)(server =>
F.async_[Unit] { cb =>
F.async[Unit] { cb =>
if (server.listening)
server.close(e => cb(e.toLeft(()).leftMap(js.JavaScriptException)))
F.delay(server.close(e => cb(e.toLeft(()).leftMap(js.JavaScriptException)))) >> queue
.offer(None)
.as(None)
else
cb(Right(()))
F.delay(cb(Right(()))).as(None)
}
)
_ <- registerListener[std.Error](server, nodeStrings.error)(_.once_error(_, _)) { e =>
dispatcher.unsafeRunAndForget(error.complete(js.JavaScriptException(e)))
}
_ <- error.get
.race(
F
.async_[Unit] { cb =>
server.listen(
address.foldLeft(
netMod.ListenOptions().setPort(port.fold(0.0)(_.value.toDouble))
)((opts, host) => opts.setHost(host.toString)),
() => cb(Right(()))
)
_ <- F
.async_[Unit] { cb =>
server.once_error(nodeStrings.error, e => cb(Left(js.JavaScriptException(e))))
server.listen(
address.foldLeft(
netMod.ListenOptions().setPort(port.fold(0.0)(_.value.toDouble))
)((opts, host) => opts.setHost(host.toString)),
() => {
server.asInstanceOf[eventsMod.EventEmitter].removeAllListeners("error")
cb(Right(()))
}
)
.rethrow
)
}
.toResource
ipAddress <- F
.delay(server.address())
Expand All @@ -124,10 +123,9 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type =>
}
.toResource
sockets = Stream
.fromQueueUnterminated(queue)
.fromQueueNoneTerminated(queue)
.evalTap(setSocketOptions(options))
.flatMap(sock => Stream.resource(Socket.forAsync(sock)))
.concurrently(Stream.eval(error.get.flatMap(F.raiseError[Unit])))
} yield (ipAddress, sockets)).adaptError { case IOException(ex) => ex }

}
Expand Down

0 comments on commit 0f20c99

Please sign in to comment.