[core] fix interrupts leak #1125
  * @return
  *   The number of waiters on this Fiber
  */
def waiters(using Frame): Int < IO = IO(self.waiters())
I needed it for testing and thought it would be useful to expose it to users.
    eval(discard(f(error.asInstanceOf[Error[E]])))
    self
def removeInterrupt(other: IOPromise[?, ?]) =
    self.removeInterrupt(other).onComplete(f)
I had to remove inline here since it would otherwise recurse during inlining.
def waiters: Int
def interrupt[E2 >: E](v: Error[E2]): Pending[E, A]
def removeInterrupt(other: IOPromise[?, ?]): Pending[E, A]
This new method is relatively expensive since it rebuilds the pending list up to the point of the interrupt callback. It should be OK since, in most cases, the list is small, typically with two elements: one for the interrupt and another for completion. The exception is the collection-handling methods in Fiber, where State objects link to all forked fibers, but these become available for GC right after the collection operation finishes. I don't think we need an early cleanup there.
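To make that cost concrete, here is a minimal sketch of removing an interrupt node from an immutable singly linked pending list. Pending, Empty, Interrupt, OnComplete, and pendingSize are hypothetical stand-ins, not Kyo's actual IOPromise internals. Nodes in front of the matching interrupt are copied, the tail after it is reused as-is, and if nothing matches the whole list is copied. The recursion is not stack-safe, which is only acceptable under the short-list assumption above.

```scala
// Hypothetical, simplified model of a promise's pending-callback list.
// This is NOT Kyo's IOPromise.Pending; all names are illustrative.
sealed trait Pending:
  def removeInterrupt(target: AnyRef): Pending

// End of the list: nothing left to remove.
case object Empty extends Pending:
  def removeInterrupt(target: AnyRef): Pending = this

// Node that propagates interrupts to a child identified by `target`.
final case class Interrupt(target: AnyRef, tail: Pending) extends Pending:
  def removeInterrupt(t: AnyRef): Pending =
    if target eq t then tail // drop this node; the tail is shared, not copied
    else Interrupt(target, tail.removeInterrupt(t)) // copy this node

// Node holding a completion callback; always copied on the way down.
final case class OnComplete(f: Any => Unit, tail: Pending) extends Pending:
  def removeInterrupt(t: AnyRef): Pending =
    OnComplete(f, tail.removeInterrupt(t))

// Counts nodes (recursive; fine for the short lists assumed here).
def pendingSize(p: Pending): Int = p match
  case Empty            => 0
  case Interrupt(_, t)  => 1 + pendingSize(t)
  case OnComplete(_, t) => 1 + pendingSize(t)
```

For example, with Interrupt(child2, OnComplete(f, Interrupt(child1, Empty))), removing child1 copies the first two nodes, while removing child2 returns the existing tail without allocating.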
@@ -178,12 +178,13 @@ abstract private class PublisherToSubscriberTest extends Test:
subStream3 <- subscriber3.stream
subscriber4 <- streamSubscriber
subStream4 <- subscriber4.stream
This test was flaky.
  * @return
  *   Maybe containing the Result if the Fiber is done, or Absent if still pending
  */
def poll(using Frame): Maybe[Result[E, A]] < IO = IO(self.poll())
case null =>
    cont(null)
It's an edge case where the promise is completed with null. I added a couple of test scenarios for it some time ago, after hitting a complex issue caused by it not being handled well.
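One reason an explicit null branch matters in Scala: type patterns such as `case w: Waiting` never match null, so a null result silently falls through to whatever catch-all comes next. The sketch below assumes a hypothetical state encoding where a Waiting marker means "not done" and any other value, including null, is the completed result; Waiting and describe are illustrative names, not Kyo's.

```scala
// Hypothetical promise state: a `Waiting` marker means "not done yet";
// any other value, including null, is the completed result.
final case class Waiting(callbacks: List[Any => Unit])

def describe(state: Any): String = state match
  // Type patterns like `w: Waiting` never match null, so handle it first.
  case null       => "completed with null"
  case w: Waiting => s"pending (${w.callbacks.size} callbacks)"
  // Without the null case above, v.toString would throw a NullPointerException here.
  case v          => s"completed with ${v.toString}"
```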
Fixes:
- Scope finalizer release order changed from FIFO to LIFO (#1439)
- Closed error propagation in mapChunkPar and mapChunkParUnordered, porting the getResult+AtomicRef pattern from mapPar fix (PR #1497)

Tests: 40+ new resource safety tests across 6 files covering Scope ordering, acquireRelease safety, fiber interaction, interrupt cleanup, stream resource cleanup, Closed propagation, finalizer failure isolation, and regression tests for fixed bugs (#1125, #1095, #1458, #1339, #736).

Pending tests document unfixed systemic issues:
- Sync.ensure not firing on Abort.fail
- Sync.ensure skipped by Stream.take short-circuit (#1398)
- Acquire-register race window (#1224)
- Finalizer failure blocking subsequent finalizers
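The FIFO to LIFO change above can be illustrated with a minimal scope that prepends each finalizer and runs the list front to back, so the most recently acquired resource is released first. SimpleScope, ensure, and close are hypothetical names for this sketch, not Kyo's Scope API, and the sketch deliberately ignores failure isolation between finalizers.

```scala
// Hypothetical scope: finalizers run in reverse registration order (LIFO),
// so a resource acquired later is released before the ones it may depend on.
final class SimpleScope:
  private var finalizers: List[() => Unit] = Nil

  // Prepend, so the newest finalizer sits at the head of the list.
  def ensure(f: => Unit): Unit =
    finalizers = (() => f) :: finalizers

  // Run newest-first, clearing the list so close() is idempotent.
  def close(): Unit =
    val toRun = finalizers
    finalizers = Nil
    toRun.foreach(fin => fin())
```

Registering a database release and then a connection release makes close() run the connection release first.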
Fixes #1122
Problem
When a parent Fiber is executing and encounters a new child Fiber to be waited on, the parent needs to register a callback to propagate interrupts to the child in case there's an interrupt during the waiting. The issue is that, once the child fiber finishes and the parent resumes, the interrupt callback for the child isn't necessary anymore, but it stays in the pending list until the parent fully completes execution. This leads to a memory leak when a parent fiber handles many children during its execution.
Solution
When the parent fiber resumes execution, remove the interrupt callback for the completed child.
Notes
I've also included a fast-path optimization: if the child fiber is already completed, the parent resumes execution immediately without suspending. There's a performance penalty with this change due to the removal of a few inline modifiers in IOPromise, which should be easily offset by this additional optimization.
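The leak, the fix, and the fast path can be illustrated with a small single-threaded model. Parent, Child, await, poll, and pendingInterrupts are hypothetical names invented for this sketch, not Kyo's Fiber or IOPromise API, and the model ignores concurrency entirely.

```scala
import scala.collection.mutable

// Hypothetical, single-threaded model of the fix; these classes are
// illustrative only and are not Kyo's Fiber/IOPromise API.
final class Child:
  private var result: Option[Any] = None
  private var callbacks: List[Any => Unit] = Nil
  var interrupted: Boolean = false

  def interrupt(): Unit = interrupted = true
  def poll: Option[Any] = result

  def onComplete(f: Any => Unit): Unit = result match
    case Some(v) => f(v)
    case None    => callbacks = f :: callbacks

  def complete(v: Any): Unit =
    result = Some(v)
    val cbs = callbacks
    callbacks = Nil
    cbs.foreach(cb => cb(v))

final class Parent:
  // Children that should receive interrupts while the parent waits on them.
  private val interruptTargets = mutable.LinkedHashSet.empty[Child]
  def pendingInterrupts: Int = interruptTargets.size
  def interrupt(): Unit = interruptTargets.foreach(_.interrupt())

  def await(child: Child)(resume: Any => Unit): Unit =
    child.poll match
      case Some(v) =>
        // Fast path: the child is already done, so resume without registering anything.
        resume(v)
      case None =>
        interruptTargets += child // propagate interrupts while waiting
        child.onComplete { v =>
          interruptTargets -= child // the fix: drop the callback when resuming
          resume(v)
        }
```

In this model, deleting the `interruptTargets -= child` line leaves one entry per completed child, which is the unbounded growth described in the Problem section.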