
Stream handle method #1238

Merged
fwbrasil merged 4 commits into getkyo:main from johnhungerford:stream-handle
Jun 6, 2025

Conversation

Collaborator

@johnhungerford johnhungerford commented Jun 5, 2025

Addresses the discussion of resource handling on Discord.

Problem

Handling stream effects is currently a little awkward. You have to apply a handler to the stream's emit effect and then construct a new Stream from the transformed effect. To handle resources within the scope of a stream, for instance, you would do:

val original: Stream[Int, Resource] = ???
val handled: Stream[Int, Any] = Stream(Resource.run(original.emit))

Solution

This PR adds a handle method to Stream similar to the handle method on Pending: it accepts one or more functions transforming the underlying emit effect, and constructs a new stream using the transformed effect. You can handle multiple effects as follows:

val original: Stream[Int, Resource & Var[Int] & Abort[String]] = ???
val handled: Stream[Int, Any] = original.handle(
  Resource.run(_),
  Var.run(23)(_),
  Abort.run[String](_),
)

Each function is expected to evaluate to an Emit[Chunk[V1]] effect, where V1 need not be the same as the original V type. The result does not have to be Unit < ..., because the implementation calls .unit to ensure it is a valid streaming effect. This means you can pass Abort.run[String](_) without worrying that the transformed effect evaluates to Result[String, Unit].
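The idea of transforming the underlying effect and then discarding its result can be sketched without Kyo. What follows is a toy model in plain Scala 3, not Kyo's implementation: `ToyEmit`, `ToyStream`, and `toyAbortRun` are hypothetical names invented for illustration, and an "effect" is modeled as a thunk returning the emitted values plus a final result.

```scala
// Toy effect (hypothetical): running it yields the emitted values and a final result of type A.
type ToyEmit[V, A] = () => (List[V], A)

// Toy stream (hypothetical): wraps an effect whose final result is Unit.
final case class ToyStream[V](emit: ToyEmit[V, Unit]):
    // Apply a transformation to the underlying effect, then discard whatever
    // result it produces so the new effect is again a valid stream body.
    def handle[V1, A](f: ToyEmit[V, Unit] => ToyEmit[V1, A]): ToyStream[V1] =
        val transformed = f(emit)
        ToyStream { () =>
            val (values, _) = transformed()
            (values, ())
        }
end ToyStream

// Hypothetical handler that changes the result type, loosely analogous to
// how Abort.run changes the result of the effect it handles.
def toyAbortRun[V](e: ToyEmit[V, Unit]): ToyEmit[V, Either[String, Unit]] =
    () =>
        val (values, result) = e()
        (values, Right(result))

val original: ToyStream[Int] = ToyStream(() => (List(1, 2, 3), ()))

// The handler's Either result is dropped by handle, so this is still a ToyStream[Int].
val handled: ToyStream[Int] = original.handle(toyAbortRun(_))
```

In this model the caller never sees the `Either` produced by the handler, which mirrors why a handler with a non-Unit result can be passed directly.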

Notes

Collaborator

@ahoy-jon ahoy-jon left a comment

It's great.
The existing definition of handle on Pending:

inline def handle[B](inline f: (=> A < S) => B): B =
    def handle1 = v
    f(handle1)
end handle

uses a call-by-name parameter: f: (=> <[A, S]) => B

Would it be possible to do the same here?

val streamOfResult = stream.handle(Abort.run, ???)

with something like

def handle[V1, S1](
    f: (=> Unit < S) => (Unit < (Emit[Chunk[V1]] & S1))
)(using Frame): Stream[V | V1, S1]

@johnhungerford
Collaborator Author

@ahoy-jon good point, although do you know why the parameter is call-by-name? I don't really see the point. I notice that in Pending.scala handle is an inline method. Should Stream#handle also be inline? @fwbrasil do you have some guidance here?

@ahoy-jon
Collaborator

ahoy-jon commented Jun 5, 2025

I don't know the reasoning behind the call-by-name parameter, but it might have something to do with inlining and allocations.

A potential signature could be:

def handle[V1 >: V, S1](
    f: (=> Unit < S) => (Unit < (Emit[Chunk[V1]] & S1))
)(using Frame, Tag[Emit[Chunk[V1]]], Tag[Emit[Chunk[V]]]): Stream[V1, S1]

I have a case for a Stream[Int, IO & Abort] (for Stream.fromIteratorCatching), that could be done like that:

stream.handle(
    _.fold(
        onSuccess = () => Chunk.empty,
        onFail = _ => Chunk(-1),
        onPanic = _ => Chunk(-2)
    ),
    _.map(chunk => Emit.value(chunk))
)

@fwbrasil
Collaborator

fwbrasil commented Jun 5, 2025

> @ahoy-jon good point, although do you know why the parameter is call-by-name? I don't really see the point. I notice that in Pending.scala handle is an inline method. Should Stream#handle also be inline? @fwbrasil do you have some guidance here?

The by-name param was introduced to allow Abort to handle the case when the computation panics without ever suspending. For example, ((throw ex): Int < Any).handle(Abort.run) produces Result.Panic(ex) instead of just throwing. Ideally, users should ensure code that may fail is suspended but we try to provide some built-in safety in APIs where possible. I think we should do the same for stream.handle.
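The difference a by-name parameter makes can be shown without Kyo. This is a minimal plain-Scala 3 sketch, not Kyo's implementation: `runStrict` and `runByName` are hypothetical stand-ins for a handler, and `Either` stands in for `Result`. With a by-value parameter, the argument is evaluated before the handler is entered, so the exception escapes it. With a by-name parameter, the handler evaluates the argument itself and can capture the failure.

```scala
import scala.util.Try
import scala.util.control.NonFatal

// Hypothetical by-value handler: `v` has already been evaluated by the time
// the body runs, so the try/catch here can never observe a failing argument.
def runStrict[A](v: A): Either[Throwable, A] =
    try Right(v)
    catch case NonFatal(e) => Left(e)

// Hypothetical by-name handler: `v` is evaluated inside the try,
// so an exception thrown by the argument is captured as a Left.
def runByName[A](v: => A): Either[Throwable, A] =
    try Right(v)
    catch case NonFatal(e) => Left(e)

val ex = new RuntimeException("boom")

// The exception escapes runStrict and is only caught by the outer Try.
val strict: Try[Either[Throwable, Int]] = Try(runStrict[Int](throw ex))
// strict == Failure(ex)

// The handler itself captures the exception.
val byName: Try[Either[Throwable, Int]] = Try(runByName[Int](throw ex))
// byName == Success(Left(ex))
```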

Regarding inline, I don't think we need it. Streams are typically built once and reused multiple times so the potential improvement is limited.

> I have a case for a Stream[Int, IO & Abort] (for Stream.fromIteratorCatching), that could be done like that:

We recommend handle only for effect handling, not regular transformations. I think it's important to avoid confusion and reduce the ways of doing the same thing.

@johnhungerford
Collaborator Author

I updated Stream#handle to accept functions with call-by-name inputs

@fwbrasil fwbrasil merged commit 60e7329 into getkyo:main Jun 6, 2025
3 checks passed

4 participants