C++23 coroutine network framework
Explore the docs »
View Demo
·
Report Bug
·
Request Feature
Table of Contents
Based on the libuv event loop, use C++20 stackless coroutines to implement network components, and provide channel to send and receive data between tasks.
asyncio might be better than existing coroutine network libraries in the following ways:
- A unified error handling method based on
std::expected<T, std::error_code>, but also supports exception handling. - A simple and direct cancellation method similar to
Python'sasyncio-task.cancel(). - Lessons learned from
JavaScript'sPromise.all,Promise.any,Promise.race, etc., subtask management methods. - Lessons learned from
Golang'sWaitGroupdynamic task management groups. - Built-in call stack tracing allows for better debugging and analysis.
Required compiler:
- GCC >= 15
- LLVM >= 18
- MSVC >= 19.38
Export environment variables:
- VCPKG_ROOT
- ANDROID_NDK_HOME(Android)
cmake --workflow --preset debugInstall asyncio from the vcpkg private registry:
-
Create a
vcpkg-configuration.jsonfile in the project root directory:{ "registries": [ { "kind": "git", "repository": "https://github.com/Hackerl/vcpkg-registry", "baseline": "aa3865a8ad99b5265c824b0b550fc71bea9a90b1", "packages": [ "asyncio", "zero" ] } ] }The
baselinedefines the minimum version ofasynciothat will be installed. The one used above might be outdated, so please update it as necessary. -
Create a
vcpkg.jsonfile in the project root directory:{ "name": "project name", "version-string": "1.0.0", "builtin-baseline": "6b3172d1a7be062b3d0278369aac9a0258cefc65", "dependencies": [ "asyncio" ] } -
Add the following to the
CMakeLists.txtfile:find_package(asyncio CONFIG REQUIRED) target_link_libraries(custom_target PRIVATE asyncio::asyncio-main)
I'm using a typical TCP echo server to demonstrate the features of asyncio as much as possible.
#include <asyncio/net/stream.h> // streaming network components
#include <asyncio/thread.h> // thread and thread pool components
#include <asyncio/signal.h> // signal component
#include <asyncio/time.h> // time component
#include <zero/cmdline.h> // command line parsing component
#include <zero/os/resource.h> // operating system fd/handle wrapper
#ifdef _WIN32
#include <zero/os/windows/error.h> // Windows API call wrapper
#endif
namespace {
// Receive event or signal, print the task's call stack.
// For the top-level task, complex subtasks will branch out and form a tree.
asyncio::task::Task<void, std::error_code> tracing(const auto &task) {
#ifdef _WIN32
const auto handle = CreateEventA(nullptr, false, false, "Global\\AsyncIOBacktraceEvent");
if (!handle)
co_return std::unexpected{
std::error_code{static_cast<int>(GetLastError()), std::system_category()}
};
const zero::os::Resource event{handle};
while (true) {
bool cancelled{false};
// `WaitForSingleObject` cannot be integrated into EventLoop, so we use a separate thread to call it,
// and a custom cancellation function allows it to be seamlessly integrated into coroutine management.
CO_EXPECT(co_await asyncio::toThread(
[&, &cancelled = std::as_const(cancelled)]() -> std::expected<void, std::error_code> {
if (WaitForSingleObject(*event, INFINITE) != WAIT_OBJECT_0)
return std::unexpected{
std::error_code{static_cast<int>(GetLastError()), std::system_category()}
};
if (cancelled)
return std::unexpected{asyncio::task::Error::CANCELLED};
return {};
},
[&](std::thread::native_handle_type) -> std::expected<void, std::error_code> {
cancelled = true;
return zero::os::windows::expected([&] {
return SetEvent(*event);
});
}
));
// print the task's formatted call stack
fmt::print(stderr, "{}\n", task.trace());
}
#else
// On UNIX, the Signal component can be used directly.
auto signal = asyncio::Signal::make();
CO_EXPECT(signal);
while (true) {
CO_EXPECT(co_await signal->on(SIGUSR1));
fmt::print(stderr, "{}\n", task.trace());
}
#endif
}
asyncio::task::Task<void, std::error_code> doSomething() {
using namespace std::chrono_literals;
while (true) {
CO_EXPECT(co_await asyncio::sleep(1s));
fmt::print("do some thing\n");
}
}
asyncio::task::Task<void, std::error_code> handle(asyncio::net::TCPStream stream) {
const auto address = stream.remoteAddress();
CO_EXPECT(address);
fmt::print("connection[{}]\n", *address);
while (true) {
std::string message;
message.resize(1024);
const auto n = co_await stream.read(std::as_writable_bytes(std::span{message}));
CO_EXPECT(n);
if (*n == 0)
break;
message.resize(*n);
fmt::print("receive message: {}\n", message);
CO_EXPECT(co_await stream.writeAll(std::as_bytes(std::span{message})));
}
co_return {};
}
asyncio::task::Task<void, std::error_code> serve(asyncio::net::TCPListener listener) {
std::expected<void, std::error_code> result;
// By adding each dynamic task to a `TaskGroup`,
// we can cancel them all at once and wait for them during graceful shutdown,
// ensuring that no resources or subtasks are leaked.
asyncio::task::TaskGroup group;
while (true) {
auto stream = co_await listener.accept();
if (!stream) {
result = std::unexpected{stream.error()};
break;
}
auto task = handle(*std::move(stream));
group.add(task);
// Since the `TaskGroup` doesn't care about the results of the subtasks, we can use future to bind callbacks.
// Callback binding is very flexible, just like JavaScript's Promise.
task.future().fail([](const auto &ec) {
fmt::print(stderr, "unhandled error: {:s} ({})\n", ec, ec);
});
}
// This function waits for all tasks in the `TaskGroup`.
// When the parent task is canceled, all tasks in the group will be automatically canceled here and will wait for them to complete.
co_await group;
co_return result;
}
}
asyncio::task::Task<void, std::error_code> asyncMain(const int argc, char *argv[]) {
zero::Cmdline cmdline;
cmdline.add<std::string>("host", "remote host");
cmdline.add<std::uint16_t>("port", "remote port");
cmdline.parse(argc, argv);
const auto host = cmdline.get<std::string>("host");
const auto port = cmdline.get<std::uint16_t>("port");
auto listener = asyncio::net::TCPListener::listen(host, port);
CO_EXPECT(listener);
auto signal = asyncio::Signal::make();
CO_EXPECT(signal);
// This is the main task of our program.
auto task = race(
// A TCP server was started, along with a `doSomething` task to do something else.
// They are aggregated by `all`, just like `Promise.all` in JavaScript, where failure is returned if either task fails, and the remaining tasks are canceled.
all(
serve(*std::move(listener)),
doSomething()
),
// We wait for the `SIGINT` signal to gracefully shut down.
// `race` will use the result of the task that completes fastest, so when the signal arrives,
// the task is complete, `race` returns success and cancels the remaining subtasks.
signal->on(SIGINT).transform([](const int) {
})
);
// Debugging coroutines is always difficult, so we use the built-in traceback functionality of `asyncio` to assist us.
co_return co_await race(
task,
tracing(task)
);
}Start the server with
./server 127.0.0.1 8000, and gracefully exit by pressingctrl + cin the terminal. You can also send signal or event to make it perform traceback.
You may have noticed the prominent CO_EXPECT, but what exactly is it?
#define CO_EXPECT(...) \
if (auto &&_result = __VA_ARGS__; !_result) \
co_return std::unexpected{std::move(_result).error()}Because C++ lacks Rust's question mark syntactic sugar, we cannot conveniently propagate std::expected errors upwards without using macros.
Of course, if you absolutely hate macros, you can also explicitly handle each error like in Golang:
auto listener = asyncio::net::TCPListener::listen(host, port);
if (!listener)
co_return std::unexpected{listener.error()};
auto signal = asyncio::Signal::make();
if (!signal)
co_return std::unexpected{signal.error()};If you feel that exceptions can better handle error propagation, and you don't need this roundabout approach, that's fine; asyncio can certainly support it as well.
#include <asyncio/net/stream.h>
#include <asyncio/thread.h>
#include <asyncio/signal.h>
#include <asyncio/time.h>
#include <zero/cmdline.h>
#include <zero/formatter.h>
#include <zero/os/resource.h>
#ifdef _WIN32
#include <zero/os/windows/error.h>
#endif
template<typename T>
T guard(std::expected<T, std::error_code> &&expected) {
if (!expected)
throw std::system_error{expected.error()};
if constexpr (std::is_void_v<T>)
return;
else
return *std::move(expected);
}
namespace {
asyncio::task::Task<void> tracing(const auto &task) {
#ifdef _WIN32
const auto handle = CreateEventA(nullptr, false, false, "Global\\AsyncIOBacktraceEvent");
if (!handle)
throw std::system_error{
std::error_code{static_cast<int>(GetLastError()), std::system_category()}
};
const zero::os::Resource event{handle};
while (true) {
bool cancelled{false};
guard(co_await asyncio::toThread(
[&, &cancelled = std::as_const(cancelled)]() -> std::expected<void, std::error_code> {
if (WaitForSingleObject(*event, INFINITE) != WAIT_OBJECT_0)
return std::unexpected{
std::error_code{static_cast<int>(GetLastError()), std::system_category()}
};
if (cancelled)
return std::unexpected{asyncio::task::Error::CANCELLED};
return {};
},
[&](std::thread::native_handle_type) -> std::expected<void, std::error_code> {
cancelled = true;
return zero::os::windows::expected([&] {
return SetEvent(*event);
});
}
));
fmt::print(stderr, "{}\n", task.trace());
}
#else
auto signal = guard(asyncio::Signal::make());
while (true) {
guard(co_await signal.on(SIGUSR1));
fmt::print(stderr, "{}\n", task.trace());
}
#endif
}
asyncio::task::Task<void> doSomething() {
using namespace std::chrono_literals;
while (true) {
guard(co_await asyncio::sleep(1s));
fmt::print("do some thing\n");
}
}
asyncio::task::Task<void> handle(asyncio::net::TCPStream stream) {
const auto address = guard(stream.remoteAddress());
fmt::print("connection[{}]\n", address);
while (true) {
std::string message;
message.resize(1024);
const auto n = guard(co_await stream.read(std::as_writable_bytes(std::span{message})));
if (n == 0)
break;
message.resize(n);
fmt::print("receive message: {}\n", message);
guard(co_await stream.writeAll(std::as_bytes(std::span{message})));
}
co_return;
}
asyncio::task::Task<void> serve(asyncio::net::TCPListener listener) {
std::expected<void, std::error_code> result;
asyncio::task::TaskGroup group;
while (true) {
auto stream = co_await listener.accept();
if (!stream) {
result = std::unexpected{stream.error()};
break;
}
auto task = handle(*std::move(stream));
group.add(task);
task.future().fail([](const auto &e) {
fmt::print(stderr, "unhandled exception: {}\n", e);
});
}
co_await group;
guard(std::move(result));
}
}
asyncio::task::Task<void> asyncMain(const int argc, char *argv[]) {
zero::Cmdline cmdline;
cmdline.add<std::string>("host", "remote host");
cmdline.add<std::uint16_t>("port", "remote port");
cmdline.parse(argc, argv);
const auto host = cmdline.get<std::string>("host");
const auto port = cmdline.get<std::uint16_t>("port");
auto listener = guard(asyncio::net::TCPListener::listen(host, port));
auto signal = guard(asyncio::Signal::make());
auto task = race(
all(
serve(std::move(listener)),
doSomething()
),
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
guard(co_await signal.on(SIGINT));
})
);
co_return co_await race(
task,
tracing(task)
);
}
int main(const int argc, char *argv[]) {
const auto result = asyncio::run([=] {
return asyncMain(argc, argv);
});
if (!result)
throw std::system_error{result.error()};
if (!*result)
std::rethrow_exception(result->error());
return EXIT_SUCCESS;
}It seems to look better. I'm not against exceptions, but the reason the
asyncioAPI usesstd::error_codeis that it's easy to convert fromstd::error_codeto an exception, but not vice versa.
For more examples, please refer to the Documentation
- HTTP Server
See the open issues for a full list of proposed features (and known issues).
Contributions are what make the open source community such an amazing place to learn, inspire, and create. Any contributions you make are greatly appreciated.
If you have a suggestion that would make this better, please fork the repo and create a pull request. You can also simply open an issue with the tag "enhancement". Don't forget to give the project a star! Thanks again!
- Fork the Project
- Create your Feature Branch (
git checkout -b feature/AmazingFeature) - Commit your Changes (
git commit -m 'Add some AmazingFeature') - Push to the Branch (
git push origin feature/AmazingFeature) - Open a Pull Request
Distributed under the Apache 2.0 License. See LICENSE for more information.
Hackerl - @Hackerl - patteliu@gmail.com
Project Link: https://github.com/Hackerl/asyncio