[core] fix interrupts leak #1125

Merged: hearnadam merged 5 commits into main from interrupts-leak on Apr 4, 2025

@fwbrasil (Collaborator) commented Apr 3, 2025:

Fixes #1122

Problem

When a parent Fiber is executing and encounters a new child Fiber to be waited on, the parent needs to register a callback to propagate interrupts to the child in case there's an interrupt during the waiting. The issue is that, once the child fiber finishes and the parent resumes, the interrupt callback for the child isn't necessary anymore but it stays in the pending list until the parent fully completes execution. This leads to a memory leak when a parent fiber handles many children during its execution.

Solution

When the parent fiber resumes execution, remove the interrupt callback for the completed child.
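The leak and the fix can be sketched with a toy model. This is a minimal illustration, not Kyo's implementation: `ToyFiber`, `registerInterrupt`, `awaitAll`, and the `cleanup` flag are hypothetical names, and the model is single-threaded.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for a fiber; this is not Kyo's IOPromise.
final class ToyFiber(val name: String) {
  // Children that must receive an interrupt if this fiber is interrupted.
  private val interruptTargets = ArrayBuffer.empty[ToyFiber]
  private var interrupted      = false

  def isInterrupted: Boolean = interrupted
  def pendingInterrupts: Int = interruptTargets.size

  def registerInterrupt(child: ToyFiber): Unit =
    interruptTargets += child

  // Drops the entry for `child`, compared by reference, if one is present.
  def removeInterrupt(child: ToyFiber): Unit = {
    val i = interruptTargets.indexWhere(_ eq child)
    if (i >= 0) interruptTargets.remove(i)
  }

  def interrupt(): Unit = {
    interrupted = true
    interruptTargets.foreach(_.interrupt())
  }

  // Waits on `children` child fibers, one after another.
  // With cleanup = false, every finished child stays registered (the leak).
  // With cleanup = true, the entry is removed when the parent resumes (the fix).
  def awaitAll(children: Int, cleanup: Boolean): Unit = {
    var i = 0
    while (i < children) {
      val child = new ToyFiber(s"$name-child-$i")
      registerInterrupt(child)
      // ... the child would run to completion here, then the parent resumes ...
      if (cleanup) removeInterrupt(child)
      i += 1
    }
  }
}
```

In this model, a parent that waits on 1000 children without cleanup still holds 1000 entries while it keeps running, whereas the cleanup variant holds none.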

Notes

I've also included a fast-path optimization: if the child fiber has already completed, the parent resumes execution immediately without suspending. This change carries a performance penalty because a few inlines had to be removed from IOPromise, but the additional optimization should easily offset it.
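The fast path described in the notes can be sketched as follows. This is a hypothetical, single-threaded model (`ToyPromise`, `await`, and `waiterCount` are illustrative names, not Kyo's API): when the result is already available, the continuation runs right away and no callback is registered.

```scala
// Hypothetical, non-thread-safe promise used only to illustrate the fast path.
final class ToyPromise[A] {
  private var result: Option[A]       = None
  private var waiters: List[A => Unit] = Nil

  def waiterCount: Int = waiters.size

  // Completes the promise and runs the registered callbacks in registration order.
  def complete(value: A): Unit = {
    result = Some(value)
    val toRun = waiters.reverse
    waiters = Nil
    toRun.foreach(_(value))
  }

  // Returns false when the fast path ran `cont` immediately,
  // true when `cont` was registered and the caller has to suspend.
  def await(cont: A => Unit): Boolean =
    result match {
      case Some(value) =>
        cont(value) // already completed: nothing to register or clean up later
        false
      case None =>
        waiters = cont :: waiters
        true
    }
}
```

Because the fast path registers nothing, there is also no interrupt callback to remove afterwards in that case.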

* @return
* The number of waiters on this Fiber
*/
def waiters(using Frame): Int < IO = IO(self.waiters())
@fwbrasil (Collaborator, Author) commented:

I needed it for testing and thought it'd be useful to expose to users

eval(discard(f(error.asInstanceOf[Error[E]])))
self
def removeInterrupt(other: IOPromise[?, ?]) =
self.removeInterrupt(other).onComplete(f)
@fwbrasil (Collaborator, Author) commented:

I had to remove the inline modifier since the inlining would recurse here


def waiters: Int
def interrupt[E2 >: E](v: Error[E2]): Pending[E, A]
def removeInterrupt(other: IOPromise[?, ?]): Pending[E, A]
@fwbrasil (Collaborator, Author) commented Apr 3, 2025:

This new method is relatively expensive since it rebuilds the pending list up to the position of the interrupt callback. That should be OK since, in the majority of cases, the list is quite small, typically two elements: one for the interrupt and another for completion. The exception is the collection-handling methods in Fiber, where State objects link to all forked fibers, but that state becomes available for GC right after the collection operation finishes, so I don't think we need an early cleanup there.
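The cost mentioned above comes from removing a node from an immutable linked list: every node before the removed one has to be copied, while the nodes after it can be shared. A minimal sketch under that assumption, with hypothetical node types (`Empty`, `OnComplete`, `Interrupt`) that only mirror the idea and not IOPromise's actual internals:

```scala
// Hypothetical pending-list nodes; IOPromise's real representation may differ.
sealed trait Pending
case object Empty                                              extends Pending
final case class OnComplete(id: String, next: Pending)         extends Pending
final case class Interrupt(target: AnyRef, next: Pending)      extends Pending

object Pending {
  // Removes the first Interrupt node for `target` (compared by reference).
  // Nodes in front of it are rebuilt; the suffix after it is reused as-is.
  // Not tail-recursive, which is acceptable only because the list is short.
  def removeInterrupt(list: Pending, target: AnyRef): Pending =
    list match {
      case Empty                              => Empty
      case Interrupt(t, next) if t eq target  => next
      case Interrupt(t, next)                 => Interrupt(t, removeInterrupt(next, target))
      case OnComplete(id, next)               => OnComplete(id, removeInterrupt(next, target))
    }

  @scala.annotation.tailrec
  def size(list: Pending, acc: Int = 0): Int =
    list match {
      case Empty               => acc
      case OnComplete(_, next) => size(next, acc + 1)
      case Interrupt(_, next)  => size(next, acc + 1)
    }
}
```

With the typical two-element list, at most one node is copied per removal, which is why the cost stays small in the common case.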

@@ -178,12 +178,13 @@ abstract private class PublisherToSubscriberTest extends Test:
subStream3 <- subscriber3.stream
subscriber4 <- streamSubscriber
subStream4 <- subscriber4.stream
@fwbrasil (Collaborator, Author) commented:

this test was flaky

* @return
* Maybe containing the Result if the Fiber is done, or Absent if still pending
*/
def poll(using Frame): Maybe[Result[E, A]] < IO = IO(self.poll())
Collaborator commented:

🙏

Comment thread kyo-core/shared/src/main/scala/kyo/scheduler/IOPromise.scala Outdated
Comment on lines +55 to +56
case null =>
cont(null)
Collaborator commented:

When is this possible?

@fwbrasil (Collaborator, Author) commented:

It's an edge case when the promise is completed with null. I added a couple of test scenarios for that some time ago after running into a complex issue caused by it not being handled well.
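One general Scala behavior that makes an explicit null case necessary is that type patterns never match null. The sketch below is not taken from IOPromise; `Waiting` and `ToyState.describe` are hypothetical and only show why a state match that can see a null result needs its own branch.

```scala
// Hypothetical marker for a promise that is still pending.
final case class Waiting(callbacks: Int)

object ToyState {
  // In this toy, `state` is either a Waiting marker or the completed value,
  // a String that may be null. Any other type would throw a MatchError.
  def describe(state: AnyRef): String =
    state match {
      // Without this branch, a null state would fall through both type
      // patterns below and throw a MatchError.
      case null       => "completed with null"
      case w: Waiting => s"pending with ${w.callbacks} callback(s)"
      case s: String  => s"completed with $s"
    }
}
```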

@hearnadam hearnadam merged commit 6aa02b6 into main Apr 4, 2025
3 checks passed
@hearnadam hearnadam deleted the interrupts-leak branch April 4, 2025 20:42
fwbrasil added a commit that referenced this pull request Apr 2, 2026
Fixes:
- Scope finalizer release order changed from FIFO to LIFO (#1439)
- Closed error propagation in mapChunkPar and mapChunkParUnordered,
  porting the getResult+AtomicRef pattern from mapPar fix (PR #1497)

Tests: 40+ new resource safety tests across 6 files covering Scope
ordering, acquireRelease safety, fiber interaction, interrupt cleanup,
stream resource cleanup, Closed propagation, finalizer failure
isolation, and regression tests for fixed bugs (#1125, #1095, #1458,
#1339, #736).

Pending tests document unfixed systemic issues:
- Sync.ensure not firing on Abort.fail
- Sync.ensure skipped by Stream.take short-circuit (#1398)
- Acquire-register race window (#1224)
- Finalizer failure blocking subsequent finalizers

Development

Successfully merging this pull request may close these issues.

[BUG]: Memory leak when creating a large number of Promise instances