Skip to content

Resource: avoid leaking Queue based Finalizer details#1137

Merged
fwbrasil merged 3 commits into
getkyo:mainfrom
hearnadam:scope-p1
Apr 15, 2025
Merged

Resource: avoid leaking Queue based Finalizer details#1137
fwbrasil merged 3 commits into
getkyo:mainfrom
hearnadam:scope-p1

Conversation

@hearnadam
Copy link
Copy Markdown
Collaborator

Problem

Finalizer could be implemented a number of ways. Exposing the Queue makes the implementation hard to change. Additionally, this makes it more complex when we support hierarchical scopes in #1131.

Solution

Use an abstract class, exposing only the necessary behaviors. In a followup, I will work towards a rename to Scope (optional?), then adding support for Scope.fork or some similar naming.

Comment thread kyo-core/shared/src/main/scala/kyo/Resource.scala
@hearnadam
Copy link
Copy Markdown
Collaborator Author

[info]   - parallel *** FAILED *** (404 milliseconds)
[info]     done.await(millis, java.util.concurrent.TimeUnit.MILLISECONDS) was false Latch had count 2 after 200.millis (MonixTest.scala:177)

Flaky test...


def ensure(v: => Any < (Async & Abort[Throwable]))(using Frame): Unit < IO =
IO.Unsafe {
if queue.offer(IO(v.unit)).isFailure then
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to ignore panics. I think JCTools can't panic for an offer but it'd be nice to cover the scenario since we might change the underlying impl at some point

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah good point, I will update.

.map(promise.becomeDiscard)
}

def await(using Frame): Unit < Async = promise.get
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice! I think this essentially implements proper back pressure?

def ensure(v: => Any < (Async & Abort[Throwable]))(using Frame): Unit < IO

object Finalizer:
sealed abstract class Closeable extends Finalizer:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about Awaitable?

val closeable = Finalizer.Closeable.Unsafe.init(closeParallelism)
ContextEffect.handle(Tag[Resource], closeable, _ => closeable)(v)
.handle(IO.ensure(closeable.close))
.map(result => closeable.await.andThen(result))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another option is returning a Fiber so use can be used. Not sure it's worth it

@fwbrasil
Copy link
Copy Markdown
Collaborator


[info]   - parallel *** FAILED *** (404 milliseconds)

[info]     done.await(millis, java.util.concurrent.TimeUnit.MILLISECONDS) was false Latch had count 2 after 200.millis (MonixTest.scala:177)

Flaky test...

Yeah, I've looked into these failures a couple of times. I suspect Monix has some race condition with interrupts

Comment thread kyo-core/shared/src/main/scala/kyo/Resource.scala Outdated
Co-authored-by: Flavio Brasil <fwbrasil@users.noreply.github.com>
@fwbrasil fwbrasil merged commit 7219e90 into getkyo:main Apr 15, 2025
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants