@armanbilge
Member

This PR replaces deprecated Dispatcher[IO] uses with the appropriate sequential or parallel replacements.

Inline comments forthcoming.

   * that downstream has been terminated that in turn kills upstream
   */
- Dispatcher[F].flatMap { dispatcher =>
+ Dispatcher.sequential[F].flatMap { dispatcher =>
Member Author

The dispatcher is used only for reading and closing. That seems sequential to me.

Comment on lines 83 to +90
  // According to the spec, it's acceptable for a concurrent cancel to not
  // be processed immediately, but if you have synchronous `cancel();
- // request()`, then the request _must_ be a no op. For this reason, we
- // need to make sure that `cancel()` does not return until the
- // `cancelled` signal has been set.
+ // request()`, then the request _must_ be a no op. Fortunately,
+ // ordering is guaranteed by a sequential d
  // See https://github.com/zainab-ali/fs2-reactive-streams/issues/29
  // and https://github.com/zainab-ali/fs2-reactive-streams/issues/46
  def cancel(): Unit =
-   dispatcher.unsafeRunSync(cancelled.set(true))
+   requestDispatcher.unsafeRunAndForget(cancelled.set(true))
Member Author

So this is interesting. Previously unsafeRunSync was necessary to achieve this behavior. Now the shared sequential Dispatcher guarantees the required ordering out of the box.
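
As a rough illustration of that ordering guarantee (a minimal sketch, not code from this PR; `SequentialOrderingSketch`, `run`, and `seen` are hypothetical names, and it assumes Cats Effect 3.4 or later), two effects submitted to the same sequential Dispatcher with unsafeRunAndForget run one at a time in submission order, so the second one observes the flag set by the first:

```scala
import cats.effect.IO
import cats.effect.std.Dispatcher
import cats.effect.unsafe.implicits.global

object SequentialOrderingSketch {
  // Submits a "cancel" and then a "request" without blocking the caller,
  // and returns the value of `cancelled` that the "request" observed.
  def run(): Boolean =
    Dispatcher.sequential[IO].use { dispatcher =>
      for {
        cancelled <- IO.ref(false)
        seen <- IO.deferred[Boolean] // hypothetical: records what the request saw
        _ <- IO.delay {
          // Stands in for a synchronous `cancel(); request()` from the subscriber.
          dispatcher.unsafeRunAndForget(cancelled.set(true))
          dispatcher.unsafeRunAndForget(cancelled.get.flatMap(seen.complete))
        }
        result <- seen.get
      } yield result
    }.unsafeRunSync()
}
```

With a parallel Dispatcher the two submissions could interleave, which is why the old code had to block in `cancel()`.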

Comment on lines 41 to +45
  cancelled: SignallingRef[F, Boolean],
  sub: Subscriber[A],
  stream: Stream[F, A],
- dispatcher: Dispatcher[F]
+ startDispatcher: Dispatcher[F],
+ requestDispatcher: Dispatcher[F]
Member Author

We need two Dispatchers for StreamSubscription, but they can both be sequential.

The first is used in unsafeStart() and actually runs the subscription. So it's blocked on that.

The second is used for handling requests and canceling.
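
To make the reasoning above concrete, here is a minimal sketch (again not the PR's code; `TwoDispatchersSketch`, `gate`, and `handled` are hypothetical names, assuming Cats Effect 3.4 or later). The first sequential Dispatcher is occupied by a long-running effect that stands in for the running subscription, while the second one stays free to handle a request:

```scala
import cats.effect.IO
import cats.effect.std.Dispatcher
import cats.effect.unsafe.implicits.global
import cats.syntax.all._

object TwoDispatchersSketch {
  def run(): Boolean =
    (Dispatcher.sequential[IO], Dispatcher.sequential[IO]).tupled.use {
      case (startDispatcher, requestDispatcher) =>
        for {
          gate <- IO.deferred[Unit]    // hypothetical: keeps the first dispatcher busy
          handled <- IO.deferred[Unit] // hypothetical: signals that the request ran
          _ <- IO.delay {
            // Occupies startDispatcher, like running the subscription would.
            startDispatcher.unsafeRunAndForget(gate.get)
            // Still runs promptly because it has its own dispatcher.
            requestDispatcher.unsafeRunAndForget(handled.complete(()))
          }
          _ <- handled.get
          released <- gate.complete(()) // let the long-running effect finish
        } yield released
    }.unsafeRunSync()
}
```

If both effects were submitted to a single sequential Dispatcher, the second would wait behind the first, so `handled.get` would never complete in this sketch.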

Comment on lines +60 to +70
+ @deprecated("Use overload which takes only the stream and returns a Resource", "3.4.0")
  def apply[F[_]: Async, A](
      s: Stream[F, A],
      dispatcher: Dispatcher[F]
  ): StreamUnicastPublisher[F, A] =
-   new StreamUnicastPublisher(s, dispatcher)
+   new StreamUnicastPublisher(s, dispatcher, dispatcher)
+
+ def apply[F[_]: Async, A](
+     s: Stream[F, A]
+ ): Resource[F, StreamUnicastPublisher[F, A]] =
+   (Dispatcher.sequential[F], Dispatcher.sequential[F]).mapN(new StreamUnicastPublisher(s, _, _))
Member Author

Since we can now have local Dispatchers with special sequential semantics, it seems like we should deprecate the pattern of asking for (presumably more global) dispatchers in favor of building our own dispatchers.

Furthermore, since we now rely on the sequential Dispatcher to achieve the correct semantics, exposing it as an argument is not good design.

Contributor

I'm sorry to come into this so late in the game, after this has merged, but we use this in a Play application to stream CSV files from an fs2 Stream. The Play API requires a class that's hard to construct except by using a StreamUnicastPublisher, but it doesn't give us the opportunity to call close. In previous discussions on Discord, I was advised to create a Dispatcher for the application and pass it in; we can guarantee the provided Dispatcher gets closed at the end.

I see (superficially) the point of your comment here and the rationale behind deprecating the API that takes a provided Dispatcher, but I don't see how we can migrate to the non-deprecated API without leaking Dispatcher instances. I will try to find you on Discord sometime to talk about it a little more.

Member Author

@armanbilge, Jan 11, 2023

@mtomko no problem, thanks for the comment.

> but I don't see how we can migrate to the non-deprecated API without leaking Dispatcher instances.

I need to think about it more, but so long as you are careful to cancel the subscription, I don't think there will be any leak here. The Resource surrounding a Dispatcher is intended to make sure that any tasks it started are canceled when the resource closes. If a Dispatcher has no running tasks and it goes out of scope, I am fairly certain that the GC will completely clean it up. Hence, no leak.

Also @BalmungSan is currently working on some usability improvements that might help your use case :)

Contributor

I'm trying to trace through the code, but it means digging deeply into Akka Streams and Play. The code we've written just creates the source and passes it to Play's Ok to construct a result:

  def streamToClientAsCsvFile(
      enumerator: Stream[IO, Row],
      dispatcher: Dispatcher[IO],
      filename: String
  ): Result = {
    val publisher = StreamUnicastPublisher(enumerator.through(rowToCsv), dispatcher)
    val source = Source.fromPublisher(publisher)

    Ok.chunked(source)
      .withHeaders(CONTENT_DISPOSITION -> s"attachment; filename = $filename")
      .as("text/csv")
  }

I would hope that Play is smart enough to recognize when the connection is closed and close the Source.

Contributor

@mtomko, Jan 11, 2023

P.S. Thanks for your previous response, it's really appreciated!

Contributor

@mtomko #3107 is the PR @armanbilge mentioned. However, I am not exactly sure how it helps in your use case.

Member Author

@armanbilge, Jan 12, 2023

> However, I am not exactly sure how it helps in your use case.

@BalmungSan @mtomko the new stream.subscribe(...) syntax that Luis is adding in #3107 will enable you to bring back the global Dispatcher.

val source = Source.fromPublisher { subscriber =>
  dispatcher.unsafeRunAndForget(enumerator.subscribe(subscriber))
}

Comment on lines -32 to -44
- trait UnsafeTestNGSuite extends TestNGSuiteLike {
-
-   protected var dispatcher: Dispatcher[IO] = _
-   private var shutdownDispatcher: IO[Unit] = _
-
-   private val mkDispatcher = Dispatcher[IO].allocated
-   private val t = mkDispatcher.unsafeRunSync()
-   dispatcher = t._1
-   shutdownDispatcher = t._2
-
-   @AfterClass
-   def afterAll() = shutdownDispatcher.unsafeRunSync()
- }
Member Author

This machinery is no longer necessary, since Dispatcher is no longer accepted as an argument.

Comment on lines -202 to +204
- dispatcher <- Dispatcher[F]
+ readDispatcher <- Dispatcher.sequential[F]
+ writeDispatcher <- Dispatcher.sequential[F]
+ errorDispatcher <- Dispatcher.sequential[F]
Member Author

read/write are sequential and error is one-off, but they need to be independent, so they need their own dispatchers.

Comment on lines +180 to 182
ProblemFilters.exclude[DirectMissingMethodProblem]( // something funky in Scala 3.2.0 ...
"fs2.io.net.SocketGroupCompanionPlatform#AsyncSocketGroup.this"
)
Member Author

Super weird, can reproduce just by updating to 3.2.0 on main. No idea ...

Member Author

I minimized it in armanbilge/sandbox@d9453ac. I don't know what change in Dotty caused it, but it seems safe: it looks like the private constructor was generating public bytecode prior to 3.2.0 but is now properly private, and that is what MiMa is flagging.

It doesn't affect anything public.

@mpilquist mpilquist merged commit 9ac3e43 into typelevel:main Sep 30, 2022
@armanbilge armanbilge changed the title Update to Cats Effect 3.2.0-RC1 Update to Cats Effect 3.4.0-RC1 Sep 30, 2022
@armanbilge
Member Author

Lol, I mis-titled this and the commit as 3.2.0-RC1 instead of 3.4.0-RC1. oh well.
