[kernel][core] more consistent sync/async collection operations #1086
```scala
val reader = new BufferedReader(new InputStreamReader(stream))
reader.lines()
  .filter(line => line.endsWith(".class"))
Files.list(Path.of(getClass.getResource(".").getPath().toString().replace("test-", "")))
```
While working on this change, I noticed that the kyo-bench tests weren't executing: this code could no longer find the classes. I thought the issue was the package rename to kyo.bench.arena, but moving the files back didn't fix it. I updated the code to find the class files by removing the `test-` prefix. This makes the tests work in sbt, though not in VSCode, which seems acceptable.
```diff
 val javaBin = System.getProperty("java.home") + "/bin/java"
 val classpath = System.getProperty("java.class.path")
-val command = List(javaBin, "-cp", classpath, "kyo.bench.TestHttpServer", concurrency.toString)
+val command = List(javaBin, "-cp", classpath, "kyo.bench.arena.TestHttpServer", concurrency.toString)
```
this had been broken since the package rename
```scala
import AllowUnsafe.embrace.danger
given Frame = Frame.internal
IO.Unsafe.evalOrThrow(System.property[Int]("kyo.async.concurrency.default", Runtime.getRuntime().availableProcessors() * 2))
end defaultConcurrency
```
I'm following a more explicit approach where the user has to override the behavior in each method call; otherwise, execution falls back to `cores * 2`. The most problematic cases I've seen in production in similar scenarios are collections with hundreds of elements, so this default bound should be enough to prevent issues.
I'm not 100% happy with the approach, but it seems a good improvement that we can iterate on later. It also seems worth exploring what it would look like with the concurrency maintained via a `Local`.
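A minimal plain-Scala sketch of the default described above, useful for reasoning about the bound. Only the property name comes from the snippet; `DefaultConcurrency` and `resolve` are hypothetical, and falling back when the property is malformed is an assumption (Kyo's `System.property[Int]` may fail instead).

```scala
object DefaultConcurrency {
  // Property name taken from the snippet above
  val PropertyName = "kyo.async.concurrency.default"

  // Hypothetical helper: use the property if it parses as an Int,
  // otherwise fall back to cores * 2 (fallback on malformed values is an assumption)
  def resolve(props: scala.collection.Map[String, String], cores: Int): Int =
    props.get(PropertyName).flatMap(_.trim.toIntOption).getOrElse(cores * 2)

  // Reads the JVM system properties and the number of available processors
  def current: Int =
    resolve(sys.props, Runtime.getRuntime.availableProcessors())
}
```

This only models the default; each call site can still pass an explicit `concurrency` argument.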
```scala
 * @return
 *   Chunk containing results in the original sequence order
 */
def foreachIndexed[E, A, B: Flat, S](
```
The new collection methods, similar to those in the `Kyo` companion object, start here. I've decided to make all of them rely only on `foreachIndexed`, but there's potential for optimization by leveraging the characteristics of each method.
`foreachDiscard` seems like the one case where this is especially true.
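To illustrate what deriving every operation from `foreachIndexed` means, here is a sequential, effect-free sketch in plain Scala. It is not Kyo's implementation: `Option` stands in for `Maybe`, there is no concurrency, and `DerivedOps` is a hypothetical name. The last method shows why `foreachDiscard` benefits most from a dedicated version: when derived, it still allocates a result vector only to throw it away.

```scala
object DerivedOps {
  // Sequential stand-in for foreachIndexed: applies f to each element with its index, preserving order
  def foreachIndexed[A, B](seq: Seq[A])(f: (Int, A) => B): Vector[B] =
    seq.iterator.zipWithIndex.map { case (a, i) => f(i, a) }.toVector

  // foreach ignores the index
  def foreach[A, B](seq: Seq[A])(f: A => B): Vector[B] =
    foreachIndexed(seq)((_, a) => f(a))

  // collect keeps only the Some results, in the original order
  def collect[A, B](seq: Seq[A])(f: A => Option[B]): Vector[B] =
    foreachIndexed(seq)((_, a) => f(a)).flatten

  // Derived this way, foreachDiscard still builds the full result vector and then drops it
  def foreachDiscard[A](seq: Seq[A])(f: A => Unit): Unit = {
    foreachIndexed(seq)((_, a) => f(a))
    ()
  }
}
```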
```scala
def collect[E, A, B: Flat, S](
    using isolate: Isolate.Stateful[S, Abort[E] & Async]
)(seq: Seq[A], concurrency: Int = defaultConcurrency)(
    f: A => Maybe[B] < (Abort[E] & Async & S)
```
I wasn't sure if I should use a PartialFunction like kyo-combinators does or a function returning Maybe. I think the version with Maybe is more flexible since it allows the "filtering" to also perform effects. cc @johnhungerford
Partial functions are really great for most simple cases, but I agree that Maybe is more flexible. If we have to choose one or the other, Maybe is probably better.
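One way to see why the `Maybe`-returning form is the more general choice: any `PartialFunction` can be adapted to it with `lift`, while the reverse no longer holds once the filtering step itself needs to run effects. A minimal sketch in plain Scala, assuming `Option` in place of Kyo's `Maybe`; the names are hypothetical.

```scala
object CollectForms {
  // Option-returning form: the function decides per element whether to keep it
  def collectWith[A, B](seq: Seq[A])(f: A => Option[B]): Seq[B] =
    seq.flatMap(a => f(a).toList)

  // PartialFunction form, expressed through the Option form via lift
  def collectPf[A, B](seq: Seq[A])(pf: PartialFunction[A, B]): Seq[B] =
    collectWith(seq)(pf.lift)
}
```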
```diff
  *   Chunk containing results in the original sequence order
  */
-def parallel[E, A: Flat, S](
+def collectAll[E, A: Flat, S](
```
Renamed following the naming pattern in ZIO.
```scala
/** Executes two computations in parallel and returns their results as a tuple.
  */
inline def zip[E, A1: Flat, A2: Flat, S](
```
I've renamed the vararg `parallel` methods to `zip`, as in the `Kyo` companion object.
```scala
/** Executes eight computations in parallel and returns their results as a tuple.
  */
inline def zip[E, A1: Flat, A2: Flat, A3: Flat, A4: Flat, A5: Flat, A6: Flat, A7: Flat, A8: Flat, S](
```
`zip` now supports up to 8 computations.
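For comparison, with a library that only offers a pairwise `zip`, such as the standard `scala.concurrent.Future` used here as a stand-in (not Kyo's API), wider tuples need nested zips plus a re-shaping `map`. Dedicated overloads up to 8 remove that boilerplate from call sites. `zip3` is a hypothetical helper.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ZipSketch {
  // Nested pairwise zips produce ((A, B), C), which has to be flattened by hand
  def zip3[A, B, C](fa: Future[A], fb: Future[B], fc: Future[C]): Future[(A, B, C)] =
    fa.zip(fb).zip(fc).map { case ((a, b), c) => (a, b, c) }
}
```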
```diff
  *   A Fiber that completes with the result of the first Fiber to complete
  */
-def race[E, A: Flat, S](
+private[kyo] def race[E, A: Flat, S](
```
I think having the same methods in `Async` and `Fiber` can be confusing, so I've decided to remove some and keep only the ones needed to implement the `Async` methods, as `private[kyo]`. This limits the functionality, but it's easy to get back to a `Fiber` via `Async.run`.
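As a rough analogy for the `race` semantics kept on `Async` (complete with the result of the first computation to finish), the standard library's `Future.firstCompletedOf` behaves similarly. This is a stand-in to show the semantics, not Kyo's implementation; note that `firstCompletedOf` does not cancel the futures that lose. `RaceSketch` and `never` are hypothetical names.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RaceSketch {
  // Completes with whichever input finishes first; the others keep running
  def race[A](candidates: Seq[Future[A]]): Future[A] =
    Future.firstCompletedOf(candidates)

  // A future that never completes, used to show that the fast candidate wins
  def never[A]: Future[A] = Promise[A]().future
}
```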
```diff
 /** Folds a Result into a value, with separate handling for errors and successes.
   *
-  * @param ifError
+  * @param onError
```
minor unrelated fix
```diff
  *   A computation that directly produces the given value without suspension
  */
-inline def pure[A, S](inline v: A): A < S = v
+inline def lift[A, S](inline v: A): A < S = v
```
I think pure was a bit opaque for people not used to FP.
If we are choosing similar naming to ZIO, what do you think about succeed?
I think succeed makes more sense in a monad that includes error handling by default. In this case, lift seems more appropriate.
```scala
inline def unit[S]: Unit < S = ()

/** Zips two effects into a tuple.
  *
```
some cleanup to keep the file from getting too large now that there are versions with up to 8 elements
```diff
  *   A new effect that produces a Chunk of repeated values
  */
-def fill[A, S](n: Int)(v: Safepoint ?=> A < S)(using Frame, Safepoint): Chunk[A] < S =
+def repeat[A, S](n: Int)(v: Safepoint ?=> A < S)(using Frame, Safepoint): Chunk[A] < S =
```
`Kyo.repeat`/`Async.repeat` seem clearer
```scala
case size =>
    isolate.capture { state =>
        val groupSize = Math.ceil(size.toDouble / Math.max(1, concurrency)).toInt
        Fiber.foreachIndexed(seq.grouped(groupSize).toSeq)((idx, seq) =>
```
I think we need to add support for non-seq collections. We may need a typeclass if we want to support only immutable collections for compatibility with multi-shot continuations.
Multi-shot continuations aren't a concern here since there's IO suspension. I'm not a fan of supporting mutable collections but I think other effect systems allow them, right? It might be important for usability.
Yes, we need to support more collections for usability.
```scala
 */
def foreachIndexed[E, A, B: Flat, S](
    using isolate: Isolate.Stateful[S, Abort[E] & Async]
)(seq: Seq[A], concurrency: Int = defaultConcurrency)(f: (Int, A) => B < (Abort[E] & Async & S))(using
```
Should we have an IndexedFunction type to avoid boxing?
I think we can wait for the compiler to implement specialization unless we see it introducing overhead. The JIT can sometimes avoid the allocation.
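For reference, the `IndexedFunction` idea could look like the sketch below: a single-abstract-method trait whose index parameter is a primitive `Int`. Both the trait and `mapIndexed` are hypothetical (nothing like them exists in the PR). With a generic `(Int, A) => B`, the index may be boxed when passed through the erased `Function2.apply`.

```scala
// Hypothetical shape discussed in the review; not part of Kyo
trait IndexedFunction[-A, +B] {
  // idx is declared as a primitive Int, so calls through this trait don't box it
  def apply(idx: Int, a: A): B
}

object IndexedFunctionDemo {
  // Plain loop so the index stays a primitive Int end to end
  def mapIndexed[A, B](seq: IndexedSeq[A])(f: IndexedFunction[A, B]): Vector[B] = {
    val builder = Vector.newBuilder[B]
    var i = 0
    while (i < seq.length) {
      builder += f(i, seq(i))
      i += 1
    }
    builder.result()
  }
}
```

A lambda such as `(i, s) => s * (i + 1)` converts to the trait through SAM conversion, so call sites look the same as with a plain function.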
```scala
 * @return
 *   Chunk containing results of all iterations
 */
def repeat[E, A: Flat, S](
```
This seems useful. Should we call it fill?
This operation was actually called fill in the Kyo companion. I've renamed it repeat because I thought it was a better name indicating that the computation will be executed multiple times. I'd be ok with changing it back to fill here and in the Kyo companion.
How about replicate? I find repeat somewhat overlapping with Scheduling/repeating a kyo without saving a collection.
I ended up renaming it back to `fill`. The similarity with the Scala collection APIs should be helpful too.
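The Scala collections precedent behind the `fill` name: `List.fill` takes its element by name and evaluates it once per slot, which matches the "run the computation n times" meaning that `repeat` was meant to convey. Plain Scala only, no Kyo APIs; `fillCounter` is a hypothetical helper.

```scala
import java.util.concurrent.atomic.AtomicInteger

object FillSemantics {
  // The second argument of List.fill is by-name, so incrementAndGet runs n times
  def fillCounter(n: Int): List[Int] = {
    val counter = new AtomicInteger(0)
    List.fill(n)(counter.incrementAndGet())
  }
}
```

`FillSemantics.fillCounter(3)` yields `List(1, 2, 3)` rather than three copies of a single value.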
```scala
)(seq: Seq[A < (Abort[E] & Async & S)])(
private[kyo] def foreachIndexed[A, B: Flat, E, S](
    using isolate: Isolate.Contextual[S, IO]
)(seq: Iterable[A])(f: (Int, A) => B < (Abort[E] & Async & S))(
```
```diff
 def race[E, A: Flat, S](
     using isolate: Isolate.Stateful[S, Abort[E] & Async]
-)(seq: Seq[A < (Abort[E] & Async & S)])(
+)(seq: Iterable[A < (Abort[E] & Async & S)])(
```
same as elsewhere: `seq` could be renamed
```scala
case size =>
    isolate.capture { state =>
        val groupSize = Math.ceil(size.toDouble / Math.max(1, concurrency)).toInt
        Fiber.foreachIndexed(seq.grouped(groupSize).toSeq)((idx, group) =>
```
does this need to be converted to a Seq now?
yep, `grouped` returns an Iterator. I tried accepting `IterableOnce` instead of `Iterable`, which would support `Iterator`, but then it's not possible to get the size to initialize the data structures in the `Fiber.*` methods, since an `IterableOnce` can only be traversed once.
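A worked example of the grouping logic in the snippet above, as a plain-Scala sketch: `groupSize` mirrors the `Math.ceil(size / max(1, concurrency))` expression, and `groups` pairs each group with the index of its first element so per-element indices can be recovered. `Grouping` and both helpers are hypothetical. The empty case returns early because `grouped` requires a positive size.

```scala
object Grouping {
  // Same expression as the snippet: at most `concurrency` groups of up to groupSize elements
  def groupSize(size: Int, concurrency: Int): Int =
    math.ceil(size.toDouble / math.max(1, concurrency)).toInt

  // grouped returns an Iterator, so it is materialized with toVector before indexing
  def groups[A](seq: Seq[A], concurrency: Int): Vector[(Int, Seq[A])] =
    if (seq.isEmpty) Vector.empty // grouped(0) would throw
    else {
      val size = groupSize(seq.size, concurrency)
      seq.grouped(size).toVector.zipWithIndex.map { case (group, i) => (i * size, group) }
    }
}
```

With 10 elements and a concurrency of 4, `groupSize` is `ceil(2.5) = 3`, so the groups start at indices 0, 3, 6 and 9, and the last one holds a single element.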
### Problem

I noticed a few builds failing after #1086. This error should only happen if a `Safepoint` from one thread leaks into execution on another thread via implicits. That introduces concurrency issues, since the `Trace` management is meant to be used only by its owning thread and isn't synchronized:

```
[info] - no buffer *** FAILED *** (6 milliseconds)
[info] java.lang.NullPointerException: Cannot invoke "kyo.kernel.internal.Trace.frames()" because "newTrace" is null
[info] at kyo.kernel.internal.Trace$Owner.copyTrace(Trace.scala:60)
[info] at kyo.Fiber$package$Fiber$$anon$33$$anon$34.loop$8(Fiber.scala:497)
[info] at kyo.Fiber$package$Fiber$$anon$33$$anon$34.apply(Fiber.scala:661)
[info] at kyo.Fiber$package$Fiber$$anon$33$$anon$34.apply(Fiber.scala:495)
[info] at kyo.Async$package$Async$$anon$15.apply(Async.scala:363)
[info] at kyo.scheduler.IOTask.partialLoop$1(IOTask.scala:42)
[info] at kyo.scheduler.IOTask.eval(IOTask.scala:66)
[info] at kyo.scheduler.IOTask.run(IOTask.scala:78)
[info] at kyo.scheduler.Worker.runTask(Worker.scala:257)
[info] at kyo.scheduler.Worker.run(Worker.scala:219)
```

https://github.com/getkyo/kyo/actions/runs/13888866130/job/38857394040#step:6:5899

### Solution

I believe the cause of this new issue is that I removed an `IO` suspension in `Fiber.foreachIndexed` that seemed unnecessary. The suspension was actually hiding potential safepoint leaks, since suspensions automatically restore the correct safepoint. This PR keeps `Fiber.foreachIndexed` without the additional suspension, removes similar suspensions from other methods, and ensures the methods can't use a leaked safepoint by always getting the instance from the thread local via `Safepoint.get`. I can't reproduce the issue locally to validate the fix, and writing a unit test is challenging without exposing new `Safepoint` APIs, but this seems a solid attempt at fixing the issue.
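The failure mode described above can be reproduced in miniature with a plain `ThreadLocal`: capturing a per-thread object on one thread and using it on another silently pairs the work with the wrong owner, while looking it up on the executing thread (the role `Safepoint.get` plays in the fix) always yields the right one. This is a sketch, not Kyo's `Safepoint`; `State`, `runLeaky` and `runSafe` are hypothetical names.

```scala
object ThreadLocalSketch {
  // Hypothetical stand-in for a per-thread, unsynchronized object that records its owner
  final class State(val owner: Thread)

  private val local = new ThreadLocal[State] {
    override def initialValue(): State = new State(Thread.currentThread())
  }

  def get(): State = local.get()

  // Leaky: captures the caller's State and uses it on a different thread
  def runLeaky(): Boolean = {
    val captured = get()
    runOnOtherThread(() => captured.owner eq Thread.currentThread())
  }

  // Safe: looks up the State on the thread that actually runs the task
  def runSafe(): Boolean =
    runOnOtherThread(() => get().owner eq Thread.currentThread())

  // Runs the task on a fresh thread; join() makes the result visible to the caller
  private def runOnOtherThread(task: () => Boolean): Boolean = {
    var result = false
    val t = new Thread(() => result = task())
    t.start()
    t.join()
    result
  }
}
```

`runLeaky()` returns `false` because the captured object belongs to the calling thread, while `runSafe()` returns `true`.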
### Problem

Kyo's `Async` methods aren't very convenient for dealing with collections.

### Solution

Provide a new set of methods resembling the collection operations in the kernel's `Kyo` companion object.

### Notes

`Kyo.scala` method in the internal package of the kernel. I renamed it to `KyoInternal.scala` to avoid confusion.