From d9bd035bf4f535f31c04720ea27253dbf23920d4 Mon Sep 17 00:00:00 2001 From: Dimitri Ho Date: Wed, 20 Nov 2024 19:55:12 +0100 Subject: [PATCH] Prevent deadlock when publisher is canceled --- .../src/main/scala/fs2/concurrent/Topic.scala | 2 +- .../scala/fs2/concurrent/TopicSuite.scala | 21 +++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Topic.scala b/core/shared/src/main/scala/fs2/concurrent/Topic.scala index a7d9bf12c7..2061070b16 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Topic.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Topic.scala @@ -208,7 +208,7 @@ object Topic { } def publish: Pipe[F, A, Nothing] = { in => - (in ++ Stream.exec(close.void)) + in.onFinalize(close.void) .evalMap(publish1) .takeWhile(_.isRight) .drain diff --git a/core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala b/core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala index 6f731d41eb..c26fd73dd7 100644 --- a/core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala @@ -185,4 +185,25 @@ class TopicSuite extends Fs2Suite { TestControl.executeEmbed(program) // will fail if program is deadlocked } + + test("publisher cancellation does not deadlock") { + val program = + Topic[IO, String] + .flatMap { topic => + val publisher = + Stream + .constant("1") + .covary[IO] + .evalTap(_ => IO.canceled) + .through(topic.publish) + + Stream + .resource(topic.subscribeAwait(1)) + .flatMap(subscriber => subscriber.concurrently(publisher)) + .compile + .drain + } + + TestControl.executeEmbed(program) // will fail if program is deadlocked + } }