Update to Cats Effect 3.4.0-RC1 #3000
   * that downstream has been terminated that in turn kills upstream
   */
- Dispatcher[F].flatMap { dispatcher =>
+ Dispatcher.sequential[F].flatMap { dispatcher =>
The dispatcher is used only for reading and closing. That seems sequential to me.
  // According to the spec, it's acceptable for a concurrent cancel to not
  // be processed immediately, but if you have synchronous `cancel();
- // request()`, then the request _must_ be a no op. For this reason, we
- // need to make sure that `cancel()` does not return until the
- // `cancelled` signal has been set.
+ // request()`, then the request _must_ be a no op. Fortunately,
+ // ordering is guaranteed by a sequential d
  // See https://github.com/zainab-ali/fs2-reactive-streams/issues/29
  // and https://github.com/zainab-ali/fs2-reactive-streams/issues/46
  def cancel(): Unit =
-   dispatcher.unsafeRunSync(cancelled.set(true))
+   requestDispatcher.unsafeRunAndForget(cancelled.set(true))
So this is interesting. Previously, unsafeRunSync was necessary to achieve this behavior. Now the shared sequential Dispatcher guarantees the required ordering out of the box.
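To make the ordering argument concrete, here is a minimal sketch, not the fs2 implementation. It assumes Cats Effect 3.4+ on the classpath; `SequentialOrdering`, `served`, and `done` are hypothetical names used only for illustration. A "cancel" and then a "request" are submitted from plain code through one sequential Dispatcher, and the request checks the `cancelled` flag before doing anything.

```scala
import cats.effect.IO
import cats.effect.std.Dispatcher
import cats.effect.unsafe.implicits.global

object SequentialOrdering {
  // Returns the list of requests that were actually served.
  def run(): List[Long] =
    Dispatcher.sequential[IO].use { dispatcher =>
      for {
        cancelled <- IO.ref(false)
        served    <- IO.ref(List.empty[Long])
        done      <- IO.deferred[Unit]
        _ <- IO {
          // Synchronous `cancel(); request()` as seen from non-IO code.
          // Neither call waits for its effect to finish.
          dispatcher.unsafeRunAndForget(cancelled.set(true))
          dispatcher.unsafeRunAndForget(
            cancelled.get.flatMap {
              case true  => IO.unit                 // request after cancel: no op
              case false => served.update(1L :: _)  // request would be served
            }
          )
          // Marker task: once it runs, both tasks above have already run.
          dispatcher.unsafeRunAndForget(done.complete(()))
        }
        _      <- done.get
        result <- served.get
      } yield result
    }.unsafeRunSync()
}
```

Because a sequential Dispatcher runs submitted tasks one at a time in submission order, the request task should always observe `cancelled == true` here, so no blocking `unsafeRunSync` is needed inside `cancel()`.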
  cancelled: SignallingRef[F, Boolean],
  sub: Subscriber[A],
  stream: Stream[F, A],
- dispatcher: Dispatcher[F]
+ startDispatcher: Dispatcher[F],
+ requestDispatcher: Dispatcher[F]
We need two Dispatchers for StreamSubscription, but they can both be sequential.
The first is used in unsafeStart() and actually runs the subscription. So it's blocked on that.
The second is used for handling requests and canceling.
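A rough sketch of why one sequential Dispatcher is not enough here, assuming Cats Effect 3.4+. `TwoDispatchers`, `stop`, and `requested` are hypothetical stand-ins: `stop.get` plays the role of the long-running subscription, and the `requested` counter plays the role of request handling.

```scala
import cats.effect.IO
import cats.effect.std.Dispatcher
import cats.effect.unsafe.implicits.global
import cats.syntax.all._

object TwoDispatchers {
  def run(): Long =
    (Dispatcher.sequential[IO], Dispatcher.sequential[IO]).tupled.use {
      case (startDispatcher, requestDispatcher) =>
        for {
          requested <- IO.ref(0L)
          stop      <- IO.deferred[Unit]
          _ <- IO {
            // Long-running task: occupies startDispatcher until `stop` completes.
            startDispatcher.unsafeRunAndForget(stop.get)
            // Requests go through the second dispatcher, so they are not
            // queued behind the long-running task.
            requestDispatcher.unsafeRunAndForget(requested.update(_ + 5L))
            requestDispatcher.unsafeRunAndForget(stop.complete(()))
          }
          _ <- stop.get
          n <- requested.get
        } yield n
    }.unsafeRunSync()
}
```

If both kinds of task were submitted to `startDispatcher`, the update and the `stop.complete` would wait behind `stop.get`, which itself waits for `stop`, so nothing would make progress.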
+ @deprecated("Use overload which takes only the stream and returns a Resource", "3.4.0")
  def apply[F[_]: Async, A](
      s: Stream[F, A],
      dispatcher: Dispatcher[F]
  ): StreamUnicastPublisher[F, A] =
-   new StreamUnicastPublisher(s, dispatcher)
+   new StreamUnicastPublisher(s, dispatcher, dispatcher)
+
+ def apply[F[_]: Async, A](
+     s: Stream[F, A]
+ ): Resource[F, StreamUnicastPublisher[F, A]] =
+   (Dispatcher.sequential[F], Dispatcher.sequential[F]).mapN(new StreamUnicastPublisher(s, _, _))
Since we can now have local Dispatchers with special sequential semantics, it seems like we should deprecate the pattern of asking for (presumably more global) dispatchers in favor of building our own dispatchers.
Furthermore, since we now rely on the sequential Dispatcher to achieve the correct semantics, exposing it as an argument is not good design.
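The general shape of this pattern, sketched outside fs2 and assuming Cats Effect 3.4+: the class keeps its Dispatchers private to its own construction, and the only public constructor returns a Resource. `Bridge`, `submit`, and `BridgeDemo` are hypothetical names for illustration, not part of any library.

```scala
import cats.effect.{IO, Resource}
import cats.effect.std.Dispatcher
import cats.effect.unsafe.implicits.global
import cats.syntax.all._
import scala.concurrent.Future

// Hypothetical stand-in for a class that bridges IO and callback-style code.
final class Bridge private (
    val startDispatcher: Dispatcher[IO],
    val requestDispatcher: Dispatcher[IO]
) {
  // Entry point for non-IO callers; relies on requestDispatcher being sequential.
  def submit[A](fa: IO[A]): Future[A] = requestDispatcher.unsafeToFuture(fa)
}

object Bridge {
  // Callers cannot pass in a Dispatcher with the wrong semantics,
  // because both are created here and released with the Resource.
  def resource: Resource[IO, Bridge] =
    (Dispatcher.sequential[IO], Dispatcher.sequential[IO]).mapN(new Bridge(_, _))
}

object BridgeDemo {
  def run(): Int =
    Bridge.resource
      .use(bridge => IO.fromFuture(IO(bridge.submit(IO.pure(42)))))
      .unsafeRunSync()
}
```

The trade-off is the one raised in the replies: callers now have to manage the Resource's lifetime themselves, which is awkward when a framework gives no hook for releasing it.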
I'm sorry to come into this so late, after this has merged, but we use this in a Play application to stream CSV files from an fs2 Stream. The Play API requires a class that's hard to construct except by using a StreamUnicastPublisher, but it doesn't give us the opportunity to call close. In previous discussions on Discord, I was advised to create a Dispatcher for the application and pass it in; we can guarantee the provided Dispatcher gets closed at the end.
I see (superficially) the point of your comment here and the rationale behind deprecating the API that takes a provided Dispatcher, but I don't see how we can migrate to the non-deprecated API without leaking Dispatcher instances. I will try to find you on Discord sometime to talk about it a little more.
@mtomko no problem, thanks for the comment.
> but I don't see how we can migrate to the non-deprecated API without leaking Dispatcher instances.
I need to think about it more, but so long as you are careful to cancel the subscription, I don't think there will be any leak here. The Resource surrounding a Dispatcher is intended to make sure that any tasks it started are canceled when the resource closes. If a Dispatcher has no running tasks and it goes out of scope, I am fairly certain that the GC will completely clean it up. Hence, no leak.
Also, @BalmungSan is currently working on some usability improvements that might help your use case :)
I'm trying to trace through the code, but it means digging deeply into akka streams and Play. The code we've written just creates the source and passes it to Play's Ok to construct a result:
    def streamToClientAsCsvFile(
        enumerator: Stream[IO, Row],
        dispatcher: Dispatcher[IO],
        filename: String
    ): Result = {
      val publisher = StreamUnicastPublisher(enumerator.through(rowToCsv), dispatcher)
      val source = Source.fromPublisher(publisher)
      Ok.chunked(source)
        .withHeaders(CONTENT_DISPOSITION -> s"attachment; filename = $filename")
        .as("text/csv")
    }

I would like to hope that Play is smart enough to recognize when the connection is closed and close the Source.
P.S. Thanks for your previous response, it's really appreciated!
@mtomko #3107 is the PR @armanbilge mentioned. However, I am not exactly sure how it helps in your use case.
> However, I am not exactly sure how it helps in your use case.
@BalmungSan @mtomko the new stream.subscribe(...) syntax that Luis is adding in #3107 will enable you to bring back the global Dispatcher.
    val source = Source.fromPublisher { subscriber =>
      dispatcher.unsafeRunAndForget(enumerator.subscribe(subscriber))
    }

- trait UnsafeTestNGSuite extends TestNGSuiteLike {
-
-   protected var dispatcher: Dispatcher[IO] = _
-   private var shutdownDispatcher: IO[Unit] = _
-
-   private val mkDispatcher = Dispatcher[IO].allocated
-   private val t = mkDispatcher.unsafeRunSync()
-   dispatcher = t._1
-   shutdownDispatcher = t._2
-
-   @AfterClass
-   def afterAll() = shutdownDispatcher.unsafeRunSync()
- }
This machinery was no longer necessary since Dispatcher is no longer accepted as an argument.
- dispatcher <- Dispatcher[F]
+ readDispatcher <- Dispatcher.sequential[F]
+ writeDispatcher <- Dispatcher.sequential[F]
+ errorDispatcher <- Dispatcher.sequential[F]
read/write are sequential and error is one-off, but they need to be independent, so they need their own dispatchers.
+ ProblemFilters.exclude[DirectMissingMethodProblem]( // something funky in Scala 3.2.0 ...
+   "fs2.io.net.SocketGroupCompanionPlatform#AsyncSocketGroup.this"
+ )
Super weird, can reproduce just by updating to 3.2.0 on main. No idea ...
I minimized it in armanbilge/sandbox@d9453ac. IDK what change in dotty caused it, but seems safe: seems like the private constructor was generating public bytecode prior to 3.2.0, but is now properly private. This is flagging up MiMa.
It doesn't affect anything public.
Lol, I mis-titled this and the commit as 3.2.0-RC1 instead of 3.4.0-RC1. Oh well.
This PR replaces deprecated Dispatcher[IO] uses with the appropriate sequential or parallel replacements. Inline comments forthcoming.