-
Notifications
You must be signed in to change notification settings - Fork 560
Description
In our application, an OutOfMemoryError raised inside a fiber created from a CompletableFuture does not crash the JVM process. Instead, the error is caught and returned as a failed fiber outcome.
This differs from the behavior when the same error is thrown directly from an IO, where the error bubbles up and terminates the process as expected.
Example:
import cats.effect._
import cats.implicits._
import java.util.concurrent.{CompletableFuture, Executor, Executors}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt
object Main extends IOApp {
override def run(args: List[String]): IO[ExitCode] =
Resource.fromAutoCloseable(IO(Executors.newFixedThreadPool(1))).use { executor =>
for {
pingFiber <- pingIO.start
// _ <- boomFromIO.start // -> crashes the app (expected)
_ <- boomFromCompletableFuture(ExecutionContext.fromExecutor(executor)).start // -> does NOT crash the app
_ <- pingFiber.join
} yield ExitCode.Success
}
private val pingIO =
(IO.println("ping") *> IO.sleep(1.seconds)).foreverM
private def boomFromIO: IO[Unit] = IO {
println("Waiting 2 seconds before boom...")
Thread.sleep(2000)
println("Gonna boom!")
throw new OutOfMemoryError("Boom!")
}
private def boomFromCompletableFuture(executor: Executor): IO[Unit] =
IO.fromCompletableFuture(IO(CompletableFuture.runAsync(() => {
println("Waiting 2 seconds before boom...")
Thread.sleep(2000)
println("Gonna boom!")
throw new OutOfMemoryError("Boom!")
}, executor))).void
}Analysis:
The difference seems to come from IO.fromCompletableFuture, which relies on CompletableFuture.handle. Since handle catches all Throwable, the OutOfMemoryError ends up wrapped in the failed outcome instead of escaping and crashing the process.
Question:
Is this the intended behavior? If not, should fromCompletableFuture avoid intercepting fatal errors like OutOfMemoryError to align with how IO behaves?
Notes:
I experimented by modifying the implementation to re-surface fatal errors in onError, and in that case the OutOfMemoryError bubbled up as expected:
def fromCompletableFuture[F[_], A](fut: F[CompletableFuture[A]])(implicit F: Async[F]): F[A] = F.cont {
new Cont[F, A, A] {
def apply[G[_]](implicit G: MonadCancelThrow[G]): (Either[Throwable, A] => Unit, G[A], F ~> G) => G[A] = {
(resume, get, lift) =>
G.uncancelable { poll =>
G.flatMap(poll(lift(fut))) { cf =>
val go = F.delay {
cf.handle[Unit] {
case (a, null) => resume(Right(a))
case (_, NonFatal(t)) =>
resume(Left(t match {
case e: CompletionException if e.getCause ne null => e.getCause
case _ => t
}))
}
}
val await = G.onCancel(
poll(get.onError(_ => G.unit)), // re-surface the OutOfMemoryError to main IO
// if cannot cancel, fallback to get
G.ifM(lift(F.delay(cf.cancel(true))))(G.unit, G.void(get))
)
G.productR(lift(go))(await)
}
}
}
}
}Metadata
Metadata
Assignees
Labels
Type
Projects
Status