@Aaronontheweb Aaronontheweb commented Nov 21, 2025

Summary

fixes #7935

This PR adds completion callback support and async handler support to Akka.NET persistence, enabling users to:

  1. Detect when all events in a PersistAll/PersistAllAsync batch have been persisted and their handlers executed
  2. Use async handlers (Func<TEvent, Task>) with all persist methods

Note: This is a proof of concept implementation.

New API Reference

Async Handler Overloads (NEW)

Single event persistence with async handlers:

// Stashing (blocks incoming commands until persisted)
void Persist<TEvent>(TEvent @event, Func<TEvent, Task> handler)

// Non-stashing (continues processing commands while persisting)
void PersistAsync<TEvent>(TEvent @event, Func<TEvent, Task> handler)

Batch Persistence with Async Handlers (NEW)

// Stashing variants
void PersistAll<TEvent>(IEnumerable<TEvent> events, Func<TEvent, Task> handler)
void PersistAll<TEvent>(IEnumerable<TEvent> events, Func<TEvent, Task> handler, Action onComplete)
void PersistAll<TEvent>(IEnumerable<TEvent> events, Func<TEvent, Task> handler, Func<Task> onCompleteAsync)

// Non-stashing variants  
void PersistAllAsync<TEvent>(IEnumerable<TEvent> events, Func<TEvent, Task> handler)
void PersistAllAsync<TEvent>(IEnumerable<TEvent> events, Func<TEvent, Task> handler, Action onComplete)
void PersistAllAsync<TEvent>(IEnumerable<TEvent> events, Func<TEvent, Task> handler, Func<Task> onCompleteAsync)

Completion Callbacks for Sync Handlers (NEW)

// Stashing variants with sync handler
void PersistAll<TEvent>(IEnumerable<TEvent> events, Action<TEvent> handler, Action onComplete)
void PersistAll<TEvent>(IEnumerable<TEvent> events, Action<TEvent> handler, Func<Task> onCompleteAsync)

// Non-stashing variants with sync handler
void PersistAllAsync<TEvent>(IEnumerable<TEvent> events, Action<TEvent> handler, Action onComplete)
void PersistAllAsync<TEvent>(IEnumerable<TEvent> events, Action<TEvent> handler, Func<Task> onCompleteAsync)

Usage Examples

Async handler with completion callback:

PersistAll(events, async evt => 
{
    // Async work for each event
    await UpdateProjectionsAsync(evt);
}, 
onComplete: () => 
{
    // Called once after ALL events persisted and handlers executed
    Sender.Tell(new BatchComplete());
});

Async handler with async completion callback:

PersistAll(events, async evt => 
{
    await UpdateProjectionsAsync(evt);
}, 
onCompleteAsync: async () => 
{
    // Async completion work
    await NotifySubscribersAsync();
    Sender.Tell(new BatchComplete());
});

Empty batch handling:

// Completion callback is invoked immediately for empty event lists
PersistAll(Array.Empty<MyEvent>(), _ => { }, () => 
{
    // This is called immediately
    Sender.Tell(new BatchComplete());
});

Changes

New Features

  • Completion Callbacks: PersistAll and PersistAllAsync now accept optional onComplete (sync) or onCompleteAsync (async) callbacks that execute after all events in the batch are persisted
  • Async Handlers: All 4 persist methods (Persist, PersistAsync, PersistAll, PersistAllAsync) now support Func<TEvent, Task> async handlers

Implementation Details

  • Added 8 new invocation classes with interface segregation pattern for sync/async handlers and sync/async completion callbacks
  • Added IStashingInvocation marker interface for type-safe detection of stashing invocations
  • Added 10 new method overloads providing all combinations of sync/async handlers with sync/async completion callbacks
  • Updated PeekApplyHandler to chain all async operations in a single RunTask call to avoid nested RunTask exceptions
  • Maintains backward compatibility - all existing persist method signatures unchanged
  • Preserves actor semantics (stashing vs non-stashing behavior)

Test Coverage

Added comprehensive test suite in PersistenceCompletionCallbackSpec.cs:

  • PersistAllAsync_should_invoke_completion_callback_after_all_events_persisted
  • PersistAll_should_invoke_completion_callback_after_all_events_persisted
  • PersistAllAsync_should_invoke_async_completion_callback
  • PersistAll_should_invoke_async_completion_callback
  • PersistAllAsync_should_support_async_handlers
  • PersistAsync_should_support_async_handlers
  • Completion_callback_should_fire_exactly_once_per_batch
  • Empty_event_list_should_invoke_completion_callback_immediately

All 8 new tests pass consistently (100% pass rate).

Test Results

Full test suite results match baseline stability (266-268 passing, 7-10 failing - matching pre-existing baseline variance). No new failures introduced by this implementation.

Add completion callback support to PersistAll and PersistAllAsync methods
to detect when all events in a batch have been persisted and their handlers
executed. Add async handler support (Func<TEvent, Task>) for all persist
methods (Persist, PersistAsync, PersistAll, PersistAllAsync).

Changes:
- Add 8 new invocation classes with interface segregation for sync/async
  handlers and sync/async completion callbacks
- Add IStashingInvocation marker interface for type-safe detection of
  stashing invocations
- Add 10 new method overloads across all 4 persist methods supporting
  completion callbacks and async handlers
- Update PeekApplyHandler to chain async operations in a single RunTask
  call to avoid nested RunTask exceptions
- Maintain backward compatibility and preserve actor semantics

Test coverage:
- 8 new tests covering completion callbacks and async handlers
- All new tests pass consistently (100% pass rate)
- No new failures introduced in existing test suite

@Aaronontheweb Aaronontheweb left a comment

I've detailed my changes below. I think I can simplify this by converting all of the Action<object> callbacks into Func<object, Task> callbacks by wrapping the delegate. That would cut down on the number of internal classes and interfaces pretty significantly.

Plus I identified some inconsistencies in how we're handling empty / null event arrays.
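
The delegate-wrapping idea from this comment could look roughly like the sketch below. It is only an illustration of the technique, not code from the PR: `HandlerAdapters`, `WrapHandler`, and `WrapCompletion` are hypothetical names introduced here, and the sketch assumes the sync callback should run inline and report completion through an already-completed task.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (not part of the PR): adapts sync callbacks to the
// async delegate shapes so only one kind of invocation has to be stored.
public static class HandlerAdapters
{
    // Wraps an Action<object> event handler as a Func<object, Task>.
    public static Func<object, Task> WrapHandler(Action<object> handler)
    {
        if (handler == null) throw new ArgumentNullException(nameof(handler));
        return evt =>
        {
            handler(evt);               // runs synchronously, inline
            return Task.CompletedTask;  // already completed, nothing is scheduled
        };
    }

    // Wraps an Action completion callback as a Func<Task>.
    public static Func<Task> WrapCompletion(Action onComplete)
    {
        if (onComplete == null) throw new ArgumentNullException(nameof(onComplete));
        return () =>
        {
            onComplete();
            return Task.CompletedTask;
        };
    }
}
```

One design consequence worth noting: because the wrapper lambdas are not `async`, an exception thrown by the sync callback propagates directly to the caller of the wrapper instead of surfacing as a faulted task.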

namespace Akka.Persistence
{
- public interface IPendingHandlerInvocation
+ internal interface IPendingHandlerInvocation

Moving all of the IPendingHandlerInvocations from public to internal - this is going to be part of the v1.6 API changes.

These should have never been public APIs in the first place IMHO

internal interface IPendingHandlerInvocation
{
object Event { get; }
bool IsLastInBatch { get; }

Used to signal if the OnCompletion handler should be fired. No-ops for individual Persist / PersistAsync calls.

bool IsLastInBatch { get; }
}

internal interface ISyncHandlerInvocation : IPendingHandlerInvocation

Sync == uses Action<object>
Async == uses Func<object,Task>

Func<object, Task> AsyncHandler { get; }
}

internal interface ISyncCompletionInvocation : IPendingHandlerInvocation

Completion callbacks don't take an event, so just Action or Func<Task> here

/// <summary>
/// Marker interface for stashing invocations that require command stashing.
/// </summary>
internal interface IStashingInvocation : IPendingHandlerInvocation

Persistence Handler Invocation Types

This document describes the internal invocation types used by Eventsourced actors to manage persistence handlers and completion callbacks.

Interfaces

| Interface | Purpose | Key Property |
|---|---|---|
| IPendingHandlerInvocation | Base interface for all invocations | Event, IsLastInBatch |
| ISyncHandlerInvocation | Sync event handler (Action<object>) | Handler |
| IAsyncHandlerInvocation | Async event handler (Func<object, Task>) | AsyncHandler |
| ISyncCompletionInvocation | Sync completion callback (Action) | CompletionCallback |
| IAsyncCompletionInvocation | Async completion callback (Func<Task>) | AsyncCompletionCallback |
| IStashingInvocation | Marker for stashing behavior (blocks commands) | (none - marker only) |

Concrete Classes

| Class | Stashing | Handler | Completion | Used By |
|---|---|---|---|---|
| StashingHandlerInvocation | Yes | Sync | Sync | Persist, PersistAll |
| StashingHandlerInvocationWithAsyncCompletion | Yes | Sync | Async | PersistAll |
| StashingAsyncHandlerInvocation | Yes | Async | Sync | Persist, PersistAll |
| StashingAsyncHandlerInvocationWithAsyncCompletion | Yes | Async | Async | PersistAll |
| NonStashingHandlerInvocation | No | Sync | Sync | PersistAsync, PersistAllAsync, DeferAsync |
| NonStashingHandlerInvocationWithAsyncCompletion | No | Sync | Async | PersistAllAsync |
| NonStashingAsyncHandlerInvocation | No | Async | Sync | PersistAsync, PersistAllAsync |
| NonStashingAsyncHandlerInvocationWithAsyncCompletion | No | Async | Async | PersistAllAsync |

Interface Implementation Matrix

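| Class | IStashingInvocation | ISyncHandlerInvocation | IAsyncHandlerInvocation | ISyncCompletionInvocation | IAsyncCompletionInvocation |
|---|---|---|---|---|---|
| StashingHandlerInvocation | Yes | Yes | No | Yes | No |
| StashingHandlerInvocationWithAsyncCompletion | Yes | Yes | No | No | Yes |
| StashingAsyncHandlerInvocation | Yes | No | Yes | Yes | No |
| StashingAsyncHandlerInvocationWithAsyncCompletion | Yes | No | Yes | No | Yes |
| NonStashingHandlerInvocation | No | Yes | No | Yes | No |
| NonStashingHandlerInvocationWithAsyncCompletion | No | Yes | No | No | Yes |
| NonStashingAsyncHandlerInvocation | No | No | Yes | Yes | No |
| NonStashingAsyncHandlerInvocationWithAsyncCompletion | No | No | Yes | No | Yes |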

Key Concepts

  • Stashing: When IStashingInvocation is implemented, incoming commands are stashed until all persistence handlers complete. This ensures consistency during persistence operations.
  • Handler Type: Sync (Action<TEvent>) vs Async (Func<TEvent, Task>) determines how event handlers execute.
  • Completion Type: Sync (Action) vs Async (Func<Task>) determines how the batch completion callback executes.

Naming Convention

The class names follow a consistent pattern:

[Stashing|NonStashing][Async]HandlerInvocation[WithAsyncCompletion]
  • Stashing / NonStashing - Whether commands are stashed during persistence
  • Async (before Handler) - Whether the event handler is async
  • WithAsyncCompletion - Whether the completion callback is async (omitted for sync completion)
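
As a rough illustration of how the marker-interface approach described in this document can be consumed, the sketch below declares a subset of the interfaces using the property names from the Interfaces table and inspects an invocation with type tests. `DemoStashingAsyncInvocation` and `InvocationInspector` are hypothetical names made up for this example; the PR's actual classes carry more members than shown here.

```csharp
using System;
using System.Threading.Tasks;

// Interface shapes follow the Interfaces table; everything else is illustrative.
internal interface IPendingHandlerInvocation
{
    object Event { get; }
    bool IsLastInBatch { get; }
}

internal interface ISyncHandlerInvocation : IPendingHandlerInvocation
{
    Action<object> Handler { get; }
}

internal interface IAsyncHandlerInvocation : IPendingHandlerInvocation
{
    Func<object, Task> AsyncHandler { get; }
}

// Marker only: carries no members of its own.
internal interface IStashingInvocation : IPendingHandlerInvocation { }

// Hypothetical concrete class: stashing, async event handler.
internal sealed class DemoStashingAsyncInvocation : IStashingInvocation, IAsyncHandlerInvocation
{
    public DemoStashingAsyncInvocation(object evt, Func<object, Task> asyncHandler, bool isLastInBatch)
    {
        Event = evt;
        AsyncHandler = asyncHandler;
        IsLastInBatch = isLastInBatch;
    }

    public object Event { get; }
    public bool IsLastInBatch { get; }
    public Func<object, Task> AsyncHandler { get; }
}

internal static class InvocationInspector
{
    // Uses type tests instead of flags to decide how an invocation behaves.
    public static string Describe(IPendingHandlerInvocation invocation)
    {
        var stashing = invocation is IStashingInvocation ? "stashing" : "non-stashing";
        var handler = invocation is IAsyncHandlerInvocation ? "async" : "sync";
        return stashing + ", " + handler + " handler";
    }
}
```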

We have a larger number of overloads here due to the combination of things we need to support both for backwards compatibility and ease of API use.... alllllllllllllthough..... this gives me an idea....

throw new InvalidOperationException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.");
}

if (events == null) return;

Should we also exit if the events collection is zero-length?

Also, don't we need to invoke the task handler here just like we did in the above overload?

throw new InvalidOperationException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.");
}

if (events == null)

What about zero-length?

var invocation = _pendingInvocations.First.Value;

// Check if we have any async work (handler or completion callback)
bool hasAsyncHandler = invocation is IAsyncHandlerInvocation;

Check to see what type of handler(s) we need to run

// Invoke handler (async or sync)
if (hasAsyncHandler)
{
await ((IAsyncHandlerInvocation)invocation).AsyncHandler(payload);

event-specific handler always goes first

// Invoke completion callback if this is the last event in the batch
if (invocation.IsLastInBatch)
{
if (invocation is IAsyncCompletionInvocation asyncCompInv && asyncCompInv.AsyncCompletionCallback != null)

completion handler always goes last
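
The ordering noted in these comments (event handler first, completion callback last and only for the final event in a batch) can be sketched in isolation as follows. This is a simplified stand-in, not the PR's `PeekApplyHandler`: `HandlerOrderSketch` and its `Invocation` class are hypothetical, and the real code runs inside the actor via `RunTask` rather than as a free-standing method.

```csharp
using System;
using System.Threading.Tasks;

public static class HandlerOrderSketch
{
    // Hypothetical stand-in for one queued invocation.
    public sealed class Invocation
    {
        public object Event;
        public bool IsLastInBatch;
        public Func<object, Task> AsyncHandler;    // null when the handler is sync
        public Action<object> Handler;             // null when the handler is async
        public Func<Task> AsyncCompletionCallback; // null when completion is sync or absent
        public Action CompletionCallback;          // null when completion is async or absent
    }

    // Chains handler and completion into a single task so they run strictly in order.
    public static async Task RunAsync(Invocation invocation)
    {
        // 1. The event-specific handler always goes first.
        if (invocation.AsyncHandler != null)
            await invocation.AsyncHandler(invocation.Event);
        else
            invocation.Handler?.Invoke(invocation.Event);

        // 2. The completion callback goes last, and only for the final event in the batch.
        if (!invocation.IsLastInBatch) return;

        if (invocation.AsyncCompletionCallback != null)
            await invocation.AsyncCompletionCallback();
        else
            invocation.CompletionCallback?.Invoke();
    }
}
```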

Aaronontheweb added a commit that referenced this pull request Dec 1, 2025
…ative to #7937 (#7954)

* feat(persistence): add completion callbacks and async handler support

Add completion callback overloads for PersistAll and PersistAllAsync that
invoke a callback after all events have been persisted and their handlers
executed. Also add async handler support (Func<TEvent, Task>) to all
persist methods.

Key changes:
- Add IPendingHandlerInvocation, ISyncHandlerInvocation, IAsyncHandlerInvocation,
  and IStashingInvocation interfaces for type-safe handler invocation
- Add StashingHandlerInvocation, StashingAsyncHandlerInvocation,
  AsyncHandlerInvocation, and AsyncAsyncHandlerInvocation classes
- Add Persist<TEvent>(TEvent, Func<TEvent, Task>) async handler overload
- Add PersistAsync<TEvent>(TEvent, Func<TEvent, Task>) async handler overload
- Add PersistAll overloads with completion callbacks (sync and async)
- Add PersistAllAsync overloads with completion callbacks (sync and async)
- Add DeferAsync<TEvent>(TEvent, Func<TEvent, Task>) async handler overload
- Add internal stashing Defer methods for completion callback support
- Update PeekApplyHandler to handle async handlers via RunTask
- Update PersistingEvents to use IStashingInvocation marker interface

Stashing semantics are preserved: PersistAll completion callbacks use
internal stashing Defer (increments _pendingStashingPersistInvocations),
while PersistAllAsync uses non-stashing DeferAsync.

* chore: update API approval for persistence completion callbacks

Update verified API file to reflect:
- New public methods: Persist/PersistAsync/PersistAll/PersistAllAsync async
  handler overloads and completion callback overloads
- New public method: DeferAsync with async handler
- Internal invocation classes: AsyncHandlerInvocation, StashingHandlerInvocation,
  and IPendingHandlerInvocation are now internal (implementation detail)

* chore: update .NET Framework API approval for persistence completion callbacks

* fixed API approvals

* test(persistence): add empty events tests and convert to async test methods

- Convert all tests to use async/await with ExpectMsgAsync instead of
  sync-over-async ExpectMsg calls
- Add tests for PersistAll/PersistAllAsync with empty events to verify
  completion callbacks are invoked immediately for all overloads:
  - PersistAll with sync completion callback (existing)
  - PersistAll with async completion callback (new)
  - PersistAllAsync with sync completion callback (new)
  - PersistAllAsync with async completion callback (new)
- Update EmptyEventsWithCompletionActor to support all four scenarios

* fix(persistence): use Defer for empty events completion callbacks to maintain ordering

When PersistAll/PersistAllAsync is called with empty events, the completion
callback must still be queued through Defer/DeferAsync to maintain the
in-order execution guarantee. Previously, the callback was invoked immediately
which could cause out-of-order execution if there were pending invocations
from prior Persist/PersistAll calls.

Changes:
- Replace immediate invocation with Defer/DeferAsync for all 8 overloads
  that have completion callbacks when events collection is null or empty
- Add SequentialPersistOrderingActor test actor for ordering verification
- Add test: Persist followed by empty PersistAll maintains execution order
- Add test: Sequential PersistAll with empty in middle maintains order
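
To illustrate why the empty-events fix queues the completion callback rather than invoking it on the spot, here is a toy model of a FIFO pending-invocation queue. It is only a sketch of the ordering argument under simplified assumptions: `PendingQueueSketch` and all of its members are hypothetical, and it does not use the actual Defer/DeferAsync machinery.

```csharp
using System;
using System.Collections.Generic;

// Toy model: handlers wait in a FIFO queue until the "journal" confirms writes.
public sealed class PendingQueueSketch
{
    private readonly Queue<Action> _pending = new Queue<Action>();

    public List<string> Log { get; } = new List<string>();

    // Queues the handler for a single event, as a prior Persist call would.
    public void Persist(string evt) => _pending.Enqueue(() => Log.Add("handled " + evt));

    // Old behavior for an empty batch: run the completion callback right away.
    public void PersistAllEmptyImmediate(Action onComplete) => onComplete();

    // Fixed behavior for an empty batch: queue the callback behind earlier work.
    public void PersistAllEmptyDeferred(Action onComplete) => _pending.Enqueue(onComplete);

    // Simulates write confirmations arriving: drains the queue in FIFO order.
    public void Drain()
    {
        while (_pending.Count > 0) _pending.Dequeue()();
    }
}
```

With the immediate variant, a completion callback for an empty batch runs before the handler of an earlier `Persist` call that is still pending; with the deferred variant it runs after it, which is the in-order guarantee the commit message describes.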
@Aaronontheweb Aaronontheweb removed this from the 1.6.0 milestone Dec 1, 2025
@Aaronontheweb

Superseded via #7954

@Aaronontheweb Aaronontheweb deleted the feature/persistence-completion-callbacks branch December 1, 2025 21:36
Aaronontheweb added a commit to Aaronontheweb/akka.net that referenced this pull request Dec 2, 2025
…ative to akkadotnet#7937 (akkadotnet#7954)

Aaronontheweb added a commit that referenced this pull request Dec 2, 2025
…ative to #7937 (#7954) (#7957)
