-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat(persistence): add completion callbacks and async handler support #7937
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(persistence): add completion callbacks and async handler support #7937
Conversation
Add completion callback support to PersistAll and PersistAllAsync methods to detect when all events in a batch have been persisted and their handlers executed. Add async handler support (Func<TEvent, Task>) for all persist methods (Persist, PersistAsync, PersistAll, PersistAllAsync). Changes: - Add 8 new invocation classes with interface segregation for sync/async handlers and sync/async completion callbacks - Add IStashingInvocation marker interface for type-safe detection of stashing invocations - Add 10 new method overloads across all 4 persist methods supporting completion callbacks and async handlers - Update PeekApplyHandler to chain async operations in a single RunTask call to avoid nested RunTask exceptions - Maintain backward compatibility and preserve actor semantics Test coverage: - 8 new tests covering completion callbacks and async handlers - All new tests pass consistently (100% pass rate) - No new failures introduced in existing test suite
Aaronontheweb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Detailed my changes - IMHO, I think I can simplify this by transforming all of the Action<object> callbacks into Func<object,Task> callbacks by just wrapping the delegate. That will cut down on the number of internal classes and interfaces pretty significantly.
Plus I identified some inconsistencies in how we're handling empty / null event arrays.
| namespace Akka.Persistence | ||
| { | ||
| public interface IPendingHandlerInvocation | ||
| internal interface IPendingHandlerInvocation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moving all of the IPendingHandlerInvocations from public to internal - this is going to be part of the v1.6 API changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These should have never been public APIs in the first place IMHO
| internal interface IPendingHandlerInvocation | ||
| { | ||
| object Event { get; } | ||
| bool IsLastInBatch { get; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Used to signal if the OnCompletion handler should be fired. No-ops for individual Persist / PersistAsync calls.
| bool IsLastInBatch { get; } | ||
| } | ||
|
|
||
| internal interface ISyncHandlerInvocation : IPendingHandlerInvocation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sync == uses Action<object>
Async == uses Func<object,Task>
| Func<object, Task> AsyncHandler { get; } | ||
| } | ||
|
|
||
| internal interface ISyncCompletionInvocation : IPendingHandlerInvocation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Completion callbacks don't take an event, so just Action or Func<Task> here
| /// <summary> | ||
| /// Marker interface for stashing invocations that require command stashing. | ||
| /// </summary> | ||
| internal interface IStashingInvocation : IPendingHandlerInvocation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Persistence Handler Invocation Types
This document describes the internal invocation types used by Eventsourced actors to manage persistence handlers and completion callbacks.
Interfaces
| Interface | Purpose | Key Property |
|---|---|---|
IPendingHandlerInvocation |
Base interface for all invocations | Event, IsLastInBatch |
ISyncHandlerInvocation |
Sync event handler (Action<object>) |
Handler |
IAsyncHandlerInvocation |
Async event handler (Func<object, Task>) |
AsyncHandler |
ISyncCompletionInvocation |
Sync completion callback (Action) |
CompletionCallback |
IAsyncCompletionInvocation |
Async completion callback (Func<Task>) |
AsyncCompletionCallback |
IStashingInvocation |
Marker for stashing behavior (blocks commands) | (none - marker only) |
Concrete Classes
| Class | Stashing | Handler | Completion | Used By |
|---|---|---|---|---|
StashingHandlerInvocation |
Yes | Sync | Sync | Persist, PersistAll |
StashingHandlerInvocationWithAsyncCompletion |
Yes | Sync | Async | PersistAll |
StashingAsyncHandlerInvocation |
Yes | Async | Sync | Persist, PersistAll |
StashingAsyncHandlerInvocationWithAsyncCompletion |
Yes | Async | Async | PersistAll |
NonStashingHandlerInvocation |
No | Sync | Sync | PersistAsync, PersistAllAsync, DeferAsync |
NonStashingHandlerInvocationWithAsyncCompletion |
No | Sync | Async | PersistAllAsync |
NonStashingAsyncHandlerInvocation |
No | Async | Sync | PersistAsync, PersistAllAsync |
NonStashingAsyncHandlerInvocationWithAsyncCompletion |
No | Async | Async | PersistAllAsync |
Interface Implementation Matrix
| Class | IStashingInvocation |
ISyncHandlerInvocation |
IAsyncHandlerInvocation |
ISyncCompletionInvocation |
IAsyncCompletionInvocation |
|---|---|---|---|---|---|
StashingHandlerInvocation |
✓ | ✓ | ✓ | ||
StashingHandlerInvocationWithAsyncCompletion |
✓ | ✓ | ✓ | ||
StashingAsyncHandlerInvocation |
✓ | ✓ | ✓ | ||
StashingAsyncHandlerInvocationWithAsyncCompletion |
✓ | ✓ | ✓ | ||
NonStashingHandlerInvocation |
✓ | ✓ | |||
NonStashingHandlerInvocationWithAsyncCompletion |
✓ | ✓ | |||
NonStashingAsyncHandlerInvocation |
✓ | ✓ | |||
NonStashingAsyncHandlerInvocationWithAsyncCompletion |
✓ | ✓ |
Key Concepts
- Stashing: When
IStashingInvocationis implemented, incoming commands are stashed until all persistence handlers complete. This ensures consistency during persistence operations. - Handler Type: Sync (
Action<TEvent>) vs Async (Func<TEvent, Task>) determines how event handlers execute. - Completion Type: Sync (
Action) vs Async (Func<Task>) determines how the batch completion callback executes.
Naming Convention
The class names follow a consistent pattern:
[Stashing|NonStashing][Async]HandlerInvocation[WithAsyncCompletion]
Stashing/NonStashing- Whether commands are stashed during persistenceAsync(before Handler) - Whether the event handler is asyncWithAsyncCompletion- Whether the completion callback is async (omitted for sync completion)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have a larger number of overloads here due to the combination of things we need to support both for backwards compatibility and ease of API use.... alllllllllllllthough..... this gives me an idea....
| throw new InvalidOperationException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later."); | ||
| } | ||
|
|
||
| if (events == null) return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also exit of the events collection is zero-length?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, don't we need to invoke the task handler here just like we did in the above overload?
| throw new InvalidOperationException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later."); | ||
| } | ||
|
|
||
| if (events == null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about zero-length?
| var invocation = _pendingInvocations.First.Value; | ||
|
|
||
| // Check if we have any async work (handler or completion callback) | ||
| bool hasAsyncHandler = invocation is IAsyncHandlerInvocation; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check to see what type of handler(s) we need to run
| // Invoke handler (async or sync) | ||
| if (hasAsyncHandler) | ||
| { | ||
| await ((IAsyncHandlerInvocation)invocation).AsyncHandler(payload); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
event-specific handler always goes first
| // Invoke completion callback if this is the last event in the batch | ||
| if (invocation.IsLastInBatch) | ||
| { | ||
| if (invocation is IAsyncCompletionInvocation asyncCompInv && asyncCompInv.AsyncCompletionCallback != null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
completion handler always goes last
…ative to #7937 (#7954) * feat(persistence): add completion callbacks and async handler support Add completion callback overloads for PersistAll and PersistAllAsync that invoke a callback after all events have been persisted and their handlers executed. Also add async handler support (Func<TEvent, Task>) to all persist methods. Key changes: - Add IPendingHandlerInvocation, ISyncHandlerInvocation, IAsyncHandlerInvocation, and IStashingInvocation interfaces for type-safe handler invocation - Add StashingHandlerInvocation, StashingAsyncHandlerInvocation, AsyncHandlerInvocation, and AsyncAsyncHandlerInvocation classes - Add Persist<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add PersistAsync<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add PersistAll overloads with completion callbacks (sync and async) - Add PersistAllAsync overloads with completion callbacks (sync and async) - Add DeferAsync<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add internal stashing Defer methods for completion callback support - Update PeekApplyHandler to handle async handlers via RunTask - Update PersistingEvents to use IStashingInvocation marker interface Stashing semantics are preserved: PersistAll completion callbacks use internal stashing Defer (increments _pendingStashingPersistInvocations), while PersistAllAsync uses non-stashing DeferAsync. * chore: update API approval for persistence completion callbacks Update verified API file to reflect: - New public methods: Persist/PersistAsync/PersistAll/PersistAllAsync async handler overloads and completion callback overloads - New public method: DeferAsync with async handler - Internal invocation classes: AsyncHandlerInvocation, StashingHandlerInvocation, and IPendingHandlerInvocation are now internal (implementation detail) * chore: update .NET Framework API approval for persistence completion callbacks * fixed API approvals * test(persistence): add empty events tests and convert to async test methods - Convert all tests to use async/await with ExpectMsgAsync instead of sync-over-async ExpectMsg calls - Add tests for PersistAll/PersistAllAsync with empty events to verify completion callbacks are invoked immediately for all overloads: - PersistAll with sync completion callback (existing) - PersistAll with async completion callback (new) - PersistAllAsync with sync completion callback (new) - PersistAllAsync with async completion callback (new) - Update EmptyEventsWithCompletionActor to support all four scenarios * fix(persistence): use Defer for empty events completion callbacks to maintain ordering When PersistAll/PersistAllAsync is called with empty events, the completion callback must still be queued through Defer/DeferAsync to maintain the in-order execution guarantee. Previously, the callback was invoked immediately which could cause out-of-order execution if there were pending invocations from prior Persist/PersistAll calls. Changes: - Replace immediate invocation with Defer/DeferAsync for all 8 overloads that have completion callbacks when events collection is null or empty - Add SequentialPersistOrderingActor test actor for ordering verification - Add test: Persist followed by empty PersistAll maintains execution order - Add test: Sequential PersistAll with empty in middle maintains order
|
Superseded via #7954 |
…ative to akkadotnet#7937 (akkadotnet#7954) * feat(persistence): add completion callbacks and async handler support Add completion callback overloads for PersistAll and PersistAllAsync that invoke a callback after all events have been persisted and their handlers executed. Also add async handler support (Func<TEvent, Task>) to all persist methods. Key changes: - Add IPendingHandlerInvocation, ISyncHandlerInvocation, IAsyncHandlerInvocation, and IStashingInvocation interfaces for type-safe handler invocation - Add StashingHandlerInvocation, StashingAsyncHandlerInvocation, AsyncHandlerInvocation, and AsyncAsyncHandlerInvocation classes - Add Persist<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add PersistAsync<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add PersistAll overloads with completion callbacks (sync and async) - Add PersistAllAsync overloads with completion callbacks (sync and async) - Add DeferAsync<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add internal stashing Defer methods for completion callback support - Update PeekApplyHandler to handle async handlers via RunTask - Update PersistingEvents to use IStashingInvocation marker interface Stashing semantics are preserved: PersistAll completion callbacks use internal stashing Defer (increments _pendingStashingPersistInvocations), while PersistAllAsync uses non-stashing DeferAsync. * chore: update API approval for persistence completion callbacks Update verified API file to reflect: - New public methods: Persist/PersistAsync/PersistAll/PersistAllAsync async handler overloads and completion callback overloads - New public method: DeferAsync with async handler - Internal invocation classes: AsyncHandlerInvocation, StashingHandlerInvocation, and IPendingHandlerInvocation are now internal (implementation detail) * chore: update .NET Framework API approval for persistence completion callbacks * fixed API approvals * test(persistence): add empty events tests and convert to async test methods - Convert all tests to use async/await with ExpectMsgAsync instead of sync-over-async ExpectMsg calls - Add tests for PersistAll/PersistAllAsync with empty events to verify completion callbacks are invoked immediately for all overloads: - PersistAll with sync completion callback (existing) - PersistAll with async completion callback (new) - PersistAllAsync with sync completion callback (new) - PersistAllAsync with async completion callback (new) - Update EmptyEventsWithCompletionActor to support all four scenarios * fix(persistence): use Defer for empty events completion callbacks to maintain ordering When PersistAll/PersistAllAsync is called with empty events, the completion callback must still be queued through Defer/DeferAsync to maintain the in-order execution guarantee. Previously, the callback was invoked immediately which could cause out-of-order execution if there were pending invocations from prior Persist/PersistAll calls. Changes: - Replace immediate invocation with Defer/DeferAsync for all 8 overloads that have completion callbacks when events collection is null or empty - Add SequentialPersistOrderingActor test actor for ordering verification - Add test: Persist followed by empty PersistAll maintains execution order - Add test: Sequential PersistAll with empty in middle maintains order
…ative to #7937 (#7954) (#7957) * feat(persistence): add completion callbacks and async handler support Add completion callback overloads for PersistAll and PersistAllAsync that invoke a callback after all events have been persisted and their handlers executed. Also add async handler support (Func<TEvent, Task>) to all persist methods. Key changes: - Add IPendingHandlerInvocation, ISyncHandlerInvocation, IAsyncHandlerInvocation, and IStashingInvocation interfaces for type-safe handler invocation - Add StashingHandlerInvocation, StashingAsyncHandlerInvocation, AsyncHandlerInvocation, and AsyncAsyncHandlerInvocation classes - Add Persist<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add PersistAsync<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add PersistAll overloads with completion callbacks (sync and async) - Add PersistAllAsync overloads with completion callbacks (sync and async) - Add DeferAsync<TEvent>(TEvent, Func<TEvent, Task>) async handler overload - Add internal stashing Defer methods for completion callback support - Update PeekApplyHandler to handle async handlers via RunTask - Update PersistingEvents to use IStashingInvocation marker interface Stashing semantics are preserved: PersistAll completion callbacks use internal stashing Defer (increments _pendingStashingPersistInvocations), while PersistAllAsync uses non-stashing DeferAsync. * chore: update API approval for persistence completion callbacks Update verified API file to reflect: - New public methods: Persist/PersistAsync/PersistAll/PersistAllAsync async handler overloads and completion callback overloads - New public method: DeferAsync with async handler - Internal invocation classes: AsyncHandlerInvocation, StashingHandlerInvocation, and IPendingHandlerInvocation are now internal (implementation detail) * chore: update .NET Framework API approval for persistence completion callbacks * fixed API approvals * test(persistence): add empty events tests and convert to async test methods - Convert all tests to use async/await with ExpectMsgAsync instead of sync-over-async ExpectMsg calls - Add tests for PersistAll/PersistAllAsync with empty events to verify completion callbacks are invoked immediately for all overloads: - PersistAll with sync completion callback (existing) - PersistAll with async completion callback (new) - PersistAllAsync with sync completion callback (new) - PersistAllAsync with async completion callback (new) - Update EmptyEventsWithCompletionActor to support all four scenarios * fix(persistence): use Defer for empty events completion callbacks to maintain ordering When PersistAll/PersistAllAsync is called with empty events, the completion callback must still be queued through Defer/DeferAsync to maintain the in-order execution guarantee. Previously, the callback was invoked immediately which could cause out-of-order execution if there were pending invocations from prior Persist/PersistAll calls. Changes: - Replace immediate invocation with Defer/DeferAsync for all 8 overloads that have completion callbacks when events collection is null or empty - Add SequentialPersistOrderingActor test actor for ordering verification - Add test: Persist followed by empty PersistAll maintains execution order - Add test: Sequential PersistAll with empty in middle maintains order
Summary
fixes #7935
This PR adds completion callback support and async handler support to Akka.NET persistence, enabling users to:
PersistAll/PersistAllAsyncbatch have been persisted and their handlers executedFunc<TEvent, Task>) with all persist methodsNote: This is a proof of concept implementation.
New API Reference
Async Handler Overloads (NEW)
Single event persistence with async handlers:
Batch Persistence with Async Handlers (NEW)
Completion Callbacks for Sync Handlers (NEW)
Usage Examples
Async handler with completion callback:
Async handler with async completion callback:
Empty batch handling:
Changes
New Features
PersistAllandPersistAllAsyncnow accept optionalonComplete(sync) oronCompleteAsync(async) callbacks that execute after all events in the batch are persistedPersist,PersistAsync,PersistAll,PersistAllAsync) now supportFunc<TEvent, Task>async handlersImplementation Details
IStashingInvocationmarker interface for type-safe detection of stashing invocationsPeekApplyHandlerto chain all async operations in a singleRunTaskcall to avoid nestedRunTaskexceptionsTest Coverage
Added comprehensive test suite in
PersistenceCompletionCallbackSpec.cs:PersistAllAsync_should_invoke_completion_callback_after_all_events_persistedPersistAll_should_invoke_completion_callback_after_all_events_persistedPersistAllAsync_should_invoke_async_completion_callbackPersistAll_should_invoke_async_completion_callbackPersistAllAsync_should_support_async_handlersPersistAsync_should_support_async_handlersCompletion_callback_should_fire_exactly_once_per_batchEmpty_event_list_should_invoke_completion_callback_immediatelyAll 8 new tests pass consistently (100% pass rate).
Test Results
Full test suite results match baseline stability (266-268 passing, 7-10 failing - matching pre-existing baseline variance). No new failures introduced by this implementation.