Skip to content

Conversation

@BalmungSan
Copy link
Contributor

@BalmungSan BalmungSan commented Jan 7, 2023

Adds support to converting Streams back and forth with Java 9+ Flow types.

Roadmap

  • Copy and paste the reactive-streams implementation inside core using Flow types instead.
  • Clean up the new versions.
  • Define an appropriate package for the new classes.
  • Add tests for the new classes.
  • Deprecate the reactive-streams module

@armanbilge
Copy link
Member

Looks like reactive streams has also published a testkit for the Java Flow counterparts.
https://www.reactive-streams.org/reactive-streams-tck-flow-1.0.2-javadoc/org/reactivestreams/tck/flow/package-summary.html

@BalmungSan BalmungSan force-pushed the support-java-flow branch 2 times, most recently from 4d14048 to 3d43160 Compare January 8, 2023 02:01
@BalmungSan BalmungSan changed the title WIP: Add interop with Java Flow types Add interop with Java Flow types Jan 8, 2023
@BalmungSan BalmungSan marked this pull request as ready for review January 8, 2023 22:09
@BalmungSan BalmungSan requested a review from armanbilge January 8, 2023 22:09
@BalmungSan
Copy link
Contributor Author

Deprecate the reactive-streams module?

So, do we want to deprecate those or not?
And if so, what should be the message?

@armanbilge
Copy link
Member

So, do we want to deprecate those or not?

Unfortunately I don't think we should "hard" deprecate it, because it leaves Java 8 users out in the cold.

Instead I propose a "soft" deprecation (like we did for Cats Future instances) that only appears in Scaladocs, not to the compiler.

The message can say that they will be removed in the next major version of FS2, and that the converters in this new package are the replacement.

@armanbilge
Copy link
Member

Published as 3.4.0-59-7543784-SNAPSHOT for downstream testing.

@Jasper-M
Copy link
Contributor

Jasper-M commented Jan 9, 2023

The message can say that they will be removed in the next major version of FS2, and that the converters in this new package are the replacement.

How would one interop with an API that is not updated to Java Flow in that scenario?

@BalmungSan
Copy link
Contributor Author

@Jasper-M the same one interop with a Flow API in this moment, using the FlowAdapters provided by reactive-streams: https://www.reactive-streams.org/reactive-streams-flow-adapters-1.0.2-javadoc/org/reactivestreams/FlowAdapters.html
(just that it would be backwards in this case).

The rationale to prefer this approach is:

  1. In theory, all APIs that expose reactive-streams should move to Flow types once they support Java 9.

This means that there will be a migratory period, while libraries move to adopt the new types in the JDK, however this period is expected to be short - due to the full semantic equivalence of the libraries, as well as the Reactive Streams <-> Flow adapter library as well as a TCK compatible directly with the JDK Flow types.
-- https://www.reactive-streams.org/

  1. Right now, if you need to interop only with Flow types, you still need to include the reactive-streams library in your classpath. Whereas, with this new approach you won't be adding a new dependency.

Comment on lines 60 to 63
def fromPublisher[F[_], A](
publisher: Publisher[A],
bufferSize: Int
)(implicit
Copy link
Member

@armanbilge armanbilge Jan 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Luis and I discussed this extensively offline. Unfortunately this current API is not usable as-is in the http4s-jdk-http-client, because it needs to run an effect in its Publisher. The workaround would be to unsafeRunSync() said effect.

However, noting that a Publisher[A] is just a @FunctionalInterface for Subscriber[A] => Unit which is a less ergonomic version of Subscriber[A] => F[Unit], I think we should change the signature to something like this:

def fromPublisher[F[_], A](bufferSize: Int)(
  subscribe: Subscription[A] => F[Unit]
)(implicit F: Async[F]): Stream[F, A]

This also has a very nice parallel to existing APIs in FS2.

def readOutputStream[F[_]: Async](
chunkSize: Int
)(
f: OutputStream => F[Unit]
): Stream[F, Byte] = {

Copy link
Contributor Author

@BalmungSan BalmungSan Jan 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just pushed a couple of alternative APIs for this kind of use cases.

Comment on lines 74 to 84
implicit final class PublisherOps[A](private val publisher: Publisher[A]) extends AnyVal {

/** Creates a [[Stream]] from an [[Publisher]].
*
* @param bufferSize setup the number of elements asked each time from the [[Publisher]].
* A high number can be useful is the publisher is triggering from IO,
* like requesting elements from a database.
* The publisher can use this `bufferSize` to query elements in batch.
* A high number will also lead to more elements in memory.
*/
def toStream[F[_]](bufferSize: Int)(implicit F: Async[F]): Stream[F, A] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit skeptical if we should expose this, for the same reasons we don't expose a toIO syntax for Future. Specifically, the signature does not ask for a raw Future, but a suspended one:

object IO {
  def fromFuture[A](fut: IO[Future[A]]): IO[A]
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, AFAIK a Publisher is just a description, you can create one without needing to run any side effects.

@BalmungSan BalmungSan force-pushed the support-java-flow branch 2 times, most recently from 6e547d8 to ef9304a Compare January 9, 2023 17:30
@BalmungSan BalmungSan requested a review from armanbilge January 9, 2023 17:32

import java.util.concurrent.Flow.{Publisher, Subscriber}

object syntax {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved the implicit classes to here from the package because:

  1. To fix some annoying error with the doctest on 2.12
  2. IMHO, is better not to import both the explicit methods and the syntax when doing import fs2.inteorp.flow._
  3. To follow the standard of other libraries like cats and cats-effect

Co-authored-by: Arman Bilge <armanbilge@gmail.com>
Copy link
Member

@armanbilge armanbilge left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for all your work, this is such a nice iteration on the reactive-streams module.

I'm going to publish one more snapshot. Update: 3.5.0-12-6d1e0b0-SNAPSHOT

@armanbilge
Copy link
Member

31cf927 will be a follow-up PR :)

@armanbilge
Copy link
Member

Looks like there is a defaultBufferSize(), should we make use of this anywhere?

@BalmungSan
Copy link
Contributor Author

Looks like there is a defaultBufferSize(), should we make use of this anywhere?

Oh, I didn't see that one.
I actually thought of proposing a default for bufferSize but I wasn't sure which may be a good value.

IMHO, we should use this one on all public APIs

@mpilquist mpilquist merged commit a04e1be into typelevel:main Feb 7, 2023
@BalmungSan BalmungSan deleted the support-java-flow branch February 7, 2023 03:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants