@Arkatufus Arkatufus commented May 19, 2025

PublishWithAck

A PublishWithAck envelope message works just like a normal Publish envelope, with two differences: it has an extra Timeout property, and if it cannot be published, it is stored in a per-topic buffer until a subscriber in the cluster subscribes to the topic.

Buffer

  • If the mediator receives a PublishWithAck message and is able to deliver it immediately, it sends a PublishSucceeded message to the original sender as a signal.
  • If the mediator cannot deliver a PublishWithAck message, it stores the message in a buffer.
  • The size of this buffer can be configured using the HOCON setting akka.cluster.pub-sub.buffered-messages.max-per-topic.

Buffer timeout

  • The PublishWithAck message contains the Timeout property.
  • Once every akka.cluster.pub-sub.buffered-messages.timeout-check-interval, the mediator scans all buffered messages and checks whether each one has been sitting in the buffer for longer than its timeout period.
  • Timed-out messages are removed from the buffer and either silently discarded or sent to dead letters.
  • A PublishFailed message is also sent to the original sender to signal the publish failure.
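
As a rough illustration, the two settings named so far could be set together in HOCON as shown below. The setting paths come from the description above; the values are placeholders chosen for the example and are not claimed to be the defaults.

```hocon
akka.cluster.pub-sub.buffered-messages {
  # Maximum number of undelivered PublishWithAck messages kept per topic.
  # Placeholder value for illustration only.
  max-per-topic = 1000

  # How often the mediator scans the buffers for timed-out messages.
  # Placeholder value for illustration only.
  timeout-check-interval = 1s
}
```

A shorter check interval means timed-out messages are noticed sooner, at the cost of scanning the buffers more often.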

Buffer delivery

When a subscriber for the same topic as the buffered PublishWithAck messages joins the publisher node, all buffered messages for that topic are sent to that subscriber. For each delivered message, the mediator sends a PublishSucceeded message to the original sender as a signal.

Buffer overflow

When a new message is inserted and the topic buffer count exceeds akka.cluster.pub-sub.buffered-messages.max-per-topic, the mediator will discard the oldest message in the buffer and send a PublishFailed to the original sender of that message.
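
The overflow and timeout rules described above can be modelled with a small standalone class. This is only a sketch of the described behavior, not the mediator's actual implementation: `TopicBufferSketch`, `Add`, and `RemoveTimedOut` are hypothetical names invented for this example, and payloads are plain strings to keep it free of Akka.NET dependencies.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of a single topic's buffer; NOT the mediator's real code.
public sealed class TopicBufferSketch
{
    private readonly int _maxPerTopic;
    private readonly Queue<(string Payload, DateTime Deadline)> _buffer = new();

    public TopicBufferSketch(int maxPerTopic) => _maxPerTopic = maxPerTopic;

    public int Count => _buffer.Count;

    // Buffers a message, then evicts the oldest entries while the buffer
    // exceeds the per-topic limit. Returns the evicted payloads; in the
    // described design each of these would trigger a PublishFailed reply.
    public List<string> Add(string payload, DateTime now, TimeSpan timeout)
    {
        _buffer.Enqueue((payload, now + timeout));
        var evicted = new List<string>();
        while (_buffer.Count > _maxPerTopic)
            evicted.Add(_buffer.Dequeue().Payload);
        return evicted;
    }

    // Models one periodic scan: removes every message that has been buffered
    // for longer than its own timeout and returns the removed payloads.
    public List<string> RemoveTimedOut(DateTime now)
    {
        var expired = new List<string>();
        var pending = _buffer.Count;
        for (var i = 0; i < pending; i++)
        {
            var entry = _buffer.Dequeue();
            if (now > entry.Deadline)
                expired.Add(entry.Payload);
            else
                _buffer.Enqueue(entry); // still valid, keep it in order
        }
        return expired;
    }
}
```

With a limit of 2, adding a third message evicts the first one, and a later scan removes only the entries whose own timeout has elapsed. Each message carries its own deadline because the timeout is a property of the individual PublishWithAck message, not of the buffer.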

Example

This example is the same as the publisher example above, but it uses PublishWithAck and waits up to 30 seconds for the payload to be consumed.

using System;
using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;

namespace SamplePublisher;

public class PublisherWithAck : ReceiveActor
{
    public PublisherWithAck()
    {
        var log = Context.GetLogger();
        var mediator = DistributedPubSub.Get(Context.System).Mediator;

        // Publish each incoming string to the "content" topic. If there is no
        // subscriber yet, the message may wait in the buffer for up to 30 seconds.
        Receive<string>(input => mediator.Tell(
            new PublishWithAck("content", input.ToUpperInvariant(), TimeSpan.FromSeconds(30))));

        // Sent by the mediator once the message has been delivered.
        Receive<PublishSucceeded>(success => log.Info(
            "Published {0} to topic {1}.", success.Message.Message, success.Message.Topic));

        // Sent by the mediator when the message timed out or was dropped from the buffer.
        Receive<PublishFailed>(fail => log.Error(
            "Failed to publish {0} to topic {1}. Reason: {2}", fail.Message.Message, fail.Message.Topic, fail.Reason));
    }
}

@Aaronontheweb Aaronontheweb left a comment

Can you include a code sample

@Arkatufus

Done

@Aaronontheweb Aaronontheweb left a comment

LGTM

@Aaronontheweb Aaronontheweb enabled auto-merge (squash) May 20, 2025 14:21
@Aaronontheweb Aaronontheweb merged commit adf9eb4 into akkadotnet:dev May 20, 2025
11 checks passed