Skip to content

Fix error reporting in Schedulers#2036

Open
mikkolaj wants to merge 10 commits into
monix:mainfrom
AVSystem:fix-error-reporting-to-upstream
Open

Fix error reporting in Schedulers#2036
mikkolaj wants to merge 10 commits into
monix:mainfrom
AVSystem:fix-error-reporting-to-upstream

Conversation

@mikkolaj

Copy link
Copy Markdown

Original PR: AVSystem#15

Summary

In JDK 25 ForkJoinPool implements ScheduledExecutorService. This change resulted in incorrect error reporting and failing tests in AVSystem/monix/#6

Problem

Monix has two internal implementations of ExecutorScheduler:

  • FromSimpleExecutor - for regular ExecutorService
  • FromScheduledExecutor - for ScheduledExecutorService

The FromScheduledExecutor implementation relies on error reporting being done by the underlying executor itself (via afterExecute hook), but this only works correctly with AdaptedThreadPoolExecutor (Monix's custom implementation). For other ScheduledExecutorService implementations, error reporting did not work properly.

Before JDK 25 this problem wasn't visible, because almost all Scheduler factory methods use AsyncScheduler, FromSimpleExecutor or FromScheduledExecutor together with AdaptedThreadPoolExecutor.

Additionally, withUncaughtExceptionReporter didn't work correctly for all scheduler methods in wrapped schedulers (AsyncScheduler, etc.) - the updated reporter wasn't being used for scheduleOnce, scheduleAtFixedRate, and scheduleWithFixedDelay.

Changes

  1. Restricted FromScheduledExecutor (now FromAdaptedThreadPoolExecutor) usage - This implementation is now only used for schedulers backed by AdaptedScheduledThreadPoolExecutor. All other ExecutorService implementations (including ScheduledExecutorService and ForkJoinPool) use FromSimpleExecutor.

  2. Fixed error reporting in wrapped schedulers (ReferenceScheduler) - The scheduleOnce, scheduleAtFixedRate, and scheduleWithFixedDelay methods now wrap runnables with InterceptRunnable to pass exceptions to correct reporters.

@alexandru

Copy link
Copy Markdown
Member

@codex review

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 899ee7aa62

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

s.scheduleWithFixedDelay(initialDelay, delay, unit, InterceptRunnable(r, reporter))
override def scheduleAtFixedRate(initialDelay: Long, period: Long, unit: TimeUnit, r: Runnable): Cancelable =
s.scheduleAtFixedRate(initialDelay, period, unit, r)
s.scheduleAtFixedRate(initialDelay, period, unit, InterceptRunnable(r, reporter))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve periodic task suppression after failures

When withUncaughtExceptionReporter is used on a scheduler whose periodic methods come from ReferenceScheduler (for example schedulers backed by a plain ExecutorService), wrapping the user runnable here catches and swallows the exception before the underlying ReferenceScheduler.scheduleAtFixedRate loop sees it. That loop only stops because r.run() throws, and the public Scheduler docs say later fixed-rate executions are suppressed after an exception; with this wrapper the task keeps rescheduling forever after reporting the failure.

Useful? React with 👍 / 👎.

Comment on lines +57 to +58
def withUncaughtExceptionReporter(reporter: UncaughtExceptionReporter): AdaptedScheduledThreadPoolExecutor =
new AdaptedScheduledThreadPoolExecutor(corePoolSize, factory, reporter)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reuse the existing executor when swapping reporters

For singleThread/fixedPool schedulers, calling withUncaughtExceptionReporter now constructs an entirely new ScheduledThreadPoolExecutor instead of returning a view over the same service. In the common pattern of creating a scheduler and immediately overriding its reporter, any already-started original pool is left independent and won't be shut down when the returned scheduler is shut down; existing scheduled work also remains attached to the old reporter/executor, unlike the other withUncaughtExceptionReporter implementations that keep the same executor.

Useful? React with 👍 / 👎.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants