Mircea Baja - 7 June 2025 (Updated 30 Oct 2025)

# Structured concurrency
[Figure: diagrams labelled "detached" and "wait_any"]

---

# Motivation - academic statements

Concurrent APIs commonly provide calls that take some form of callback. Such a call initiates some concurrent activity and returns before the callback/activity completes.

This "detached" behaviour requires the user of the API to keep alive all the objects used directly or indirectly from the callback until the activity "joins", i.e. completes or is cancelled. In practice this is not easy, not automatic and error prone.

The idea of structured concurrency is that "detached" operations are similar to "goto" statements. Just as "if/else", "for loops" and function calls did for sequential code, systematic usage of structured concurrency makes code easier to understand and safer.

In a C++ coroutine environment, structured concurrency is built on top of primitives like "when_all", "when_any", "nursery", "wait_for", "stop_when", and even simple lazy task coroutines (if implemented correctly). They automatically join in the current scope, making object lifetime questions easier to reason about.

---

# Structured programming
[Figure: control-flow diagrams labelled "sequential", "goto", "if", "loop", "function call"]

[Nathaniel J. Smith: Notes on structured concurrency, or: Go statement considered harmful](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/)

- `goto` corresponds to low level assembly `jump` instructions
- it ends in spaghetti code: graphs that do not obviously reduce to sequential (i.e. linear) flow
- it was replaced with constructs that reduce to sequential
- in modern languages even `goto` is tamed (it only jumps within the same function)
- but it is kept for the three cases where it's needed (a new case turns up once every 20 years or so)
- structured programming is what enables further high level language constructs like RAII

---

# Low level concurrency APIs

```cpp
int WSAAPI WSARecv(
  [in]      SOCKET                             s,
  [in, out] LPWSABUF                           lpBuffers,
  [in]      DWORD                              dwBufferCount,
  [out]     LPDWORD                            lpNumberOfBytesRecvd,
  [in, out] LPDWORD                            lpFlags,
  [in]      LPWSAOVERLAPPED                    lpOverlapped,
  [in]      LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
```

- Can be called with a pointer to the completion routine (a callback)
- Until that routine is called (or the operation is cancelled), the socket, the buffer and the overlapped structure (often a derived type holding more data) need to be kept alive
  - they might be accessed during or after the call to `WSARecv`
- Calling the function is just the beginning; how do we tie things up at the end?

---

# Unstructured concurrency
[Figure: diagrams labelled "goto" and "detached"]

[Nathaniel J. Smith: Notes on structured concurrency, or: Go statement considered harmful](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/)

Low level C APIs expose a detached behaviour, which on one hand might be necessary*, like `jump` statements in assembly, but we want it wrapped in structured concurrency primitives that are easier to use correctly.

Detach/fire and forget has issues: lifetime, sharing data (e.g. by value), error propagation.

We don't want to stay low level and error prone.

(*) By the end of this presentation it might be worth revisiting whether this kind of interface is really necessary

---

# Structured concurrency - primitives

We want to identify the structured concurrency primitives (in addition to the nursery from Nathaniel's article). I discuss:

- lazy coroutines/parent-child-parent continuation
- chains
- the run loops
- chain leaves
  - `async_sleep_for`, `async_yield`, `async_noop`, `async_suspend_forever`
- branching further chains
  - `async_wait_any`
  - `async_wait_all`
  - `async_wait_for`
  - `async_stop_when`
  - `nursery::async_run`

---

# Example good: wait_all
[Figure: "wait_all" diagram]
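As a rough, thread-based analogy of the joining behaviour this slide illustrates (it is not the coroutine `async_wait_all` itself), the sketch below starts three pieces of work and only continues once all three have joined. Because of that join, handing the workers references to locals is safe. The names `joining_thread` and `wait_all_blocking` are made up for illustration; C++20's `std::jthread` offers a similar join-on-destruction wrapper.

```cpp
#include <cassert>
#include <string>
#include <thread>
#include <tuple>
#include <utility>

// Hypothetical helper: a thread that joins when it goes out of scope.
struct joining_thread {
    std::thread t;

    template <typename F>
    explicit joining_thread(F f) : t(std::move(f)) {}

    joining_thread(const joining_thread&) = delete;
    joining_thread& operator=(const joining_thread&) = delete;

    ~joining_thread() {
        if (t.joinable()) t.join();
    }
};

// Starts three independent pieces of work and returns only after all
// of them have completed, handing back one value per piece of work.
std::tuple<int, std::string, double> wait_all_blocking(const std::string& query) {
    int length = 0;
    std::string shouted;
    double half = 0.0;
    {
        // Each worker writes to a different local, captured by reference.
        joining_thread t1([&] { length = static_cast<int>(query.size()); });
        joining_thread t2([&] { shouted = query + "!"; });
        joining_thread t3([&] { half = 0.5; });
    }  // all three threads are joined here, before the locals are read
    return std::make_tuple(length, shouted, half);
}
```

Calling `wait_all_blocking("abc")` blocks the calling thread until the three workers are done, which is the main difference from a coroutine-based `wait_all`, where the waiting chain is suspended instead.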
- we started three tasks
- the second completed first, never mind
- execution continues when all three are completed
- `wait_all` returns the value of each (e.g. as a tuple)

---

# wait_all cancel
[Figure: "wait_all" diagram]

- but if the second task fails, then the remaining ones are told to cancel
- code after `wait_all` only gets executed when all the tasks have completed, either successfully or by completing their cancellation
- detached behaviour looks like `goto`
- `wait_all` looks like function calls, `if`/`else` blocks etc.

---

# Example good

```cpp
co<...> async_search_multiple(  // result type elided
    const std::string& encoded_query) {
  proxy p{ "proxy.example.com", 8080 };
  auto result = co_await async_wait_all(
    async_http_get(p, "https://google.com/search?q=" + encoded_query),
    async_http_get(p, "https://bing.com/search?q=" + encoded_query),
    async_http_get(p, "https://duckduckgo.com/?q=" + encoded_query)
  );
  co_return result;
}
```

- `co_await async_wait_all(...)` returns only when the work for all `async_http_get` operations has completed.
- the object `proxy p` naturally stays alive due to scope rules
- RAII works
- more subtly: the string temporaries created as arguments for each `async_http_get` stay alive until the `;` after `async_wait_all` is reached, and that happens only when all three `async_http_get` operations have completed

---

# Example bad

```cpp
co<void> async_should_not_compile() {
  auto x = async_foo("some string");
  co_await std::move(x);
}
```

- some libraries allow the above to compile, but this creates issues similar to the [`std::generator` "spot the bug" cases](/presentations/2025-05-20-generator.html)
- currently the C++ core guidelines give up on this: [CP.53: Parameters to coroutines should not be passed by reference](https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#cp53-parameters-to-coroutines-should-not-be-passed-by-reference) and [CP.51: Do not use capturing lambdas that are coroutines](https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#cp51-do-not-use-capturing-lambdas-that-are-coroutines)
- I disagree with the current form of CP.51 and CP.53 (see next slides). I think libraries should reject the code above at compile time (using lazy coroutines where only unmovable temporaries can be `co_await`-ed). These guidelines are legacy thinking from when `std::future` was thought of as a good idea
- my observation is that programmers, when taught about such issues, nod, do not internalize the solution, and then end up learning the hard way (i.e. via bugs) or drawing the wrong conclusion (i.e. don't use references for coroutine parameters) instead of using/creating libraries that stop bad patterns

---

# Example bad - CP.51

```cpp
int value = get_value();
std::shared_ptr<Foo> sharedFoo = get_foo();
{
  const auto lambda = [value, sharedFoo]() -> std::future<void>
  {
    co_await something();
    // "sharedFoo" and "value" have already been destroyed
    // the "shared" pointer didn't accomplish anything
  };
  lambda();
} // the lambda closure object has now gone out of scope
```

- [CP.51: Do not use capturing lambdas that are coroutines](https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#cp51-do-not-use-capturing-lambdas-that-are-coroutines); similar incorrect recommendations appear elsewhere: [A capturing lambda can be a coroutine, but you have to save your captures while you still can](https://devblogs.microsoft.com/oldnewthing/20211103-00/?p=105870)
- the example uses unstructured concurrency: `lambda()` is assumed to be eager and to run in detached mode (calling `lambda()` starts concurrent work, `co_await` is not required, and work continues after the `;` following the `lambda()` call)
- the coding guideline, instead of pointing to unstructured concurrency as the bad thing, makes recommendations about the usage of `shared_ptr` (additional allocations) and about avoiding lambda captures, and gives up on RAII

---

# Example bad - CP.53

```cpp
// claimed bad
std::future<int> do_something(const std::shared_ptr<int>& input) {
  co_await something();
  co_return *input + 1; // DANGER: the reference to input may no longer be valid
}

// claimed good, but still bad
std::future<int> do_something(std::shared_ptr<int> input) {
  co_await something();
  co_return *input + 1; // input is a copy that is still valid here
}
```

- [CP.53: Parameters to coroutines should not be passed by reference](https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#cp53-parameters-to-coroutines-should-not-be-passed-by-reference)
- the example uses unstructured concurrency: `do_something()` is assumed to be eager and the parameters are no longer available after a `co_await`
- the coding guideline, instead of pointing to unstructured concurrency as the bad thing, makes recommendations about the usage of `shared_ptr` (additional allocations)
- this is despite plenty of cases where things should be passed by reference, and cases where "values" like `std::string_view` are still references in disguise

---

# Structured concurrency

---

# The basics should work

- variable declarations
- if/else
- for loops
- function calls
- RAII
- lambdas
- pass argument by reference

- We've seen that the C++ core guidelines give up on this, [CP.53: Parameters to coroutines should not be passed by reference](https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#cp53-parameters-to-coroutines-should-not-be-passed-by-reference)
- We'll see that the sender/receiver framework partially gives up on this and has "algorithms" for basic stuff, e.g. `just` for variable declaration, `repeat` for loops, `then` for function calls, and care is required for things passed to such algorithms (passing by reference, by value, moved etc. has to be done carefully and deliberately).

---

# Example good

```cpp
co<void> async_echo(socket s) {
  buffer buff;
  while (true) {
    std::size_t length = co_await async_read_some(s, buff);
    co_await async_write(s, buff, length);
  }
}
```

- we have local variables: `buff`, `length`
- we have a `while(true)` loop
- `async_read_some` and `async_write` can take a reference to the `buff` variable
- `buff` outlives the execution of `async_read_some` and `async_write`
- `buff` gets destroyed on scope exit
- RAII works:
  - but currently it can't be used for joining async work

---

# Basic: continuation
[Figure: "continuation" diagram]
- do this, THEN do that

---

# Example bad: std::future

```cpp
std::future<int> f = go_async_work(some_arg);
// attach continuation (hypothetical, not in std)
f.then([](int x) {
  // use x
});
```

- a model where async work is started eagerly has the issue that attaching the continuation potentially races with the async work, therefore synchronization is required between:
  - the async work providing the result into the `std::future`
  - and the `std::future` setting a continuation to be called when a result is available
- a better model is to create lazy work which needs to be started later, but gives the opportunity to set the continuation before the async work is started, thus avoiding races
- I think that `std::future/promise` is a broken model where Nathaniel's critique of detached work applies

---

# Example good: lazy

```cpp
co<void> async_echo(socket s) {
  buffer buff;
  while (true) {
    std::size_t length = co_await async_read_some(s, buff);
    co_await async_write(s, buff, length);
  }
}
```

- for example `co_await async_write(...)`
  - `async_echo` runs uninterrupted until a `co_await`
  - `async_write` is lazy, it does not introduce concurrency yet
  - `co_await` introduces concurrency
  - but `async_echo` only continues when `co_await` completes
  - concurrency introduced by `co_await` does not outlive the parent
- in this form it creates a parent-child relationship that makes `co_await` behave **logically** like a function call (except that execution can be interleaved at that point with other coroutines while this one waits for the child to complete)

---

# Chain of continuations
[Figure: "chain" diagram]
- it is dynamic over time: further links are added to the right or they go away
- it has a root, it starts somewhere
- it has a leaf terminal
- only the leaf has, at any given point in time, a pending operation that in effect the whole chain is waiting on
- only the leaf needs to register a `stop_callback` to react to cancellation
  - if this is a chain of coroutines then it can go to the root and destroy the root coroutine, which in turn via RAII will destroy its child, which in turn via RAII ...

---

# Example

```cpp
co<void> async_echo(socket s) {
  buffer buff;
  while (true) {
    std::size_t length = co_await async_read_some(s, buff);
    co_await async_write(s, buff, length);
  }
}
```

- `async_echo` might have a parent
- the parent of `async_echo` (etc.), `async_echo` and e.g. `async_read_some` make a chain
- `async_read_some` and `async_write` are leaves (at different points in time)
  - and illustrate how a chain shortens and lengthens

---

# Chain root

```cpp
co<int> async_foo() {
  co_return 42;
}

TEST(run_co_return_int)
{
  int result = run(async_foo()).value();
  ASSERT_EQ(42, result);
}
```

- `run` blocks the thread until `async_foo` completes and returns whatever `async_foo` `co_return`s
  - in fact a `std::optional` of that: `run` returns `std::nullopt` when no value was returned due to the chain being cancelled
- the chain root needs to have the callback that is called when the entry point in the chain eventually completes or is cancelled
- in the meantime this `run` needs to go through the list of ready tasks and timers
- there can be all sorts of such `run` functions that differ in what they actually do until the entry point completes

---

# Ready queue + timers heap
[Figure: diagram labelled "ready queue" and "timers heap"]
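A minimal sketch of a single-threaded run loop built on these two structures follows. It is an illustration under assumptions rather than the implementation behind these slides: `run_loop`, `post`, `post_after` and `demo` are hypothetical names, work items are plain `std::function` objects instead of coroutine handles, and the clock is simulated with integer ticks so that the behaviour is deterministic.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <queue>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

class run_loop {
public:
    using work = std::function<void()>;

    // Schedule work to run as soon as possible.
    void post(work w) { ready_.push(std::move(w)); }

    // Schedule work to run `delay` ticks from now.
    void post_after(std::uint64_t delay, work w) {
        timers_.push(timer{now_ + delay, next_seq_++, std::move(w)});
    }

    // Run until there is neither ready work nor a pending timer.
    void run() {
        while (!ready_.empty() || !timers_.empty()) {
            if (ready_.empty()) {
                // Nothing ready: jump the simulated clock to the next deadline
                // (a real loop would sleep or block on I/O here).
                now_ = timers_.top().deadline;
            }
            // Move every timer that is due onto the ready queue.
            while (!timers_.empty() && timers_.top().deadline <= now_) {
                ready_.push(timers_.top().fn);
                timers_.pop();
            }
            // Run one ready item; it may post more work or timers.
            work w = std::move(ready_.front());
            ready_.pop();
            w();
        }
    }

private:
    struct timer {
        std::uint64_t deadline;
        std::uint64_t seq;  // keeps timers with equal deadlines in FIFO order
        work fn;
    };
    // Orders the heap so that the earliest deadline is on top.
    struct later {
        bool operator()(const timer& a, const timer& b) const {
            return std::tie(a.deadline, a.seq) > std::tie(b.deadline, b.seq);
        }
    };

    std::queue<work> ready_;
    std::priority_queue<timer, std::vector<timer>, later> timers_;
    std::uint64_t now_ = 0;
    std::uint64_t next_seq_ = 0;
};

std::vector<std::string> demo() {
    std::vector<std::string> log;
    run_loop loop;
    loop.post_after(20, [&] { log.push_back("timer@20"); });
    loop.post_after(10, [&] {
        log.push_back("timer@10");
        loop.post([&] { log.push_back("posted from timer"); });
    });
    loop.post([&] { log.push_back("ready"); });
    loop.run();
    return log;
}
```

In `demo()` the ready item runs first, then the timer due at tick 10, then the work that timer posted, and finally the timer due at tick 20. In a coroutine setting the work items would typically resume a suspended chain instead of calling a lambda.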
- a useful run loop uses a ready queue and a timers heap (directly or indirectly). The example shows a T1 model directly using them

---

# Run loops, sync_wait

Not all the run loops are the same:

- the simplest one, `sync_wait`, does not even "run" or "loop" anything
  - it effectively blocks the thread waiting for something else to do the work
  - used in `main` for "demo" examples
  - if used in a thread from a thread pool it can/will cause the issue where the thread is taken out of the "active" ones in the pool
- some can run their own ready queue and timers heap directly
- others use (blocking) APIs that are required by yet other APIs (e.g. in Windows `ReadFileEx` requires a `WaitFor...Ex`)
- unfortunately different sets of APIs require different kinds of run loops

---

# Detached entry in main

- the whole point of this presentation is: avoid detached, and what to do instead

However:

- in an embedded environment it makes sense to set things up in `main` and exit `main` while work continues, driven e.g. by interrupts, making the entry point detached

---

# Chain leaf

```cpp
co<void> async_echo(socket s) {
  buffer buff;
  while (true) {
    std::size_t length = co_await async_read_some(s, buff);
    co_await async_write(s, buff, length);
  }
}
```

- the leaves: `async_read_some`, `async_write`
- they have to (eventually) provide an awaiter that implements `await_ready`, `await_suspend` and `await_resume`
- coroutines require a (nominal) allocation
- leaves are not coroutines, so they can avoid allocations (in most cases)
- as you think about the (temporarily dynamic) tree of a shrinking and growing chain: there are a lot of leaves, therefore a lot of allocations are avoided
- from the programmer's point of view, writing a leaf is a different (and relatively difficult) world from using one in a coroutine (simple)

---

# Elementary leaves

```cpp
// sleep for 10 seconds
co_await async_sleep_for(10s);
// give other chains a chance to run
co_await async_yield();
// check if cancelled
co_await async_noop();
// suspend forever (i.e. until cancelled)
co_await async_suspend_forever();
```

- primitives such as `async_sleep_for` have to be `noexcept` (no allocations either), because they act as recovery tools (on error: catch exceptions, sleep, try again later)
- `async_yield` is more like `std::this_thread::yield()` than `co_yield`
- `async_yield` and `async_noop` are useful to voluntarily allow progress (unlike typical system threads, coroutines are non-preemptive)
- `async_suspend_forever` is useful for interop, e.g. to keep a `std::jthread` variable alive and destruct/stop the thread on cancellation

---

# wait_any
[Figure: "wait_any" diagram]

- is a leaf for a chain, but then it creates roots for the children chains
- each of those roots has callbacks
- when the callback for a child chain is called
  - if it's the first one, then cancel the others
- when no error: returns the result of the first chain
  - and info: which one completed?
- also if the chain for which `wait_any` is a leaf is cancelled, then the remaining children are cancelled

---

# Callback, callback, callback

The terminology is a little bit overloaded:

- a root of a chain has some work to do when it's cancelled (e.g. destroy the first coroutine in the chain, which will recursively destroy the chain): **cancellation callback** for a root of a chain
- similarly some work to do when it completes successfully: **completion callback**
- a good tool to implement cancellation is the triad: `stop_source`/`stop_token`/**`stop_callback`**
  - the `stop_callback` takes a functor of the work to do when cancelled (which eventually leads to the cancellation callback at the root of the chain being called)
- **our class `callback`** captures a `void*` and a function pointer and can be used:
  - as such a functor for `stop_callback`, though our `stop_callback` integrates better with it than the `std::[inplace_]stop_callback` does
  - or as a chain root cancellation or completion callback
- **a C API completion routine** for a C API such as the one for `WSARecv` is called a callback
- we'll have a more complete picture when we talk about [cancellation](/presentations/2025-07-18-cancellation-leaves.html)

---

# wait_any

```cpp
co<size_t> async_some_wait() {
  auto result = co_await async_wait_any(
    async_suspend_forever(),
    async_sleep_for(std::chrono::seconds(0))
  );
  co_return result.index;
}

TEST(wait_any_inside_co)
{
  auto result = run(async_some_wait()).value();
  ASSERT_EQ(1, result);
}
```

---

# wait_all
[Figure: "wait_all" diagram]

- is a leaf for a chain, but then it creates roots for the children chains
- each of those roots has callbacks
- when the callback for a child chain is called
  - if it's the first one to have an error, then cancel the others
- when no error: it returns a tuple of the results of each chain

---

# wait_all

```cpp
TEST(wait_all_tree)
{
  auto result = run(coro_st::async_wait_any(
    async_suspend_forever(),
    async_wait_all(
      std::invoke([]() -> co<int> {
        co_return 42;
      }),
      async_yield()
    )
  )).value();
  ASSERT_EQ(1, result.index);
  ASSERT_EQ(42, std::get<0>(result.value));
}
```

---

# wait_[any|all] variants

- there are all sorts of variations possible
- for `wait_any` it's useful to allow a mixture of children having a common non-void result and children that return void
- there could be variations regarding what counts as an error (the simplest behaviour is to treat exceptions as errors)
- our `wait_all` returns a tuple of all the results and treats exceptions as errors
  - but we could have a variant that returns a tuple of optional results and ignores errors
  - or we could return a vector of results (though all the children would have to return the same type)

---

# wait_for

```cpp
co<int> async_some_int() {
  co_await async_yield();
  co_return 42;
}

TEST(wait_for_int_has_value)
{
  auto result = run(async_wait_for(
    async_some_int(),
    std::chrono::hours(24)
  )).value();
  ASSERT_TRUE(result.has_value());
  ASSERT_EQ(42, result.value());
}
```

- it's so common to do an operation, but only be willing to wait a certain amount of time
- could use `wait_any`, but `wait_for` is easier to use for this common scenario
- returns `std::nullopt` if the timeout was hit and the work was cancelled

---

# stop_when

```cpp
std::optional<int> result1 = co_await async_wait_for(
  async_some_int(),
  std::chrono::minutes(2)
);

std::optional<int> result2 = co_await async_stop_when(
  async_some_int(),
  async_sleep_for(std::chrono::minutes(2))
);

std::optional<int> result3 = co_await async_stop_when(
  async_some_int(),
  async_ctrl_c_pressed()
);
```

- is a generalisation of `wait_for`, where the "stopping" async activity is provided explicitly

---

# Nursery

- a dynamic version of `wait_[all|any]`
- nursery = "that's where the children live"
- partially relaxes structured concurrency principles to achieve functionality that's hard otherwise
  - care is required when spawning a child
- but it still joins on all remaining children before continuing

---

# Nursery: the server case
[Figure: diagram labelled "server", "connection", "connection"]

- main chain loops waiting for connections
- each connection spawns a new chain
- orderly shutdown

---

# Nursery: the server case

```cpp
co<void> async_accept_loop(nursery& n) {
  auto ep = co_await async_tcp_listen(8080);
  while (true) {
    auto socket = co_await async_tcp_accept(ep);
    n.spawn_child(async_echo, std::move(socket));
  }
}

co<void> async_server() {
  nursery n;
  co_await n.async_run(
    async_accept_loop(n));
}
```

- the entry point usually takes the nursery by reference
- the nursery has `spawn_child` and `request_stop`
- `spawn_child` captures arguments similarly to `std::bind`
  - e.g. to pass a reference use `std::ref`
  - allocated to outlive the scope of the call

---

# Nursery: the proxy detection case
[Figure: diagram labelled "proxy 0", "proxy 1", "proxy 2"]

- try a proxy
- 100ms later try another one
- first one to complete wins

---

# Nursery: the proxy detection case

```cpp
co<void> async_try_proxy(nursery& n,
  const std::vector<proxy>& proxies, size_t i,
  std::optional<size_t>& result)
{
  if (i != 0) {
    co_await async_sleep_for(100ms);
  }
  if (i + 1 < proxies.size()) {
    n.spawn_child(async_try_proxy, std::ref(n), std::ref(proxies), i + 1, std::ref(result));
  }
  try {
    co_await async_actually_try_proxy(proxies[i]);
    result = i;
    n.request_stop();
  } catch(...) {
  }
}

co<std::optional<size_t>> async_detect_proxy(
  const std::vector<proxy>& proxies)
{
  std::optional<size_t> result;
  nursery n;
  co_await n.async_run(
    async_try_proxy(n, proxies, 0, result));
  co_return result;
}
```

---

# Nursery: the proxy detection case

- the method shown is not the only one
  - e.g. there is no need to continue to sleep if the earlier proxy detection completed unsuccessfully
  - or we could use a proxy generator in the main chain of the nursery (with sleeps) and spawn child attempts from there; the first child to succeed requests a stop
- the point is that the nursery has flexibility that is usually not available with `wait_[all|any]`, but it (typically) comes at the cost of an additional allocation for each spawned child, whereas `wait_[all|any]` can avoid that (because the number of children is statically known)

---

# Famous last words

- structured concurrency makes reasoning easy
  - RAII works
- detached/callback behaviour makes it difficult to reason about lifetime
- cancellation adds additional complexity (covered in separate presentation(s))
  - it should be handled in the library implementation, to limit the difficult area in the code
- we only relaxed it a bit for the nursery
  - care is required around `spawn_child`
- but not all the libraries are strict about it
- there are plenty of bad/questionable examples around
  - as we've seen in the C++ core guidelines
  - just because it's in the standard does not mean it's good in all respects (e.g. the `std::generator` "spot the bug", the legacy of `std::future/promise`)
- do not give up RAII lightly!

---

# Questions?