Skip to content

OutOfMemoryError not propagated when IO originates from CompletableFuture #4505

@tpetillot

Description

@tpetillot

In our application, an OutOfMemoryError raised inside a fiber created from a CompletableFuture does not crash the JVM process. Instead, the error is caught and returned as a failed fiber outcome.

This differs from the behavior when the same error is thrown directly from an IO, where the error bubbles up and terminates the process as expected.

Example:

import cats.effect._
import cats.implicits._

import java.util.concurrent.{CompletableFuture, Executor, Executors}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt

object Main extends IOApp {
  override def run(args: List[String]): IO[ExitCode] =
    Resource.fromAutoCloseable(IO(Executors.newFixedThreadPool(1))).use { executor =>
      for {
        pingFiber <- pingIO.start
        // _ <- boomFromIO.start // -> crashes the app (expected)
        _ <- boomFromCompletableFuture(ExecutionContext.fromExecutor(executor)).start // -> does NOT crash the app
        _ <- pingFiber.join
      } yield ExitCode.Success
    }

  private val pingIO =
    (IO.println("ping") *> IO.sleep(1.seconds)).foreverM

  private def boomFromIO: IO[Unit] = IO {
    println("Waiting 2 seconds before boom...")
    Thread.sleep(2000)
    println("Gonna boom!")
    throw new OutOfMemoryError("Boom!")
  }

  private def boomFromCompletableFuture(executor: Executor): IO[Unit] =
    IO.fromCompletableFuture(IO(CompletableFuture.runAsync(() => {
      println("Waiting 2 seconds before boom...")
      Thread.sleep(2000)
      println("Gonna boom!")
      throw new OutOfMemoryError("Boom!")
    }, executor))).void
}

Analysis:
The difference seems to come from IO.fromCompletableFuture, which relies on CompletableFuture.handle. Since handle catches all Throwable, the OutOfMemoryError ends up wrapped in the failed outcome instead of escaping and crashing the process.

Question:
Is this the intended behavior? If not, should fromCompletableFuture avoid intercepting fatal errors like OutOfMemoryError to align with how IO behaves?

Notes:
I experimented by modifying the implementation to re-surface fatal errors in onError, and in that case the OutOfMemoryError bubbled up as expected:

def fromCompletableFuture[F[_], A](fut: F[CompletableFuture[A]])(implicit F: Async[F]): F[A] = F.cont {
    new Cont[F, A, A] {
      def apply[G[_]](implicit G: MonadCancelThrow[G]): (Either[Throwable, A] => Unit, G[A], F ~> G) => G[A] = {
        (resume, get, lift) =>
          G.uncancelable { poll =>
            G.flatMap(poll(lift(fut))) { cf =>
              val go = F.delay {
                cf.handle[Unit] {
                  case (a, null) => resume(Right(a))
                  case (_, NonFatal(t)) =>
                    resume(Left(t match {
                      case e: CompletionException if e.getCause ne null => e.getCause
                      case _ => t
                    }))
                }
              }

              val await = G.onCancel(
                poll(get.onError(_ => G.unit)), // re-surface the OutOfMemoryError to main IO
                // if cannot cancel, fallback to get
                G.ifM(lift(F.delay(cf.cancel(true))))(G.unit, G.void(get))
              )

              G.productR(lift(go))(await)
            }
          }
      }
    }
  }

Metadata

Metadata

Assignees

No one assigned

    Type

    Projects

    Status

    No status

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions