Overview

Here’s a list of relevant features in cobalt:

Table 1. Coroutine types

promise

An eager coroutine returning a single result; consider it the default.

generator

An eager coroutine that can yield multiple values.

task

A lazy version of promise that can be spawned onto other executors.

detached

A coroutine similar to promise, without a handle

Table 2. Synchronization Functions

race

A function that waits for the first of a set of coroutines to become ready. Ready coroutines are picked in a pseudo-random order to avoid starvation.

join

A function that waits for a set of coroutines and returns all of them as value or throws an exception if any awaitable does so.

gather

A function that waits for a set of coroutines and returns all of them as result, capturing all exceptions individually.

left_race

A deterministic race that evaluates left-to-right.

Table 3. Utilities

channel

A thread-local utility to send values between coroutines.

with

An async RAII helper, that allows async teardown when exceptions occur

Table 4. Reading guide

Coroutine Primer

A short introduction to C++ coroutines

Read if you’ve never used coroutines before

Tour

An abbreviated high level view of the features and concepts

Read if you’re familiar with asio & coroutines and want a rough idea what this library offers.

Tutorial

Low level view of usages

Read if you want to get coding quickly

Reference

API reference

Look up details while coding

Technical Background

Some implementation details

Read if you’re not confused enough

Motivation

Many programming languages and runtimes, like node.js and python, provide easy to use single-threaded concurrency frameworks. While more complex than synchronous code, single threaded asynchronicity avoids many of the pitfalls & overhead of multi-threading.

That is, one coroutine can work, while others wait for events (e.g. a response from a server). This makes it possible to write applications that do multiple things simultaneously on a single thread.

This library is meant to provide this to C++: simple single threaded asynchronicity akin to node.js and asyncio in python that works with existing libraries like boost.beast, boost.mysql or boost.redis. It is based on boost.asio.

It takes a collection of concepts from other languages and provides them based on C++20 coroutines.

Unlike asio::awaitable and asio::experimental::coro, cobalt coroutines are open. That is, an asio::awaitable can only await and be awaited by other asio::awaitable and does not provide coroutine specific synchronization mechanisms.

cobalt on the other hand provides a coroutine specific channel and different wait types (race, gather etc.) that are optimized to work with coroutines and awaitables.

Coroutine Primer

Async programming

Asynchronous programming generally refers to a style of programming that allows tasks to be run in the background, while other work is performed.

Imagine if you will a get-request function that performs a full http request including connecting & ssl handshakes etc.

std::string http_get(std::string_view url);

int main(int argc, char * argv[])
{
    auto res = http_get("https://boost.org");
    printf("%s", res.c_str());
    return 0;
}

The above code would be traditional synchronous programming. If we want to perform two requests in parallel, we would need to create another thread to run the second synchronous call.

std::string http_get(std::string_view url);

int main(int argc, char * argv[])
{
    std::string other_res;

    std::thread thr{[&]{ other_res = http_get("https://cppalliance.org"); }};
    auto res = http_get("https://boost.org");
    thr.join();

    printf("%s", res.c_str());
    printf("%s", other_res.c_str());
    return 0;
}

This works, but our program will spend most of the time waiting for input. Operating systems provide APIs that allow IO to be performed asynchronously, and libraries such as boost.asio provide portable ways to manage asynchronous operations. Asio itself does not dictate a way to handle the completions. This library (boost.cobalt) provides a way to manage this all through coroutines/awaitables.

cobalt::promise<std::string> http_cobalt_get(std::string_view url);

cobalt::main co_main(int argc, char * argv[])
{
    auto [res, other_res] =
            co_await cobalt::join(
                http_cobalt_get("https://boost.org"),
                http_cobalt_get("https://cppalliance.org")
            );

    printf("%s", res.c_str());
    printf("%s", other_res.c_str());
    co_return 0;
}

In the above code the asynchronous function to perform the request takes advantage of the operating system APIs so that the actual IO doesn’t block. This means that while we’re waiting for both functions to complete, the operations are interleaved and non-blocking. At the same time cobalt provides the coroutine primitives that keep us out of callback hell.

Coroutines

Coroutines are resumable functions. Resumable means that a function can suspend, i.e. pass the control back to the caller multiple times.

A regular function yields control back to the caller with the return statement, where it also returns the value.

A coroutine on the other hand might yield control to the caller and get resumed multiple times.

A coroutine has three control keywords akin to return (of which only co_return has to be supported).

  • co_return

  • co_yield

  • co_await

co_return

This is similar to return, but marks the function as a coroutine.

co_await

The co_await expression suspends for an Awaitable, i.e. stops execution until the awaitable resumes it.

E.g.:

cobalt::promise<void> delay(std::chrono::milliseconds);

cobalt::task<void> example()
{
  co_await delay(std::chrono::milliseconds(50));
}

A co_await expression can yield a value, depending on what it is awaiting.

cobalt::promise<std::string> read_some();

cobalt::task<void> example()
{
  std::string res = co_await read_some();
}

In cobalt, most coroutine primitives are also Awaitables.

co_yield

The co_yield expression is similar to the co_await, but it yields control to the caller and carries a value.

For example:

cobalt::generator<int> iota(int max)
{
  int i = 0;
  while (i < max)
    co_yield i++;

  co_return i;
}

A co_yield expression can also produce a value, which allows the user of the yielding coroutine to push values into it.

cobalt::generator<int, bool> iota()
{
  int i = 0;
  bool more = false;
  do
  {
    more = co_yield i++;
  }
  while(more);
  co_return -1;
}

Stackless

C++ coroutines are stackless, which means they only allocate their own function frame.

See Stackless for more details.

Awaitables

Awaitables are types that can be used in a co_await expression.

struct awaitable_prototype
{
    bool await_ready();

    template<typename T>
    see_below await_suspend(std::coroutine_handle<T>);

    return_type  await_resume();
};

A type will be implicitly converted into an awaitable if there is an operator co_await call available. This documentation will use awaitable to include these types, and "actual_awaitable" to refer to types conforming to the above prototype.

flowchart TD
    aw{await_ready?}
    aw ---->|true| ar[await_resume]
    aw -->|false| as[await_suspend]
    as -->|Resume| ar

In a co_await expression the waiting coroutine will first invoke await_ready to check if the coroutine needs to suspend. When ready, it goes directly to await_resume to get the value, as there is no suspension needed. Otherwise, it will suspend itself and call await_suspend with a std::coroutine_handle to its own promise.

std::coroutine_handle<void> can be used for type erasure.

The return_type is the result type of the co_await expression, e.g. int:

int i = co_await awaitable_with_int_result();

The return type of the await_suspend can be three things:

  • void

  • bool

  • std::coroutine_handle<U>

If it is void the awaiting coroutine remains suspended. If it is bool, the value will be checked, and if false, the awaiting coroutine will resume right away.

If a std::coroutine_handle is returned, that coroutine will be resumed. This allows await_suspend to return the handle passed in, which is effectively the same as returning false.

If the awaiting coroutine gets resumed right away, i.e. after calling await_suspend, it is referred to as "immediate completion" within this library. This is not to be confused with a non-suspending awaitable, i.e. one that returns true from await_ready.

Event Loops

Since the coroutines in cobalt can co_await events, they need to be run on an event loop. That is, another piece of code is responsible for tracking outstanding events and resuming the coroutines that are awaiting them. This pattern is very common and is used in a similar way by node.js or python’s asyncio.

cobalt uses an asio::io_context as its default event loop. That is, the classes thread, main and the run function are using it internally.

You can use any event loop that can produce an asio::any_io_executor with the library. The easiest way to achieve this is by using spawn.

The event loop is accessed through an executor (following the asio terminology) and can be manually set using set_executor.

Tour

Entry into a cobalt environment

In order to use awaitables we need to be able to co_await them, i.e. be within a coroutine.

There are three ways to achieve this:

cobalt/main.hpp

replace int main with a coroutine

cobalt::main co_main(int argc, char* argv[])
{
    // co_await things here
    co_return 0;
}

cobalt/thread.hpp

create a thread for the asynchronous environments

cobalt::thread my_thread()
{
    // co_await things here
    co_return;
}

int main(int argc, char * argv[])
{
    auto t = my_thread();
    t.join();
    return 0;
}

cobalt/task.hpp

create a task and run or spawn it

cobalt::task<void> my_task()
{
   // co_await things here
   co_return;
}

int main(int argc, char * argv[])
{
    cobalt::run(my_task()); // sync
    asio::io_context ctx;
    cobalt::spawn(ctx, my_task(), asio::detached);
    ctx.run();
    return 0;
}

Promises

Promises are the recommended default coroutine type. They’re eager and thus easily usable for ad-hoc concurrency.

cobalt::promise<int> my_promise()
{
   co_await do_the_thing();
   co_return 0;
}

cobalt::main co_main(int argc, char * argv[])
{
    // start the promise here
    auto p = my_promise();
    // do something else here
    co_await do_the_other_thing();
    // wait for the promise to complete
    auto res = co_await p;

    co_return res;
}

Tasks

Tasks are lazy, which means they won’t do anything before being awaited or spawned.

cobalt::task<int> my_task()
{
   co_await do_the_thing();
   co_return 0;
}

cobalt::main co_main(int argc, char * argv[])
{
    // create the task here
    auto t = my_task();
    // do something else here first
    co_await do_the_other_thing();
    // start and wait for the task to complete
    auto res = co_await t;
    co_return res;
}

Generator

A generator is the only type in cobalt that can co_yield values.

Generators are eager by default. Unlike std::generator, the cobalt::generator can co_await and thus is asynchronous.

cobalt::generator<int> my_generator()
{
   for (int i = 0; i < 10; i++)
    co_yield i;
   co_return 10;
}

cobalt::main co_main(int argc, char * argv[])
{
    // create the generator
    auto g = my_generator();
    while (g)
        printf("Generator %d\n", co_await g);
    co_return 0;
}

Values can be pushed into the generator; they will be returned from the co_yield expression.

cobalt::generator<double, int> my_eager_push_generator(int value)
{
   while (value != 0)
       value = co_yield value * 0.1;
   co_return std::numeric_limits<double>::quiet_NaN();
}

cobalt::main co_main(int argc, char * argv[])
{
    // create the generator
    auto g = my_eager_push_generator(5);

    assert(0.5 == co_await g(4)); // result of 5
    assert(0.4 == co_await g(3)); // result of 4
    assert(0.3 == co_await g(2)); // result of 3
    assert(0.2 == co_await g(1)); // result of 2
    assert(0.1 == co_await g(0)); // result of 1

    // we let the coroutine go out of scope while suspended
    // no need for another co_await of `g`

    co_return 0;
}

A generator can also be made lazy using this_coro::initial.

cobalt::generator<double, int> my_lazy_push_generator()
{
    auto value = co_await this_coro::initial;
    while (value != 0)
        value = co_yield value * 0.1;
    co_return std::numeric_limits<double>::quiet_NaN();
}

cobalt::main co_main(int argc, char * argv[])
{
    // create the generator
    auto g = my_lazy_push_generator(); // lazy, so the generator waits for the first pushed value
    assert(0.5 == co_await g(5)); // result of 5
    assert(0.4 == co_await g(4)); // result of 4
    assert(0.3 == co_await g(3)); // result of 3
    assert(0.2 == co_await g(2)); // result of 2
    assert(0.1 == co_await g(1)); // result of 1

    // we let the coroutine go out of scope while suspended
    // no need for another co_await of `g`

    co_return 0;
}

join

If multiple awaitables work in parallel they can be awaited simultaneously with join.

cobalt::promise<int> some_work();
cobalt::promise<double> more_work();

cobalt::main co_main(int argc, char * argv[])
{
    std::tuple<int, double> res = co_await cobalt::join(some_work(), more_work());
    co_return 0;
}

race

If multiple awaitables work in parallel, but we want to be notified as soon as either completes, we use race.

cobalt::generator<int> some_data_source();
cobalt::generator<double> another_data_source();

cobalt::main co_main(int argc, char * argv[])
{
    auto g1 = some_data_source();
    auto g2 = another_data_source();

    int res1    = co_await g1;
    double res2 = co_await g2;

    printf("Result: %f", res1 * res2);

    while (g1 && g2)
    {
        switch (variant2::variant<int, double> nx = co_await cobalt::race(g1, g2); nx.index())
        {
            case 0:
                res1 = variant2::get<0>(nx);
                break;
            case 1:
                res2 = variant2::get<1>(nx);
                break;
        }
        printf("New result: %f", res1 * res2);
    }

    co_return 0;
}

race in this context will not cause any data loss.

Tutorial

delay

Let’s start with the simplest example possible: a simple delay.

example/delay.cpp
cobalt::main co_main(int argc, char * argv[]) (1)
{
  asio::steady_timer tim{co_await asio::this_coro::executor, (2)
                         std::chrono::milliseconds(std::stoi(argv[1]))}; (3)
  co_await tim.async_wait(cobalt::use_op); (4)
  co_return 0; (5)
}
1 The co_main function defines an implicit main when used and is the easiest way to set up an environment to run asynchronous code.
2 Take the executor from the current coroutine promise.
3 Use an argument to set the timeout
4 Perform the wait by using cobalt::use_op.
5 Return a value that gets returned from the implicit main.

In this example we use the cobalt/main.hpp header, which provides us with a main coroutine if co_main is defined as above. This has a few advantages:

  • The environment gets set up correctly (executor & memory)

  • asio is signaled that the context is single threaded

  • an asio::signal_set with SIGINT & SIGTERM is automatically connected to cancellations (i.e. Ctrl+C causes cancellations)

This coroutine then has an executor in its promise (the promise is the C++ name for a coroutine's state, not to be confused with cobalt/promise.hpp), which we can obtain through the dummy-awaitables in the this_coro namespace.

We can then construct a timer and initiate the async_wait with use_op. cobalt provides multiple ways to co_await asio operations, of which use_op is the easiest.

echo server

We’ll be using the use_op completion token everywhere, so we declare it as the default completion token for our I/O types. That lets us skip the last parameter of each async call.

example/echo_server.cpp declarations
namespace cobalt = boost::cobalt;
using boost::asio::ip::tcp;
using boost::asio::detached;
using tcp_acceptor = cobalt::use_op_t::as_default_on_t<tcp::acceptor>;
using tcp_socket   = cobalt::use_op_t::as_default_on_t<tcp::socket>;
namespace this_coro = boost::cobalt::this_coro;

We’re writing the echo function as a promise coroutine. It’s an eager coroutine and recommended as the default; in case a lazy coro is needed, task is available.

example/echo_server.cpp echo function
cobalt::promise<void> echo(tcp_socket socket)
{
  try (1)
  {
    char data[4096];
    while (socket.is_open()) (2)
    {
      std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data)); (3)
      co_await async_write(socket, boost::asio::buffer(data, n)); (4)
    }
  }
  catch (std::exception& e)
  {
    std::printf("echo: exception: %s\n", e.what());
  }
}
1 When using the use_op completion token, I/O errors are translated into C++ exceptions. Additionally, if the coroutine gets cancelled (e.g. because the user hit Ctrl-C), an exception will be raised, too. Under these conditions, we print the error and exit the loop.
2 We run the loop until we get cancelled (exception) or the user closes the connection.
3 Read as much as is available.
4 Write all the read bytes.

Note that promise is eager. Calling echo will immediately execute code until async_read_some and then return control to the caller.

Next, we also need an acceptor function. Here, we’re using a generator to manage the acceptor state. This is a coroutine that can be co_awaited multiple times, until a co_return expression is reached.

example/echo_server.cpp listen function
cobalt::generator<tcp_socket> listen()
{
  tcp_acceptor acceptor({co_await cobalt::this_coro::executor}, {tcp::v4(), 55555});
  for (;;) (1)
  {
    tcp_socket sock = co_await acceptor.async_accept(); (2)
    co_yield std::move(sock); (3)
  }
  co_return tcp_socket{acceptor.get_executor()}; (4)
}
1 Cancellation will also lead to an exception here being thrown from the co_await
2 Asynchronously accept the connection
3 Yield it to the awaiting coroutine
4 co_return a value for C++ conformance.

With those two functions we can now write the server:

example/echo_server.cpp run_server function
cobalt::promise<void> run_server(cobalt::wait_group & workers)
{
  auto l = listen(); (1)
  while (true)
  {
    if (workers.size() == 10u)
      co_await workers.wait_one();  (2)
    else
      workers.push_back(echo(co_await l)); (3)
  }
}
1 Construct the listener generator coroutine. When the object is destroyed, the coroutine will be cancelled, performing all required cleanup.
2 When we have 10 workers, we wait for one to finish
3 Accept a new connection & launch it.

The wait_group is used to manage the running echo functions. This class will cancel & await the running echo coroutines.

We do not need to do the same for the listener, because it will just stop on its own, when l gets destroyed. The destructor of a generator will cancel it.

Since the promise is eager, just calling it is enough to launch. We then put those promises into a wait_group which will allow us to tear down all the workers on scope exit.

example/echo_server.cpp co_main function
cobalt::main co_main(int argc, char ** argv)
{
  co_await cobalt::with(cobalt::wait_group(), &run_server); (1)
  co_return 0u;
}
1 Run run_server with an async scope.

The with function shown above will run a function with a resource such as wait_group. On scope exit, with will invoke & co_await an asynchronous teardown function. This will cause all connections to be properly shut down before co_main exits.

price ticker

To demonstrate channels and other tools, we need a certain complexity. For that purpose our project is a price ticker that connects to https://blockchain.info. A user can then connect to localhost to query a given currency pair, like this:

wscat -c localhost:8080/btc/usd

First we do the same declarations as echo-server.

example/ticker.cpp declarations
using executor_type = cobalt::use_op_t::executor_with_default<cobalt::executor>;
using socket_type   = typename asio::ip::tcp::socket::rebind_executor<executor_type>::other;
using acceptor_type = typename asio::ip::tcp::acceptor::rebind_executor<executor_type>::other;
using websocket_type = beast::websocket::stream<asio::ssl::stream<socket_type>>;
namespace http = beast::http;

The next step is to write a function to connect an ssl-stream, to connect upstream:

example/ticker.cpp connect
cobalt::promise<asio::ssl::stream<socket_type>> connect(
        std::string host, boost::asio::ssl::context & ctx)
{
    asio::ip::tcp::resolver res{cobalt::this_thread::get_executor()};
    auto ep = co_await res.async_resolve(host, "https", cobalt::use_op); (1)

    asio::ssl::stream<socket_type> sock{cobalt::this_thread::get_executor(), ctx};
    co_await sock.next_layer().async_connect(*ep.begin()); (2)
    co_await sock.async_handshake(asio::ssl::stream_base::client); (3)

    co_return sock; (4)
}
1 Lookup the host
2 Connect to the endpoint
3 Do the ssl handshake
4 Return the socket to the caller

Next, we’ll need a function to do the websocket upgrade on an existing ssl-stream.

example/ticker.cpp connect_to_blockchain_info
cobalt::promise<void> connect_to_blockchain_info(websocket_type & ws)
{
 ws.set_option(beast::websocket::stream_base::decorator(
     [](beast::websocket::request_type& req)
     {
       req.set(http::field::user_agent,
               std::string(BOOST_BEAST_VERSION_STRING) + " cobalt-ticker");
       req.set(http::field::origin,
               "https://exchange.blockchain.com"); (1)
     }));

 co_await ws.async_handshake("ws.blockchain.info", "/mercury-gateway/v1/ws"); (2)
}
1 blockchain.info requires this header to be set.
2 Perform the websocket handshake.

Once the websocket is connected, we want to continuously receive json messages, for which a generator is a good choice.

example/ticker.cpp json_read
cobalt::generator<json::object> json_reader(websocket_type & ws)
try
{
    beast::flat_buffer buf;
    while (ws.is_open()) (1)
    {
        auto sz = co_await ws.async_read(buf); (2)
        json::string_view data{static_cast<const char*>(buf.cdata().data()), sz};
        auto obj = json::parse(data);
        co_yield obj.as_object(); (3)
        buf.consume(sz);
    }
    co_return {};
}
catch (std::exception & e)
{
  std::cerr << "Error reading: " << e.what() << std::endl;
  throw;
}
1 Keep running as long as the socket is open
2 Read a frame from the websocket
3 Parse & co_yield it as an object.

This then needs to be connected to the subscribers, for which we’ll utilize channels to pass raw json. To make lifetime management easy, the subscriber will hold a shared_ptr, and the producer a weak_ptr.

example/ticker.cpp subscription types
using subscription = std::pair<std::string, std::weak_ptr<cobalt::channel<json::object>>>;
using subscription_channel = std::weak_ptr<cobalt::channel<json::object>>;
using subscription_map = boost::unordered_multimap<std::string, subscription_channel>;

The main function running the blockchain connector, operates on two inputs: data coming from the websocket and a channel to handle new subscriptions.

example/ticker.cpp run blockchain_info
cobalt::promise<void> run_blockchain_info(cobalt::channel<subscription> & subc)
try
{
    asio::ssl::context ctx{asio::ssl::context_base::tls_client};
    websocket_type ws{co_await connect("blockchain.info", ctx)};
    co_await connect_to_blockchain_info(ws); (1)

    subscription_map subs;
    std::list<std::string> unconfirmed;

    auto rd = json_reader(ws); (2)
    while (ws.is_open()) (3)
    {
      switch (auto msg = co_await cobalt::race(rd, subc.read()); msg.index()) (4)
      {
        case 0: (5)
          if (auto ms = get<0>(msg);
              ms.at("event") == "rejected") // invalid sub, cancel whoever subbed
            co_await handle_rejections(unconfirmed, subs, ms);
          else
            co_await handle_update(unconfirmed, subs, ms, ws);
        break;
        case 1: // (6)
            co_await handle_new_subscription(
                unconfirmed, subs,
                std::move(get<1>(msg)), ws);
        break;
      }
    }

    for (auto & [k ,c] : subs)
    {
        if (auto ptr = c.lock())
            ptr->close();
    }
}
catch(std::exception & e)
{
  std::cerr << "Exception: " << e.what() << std::endl;
  throw;
}
1 Initialize the connection
2 Instantiate the json_reader
3 Run as long as the websocket is open
4 Select, i.e. wait for either a new json message or subscription
5 When it’s a json handle an update or a rejection
6 Handle new subscription messages

The contents of the handle_* functions are not as important for the cobalt functionality, so they’re skipped in this tutorial.

The handle_new_subscription function sends a message to blockchain.info, which will send a confirmation or rejection back. The handle_rejections and handle_update functions will take the json values and forward them to the subscription channel.

On the consumer side, our server will just forward data to the client. If the client inputs data, we’ll close the websocket immediately. We’re using as_tuple to ignore potential errors.

example/ticker.cpp read and close
cobalt::promise<void> read_and_close(beast::websocket::stream<socket_type> & st, beast::flat_buffer buf)
{
    system::error_code ec;
    co_await st.async_read(buf, asio::as_tuple(cobalt::use_op));
    co_await st.async_close(beast::websocket::close_code::going_away, asio::as_tuple(cobalt::use_op));
    st.next_layer().close(ec);
}

Next, we run the session for each user that connects:

example/ticker.cpp run_session
cobalt::promise<void> run_session(beast::websocket::stream<socket_type> st,
                                 cobalt::channel<subscription> & subc)
try
{
    http::request<http::empty_body> req;
    beast::flat_buffer buf;
    co_await http::async_read(st.next_layer(), buf, req); (1)
    // check the target
    auto r = urls::parse_uri_reference(req.target());
    if (r.has_error() || (r->segments().size() != 2u)) (2)
    {
        http::response<http::string_body> res{http::status::bad_request, 11};
        res.body() = r.has_error() ? r.error().message() :
                    "url needs two segments, e.g. /btc/usd";
        co_await http::async_write(st.next_layer(), res);
        st.next_layer().close();
        co_return ;
    }

    co_await st.async_accept(req); (3)

    auto sym = std::string(r->segments().front()) + "-" +
               std::string(r->segments().back());
    boost::algorithm::to_upper(sym);
    // close when data gets sent
    auto p = read_and_close(st, std::move(buf)); (4)

    auto ptr = std::make_shared<cobalt::channel<json::object>>(1u); (5)
    co_await subc.write(subscription{sym, ptr}); (6)

    while (ptr->is_open() && st.is_open()) (7)
    {
      auto bb = json::serialize(co_await ptr->read());
      co_await st.async_write(asio::buffer(bb));
    }

    co_await st.async_close(beast::websocket::close_code::going_away,
                            asio::as_tuple(cobalt::use_op)); (8)
    st.next_layer().close();
    co_await p; (9)

}
catch(std::exception & e)
{
    std::cerr << "Session ended with exception: " << e.what() << std::endl;
}
1 Read the http request, because we want the path
2 Check the path, e.g. /btc/usd.
3 Accept the websocket
4 Start reading & close if the consumer sends something
5 Create the channel to receive updates
6 Send a subscription request to run_blockchain_info
7 While the channel & websocket are open, we’re forwarding data.
8 Close the socket & ignore the error
9 Since the websocket is surely closed by now, wait for the read_and_close to complete.

With run_session and run_blockchain_info written, we can now move on to main:

example/ticker.cpp main
cobalt::main co_main(int argc, char * argv[])
{
    acceptor_type acc{co_await cobalt::this_coro::executor,
                      asio::ip::tcp::endpoint (asio::ip::tcp::v4(), 8080)};
    std::cout << "Listening on localhost:8080" << std::endl;

    constexpr int limit = 10; // allow 10 ongoing sessions
    cobalt::channel<subscription> sub_manager; (1)

    co_await join( (2)
      run_blockchain_info(sub_manager),
      cobalt::with( (3)
        cobalt::wait_group(
            asio::cancellation_type::all,
            asio::cancellation_type::all),
        [&](cobalt::wait_group & sessions) -> cobalt::promise<void>
        {
          while (!co_await cobalt::this_coro::cancelled) (4)
          {
            if (sessions.size() >= limit) (5)
              co_await sessions.wait_one();

            auto conn = co_await acc.async_accept(); (6)
            sessions.push_back( (7)
                run_session(
                    beast::websocket::stream<socket_type>{std::move(conn)},
                    sub_manager));
          }
        })
      );

    co_return 0;
}
1 Create the channel to manage subscriptions
2 Use join to run both tasks in parallel.
3 Use a cobalt scope to provide a wait_group.
4 Run until cancelled.
5 When we’ve reached the limit we wait for one task to complete.
6 Wait for a new connection.
7 Insert the session into the wait_group.

Main is using join because one task failing should cancel the other one.

delay op

We’ve used the use_op so far, to use an implicit operation based on asio’s completion token mechanic.

We can however implement our own ops, that can also utilize the await_ready optimization. Unlike immediate completion, the coroutine will never suspend when await_ready returns true.

To leverage this coroutine feature, cobalt provides an easy way to create a skippable operation:

example/delay_op.cpp
struct wait_op final : cobalt::op<system::error_code> (1)
{
  asio::steady_timer & tim;
  wait_op(asio::steady_timer & tim) : tim(tim) {}
  void ready(cobalt::handler<system::error_code> h ) override (2)
  {
    if (tim.expiry() < std::chrono::steady_clock::now())
      h(system::error_code{});
  }
  void initiate(cobalt::completion_handler<system::error_code> complete) override (3)
  {
    tim.async_wait(std::move(complete));
  }
};


cobalt::main co_main(int argc, char * argv[])
{
  asio::steady_timer tim{co_await asio::this_coro::executor,
                         std::chrono::milliseconds(std::stoi(argv[1]))};
  co_await wait_op(tim); (4)
  co_return 0;
}
1 Declare the op. We inherit op to make it awaitable.
2 The pre-suspend check is implemented here
3 Do the wait if we need to
4 Use the op just like any other awaitable.

This way we can minimize the number of coroutine suspensions.

While the above is used with asio, you can also use these handlers with any other callback based code.

Generator with push value

Coroutines with push values are not as common, but can simplify certain issues significantly.

Since we’ve already got a json_reader in the previous example, here’s how we can write a json_writer that gets values pushed in.

The advantage of using a generator is the internal state management.

cobalt::generator<system::error_code, json::object>
    json_writer(websocket_type & ws)
try
{
    char buffer[4096];
    json::serializer ser;

    while (ws.is_open()) (1)
    {
        auto val = co_yield system::error_code{}; (2)
        ser.reset(&val); // serialize the value that was just pushed in

        while (!ser.done())
        {
            auto sv = ser.read(buffer);
            co_await ws.async_write(asio::buffer(sv.data(), sv.size())); (3)
        }

    }
    co_return {};
}
catch (system::system_error& e)
{
    co_return e.code();
}
catch (std::exception & e)
{
    std::cerr << "Error writing: " << e.what() << std::endl;
    throw;
}
1 Keep running as long as the socket is open
2 co_yield the current error and retrieve a new value.
3 Write a frame to the websocket

Now we can use the generator like this:

auto g = json_writer(my_ws);

extern std::vector<json::value> to_write;

for (auto && tw : std::move(to_write))
{
    if (auto ec = co_await g(std::move(tw)))
        co_return ec; // propagate the error
}

Advanced examples

More examples are provided in the repository as code only. All examples are listed below.

Table 5. All examples

example/http.cpp

An http client that performs a single http get request.

example/outcome.cpp

Using the boost.outcome coroutine types.

example/python.cpp & example/python.py

Using nanobind to integrate cobalt with python. It uses python’s asyncio as executor and allows C++ to co_await python functions and vice versa.

example/signals.cpp

Adopting boost.signals2 into an awaitable type (single threaded).

example/spsc.cpp

Creating a boost.lockfree based & awaitable spsc_queue (multi threaded).

example/thread.cpp

Using worker threads with asio’s concurrent_channel.

example/thread_pool.cpp

Using an asio::thread_pool and spawning tasks onto them.

example/delay.cpp

The example used by the delay section

example/delay_op.cpp

The example used by the delay op section

example/echo_server.cpp

The example used by the echo server section

example/ticker.cpp

The example used by the price ticker section

example/channel.cpp

The example used by the channel reference

Design

Concepts

This library has two fundamental concepts:

An awaitable is an expression that can be used with co_await from within a coroutine, e.g.:

co_await delay(50ms);

However, a coroutine promise can define an await_transform, i.e. what is actually valid to use in a co_await expression depends on the coroutine.

Thus, we should redefine what an awaitable is: An awaitable is a type that can be co_await-ed from within a coroutine whose promise does not define await_transform.

A pseudo-keyword is a type that can be used in a coroutine that adds special functionality for it through its promise’s await_transform.

All the verbs in the this_coro namespace are such pseudo-keywords.

auto exec = co_await this_coro::executor;
This library exposes a set of enable_* base classes for promises, to make the creation of custom coroutines easy. This includes enable_awaitables, which provides an await_transform that just forwards awaitables.

A coroutine in the context of this documentation refers to an asynchronous coroutine, i.e. synchronous coroutines like std::generator are not considered.

All coroutines except main are also actual awaitables.

Executors

Since everything is asynchronous the library needs to use an event-loop. Because everything is single-threaded, it can be assumed that there is exactly one executor per thread, which will suffice for 97% of use-cases. Therefore, there is a thread_local executor that gets used as default by the coroutine objects (although stored by copy in the coroutine promise).

Likewise, there is one executor type used by the library, which defaults to asio::any_io_executor.

If you write your own coroutine, it should hold a copy of the executor, and have a get_executor function returning it by const reference.

Using Strands

While strands can be used, they are not compatible with the thread_local executor. This is because they might switch threads, thus they can’t be thread_local.

If you wish to use strands (e.g. through a spawn) the executor for any promise, generator or channel must be assigned manually.

In the case of a channel this is a constructor argument, but for the other coroutine types, asio::executor_arg needs to be used. This is done by having asio::executor_arg_t (somewhere) in the argument list directly followed by the executor to be used in the argument list of the coroutine, e.g.:

cobalt::promise<void> example_with_executor(int some_arg, asio::executor_arg_t, cobalt::executor);

This way the coroutine-promise can pick up the executor from the third argument, instead of defaulting to the thread_local one.

The arguments can of course be defaulted, to make them less inconvenient, if the coroutine is sometimes used with the thread_local executor.

cobalt::promise<void> example_with_executor(int some_arg,
                                           asio::executor_arg_t = asio::executor_arg,
                                           cobalt::executor = cobalt::this_thread::get_executor());

If this gets omitted on a strand, an exception of type asio::bad_executor is thrown, or - worse - the wrong executor is used.

polymorphic memory resource

Similarly, the library uses a thread_local pmr::memory_resource to allocate coroutine frames & to use as allocator on asynchronous operations.

The reason is that users may want to customize allocations, e.g. to avoid locks, limit memory usage or monitor usage. pmr allows us to achieve this without introducing unnecessary template parameters, i.e. no promise<T, Allocator> complexity. Using pmr however does introduce some minimal overhead, so a user has the option to disable it by defining BOOST_COBALT_NO_PMR.

op uses an internal resource optimized for asio’s allocator usages, and gather, race and join use a monotonic resource to minimize allocations. Both still work with BOOST_COBALT_NO_PMR defined, in which case they’ll use new/delete as upstream allocations.

main and thread use a single pmr::unsynchronized_pool_resource per thread when PMR is enabled.
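Why a monotonic resource reduces allocations can be shown with the standard library alone. The sketch below uses std::pmr as a stand-in for the pmr namespace used by the library; counting_resource and the two helper functions are hypothetical names for this illustration. It counts how many requests reach the upstream resource with and without a monotonic buffer in between.

```cpp
#include <cassert>
#include <cstddef>
#include <memory_resource>
#include <vector>

// Upstream resource that counts how often it is actually asked for memory.
class counting_resource final : public std::pmr::memory_resource
{
public:
    std::size_t allocations = 0;

private:
    void * do_allocate(std::size_t bytes, std::size_t align) override
    {
        ++allocations;
        return std::pmr::new_delete_resource()->allocate(bytes, align);
    }
    void do_deallocate(void * p, std::size_t bytes, std::size_t align) override
    {
        std::pmr::new_delete_resource()->deallocate(p, bytes, align);
    }
    bool do_is_equal(const std::pmr::memory_resource & other) const noexcept override
    {
        return this == &other;
    }
};

// Grows a vector to 100 elements using the given resource.
void fill(std::pmr::memory_resource * res)
{
    std::pmr::vector<int> values(res);
    for (int i = 0; i < 100; ++i)
        values.push_back(i);
    assert(values.size() == 100);
}

// All regrowth is served from the stack buffer; upstream is never touched.
std::size_t upstream_allocations_with_monotonic()
{
    counting_resource upstream;
    {
        alignas(std::max_align_t) std::byte buffer[4096];
        std::pmr::monotonic_buffer_resource mono{buffer, sizeof(buffer), &upstream};
        fill(&mono);
    }
    return upstream.allocations;
}

// Every regrowth of the vector hits the upstream resource directly.
std::size_t upstream_allocations_direct()
{
    counting_resource upstream;
    fill(&upstream);
    return upstream.allocations;
}
```

With the 4096-byte buffer the monotonic variant never reaches the upstream resource, whereas the direct variant does so on every regrowth of the vector.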

If you write your own coroutine, it should have a get_allocator function returning a pmr::polymorphic_allocator<void>.

cancellation

cobalt uses implicit cancellation based on asio::cancellation_signal. This is mostly used implicitly (e.g. with race), so that there is very little explicit use in the examples.

If you write a custom coroutine, it must return a cancellation_slot from a get_cancellation_slot function in order to be able to cancel other operations.
If you write a custom awaitable, it can use that function in await_suspend to receive cancellation signals.

Promise

The main coroutine type is a promise, which is eager. The reason to default to this, is that the compiler can optimize out promises that do not suspend, like this:

cobalt::promise<void> noop()
{
  co_return;
}

Awaiting the above operation is in theory a noop, but practically speaking, compilers aren’t there as of 2023.

Race

The most important synchronization mechanism is the race function.

It awaits multiple awaitables in a pseudo-random order and will return the result of the first one to complete, before disregarding the rest.

That is, it initiates the co_await in a pseudo-random order and stops once one awaitable is found to be ready or completed immediately.

cobalt::generator<int> gen1();
cobalt::generator<double> gen2();

cobalt::promise<void> p()
{
  auto g1 = gen1();
  auto g2 = gen2();
  while (!co_await cobalt::this_coro::cancelled)
  {
    switch(auto v = co_await race(g1, g2); v.index())
    {
    case 0:
      printf("Got int %d\n", get<0>(v));
      break;
    case 1:
      printf("Got double %f\n", get<1>(v));
      break;
    }
  }
}

The race must however internally wait for all awaitables to complete once it has initiated the co_await. Therefore, once the first awaitable completes, it tries to interrupt the rest, and if that fails it cancels them.

race is the preferred way to trigger cancellations, e.g:

cobalt::promise<void> timeout();
cobalt::promise<void> work();

race(timeout(), work());

interrupt_await

If race naively cancelled the remaining awaitables, it would however lose data. Thus, the concept of interrupt_await is introduced, which tells the awaitable (that supports it) to immediately resume the awaiter and return or throw an ignored value.

Example of an interruptible awaitable
struct awaitable
{
   bool await_ready() const;

   template<typename Promise>
   std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);

   T await_resume();

   void interrupt_await() &;
};

If the interrupt_await doesn’t result in immediate resumption (of h), race will send a cancel signal.

race applies these with the correct reference qualification:

auto g = gen1();
race(g, gen2());

The above will call an interrupt_await() & function for g and interrupt_await() && for the temporary returned by gen2(), if available.

Generally speaking, the coroutines in cobalt support lvalue interruption, i.e. interrupt_await() &. channel operations are unqualified, i.e. work in both cases.

join and gather will forward interruptions, e.g. in an expression such as race(join(g1, g2), gen2()), g1 and g2 are only interrupted if gen2() completes first.

Associators

cobalt uses the associator concept of asio, but simplifies it. That is, it has three associators that are member functions of an awaiting promise.

  • const executor_type & get_executor() (always executor, must return by const ref)

  • allocator_type get_allocator() (always pmr::polymorphic_allocator<void>)

  • cancellation_slot_type get_cancellation_slot() (must have the same interface as asio::cancellation_slot)

cobalt uses concepts to check if those are present in its await_suspend functions.

That way custom coroutines can support cancellation, executors and allocators.

In a custom awaitable you can obtain them like this:

struct my_awaitable
{
    bool await_ready();
    template<typename Promise>
    void await_suspend(std::coroutine_handle<Promise> h)
    {
        if constexpr (requires (Promise p) {p.get_executor();})
            handle_executor(h.promise().get_executor());

        if constexpr (requires (Promise p) {p.get_cancellation_slot();})
            if ((cl = h.promise().get_cancellation_slot()).is_connected())
                cl.emplace<my_cancellation>();
    }

    void await_resume();
};

Cancellation gets connected in a co_await expression (if supported by the coroutine & awaitable), including synchronization mechanisms like race.

Threading

This library is single-threaded by design, because this simplifies resumption and thus allows more performant handling of synchronizations like race. In a multi-threaded design, race would need to lock every raced awaitable to avoid data loss; this locking would have to be blocking and would get worse with every additional element.

You can’t have any coroutine resumed on a different thread than the one it was created on, except for a task (e.g. using spawn).

The main technical reason is that the most efficient way of switching coroutines is by returning the handle of the new coroutine from await_suspend like this:

struct my_awaitable
{
    bool await_ready();
    std::coroutine_handle<T> await_suspend(std::coroutine_handle<U>);
    void await_resume();
};

In this case, the awaiting coroutine will be suspended before await_suspend is called, and the coroutine returned is resumed. This of course doesn’t work if we need to go through an executor.

This doesn’t only apply to awaited coroutines, but to channels, too. The channels in this library use an intrusive list of awaitables and may return the handle of a reading (and thus suspended) coroutine from a write_operation’s await_suspend.

Reference

cobalt/main.hpp

The easiest way to get started with a cobalt application is to use the co_main function with the following signature:

cobalt::main co_main(int argc, char *argv[]);

Declaring co_main will add a main function that performs all the necessary steps to run a coroutine on an event loop. This allows us to write very simple asynchronous programs.

cobalt::main co_main(int argc, char *argv[])
{
  auto exec = co_await cobalt::this_coro::executor;             (1)
  asio::steady_timer tim{exec, std::chrono::milliseconds(50)}; (2)
  co_await tim.async_wait(cobalt::use_op);                      (3)
  co_return 0;
}
1 Get the executor main is running on
2 Use it with an asio object
3 co_await a cobalt operation

The main promise will create an asio::signal_set and use it for cancellation. SIGINT becomes total, while SIGTERM becomes terminal cancellation.

The cancellation will not be forwarded to detached coroutines. The user will need to take care to end them on cancellation, since the program otherwise doesn’t allow graceful termination.

Executor

It will also create an asio::io_context to run on, which you can get through the this_coro::executor. It will be assigned to the cobalt::this_thread::get_executor().

Memory Resource

It also creates a memory resource that will be used as a default for internal memory allocations. It will be assigned as the thread_local default, which is accessible through cobalt::this_thread::get_default_resource().

Promise

Every coroutine has an internal state, called promise (not to be confused with the cobalt::promise). Depending on the coroutine properties different things can be co_await-ed, like we used in the example above.

These properties are implemented through inheritance and shared among different promise types.

The main promise has the following properties.

Specification

  1. declaring co_main will implicitly declare a main function

  2. main is only present when co_main is defined.

  3. SIGINT and SIGTERM will cause cancellation of the internal task.

cobalt/promise.hpp

A promise is an eager coroutine that can co_await and co_return values. That is, it cannot use co_yield.

cobalt::promise<void> delay(std::chrono::milliseconds ms)
{
  asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
  co_await tim.async_wait(cobalt::use_op);
}

cobalt::main co_main(int argc, char *argv[])
{
  co_await delay(std::chrono::milliseconds(50));
  co_return 0;
}

Promises are by default attached. This means that a cancellation is sent when the promise handle goes out of scope.

A promise can be detached by calling detach or by using the prefix + operator. This is a runtime alternative to using detached. A detached promise will not send a cancellation on destruction.

cobalt::promise<void> my_task();

cobalt::main co_main(int argc, char *argv[])
{
  +my_task(); (1)
  co_await delay(std::chrono::milliseconds(50));
  co_return 0;
}
1 By using + the task gets detached. Without it, the compiler would generate a nodiscard warning.

Executor

The executor is taken from the thread_local get_executor function, unless an asio::executor_arg is used in any position followed by the executor argument.

cobalt::promise<int> my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);

Memory Resource

The memory resource is taken from the thread_local get_default_resource function, unless a std::allocator_arg is used in any position followed by a polymorphic_allocator argument.

cobalt::promise<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);

Outline

template<typename Return>
struct [[nodiscard]] promise
{
    promise(promise &&lhs) noexcept;
    promise& operator=(promise && lhs) noexcept;

    // enable `co_await`. (1)
    auto operator co_await ();

    // Ignore the return value, i.e. detach it. (2)
    void operator +() &&;

    // Cancel the promise.
    void cancel(asio::cancellation_type ct = asio::cancellation_type::all);

    // Check if the result is ready
    bool ready() const;
    // Check if the promise can be awaited.
    explicit operator bool () const; (3)

    // Detach or attach
    bool attached() const;
    void detach();
    void attach();
    // Get the return value. If !ready() this function has undefined behaviour.
    Return get();
};
1 Supports Interrupt Wait
2 This allows creating a promise that runs in parallel with a simple +my_task() expression.
3 This allows code like while (p) co_await p;

Promise

The coroutine promise (promise::promise_type) has the following properties.

cobalt/generator.hpp

A generator is an eager coroutine that can co_await and co_yield values to the caller.

cobalt::generator<int> example()
{
  printf("In coro 1\n");
  co_yield 2;
  printf("In coro 3\n");
  co_return 4;
}

cobalt::main co_main(int argc, char * argv[])
{
  printf("In main 0\n");
  auto f = example(); // call and let it run until the first co_yield
  printf("In main 1\n");
  printf("In main %d\n", co_await f);
  printf("In main %d\n", co_await f);
  co_return 0;
}

Which will generate the following output

In main 0
In coro 1
In main 1
In main 2
In coro 3
In main 4

sequenceDiagram
    participant main;
    Note left of main: "In main 0"
    main->>+example: example()
    Note right of example: "In coro 1"
    example-->>main: co_yield 2
    Note left of main: "In main 2"
    main-->>+example: co_await f
    Note right of example: "In coro 3"
    example->>main: co_return 4
    Note left of main: "In main 4"

Values can be pushed into the generator, when Push (the second template parameter) is set to non-void:

cobalt::generator<int, int> example()
{
  printf("In coro 1\n");
  int i =  co_yield 2;
  printf("In coro %d\n", i);
  co_return 4;
}

cobalt::main co_main(int argc, char * argv[])
{
  printf("In main 0\n");
  auto f = example(); // call and let it run until the first co_yield
  printf("In main %d\n", co_await f(3)); (1)
  co_return 0;
}
1 The pushed value gets passed through operator() to the result of co_yield.

Which will generate the following output

In main 0
In coro 1
In main 2
In coro 3

Lazy

A generator can be turned lazy by awaiting initial. This co_await expression will produce the Push value. This means the generator will wait until it’s awaited for the first time, and then process the newly pushed value and resume at the next co_yield.

cobalt::generator<int, int> example()
{
  int v = co_await cobalt::this_coro::initial;
  printf("In coro %d\n", v);
  v = co_yield 2; // pick up the value pushed by the next co_await
  printf("In coro %d\n", v);
  co_return 4;
}

cobalt::main co_main(int argc, char * argv[])
{
  printf("In main 0\n");
  auto f = example(); // call; the generator suspends at co_await initial
  printf("In main 1\n"); // < this is now before the co_await initial
  printf("In main %d\n", co_await f(1));
  printf("In main %d\n", co_await f(3));
  co_return 0;
}

Which will generate the following output

In main 0
In main 1
In coro 1
In main 2
In coro 3
In main 4

sequenceDiagram
    participant main;
    Note left of main: "In main 0"
    main->>+example: example()
    example-->>main: co_await initial
    Note left of main: "In main 1"
    main-->>+example: co_await f(1)
    Note right of example: "In coro 1"
    example-->>main: co_yield 2
    Note left of main: "In main 2"
    main-->>+example: co_await f(3)
    Note right of example: "In coro 3"
    example->>main: co_return 4
    Note left of main: "In main 4"

Executor

The executor is taken from the thread_local get_executor function, unless an asio::executor_arg is used in any position followed by the executor argument.

cobalt::generator<int> my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);

Memory Resource

The memory resource is taken from the thread_local get_default_resource function, unless a std::allocator_arg is used in any position followed by a polymorphic_allocator argument.

cobalt::generator<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);

Outline

template<typename Yield, typename Push = void>
struct [[nodiscard]] generator
{
  // Movable

  generator(generator &&lhs) noexcept = default;
  generator& operator=(generator &&) noexcept = default;

  // True until it co_returns & is co_awaited after (1)
  explicit operator bool() const;

  // Cancel the generator. (3)
  void cancel(asio::cancellation_type ct = asio::cancellation_type::all);

  // Check if a value is available
  bool ready() const;

  // Get the returned value. If !ready() this function has undefined behaviour.
  Yield get();

  // Cancel & detach the generator.
  ~generator();

  // an awaitable that results in value of Yield.
  using generator_awaitable = unspecified;

  // Present when Push != void
  generator_awaitable operator()(      Push && push);
  generator_awaitable operator()(const Push &  push);

  // Present when Push == void, i.e. can co_await the generator directly.
  generator_awaitable operator co_await (); (2)

};
1 This allows code like while (gen) co_await gen;
2 Supports Interrupt Wait
3 A cancelled generator may be resumable

cobalt/task.hpp

A task is a lazy coroutine that can co_await and co_return values. That is, it cannot use co_yield.

cobalt::task<void> delay(std::chrono::milliseconds ms)
{
  asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
  co_await tim.async_wait(cobalt::use_op);
}

cobalt::main co_main(int argc, char *argv[])
{
  co_await delay(std::chrono::milliseconds(50));
  co_return 0;
}

Unlike a promise, a task can be awaited or spawned on another executor than it was created on.

Executor

Since a task is lazy, it does not need to have an executor on construction. It rather attempts to take it from the caller or awaiter if present. Otherwise, it’ll default to the thread_local executor.

Memory Resource

The memory resource is NOT taken from the thread_local get_default_resource function, but from pmr::get_default_resource(), unless a std::allocator_arg is used in any position followed by a polymorphic_allocator argument.

cobalt::task<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);

Outline

template<typename Return>
struct [[nodiscard]] task
{
    task(task &&lhs) noexcept = default;
    task& operator=(task &&) noexcept = default;

    // enable `co_await`
    auto operator co_await ();

};
Tasks can be used synchronously from a sync function by calling run(my_task()).

use_task

The use_task completion token can be used to create a task from an async_ function. This is less efficient than use_op as it needs to allocate a coroutine frame, but has a simpler return type and supports Interrupt Wait.

cobalt/detached.hpp

A detached is an eager coroutine that can co_await but not co_return values. That is, it cannot be resumed and is usually not awaited.

cobalt::detached delayed_print(std::chrono::milliseconds ms)
{
  asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
  co_await tim.async_wait(cobalt::use_op);
  printf("Hello world\n");
}

cobalt::main co_main(int argc, char *argv[])
{
  delayed_print();
  co_return 0;
}

Detached is used to run coroutines in the background easily.

cobalt::detached my_task();

cobalt::main co_main(int argc, char *argv[])
{
  my_task(); (1)
  co_await delay(std::chrono::milliseconds(50));
  co_return 0;
}
1 Spawn off the detached coro.

A detached can assign itself a new cancellation source like this:

cobalt::detached my_task(asio::cancellation_slot sl)
{
   co_await this_coro::reset_cancellation_source(sl);
   // do some work
}

cobalt::main co_main(int argc, char *argv[])
{
  asio::cancellation_signal sig;
  my_task(sig.slot()); (1)
  co_await delay(std::chrono::milliseconds(50));
  sig.emit(asio::cancellation_type::all);
  co_return 0;
}

Executor

The executor is taken from the thread_local get_executor function, unless an asio::executor_arg is used in any position followed by the executor argument.

cobalt::detached my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);

Memory Resource

The memory resource is taken from the thread_local get_default_resource function, unless a std::allocator_arg is used in any position followed by a polymorphic_allocator argument.

cobalt::detached my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);

Outline

struct detached {};
1 Supports Interrupt Wait

cobalt/op.hpp

An operation in cobalt is an awaitable wrapping an asio operation.

use_op

The use_op token is the direct way to create an op, i.e. using cobalt::use_op as the completion token will create the required awaitable.

auto tim = cobalt::use_op.as_default_on(asio::steady_timer{co_await cobalt::this_coro::executor});
co_await tim.async_wait();

Depending on the completion signature the co_await expression may throw.

Signature                        Return type        Exception

void()                           void               noexcept
void(T)                          T                  noexcept
void(T...)                       std::tuple<T...>   noexcept
void(system::error_code, T)      T                  system::system_error
void(system::error_code, T...)   std::tuple<T...>   system::system_error
void(std::exception_ptr, T)      T                  any exception
void(std::exception_ptr, T...)   std::tuple<T...>   any exception
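The error_code rows of the table can be read as the following mapping. This is a minimal sketch that uses std::error_code and std::system_error as stand-ins for their boost::system counterparts; interpret_result and throws_on_error are hypothetical helpers for this illustration, not library functions.

```cpp
#include <cassert>
#include <system_error>

// Maps the arguments of a void(error_code, T) completion to the result of the
// co_await expression: the value on success, an exception on failure.
template<typename T>
T interpret_result(std::error_code ec, T value)
{
    if (ec)
        throw std::system_error(ec);
    return value;
}

// Checks that a failed completion surfaces as a system_error with the same code.
bool throws_on_error()
{
    try
    {
        interpret_result(std::make_error_code(std::errc::operation_canceled), 0);
    }
    catch (const std::system_error & e)
    {
        return e.code() == std::errc::operation_canceled;
    }
    return false;
}
```

A successful completion simply yields the value, so the error_code never appears in the result type of the co_await expression.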

use_op will never complete immediately, i.e. await_ready will always return false and the coroutine will always be suspended.

Hand coded Operations

Operations are a more advanced implementation of the cobalt/op.hpp feature.

This library makes it easy to create asynchronous operations with an early completion condition, i.e. a condition that avoids suspension of coroutines altogether.

We can for example create a wait_op that does nothing if the timer is already expired.

struct wait_op : cobalt::op<system::error_code> (1)
{
  asio::steady_timer & tim;

  wait_op(asio::steady_timer & tim) : tim(tim) {}

  void ready(cobalt::handler<system::error_code> h) (2)
  {
    if (tim.expiry() < std::chrono::steady_clock::now())
        h(system::error_code{});
  }
  void initiate(cobalt::completion_handler<system::error_code> complete) (3)
  {
    tim.async_wait(std::move(complete));
  }
};
1 Inherit op with the matching signature so that await_transform picks it up
2 Check if the operation is ready - called from await_ready
3 Initiate the operation if it’s not ready.

cobalt/concepts.hpp

Awaitable

An awaitable is an expression that can be used with co_await.

template<typename Awaitable, typename Promise = void>
concept awaitable_type = requires (Awaitable aw, std::coroutine_handle<Promise> h)
{
    {aw.await_ready()} -> std::convertible_to<bool>;
    {aw.await_suspend(h)};
    {aw.await_resume()};
};

template<typename Awaitable, typename Promise = void>
concept awaitable =
        awaitable_type<Awaitable, Promise>
    || requires (Awaitable && aw) { {std::forward<Awaitable>(aw).operator co_await()} -> awaitable_type<Promise>;}
    || requires (Awaitable && aw) { {operator co_await(std::forward<Awaitable>(aw))} -> awaitable_type<Promise>;};
Awaitables in this library require that the coroutine promise returns its executor by const reference if it provides one. Otherwise this_thread::get_executor() is used.

Enable awaitables

Inheriting enable_awaitables will enable a coroutine to co_await anything through await_transform that would be co_await-able in the absence of any await_transform.

cobalt/this_coro.hpp

The this_coro namespace provides utilities to access the internal state of a coroutine promise.

Pseudo-awaitables:

// Awaitable type that returns the executor of the current coroutine.
struct executor_t {};
constexpr executor_t executor;

// Awaitable type that returns the cancellation state of the current coroutine.
struct cancellation_state_t {};
constexpr cancellation_state_t cancellation_state;

// Reset the cancellation state with custom or default filters.
constexpr unspecified reset_cancellation_state();
template<typename Filter>
constexpr unspecified reset_cancellation_state(
    Filter && filter);
template<typename InFilter, typename OutFilter>
constexpr unspecified reset_cancellation_state(
    InFilter && in_filter,
    OutFilter && out_filter);

// get & set the throw_if_cancelled setting.
unspecified throw_if_cancelled();
unspecified throw_if_cancelled(bool value);

// Set the cancellation source in a detached.
unspecified reset_cancellation_source();
unspecified reset_cancellation_source(asio::cancellation_slot slot);


// get the allocator of the promise
struct allocator_t {};
constexpr allocator_t allocator;

// get the current cancellation state-type
struct cancelled_t {};
constexpr cancelled_t cancelled;

// set the over-eager mode of a generator
struct initial_t {};
constexpr initial_t initial;

Await Allocator

The allocator of a coroutine supporting enable_await_allocator can be obtained the following way:

co_await cobalt::this_coro::allocator;

In order to enable this for your own coroutine you can inherit enable_await_allocator with the CRTP pattern:

struct my_promise : cobalt::enable_await_allocator<my_promise>
{
  using allocator_type = __your_allocator_type__;
  allocator_type get_allocator();
};
If available, the allocator gets used by use_op.

Await Executor

The executor of a coroutine supporting enable_await_executor can be obtained the following way:

co_await cobalt::this_coro::executor;

In order to enable this for your own coroutine you can inherit enable_await_executor with the CRTP pattern:

struct my_promise : cobalt::enable_await_executor<my_promise>
{
  using executor_type = __your_executor_type__;
  executor_type get_executor();
};
If available, the executor gets used by use_op.

Memory resource base

The promise_memory_resource_base base of a promise will provide a get_allocator in the promise taken from either the default resource or one passed following a std::allocator_arg argument. Likewise, it will add operator new overloads so the coroutine uses the same memory resource for its frame allocation.

Throw if cancelled

The promise_throw_if_cancelled_base provides the basic options that let a coroutine throw an exception when it has been cancelled and another actual awaitable is awaited.

co_await cobalt::this_coro::throw_if_cancelled();

Cancellation state

The promise_cancellation_base provides the basic options that let a coroutine have a cancellation_state that is resettable by reset_cancellation_state.

co_await cobalt::this_coro::reset_cancellation_state();

For convenience there is also a short-cut to check the current cancellation status:

asio::cancellation_type ct = (co_await cobalt::this_coro::cancellation_state).cancelled();
asio::cancellation_type ct = co_await cobalt::this_coro::cancelled; // same as above

cobalt/this_thread.hpp

Since everything is single threaded this library provides an executor & default memory-resource for every thread.

namespace boost::cobalt::this_thread
{

pmr::memory_resource* get_default_resource() noexcept; (1)
pmr::memory_resource* set_default_resource(pmr::memory_resource* r) noexcept; (2)
pmr::polymorphic_allocator<void> get_allocator(); (3)

typename asio::io_context::executor_type & get_executor(); (4)
void set_executor(asio::io_context::executor_type exec) noexcept; (5)

}
1 Get the default resource - will be pmr::get_default_resource unless set
2 Set the default resource - returns the previously set one
3 Get an allocator wrapping (1)
4 Get the executor of the thread - throws if not set
5 Set the executor of the current thread.
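
The get/set pair for the default resource follows the same convention as the standard library's std::pmr functions. As a rough sketch of that convention, the following uses std::pmr directly rather than cobalt's this_thread functions (note that the std::pmr default is process-wide, not per thread). The names counting_resource and default_resource_is_used are hypothetical and exist only for this illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <memory_resource>
#include <vector>

// Hypothetical resource that counts allocations and forwards to new/delete.
struct counting_resource : std::pmr::memory_resource
{
  std::size_t allocations = 0;
private:
  void* do_allocate(std::size_t bytes, std::size_t align) override
  {
    ++allocations;
    return std::pmr::new_delete_resource()->allocate(bytes, align);
  }
  void do_deallocate(void* p, std::size_t bytes, std::size_t align) override
  {
    std::pmr::new_delete_resource()->deallocate(p, bytes, align);
  }
  bool do_is_equal(const std::pmr::memory_resource& other) const noexcept override
  {
    return this == &other;
  }
};

// Installs a resource as default, allocates through a default-constructed
// allocator, then restores the previous default.
bool default_resource_is_used()
{
  counting_resource mine;
  // the setter hands back the previously installed resource
  std::pmr::memory_resource* previous = std::pmr::set_default_resource(&mine);
  assert(previous != nullptr);
  {
    std::pmr::vector<int> v;   // its allocator wraps the current default
    v.push_back(1);
  }
  std::pmr::memory_resource* replaced = std::pmr::set_default_resource(previous);
  return replaced == &mine && mine.allocations > 0;
}
```

Keeping the returned pointer around is what makes it possible to restore the earlier default once a scope is done with its own resource.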

The coroutines will use these as defaults, but keep a copy just in case.

The only exception is the initialization of a cobalt-operation, which will use the this_thread::executor to rethrow from.

cobalt/channel.hpp

Channels can be used to exchange data between different coroutines on a single thread.

Outline

channel outline
template<typename T>
struct channel
{
  // create a channel with a buffer limit, executor & resource.
  explicit
  channel(std::size_t limit = 0u,
          executor executor = this_thread::get_executor(),
          pmr::memory_resource * resource = this_thread::get_default_resource());
  // not movable.
  channel(channel && rhs) noexcept = delete;
  channel & operator=(channel && lhs) noexcept = delete;

  using executor_type = executor;
  const executor_type & get_executor();

  // Closes the channel
  ~channel();
  bool is_open() const;
  // close the channel, will cancel all pending ops, too
  void close();

  // an awaitable that yields T
  using read_op = unspecified;

  // an awaitable that yields void
  using write_op = unspecified;

  // read a value from the channel
  read_op  read();

  // write a value to the channel
  write_op write(const T  && value);
  write_op write(const T  &  value);
  write_op write(      T &&  value);
  write_op write(      T  &  value);

  // if T is void, write takes no parameter

};

Description

Channels are a tool for two coroutines to communicate and synchronize.

const std::size_t buffer_size = 2;
channel<int> ch{buffer_size, exec};

// in coroutine (1)
co_await ch.write(42);

// in coroutine (2)
auto val = co_await ch.read();
1 Send a value to the channel - will block until it can be sent
2 Read a value from the channel - will block until a value is available.

Both operations may block (i.e. suspend the coroutine), depending on the channel buffer size.

If the buffer size is zero, a read & write will need to occur at the same time, i.e. act as a rendezvous.

If the buffer is not full, the write operation will not suspend the coroutine; likewise if the buffer is not empty, the read operation will not suspend.

If two operations complete at once (as is always the case with a buffer size of zero), the second operation gets posted to the executor for later completion.

A channel type can be void, in which case write takes no parameter.

The channel operations can be cancelled without losing data. This makes them usable with race.

generator<variant2::variant<int, double>> merge(
    channel<int> & c1,
    channel<double> & c2)
{
    while (c1.is_open() && c2.is_open())
       co_yield co_await race(c1.read(), c2.read());
}

Example

cobalt::promise<void> producer(cobalt::channel<int> & chan)
{
  for (int i = 0; i < 4; i++)
    co_await chan.write(i);

  chan.close();
}

cobalt::main co_main(int argc, char * argv[])
{
  cobalt::channel<int> c;

  auto p = producer(c);
  while (c.is_open())
    std::cout << co_await c.read() << std::endl;

  co_await p;
  co_return 0;
}

Additionally, a channel_reader is provided to make reading channels more convenient & usable with BOOST_COBALT_FOR.

cobalt::main co_main(int argc, char * argv[])
{
  cobalt::channel<int> c;

  auto p = producer(c);
  BOOST_COBALT_FOR(int value, cobalt::channel_reader(c))
    std::cout << value << std::endl;

  co_await p;
  co_return 0;
}

cobalt/with.hpp

The with facility provides a way to perform asynchronous tear-down of coroutines. That is, it acts like an asynchronous destructor call.

struct my_resource
{
  cobalt::promise<void> await_exit(std::exception_ptr e);
};

cobalt::promise<void> work(my_resource & res);

cobalt::promise<void> outer()
{
  co_await cobalt::with(my_resource(), &work);
}

The teardown can be provided in one of three ways: as an await_exit member function, as a tag_invoke function that returns an awaitable, or as the third argument to with.

using ws_stream = beast::websocket::stream<asio::ip::tcp::socket>;
cobalt::promise<ws_stream> connect(urls::url); (1)
cobalt::promise<void>   disconnect(ws_stream &ws); (2)

auto teardown(const boost::cobalt::with_exit_tag & wet , ws_stream & ws, std::exception_ptr e)
{
  return disconnect(ws);
}

cobalt::promise<void> run_session(ws_stream & ws);

cobalt::main co_main(int argc, char * argv[])
{
  co_await cobalt::with(co_await connect(argv[1]), &run_session, &teardown);
  co_return 0;
}
1 Implement websocket connect & websocket initiation
2 Implement an orderly shutdown.
The std::exception_ptr is null if the scope is exited without exception.
NOTE: It’s legal for the exit functions to take the exception_ptr by reference and modify it.

cobalt/race.hpp

The race function can be used to co_await one awaitable out of a set of them.

It can be called as a variadic function with multiple awaitables or on a range of awaitables.

cobalt::promise<void> task1();
cobalt::promise<void> task2();

cobalt::promise<void> do_wait()
{
  co_await cobalt::race(task1(), task2()); (1)
  std::vector<cobalt::promise<void>> aws {task1(), task2()};
  co_await cobalt::race(aws); (2)
}
1 Wait for a variadic set of awaitables
2 Wait for a vector of awaitables

The first parameter to race can be a uniform random bit generator.

Signatures of race
extern promise<void> pv1, pv2;
std::vector<promise<void>> pvv;

std::default_random_engine rdm{1};
// if everything returns void race returns the index
std::size_t r1 = co_await race(pv1, pv2);
std::size_t r2 = co_await race(rdm, pv1, pv2);
std::size_t r3 = co_await race(pvv);
std::size_t r4 = co_await race(rdm, pvv);

// variant if not everything is void. void becomes monostate
extern promise<int> pi1, pi2;
variant2::variant<monostate, int, int> r5 = co_await race(pv1, pi1, pi2);
variant2::variant<monostate, int, int> r6 = co_await race(rdm, pv1, pi1, pi2);

// a range returns a pair of the index and the result if non-void
std::vector<promise<int>> piv;
std::pair<std::size_t, int> r7 = co_await race(piv);
std::pair<std::size_t, int> r8 = co_await race(rdm, piv);

Interrupt Wait

When arguments are passed as lvalue references, the race will attempt to use .interrupt_await on the awaitable to signal the awaitable to complete now and that the result will be ignored. If supported, the awaitable must resume the awaiting coroutine before interrupt_await returns. If the race doesn’t detect the function, it will send a cancellation.

This means that you can reuse race like this:

cobalt::promise<void> do_wait()
{
  auto t1 = task1();
  auto t2 = task2();
  co_await cobalt::race(t1, t2); (1)
  co_await cobalt::race(t1, t2); (2)
}
1 Wait for the first task to complete
2 Wait for the other task to complete

This is supported by promise, generator and gather.

The race will invoke the functions of the awaitable as if used in a co_await expression or not evaluate them at all.

left_race

The left_race functions are like race but follow a strict left-to-right scan. This can lead to starvation issues, which is why this is not the recommended default, but can be useful for prioritization if proper care is taken.
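
The difference in selection order can be pictured with a toy model. The sketch below is not cobalt's implementation: it only assumes that several candidates are ready at once and compares a fixed left-to-right scan with a scan that starts at a random position. All names (scan_from, pick_left, pick_random) are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <random>
#include <vector>

// Toy model: ready[i] says whether the i-th awaitable could complete now.
// Scans left to right starting at `start`, wrapping around, and returns the
// index of the first ready entry (or ready.size() if nothing is ready).
std::size_t scan_from(const std::vector<bool>& ready, std::size_t start)
{
  assert(!ready.empty());
  for (std::size_t n = 0; n < ready.size(); ++n)
  {
    const std::size_t idx = (start + n) % ready.size();
    if (ready[idx])
      return idx;
  }
  return ready.size();
}

// left_race-like: always start at index 0, so index 0 wins whenever it is ready.
std::size_t pick_left(const std::vector<bool>& ready)
{
  return scan_from(ready, 0);
}

// race-like: start at a random position so no entry is permanently favoured.
template<typename URBG>
std::size_t pick_random(const std::vector<bool>& ready, URBG& g)
{
  assert(!ready.empty());
  std::uniform_int_distribution<std::size_t> dist(0, ready.size() - 1);
  return scan_from(ready, dist(g));
}
```

With every entry ready, pick_left always returns 0, which is exactly the starvation risk for the entries to its right; pick_random spreads the choice across all ready entries.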

Outline

// Concept for the random number generator.
template<typename G>
  concept uniform_random_bit_generator =
    requires ( G & g)
    {
      {typename std::decay_t<G>::result_type() } -> std::unsigned_integral; // is an unsigned integer type
      // Returns the smallest value that G's operator() may return. The value is strictly less than G::max(). The function must be constexpr.
      {std::decay_t<G>::min()} -> std::same_as<typename std::decay_t<G>::result_type>;
      // Returns the largest value that G's operator() may return. The value is strictly greater than G::min(). The function must be constexpr.
      {std::decay_t<G>::max()} -> std::same_as<typename std::decay_t<G>::result_type>;
      {g()} -> std::same_as<typename std::decay_t<G>::result_type>;
    } && (std::decay_t<G>::max() > std::decay_t<G>::min());


// Variadic race with a custom random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all,
         uniform_random_bit_generator URBG, awaitable ... Promise>
awaitable race(URBG && g, Promise && ... p);

// Ranged race with a custom random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all,
         uniform_random_bit_generator URBG, range<awaitable> PromiseRange>
awaitable race(URBG && g, PromiseRange && p);

// Variadic race with the default random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable race(Promise && ... p);

// Ranged race with the default random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable> PromiseRange>
awaitable race(PromiseRange && p);

// Variadic left race
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable left_race(Promise && ... p);

// Ranged left race
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable> PromiseRange>
awaitable left_race(PromiseRange && p);
Selecting an empty range will cause an exception to be thrown.

cobalt/gather.hpp

The gather function can be used to co_await multiple awaitables at once with cancellations being passed through.

The function will gather all completions and return them as system::result, i.e. capture exceptions as values. One awaitable throwing an exception will not cancel the others.

It can be called as a variadic function with multiple awaitables or on a range of awaitables.

cobalt::promise<void> task1();
cobalt::promise<void> task2();

cobalt::promise<void> do_gather()
{
  co_await cobalt::gather(task1(), task2()); (1)
  std::vector<cobalt::promise<void>> aws {task1(), task2()};
  co_await cobalt::gather(aws); (2)
}
1 Wait for a variadic set of awaitables
2 Wait for a vector of awaitables

The gather will invoke the functions of the awaitable as if used in a co_await expression.

Signatures of gather
extern promise<void> pv1, pv2;
std::tuple<system::result<int>, system::result<int>> r1 = co_await gather(pv1, pv2);

std::vector<promise<void>> pvv;
pmr::vector<system::result<void>> r2 =  co_await gather(pvv);

extern promise<int> pi1, pi2;
std::tuple<system::result<monostate>,
           system::result<monostate>,
           system::result<int>,
           system::result<int>> r3 = co_await gather(pv1, pv2, pi1, pi2);

std::vector<promise<int>> piv;
pmr::vector<system::result<int>> r4 = co_await gather(piv);

Outline

// Variadic gather
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable gather(Promise && ... p);

// Ranged gather
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable> PromiseRange>
awaitable gather(PromiseRange && p);

cobalt/join.hpp

The join function can be used to co_await multiple awaitables at once with properly connected cancellations.

The function will gather all completions and return them as values, unless an exception is thrown. If an exception is thrown, all outstanding ops are cancelled (or interrupted if possible) and the first exception gets rethrown.

void will be returned as variant2::monostate in the tuple, unless all awaitables yield void.
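
The contrast with gather is purely in how failures are reported. The following synchronous toy model (ordinary functions instead of awaitables, no cancellation, hypothetical names job, outcome, gather_like and join_like) is only meant to illustrate that difference and is not how cobalt implements either function.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>
#include <variant>
#include <vector>

using job = std::function<int()>;
// One slot per job: either its value or the exception it threw.
using outcome = std::variant<int, std::exception_ptr>;

// gather-like: run everything, capture each result or exception individually.
std::vector<outcome> gather_like(const std::vector<job>& jobs)
{
  std::vector<outcome> out;
  for (const auto& j : jobs)
  {
    try { out.emplace_back(j()); }
    catch (...) { out.emplace_back(std::current_exception()); }
  }
  assert(out.size() == jobs.size());
  return out;
}

// join-like: return plain values; the first exception skips the remaining
// jobs and propagates to the caller.
std::vector<int> join_like(const std::vector<job>& jobs)
{
  std::vector<int> out;
  for (const auto& j : jobs)
    out.push_back(j());
  return out;
}
```

A caller of gather_like has to inspect every slot, while a caller of join_like gets plain values and handles failure in a single catch block.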

It can be called as a variadic function with multiple awaitables or on a range of awaitables.

cobalt::promise<void> task1();
cobalt::promise<void> task2();

cobalt::promise<void> do_join()
{
  co_await cobalt::join(task1(), task2()); (1)
  std::vector<cobalt::promise<void>> aws {task1(), task2()};
  co_await cobalt::join(aws); (2)
}
1 Wait for a variadic set of awaitables
2 Wait for a vector of awaitables

The join will invoke the functions of the awaitable as if used in a co_await expression.

Signatures of join
extern promise<void> pv1, pv2;
/* void */ co_await join(pv1, pv2);

std::vector<promise<void>> pvv;
/* void */ co_await join(pvv);

extern promise<int> pi1, pi2;
std::tuple<monostate, monostate, int, int> r1 = co_await join(pv1, pv2, pi1, pi2);

std::vector<promise<int>> piv;
pmr::vector<int> r2 = co_await join(piv);

Outline

// Variadic join
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable join(Promise && ... p);

// Ranged join
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable> PromiseRange>
awaitable join(PromiseRange && p);
Joining an empty range will cause an exception to be thrown.

cobalt/wait_group.hpp

The wait_group type can be used to manage multiple coroutines of type promise<void>. It works out of the box with cobalt/with.hpp, by having the matching await_exit member.

Essentially, a wait_group is a dynamic list of promises that has a race function (wait_one), a gather function (wait_all) and will clean up on scope exit.

struct wait_group
{
    // create a wait_group
    explicit
    wait_group(asio::cancellation_type normal_cancel = asio::cancellation_type::none,
               asio::cancellation_type exception_cancel = asio::cancellation_type::all);

    // insert a task into the group
    void push_back(promise<void> p);

    // the number of tasks in the group
    std::size_t size() const;
    // remove completed tasks without waiting (i.e. zombie tasks)
    std::size_t reap();
    // cancel all tasks
    void cancel(asio::cancellation_type ct = asio::cancellation_type::all);
    // wait for one task to complete.
    wait_one_op wait_one();
    // wait for all tasks to complete
    wait_op wait();
    // wait for all tasks to complete
    wait_op operator co_await ();
    // when used with cobalt::with, this will receive the exception
    // and wait for the completion
    // if ep is set, this will use the exception_cancel level,
    // otherwise the normal_cancel to cancel all promises.
    wait_op await_exit(std::exception_ptr ep);
};

cobalt/spawn.hpp

The spawn functions allow you to run a task on an asio executor/execution_context and consume the result with a completion token.

auto spawn(Context & context, task<T> && t, CompletionToken&& token);
auto spawn(Executor executor, task<T> && t, CompletionToken&& token);

Spawn will dispatch its initiation and post the completion. That makes it safe to use spawn to run the task on another executor and consume the result on the current one with use_op. That is, spawn can be used to cross threads.

Example

cobalt::task<int> work();

int main(int argc, char *argv[])
{
  asio::io_context ctx{BOOST_ASIO_CONCURRENCY_HINT_1};
  auto f = spawn(ctx, work(), asio::use_future);
  ctx.run();

  return f.get();
}
The caller needs to make sure that the executor is not running on multiple threads concurrently, e.g. by using a single-threaded asio::io_context or a strand.

cobalt/run.hpp

The run function is similar to spawn but runs synchronously. It will internally set up an execution context and the memory resources.

This can be useful when integrating a piece of cobalt code into a synchronous application.

Outline

// Run the task and return its value or rethrow any exception.
T run(task<T> t);

Example

cobalt::task<int> work();

int main(int argc, char *argv[])
{
  return run(work());
}

cobalt/thread.hpp

The thread type is another way to create an environment that is similar to main, but doesn’t use a signal_set.

cobalt::thread my_thread()
{
  auto exec = co_await cobalt::this_coro::executor;             (1)
  asio::steady_timer tim{exec, std::chrono::milliseconds(50)}; (2)
  co_await tim.async_wait(cobalt::use_op);                      (3)
  co_return 0;
}
1 Get the executor the thread is running on
2 Use it with an asio object
3 co_await a cobalt operation

To use a thread you can use it like a std::thread:

int main(int argc, char * argv[])
{
  auto thr = my_thread();
  thr.join();
  return 0;
}

A thread is also an awaitable (including cancellation).

cobalt::main co_main(int argc, char * argv[])
{
  auto thr = my_thread();
  co_await thr;
  co_return 0;
}
Destructing a detached thread will cause a hard stop (io_context::stop) and join the thread.
Nothing in this library, except for awaiting a cobalt::thread and using spawn, is thread-safe. If you need to transfer data across threads, you’ll need a thread-safe utility like asio::concurrent_channel. You cannot share any cobalt primitives between threads, with the sole exception of being able to spawn a task onto another thread’s executor.

Executor

A thread will also create an asio::io_context to run on, which you can get through this_coro::executor. It will be assigned to cobalt::this_thread::get_executor().

Memory Resource

It also creates a memory resource that will be used as a default for internal memory allocations. It will be assigned as the thread-local default returned by cobalt::this_thread::get_default_resource().

Outline

struct thread
{
  // Send a cancellation signal
  void cancel(asio::cancellation_type type = asio::cancellation_type::all);


  // Allow the thread to be awaited. NOOP if the thread is invalid.
  auto operator co_await() & -> detail::thread_awaitable; (1)
  auto operator co_await() && -> detail::thread_awaitable; (2)

  // Stops the io_context & joins the executor
  ~thread();
  /// Move constructible
  thread(thread &&) noexcept = default;

  using executor_type = executor;

  using id = std::thread::id;
  id get_id() const noexcept;

  // Add the functions similar to `std::thread`
  void join();
  bool joinable() const;
  void detach();

  executor_type get_executor() const;
};
1 Supports Interrupt Wait
2 Always forward cancel

Promise

The thread promise has the following properties.

cobalt/result.hpp

Awaitables can be modified to return system::result or std::tuple instead of using exceptions.

// value only
T res = co_await foo();

// as result
system::result<T, std::exception_ptr> res = co_await cobalt::as_result(foo());

// as tuple
std::tuple<std::exception_ptr, T> res = co_await cobalt::as_tuple(foo());

Awaitables can also provide custom ways to handle results and tuples, by providing await_resume overloads using cobalt::as_result_tag and cobalt::as_tuple_tag:

your_result_type await_resume(cobalt::as_result_tag);
your_tuple_type  await_resume(cobalt::as_tuple_tag);

This allows an awaitable to provide other error types than std::exception_ptr, for example system::error_code. This is done by op and channel.

// example of an op with result system::error_code, std::size_t
system::result<std::size_t>                 await_resume(cobalt::as_result_tag);
std::tuple<system::error_code, std::size_t> await_resume(cobalt::as_tuple_tag);
Awaitables are still allowed to throw exceptions, e.g. for critical exceptions such as OOM.

cobalt/async_for.hpp

For types like generators a BOOST_COBALT_FOR macro is provided, to emulate a for co_await loop.

cobalt::generator<int> gen();

cobalt::main co_main(int argc, char * argv[])
{
    BOOST_COBALT_FOR(auto i, gen())
        printf("Generated value %d\n", i);

    co_return 0;
}

The requirement is that the awaitable used in the for loop has an operator bool to check if it can be awaited again. This is the case for generator and promise.

cobalt/error.hpp

In order to make errors easier to manage, cobalt provides an error_category to be used with boost::system::error_code.

enum class error
{
  moved_from,
  detached,
  completed_unexpected,
  wait_not_ready,
  already_awaited,
  allocation_failed
};

system::error_category & cobalt_category();
system::error_code make_error_code(error e);
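
For readers unfamiliar with the error_code machinery, here is a minimal sketch of how an enum, a category and make_error_code fit together. It uses the standard library's std::error_code, which follows the same design as boost::system, and a hypothetical two-value enum demo_error in place of cobalt::error; the names and messages are assumptions for illustration only.

```cpp
#include <cassert>
#include <string>
#include <system_error>
#include <type_traits>

// Hypothetical enum standing in for cobalt::error; only two values shown.
enum class demo_error
{
  moved_from = 1,
  already_awaited
};

// The category gives the integer values a name and a message.
class demo_category_t final : public std::error_category
{
public:
  const char* name() const noexcept override { return "demo"; }
  std::string message(int ev) const override
  {
    switch (static_cast<demo_error>(ev))
    {
      case demo_error::moved_from:      return "moved from";
      case demo_error::already_awaited: return "already awaited";
    }
    return "unknown";
  }
};

const std::error_category& demo_category()
{
  static const demo_category_t instance{};
  return instance;
}

std::error_code make_error_code(demo_error e)
{
  assert(static_cast<int>(e) != 0);   // 0 conventionally means "no error"
  return {static_cast<int>(e), demo_category()};
}

// Opt in to the implicit conversion demo_error -> std::error_code.
namespace std
{
template<>
struct is_error_code_enum<demo_error> : true_type {};
}
```

With the trait specialized, an enum value can be assigned to an error_code directly, e.g. std::error_code ec = demo_error::already_awaited;, and ec.message() is then answered by the category.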

cobalt/config.hpp

The config header allows you to configure some implementation details of boost.cobalt.

executor_type

The executor type defaults to boost::asio::any_io_executor.

You can set it to a different type by defining BOOST_COBALT_CUSTOM_EXECUTOR and adding a boost::cobalt::executor type yourself.

Alternatively, BOOST_COBALT_USE_IO_CONTEXT can be defined to set the executor to boost::asio::io_context::executor_type.

pmr

Boost.cobalt can be used with different pmr implementations, defaulting to std::pmr.

The following macros can be used to configure it:

  • BOOST_COBALT_USE_STD_PMR

  • BOOST_COBALT_USE_BOOST_CONTAINER_PMR

  • BOOST_COBALT_USE_CUSTOM_PMR

If you define BOOST_COBALT_USE_CUSTOM_PMR you will need to provide a boost::cobalt::pmr namespace, that is a drop-in replacement for std::pmr.

Alternatively, the pmr use can be disabled with

  • BOOST_COBALT_NO_PMR.

In this case, cobalt will use a non-pmr monotonic resource for the synchronization functions (race, gather and join).

use_op uses a small-buffer-optimized resource whose size can be set by defining BOOST_COBALT_SBO_BUFFER_SIZE and defaults to 4096 bytes.

cobalt/leaf.hpp

Cobalt provides integration with boost.leaf. It provides functions similar to leaf that take an awaitable instead of a function object and return an awaitable.

template<awaitable TryAwaitable, typename ... H >
auto try_catch(TryAwaitable && try_coro, H && ... h );

template<awaitable TryAwaitable, typename ... H >
auto try_handle_all(TryAwaitable && try_coro, H && ... h );

template<awaitable TryAwaitable, typename ... H >
auto try_handle_some(TryAwaitable && try_coro, H && ... h );

See the leaf documentation for details.

In-Depth

Custom Executors

One of the reasons cobalt defaults to asio::any_io_executor is that it is a type-erased executor, i.e. you can provide your own event-loop without needing to recompile cobalt.

However, during the development of the Executor TS, the executor concepts got a bit unintuitive, to put it mildly.

Ruben Perez wrote an excellent blog post, which I am shamelessly going to draw from.

Definition

An executor is a type that points to the actual event loop. It is (cheaply) copyable, supports properties (see below), is equality comparable and has an execute function.

execute
struct example_executor
{
  template<typename Fn>
  void execute(Fn && fn) const;
};

The above function executes fn in accordance with its properties.

Properties

A property can be queried, preferred or required, e.g.:

struct example_executor
{
  // get a property by querying it.
  asio::execution::relationship_t query(asio::execution::relationship_t) const
  {
    return asio::execution::relationship.fork;
  }

  // require an executor with a new property
  never_blocking_executor require(const execution::blocking_t::never_t);

  // prefer an executor with a new property. the executor may or may not support it.
  never_blocking_executor prefer(const execution::blocking_t::never_t);
  // not supported
  example_executor prefer(const execution::blocking_t::always_t);
};
Properties of the asio::any_io_executor

In order to wrap an executor in an asio::any_io_executor two properties are required:

  • execution::context_t

  • execution::blocking_t::never_t

That means we need to either make them require-able (which makes no sense for context) or return the expected value from query.

The execution::context_t query should return asio::execution_context& like so:

struct example_executor
{
  asio::execution_context &query(asio::execution::context_t) const;
};

The execution context is used to manage the lifetimes of services that manage the lifetimes of io-objects, such as asio’s timers & sockets. That is to say, by providing this context, all of asio’s io works with it.

The execution_context must remain alive after the executor gets destroyed.

The following may be preferred:

  • execution::blocking_t::possibly_t

  • execution::outstanding_work_t::tracked_t

  • execution::outstanding_work_t::untracked_t

  • execution::relationship_t::fork_t

  • execution::relationship_t::continuation_t

That means you might want to support them in your executor for optimizations.

The blocking property

As we’ve seen before, this property controls whether the function passed to execute() can be run immediately, as part of execute(), or must be queued for later execution. Possible values are:

  • asio::execution::blocking.never: never run the function as part of execute(). This is what asio::post() does.

  • asio::execution::blocking.possibly: the function may or may not be run as part of execute(). This is the default (what you get when calling io_context::get_executor).

  • asio::execution::blocking.always: the function is always run as part of execute(). This is not supported by io_context::executor.
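
To make the difference between never and possibly concrete, here is a toy single-threaded model. It is not an asio executor and does not use the property system; toy_loop, toy_executor and the blocking enum are hypothetical names, and "possibly" is modelled as "run inline when already inside the loop", which is only one permitted behaviour.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Toy single-threaded event loop; not an asio execution context.
struct toy_loop
{
  std::deque<std::function<void()>> queue;
  bool running = false;   // true while run() is executing functions

  // Runs queued functions until the queue is empty; returns how many ran.
  std::size_t run()
  {
    running = true;
    std::size_t n = 0;
    while (!queue.empty())
    {
      auto fn = std::move(queue.front());
      queue.pop_front();
      fn();
      ++n;
    }
    running = false;
    return n;
  }
};

enum class blocking { never, possibly };

struct toy_executor
{
  toy_loop* loop;
  blocking mode;

  template<typename Fn>
  void execute(Fn&& fn) const
  {
    assert(loop != nullptr);
    // possibly: may run inline, here when already inside the loop
    if (mode == blocking::possibly && loop->running)
      std::forward<Fn>(fn)();
    else // never: always queue for later execution
      loop->queue.emplace_back(std::forward<Fn>(fn));
  }
};
```

A function submitted with never from inside a running function is deferred until the current one returns, while one submitted with possibly runs before execute() returns.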

The relationship property

relationship can take two values:

  • asio::execution::relationship.continuation: indicates that the function passed to execute() is a continuation of the function calling execute().

  • asio::execution::relationship.fork: the opposite of the above. This is the default (what you get when calling io_context::get_executor()).

Setting this property to continuation enables some optimizations in how the function gets scheduled. It only has effect if the function is queued (as opposed to run immediately). For io_context, when set, the function is scheduled to run in a faster, thread-local queue, rather than the context-global one.

The outstanding_work_t property

outstanding_work can take two values:

  • asio::execution::outstanding_work.tracked: indicates that while the executor is alive, there’s still work to do.

  • asio::execution::outstanding_work.untracked: the opposite of the above. This is the default (what you get when calling io_context::get_executor()).

Setting this property to tracked means that the event loop will not return as long as the executor is alive.
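
The bookkeeping behind tracked can be sketched with a simple counter. This is a toy model under the assumption that the loop counts live tracked executors; toy_loop, toy_untracked and toy_tracked are hypothetical names and do not correspond to asio types.

```cpp
#include <cassert>
#include <cstddef>

// Toy event loop that only counts outstanding work; not asio's io_context.
struct toy_loop
{
  std::size_t queued = 0;        // functions waiting to run
  std::size_t outstanding = 0;   // number of live tracked executors

  // The loop may return once nothing is queued and nobody promises more work.
  bool out_of_work() const { return queued == 0 && outstanding == 0; }
};

// untracked: creating or destroying it does not affect the loop
struct toy_untracked
{
  toy_loop* loop;
};

// tracked: every live copy counts as outstanding work
struct toy_tracked
{
  explicit toy_tracked(toy_loop& l) : loop(&l) { ++loop->outstanding; }
  toy_tracked(const toy_tracked& rhs) : loop(rhs.loop) { ++loop->outstanding; }
  toy_tracked& operator=(const toy_tracked&) = delete;
  ~toy_tracked()
  {
    assert(loop->outstanding > 0);
    --loop->outstanding;
  }

  toy_loop* loop;
};
```

As long as any toy_tracked object exists, out_of_work() stays false, which mirrors an event loop that keeps running while a tracked executor is alive.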

A minimal executor

With this, let’s look at the interface of a minimal executor.

struct minimal_executor
{
  minimal_executor() noexcept;

  asio::execution_context &query(asio::execution::context_t) const;

  static constexpr asio::execution::blocking_t
  query(asio::execution::blocking_t) noexcept
  {
    return asio::execution::blocking.never;
  }

  template<class F>
  void execute(F && f) const;

  bool operator==(minimal_executor const &other) const noexcept;
  bool operator!=(minimal_executor const &other) const noexcept;
};
See example/python.cpp for an implementation using python’s asyncio event-loop.

Adding a work guard.

Now, let’s add in a require function for the outstanding_work property, that uses multiple types.

struct tracked_executor;

struct untracked_executor : minimal_executor
{
  untracked_executor() noexcept;

  constexpr   tracked_executor require(asio::execution::outstanding_work_t::  tracked_t) const;
  constexpr untracked_executor require(asio::execution::outstanding_work_t::untracked_t) const {return *this; }
};

struct tracked_executor : minimal_executor
{
  tracked_executor() noexcept;

  constexpr   tracked_executor require(asio::execution::outstanding_work_t::  tracked_t) const {return *this;}
  constexpr untracked_executor require(asio::execution::outstanding_work_t::untracked_t) const;
};

Note that it is not necessary to return a different type from the require function, it can also be done like this:

struct trackable_executor : minimal_executor
{
  trackable_executor() noexcept;

  constexpr trackable_executor require(asio::execution::outstanding_work_t::  tracked_t) const;
  constexpr trackable_executor require(asio::execution::outstanding_work_t::untracked_t) const;
};

If we wanted to use prefer it would look as shown below:

struct trackable_executor : minimal_executor
{
  trackable_executor() noexcept;

  constexpr trackable_executor prefer(asio::execution::outstanding_work_t::  tracked_t) const;
  constexpr trackable_executor prefer(asio::execution::outstanding_work_t::untracked_t) const;
};

Summary

As you can see, the property system is not trivial, but quite powerful. Implementing a custom executor is a problem category of its own, which is why this documentation doesn’t do that. Rather, there is an example of how to wrap a python event loop in an executor.

Below are some reading recommendations.

Stackless

C++20 coroutines are stackless, meaning they don’t have their own stack.

A stack in C++ describes the callstack, i.e. all the function frames stacked. A function frame is the memory a function needs to operate, i.e. a slice of memory to store its variables and information such as the return address.

The size of a function frame is known at compile time, but not outside the compile unit containing its definition.
int bar() {return 0;} // the deepest point of the stack
int foo() {return bar();}

int main()
{
    return foo();
}

The call stack in the above example is:

main()
  foo()
    bar()

sequenceDiagram
    main->>+foo: call
    foo->>+bar: call
    bar->>-foo: return
    foo->>-main: return

Coroutines can be implemented as stackful, which means that each one allocates a fixed chunk of memory and stacks function frames similar to a thread. C++20 coroutines are stackless, i.e. they only allocate their own frame and use the caller’s stack on resumption. Using our previous example:

fictional_eager_coro_type<int> example()
{
    co_yield 0;
    co_yield 1;
}

void nested_resume(fictional_eager_coro_type<int>& f)
{
    f.resume();
}

int main()
{
    auto f = example();
    nested_resume(f);
    f.resume();
    return 0;
}

This will yield a call stack similar to this:

main()
  f$example()
  nested_resume()
    f$example()
  f$example()

sequenceDiagram
    participant main
    participant nested_resume
    main->>+example: create & call
    example-->>main: co_yield
    main->>+nested_resume: call
    nested_resume-->>example: resume
    example-->>nested_resume: co_yield
    nested_resume->>-main: return
    main-->>example: resume
    example->>-main: co_return

The same applies if a coroutine gets moved across threads.

Lazy & eager

A coroutine is lazy if it only starts executing its code after it gets resumed, while an eager one will execute right away until its first suspension point (i.e. a co_await, co_yield or co_return expression).

lazy_coro co_example()
{
    printf("Entered coro\n");
    co_yield 0;
    printf("Coro done\n");
}

int main()
{
    printf("enter main\n");
    auto lazy = co_example();
    printf("constructed coro\n");
    lazy.resume();
    printf("resumed once\n");
    lazy.resume();
    printf("resumed twice\n");
    return 0;
}

Which will produce output like this:

enter main
constructed coro
Entered coro
resumed once
Coro done
resumed twice

sequenceDiagram
    participant main;
    Note left of main: "enter main"
    main-->>+lazy: co_example()
    Note left of main: "constructed coro"
    main->>lazy: resume()
    Note right of lazy: "Entered coro"
    lazy-->>main: co_yield 0
    Note left of main: "resumed once"
    main-->>+lazy: resume()
    Note right of lazy: "Coro done"
    lazy->>main: co_return
    Note left of main: "resumed twice"

Whereas an eager coro would look like this:

eager_coro co_example()
{
    printf("Entered coro\n");
    co_yield 0;
    printf("Coro done\n");
}

int main()
{
    printf("enter main\n");
    auto eager = co_example();
    printf("constructed coro\n");
    eager.resume();
    printf("resumed once\n");
    return 0;
}

Which will produce output like this:

enter main
Entered coro
constructed coro
Coro done
resumed once

sequenceDiagram
    participant main;
    Note left of main: "enter main"
    main->>eager: co_example()
    Note right of eager: "Entered coro"
    eager-->>main: co_yield 0
    Note left of main: "constructed coro"
    main-->>+eager: resume()
    Note right of eager: "Coro done"
    eager->>main: co_return
    Note left of main: "resumed once"

Benchmarks

Run on 11th Gen Intel® Core™ i7-1185G7 @ 3.00GHz

Posting to an executor

The benchmark runs the following code with cobalt’s task, asio::awaitable and asio’s stackful coroutines (based on boost.context).

cobalt::task<void> atest()
{
  for (std::size_t i = 0u; i < n; i++)
    co_await asio::post(cobalt::use_op);
}

Table 6. Results for 50M iterations in ms

             gcc 12    clang 16
cobalt       2472      2098
awaitable    2432      2253
stackful     3655      3725

Running noop coroutine in parallel

This benchmark uses an asio::experimental::channel with a size of zero and reads from and writes to it in parallel. The cobalt version uses gather, while the asio::awaitable version uses the && awaitable operator.

cobalt::task<void> atest()
{
  asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 0u};
  for (std::size_t i = 0u; i < n; i++)
    co_await cobalt::gather(
              chan.async_send(system::error_code{}, cobalt::use_task),
              chan.async_receive(cobalt::use_task));
}

asio::awaitable<void> awtest()
{
  asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 0u};
  using boost::asio::experimental::awaitable_operators::operator&&;
  for (std::size_t i = 0u; i < n; i++)
    co_await (
        chan.async_send(system::error_code{}, asio::use_awaitable)
        &&
        chan.async_receive(asio::use_awaitable));
}

Table 7. Results for 3M iterations in ms

             gcc 12    clang 16
cobalt       1563      1468
awaitable    2800      2805

Immediate

This benchmark exercises immediate completion by using a channel with a size of 1, so that every operation completes immediately.

cobalt::task<void> atest()
{
  asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 1u};
  for (std::size_t i = 0u; i < n; i++)
  {
    co_await chan.async_send(system::error_code{}, cobalt::use_op);
    co_await chan.async_receive(cobalt::use_op);
  }
}

Table 8. Results for 10M iterations in ms

             gcc 12    clang 16
cobalt       1810      1864
awaitable    3109      4110
stackful     3922      4705

Channels

In this benchmark asio::experimental::channel and cobalt::channel get compared.

This is similar to the parallel test, but uses the cobalt::channel instead.

Table 9. Results of running the test 3M times in ms

             gcc       clang
cobalt       500       350
awaitable    790       770
stackful     867       907

Operation Allocations

This benchmark compares the different possible solutions for the associated allocator of asynchronous operations.

Table 10. Results of running the test 2M times in ms

                     gcc       clang
std::allocator       1136      1139
cobalt::monotonic    1149      1270
pmr::monotonic       1164      1173
cobalt::sbo          1021      1060

The last method, cobalt::sbo, is used internally by cobalt.
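
To illustrate what a monotonic resource with a small local buffer does, here is a minimal sketch using only the standard std::pmr facilities. It is not cobalt's implementation; the helpers points_into and count_in_buffer and the 256-byte buffer size are assumptions made for the example.

```cpp
#include <cstddef>
#include <cstdint>
#include <memory_resource>

// Returns true if p points into [buf, buf + size).
inline bool points_into(const void* p, const std::byte* buf, std::size_t size)
{
    const auto addr  = reinterpret_cast<std::uintptr_t>(p);
    const auto begin = reinterpret_cast<std::uintptr_t>(buf);
    return addr >= begin && addr < begin + size;
}

// Allocates `count` blocks of `bytes` each from a monotonic resource that is
// backed by a stack buffer, and counts how many blocks came from that buffer.
inline std::size_t count_in_buffer(std::size_t count, std::size_t bytes)
{
    alignas(std::max_align_t) std::byte buffer[256];
    // Once the buffer is exhausted, the resource falls back to new/delete.
    std::pmr::monotonic_buffer_resource res{
        buffer, sizeof(buffer), std::pmr::new_delete_resource()};

    std::size_t hits = 0;
    for (std::size_t i = 0; i < count; ++i)
    {
        void* p = res.allocate(bytes, alignof(std::max_align_t));
        if (points_into(p, buffer, sizeof(buffer)))
            ++hits;
        // Deallocation is a no-op for a monotonic resource; memory is only
        // released when the resource itself is destroyed.
        res.deallocate(p, bytes, alignof(std::max_align_t));
    }
    return hits;
}
```

Small allocations that fit the buffer never touch the heap, which is the effect a small-buffer scheme aims for; larger workloads spill over to the upstream resource.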

Requirements

Libraries

Boost.cobalt requires a C++20 compiler and directly depends on the following boost libraries:

  • boost.asio

  • boost.system

  • boost.circular_buffer

  • boost.intrusive

  • boost.smart_ptr

  • boost.container (for clang < 16)

Compiler

This library is supported on Clang 14, GCC 10 and MSVC 19.30 (Visual Studio 2022) or later.

GCC versions 12.1 and 12.2 appear to have a bug for coroutines without stack variables, as can be seen [here](https://godbolt.org/z/6adGcqP1z), and should be avoided for coroutines.

Clang only added std::pmr support in 16, so older clang versions use boost::container::pmr as a drop-in replacement.

Some if not all MSVC versions have a broken coroutine implementation that this library needs to work around. This may cause non-deterministic behaviour and overhead.

A coroutine continuation may be done in the awaitable returned from a final_suspend, like this:

// in promise
auto final_suspend() noexcept
{
    struct final_awaitable
    {
      std::coroutine_handle<void> continuation{std::noop_coroutine()}; (1)
      bool await_ready() const noexcept { return false; } // always suspend, so await_suspend runs
      std::coroutine_handle<void> await_suspend(std::coroutine_handle<void> h) noexcept
      {
        auto cc = continuation;
        h.destroy(); (2)
        return cc;
      }

      void await_resume() noexcept {}
    };
    return final_awaitable{my_continuation};
}
1 The continuation
2 Self-destroying the coroutine before continuation

The final_suspend does not properly suspend the coroutine on MSVC, so the h.destroy() will cause double destruction of elements on the coroutine frame. Therefore, on MSVC the destruction needs to be posted so that it happens out of line. This causes overhead and makes the actual freeing of memory non-deterministic.

Acknowledgements

This library would not have been possible without the CppAlliance and its founder Vinnie Falco. Vinnie trusted me enough to let me work on this project, while himself having very different views on how such a library should be designed.

Thanks also go to Ruben Perez & Richard Hodges for listening to my design problems and giving me advice & use-cases. Furthermore, this library would not have been possible without the great boost.asio by Chris Kohlhoff.