
Conversation

@Neverlord (Member)

Problem

The core actor dispatches incoming messages (from local publishers, peers or WebSockets) to subscribers (again: local subscribers, peers or WebSockets). For that, Broker currently sets up data flow pipelines. Each subscriber subscribes to the central merge point (where all the inputs flow together) and then filters unwanted messages.

The problem is that each subscriber performs this filtering individually, so every message triggers n filter checks, where n is the number of subscribers. And since subscriptions in Broker are prefix-based, a filter check isn't just a single hash-map lookup.

Proposed Solution

At the end of the day, Broker shoves data from one set of buffers (inputs) to another set of buffers (outputs). Those buffers then connect to hubs, stores, peers or WebSocket clients. While data flows come with nice properties and APIs, we barely use any of that. In fact, the pipeline setup in the core actor has become quite involved since we added WebSocket peers and hubs to the mix.

The proposed solution operates much closer to the buffers. We introduce a new handler concept in the core actor, which is basically just a data sink. It manages an output buffer and adds back-pressure. Each handler has a queue that fills up once the buffer has reached its capacity (which happens if we push faster than the reader consumes from the buffer). If that queue overflows, we apply the configured strategy (disconnect, drop-oldest or drop-newest). The main change is that we store all handlers in a single filter: a multimap that allows storing multiple handlers for the same topic(s). This means we always do a single, central filter lookup. The result of that lookup is a list of handlers with a matching subscription. Then we call `offer` on all those handlers. This method may do a few extra checks. For example, a data store handler discards anything that isn't a command message.

With this design, we always perform exactly one filter lookup per message. While that filter may still grow with the number of peers, a single central lookup should still be faster than doing n individual lookups.
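
For illustration, here is a minimal, self-contained sketch of that dispatch path. The `handler`/`offer` vocabulary follows the description above; the flat vector standing in for the central filter and all other details are made up for the example:

```cpp
// Sketch only: "one central prefix lookup, then offer()"; not Broker's actual types.
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct data_message {
  std::string topic;
  std::string payload;
};

struct handler {
  virtual ~handler() = default;
  // A handler may still reject items, e.g., a data store handler would
  // discard anything that is not a command message.
  virtual void offer(const data_message& msg) = 0;
};

using handler_ptr = std::shared_ptr<handler>;

// Central filter: each entry maps a subscribed topic prefix to a handler.
using filter_map = std::vector<std::pair<std::string, handler_ptr>>;

// One pass over the central filter instead of one filter check per subscriber.
void dispatch(const filter_map& filter, const data_message& msg) {
  for (const auto& [key, hdl] : filter)
    if (msg.topic.compare(0, key.size(), key) == 0) // prefix match
      hdl->offer(msg);
}
```

In the actual PR, a dedicated `subscription_multimap` plays the role of the flat vector shown here.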

Benchmarking

Setup: sending 20k messages/s. To start the receiving node:

./build/bin/broker-node -t "[$( tr -d '\n' < topics.txt ) '/benchmark/events']" -p 1234 -v -r

To start the sending node:

./build/bin/broker-throughput --verbose -r 20000 localhost:1234

After a while, we peer 20 additional nodes with the receiver, starting each via:

./build/bin/broker-node -t "[$( tr -d '\n' < topics.txt )]" --peers='["tcp://localhost:1234"]' -v

Baseline

No additional peerings

broker-node: ~36% CPU
broker-throughput: ~34% CPU

20 additional peerings

broker-node: ~53% CPU
broker-throughput: ~37% CPU

Branch

No additional peerings

broker-node: ~36% CPU
broker-throughput: ~34% CPU

20 additional peerings

broker-node: ~47% CPU
broker-throughput: ~36% CPU

Discussion

Doing a single filter lookup reduces the receiver's CPU load in the benchmark by about 6 percentage points (from ~53% to ~47%), which is good. However, there is still some linear scaling with the number of peers. The additional peers are not subscribed to the /benchmark/events topic, so they never receive any data, and the system diagnostics confirm that they sit idle well below 1% CPU. Hence, they shouldn't cause additional CPU load.

Long story short, the new structure cuts down unnecessary CPU load by centralizing the filter lookup (a reduction of roughly 6 percentage points in this benchmark). I think the new code is also simpler, which is an added bonus. However, even in the new branch we can see that adding more peers adds run-time cost even if they don't receive anything. We should look into that next, after wrapping up this PR.

@Neverlord Neverlord changed the title from "Topic/neverlord/core actor redesign" to "Centralize the filter lookup in the core actor" on Sep 28, 2025
Neverlord added a commit to zeek/zeek that referenced this pull request Oct 5, 2025
@Neverlord Neverlord marked this pull request as ready for review October 5, 2025 11:47
@ckreibich ckreibich self-assigned this Oct 6, 2025
@ckreibich (Member) left a comment:

Thanks! As usual, sorry for taking forever to review this. My usual Broker disclaimer applies — I think I get this at a high level, and almost certainly won't notice bugs in the details. I am continuing to treat CAF as a black box.

My gut feel is that this is quite a large changeset for a potential 6% improvement, but I do like the more explicit callbacks and overall layout of the code. I'm nervous about new buffering semantics. To be clear: the only real change in backpressure logic is for datastores, right?

My only real change request here is refactoring the commits that start with "fix" into earlier commits — these should ideally go away, unless I'm misunderstanding their purpose? If they need to stay, it'd be great to get more detail on what "fixing" means.

}

/// Selects all values that match the given topic, i.e., appends all values
/// from entries that have a key that is a prefix of the given topic.
Member:

This prefix logic is also the reason you're not using std::multimap, right?

Member Author:

Correct. We need to scan all entries anyway due to the prefix lookup, and a vector is simply faster in this case. We can also use the trie I've been working on as a future optimization.
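
For illustration, a hedged sketch of what a prefix-based `select` over a flat vector of entries could look like; the class layout and member names are assumptions for the example, not the actual `subscription_multimap` code:

```cpp
// Sketch only: prefix-based selection over a flat vector of entries.
#include <algorithm>
#include <string>
#include <utility>
#include <vector>

template <class Value>
class prefix_multimap_sketch {
public:
  using key_type = std::string;
  using value_type = Value;

  void insert(key_type topic, value_type val) {
    entries_.emplace_back(std::move(topic), std::move(val));
  }

  // Every entry is visited, because any stored key could be a prefix of
  // `topic`; this mirrors the rationale above for preferring a vector.
  std::vector<value_type> select(const key_type& topic) const {
    std::vector<value_type> result;
    for (const auto& [key, val] : entries_)
      if (topic.compare(0, key.size(), key) == 0)
        result.push_back(val);
    // Deduplicate: the same value may be registered under several matching
    // prefixes (cf. the sort/unique visible in the diff context nearby).
    std::sort(result.begin(), result.end());
    result.erase(std::unique(result.begin(), result.end()), result.end());
    return result;
  }

private:
  std::vector<std::pair<key_type, value_type>> entries_;
};
```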

std::sort(result.begin(), result.end());
result.erase(std::unique(result.begin(), result.end()), result.end());
}
std::vector<value_type> select(const key_type& topic) const {
Member:

Might want an extra line before this for readability.

Member Author:

Done. I've also added more documentation.

public:
using super = handler;

using buffer_producer_ptr = caf::async::spsc_buffer_producer_ptr<T>;
Member:

Out of curiosity, what does spsc stand for?

Member Author (@Neverlord, Nov 23, 2025):

Single Producer Single Consumer.

out->push(std::move(items));
demand -= n;
}
if (queue.empty() && !in) {
Member:

`!in` means that the producer is done for good, yes? I'm inferring this from the `!again` below. A few comments on the semantics of the pointer would be helpful.

using impl_t = handler_impl<data_message>;
auto client_id = endpoint_id::random();
auto state = std::make_shared<impl_t>(web_socket_buffer_size(),
auto state = std::make_shared<impl_t>(client_id, std::move(type),
Member:

Should this be in the previous commit?

}

caf::flow::backpressure_overflow_strategy
core_actor_state::store_overflow_policy() {
Member:

I'm not clear on how the fact that a store now has an overflow policy relates to what we've had so far. Perhaps this just reflects that we don't actually understand our own store implementation, but could you compare them? Did the past implementation just keep queueing?

Member Author:

The previous implementation applied back-pressure only to peers. The new model gives us a more consistent setup, so I've made this explicit now.
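
As an illustration of the three strategies (disconnect, drop-oldest, drop-newest) from the PR description, here is a minimal sketch of applying such a policy to a bounded queue; the enum and function names are illustrative, not CAF's or Broker's actual API:

```cpp
// Sketch only: applying an overflow policy to a bounded handler queue.
#include <cstddef>
#include <deque>
#include <utility>

enum class overflow_policy {
  disconnect,  // give up on the slow consumer entirely
  drop_oldest, // evict the oldest queued item to make room
  drop_newest  // discard the incoming item instead
};

// Returns false if the caller should disconnect the consumer.
template <class T>
bool enqueue(std::deque<T>& queue, std::size_t capacity, T item,
             overflow_policy policy) {
  if (queue.size() < capacity) {
    queue.push_back(std::move(item));
    return true;
  }
  switch (policy) {
    case overflow_policy::disconnect:
      return false;
    case overflow_policy::drop_oldest:
      queue.pop_front();
      queue.push_back(std::move(item));
      return true;
    default: // drop_newest
      return true;
  }
}
```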


// -- mode implementations -----------------------------------------------------

void print_rate(const std::shared_ptr<std::atomic<bool>>& stopped,
Member:

Can you add a few words, either here or in the commit message, on how this makes it more accurate, so I don't have to figure it out from the code?

Member Author:

I'll pull that out and file it separately.

ok,
/// The callback was invoked but resulted in a terminal state.
disposed,
disconnect,
Member:

Thank you because I didn't like disposed and was going to comment on that. :-)

}
}

void core_actor_state::on_cancel(const handler_ptr& ptr) {
Member:

I'm commenting here because, as usual, GitHub won't let me comment on the commit message (in aa3953b). I understand simplifying, but I don't understand fixing; could you clarify what was broken? (If it's a bugfix to the earlier code, this commit should go away and be refactored into the previous ones.)

Member Author:

Just inter-commit fixing. I'll clean that up.


namespace broker::internal {

/// Denotes the subtype of a handler.
Member:

Same for ea8506d — it'd be great if you could refactor that into the earlier commits so this one goes away.

@Neverlord Neverlord force-pushed the topic/neverlord/core-actor-redesign branch from 30a993d to 13cbacd on November 23, 2025 at 18:46
@Neverlord Neverlord requested a review from ckreibich November 23, 2025 18:46
@Neverlord (Member Author):

@ckreibich thank you for the feedback! I've split the PR into four logical commits: two preparation steps, the actual change, and one cleanup commit that only removes obsolete files. The unrelated change to broker-node has been filed separately, and I've added additional documentation to the multimap.

Currently, we have one filter per subscriber in the core actor. Then, we
ask each subscriber individually whether a certain topic matches its
filter.

The new `subscription_multimap` flips this workflow: we register all
subscribers at the topics they are interested in. Then, we make a single
pass over the subscriptions to obtain all matches via the `select`
member function. The use case we have in Broker is to have a single
filter that stores all subscribers and then do a single central lookup.
The runtime cost of `select` depends on the number of subscribed topics,
not on the number of peers. Hence, we should expect better scalability
when switching to `subscription_multimap`.

Currently, we pass the message that is entering / leaving a peer or
client buffer to the observer. However, this message isn't really useful
to an observer such as Zeek. Furthermore, this API prevents us from
batching calls to the observer. Instead of passing the message to those
callbacks, we now simply pass a `count` argument. This allows us to
"compress" multiple events and also doesn't require us to have the
messages available when calling the observer.
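
A hypothetical sketch of the interface shape this change describes; the class and member names below are placeholders rather than Broker's actual observer API:

```cpp
// Sketch only: count-based observer callbacks instead of per-message ones.
#include <cstddef>

class buffer_observer_sketch {
public:
  virtual ~buffer_observer_sketch() = default;

  // Before (conceptually): one call per message, which required having the
  // message at hand and prevented batching:
  //   virtual void on_push(const data_message& msg) = 0;

  // After: a single call can report a whole batch of items via `count`.
  virtual void on_push(std::size_t count) = 0;
  virtual void on_pull(std::size_t count) = 0;
};
```
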
A recent benchmark revealed that Broker's CPU usage goes up
significantly with the number of peering relationships, even if the new
peers never receive any data.

The root cause for this behavior is the flow-based processing in Broker,
where each new peer results in a new observer that subscribes to the
central flow of messages. For each incoming message, its topic is then
checked against the peer's filter. Hence, each message causes N filter
lookups where N is the number of peers.

Moreover, Broker does not really utilize the feature set of CAF flows.
Ultimately, the core actor simply dispatches data by reading from
buffers filled by local producers or peers and then forwards those
messages by writing to another set of buffers that are connected to
subscribers.

The new `handler` abstraction in the core actor expresses this workflow
more aptly. A handler is simply a combination of a sink and a source. A
handler can be registered at a `subscription_multimap` for each topic it
is interested in. A handler may still ignore matched messages, for
example, a handler for a data store would ignore any message on the
topic that is not of type `command_message`.

After switching from the flow-based processing in the core actor to a
new buffer-oriented approach, we no longer need this set of utilities.

@Neverlord Neverlord force-pushed the topic/neverlord/core-actor-redesign branch from 13cbacd to 1c7032d on November 23, 2025 at 18:57
ckreibich pushed a commit to zeek/zeek that referenced this pull request Nov 29, 2025
ckreibich pushed a commit to zeek/zeek that referenced this pull request Dec 1, 2025

By calling `abort` instead of just `dispose` on the output buffer, the
WebSocket client should emit an error frame before closing the
connection to let the client know that an error occurred.

Add three new unit tests for the core actor to verify that the overflow
policy for peers is enforced correctly.

Neverlord added a commit to zeek/zeek that referenced this pull request Dec 7, 2025