Add interop with Java Flow types #3102
Looks like reactive streams has also published a testkit for the Java Flow counterparts.
So, do we want to deprecate those or not?
Unfortunately I don't think we should "hard" deprecate it, because it leaves Java 8 users out in the cold. Instead I propose a "soft" deprecation (like we did for Cats The message can say that they will be removed in the next major version of FS2, and that the converters in this new package are the replacement. |
Published as |
How would one interop with an API that is not updated to Java Flow in that scenario?
@Jasper-M the same one interop with a The rationale to prefer this approach is:
def fromPublisher[F[_], A](
  publisher: Publisher[A],
  bufferSize: Int
)(implicit
Luis and I discussed this extensively offline. Unfortunately this current API is not usable as-is in the http4s-jdk-http-client, because it needs to run an effect in its Publisher. The workaround would be to unsafeRunSync() said effect.
However, noting that a Publisher[A] is just a @FunctionalInterface for Subscriber[A] => Unit which is a less ergonomic version of Subscriber[A] => F[Unit], I think we should change the signature to something like this:
def fromPublisher[F[_], A](bufferSize: Int)(
  subscribe: Subscriber[A] => F[Unit]
)(implicit F: Async[F]): Stream[F, A]
This also has a very nice parallel to existing APIs in FS2.
fs2/io/jvm/src/main/scala/fs2/io/ioplatform.scala
Lines 67 to 71 in 19e691f
def readOutputStream[F[_]: Async](
    chunkSize: Int
)(
    f: OutputStream => F[Unit]
): Stream[F, Byte] = {
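The observation that a Publisher[A] is essentially a function from a Subscriber to Unit can be sketched with the JDK types alone. The helper `publisherOf`, the `single` publisher, and the `Collector` subscriber below are hypothetical names introduced for illustration; they are not part of FS2 or of this PR, and the publisher is single-threaded and not fully spec-compliant.

```scala
import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription}
import scala.collection.mutable.ListBuffer

object PublisherAsFunction {
  // Hypothetical helper: wraps a plain function as a Publisher,
  // making the "Publisher is Subscriber => Unit" reading explicit.
  def publisherOf[A](f: Subscriber[_ >: A] => Unit): Publisher[A] =
    new Publisher[A] {
      override def subscribe(s: Subscriber[_ >: A]): Unit = f(s)
    }

  // A publisher that emits one element the first time demand is signalled.
  def single[A](a: A): Publisher[A] =
    publisherOf[A] { subscriber =>
      subscriber.onSubscribe(new Subscription {
        private var done = false
        override def request(n: Long): Unit =
          if (!done && n > 0) {
            done = true
            subscriber.onNext(a)
            subscriber.onComplete()
          }
        override def cancel(): Unit = { done = true }
      })
    }

  // Minimal subscriber that records what it receives.
  final class Collector[A] extends Subscriber[A] {
    val received = ListBuffer.empty[A]
    var completed = false
    override def onSubscribe(s: Subscription): Unit = s.request(1L)
    override def onNext(item: A): Unit = received += item
    override def onError(t: Throwable): Unit = throw t
    override def onComplete(): Unit = { completed = true }
  }
}
```

With an effectful `Subscriber[A] => F[Unit]` in place of the plain function, the caller can run an effect at subscription time without resorting to unsafeRunSync().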
I just pushed a couple of alternative APIs for these kinds of use cases.
implicit final class PublisherOps[A](private val publisher: Publisher[A]) extends AnyVal {

  /** Creates a [[Stream]] from a [[Publisher]].
    *
    * @param bufferSize sets the number of elements requested each time from the [[Publisher]].
    *                   A high number can be useful if the publisher is triggering from IO,
    *                   like requesting elements from a database.
    *                   The publisher can use this `bufferSize` to query elements in batch.
    *                   A high number will also lead to more elements in memory.
    */
  def toStream[F[_]](bufferSize: Int)(implicit F: Async[F]): Stream[F, A] =
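As a rough illustration of what the bufferSize doc comment describes, the sketch below uses only JDK Flow types: a subscriber asks for `bufferSize` elements at a time, and the publisher records each demand signal it sees. `RangePublisher` and `BatchingSubscriber` are hypothetical, single-threaded stand-ins; this is not how FS2's own subscriber is implemented.

```scala
import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription}
import scala.collection.mutable.ListBuffer

object BatchedDemand {
  // Synchronous publisher of 1..total that records every request(n) it receives.
  final class RangePublisher(total: Int) extends Publisher[Int] {
    val requests = ListBuffer.empty[Long]
    override def subscribe(s: Subscriber[_ >: Int]): Unit =
      s.onSubscribe(new Subscription {
        private var next = 1
        private var demand = 0L
        private var emitting = false
        private var finished = false
        override def request(n: Long): Unit = {
          requests += n
          demand += n
          if (!emitting) { // guard against re-entrant calls made from onNext
            emitting = true
            while (!finished && demand > 0 && next <= total) {
              demand -= 1
              val item = next
              next += 1
              s.onNext(item)
            }
            emitting = false
            if (!finished && next > total) { finished = true; s.onComplete() }
          }
        }
        override def cancel(): Unit = { finished = true }
      })
  }

  // Requests bufferSize elements up front, then again after each full batch.
  final class BatchingSubscriber(bufferSize: Int) extends Subscriber[Int] {
    val received = ListBuffer.empty[Int]
    var completed = false
    private var subscription: Subscription = null
    private var inBatch = 0
    override def onSubscribe(s: Subscription): Unit = {
      subscription = s
      s.request(bufferSize.toLong)
    }
    override def onNext(item: Int): Unit = {
      received += item
      inBatch += 1
      if (inBatch == bufferSize) { inBatch = 0; subscription.request(bufferSize.toLong) }
    }
    override def onError(t: Throwable): Unit = throw t
    override def onComplete(): Unit = { completed = true }
  }
}
```

A larger bufferSize means fewer demand signals reach the publisher, at the cost of more elements held in memory at once.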
I am a bit skeptical about whether we should expose this toStream syntax, for the same reasons we don't expose a toIO syntax for Future. Specifically, the signature of IO.fromFuture does not ask for a raw Future, but a suspended one:
object IO {
  def fromFuture[A](fut: IO[Future[A]]): IO[A]
}
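To make the Future concern concrete, here is a minimal sketch using only the Scala standard library (no cats-effect): a raw Future starts running when it is created and runs once, while a suspended one (modelled here as a plain thunk, which is an assumption standing in for IO[Future[A]]) re-runs each time it is evaluated.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object EagerVsSuspended {
  // Returns the run counter after awaiting the eager Future twice,
  // and again after evaluating the suspended Future twice.
  def demo(): (Int, Int) = {
    val runs = new AtomicInteger(0)

    // Eager: the body is scheduled as soon as the Future is constructed.
    val eager: Future[Int] = Future(runs.incrementAndGet())
    Await.result(eager, 5.seconds)
    Await.result(eager, 5.seconds) // same completed value, body is not re-run
    val afterEager = runs.get()

    // Suspended: each call builds and starts a fresh Future.
    val suspended: () => Future[Int] = () => Future(runs.incrementAndGet())
    Await.result(suspended(), 5.seconds)
    Await.result(suspended(), 5.seconds)
    val afterSuspended = runs.get()

    (afterEager, afterSuspended)
  }
}
```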
Well, AFAIK a Publisher is just a description, you can create one without needing to run any side effects.
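The "just a description" reading can be sketched as follows, assuming a publisher written so that all of its work happens inside subscribe. `ColdPublisher` and its counter are hypothetical; note that nothing in the Flow.Publisher interface itself forces an implementation to behave this way.

```scala
import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription}
import java.util.concurrent.atomic.AtomicInteger

object ColdPublisher {
  // Counts how many times the publisher's side effect has actually run.
  val sideEffects = new AtomicInteger(0)

  // Building the publisher only allocates an object; the effect lives in subscribe.
  def make(): Publisher[String] = new Publisher[String] {
    override def subscribe(s: Subscriber[_ >: String]): Unit = {
      sideEffects.incrementAndGet()
      s.onSubscribe(new Subscription {
        override def request(n: Long): Unit = ()
        override def cancel(): Unit = ()
      })
      s.onComplete()
    }
  }

  // Subscriber that ignores every signal.
  def noopSubscriber(): Subscriber[String] = new Subscriber[String] {
    override def onSubscribe(s: Subscription): Unit = ()
    override def onNext(item: String): Unit = ()
    override def onError(t: Throwable): Unit = ()
    override def onComplete(): Unit = ()
  }
}
```

Each subscription runs the effect again, which is the same re-evaluation behaviour a suspended Future has.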
import java.util.concurrent.Flow.{Publisher, Subscriber}

object syntax {
I moved the implicit classes here from the package object:
- To fix an annoying error with the doctest on 2.12
- IMHO, it is better not to import both the explicit methods and the syntax when doing import fs2.interop.flow._
- To follow the standard of other libraries like cats and cats-effect
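The import behaviour described in the second point can be sketched with a toy package-like object. The names `flowlike`, `render`, and `IntRenderOps` are hypothetical and only mirror the layout, not FS2's actual members.

```scala
object flowlike {
  // Explicit method, visible through `import flowlike._`.
  def render(n: Int): String = s"value=$n"

  // Extension syntax lives in a nested object, so it needs its own import.
  object syntax {
    implicit final class IntRenderOps(private val n: Int) extends AnyVal {
      def rendered: String = render(n)
    }
  }
}

object SyntaxUsage {
  def explicitOnly(): String = {
    import flowlike._
    render(1) // `1.rendered` would not compile with only this import
  }

  def withSyntax(): String = {
    import flowlike.syntax._
    val n = 2
    n.rendered
  }
}
```

Users who only want the explicit methods avoid pulling extension methods into scope, and those who want the syntax opt in with one extra import.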
Co-authored-by: Arman Bilge <armanbilge@gmail.com>
Thanks for all your work, this is such a nice iteration on the reactive-streams module.
I'm going to publish one more snapshot. Update: 3.5.0-12-6d1e0b0-SNAPSHOT
31cf927 will be a follow-up PR :)
Looks like there is a |
Oh, I didn't see that one. IMHO, we should use this one on all public APIs
Adds support for converting Streams back and forth with Java 9+ Flow types.
Roadmap
- reactive-streams implementation inside core using Flow types instead.
- reactive-streams module