
[kernel][core] more consistent sync/async collection operations #1086

Merged: fwbrasil merged 8 commits into main from collection-ops on Mar 17, 2025

@fwbrasil
Collaborator

Problem

Kyo's Async methods aren't very convenient for working with collections.

Solution

Provide a new set of methods resembling the collection operations in the kernel's Kyo companion object.

Notes

  • There was another Kyo.scala file in the internal package of the kernel. I renamed it to KyoInternal.scala to avoid confusion.
  • I'll provide more details on the changes in code comments.

val reader = new BufferedReader(new InputStreamReader(stream))
reader.lines()
.filter(line => line.endsWith(".class"))
Files.list(Path.of(getClass.getResource(".").getPath().toString().replace("test-", "")))
Collaborator Author

When working on this change, I noticed that the kyo-bench tests weren't executing. For some reason this code wasn't able to find the classes anymore. I thought the issue was the package rename to kyo.bench.arena, but even moving the files back doesn't fix it. I updated the code to find the class files by removing the test- prefix. This makes the tests work in sbt but not in VSCode, which seems acceptable.

val javaBin = System.getProperty("java.home") + "/bin/java"
val classpath = System.getProperty("java.class.path")
- val command = List(javaBin, "-cp", classpath, "kyo.bench.TestHttpServer", concurrency.toString)
+ val command = List(javaBin, "-cp", classpath, "kyo.bench.arena.TestHttpServer", concurrency.toString)
Collaborator Author

this was broken since the package rename

import AllowUnsafe.embrace.danger
given Frame = Frame.internal
IO.Unsafe.evalOrThrow(System.property[Int]("kyo.async.concurrency.default", Runtime.getRuntime().availableProcessors() * 2))
end defaultConcurrency
Collaborator Author (@fwbrasil, Mar 14, 2025)

I'm following a more explicit approach where the user has to override the behavior in each method call; otherwise, execution falls back to cores * 2. The most problematic cases I've seen in production in similar scenarios are collections with hundreds of elements, so this default bound should be enough to prevent issues.

I'm not 100% happy with the approach, but it seems a good improvement that we can iterate on later. It seems worth exploring what it would look like with the concurrency maintained via a Local as well.
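
The fallback described above can be sketched in plain Scala without any Kyo types. This is only an illustration under assumptions: the `ConcurrencyDefault` object and its `resolve` helper are hypothetical names, and the property lookup uses the standard library rather than Kyo's `System.property`.

```scala
object ConcurrencyDefault {
  // Hypothetical helper, not Kyo's implementation: reads an integer system
  // property and falls back to twice the number of available processors.
  def defaultConcurrency(
      property: String = "kyo.async.concurrency.default"
  ): Int =
    sys.props
      .get(property)
      .flatMap(_.toIntOption)
      .getOrElse(Runtime.getRuntime.availableProcessors() * 2)

  // An explicit per-call value wins; otherwise the default bound applies.
  def resolve(requested: Option[Int]): Int =
    requested.getOrElse(defaultConcurrency())
}
```

A caller that passes nothing gets the cores * 2 bound, while `resolve(Some(4))` pins the bound to 4 for that call only.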

* @return
* Chunk containing results in the original sequence order
*/
def foreachIndexed[E, A, B: Flat, S](
Collaborator Author

The new collection methods similar to the Kyo companion object start here. I've decided to make all of them rely only on foreachIndexed, but there's potential for optimization leveraging the characteristics of each method.

Collaborator

foreachDiscard seems like the one case where this is very true.
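
To make the trade-off concrete, here is a rough sketch of deriving other operations from a single indexed primitive. It is a sequential, effect-free stand-in written for illustration; the `DerivedOps` object and these simplified signatures are assumptions, not the Kyo API.

```scala
object DerivedOps {
  // Sequential stand-in for the primitive; the real method runs concurrently.
  def foreachIndexed[A, B](items: Iterable[A])(f: (Int, A) => B): Vector[B] =
    items.iterator.zipWithIndex.map { case (a, i) => f(i, a) }.toVector

  // Ignores the index.
  def foreach[A, B](items: Iterable[A])(f: A => B): Vector[B] =
    foreachIndexed(items)((_, a) => f(a))

  // Keeps an element by mapping it to Some, then drops the None results.
  def filter[A](items: Iterable[A])(p: A => Boolean): Vector[A] =
    foreachIndexed[A, Option[A]](items)((_, a) => if (p(a)) Some(a) else None).flatten

  // Builds a full result Vector only to throw it away, which is the
  // overhead a specialized implementation could avoid.
  def foreachDiscard[A](items: Iterable[A])(f: A => Unit): Unit = {
    foreachIndexed(items)((_, a) => f(a))
    ()
  }
}
```

In this shape, `foreachDiscard` pays for an intermediate collection it never uses, which matches the observation that it is the clearest candidate for a dedicated implementation.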

def collect[E, A, B: Flat, S](
using isolate: Isolate.Stateful[S, Abort[E] & Async]
)(seq: Seq[A], concurrency: Int = defaultConcurrency)(
f: A => Maybe[B] < (Abort[E] & Async & S)
Collaborator Author

I wasn't sure if I should use a PartialFunction like kyo-combinators does or a function returning Maybe. I think the version with Maybe is more flexible since it allows the "filtering" to also perform effects. cc/ @johnhungerford

Collaborator

Partial functions are really great for most simple cases, but I agree that Maybe is more flexible. If we have to choose one or the other, Maybe is probably better.
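
The difference between the two styles can be illustrated with standard collections. In this sketch `Option` stands in for Kyo's `Maybe`, and `CollectStyles`, `collectPF`, and `collectOpt` are hypothetical names introduced only for the comparison.

```scala
object CollectStyles {
  // PartialFunction style: the pattern alone decides whether an element is kept.
  def collectPF[A, B](items: Seq[A])(pf: PartialFunction[A, B]): Seq[B] =
    items.collect(pf)

  // Option-returning style: arbitrary work (parsing, lookups, or effects in
  // an effect system) can run before deciding to keep or drop the element.
  def collectOpt[A, B](items: Seq[A])(f: A => Option[B]): Seq[B] =
    items.flatMap(a => f(a))
}
```

For example, `CollectStyles.collectOpt(Seq("1", "x", "3"))(_.toIntOption)` parses and filters in one step, something a pattern match can only express by repeating the parse in a guard.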

* Chunk containing results in the original sequence order
*/
- def parallel[E, A: Flat, S](
+ def collectAll[E, A: Flat, S](
Collaborator Author

Renamed following the naming pattern in ZIO.


/** Executes two computations in parallel and returns their results as a tuple.
*/
inline def zip[E, A1: Flat, A2: Flat, S](
Collaborator Author (@fwbrasil, Mar 14, 2025)

I've renamed the vararg parallel methods to zip like in the Kyo companion


/** Executes eight computations in parallel and returns their results as a tuple.
*/
inline def zip[E, A1: Flat, A2: Flat, A3: Flat, A4: Flat, A5: Flat, A6: Flat, A7: Flat, A8: Flat, S](
Collaborator Author

zip for up to 8 elements now

* A Fiber that completes with the result of the first Fiber to complete
*/
- def race[E, A: Flat, S](
+ private[kyo] def race[E, A: Flat, S](
Collaborator Author

I think having the same methods in Async and Fiber can be confusing so I've decided to remove some and only keep the ones needed to implement the Async methods as private[kyo]. It limits the functionality but it's also easy to go back to a Fiber via Async.run.

/** Folds a Result into a value, with separate handling for errors and successes.
*
- * @param ifError
+ * @param onError
Collaborator Author

minor unrelated fix

* A computation that directly produces the given value without suspension
*/
- inline def pure[A, S](inline v: A): A < S = v
+ inline def lift[A, S](inline v: A): A < S = v
Collaborator Author

I think pure was a bit opaque for people not used to FP.

Collaborator

If we are choosing similar naming to ZIO, what do you think about succeed?

Collaborator Author

I think succeed makes more sense in a monad that includes error handling by default. In this case, lift seems more appropriate.

inline def unit[S]: Unit < S = ()

/** Zips two effects into a tuple.
*
Collaborator Author

some cleanup to avoid the file getting too large with the new versions with up to 8 elements

* A new effect that produces a Chunk of repeated values
*/
- def fill[A, S](n: Int)(v: Safepoint ?=> A < S)(using Frame, Safepoint): Chunk[A] < S =
+ def repeat[A, S](n: Int)(v: Safepoint ?=> A < S)(using Frame, Safepoint): Chunk[A] < S =
Collaborator Author

Kyo.repeat/Async.repeat seem clearer.

@fwbrasil requested a review from hearnadam on March 14, 2025 at 02:42
case size =>
isolate.capture { state =>
val groupSize = Math.ceil(size.toDouble / Math.max(1, concurrency)).toInt
Fiber.foreachIndexed(seq.grouped(groupSize).toSeq)((idx, seq) =>
Collaborator

I think we need to add support for non-seq collections. We may need a typeclass if we want to support only immutable collections for compatibility with multi-shot continuations.

Collaborator Author

Multi-shot continuations aren't a concern here since there's IO suspension. I'm not a fan of supporting mutable collections but I think other effect systems allow them, right? It might be important for usability.

Collaborator

Yes, we need to support more collections for usability.

*/
def foreachIndexed[E, A, B: Flat, S](
using isolate: Isolate.Stateful[S, Abort[E] & Async]
)(seq: Seq[A], concurrency: Int = defaultConcurrency)(f: (Int, A) => B < (Abort[E] & Async & S))(using
Collaborator

Should we have an IndexedFunction type to avoid boxing?

Collaborator Author

I think we can wait for the compiler to implement specialization unless we see it introducing overhead. The JIT can sometimes avoid the allocation.

Collaborator

Sounds good
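
For reference, the kind of type being discussed might look like the sketch below. It was not added in this PR; `IndexedFunction` here is a hypothetical single-abstract-method trait, shown against a sequential stand-in for `foreachIndexed`. The idea is that declaring the index as a primitive `Int` in the method signature avoids the boxing that a generic `(Int, A) => B` may incur when `A` is not a specialized type.

```scala
object IndexedFn {
  // Hypothetical SAM trait: `idx` is a primitive Int in the signature.
  trait IndexedFunction[-A, +B] {
    def apply(idx: Int, a: A): B
  }

  // Sequential stand-in that accepts the SAM type instead of a Function2.
  def foreachIndexed[A, B](items: Iterable[A])(f: IndexedFunction[A, B]): Vector[B] = {
    val out = Vector.newBuilder[B]
    val it  = items.iterator
    var idx = 0
    while (it.hasNext) {
      out += f(idx, it.next())
      idx += 1
    }
    out.result()
  }
}
```

Call sites stay unchanged because a lambda such as `(i, s) => s"$i:$s"` converts to the SAM type automatically.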

* @return
* Chunk containing results of all iterations
*/
def repeat[E, A: Flat, S](
Collaborator

This seems useful. Should we call it fill?

Collaborator Author

This operation was actually called fill in the Kyo companion. I've renamed it repeat because I thought it was a better name indicating that the computation will be executed multiple times. I'd be ok with changing it back to fill here and in the Kyo companion.

Collaborator (@hearnadam, Mar 14, 2025)

How about replicate? I find repeat somewhat overlapping with Scheduling/repeating a kyo without saving a collection.

Collaborator Author

I ended up renaming it back to fill. The similarity with Scala collection APIs should be helpful too

)(seq: Seq[A < (Abort[E] & Async & S)])(
private[kyo] def foreachIndexed[A, B: Flat, E, S](
using isolate: Isolate.Contextual[S, IO]
)(seq: Iterable[A])(f: (Int, A) => B < (Abort[E] & Async & S))(
Collaborator

nit: no longer seq

def race[E, A: Flat, S](
using isolate: Isolate.Stateful[S, Abort[E] & Async]
- )(seq: Seq[A < (Abort[E] & Async & S)])(
+ )(seq: Iterable[A < (Abort[E] & Async & S)])(
Collaborator

same as elsewhere, could use rename of seq

case size =>
isolate.capture { state =>
val groupSize = Math.ceil(size.toDouble / Math.max(1, concurrency)).toInt
Fiber.foreachIndexed(seq.grouped(groupSize).toSeq)((idx, group) =>
Collaborator

does this need to be converted to a Seq now?

Collaborator Author

Yep, grouped returns an Iterator. I tried accepting IterableOnce instead of Iterable, which would support Iterator, but then it's not possible to get the size to initialize data structures in the Fiber.* methods, since an IterableOnce can be traversed only once.
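
The grouping strategy from the diff can be sketched with standard-library `Future`s standing in for fibers. This is an approximation under assumptions: `GroupedForeach` and `foreachGrouped` are hypothetical names, and only the `groupSize` formula is taken from the snippet above.

```scala
import scala.concurrent.{ExecutionContext, Future}

object GroupedForeach {
  // Same formula as in the diff: at most `concurrency` groups are produced.
  def groupSize(size: Int, concurrency: Int): Int =
    Math.ceil(size.toDouble / Math.max(1, concurrency)).toInt

  // Each group is processed sequentially inside one Future, so the number
  // of concurrent tasks is bounded by the number of groups.
  def foreachGrouped[A, B](items: Iterable[A], concurrency: Int)(f: A => B)(
      implicit ec: ExecutionContext
  ): Future[Vector[B]] =
    if (items.isEmpty) Future.successful(Vector.empty[B])
    else {
      // `grouped` returns an Iterator, so it is materialized with `toSeq`
      // to make the number of groups known before any task starts.
      val groups = items.grouped(groupSize(items.size, concurrency)).toSeq
      Future
        .traverse(groups)(group => Future(group.map(f).toVector))
        .map(_.flatten.toVector) // preserves the original element order
    }
}
```

With 10 elements and a concurrency of 4, the group size is 3, which yields four groups of sizes 3, 3, 3, and 1.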

@fwbrasil merged commit 566ddea into main on Mar 17, 2025
@fwbrasil deleted the collection-ops branch on March 17, 2025 at 00:19
fwbrasil added a commit that referenced this pull request on Mar 18, 2025:
### Problem

I noticed a few builds failing after
#1086. This error should only happen
if a `Safepoint` from one thread leaks to the execution in another
thread via implicits, introducing concurrency issues since the `Trace`
management is meant to be used by its owning thread only and isn't
synchronized:

```
[info] - no buffer *** FAILED *** (6 milliseconds)
[info]   java.lang.NullPointerException: Cannot invoke "kyo.kernel.internal.Trace.frames()" because "newTrace" is null
[info]   at kyo.kernel.internal.Trace$Owner.copyTrace(Trace.scala:60)
[info]   at kyo.Fiber$package$Fiber$$anon$33$$anon$34.loop$8(Fiber.scala:497)
[info]   at kyo.Fiber$package$Fiber$$anon$33$$anon$34.apply(Fiber.scala:661)
[info]   at kyo.Fiber$package$Fiber$$anon$33$$anon$34.apply(Fiber.scala:495)
[info]   at kyo.Async$package$Async$$anon$15.apply(Async.scala:363)
[info]   at kyo.scheduler.IOTask.partialLoop$1(IOTask.scala:42)
[info]   at kyo.scheduler.IOTask.eval(IOTask.scala:66)
[info]   at kyo.scheduler.IOTask.run(IOTask.scala:78)
[info]   at kyo.scheduler.Worker.runTask(Worker.scala:257)
[info]   at kyo.scheduler.Worker.run(Worker.scala:219)
```

https://github.com/getkyo/kyo/actions/runs/13888866130/job/38857394040#step:6:5899


### Solution

I believe the cause of this new issue is that I removed an `IO`
suspension that seemed to be unnecessary in `Fiber.foreachIndexed`. The
suspension was actually hiding potential safepoint leaks since
suspensions automatically restore the correct safepoint. This PR keeps
`Fiber.foreachIndexed` without the additional suspension, removes
similar suspensions from other methods, and ensures the methods can't
use a leaked safepoint by always getting the instance from the thread
local via `Safepoint.get`.

I can't reproduce the issue locally to validate and writing a unit test
is challenging without exposing new `Safepoint` APIs but this seems a
solid attempt at fixing the issue.
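
The failure mode described here can be illustrated generically. The sketch below is not Kyo's `Safepoint`; `ThreadOwned`, `Owner`, and `onNewThread` are hypothetical names that model a thread-owned object which is safe only when looked up from the thread local by the thread that uses it.

```scala
object ThreadOwned {
  // Hypothetical stand-in for a thread-owned, unsynchronized resource.
  final class Owner(val thread: Thread)

  private val local: ThreadLocal[Owner] =
    ThreadLocal.withInitial[Owner](() => new Owner(Thread.currentThread()))

  // Always resolves the instance owned by the calling thread.
  def get: Owner = local.get()

  // Runs `body` on a new thread and returns its result.
  def onNewThread[A](body: => A): A = {
    var result: Option[A] = None
    val t = new Thread(() => { result = Some(body) })
    t.start()
    t.join()
    result.get
  }
}
```

An `Owner` captured on one thread and used inside `onNewThread` belongs to a different thread than the one executing it, which is the leak; calling `ThreadOwned.get` inside the body instead always returns the instance owned by the executing thread.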