Subscribing to a Topic concurrently with closing the Topic leads to a non-terminating subscription stream #3642

@TomasMikula

Description

When a Stream obtained from Topic#subscribe or Topic#subscribeAwait is materialized concurrently with closing the Topic, the subscription stream sometimes never terminates, which is a resource leak.

Reproduction

fs2 version: 3.12.2
cats-effect version: 3.6.3
Scala version: 2.13.18

import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream
import fs2.concurrent.Topic

import scala.concurrent.duration.DurationInt

object TopicTest extends IOApp {

  def test(await: Boolean): IO[Unit] =
    for {
      t <- Topic[IO, Int]
      s =
        if (await) Stream.resource(t.subscribeAwait(maxQueued = 1)).flatten
        else t.subscribe(maxQueued = 1)
      fiber <- s.compile.toList.start // let the subscription race with closing
      _ <- t.close
      _ <- fiber.join.timeout(10.seconds) // times out (sometimes), meaning the subscription stream never terminates
    } yield ()

  override def run(args: List[String]): IO[ExitCode] =
    (test(await = false) *> test(await = true))
      .replicateA_(10000)
      .as(ExitCode.Success)

}
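
Since the hang is intermittent, it may help to count how many runs hang rather than stop at the first timeout. The following is only a sketch of that idea, not part of the original report: the object name `TopicHangCounter`, the helper `hangs`, the 1-second patience and the 1000 iterations are all assumptions chosen for illustration. It covers only the `subscribe` variant and cancels a hung subscriber so that it does not accumulate across iterations.

```scala
import cats.effect.{IO, IOApp}
import fs2.concurrent.Topic

import scala.concurrent.duration._

object TopicHangCounter extends IOApp.Simple {

  // Hypothetical helper: yields true if the subscriber fiber was still
  // running after `patience`, i.e. this run reproduced the hang.
  def hangs(patience: FiniteDuration): IO[Boolean] =
    for {
      t     <- Topic[IO, Int]
      fiber <- t.subscribe(maxQueued = 1).compile.drain.start // races with close
      _     <- t.close
      // If join does not complete in time, cancel the fiber and report a hang.
      hung  <- fiber.join.as(false).timeoutTo(patience, fiber.cancel.as(true))
    } yield hung

  // Iteration count and patience are arbitrary; tune them for your machine.
  def run: IO[Unit] =
    hangs(1.second)
      .replicateA(1000)
      .map(_.count(identity))
      .flatMap(n => IO.println(s"hung runs: $n of 1000"))
}
```

A non-zero count indicates that the race was hit at least once. A count of zero does not prove the problem is absent, only that it was not observed in this run.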
