Overview
Here’s a list of relevant features in cobalt:
promise | An eager coroutine returning a single result; consider it the default. |
generator | An eager coroutine that can yield multiple values. |
task | A lazy version of promise that can be spawned onto other executors. |
detached | A coroutine similar to promise, without a handle. |
race | A function that waits for one coroutine out of a set; among those that are ready it picks in a pseudo-random way, to avoid starvation. |
join | A function that waits for a set of coroutines and returns all of them as value, or throws an exception if any awaitable does so. |
gather | A function that waits for a set of coroutines and returns all of them as results, capturing exceptions individually instead of throwing. |
left_race | A deterministic race that evaluates its awaitables from left to right. |
channel | A thread-local utility to send values between coroutines. |
with | An async RAII helper that allows async teardown when exceptions occur. |
A short introduction to C++ coroutines | Read if you’ve never used coroutines before |
An abbreviated high level view of the features and concepts | Read if you’re familiar with asio & coroutines and want a rough idea what this library offers. |
Low level view of usages | Read if you want to get coding quickly |
API reference | Look up details while coding |
Some implementation details | Read if you’re not confused enough |
Motivation
Many programming languages and runtimes, like node.js and python, provide easy-to-use single-threaded concurrency frameworks. While more complex than synchronous code, single-threaded asynchronicity avoids many of the pitfalls & overhead of multi-threading.
That is, one coroutine can work while others wait for events (e.g. a response from a server). This makes it possible to write applications that do multiple things simultaneously on a single thread.
This library is meant to provide this to C++: simple single-threaded asynchronicity akin to node.js and asyncio in python that works with existing libraries like boost.beast, boost.mysql or boost.redis.
It is based on boost.asio.
It takes a collection of concepts from other languages and provides them based on C++20 coroutines.
- easy asynchronous base functions, such as an async main & threads
- an async scope
Unlike asio::awaitable and asio::experimental::coro, cobalt coroutines are open.
That is, an asio::awaitable can only await and be awaited by other asio::awaitable and does not provide coroutine specific synchronization mechanisms.
cobalt on the other hand provides a coroutine specific channel and different wait types (race, gather etc.) that are optimized to work with coroutines and awaitables.
Coroutine Primer
Async programming
Asynchronous programming generally refers to a style of programming that allows tasks to be run in the background while other work is performed.
Imagine, if you will, a get-request function that performs a full http request, including connecting & ssl handshakes etc.
std::string http_get(std::string_view url);
int main(int argc, char * argv[])
{
auto res = http_get("https://boost.org");
printf("%s", res.c_str());
return 0;
}
The above code would be traditional synchronous programming. If we want to perform two requests in parallel, we would need to create another thread that runs the second request with synchronous code.
std::string http_get(std::string_view url);
int main(int argc, char * argv[])
{
std::string other_res;
std::thread thr{[&]{ other_res = http_get("https://cppalliance.org"); }};
auto res = http_get("https://boost.org");
thr.join();
printf("%s", res.c_str());
printf("%s", other_res.c_str());
return 0;
}
This works, but our program will spend most of the time waiting for input. Operating systems provide APIs that allow IO to be performed asynchronously, and libraries such as boost.asio provide portable ways to manage asynchronous operations. Asio itself does not dictate a way to handle the completions. This library (boost.cobalt) provides a way to manage this all through coroutines/awaitables.
cobalt::promise<std::string> http_cobalt_get(std::string_view url);
cobalt::main co_main(int argc, char * argv[])
{
auto [res, other_res] =
co_await cobalt::join(
http_cobalt_get("https://boost.org"),
http_cobalt_get("https://cppalliance.org")
);
printf("%s", res.c_str());
printf("%s", other_res.c_str());
co_return 0;
}
In the above code the asynchronous function to perform the request takes advantage of the operating system APIs so that the actual IO doesn’t block. This means that while we’re waiting for both functions to complete, the operations are interleaved and non-blocking. At the same time cobalt provides the coroutine primitives that keep us out of callback hell.
Coroutines
Coroutines are resumable functions. Resumable means that a function can suspend, i.e. pass the control back to the caller multiple times.
A regular function yields control back to the caller with the return statement, where it also returns the value.
A coroutine on the other hand might yield control to the caller and get resumed multiple times.
A coroutine has three control keywords akin to return (of which only co_return has to be supported).
- co_return
- co_yield
- co_await
co_return
This is similar to return, but marks the function as a coroutine.
co_await
The co_await expression suspends for an Awaitable, i.e. stops execution until the awaitable resumes it.
E.g.:
cobalt::promise<void> delay(std::chrono::milliseconds);
cobalt::task<void> example()
{
co_await delay(std::chrono::milliseconds(50));
}
A co_await expression can yield a value, depending on what it is awaiting.
cobalt::promise<std::string> read_some();
cobalt::task<void> example()
{
std::string res = co_await read_some();
}
In cobalt most coroutine primitives are also Awaitables.
co_yield
The co_yield expression is similar to co_await, but it yields control to the caller and carries a value.
For example:
cobalt::generator<int> iota(int max)
{
int i = 0;
while (i < max)
co_yield i++;
co_return i;
}
A co_yield expression can also produce a value, which allows the user of the yielding coroutine to push values into it.
cobalt::generator<int, bool> iota()
{
int i = 0;
bool more = false;
do
{
more = co_yield i++;
}
while(more);
co_return -1;
}
Awaitables
Awaitables are types that can be used in a co_await expression.
struct awaitable_prototype
{
bool await_ready();
template<typename T>
see_below await_suspend(std::coroutine_handle<T>);
return_type await_resume();
};
A type will be implicitly converted into an awaitable if there is an operator co_await call available.
This documentation will use awaitable to include these types, and "actual_awaitable" to refer to types conforming to the above prototype.
[Figure: flowchart of a co_await expression. await_ready? If true, go directly to await_resume. If false, go to await_suspend and, once resumed, to await_resume.]
In a co_await expression the waiting coroutine will first invoke await_ready to check if the coroutine needs to suspend.
When ready, it goes directly to await_resume to get the value, as there is no suspension needed.
Otherwise, it will suspend itself and call await_suspend with a std::coroutine_handle to its own promise.
std::coroutine_handle<void> can be used for type erasure.
The return_type is the result type of the co_await expression, e.g. int:
int i = co_await awaitable_with_int_result();
The return type of await_suspend can be one of three things:
- void
- bool
- std::coroutine_handle<U>
If it is void, the awaiting coroutine remains suspended. If it is bool, the value will be checked, and if false, the awaiting coroutine will resume right away.
If a std::coroutine_handle is returned, that coroutine will be resumed.
The latter allows await_suspend to return the handle passed in, being effectively the same as returning false.
If the awaiting coroutine gets resumed right away, i.e. directly after await_suspend was called, it is referred to as "immediate completion" within this library.
This is not to be confused with a non-suspending awaitable, i.e. one that returns true from await_ready.
Event Loops
Since the coroutines in cobalt can co_await events, they need to be run on an event-loop.
That is, another piece of code is responsible for tracking outstanding events and resuming the coroutines that are awaiting them.
This pattern is very common and is used in a similar way by node.js or python’s asyncio.
cobalt uses an asio::io_context as its default event loop. That is, the classes thread, main and the run function are using it internally.
You can use any event loop that can produce an asio::any_io_executor with the library. The easiest way to achieve this is by using spawn.
The event loop is accessed through an executor (following the asio terminology) and can be manually set using set_executor.
Tour
Entry into a cobalt environment
In order to use awaitables we need to be able to co_await them, i.e. be within a coroutine.
There are three ways to achieve this:
- cobalt/main.hpp: replace int main with a coroutine
cobalt::main co_main(int argc, char* argv[])
{
// co_await things here
co_return 0;
}
- cobalt/thread.hpp: create a thread for the asynchronous environment
cobalt::thread my_thread()
{
// co_await things here
co_return;
}
int main(int argc, char * argv[])
{
auto t = my_thread();
t.join();
return 0;
}
- cobalt/task.hpp: create a task and run or spawn it
cobalt::task<void> my_task()
{
// co_await things here
co_return;
}
int main(int argc, char * argv[])
{
cobalt::run(my_task()); // sync
asio::io_context ctx;
cobalt::spawn(ctx, my_task(), asio::detached);
ctx.run();
return 0;
}
Promises
Promises are the recommended default coroutine type. They’re eager and thus easily usable for ad-hoc concurrency.
cobalt::promise<int> my_promise()
{
co_await do_the_thing();
co_return 0;
}
cobalt::main co_main(int argc, char * argv[])
{
// start the promise here
auto p = my_promise();
// do something else here
co_await do_the_other_thing();
// wait for the promise to complete
auto res = co_await p;
co_return res;
}
Tasks
Tasks are lazy, which means they won’t do anything before being awaited or spawned.
cobalt::task<int> my_task()
{
co_await do_the_thing();
co_return 0;
}
cobalt::main co_main(int argc, char * argv[])
{
// create the task here
auto t = my_task();
// do something else here first
co_await do_the_other_thing();
// start and wait for the task to complete
auto res = co_await t;
co_return res;
}
Generator
A generator is the only type in cobalt that can co_yield values.
Generators are eager by default. Unlike std::generator, the cobalt::generator can co_await and thus is asynchronous.
cobalt::generator<int> my_generator()
{
for (int i = 0; i < 10; i++)
co_yield i;
co_return 10;
}
cobalt::main co_main(int argc, char * argv[])
{
// create the generator
auto g = my_generator();
while (g)
printf("Generator %d\n", co_await g);
co_return 0;
}
Values can be pushed into the generator; they will be returned from the co_yield.
cobalt::generator<double, int> my_eager_push_generator(int value)
{
while (value != 0)
value = co_yield value * 0.1;
co_return std::numeric_limits<double>::quiet_NaN();
}
cobalt::main co_main(int argc, char * argv[])
{
// create the generator
auto g = my_eager_push_generator(5);
assert(0.5 == co_await g(4)); // result of 5
assert(0.4 == co_await g(3)); // result of 4
assert(0.3 == co_await g(2)); // result of 3
assert(0.2 == co_await g(1)); // result of 2
assert(0.1 == co_await g(0)); // result of 1
// we let the coroutine go out of scope while suspended
// no need for another co_await of `g`
co_return 0;
}
A coroutine can also be made lazy using this_coro::initial.
cobalt::generator<double, int> my_lazy_push_generator()
{
auto value = co_await this_coro::initial;
while (value != 0)
value = co_yield value * 0.1;
co_return std::numeric_limits<double>::quiet_NaN();
}
cobalt::main co_main(int argc, char * argv[])
{
// create the generator
auto g = my_lazy_push_generator(); // lazy, so the generator waits for the first pushed value
assert(0.5 == co_await g(5)); // result of 5
assert(0.4 == co_await g(4)); // result of 4
assert(0.3 == co_await g(3)); // result of 3
assert(0.2 == co_await g(2)); // result of 2
assert(0.1 == co_await g(1)); // result of 1
// we let the coroutine go out of scope while suspended
// no need for another co_await of `g`
co_return 0;
}
join
If multiple awaitables work in parallel they can be awaited simultaneously with join.
cobalt::promise<int> some_work();
cobalt::promise<double> more_work();
cobalt::main co_main(int argc, char * argv[])
{
std::tuple<int, double> res = co_await cobalt::join(some_work(), more_work());
co_return 0;
}
race
If multiple awaitables work in parallel, but we want to be notified as soon as any one of them completes, we can use race.
cobalt::generator<int> some_data_source();
cobalt::generator<double> another_data_source();
cobalt::main co_main(int argc, char * argv[])
{
auto g1 = some_data_source();
auto g2 = another_data_source();
int res1 = co_await g1;
double res2 = co_await g2;
printf("Result: %f", res1 * res2);
while (g1 && g2)
{
switch (variant2::variant<int, double> nx = co_await cobalt::race(g1, g2); nx.index())
{
case 0:
res1 = variant2::get<0>(nx);
break;
case 1:
res2 = variant2::get<1>(nx);
break;
}
printf("New result: %f", res1 * res2);
}
co_return 0;
}
race in this context will not cause any data loss.
Tutorial
delay
Let’s start with the simplest example possible: a simple delay.
cobalt::main co_main(int argc, char * argv[]) (1)
{
asio::steady_timer tim{co_await asio::this_coro::executor, (2)
std::chrono::milliseconds(std::stoi(argv[1]))}; (3)
co_await tim.async_wait(cobalt::use_op); (4)
co_return 0; (5)
}
1 | The co_main function defines an implicit main when used and is the easiest way to set up an environment to run asynchronous code. |
2 | Take the executor from the current coroutine promise. |
3 | Use an argument to set the timeout |
4 | Perform the wait by using cobalt::use_op. |
5 | Return a value that gets returned from the implicit main. |
In this example we use the cobalt/main.hpp header, which provides us with a main coroutine if co_main is defined as above. This has a few advantages:
- The environment gets set up correctly (executor & memory)
- asio is signaled that the context is single threaded
- an asio::signal_set with SIGINT & SIGTERM is automatically connected to cancellations (i.e. Ctrl+C causes cancellations)
This coroutine then has an executor in its promise (promise is the C++ name for the coroutine state, not to be confused with cobalt/promise.hpp), which we can obtain through the dummy-awaitables in the this_coro namespace.
echo server
We’ll be using the use_op (asio completion) token everywhere, so we set it as the default completion token, which lets us skip the last parameter.
namespace cobalt = boost::cobalt;
using boost::asio::ip::tcp;
using boost::asio::detached;
using tcp_acceptor = cobalt::use_op_t::as_default_on_t<tcp::acceptor>;
using tcp_socket = cobalt::use_op_t::as_default_on_t<tcp::socket>;
namespace this_coro = boost::cobalt::this_coro;
We’re writing the echo function as a promise coroutine. It’s an eager coroutine and recommended as the default; in case a lazy coro is needed, task is available.
cobalt::promise<void> echo(tcp_socket socket)
{
try (1)
{
char data[4096];
while (socket.is_open()) (2)
{
std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data)); (3)
co_await async_write(socket, boost::asio::buffer(data, n)); (4)
}
}
catch (std::exception& e)
{
std::printf("echo: exception: %s\n", e.what());
}
}
1 | When using the use_op completion token, I/O errors are translated into C++ exceptions. Additionally, if the coroutine gets cancelled (e.g. because the user hit Ctrl-C), an exception will be raised, too. Under these conditions, we print the error and exit the loop. |
2 | We run the loop until we get cancelled (exception) or the user closes the connection. |
3 | Read as much as is available. |
4 | Write all the read bytes. |
Note that promise is eager. Calling echo will immediately execute code until async_read_some and then return control to the caller.
Next, we also need an acceptor function. Here, we’re using a generator to manage the acceptor state.
This is a coroutine that can be co_awaited multiple times, until a co_return expression is reached.
cobalt::generator<tcp_socket> listen()
{
tcp_acceptor acceptor({co_await cobalt::this_coro::executor}, {tcp::v4(), 55555});
for (;;) (1)
{
tcp_socket sock = co_await acceptor.async_accept(); (2)
co_yield std::move(sock); (3)
}
co_return tcp_socket{acceptor.get_executor()}; (4)
}
1 | Cancellation will also lead to an exception here being thrown from the co_await |
2 | Asynchronously accept the connection |
3 | Yield it to the awaiting coroutine |
4 | co_return a value for C++ conformance. |
With those two functions we can now write the server:
cobalt::promise<void> run_server(cobalt::wait_group & workers)
{
auto l = listen(); (1)
while (true)
{
if (workers.size() == 10u)
co_await workers.wait_one(); (2)
else
workers.push_back(echo(co_await l)); (3)
}
}
1 | Construct the listener generator coroutine. When the object is destroyed, the coroutine will be cancelled, performing all required cleanup. |
2 | When we already have 10 workers, we wait for one to finish |
3 | Accept a new connection & launch it. |
The wait_group is used to manage the running echo functions.
This class will cancel & await the running echo coroutines.
We do not need to do the same for the listener, because it will just stop on its own when l gets destroyed.
The destructor of a generator will cancel it.
Since the promise is eager, just calling it is enough to launch.
We then put those promises into a wait_group which will allow us to tear down all the workers on scope exit.
cobalt::main co_main(int argc, char ** argv)
{
co_await cobalt::with(cobalt::wait_group(), &run_server); (1)
co_return 0u;
}
1 | Run run_server with an async scope. |
The with function shown above will run a function with a resource such as wait_group.
On scope exit, with will invoke & co_await an asynchronous teardown function.
This will cause all connections to be properly shut down before co_main exits.
price ticker
To demonstrate channels and other tools, we need a certain complexity.
For that purpose our project is a price ticker that connects to https://blockchain.info. A user can then connect to localhost to query a given currency pair, like this:
wscat -c localhost:8080/btc/usd
First we do the same declarations as echo-server.
using executor_type = cobalt::use_op_t::executor_with_default<cobalt::executor>;
using socket_type = typename asio::ip::tcp::socket::rebind_executor<executor_type>::other;
using acceptor_type = typename asio::ip::tcp::acceptor::rebind_executor<executor_type>::other;
using websocket_type = beast::websocket::stream<asio::ssl::stream<socket_type>>;
namespace http = beast::http;
The next step is to write a function that connects an ssl-stream to the upstream server:
cobalt::promise<asio::ssl::stream<socket_type>> connect(
std::string host, boost::asio::ssl::context & ctx)
{
asio::ip::tcp::resolver res{cobalt::this_thread::get_executor()};
auto ep = co_await res.async_resolve(host, "https", cobalt::use_op); (1)
asio::ssl::stream<socket_type> sock{cobalt::this_thread::get_executor(), ctx};
co_await sock.next_layer().async_connect(*ep.begin()); (2)
co_await sock.async_handshake(asio::ssl::stream_base::client); (3)
co_return sock; (4)
}
1 | Lookup the host |
2 | Connect to the endpoint |
3 | Do the ssl handshake |
4 | Return the socket to the caller |
Next, we’ll need a function to do the websocket upgrade on an existing ssl-stream.
cobalt::promise<void> connect_to_blockchain_info(websocket_type & ws)
{
ws.set_option(beast::websocket::stream_base::decorator(
[](beast::websocket::request_type& req)
{
req.set(http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) + " cobalt-ticker");
req.set(http::field::origin,
"https://exchange.blockchain.com"); (1)
}));
co_await ws.async_handshake("ws.blockchain.info", "/mercury-gateway/v1/ws"); (2)
}
1 | blockchain.info requires this header to be set. |
2 | Perform the websocket handshake. |
Once the websocket is connected, we want to continuously receive json messages, for which a generator is a good choice.
cobalt::generator<json::object> json_reader(websocket_type & ws)
try
{
beast::flat_buffer buf;
while (ws.is_open()) (1)
{
auto sz = co_await ws.async_read(buf); (2)
json::string_view data{static_cast<const char*>(buf.cdata().data()), sz};
auto obj = json::parse(data);
co_yield obj.as_object(); (3)
buf.consume(sz);
}
co_return {};
}
catch (std::exception & e)
{
std::cerr << "Error reading: " << e.what() << std::endl;
throw;
}
1 | Keep running as long as the socket is open |
2 | Read a frame from the websocket |
3 | Parse & co_yield it as an object. |
This then needs to be connected to subscribers, for which we’ll utilize channels to pass raw json.
To make life-time management easy, the subscriber will hold a shared_ptr, and the producer a weak_ptr.
using subscription = std::pair<std::string, std::weak_ptr<cobalt::channel<json::object>>>;
using subscription_channel = std::weak_ptr<cobalt::channel<json::object>>;
using subscription_map = boost::unordered_multimap<std::string, subscription_channel>;
The main function running the blockchain connector operates on two inputs: data coming from the websocket and a channel to handle new subscriptions.
cobalt::promise<void> run_blockchain_info(cobalt::channel<subscription> & subc)
try
{
asio::ssl::context ctx{asio::ssl::context_base::tls_client};
websocket_type ws{co_await connect("blockchain.info", ctx)};
co_await connect_to_blockchain_info(ws); (1)
subscription_map subs;
std::list<std::string> unconfirmed;
auto rd = json_reader(ws); (2)
while (ws.is_open()) (3)
{
switch (auto msg = co_await cobalt::race(rd, subc.read()); msg.index()) (4)
{
case 0: (5)
if (auto ms = get<0>(msg);
ms.at("event") == "rejected") // invalid sub, cancel whoever subscribed
co_await handle_rejections(unconfirmed, subs, ms);
else
co_await handle_update(unconfirmed, subs, ms, ws);
break;
case 1: // (6)
co_await handle_new_subscription(
unconfirmed, subs,
std::move(get<1>(msg)), ws);
break;
}
}
for (auto & [k ,c] : subs)
{
if (auto ptr = c.lock())
ptr->close();
}
}
catch(std::exception & e)
{
std::cerr << "Exception: " << e.what() << std::endl;
throw;
}
1 | Initialize the connection |
2 | Instantiate the json_reader |
3 | Run as long as the websocket is open |
4 | Select, i.e. wait for either a new json message or subscription |
5 | When it’s a json message, handle an update or a rejection |
6 | Handle new subscription messages |
The contents of the handle_* functions are not as important for the cobalt functionality, so they are skipped in this tutorial.
The handle_new_subscription function sends a message to blockchain.info, which will send a confirmation or rejection back.
The handle_rejections and handle_update functions will take the json values and forward them to the subscription channel.
On the consumer side, our server will just forward data to the client.
If the client inputs data, we’ll close the websocket immediately.
We’re using as_tuple to ignore potential errors.
cobalt::promise<void> read_and_close(beast::websocket::stream<socket_type> & st, beast::flat_buffer buf)
{
system::error_code ec;
co_await st.async_read(buf, asio::as_tuple(cobalt::use_op));
co_await st.async_close(beast::websocket::close_code::going_away, asio::as_tuple(cobalt::use_op));
st.next_layer().close(ec);
}
Next, we write the session that serves a connected user:
cobalt::promise<void> run_session(beast::websocket::stream<socket_type> st,
cobalt::channel<subscription> & subc)
try
{
http::request<http::empty_body> req;
beast::flat_buffer buf;
co_await http::async_read(st.next_layer(), buf, req); (1)
// check the target
auto r = urls::parse_uri_reference(req.target());
if (r.has_error() || (r->segments().size() != 2u)) (2)
{
http::response<http::string_body> res{http::status::bad_request, 11};
res.body() = r.has_error() ? r.error().message() :
"url needs two segments, e.g. /btc/usd";
co_await http::async_write(st.next_layer(), res);
st.next_layer().close();
co_return ;
}
co_await st.async_accept(req); (3)
auto sym = std::string(r->segments().front()) + "-" +
std::string(r->segments().back());
boost::algorithm::to_upper(sym);
// close when data gets sent
auto p = read_and_close(st, std::move(buf)); (4)
auto ptr = std::make_shared<cobalt::channel<json::object>>(1u); (5)
co_await subc.write(subscription{sym, ptr}); (6)
while (ptr->is_open() && st.is_open()) (7)
{
auto bb = json::serialize(co_await ptr->read());
co_await st.async_write(asio::buffer(bb));
}
co_await st.async_close(beast::websocket::close_code::going_away,
asio::as_tuple(cobalt::use_op)); (8)
st.next_layer().close();
co_await p; (9)
}
catch(std::exception & e)
{
std::cerr << "Session ended with exception: " << e.what() << std::endl;
}
1 | Read the http request, because we want the path |
2 | Check the path, e.g. /btc/usd . |
3 | Accept the websocket |
4 | Start reading & close if the consumer sends something |
5 | Create the channel to receive updates |
6 | Send a subscription request to run_blockchain_info |
7 | While the channel & websocket are open, we’re forwarding data. |
8 | Close the socket & ignore the error |
9 | Since the websocket is surely closed by now, wait for read_and_close to complete. |
With run_session and run_blockchain_info written, we can now move on to main:
cobalt::main co_main(int argc, char * argv[])
{
acceptor_type acc{co_await cobalt::this_coro::executor,
asio::ip::tcp::endpoint (asio::ip::tcp::v4(), 8080)};
std::cout << "Listening on localhost:8080" << std::endl;
constexpr int limit = 10; // allow 10 ongoing sessions
cobalt::channel<subscription> sub_manager; (1)
co_await join( (2)
run_blockchain_info(sub_manager),
cobalt::with( (3)
cobalt::wait_group(
asio::cancellation_type::all,
asio::cancellation_type::all),
[&](cobalt::wait_group & sessions) -> cobalt::promise<void>
{
while (!co_await cobalt::this_coro::cancelled) (4)
{
if (sessions.size() >= limit) (5)
co_await sessions.wait_one();
auto conn = co_await acc.async_accept(); (6)
sessions.push_back( (7)
run_session(
beast::websocket::stream<socket_type>{std::move(conn)},
sub_manager));
}
})
);
co_return 0;
}
1 | Create the channel to manage subscriptions |
2 | Use join to run both tasks in parallel. |
3 | Use a cobalt scope to provide a wait_group . |
4 | Run until cancelled. |
5 | When we’ve reached the limit we wait for one task to complete. |
6 | Wait for a new connection. |
7 | Insert the session into the wait_group . |
Main is using join because one task failing should cancel the other one.
delay op
We’ve used use_op so far, to use an implicit operation based on asio’s completion token mechanic.
We can however implement our own ops, which can also utilize the await_ready optimization.
Unlike immediate completion, the coroutine will never suspend when await_ready returns true.
To leverage this coroutine feature, cobalt provides an easy way to create a skippable operation:
struct wait_op final : cobalt::op<system::error_code> (1)
{
asio::steady_timer & tim;
wait_op(asio::steady_timer & tim) : tim(tim) {}
void ready(cobalt::handler<system::error_code> h ) override (2)
{
if (tim.expiry() < std::chrono::steady_clock::now())
h(system::error_code{});
}
void initiate(cobalt::completion_handler<system::error_code> complete) override (3)
{
tim.async_wait(std::move(complete));
}
};
cobalt::main co_main(int argc, char * argv[])
{
asio::steady_timer tim{co_await asio::this_coro::executor,
std::chrono::milliseconds(std::stoi(argv[1]))};
co_await wait_op(tim); (4)
co_return 0;
}
1 | Declare the op. We inherit op to make it awaitable. |
2 | The pre-suspend check is implemented here |
3 | Do the wait if we need to |
4 | Use the op just like any other awaitable. |
This way we can minimize the number of coroutine suspensions.
While the above is used with asio, you can also use these handlers with any other callback based code.
Generator with push value
Coroutines with push values are not as common, but can simplify certain issues significantly.
Since we’ve already got a json_reader in the previous example, here’s how we can write a json_writer that gets values pushed in.
The advantage of using a generator is the internal state management.
cobalt::generator<system::error_code, json::object>
json_writer(websocket_type & ws)
try
{
char buffer[4096];
json::serializer ser;
while (ws.is_open()) (1)
{
auto val = co_yield system::error_code{}; (2)
ser.reset(&val); // serialize the value that was just pushed in
while (!ser.done())
{
auto sv = ser.read(buffer);
co_await ws.async_write(asio::buffer(sv.data(), sv.size())); (3)
}
}
co_return {};
}
catch (system::system_error& e)
{
co_return e.code();
}
catch (std::exception & e)
{
std::cerr << "Error writing: " << e.what() << std::endl;
throw;
}
1 | Keep running as long as the socket is open |
2 | co_yield the current error and retrieve a new value. |
3 | Write a frame to the websocket |
Now we can use the generator like this:
auto g = json_writer(my_ws);
extern std::vector<json::object> to_write;
for (auto && tw : std::move(to_write))
{
if (auto ec = co_await g(std::move(tw)))
co_return ec; // propagate the error
}
Advanced examples
More examples are provided in the repository as code only. All examples are listed below.
An http client that performs a single http get request.
Using the
Using nanobind to integrate cobalt with python. It uses python’s asyncio as executor and allows C++ to co_await python functions et vice versa.
Adopting
Creating a
Using worker threads with
Using an
The example used by the delay section
The example used by the delay op section
The example used by the echo server section
The example used by the price ticker section
The example used by the channel reference
Design
Concepts
This library has two fundamental concepts:
- awaitable
- coroutine
An awaitable is an expression that can be used with co_await from within a coroutine, e.g.:
co_await delay(50ms);
However, a coroutine promise can define an await_transform, i.e. what is actually valid to use with a co_await expression depends on the coroutine.
Thus, we should redefine what an awaitable is:
An awaitable is a type that can be co_await-ed from within a coroutine whose promise does not define await_transform.
A pseudo-keyword is a type that can be used in a coroutine that adds special functionality for it through its promise’s await_transform.
All the verbs in the this_coro namespace are such pseudo-keywords.
auto exec = co_await this_coro::executor;
This library exposes a set of enable_* base classes for promises, to make the creation of custom coroutines easy.
This includes enable_awaitables, which provides an await_transform that just forwards awaitables.
A coroutine in the context of this documentation refers to an asynchronous coroutine, i.e. synchronous coroutines like std::generator are not considered.
All coroutines except main are also actual awaitables.
Executors
Since everything is asynchronous, the library needs to use an event-loop.
Because everything is single-threaded, it can be assumed that there is exactly one executor per thread, which will suffice for 97% of use-cases.
Therefore, there is a thread_local executor that gets used as default by the coroutine objects (although stored by copy in the coroutine promise).
Likewise, there is one executor type used by the library, which defaults to asio::any_io_executor.
If you write your own coroutine, it should hold a copy of the executor, and have a get_executor function returning it by const reference.
Using Strands
While strands can be used, they are not compatible with the thread_local executor.
This is because they might switch threads, thus they can’t be thread_local.
If you wish to use strands (e.g. through a spawn), the executor for any promise, generator or channel must be assigned manually.
In the case of a channel this is a constructor argument, but for the other coroutine types, asio::executor_arg needs to be used.
This is done by having asio::executor_arg_t (somewhere) in the argument list of the coroutine, directly followed by the executor to be used, e.g.:
cobalt::promise<void> example_with_executor(int some_arg, asio::executor_arg_t, cobalt::executor);
This way the coroutine-promise can pick up the executor from the third argument, instead of defaulting to the thread_local one.
The arguments can of course be defaulted to make them less inconvenient, if the coroutine is sometimes used with the thread_local executor.
cobalt::promise<void> example_with_executor(int some_arg,
asio::executor_arg_t = asio::executor_arg,
cobalt::executor = cobalt::this_thread::get_executor());
If this gets omitted on a strand, an exception of type asio::bad_executor is thrown, or - worse - the wrong executor is used.
polymorphic memory resource
Similarly, the library uses a thread_local pmr::memory_resource to allocate coroutine frames & to use as allocator on asynchronous operations.
The reason is that users may want to customize allocations, e.g. to avoid locks, limit memory usage or monitor usage.
pmr allows us to achieve this without introducing unnecessary template parameters, i.e. no promise<T, Allocator> complexity.
Using pmr however does introduce some minimal overhead, so a user has the option to disable it by defining BOOST_COBALT_NO_PMR.
op uses an internal resource optimized for asio’s allocator usages, and gather, race and join use a monotonic resource to minimize allocations.
Both still work with BOOST_COBALT_NO_PMR defined, in which case they’ll use new/delete as upstream allocations.
If you write your own coroutine, it should have a get_allocator function returning a pmr::polymorphic_allocator<void>.
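As a rough illustration of why pmr avoids extra template parameters, the following sketch uses std::pmr from the standard library rather than cobalt itself. The function name sum_with_stack_buffer is hypothetical; it shows a monotonic resource backed by a stack buffer being plugged into a container whose type does not depend on the resource.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <memory_resource>
#include <vector>

// Hypothetical helper: sums 1..n while taking all container storage from a stack buffer.
inline int sum_with_stack_buffer(int n)
{
    std::array<std::byte, 1024> buffer;

    // null_memory_resource as upstream makes the resource throw std::bad_alloc
    // instead of silently falling back to the heap once the buffer is exhausted.
    std::pmr::monotonic_buffer_resource resource{
        buffer.data(), buffer.size(), std::pmr::null_memory_resource()};

    // The container type is the same for every resource, so no allocator
    // template parameter leaks into the signatures of code using it.
    std::pmr::vector<int> values{&resource};
    values.reserve(static_cast<std::size_t>(n));
    for (int i = 1; i <= n; ++i)
        values.push_back(i);

    int sum = 0;
    for (int v : values)
        sum += v;
    return sum;
}
```

A monotonic resource never reuses freed memory, which makes it a good fit for short-lived bursts of allocations and a poor fit for long-running containers that grow and shrink.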
cancellation
cobalt uses implicit cancellation based on asio::cancellation_signal.
This is mostly used implicitly (e.g. with race), so that there is very little explicit use in the examples.
If you write a custom coroutine, it must return a cancellation_slot from a get_cancellation_slot function in order to be able to cancel other operations.
If you write a custom awaitable, it can use that function in await_suspend to receive cancellation signals.
Promise
The main coroutine type is a promise, which is eager.
The reason to default to this is that the compiler can optimize out promises that do not suspend, like this:
cobalt::promise<void> noop()
{
co_return;
}
Awaiting the above operation is in theory a noop, but practically speaking, compilers aren’t there as of 2023.
Select
The most important synchronization mechanism is the race function.
It awaits multiple awaitables in a pseudo-random order and will return the result of the first one to complete, before disregarding the rest.
That is, it initiates the co_await in a pseudo-random order and stops once one awaitable is found to be ready or completed immediately.
cobalt::generator<int> gen1();
cobalt::generator<double> gen2();
cobalt::promise<void> p()
{
auto g1 = gen1();
auto g2 = gen2();
while (!co_await cobalt::this_coro::cancelled)
{
switch(auto v = co_await race(g1, g2); v.index())
{
case 0:
printf("Got int %d\n", get<0>(v));
break;
case 1:
printf("Got double %f\n", get<1>(v));
break;
}
}
}
The race must however internally wait for all awaitables to complete once it initiates the co_await.
Therefore, once the first awaitable completes, it tries to interrupt the rest, and if that fails it cancels them.
race is the preferred way to trigger cancellations, e.g.:
cobalt::promise<void> timeout();
cobalt::promise<void> work();
race(timeout(), work());
interrupt_await
If race naively cancelled the remaining awaitables, it would however lose data.
Thus, the concept of interrupt_await is introduced, which tells the awaitable (that supports it) to immediately resume the awaiter and return or throw an ignored value.
struct awaitable
{
bool await_ready() const;
template<typename Promise>
std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
T await_resume();
void interrupt_await() &;
};
If the interrupt_await doesn’t result in immediate resumption (of h), race will send a cancel signal.
race applies these with the correct reference qualification:
auto g = gen1();
race(g, gen2());
The above will call an interrupt_await() & function for g and an interrupt_await() && function for the temporary returned by gen2(), if available.
Generally speaking, the coroutines in cobalt support lvalue interruption, i.e. interrupt_await() &.
channel operations are unqualified, i.e. work in both cases.
Associators
cobalt uses the associator concept of asio, but simplifies it.
That is, it has three associators that are member functions of an awaiting promise.
- const executor_type & get_executor() (always executor, must return by const ref)
- allocator_type get_allocator() (always pmr::polymorphic_allocator<void>)
- cancellation_slot_type get_cancellation_slot() (must have the same interface as asio::cancellation_slot)
cobalt uses concepts to check if those are present in its await_suspend functions.
That way custom coroutines can support cancellation, executors and allocators.
In a custom awaitable you can obtain them like this:
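struct my_awaitable
{
    bool await_ready();
    template<typename Promise>
    void await_suspend(std::coroutine_handle<Promise> h)
    {
        // pick up the executor if the awaiting promise provides one
        if constexpr (requires (Promise p) {p.get_executor();})
            handle_executor(h.promise().get_executor());
        // connect a cancellation handler if the promise provides a connected slot
        if constexpr (requires (Promise p) {p.get_cancellation_slot();})
            if ((cl = h.promise().get_cancellation_slot()).is_connected())
                cl.emplace<my_cancellation>();
    }
    void await_resume();
    asio::cancellation_slot cl; // slot obtained from the awaiting promise
};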
Cancellation gets connected in a co_await expression (if supported by the coroutine & awaitable), including synchronization mechanisms like race.
Threading
This library is single-threaded by design, because this simplifies resumption and thus allows more performant handling of synchronizations like race. In a multi-threaded design, race would need to lock every raced awaitable to avoid data loss, which would need to be blocking and would get worse with every additional element.
You can’t have any coroutine be resumed on a different thread than the one it was created on, except for a task (e.g. using spawn).
The main technical reason is that the most efficient way of switching coroutines is by returning the handle of the new coroutine from await_suspend like this:
struct my_awaitable
{
bool await_ready();
std::coroutine_handle<T> await_suspend(std::coroutine_handle<U>);
void await_resume();
};
In this case, the awaiting coroutine will be suspended before await_suspend is called, and the coroutine returned is resumed. This of course doesn’t work if we need to go through an executor.
This doesn’t only apply to awaited coroutines, but to channels, too.
The channels in this library use an intrusive list of awaitables and may return the handle of a reading (and thus suspended) coroutine from a write_operation’s await_suspend.
Reference
cobalt/main.hpp
The easiest way to get started with a cobalt application is to use the co_main function with the following signature:
cobalt::main co_main(int argc, char *argv[]);
Declaring co_main will add a main function that performs all the necessary steps to run a coroutine on an event loop.
This allows us to write very simple asynchronous programs.
cobalt::main co_main(int argc, char *argv[])
{
auto exec = co_await cobalt::this_coro::executor; (1)
asio::steady_timer tim{exec, std::chrono::milliseconds(50)}; (2)
co_await tim.async_wait(cobalt::use_op); (3)
co_return 0;
}
1 | Get the executor main is running on |
2 | Use it with an asio object |
3 | co_await a cobalt operation |
The main promise will create an asio::signal_set and use it for cancellation.
SIGINT becomes total, while SIGTERM becomes terminal cancellation.
The cancellation will not be forwarded to detached coroutines. The user will need to take care to end them on cancellation, since the program otherwise doesn’t allow graceful termination.
Executor
It will also create an asio::io_context to run on, which you can get through this_coro::executor.
It will be assigned to cobalt::this_thread::get_executor().
Memory Resource
It also creates a memory resource that will be used as a default for internal memory allocations.
It will be assigned as the thread_local default returned by cobalt::this_thread::get_default_resource().
Promise
Every coroutine has an internal state, called promise (not to be confused with the cobalt::promise).
Depending on the coroutine properties different things can be co_await-ed, like we used in the example above.
They are implemented through inheritance, and shared among different promise types.
The main promise has the following properties.
Specification
- declaring co_main will implicitly declare a main function
- main is only present when co_main is defined.
- SIGINT and SIGTERM will cause cancellation of the internal task.
cobalt/promise.hpp
A promise is an eager coroutine that can co_await and co_return values. That is, it cannot use co_yield.
cobalt::promise<void> delay(std::chrono::milliseconds ms)
{
asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
co_await tim.async_wait(cobalt::use_op);
}
cobalt::main co_main(int argc, char *argv[])
{
co_await delay(std::chrono::milliseconds(50));
co_return 0;
}
Promises are by default attached.
This means that a cancellation is sent when the promise handle goes out of scope.
A promise can be detached by calling detach or by using the prefix + operator.
This is a runtime alternative to using detached.
A detached promise will not send a cancellation on destruction.
cobalt::promise<void> my_task();
cobalt::main co_main(int argc, char *argv[])
{
+my_task(); (1)
co_await delay(std::chrono::milliseconds(50));
co_return 0;
}
1 | By using + the task gets detached. Without it, the compiler would generate a nodiscard warning. |
Executor
The executor is taken from the thread_local get_executor function, unless an asio::executor_arg is used in any position followed by the executor argument.
cobalt::promise<int> my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);
Memory Resource
The memory resource is taken from the thread_local get_default_resource function, unless a std::allocator_arg is used in any position followed by a polymorphic_allocator argument.
cobalt::promise<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);
Outline
template<typename Return>
struct [[nodiscard]] promise
{
promise(promise &&lhs) noexcept;
promise& operator=(promise && lhs) noexcept;
// enable `co_await`. (1)
auto operator co_await ();
// Ignore the return value, i.e. detach it. (2)
void operator +() &&;
// Cancel the promise.
void cancel(asio::cancellation_type ct = asio::cancellation_type::all);
// Check if the result is ready
bool ready() const;
// Check if the promise can be awaited.
explicit operator bool () const; (3)
// Detach or attach
bool attached() const;
void detach();
void attach();
// Get the return value. If !ready() this function has undefined behaviour.
Return get();
};
1 | Supports Interrupt Wait |
2 | This allows creating a promise running in parallel with a simple +my_task() expression. |
3 | This allows code like while (p) co_await p; |
Promise
The coroutine promise (promise::promise_type) has the following properties.
cobalt/generator.hpp
A generator is an eager coroutine that can co_await and co_yield values to the caller.
cobalt::generator<int> example()
{
printf("In coro 1\n");
co_yield 2;
printf("In coro 3\n");
co_return 4;
}
cobalt::main co_main(int argc, char * argv[])
{
printf("In main 0\n");
auto f = example(); // call and let it run until the first co_yield
printf("In main 1\n");
printf("In main %d\n", co_await f);
printf("In main %d\n", co_await f);
co_return 0;
}
Which will generate the following output:
In main 0
In coro 1
In main 1
In main 2
In coro 3
In main 4
[Sequence diagram: main prints "In main 0" and calls example(); example prints "In coro 1" and suspends at co_yield 2; main prints "In main 2" and resumes example through co_await f; example prints "In coro 3" and finishes with co_return; main prints "In main 4".]
Values can be pushed into the generator when Push (the second template parameter) is set to non-void:
cobalt::generator<int, int> example()
{
printf("In coro 1\n");
int i = co_yield 2;
printf("In coro %d\n", i);
co_return 4;
}
cobalt::main co_main(int argc, char * argv[])
{
printf("In main 0\n");
auto f = example(); // call and let it run until the first co_yield
printf("In main %d\n", co_await f(3)); (1)
co_return 0;
}
1 | The pushed value gets passed through operator() to the result of co_yield . |
Which will generate the following output:
In main 0
In coro 1
In main 2
In coro 3
Lazy
A generator can be turned lazy by awaiting initial.
This co_await expression will produce the Push value.
This means the generator will wait until it’s awaited for the first time, and then process the newly pushed value and resume at the next co_yield.
cobalt::generator<int, int> example()
{
    int v = co_await cobalt::this_coro::initial;
    printf("In coro %d\n", v);
    v = co_yield 2; // receives the value pushed by the next co_await
    printf("In coro %d\n", v);
    co_return 4;
}
cobalt::main co_main(int argc, char * argv[])
{
    printf("In main 0\n");
    auto f = example(); // call; the lazy generator does not run until it is awaited
    printf("In main 1\n"); // < this is now before the co_await initial
    printf("In main %d\n", co_await f(1));
    printf("In main %d\n", co_await f(3));
    co_return 0;
}
Which will generate the following output:
In main 0
In main 1
In coro 1
In main 2
In coro 3
In main 4
Executor
The executor is taken from the thread_local get_executor function, unless an asio::executor_arg is used in any position followed by the executor argument.
cobalt::generator<int> my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);
Memory Resource
The memory resource is taken from the thread_local get_default_resource function, unless a std::allocator_arg is used in any position followed by a polymorphic_allocator argument.
cobalt::generator<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);
Outline
template<typename Yield, typename Push = void>
struct [[nodiscard]] generator
{
// Movable
generator(generator &&lhs) noexcept = default;
generator& operator=(generator &&) noexcept = default;
// True until it co_returns & is co_awaited after (1)
explicit operator bool() const;
// Cancel the generator. (3)
void cancel(asio::cancellation_type ct = asio::cancellation_type::all);
// Check if a value is available
bool ready() const;
// Get the returned value. If !ready() this function has undefined behaviour.
Yield get();
// Cancel & detach the generator.
~generator();
// an awaitable that results in a value of Yield.
using generator_awaitable = unspecified;
// Present when Push != void
generator_awaitable operator()( Push && push);
generator_awaitable operator()(const Push & push);
// Present when Push == void, i.e. can co_await the generator directly.
generator_awaitable operator co_await (); (2)
};
1 | This allows code like while (gen) co_await gen; |
2 | Supports Interrupt Wait |
3 | A cancelled generator may be resumable |
Promise
The generator promise has the following properties.
cobalt/task.hpp
A task is a lazy coroutine that can co_await and co_return values. That is, it cannot use co_yield.
cobalt::task<void> delay(std::chrono::milliseconds ms)
{
asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
co_await tim.async_wait(cobalt::use_op);
}
cobalt::main co_main(int argc, char *argv[])
{
co_await delay(std::chrono::milliseconds(50));
co_return 0;
}
Unlike a promise, a task can be awaited or spawned on another executor than it was created on.
Executor
Since a task is lazy, it does not need to have an executor on construction.
It rather attempts to take it from the caller or awaiter if present.
Otherwise, it’ll default to the thread_local executor.
Memory Resource
The memory resource is NOT taken from the thread_local get_default_resource function, but from pmr::get_default_resource(), unless a std::allocator_arg is used in any position followed by a polymorphic_allocator argument.
cobalt::task<int> my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);
Outline
template<typename Return>
struct [[nodiscard]] task
{
task(task &&lhs) noexcept = default;
task& operator=(task &&) noexcept = default;
// enable `co_await`
auto operator co_await ();
};
Tasks can be used synchronously from a sync function by calling run(my_task()).
Promise
The task promise has the following properties.
use_task
The use_task completion token can be used to create a task from an async_ function.
This is less efficient than use_op as it needs to allocate a coroutine frame, but has a simpler return type and supports Interrupt Wait.
cobalt/detached.hpp
A detached is an eager coroutine that can co_await but not co_return values.
That is, it cannot be resumed and is usually not awaited.
cobalt::detached delayed_print(std::chrono::milliseconds ms)
{
asio::steady_timer tim{co_await cobalt::this_coro::executor, ms};
co_await tim.async_wait(cobalt::use_op);
printf("Hello world\n");
}
cobalt::main co_main(int argc, char *argv[])
{
delayed_print(std::chrono::milliseconds(50));
co_return 0;
}
Detached is used to run coroutines in the background easily.
cobalt::detached my_task();
cobalt::main co_main(int argc, char *argv[])
{
my_task(); (1)
co_await delay(std::chrono::milliseconds(50));
co_return 0;
}
1 | Spawn off the detached coro. |
A detached can assign itself a new cancellation source like this:
cobalt::detached my_task(asio::cancellation_slot sl)
{
co_await this_coro::reset_cancellation_source(sl);
// do some work
}
cobalt::main co_main(int argc, char *argv[])
{
asio::cancellation_signal sig;
my_task(sig.slot()); (1)
co_await delay(std::chrono::milliseconds(50));
sig.emit(asio::cancellation_type::all);
co_return 0;
}
Executor
The executor is taken from the thread_local get_executor function, unless an asio::executor_arg is used in any position followed by the executor argument.
cobalt::detached my_gen(asio::executor_arg_t, asio::io_context::executor_type exec_to_use);
Memory Resource
The memory resource is taken from the thread_local get_default_resource function, unless a std::allocator_arg is used in any position followed by a polymorphic_allocator argument.
cobalt::detached my_gen(std::allocator_arg_t, pmr::polymorphic_allocator<void> alloc);
Promise
The detached promise has the following properties.
cobalt/op.hpp
An operation in cobalt is an awaitable wrapping an asio operation.
use_op
The use_op token is the direct way to create an op, i.e. using cobalt::use_op as the completion token will create the required awaitable.
auto tim = cobalt::use_op.as_default_on(asio::steady_timer{co_await cobalt::this_coro::executor});
co_await tim.async_wait();
Depending on the completion signature the co_await expression may throw.
Signature | Return type | Exception |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
any exception |
|
|
any exception |
use_op will never complete immediately, i.e. await_ready will always return false and the coroutine will always be suspended.
Hand coded Operations
Operations are a more advanced implementation of the cobalt/op.hpp feature.
This library makes it easy to create asynchronous operations with an early completion condition, i.e. a condition that avoids suspension of coroutines altogether.
We can for example create a wait_op that does nothing if the timer is already expired.
struct wait_op : cobalt::op<system::error_code> (1)
{
    asio::steady_timer & tim;
    wait_op(asio::steady_timer & tim) : tim(tim) {}
    void ready(cobalt::handler<system::error_code> h) (2)
    {
        // complete right away if the timer has already expired
        if (tim.expiry() < std::chrono::steady_clock::now())
            h(system::error_code{});
    }
    void initiate(cobalt::completion_handler<system::error_code> complete) (3)
    {
        tim.async_wait(std::move(complete));
    }
};
1 | Inherit op with the matching signature, so that await_transform picks it up |
2 | Check if the operation is ready - called from await_ready |
3 | Initiate the operation if it’s not ready. |
cobalt/concepts.hpp
Awaitable
An awaitable is an expression that can be used with co_await.
template<typename Awaitable, typename Promise = void>
concept awaitable_type = requires (Awaitable aw, std::coroutine_handle<Promise> h)
{
{aw.await_ready()} -> std::convertible_to<bool>;
{aw.await_suspend(h)};
{aw.await_resume()};
};
template<typename Awaitable, typename Promise = void>
concept awaitable =
awaitable_type<Awaitable, Promise>
|| requires (Awaitable && aw) { {std::forward<Awaitable>(aw).operator co_await()} -> awaitable_type<Promise>;}
|| requires (Awaitable && aw) { {operator co_await(std::forward<Awaitable>(aw))} -> awaitable_type<Promise>;};
awaitables in this library require that the coroutine promise returns its executor by const reference if it provides one. Otherwise they’ll use this_thread::get_executor().
Enable awaitables
Inheriting enable_awaitables will enable a coroutine to co_await anything through await_transform that would be co_await-able in the absence of any await_transform.
cobalt/this_coro.hpp
The this_coro namespace provides utilities to access the internal state of a coroutine promise.
Pseudo-awaitables:
// Awaitable type that returns the executor of the current coroutine.
struct executor_t {};
constexpr executor_t executor;
// Awaitable type that returns the cancellation state of the current coroutine.
struct cancellation_state_t {};
constexpr cancellation_state_t cancellation_state;
// Reset the cancellation state with custom or default filters.
constexpr unspecified reset_cancellation_state();
template<typename Filter>
constexpr unspecified reset_cancellation_state(
Filter && filter);
template<typename InFilter, typename OutFilter>
constexpr unspecified reset_cancellation_state(
InFilter && in_filter,
OutFilter && out_filter);
// get & set the throw_if_cancelled setting.
unspecified throw_if_cancelled();
unspecified throw_if_cancelled(bool value);
// Set the cancellation source in a detached.
unspecified reset_cancellation_source();
unspecified reset_cancellation_source(asio::cancellation_slot slot);
// get the allocator of the promise
struct allocator_t {};
constexpr allocator_t allocator;
// get the current cancellation state-type
struct cancelled_t {};
constexpr cancelled_t cancelled;
// set the over-eager mode of a generator
struct initial_t {};
constexpr initial_t initial;
Await Allocator
The allocator of a coroutine supporting enable_await_allocator can be obtained the following way:
co_await cobalt::this_coro::allocator;
In order to enable this for your own coroutine you can inherit enable_await_allocator with the CRTP pattern:
struct my_promise : cobalt::enable_await_allocator<my_promise>
{
using allocator_type = __your_allocator_type__;
allocator_type get_allocator();
};
If available, the allocator gets used by use_op.
Await Executor
The executor of a coroutine supporting enable_await_executor can be obtained the following way:
co_await cobalt::this_coro::executor;
In order to enable this for your own coroutine you can inherit enable_await_executor with the CRTP pattern:
struct my_promise : cobalt::enable_await_executor<my_promise>
{
using executor_type = __your_executor_type__;
executor_type get_executor();
};
If available, the executor gets used by use_op.
Memory resource base
The promise_memory_resource_base base of a promise will provide a get_allocator in the promise taken from either the default resource or one passed following a std::allocator_arg argument.
Likewise, it will add operator new overloads so the coroutine uses the same memory resource for its frame allocation.
Throw if cancelled
The promise_throw_if_cancelled_base provides the basic options to enable a coroutine to throw an exception if it has been cancelled when another actual awaitable is awaited.
co_await cobalt::this_coro::throw_if_cancelled;
Cancellation state
The promise_cancellation_base provides the basic options to enable a coroutine to have a cancellation_state that is resettable by reset_cancellation_state:
co_await cobalt::this_coro::reset_cancellation_state();
For convenience there is also a short-cut to check the current cancellation status:
asio::cancellation_type ct = (co_await cobalt::this_coro::cancellation_state).cancelled();
asio::cancellation_type ct = co_await cobalt::this_coro::cancelled; // same as above
cobalt/this_thread.hpp
Since everything is single threaded this library provides an executor & default memory-resource for every thread.
namespace boost::cobalt::this_thread
{
pmr::memory_resource* get_default_resource() noexcept; (1)
pmr::memory_resource* set_default_resource(pmr::memory_resource* r) noexcept; (2)
pmr::polymorphic_allocator<void> get_allocator(); (3)
typename asio::io_context::executor_type & get_executor(); (4)
void set_executor(asio::io_context::executor_type exec) noexcept; (5)
}
1 | Get the default resource - will be pmr::get_default_resource unless set |
2 | Set the default resource - returns the previously set one |
3 | Get an allocator wrapping (1) |
4 | Get the executor of the thread - throws if not set |
5 | Set the executor of the current thread. |
The coroutines will use these as defaults, but keep a copy just in case.
The only exception is the initialization of a cobalt-operation, which will use the this_thread executor to rethrow from.
cobalt/channel.hpp
Channels can be used to exchange data between different coroutines on a single thread.
Outline
template<typename T>
struct channel
{
// create a channel with a buffer limit, executor & resource.
explicit
channel(std::size_t limit = 0u,
executor executor = this_thread::get_executor(),
pmr::memory_resource * resource = this_thread::get_default_resource());
// not movable.
channel(channel && rhs) noexcept = delete;
channel & operator=(channel && lhs) noexcept = delete;
using executor_type = executor;
const executor_type & get_executor();
// Closes the channel
~channel();
bool is_open() const;
// close the operation, will cancel all pending ops, too
void close();
// an awaitable that yields T
using read_op = unspecified;
// an awaitable that yields void
using write_op = unspecified;
// read a value from the channel
read_op read();
// write a value to the channel
write_op write(const T && value);
write_op write(const T & value);
write_op write( T && value);
write_op write( T & value);
// write a value to the channel if T is void
};
Description
Channels are a tool for two coroutines to communicate and synchronize.
const std::size_t buffer_size = 2;
channel<int> ch{buffer_size, exec};
// in coroutine (1)
co_await ch.write(42);
// in coroutine (2)
auto val = co_await ch.read();
1 | Send a value to the channel - will block until it can be sent |
2 | Read a value from the channel - will block until a value is available. |
Both operations may block, depending on the channel buffer size.
If the buffer size is zero, a read & write will need to occur at the same time, i.e. act as a rendezvous.
If the buffer is not full, the write operation will not suspend the coroutine; likewise if the buffer is not empty, the read operation will not suspend.
If two operations complete at once (as is always the case with an empty buffer), the second operation gets posted to the executor for later completion.
A channel type can be void, in which case write takes no parameter.
The channel operations can be cancelled without losing data. This makes them usable with race.
generator<variant2::variant<int, double>> merge(
channel<int> & c1,
channel<double> & c2)
{
while (c1.is_open() && c2.is_open())
co_yield co_await race(c1.read(), c2.read());
}
Example
cobalt::promise<void> producer(cobalt::channel<int> & chan)
{
for (int i = 0; i < 4; i++)
co_await chan.write(i);
chan.close();
}
cobalt::main co_main(int argc, char * argv[])
{
cobalt::channel<int> c;
auto p = producer(c);
while (c.is_open())
std::cout << co_await c.read() << std::endl;
co_await p;
co_return 0;
}
Additionally, a channel_reader is provided to make reading channels more convenient & usable with BOOST_COBALT_FOR.
cobalt::main co_main(int argc, char * argv[])
{
cobalt::channel<int> c;
auto p = producer(c);
BOOST_COBALT_FOR(int value, cobalt::channel_reader(c))
std::cout << value << std::endl;
co_await p;
co_return 0;
}
cobalt/with.hpp
The with facility provides a way to perform asynchronous tear-down of coroutines.
That is, it is like an asynchronous destructor call.
struct my_resource
{
cobalt::promise<void> await_exit(std::exception_ptr e);
};
cobalt::promise<void> work(my_resource & res);
cobalt::promise<void> outer()
{
co_await cobalt::with(my_resource(), &work);
}
The teardown can either be done by providing an await_exit member function or a tag_invoke function that returns an awaitable, or by providing the teardown as the third argument to with.
using ws_stream = beast::websocket::stream<asio::ip::tcp::socket>;
cobalt::promise<ws_stream> connect(urls::url); (1)
cobalt::promise<void> disconnect(ws_stream &ws); (2)
auto teardown(const boost::cobalt::with_exit_tag & wet , ws_stream & ws, std::exception_ptr e)
{
return disconnect(ws);
}
cobalt::promise<void> run_session(ws_stream & ws);
cobalt::main co_main(int argc, char * argv[])
{
co_await cobalt::with(co_await connect(argv[1]), &run_session, &teardown);
co_return 0;
}
1 | Implement websocket connect & websocket initiation |
2 | Implement an orderly shutdown. |
The std::exception_ptr is null if the scope is exited without exception.
NOTE: It’s legal for the exit functions to take the exception_ptr by reference and modify it.
cobalt/race.hpp
The race function can be used to co_await one awaitable out of a set of them.
It can be called as a variadic function with multiple awaitables or on a range of awaitables.
cobalt::promise<void> task1();
cobalt::promise<void> task2();
cobalt::promise<void> do_wait()
{
co_await cobalt::race(task1(), task2()); (1)
std::vector<cobalt::promise<void>> aws {task1(), task2()};
co_await cobalt::race(aws); (2)
}
1 | Wait for a variadic set of awaitables |
2 | Wait for a vector of awaitables |
The first parameter to race can be a uniform random bit generator.
extern promise<void> pv1, pv2;
std::vector<promise<void>> pvv;
std::default_random_engine rdm{1};
// if everything returns void race returns the index
std::size_t r1 = co_await race(pv1, pv2);
std::size_t r2 = co_await race(rdm, pv1, pv2);
std::size_t r3 = co_await race(pvv);
std::size_t r4 = co_await race(rdm, pvv);
// a variant is returned if not everything is void; void becomes monostate
extern promise<int> pi1, pi2;
variant2::variant<monostate, int, int> r5 = co_await race(pv1, pi1, pi2);
variant2::variant<monostate, int, int> r6 = co_await race(rdm, pv1, pi1, pi2);
// a range returns a pair of the index and the result if non-void
std::vector<promise<int>> piv;
std::pair<std::size_t, int> r7 = co_await race(piv);
std::pair<std::size_t, int> r8 = co_await race(rdm, piv);
Interrupt Wait
When arguments are passed as rvalue reference, the race will attempt to use .interrupt_await on the awaitable to signal the awaitable to complete now and that the result will be ignored.
If supported, the awaitable must resume the awaiting coroutine before interrupt_await returns.
If the race doesn’t detect the function, it will send a cancellation.
This means that you can reuse race like this:
cobalt::promise<void> do_wait()
{
auto t1 = task1();
auto t2 = task2();
co_await cobalt::race(t1, t2); (1)
co_await cobalt::race(t1, t2); (2)
}
1 | Wait for the first task to complete |
2 | Wait for the other task to complete |
The race will invoke the functions of the awaitable as if used in a co_await expression or not evaluate them at all.
left_race
The left_race functions are like race but follow a strict left-to-right scan.
This can lead to starvation issues, which is why this is not the recommended default, but it can be useful for prioritization if proper care is taken.
Outline
// Concept for the random number generator.
template<typename G>
concept uniform_random_bit_generator =
requires ( G & g)
{
{typename std::decay_t<G>::result_type() } -> std::unsigned_integral; // is an unsigned integer type
// T Returns the smallest value that G's operator() may return. The value is strictly less than G::max(). The function must be constexpr.
{std::decay_t<G>::min()} -> std::same_as<typename std::decay_t<G>::result_type>;
// T Returns the largest value that G's operator() may return. The value is strictly greater than G::min(). The function must be constexpr.
{std::decay_t<G>::max()} -> std::same_as<typename std::decay_t<G>::result_type>;
{g()} -> std::same_as<typename std::decay_t<G>::result_type>;
} && (std::decay_t<G>::max() > std::decay_t<G>::min());
// Variadic race with a custom random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all,
uniform_random_bit_generator URBG, awaitable ... Promise>
awaitable race(URBG && g, Promise && ... p);
// Ranged race with a custom random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all,
uniform_random_bit_generator URBG, range<awaitable> PromiseRange>
awaitable race(URBG && g, PromiseRange && p);
// Variadic race with the default random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable race(Promise && ... p);
// Ranged race with the default random number generator
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable> PromiseRange>
awaitable race(PromiseRange && p);
// Variadic left race
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable left_race(Promise && ... p);
// Ranged left race
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable> PromiseRange>
awaitable left_race(PromiseRange && p);
Selecting an empty range will cause an exception to be thrown.
cobalt/gather.hpp
The gather function can be used to co_await multiple awaitables at once with cancellations being passed through.
The function will gather all completions and return them as system::result, i.e. capture exceptions as values. One awaitable throwing an exception will not cancel the others.
It can be called as a variadic function with multiple awaitables or on a range of awaitables.
cobalt::promise<void> task1();
cobalt::promise<void> task2();
cobalt::promise<void> do_gather()
{
co_await cobalt::gather(task1(), task2()); (1)
std::vector<cobalt::promise<void>> aws {task1(), task2()};
co_await cobalt::gather(aws); (2)
}
1 | Wait for a variadic set of awaitables |
2 | Wait for a vector of awaitables |
The gather will invoke the functions of the awaitable as if used in a co_await expression.
extern promise<void> pv1, pv2;
std::tuple<system::result<int>, system::result<int>> r1 = co_await gather(pv1, pv2);
std::vector<promise<void>> pvv;
pmr::vector<system::result<void>> r2 = co_await gather(pvv);
extern promise<int> pi1, pi2;
std::tuple<system::result<monostate>,
system::result<monostate>,
system::result<int>,
system::result<int>> r3 = co_await gather(pv1, pv2, pi1, pi2);
std::vector<promise<int>> piv;
pmr::vector<system::result<int>> r4 = co_await gather(piv);
Outline
// Variadic gather
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable gather(Promise && ... p);
// Ranged gather
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable> PromiseRange>
awaitable gather(PromiseRange && p);
cobalt/join.hpp
The join function can be used to co_await multiple awaitables at once with properly connected cancellations.
The function will collect all completions and return them as values, unless an exception is thrown. If an exception is thrown, all outstanding ops are cancelled (or interrupted if possible) and the first exception gets rethrown.
void will be returned as variant2::monostate in the tuple, unless all awaitables yield void.
It can be called as a variadic function with multiple awaitables or on a range of awaitables.
cobalt::promise<void> task1();
cobalt::promise<void> task2();
cobalt::promise<void> do_join()
{
co_await cobalt::join(task1(), task2()); (1)
std::vector<cobalt::promise<void>> aws {task1(), task2()};
co_await cobalt::join(aws); (2)
}
1 | Wait for a variadic set of awaitables |
2 | Wait for a vector of awaitables |
The join will invoke the functions of the awaitable as if used in a co_await expression.
extern promise<void> pv1, pv2;
/* void */ co_await join(pv1, pv2);
std::vector<promise<void>> pvv;
/* void */ co_await join(pvv);
extern promise<int> pi1, pi2;
std::tuple<monostate, monostate, int, int> r1 = co_await join(pv1, pv2, pi1, pi2);
std::vector<promise<int>> piv;
pmr::vector<int> r2 = co_await join(piv);
Outline
// Variadic join
template<asio::cancellation_type Ct = asio::cancellation_type::all, awaitable... Promise>
awaitable join(Promise && ... p);
// Ranged join
template<asio::cancellation_type Ct = asio::cancellation_type::all, range<awaitable> PromiseRange>
awaitable join(PromiseRange && p);
Selecting an empty range will cause an exception to be thrown.
cobalt/wait_group.hpp
The wait_group type can be used to manage multiple coroutines of type promise<void>.
It works out of the box with cobalt/with.hpp, by having the matching await_exit member.
Essentially, a wait_group is a dynamic list of promises that has a race function (wait_one), a gather function (wait_all) and will clean up on scope exit.
struct wait_group
{
// create a wait_group
explicit
wait_group(asio::cancellation_type normal_cancel = asio::cancellation_type::none,
asio::cancellation_type exception_cancel = asio::cancellation_type::all);
// insert a task into the group
void push_back(promise<void> p);
// the number of tasks in the group
std::size_t size() const;
// remove completed tasks without waiting (i.e. zombie tasks)
std::size_t reap();
// cancel all tasks
void cancel(asio::cancellation_type ct = asio::cancellation_type::all);
// wait for one task to complete.
wait_one_op wait_one();
// wait for all tasks to complete
wait_op wait();
// wait for all tasks to complete
wait_op operator co_await ();
// when used with `with`, this will receive the exception
// and wait for the completion.
// if `ep` is set, this will use the `exception_cancel` level,
// otherwise the `normal_cancel` level, to cancel all promises.
wait_op await_exit(std::exception_ptr ep);
};
cobalt/spawn.hpp
The spawn functions allow running a task on an asio executor/execution_context and consuming the result with a completion token.
auto spawn(Context & context, task<T> && t, CompletionToken&& token);
auto spawn(Executor executor, task<T> && t, CompletionToken&& token);
Spawn will dispatch its initiation and post the completion.
That makes it safe to use task to run the task on another executor and consume the result on the current one with use_op.
That is, spawn can be used to cross threads.
Example
cobalt::task<int> work();
int main(int argc, char *argv[])
{
asio::io_context ctx{BOOST_ASIO_CONCURRENCY_HINT_1};
auto f = spawn(ctx, work(), asio::use_future);
ctx.run();
return f.get();
}
The caller needs to make sure that the executor is not running on multiple threads concurrently, e.g. by using a single-threaded asio::io_context or a strand.
cobalt/run.hpp
The run function is similar to spawn but runs synchronously.
It will internally set up an execution context and the memory resources.
This can be useful when integrating a piece of cobalt code into a synchronous application.
Outline
// Run the task and return its value or rethrow any exception.
T run(task<T> t);
Example
cobalt::task<int> work();
int main(int argc, char *argv[])
{
return run(work());
}
cobalt/thread.hpp
The thread type is another way to create an environment that is similar to main, but doesn’t use a signal_set.
cobalt::thread my_thread()
{
auto exec = co_await cobalt::this_coro::executor; (1)
asio::steady_timer tim{exec, std::chrono::milliseconds(50)}; (2)
co_await tim.async_wait(cobalt::use_op); (3)
co_return 0;
}
1 | Get the executor the thread is running on |
2 | Use it with an asio object |
3 | co_await a cobalt operation |
A thread can be used like a std::thread:
int main(int argc, char * argv[])
{
auto thr = my_thread();
thr.join();
return 0;
}
A thread is also an awaitable (including cancellation).
cobalt::main co_main(int argc, char * argv[])
{
auto thr = my_thread();
co_await thr;
co_return 0;
}
Destructing a detached thread will cause a hard stop (io_context::stop) and join the thread.
Nothing in this library, except for awaiting a cobalt/thread.hpp and cobalt/spawn.hpp, is thread-safe.
If you need to transfer data across threads, you’ll need a thread-safe utility like asio::concurrent_channel.
You cannot share any cobalt primitives between threads, with the sole exception of being able to spawn a task onto another thread’s executor.
Executor
It will also create an asio::io_context to run on, which you can get through the this_coro::executor.
It will be assigned to the cobalt::this_thread::get_executor().
Memory Resource
It also creates a memory resource that will be used as a default for internal memory allocations.
It will be assigned as the thread_local default resource, available through cobalt::this_thread::get_default_resource().
Outline
struct thread
{
// Send a cancellation signal
void cancel(asio::cancellation_type type = asio::cancellation_type::all);
// Allow the thread to be awaited. NOOP if the thread is invalid.
auto operator co_await() &-> detail::thread_awaitable; (1)
auto operator co_await() && -> detail::thread_awaitable; (2)
// Stops the io_context & joins the executor
~thread();
/// Move constructible
thread(thread &&) noexcept = default;
using executor_type = executor;
using id = std::thread::id;
id get_id() const noexcept;
// Add the functions similar to `std::thread`
void join();
bool joinable() const;
void detach();
executor_type get_executor() const;
};
1 | Supports Interrupt Wait |
2 | Always forward cancel |
Promise
The thread promise has the following properties.
cobalt/result.hpp
Awaitables can be modified to return system::result or std::tuple instead of using exceptions.
// value only
T res = co_await foo();
// as result
system::result<T, std::exception_ptr> res = co_await cobalt::as_result(foo());
// as tuple
std::tuple<std::exception_ptr, T> res = co_await cobalt::as_tuple(foo());
Awaitables can also provide custom ways to handle results and tuples, by providing await_resume overloads using cobalt::as_result_tag and cobalt::as_tuple_tag:
your_result_type await_resume(cobalt::as_result_tag);
your_tuple_type await_resume(cobalt::as_tuple_tag);
This allows an awaitable to provide other error types than std::exception_ptr, for example system::error_code. This is done by op and channel.
// example of an op with result system::error_code, std::size_t
system::result<std::size_t> await_resume(cobalt::as_result_tag);
std::tuple<system::error_code, std::size_t> await_resume(cobalt::as_tuple_tag);
Awaitables are still allowed to throw exceptions, e.g. for critical exceptions such as OOM.
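The idea of returning an error as a value instead of throwing can be sketched without any coroutine machinery. The following is a synchronous analogue of the tuple form, written only for illustration. call_as_tuple, compute_ok and compute_fail are hypothetical names and are not part of cobalt, and the sketch assumes the value type is default-constructible.

```cpp
#include <exception>
#include <stdexcept>
#include <tuple>

// Hypothetical helper: runs fn and captures a thrown exception as a value,
// mirroring the (std::exception_ptr, T) layout of the tuple form shown above.
template<typename Fn>
auto call_as_tuple(Fn && fn)
{
    using T = decltype(fn());
    using result = std::tuple<std::exception_ptr, T>;
    try
    {
        // No exception: the exception_ptr stays null.
        return result{std::exception_ptr{}, fn()};
    }
    catch (...)
    {
        // Exception: store it and return a default-constructed value.
        return result{std::current_exception(), T{}};
    }
}

// Hypothetical functions standing in for an operation that succeeds or fails.
inline int compute_ok() { return 42; }
inline int compute_fail() { throw std::runtime_error("boom"); }
```

A caller can then test the first tuple element for null instead of wrapping the call in a try block, which is the same usage pattern as_tuple offers for awaitables.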
cobalt/async_for.hpp
For types like generators a BOOST_COBALT_FOR macro is provided, to emulate a for co_await loop.
cobalt::generator<int> gen();
cobalt::main co_main(int argc, char * argv[])
{
BOOST_COBALT_FOR(auto i, gen())
printf("Generated value %d\n", i);
co_return 0;
}
cobalt/error.hpp
In order to make errors easier to manage, cobalt provides an error_category to be used with boost::system::error_code.
enum class error
{
moved_from,
detached,
completed_unexpected,
wait_not_ready,
already_awaited,
allocation_failed
};
system::error_category & cobalt_category();
system::error_code make_error_code(error e);
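For readers who have not written an error category before, here is a minimal sketch of how an enum, a category and make_error_code fit together. It uses the std:: equivalents from <system_error> as a stand-in for boost::system, and everything in the demo namespace is hypothetical. It is not cobalt's implementation, and the messages are invented for the example.

```cpp
#include <string>
#include <system_error>

namespace demo
{
// Hypothetical error enum; values start at 1 because 0 conventionally means success.
enum class error
{
    moved_from = 1,
    detached,
    wait_not_ready
};

// The category translates the integer stored in an error_code back to a message.
class category_impl final : public std::error_category
{
public:
    const char * name() const noexcept override { return "demo"; }
    std::string message(int ev) const override
    {
        switch (static_cast<error>(ev))
        {
        case error::moved_from:     return "moved from";
        case error::detached:       return "detached";
        case error::wait_not_ready: return "wait not ready";
        }
        return "unknown";
    }
};

// A single category instance, so that categories can be compared by address.
inline const std::error_category & demo_category()
{
    static const category_impl instance;
    return instance;
}

// Found through argument-dependent lookup when an error is converted.
inline std::error_code make_error_code(error e)
{
    return {static_cast<int>(e), demo_category()};
}
}

namespace std
{
// Opt the enum in to implicit conversion to std::error_code.
template<> struct is_error_code_enum<demo::error> : true_type {};
}
```

With this in place, `std::error_code ec = demo::error::detached;` compiles, and `ec.message()` yields the text from the category.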
cobalt/config.hpp
The config header allows configuring some implementation details of boost.cobalt.
executor_type
The executor type defaults to boost::asio::any_io_executor.
You can replace it with a custom type by defining BOOST_COBALT_CUSTOM_EXECUTOR and adding a boost::cobalt::executor type yourself.
Alternatively, BOOST_COBALT_USE_IO_CONTEXT can be defined to set the executor to boost::asio::io_context::executor_type.
pmr
Boost.cobalt can be used with different pmr implementations, defaulting to std::pmr.
The following macros can be used to configure it:
- BOOST_COBALT_USE_STD_PMR
- BOOST_COBALT_USE_BOOST_CONTAINER_PMR
- BOOST_COBALT_USE_CUSTOM_PMR
If you define BOOST_COBALT_USE_CUSTOM_PMR you will need to provide a boost::cobalt::pmr namespace that is a drop-in replacement for std::pmr.
Alternatively, the use of pmr can be disabled with:
- BOOST_COBALT_NO_PMR
In this case, cobalt will use a non-pmr monotonic resource for the synchronization functions (race, gather and join).
use_op uses a small-buffer-optimized resource whose size can be set by defining BOOST_COBALT_SBO_BUFFER_SIZE and defaults to 4096 bytes.
cobalt/leaf.hpp
Cobalt provides integration with boost.leaf. It provides functions similar to leaf that take an awaitable instead of a function object and return an awaitable.
template<awaitable TryAwaitable, typename ... H >
auto try_catch(TryAwaitable && try_coro, H && ... h );
template<awaitable TryAwaitable, typename ... H >
auto try_handle_all(TryAwaitable && try_coro, H && ... h );
template<awaitable TryAwaitable, typename ... H >
auto try_handle_some(TryAwaitable && try_coro, H && ... h );
See the leaf documentation for details.
In-Depth
Custom Executors
One of the reasons cobalt defaults to asio::any_io_executor is that it is a type-erased executor, i.e. you can provide your own event-loop without needing to recompile cobalt.
However, during the development of the Executor TS, the executor concepts got a bit unintuitive, to put it mildly.
Ruben Perez wrote an excellent blog post, which I am shamelessly going to draw from.
Definition
An executor is a type that points to the actual event loop, is (cheaply) copyable, supports properties (see below), is equality comparable and has an execute function.
execute
struct example_executor
{
template<typename Fn>
void execute(Fn && fn) const;
};
The above function executes fn in accordance with its properties.
Properties
A property can be queried, preferred or required, e.g.:
struct example_executor
{
// get a property by querying it.
asio::execution::relationship_t query(asio::execution::relationship_t) const
{
return asio::execution::relationship.fork;
}
// require an executor with a new property
never_blocking_executor require(const execution::blocking_t::never_t);
// prefer an executor with a new property. the executor may or may not support it.
never_blocking_executor prefer(const execution::blocking_t::never_t);
// not supported
example_executor prefer(const execution::blocking_t::always_t);
};
Properties of the asio::any_io_executor
In order to wrap an executor in an asio::any_io_executor two properties are required:
- execution::context_t
- execution::blocking_t::never_t
That means we need to either make them require-able (which makes no sense for context) or return the expected value from query.
The execution::context_t query should return asio::execution_context& like so:
struct example_executor
{
asio::execution_context &query(asio::execution::context_t) const;
};
The execution context is used to manage the lifetimes of services, which in turn manage the lifetimes of io-objects such as asio’s timers & sockets. That is to say, by providing this context, all of asio’s io works with it.
The execution_context must remain alive after the executor gets destroyed.
The following may be preferred:
- execution::blocking_t::possibly_t
- execution::outstanding_work_t::tracked_t
- execution::outstanding_work_t::untracked_t
- execution::relationship_t::fork_t
- execution::relationship_t::continuation_t
That means you might want to support them in your executor for optimizations.
The blocking property
As we’ve seen before, this property controls whether the function passed to execute() can be run immediately, as part of execute(), or must be queued for later execution.
Possible values are:
- asio::execution::blocking.never: never run the function as part of execute(). This is what asio::post() does.
- asio::execution::blocking.possibly: the function may or may not be run as part of execute(). This is the default (what you get when calling io_context::get_executor).
- asio::execution::blocking.always: the function is always run as part of execute(). This is not supported by io_context::executor.
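The difference between queuing and running inline can be sketched with a toy event loop that does not use asio at all. toy_loop, toy_executor, toy_blocking and trace_execute are hypothetical names made up for this illustration. The sketch only models the never and always cases, since possibly may behave like either one.

```cpp
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy event loop; it only stores functions and runs them later.
struct toy_loop
{
    std::deque<std::function<void()>> queue;

    void run()
    {
        while (!queue.empty())
        {
            auto fn = std::move(queue.front());
            queue.pop_front();
            fn();
        }
    }
};

enum class toy_blocking { never, always };

// Hypothetical executor whose execute() honours the chosen blocking mode.
struct toy_executor
{
    toy_loop * loop;
    toy_blocking blocking;

    template<typename Fn>
    void execute(Fn && fn) const
    {
        if (blocking == toy_blocking::always)
            std::forward<Fn>(fn)();                          // runs as part of execute()
        else
            loop->queue.emplace_back(std::forward<Fn>(fn));  // queued, runs inside run()
    }
};

// Records the order in which things happen for a given blocking mode.
inline std::vector<std::string> trace_execute(toy_blocking blocking)
{
    toy_loop loop;
    toy_executor exec{&loop, blocking};
    std::vector<std::string> trace;
    exec.execute([&trace] { trace.push_back("function ran"); });
    trace.push_back("execute returned");
    loop.run();
    return trace;
}
```

In the never mode the function only runs once the loop is run, after execute() has returned. In the always mode it has already run by the time execute() returns.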
The relationship property
relationship can take two values:
- asio::execution::relationship.continuation: indicates that the function passed to execute() is a continuation of the function calling execute().
- asio::execution::relationship.fork: the opposite of the above. This is the default (what you get when calling io_context::get_executor()).
Setting this property to continuation enables some optimizations in how the function gets scheduled. It only has effect if the function is queued (as opposed to run immediately). For io_context, when set, the function is scheduled to run in a faster, thread-local queue, rather than the context-global one.
The outstanding_work_t property
outstanding_work can take two values:
- asio::execution::outstanding_work.tracked: indicates that while the executor is alive, there’s still work to do.
- asio::execution::outstanding_work.untracked: the opposite of the above. This is the default (what you get when calling io_context::get_executor()).
Setting this property to tracked means that the event loop will not return as long as the executor is alive.
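One common way to implement tracking is a counter on the event loop that a tracked executor increments while it exists. The sketch below shows that idea in isolation. work_loop, toy_tracked_executor and toy_untracked_executor are hypothetical names, and the code does not reflect how asio implements the property internally.

```cpp
#include <cstddef>

// Hypothetical loop state; a real run() would keep going until out_of_work() is true.
struct work_loop
{
    std::size_t outstanding_work = 0u;
    bool out_of_work() const { return outstanding_work == 0u; }
};

// Tracked: every live copy counts as outstanding work.
class toy_tracked_executor
{
public:
    explicit toy_tracked_executor(work_loop & loop) : loop_(&loop)
    {
        ++loop_->outstanding_work;
    }
    toy_tracked_executor(const toy_tracked_executor & other) : loop_(other.loop_)
    {
        ++loop_->outstanding_work;
    }
    toy_tracked_executor & operator=(const toy_tracked_executor & other)
    {
        ++other.loop_->outstanding_work; // increment first, so self-assignment is safe
        --loop_->outstanding_work;
        loop_ = other.loop_;
        return *this;
    }
    ~toy_tracked_executor() { --loop_->outstanding_work; }

private:
    work_loop * loop_;
};

// Untracked: holding the executor has no effect on the loop.
struct toy_untracked_executor
{
    work_loop * loop;
};
```

As long as any toy_tracked_executor is alive, out_of_work() reports false, which is the condition that would keep the event loop from returning.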
A minimal executor
With this, let’s look at the interface of a minimal executor.
struct minimal_executor
{
minimal_executor() noexcept;
asio::execution_context &query(asio::execution::context_t) const;
static constexpr asio::execution::blocking_t
query(asio::execution::blocking_t) noexcept
{
return asio::execution::blocking.never;
}
template<class F>
void execute(F && f) const;
bool operator==(minimal_executor const &other) const noexcept;
bool operator!=(minimal_executor const &other) const noexcept;
};
See example/python.cpp for an implementation using python’s asyncio event-loop.
Adding a work guard.
Now, let’s add in a require function for the outstanding_work property, that uses multiple types.
struct tracked_executor;
struct untracked_executor : minimal_executor
{
untracked_executor() noexcept;
constexpr tracked_executor require(asio::execution::outstanding_work_t::tracked_t) const;
constexpr untracked_executor require(asio::execution::outstanding_work_t::untracked_t) const {return *this; }
};
struct tracked_executor : minimal_executor
{
tracked_executor() noexcept;
constexpr tracked_executor require(asio::execution::outstanding_work_t::tracked_t) const {return *this;}
constexpr untracked_executor require(asio::execution::outstanding_work_t::untracked_t) const;
};
Note that it is not necessary to return a different type from the require function; it can also be done like this:
struct trackable_executor : minimal_executor
{
trackable_executor() noexcept;
constexpr trackable_executor require(asio::execution::outstanding_work_t::tracked_t) const;
constexpr trackable_executor require(asio::execution::outstanding_work_t::untracked_t) const;
};
If we wanted to use prefer it would look as shown below:
struct trackable_executor : minimal_executor
{
trackable_executor() noexcept;
constexpr trackable_executor prefer(asio::execution::outstanding_work_t::tracked_t) const;
constexpr trackable_executor prefer(asio::execution::outstanding_work_t::untracked_t) const;
};
Summary
As you can see, the property system is not trivial, but quite powerful. Implementing a custom executor is a problem category of its own, which is why this documentation doesn’t do that. Rather, there is an example of how to wrap a python event loop in an executor.
Below are some reading recommendations.
Stackless
C++20 coroutines are stackless, meaning they don’t have their own stack.
A stack in C++ describes the callstack, i.e. all the function frames stacked. A function frame is the memory a function needs to operate, i.e. a slice of memory to store its variables and information such as the return address.
The size of a function frame is known at compile time, but not outside the compile unit containing its definition.
int bar() {return 0;} // the deepest point of the stack
int foo() {return bar();}
int main()
{
return foo();
}
The call stack in the above example is:
main()
foo()
bar()
[Sequence diagram: main calls foo, foo calls bar, bar returns to foo, foo returns to main.]
Coroutines can be implemented as stackful, which means that a coroutine allocates a fixed chunk of memory and stacks function frames similar to a thread. C++20 coroutines are stackless, i.e. they only allocate their own frame and use the caller’s stack on resumption. Using our previous example:
fictional_eager_coro_type<int> example()
{
co_yield 0;
co_yield 1;
}
void nested_resume(fictional_eager_coro_type<int>& f)
{
f.resume();
}
int main()
{
auto f = example();
nested_resume(f);
f.resume();
return 0;
}
This will yield a call stack similar to this:
main()
f$example()
nested_resume()
f$example()
f$example()
[Sequence diagram: main creates and calls example, which yields back to main; main calls nested_resume, which resumes example; example yields back to nested_resume, which returns to main; main resumes example, which co_returns to main.]
The same applies if a coroutine gets moved across threads.
Lazy & eager
Coroutines are lazy if they only start executing their code after being resumed, while an eager one will execute right away until its first suspension point (i.e. a co_await, co_yield or co_return expression).
lazy_coro co_example()
{
printf("Entered coro\n");
co_yield 0;
printf("Coro done\n");
}
int main()
{
printf("enter main\n");
auto lazy = co_example();
printf("constructed coro\n");
lazy.resume();
printf("resumed once\n");
lazy.resume();
printf("resumed twice\n");
return 0;
}
Which will produce output like this:
enter main
constructed coro
Entered coro
resumed once
Coro done
resumed twice
[Sequence diagram: main prints "enter main" and calls co_example(); main prints "constructed coro" and calls resume(); the coroutine prints "Entered coro" and yields 0; main prints "resumed once" and calls resume(); the coroutine prints "Coro done" and co_returns; main prints "resumed twice".]
Whereas an eager coro would look like this:
eager_coro co_example()
{
printf("Entered coro\n");
co_yield 0;
printf("Coro done\n");
}
int main()
{
printf("enter main\n");
auto eager = co_example();
printf("constructed coro\n");
eager.resume();
printf("resumed once\n");
return 0;
}
Which will produce output like this:
enter main
Entered coro
constructed coro
Coro done
resumed once
[Sequence diagram: main prints "enter main" and calls co_example(); the coroutine prints "Entered coro" and yields 0; main prints "constructed coro" and calls resume(); the coroutine prints "Coro done" and co_returns; main prints "resumed once".]
Benchmarks
Run on 11th Gen Intel® Core™ i7-1185G7 @ 3.00GHz
Posting to an executor
The benchmark runs the following code with cobalt’s task, asio::awaitable and asio’s stackful coroutines (based on boost.context).
cobalt::task<void> atest()
{
for (std::size_t i = 0u; i < n; i++)
co_await asio::post(cobalt::use_op);
}
| | gcc 12 | clang 16 |
|---|---|---|
| cobalt | 2472 | 2098 |
| awaitable | 2432 | 2253 |
| stackful | 3655 | 3725 |
Running noop coroutine in parallel
This benchmark uses an asio::experimental::channel that has a size of zero, to read & write to it in parallel. It uses gather with cobalt and an awaitable_operator with asio::awaitable.
cobalt::task<void> atest()
{
asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 0u};
for (std::size_t i = 0u; i < n; i++)
co_await cobalt::gather(
chan.async_send(system::error_code{}, cobalt::use_task),
chan.async_receive(cobalt::use_task));
}
asio::awaitable<void> awtest()
{
asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 0u};
using boost::asio::experimental::awaitable_operators::operator&&;
for (std::size_t i = 0u; i < n; i++)
co_await (
chan.async_send(system::error_code{}, asio::use_awaitable)
&&
chan.async_receive(asio::use_awaitable));
}
| | gcc 12 | clang 16 |
|---|---|---|
| cobalt | 1563 | 1468 |
| awaitable | 2800 | 2805 |
Immediate
This benchmark utilizes the immediate completion, by using a channel with a size of 1, so that every operation is immediate.
cobalt::task<void> atest()
{
asio::experimental::channel<void(system::error_code)> chan{co_await cobalt::this_coro::executor, 1u};
for (std::size_t i = 0u; i < n; i++)
{
co_await chan.async_send(system::error_code{}, cobalt::use_op);
co_await chan.async_receive(cobalt::use_op);
}
}
| | gcc 12 | clang 16 |
|---|---|---|
| cobalt | 1810 | 1864 |
| awaitable | 3109 | 4110 |
| stackful | 3922 | 4705 |
Channels
In this benchmark asio::experimental::channel and cobalt::channel get compared.
This is similar to the parallel test, but uses the cobalt::channel instead.
| | gcc | clang |
|---|---|---|
| cobalt | 500 | 350 |
| awaitable | 790 | 770 |
| stackful | 867 | 907 |
Operation Allocations
This benchmark compares the different possible solutions for the associated allocator of asynchronous operations.
| | gcc | clang |
|---|---|---|
| std::allocator | 1136 | 1139 |
| cobalt::monotonic | 1149 | 1270 |
| pmr::monotonic | 1164 | 1173 |
| cobalt::sbo | 1021 | 1060 |
The latter method is used internally by cobalt.
Requirements
Libraries
Boost.cobalt requires a C++20 compiler and directly depends on the following boost libraries:
- boost.asio
- boost.system
- boost.circular_buffer
- boost.intrusive
- boost.smart_ptr
- boost.container (for clang < 16)
Compiler
This library is supported since Clang 14, Gcc 10 & MSVC 19.30 (Visual Studio 2022).
Gcc versions 12.1 and 12.2 appear to have a bug for coroutines without stack variables, as can be seen [here](https://godbolt.org/z/6adGcqP1z), and should be avoided for coroutines.
Clang only added std::pmr support in 16, so older clang versions use boost::container::pmr as a drop-in replacement.
Some if not all MSVC versions have a broken coroutine implementation that this library needs to work around. This may cause non-deterministic behaviour and overhead.
A coroutine continuation may be done in the awaitable returned from a final_suspend, like this:
// in promise
auto final_suspend() noexcept
{
struct final_awaitable
{
std::coroutine_handle<void> continuation{std::noop_coroutine()}; (1)
bool await_ready() const noexcept;
std::coroutine_handle<void> await_suspend(std::coroutine_handle<void> h) noexcept
{
auto cc = continuation;
h.destroy(); (2)
return cc;
}
void await_resume() noexcept {}
};
return final_awaitable{my_continuation};
};
1 | The continuation |
2 | Self-destroying the coroutine before continuation |
The final_suspend does not properly suspend the coroutine on MSVC, so that the h.destroy() will cause double destruction of elements on the coroutine frame.
Therefore, on MSVC the destruction needs to be posted, so that it is done out of line.
This causes overhead and makes the actual freeing of memory non-deterministic.
Acknowledgements
This library would not have been possible without the CppAlliance and its founder Vinnie Falco. Vinnie trusted me enough to let me work on this project, while himself having very different views on how such a library should be designed.
Thanks also go to Ruben Perez & Richard Hodges for listening to my design problems and giving me advice & use-cases. Furthermore, this library would not have been possible without the great boost.asio by Chris Kohlhoff.