-
TL;DR: want to use a function that returns the result as a callback, and then return that value to the next operator. I am pretty sure this won't work because the asio token is lazy and the rpp operator is eager. Or is it? I have a callback interface from a class that is invoked every time a message arrives on a socket. I used the code below to hook the event emitter to the observable. auto emitter = ...
auto obs = rpp::source::create<std::vector<uint8_t>>([emitter](const auto& sub) {
emitter->watch([](const std::vector<uint8_t> msg) {
sub.on_next(msg);
});
}); This is working just fine for the case of: auto obs = (...)
.map([](auto msg) { return std::make_shared<Decoder>(msg); })
.subscribe(
[](std::shared_ptr<Decoder> res) { std::cout << res->value(); }, // Print out the parsed value
[](std::exception_ptr err) {
try {
std::rethrow_exception(err);
} catch (std::exception &exp) {
std::cerr << exp.what();
}
exit();
}); What I would like to do next is dispatch this to an async call that returns an asio completion token type. I would like to extract that result and then pass it on to the next stage of the pipeline. Something like: auto asio_request = ...
auto obs = (...)
.map([](auto msg) { return std::make_shared<Decoder>(std::move(msg)); })
.map([asio_request](auto msg) {
return asio_request->request(msg->value(), asio::deferred); } // return an asio completion token.
})
...???... // on the resolution of the completion token emit the value and/or error
.subscribe(
[](auto res) { std::cout << res; }, // Print out the message returned from the completion token
[](std::exception_ptr err) {
try {
std::rethrow_exception(err);
} catch (std::exception &exp) {
std::cerr << exp.what();
}
exit();
}); |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 5 replies
-
Sorry, i don't have a lot of experience with asio especially new things related to tokens and etc. Could you provide more detailed or abstract example? auto obs = ...;
obs.observer_on(rpp::scheduler::new_thread{}).
.map([](auto msg) { .... std::this_thread::sleep_for(std::chrono::seconds{10}); return msg.get_value(); })
.subscribe.... |
Beta Was this translation helpful? Give feedback.
-
TL;DR: map+concat is the answer =) |
Beta Was this translation helpful? Give feedback.
TL;DR: map+concat is the answer =)