asyncio

C++23 coroutine network framework
Explore the docs »

View Demo · Report Bug · Request Feature

Table of Contents
  1. About The Project
  2. Getting Started
  3. Usage
  4. Roadmap
  5. Contributing
  6. License
  7. Contact
  8. Acknowledgments

About The Project

Built on the libuv event loop, asyncio uses C++20 stackless coroutines to implement network components and provides channels for sending and receiving data between tasks.

asyncio may compare favorably with existing coroutine network libraries in the following ways:

  • Unified error handling based on std::expected<T, std::error_code>, with exception handling also supported.
  • Simple, direct cancellation similar to Python's asyncio: task.cancel().
  • Subtask management modeled on JavaScript's Promise.all, Promise.any, Promise.race, and so on.
  • Dynamic task groups modeled on Golang's WaitGroup.
  • Built-in call stack tracing for easier debugging and analysis.

(back to top)

Built With

  • CMake
  • vcpkg
  • C++23

(back to top)

Getting Started

Prerequisites

Required compiler:

  • GCC >= 15
  • LLVM >= 18
  • MSVC >= 19.38

Export environment variables:

  • VCPKG_ROOT
  • ANDROID_NDK_HOME(Android)

Build

cmake --workflow --preset debug

(back to top)

Installation

Install asyncio from the vcpkg private registry:

  1. Create a vcpkg-configuration.json file in the project root directory:

    {
      "registries": [
        {
          "kind": "git",
          "repository": "https://github.com/Hackerl/vcpkg-registry",
          "baseline": "aa3865a8ad99b5265c824b0b550fc71bea9a90b1",
          "packages": [
            "asyncio",
            "zero"
          ]
        }
      ]
    }

    The baseline defines the minimum version of asyncio that will be installed. The one used above might be outdated, so please update it as necessary.

  2. Create a vcpkg.json file in the project root directory:

    {
      "name": "project name",
      "version-string": "1.0.0",
      "builtin-baseline": "6b3172d1a7be062b3d0278369aac9a0258cefc65",
      "dependencies": [
        "asyncio"
      ]
    }

  3. Add the following to the CMakeLists.txt file:

    find_package(asyncio CONFIG REQUIRED)
    target_link_libraries(custom_target PRIVATE asyncio::asyncio-main)

Usage

I use a typical TCP echo server to demonstrate as many of asyncio's features as possible.

#include <asyncio/net/stream.h> // streaming network components
#include <asyncio/thread.h> // thread and thread pool components
#include <asyncio/signal.h> // signal component
#include <asyncio/time.h> // time component
#include <zero/cmdline.h> // command line parsing component
#include <zero/os/resource.h> // operating system fd/handle wrapper

#ifdef _WIN32
#include <zero/os/windows/error.h> // Windows API call wrapper
#endif

namespace {
    // Print the task's call stack whenever an event (Windows) or a signal (UNIX) is received.
    // For a top-level task with complex subtasks, the call stack branches out into a tree.
    asyncio::task::Task<void, std::error_code> tracing(const auto &task) {
#ifdef _WIN32
        const auto handle = CreateEventA(nullptr, false, false, "Global\\AsyncIOBacktraceEvent");

        if (!handle)
            co_return std::unexpected{
                std::error_code{static_cast<int>(GetLastError()), std::system_category()}
            };

        const zero::os::Resource event{handle};

        while (true) {
            bool cancelled{false};

            // `WaitForSingleObject` cannot be integrated into EventLoop, so we use a separate thread to call it,
            // and a custom cancellation function allows it to be seamlessly integrated into coroutine management.
            CO_EXPECT(co_await asyncio::toThread(
                [&, &cancelled = std::as_const(cancelled)]() -> std::expected<void, std::error_code> {
                    if (WaitForSingleObject(*event, INFINITE) != WAIT_OBJECT_0)
                        return std::unexpected{
                            std::error_code{static_cast<int>(GetLastError()), std::system_category()}
                        };

                    if (cancelled)
                        return std::unexpected{asyncio::task::Error::CANCELLED};

                    return {};
                },
                [&](std::thread::native_handle_type) -> std::expected<void, std::error_code> {
                    cancelled = true;
                    return zero::os::windows::expected([&] {
                        return SetEvent(*event);
                    });
                }
            ));

            // print the task's formatted call stack
            fmt::print(stderr, "{}\n", task.trace());
        }
#else
        // On UNIX, the Signal component can be used directly.
        auto signal = asyncio::Signal::make();
        CO_EXPECT(signal);

        while (true) {
            CO_EXPECT(co_await signal->on(SIGUSR1));
            fmt::print(stderr, "{}\n", task.trace());
        }
#endif
    }

    asyncio::task::Task<void, std::error_code> doSomething() {
        using namespace std::chrono_literals;

        while (true) {
            CO_EXPECT(co_await asyncio::sleep(1s));
            fmt::print("do some thing\n");
        }
    }

    asyncio::task::Task<void, std::error_code> handle(asyncio::net::TCPStream stream) {
        const auto address = stream.remoteAddress();
        CO_EXPECT(address);

        fmt::print("connection[{}]\n", *address);

        while (true) {
            std::string message;
            message.resize(1024);

            const auto n = co_await stream.read(std::as_writable_bytes(std::span{message}));
            CO_EXPECT(n);

            if (*n == 0)
                break;

            message.resize(*n);

            fmt::print("receive message: {}\n", message);
            CO_EXPECT(co_await stream.writeAll(std::as_bytes(std::span{message})));
        }

        co_return {};
    }

    asyncio::task::Task<void, std::error_code> serve(asyncio::net::TCPListener listener) {
        std::expected<void, std::error_code> result;

        // By adding each dynamic task to a `TaskGroup`,
        // we can cancel them all at once and wait for them during graceful shutdown,
        // ensuring that no resources or subtasks are leaked.
        asyncio::task::TaskGroup group;

        while (true) {
            auto stream = co_await listener.accept();

            if (!stream) {
                result = std::unexpected{stream.error()};
                break;
            }

            auto task = handle(*std::move(stream));

            group.add(task);

            // The `TaskGroup` ignores the results of its subtasks, so we bind a callback through the task's future to report errors.
            // Callback binding is very flexible, just like JavaScript's Promise.
            task.future().fail([](const auto &ec) {
                fmt::print(stderr, "unhandled error: {:s} ({})\n", ec, ec);
            });
        }

        // Wait for all tasks in the `TaskGroup`.
        // If the parent task is canceled, every task in the group is canceled automatically here, and we still wait for them to complete.
        co_await group;
        co_return result;
    }
}

asyncio::task::Task<void, std::error_code> asyncMain(const int argc, char *argv[]) {
    zero::Cmdline cmdline;

    cmdline.add<std::string>("host", "remote host");
    cmdline.add<std::uint16_t>("port", "remote port");

    cmdline.parse(argc, argv);

    const auto host = cmdline.get<std::string>("host");
    const auto port = cmdline.get<std::uint16_t>("port");

    auto listener = asyncio::net::TCPListener::listen(host, port);
    CO_EXPECT(listener);

    auto signal = asyncio::Signal::make();
    CO_EXPECT(signal);

    // This is the main task of our program.
    auto task = race(
        // Start a TCP server along with a `doSomething` task that does other work.
        // `all` aggregates them like `Promise.all` in JavaScript: if either task fails, the failure is returned and the remaining tasks are canceled.
        all(
            serve(*std::move(listener)),
            doSomething()
        ),
        // Wait for the `SIGINT` signal to shut down gracefully.
        // `race` takes the result of whichever task completes first, so when the signal arrives
        // and that task completes, `race` returns success and cancels the remaining subtasks.
        signal->on(SIGINT).transform([](const int) {
        })
    );

    // Debugging coroutines is always difficult, so we use the built-in traceback functionality of `asyncio` to assist us.
    co_return co_await race(
        task,
        tracing(task)
    );
}

Start the server with ./server 127.0.0.1 8000 and exit gracefully by pressing Ctrl+C in the terminal. To print a traceback while it runs, send SIGUSR1 to the process on UNIX (for example, kill -USR1 <pid>) or set the Global\AsyncIOBacktraceEvent event on Windows.
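
The Windows branch of tracing above hands a blocking wait to a separate thread and cancels it by setting a flag and then waking the waiter. Here is a minimal, portable sketch of that idea using only the standard library. It does not use asyncio at all: Event, blockingWait, and run are hypothetical names made up for this illustration, and std::condition_variable stands in for the Windows event object.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <system_error>
#include <thread>

// Hypothetical stand-in for a Windows auto-reset event.
class Event {
public:
    void set() {
        {
            std::lock_guard<std::mutex> lock{mutex_};
            signaled_ = true;
        }
        cv_.notify_one();
    }

    void wait() {
        std::unique_lock<std::mutex> lock{mutex_};
        cv_.wait(lock, [this] { return signaled_; });
        signaled_ = false; // auto-reset, like the event created in `tracing`
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    bool signaled_{false};
};

// Runs on the worker thread: block until the event is set,
// then check whether the wake-up was really a cancellation.
std::error_code blockingWait(Event &event, const std::atomic<bool> &cancelled) {
    event.wait();

    if (cancelled)
        return std::make_error_code(std::errc::operation_canceled);

    return {};
}

// Drives one wait. With `cancel == true` it plays the role of the custom
// cancellation function; otherwise `event.set()` represents the real event.
std::error_code run(const bool cancel) {
    Event event;
    std::atomic<bool> cancelled{false};
    std::error_code result;

    std::thread worker{[&] {
        result = blockingWait(event, cancelled);
    }};

    if (cancel)
        cancelled = true; // set the flag first ...

    event.set(); // ... then wake the waiter
    worker.join();

    return result;
}
```

In this sketch run(true) yields std::errc::operation_canceled and run(false) yields a default-constructed (success) error code. The order matters: the flag has to be set before the wake-up, otherwise the waiter could observe the event without seeing the cancellation.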

You may have noticed the prominent CO_EXPECT, but what exactly is it?

#define CO_EXPECT(...)                                              \
    if (auto &&_result = __VA_ARGS__; !_result)                     \
        co_return std::unexpected{std::move(_result).error()}

C++ has no equivalent of Rust's question mark operator, so std::expected errors cannot be conveniently propagated upward without a macro. If you strongly dislike macros, you can also handle each error explicitly, as in Golang:

auto listener = asyncio::net::TCPListener::listen(host, port);

if (!listener)
    co_return std::unexpected{listener.error()};

auto signal = asyncio::Signal::make();

if (!signal)
    co_return std::unexpected{signal.error()};

If you prefer exceptions for error propagation and would rather avoid this roundabout approach, that works too; asyncio supports exceptions as well:

#include <asyncio/net/stream.h>
#include <asyncio/thread.h>
#include <asyncio/signal.h>
#include <asyncio/time.h>
#include <zero/cmdline.h>
#include <zero/formatter.h>
#include <zero/os/resource.h>

#ifdef _WIN32
#include <zero/os/windows/error.h>
#endif

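// Turn a failed std::expected into a std::system_error exception; otherwise return the contained value.
template<typename T>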
T guard(std::expected<T, std::error_code> &&expected) {
    if (!expected)
        throw std::system_error{expected.error()};

    if constexpr (std::is_void_v<T>)
        return;
    else
        return *std::move(expected);
}

namespace {
    asyncio::task::Task<void> tracing(const auto &task) {
#ifdef _WIN32
        const auto handle = CreateEventA(nullptr, false, false, "Global\\AsyncIOBacktraceEvent");

        if (!handle)
            throw std::system_error{
                std::error_code{static_cast<int>(GetLastError()), std::system_category()}
            };

        const zero::os::Resource event{handle};

        while (true) {
            bool cancelled{false};

            guard(co_await asyncio::toThread(
                [&, &cancelled = std::as_const(cancelled)]() -> std::expected<void, std::error_code> {
                    if (WaitForSingleObject(*event, INFINITE) != WAIT_OBJECT_0)
                        return std::unexpected{
                            std::error_code{static_cast<int>(GetLastError()), std::system_category()}
                        };

                    if (cancelled)
                        return std::unexpected{asyncio::task::Error::CANCELLED};

                    return {};
                },
                [&](std::thread::native_handle_type) -> std::expected<void, std::error_code> {
                    cancelled = true;
                    return zero::os::windows::expected([&] {
                        return SetEvent(*event);
                    });
                }
            ));

            fmt::print(stderr, "{}\n", task.trace());
        }
#else
        auto signal = guard(asyncio::Signal::make());

        while (true) {
            guard(co_await signal.on(SIGUSR1));
            fmt::print(stderr, "{}\n", task.trace());
        }
#endif
    }

    asyncio::task::Task<void> doSomething() {
        using namespace std::chrono_literals;

        while (true) {
            guard(co_await asyncio::sleep(1s));
            fmt::print("do some thing\n");
        }
    }

    asyncio::task::Task<void> handle(asyncio::net::TCPStream stream) {
        const auto address = guard(stream.remoteAddress());
        fmt::print("connection[{}]\n", address);

        while (true) {
            std::string message;
            message.resize(1024);

            const auto n = guard(co_await stream.read(std::as_writable_bytes(std::span{message})));

            if (n == 0)
                break;

            message.resize(n);

            fmt::print("receive message: {}\n", message);
            guard(co_await stream.writeAll(std::as_bytes(std::span{message})));
        }

        co_return;
    }

    asyncio::task::Task<void> serve(asyncio::net::TCPListener listener) {
        std::expected<void, std::error_code> result;
        asyncio::task::TaskGroup group;

        while (true) {
            auto stream = co_await listener.accept();

            if (!stream) {
                result = std::unexpected{stream.error()};
                break;
            }

            auto task = handle(*std::move(stream));

            group.add(task);
            task.future().fail([](const auto &e) {
                fmt::print(stderr, "unhandled exception: {}\n", e);
            });
        }

        co_await group;
        guard(std::move(result));
    }
}

asyncio::task::Task<void> asyncMain(const int argc, char *argv[]) {
    zero::Cmdline cmdline;

    cmdline.add<std::string>("host", "remote host");
    cmdline.add<std::uint16_t>("port", "remote port");

    cmdline.parse(argc, argv);

    const auto host = cmdline.get<std::string>("host");
    const auto port = cmdline.get<std::uint16_t>("port");

    auto listener = guard(asyncio::net::TCPListener::listen(host, port));
    auto signal = guard(asyncio::Signal::make());

    auto task = race(
        all(
            serve(std::move(listener)),
            doSomething()
        ),
        asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
            guard(co_await signal.on(SIGINT));
        })
    );

    co_return co_await race(
        task,
        tracing(task)
    );
}

int main(const int argc, char *argv[]) {
    const auto result = asyncio::run([=] {
        return asyncMain(argc, argv);
    });

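    // Outer layer: `asyncio::run` reported a std::error_code.
    if (!result)
        throw std::system_error{result.error()};

    // Inner layer: `asyncMain` finished with an exception, so rethrow it.
    if (!*result)
        std::rethrow_exception(result->error());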

    return EXIT_SUCCESS;
}

This version arguably reads better. I'm not against exceptions; the asyncio API uses std::error_code because converting a std::error_code into an exception is easy, while the reverse is not.
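
Here is a small sketch of that asymmetry, using only the standard library. toException and toErrorCode are hypothetical helpers written for this illustration, not asyncio APIs, and the io_error fallback is an arbitrary choice.

```cpp
#include <exception>
#include <stdexcept>
#include <system_error>

// error_code -> exception: lossless, the code travels inside std::system_error.
std::exception_ptr toException(const std::error_code ec) {
    return std::make_exception_ptr(std::system_error{ec});
}

// exception -> error_code: only possible when the exception happens to carry a code.
// Anything else has to be squeezed into some generic fallback value.
std::error_code toErrorCode(const std::exception_ptr &ptr) {
    try {
        std::rethrow_exception(ptr);
    }
    catch (const std::system_error &e) {
        return e.code();
    }
    catch (...) {
        return std::make_error_code(std::errc::io_error);
    }
}
```

A std::error_code survives the round trip through toException and toErrorCode unchanged, while an arbitrary exception such as std::runtime_error collapses into the fallback and loses its original meaning.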

For more examples, please refer to the Documentation.

(back to top)

Roadmap

  • HTTP Server

See the open issues for a full list of proposed features (and known issues).

(back to top)

Contributing

Contributions are what make the open source community such an amazing place to learn, inspire, and create. Any contributions you make are greatly appreciated.

If you have a suggestion that would make this better, please fork the repo and create a pull request. You can also simply open an issue with the tag "enhancement". Don't forget to give the project a star! Thanks again!

  1. Fork the Project
  2. Create your Feature Branch (git checkout -b feature/AmazingFeature)
  3. Commit your Changes (git commit -m 'Add some AmazingFeature')
  4. Push to the Branch (git push origin feature/AmazingFeature)
  5. Open a Pull Request

(back to top)

License

Distributed under the Apache 2.0 License. See LICENSE for more information.

(back to top)

Contact

Hackerl - @Hackerl - patteliu@gmail.com

Project Link: https://github.com/Hackerl/asyncio

(back to top)

Acknowledgments

(back to top)
