When a Stream returned from Topic#subscribe or Topic#subscribeAwait is materialized concurrently with closing the Topic, the subscription stream (sometimes) never terminates, which is a resource leak.
Reproduction
fs2 version: 3.12.2
cats-effect version: 3.6.3
Scala version: 2.13.18
import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream
import fs2.concurrent.Topic
import scala.concurrent.duration.DurationInt
/** Reproduction harness for a race between subscribing to a `Topic` and closing it.
  *
  * Each run starts a subscriber in the background, closes the topic right away, and
  * then waits (with a timeout) for the subscriber to finish.
  */
object TopicTest extends IOApp {

  /** Runs one subscribe-versus-close race.
    *
    * @param await when `true` the subscription is obtained through `subscribeAwait`
    *              (wrapped in `Stream.resource` and flattened); otherwise through
    *              `subscribe`. Both use `maxQueued = 1`.
    * @return an `IO` that completes once the subscriber fiber has been joined, or
    *         fails if the join does not complete within 10 seconds.
    */
  def test(await: Boolean): IO[Unit] =
    Topic[IO, Int].flatMap { topic =>
      val subscription: Stream[IO, Int] =
        if (await) Stream.resource(topic.subscribeAwait(maxQueued = 1)).flatten
        else topic.subscribe(maxQueued = 1)

      for {
        // Start the subscriber without waiting for it, so that subscribing races
        // with the close below.
        subscriber <- subscription.compile.toList.start
        _ <- topic.close
        // According to the report this join sometimes times out, i.e. the
        // subscription stream never terminates.
        _ <- subscriber.join.timeout(10.seconds)
      } yield ()
    }

  /** Repeats both variants (`subscribe` first, then `subscribeAwait`) 10000 times. */
  override def run(args: List[String]): IO[ExitCode] = {
    val bothVariants = test(await = false) *> test(await = true)
    bothVariants.replicateA_(10000).as(ExitCode.Success)
  }
}