Centralize the filter lookup in the core actor #487
Conversation
See Broker-side PR zeek/broker#487
ckreibich
left a comment
Thanks! As usual, sorry for taking forever to review this. My usual Broker disclaimer applies — I think I get this at a high level, and almost certainly won't notice bugs in the details. I am continuing to treat CAF as a black box.
My gut feel is that this is quite a large changeset for a potential 6% improvement, but I do like the more explicit callbacks and overall layout of the code. I'm nervous about new buffering semantics. To be clear: the only real change in backpressure logic is for datastores, right?
My only real change request here is refactoring the commits that start with "fix" into earlier commits — these should ideally go away, unless I'm misunderstanding their purpose? If they need to stay, it'd be great to get more detail on what "fixing" means.
}

/// Selects all values that match the given topic, i.e., appends all values
/// from entries that have a key that is a prefix of the given topic.
This prefix logic is also the reason you're not using std::multimap, right?
Correct. We need to scan all entries anyway due to the prefix lookup. A vector is simply faster in this case. We could also use the trie I've been working on as a future optimization.
std::sort(result.begin(), result.end());
result.erase(std::unique(result.begin(), result.end()), result.end());
}
std::vector<value_type> select(const key_type& topic) const {
Might want an extra line before this for readability.
Done. I've also added more documentation.
public:
using super = handler;

using buffer_producer_ptr = caf::async::spsc_buffer_producer_ptr<T>;
Out of curiosity, what does spsc stand for?
Single Producer Single Consumer.
out->push(std::move(items));
demand -= n;
}
if (queue.empty() && !in) {
!in means that the producer is done for good, yes? I'm inferring this from the !again below. A few comments on the semantics of the pointer would be helpful.
using impl_t = handler_impl<data_message>;
auto client_id = endpoint_id::random();
auto state = std::make_shared<impl_t>(web_socket_buffer_size(),
auto state = std::make_shared<impl_t>(client_id, std::move(type),
Should this be in the previous commit?
}

caf::flow::backpressure_overflow_strategy
core_actor_state::store_overflow_policy() {
I'm not clear on how the fact that a store now has an overflow policy relates to what we've had so far. This may just reflect that I don't fully understand our own store implementation, but could you compare them? Did the past implementation just keep queueing?
The previous implementation applied back-pressure only to peers. In the new model, we have a more consistent setup, so I've made this explicit now.
broker-node/broker-node.cc
Outdated
// -- mode implementations -----------------------------------------------------

void print_rate(const std::shared_ptr<std::atomic<bool>>& stopped,
Can you add a few words, either here or in the commit message, about how this makes it more accurate, so I don't have to figure it out from the code?
I'll pull that out and file it separately.
ok,
/// The callback was invoked but resulted in a terminal state.
disposed,
disconnect,
Thank you! I didn't like `disposed` and was going to comment on that. :-)
}
}

void core_actor_state::on_cancel(const handler_ptr& ptr) {
I'm commenting here because, as usual, GitHub won't let me comment on the commit message (in aa3953b). I understand simplifying, but I don't understand fixing. Could you clarify what was broken? (If it's a bugfix to the earlier code, this commit should go away and be refactored into the previous ones.)
Just inter-commit fixing. I'll clean that up.
namespace broker::internal {

/// Denotes the subtype of a handler.
Same for ea8506d — it'd be great if you could refactor that into the earlier commits so this one goes away.
Force-pushed from 30a993d to 13cbacd
@ckreibich thank you for the feedback! I've split the PR into four logical commits: two preparation steps, the actual change, and one cleanup commit that only removes obsolete files. The unrelated change to broker-node has been filed separately, and I've added additional documentation to the multimap.
Currently, we have one filter per subscriber in the core actor. Then, we ask each subscriber individually whether a certain topic matches its filter. The new `subscription_multimap` flips this workflow: we register all subscribers at the topics they are interested in and then make a single pass over the subscriptions to obtain all matches via the `select` member function. The use case we have in Broker is a single filter that stores all subscribers, followed by one central lookup. The runtime cost of `select` depends on the number of subscribed topics, not on the number of peers. Hence, we should expect better scalability when switching to `subscription_multimap`.
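As an illustration of the idea (not the actual Broker implementation; all names here are hypothetical), a prefix-matching multimap over a flat vector with a `select` member function might look like this:

```cpp
#include <algorithm>
#include <string>
#include <utility>
#include <vector>

// Sketch of a subscription multimap: a flat list of (prefix, subscriber)
// pairs. Broker stores handler pointers; we use ints for brevity.
class subscription_multimap {
public:
  using key_type = std::string; // topic prefix
  using value_type = int;       // subscriber id

  void insert(key_type prefix, value_type subscriber) {
    entries_.emplace_back(std::move(prefix), subscriber);
  }

  // Selects all values whose key is a prefix of `topic`. One linear pass
  // suffices; the cost depends on the number of subscribed topics, not on
  // the number of peers asking individually.
  std::vector<value_type> select(const key_type& topic) const {
    std::vector<value_type> result;
    for (const auto& [prefix, subscriber] : entries_)
      if (topic.compare(0, prefix.size(), prefix) == 0)
        result.push_back(subscriber);
    // Deduplicate: a subscriber may match via several prefixes.
    std::sort(result.begin(), result.end());
    result.erase(std::unique(result.begin(), result.end()), result.end());
    return result;
  }

private:
  std::vector<std::pair<key_type, value_type>> entries_;
};
```

Because the prefix lookup has to scan all entries anyway, a sorted-and-deduplicated vector pass is cheaper than maintaining a `std::multimap` here.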
Currently, we pass the message that is entering / leaving a peer or client buffer to the observer. However, this message isn't really useful to an observer such as Zeek. Furthermore, this API prevents us from batching calls to the observer. Instead of passing the message to those callbacks, we now simply pass a `count` argument. This allows us to "compress" multiple events and also doesn't require us to have the messages available when calling the observer.
A recent benchmark revealed that Broker's CPU usage goes up significantly with the number of peering relationships, even if the new peers never receive any data. The root cause for this behavior is the flow-based processing in Broker, where each new peer results in a new observer that subscribes to the central flow of messages. For each incoming message, its topic is then checked against the peer's filter. Hence, each message causes N filter lookups, where N is the number of peers. Moreover, Broker does not really utilize the feature set of CAF flows. Ultimately, the core actor simply dispatches data by reading from buffers filled by local producers or peers and then forwards those messages by writing to another set of buffers that are connected to subscribers. The new `handler` abstraction in the core actor expresses this workflow more aptly. A handler is simply a combination of a sink and a source. A handler can be registered at a `subscription_multimap` for each topic it is interested in. A handler may still ignore matched messages; for example, a handler for a data store would ignore any message on the topic that is not of type `command_message`.
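The "a handler may still ignore matched messages" part can be sketched as follows. This is an illustrative mock (simplified placeholder types, not Broker's actual classes): a store handler's `offer` accepts only command messages and discards everything else.

```cpp
#include <string>
#include <variant>
#include <vector>

// Simplified stand-ins for Broker's message types.
struct data_message { std::string topic; std::string payload; };
struct command_message { std::string topic; std::string cmd; };
using message = std::variant<data_message, command_message>;

class handler {
public:
  virtual ~handler() = default;
  // Returns true if the handler accepted the message.
  virtual bool offer(const message& msg) = 0;
};

// A data store handler discards anything that isn't a command message,
// even though its topic filter matched.
class store_handler : public handler {
public:
  bool offer(const message& msg) override {
    if (!std::holds_alternative<command_message>(msg))
      return false; // matched topic, but wrong message type: ignore
    inbox.push_back(std::get<command_message>(msg));
    return true;
  }
  std::vector<command_message> inbox;
};
```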
After switching from the flow-based processing in the core actor to a new buffer-oriented approach, we no longer need this set of utilities.
Force-pushed from 13cbacd to 1c7032d
See Broker-side PR zeek/broker#487
By calling `abort` instead of just `dispose` on the output buffer, the WebSocket client should emit an error frame before closing the connection, letting clients know that an error occurred.
Add three new unit tests for the core actor to verify that the overflow policy for peers is enforced correctly.
See Broker-side PR zeek/broker#487
Problem
The core actor dispatches incoming messages (from local publishers, peers or WebSockets) to subscribers (again: local subscribers, peers or WebSockets). For that, Broker currently sets up data flow pipelines. Each subscriber subscribes to the central merge point (where all the inputs flow together) and then filters unwanted messages.
The problem is that this filtering is done by each subscriber individually. This means we will perform n filter checks, where n is the number of subscribers. Since subscriptions in Broker are prefix-based, this means a filter check isn’t just a single hash map lookup.
Proposed Solution
At the end of the day, Broker shoves data from one set of buffers (inputs) to another set of buffers (outputs). Those buffers then connect to hubs, stores, peers or WebSocket clients. While data flows come with nice properties and APIs, we barely use any of that. In fact, the pipeline setup in the core actor has become quite involved since we added WebSocket peers and hubs to the mix.
The proposed solution operates much closer to the buffers. We introduce a new `handler` concept in the core actor, which is basically just a data sink. It manages an output buffer and also adds back-pressure. Each handler has a queue that fills up if the buffer has reached its capacity (which happens if we push faster than the reader consumes from the buffer). If that queue overflows, we apply the configured strategy (disconnect, drop-oldest or drop-newest).

The main change is that we store all handlers in a single filter: a multimap that allows storing multiple handlers for the same topic(s). This means we always do a single, central filter lookup. The result of that lookup is a list of handlers that have a matching subscription. Then we call `offer` on all those handlers. This method may do a few extra checks. For example, a data store handler discards anything that isn't a `command_message`.

With this design, we only ever have exactly one filter lookup. While that filter may still grow with the number of peers, a central filter lookup should still be faster than doing n lookups.
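The dispatch path described above can be sketched in a few lines. This is a toy model (the filter entries, `dispatch`, and the callback signature are all assumptions for illustration): one central prefix lookup per message, then `offer` on each matching handler.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// A handler's offer() as a callable; returns true if the message was taken.
using handler_fn = std::function<bool(const std::string& /*topic*/)>;

struct core {
  // (prefix, handler) pairs; Broker stores handler pointers instead.
  std::vector<std::pair<std::string, handler_fn>> filter;

  // One central lookup per message instead of n per-subscriber checks.
  std::size_t dispatch(const std::string& topic) {
    std::size_t delivered = 0;
    for (auto& [prefix, offer] : filter)
      if (topic.compare(0, prefix.size(), prefix) == 0 && offer(topic))
        ++delivered;
    return delivered;
  }
};
```

The key point of the design is that the per-message cost scales with the number of subscribed topic prefixes in the central filter, rather than with the number of subscribers each doing its own check.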
Benchmarking
Setup: sending 20k messages/s. To start the receiving node:
To start the sending node:
After a while, we peer 20 additional nodes with the receiver, starting each via:
Baseline
No additional peerings
broker-node: ~36% CPU
broker-throughput: ~34% CPU
20 additional peerings
broker-node: ~53%
broker-throughput: ~37%
Branch
No additional peerings
broker-node: ~36% CPU
broker-throughput: ~34% CPU
20 additional peerings
broker-node: ~47%
broker-throughput: ~36%
Discussion
Doing a single filter lookup reduces CPU load in the benchmark by about 6% (from 53% to 47%), which is good. However, there is still some linear scaling with the number of peers. The additional peers are not subscribed to the `/benchmark/events` topic, so they don't receive any data. I can verify from the system diagnostics that the extra peers sit idle well below 1% CPU and don't receive any data. Hence, they shouldn't cause additional CPU load.

Long story short, the new structure cuts down unnecessary CPU load by centralizing the filter lookup (resulting in a ~6% reduction of CPU load for the benchmark). I think the new code is also simpler, which is an added bonus. However, even in the new branch we can see that adding more peers adds run-time cost even if they don't receive anything. We should look into that next, after wrapping up this PR.