Mircea Baja - 7 June 2025

# Structured concurrency
*(diagrams: detached vs. wait_any)*
---

# Structured programming
*(diagrams: sequential, goto, if, loop, function call)*
[Nathaniel J. Smith: Notes on structured concurrency, or: Go statement considered harmful](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/)

- `goto` corresponds to low level assembly `jump` instructions
- it ends in spaghetti code: graphs that do not obviously reduce to sequential (i.e. linear) flow
- it was replaced with constructs that do reduce to sequential flow
- in modern languages even `goto` is tamed (it only jumps within the same function)
- but it was kept for the three cases where it's needed; a new case shows up once every 20 years or so
- structured programming is what enables further high level language constructs like RAII

---

# Low level concurrency APIs

```cpp
int WSAAPI WSARecv(
  [in]      SOCKET                             s,
  [in, out] LPWSABUF                           lpBuffers,
  [in]      DWORD                              dwBufferCount,
  [out]     LPDWORD                            lpNumberOfBytesRecvd,
  [in, out] LPDWORD                            lpFlags,
  [in]      LPWSAOVERLAPPED                    lpOverlapped,
  [in]      LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
```

- Can be called with a pointer to the completion routine (a callback)
- Until that routine is called (or cancelled): the socket, the buffer, the overlapped structure (often a derived type holding more data)
  - need to be kept alive
  - they might be accessed during or after the call to `WSARecv`
- Calling the function is just the beginning; how do we tie things up at the end?

---

# Unstructured concurrency
*(diagrams: goto, detached)*
[Nathaniel J. Smith: Notes on structured concurrency, or: Go statement considered harmful](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/)

Low level C APIs expose a detached behaviour, which on one hand might be necessary*, like `jump` statements in assembly, but we want it wrapped in structured concurrency primitives that are easier to use correctly. Detach/fire and forget has issues: lifetime, sharing data (e.g. by value), error propagation. We don't want to stay low level and error prone.

(*) By the end of this presentation it might be worth revisiting the necessity of this kind of interface.

---

# Example bad

```cpp
int value = get_value();
std::shared_ptr<Foo> sharedFoo = get_foo();
{
  const auto lambda = [value, sharedFoo]() -> std::future<void>
  {
    co_await something();
    // "sharedFoo" and "value" have already been destroyed
    // the "shared" pointer didn't accomplish anything
  };
  lambda();
} // the lambda closure object has now gone out of scope
```

- [CP.51: Do not use capturing lambdas that are coroutines](https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#cp51-do-not-use-capturing-lambdas-that-are-coroutines), with similar incorrect recommendations elsewhere: [A capturing lambda can be a coroutine, but you have to save your captures while you still can](https://devblogs.microsoft.com/oldnewthing/20211103-00/?p=105870)
- the example uses unstructured concurrency: the `lambda()` call is assumed to be eager and to run in detached mode
- the coding guideline, instead of pointing out the unstructured concurrency as the bad thing, makes recommendations about the usage of `shared_ptr` (additional allocations), avoids lambda captures, and gives up on RAII; a contrasting sketch follows below
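For contrast, a hedged sketch of what the same code could look like with a lazy, structured coroutine type (here the `task<void>` type from the echo examples later in this deck; `Foo`, `get_value`, `get_foo` and `something` are the same hypothetical helpers as above). Because the parent `co_await`s the child before the closure goes out of scope, the captures stay alive for the whole duration of the child:

```cpp
// sketch only: with a lazy task that the parent co_awaits, the closure object
// (and therefore its captures) outlives the child coroutine
task<void> async_parent()
{
  int value = get_value();
  std::shared_ptr<Foo> sharedFoo = get_foo();
  {
    const auto lambda = [value, sharedFoo]() -> task<void>
    {
      co_await something();
      // OK: the parent is suspended in the co_await below,
      // so the closure (with "sharedFoo" and "value") is still alive here
    };
    co_await lambda(); // the child does not outlive this expression
  }
}
```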
---

# Example good: wait_any

*(diagram: wait_any)*
Easy to say "don't use detached", but what to do instead? What are some examples of structured concurrency primitives? E.g. `wait_any`:

- we started three tasks
- the second completed first
- the other two are cancelled
- continues when all three are completed (cancelled or otherwise)
- returns the value of the winner (the second task in this case)

---

# Example good: wait_all
*(diagram: wait_all)*
- we started three tasks
- the second completed first, never mind
- continues when all three are completed
- returns the value of each (e.g. as a tuple); a usage sketch follows below
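A usage sketch, assuming the `co<T>` type and `async_wait_all` primitive shown later in this presentation, and assuming `wait_all` completes with a `std::tuple` of the children's results (as the bullet above suggests); `async_some_int` is the coroutine defined on a later slide:

```cpp
// sketch only: gather the results of two children as a tuple
co<int> async_gather()
{
  auto [a, b] = co_await async_wait_all(
    async_some_int(),  // co_returns 42
    async_some_int()); // co_returns 42
  // continues only when both children have completed
  co_return a + b; // 84
}
```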
---

# wait_all cancel

*(diagram: wait_all with a failing child)*
- but when a task fails, the remaining ones are cancelled
- this behaviour has quite a lot of similarities to `wait_any`

---

# Structured concurrency

---

# The basics should work

- variable declarations
- if/else
- for loops
- function calls
- RAII
- lambdas
- pass argument by reference
- We've seen that the C++ core guidelines give up on this, [CP.53: Parameters to coroutines should not be passed by reference](https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#cp53-parameters-to-coroutines-should-not-be-passed-by-reference)
- We'll see that the sender/receiver framework partially gives up on this and has "algorithms" for basic stuff, e.g. `just` for variable declaration, `repeat` for loops, `then` for function calls, and care is required for things being passed to such algorithms (reference, value, moved etc. has to be done carefully and deliberately); see the sketch below
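A minimal sketch of what those sender/receiver "algorithms" look like, assuming the stdexec reference implementation of P2300 (not this presentation's library):

```cpp
// sketch only: a sender pipeline where ordinary language constructs
// become algorithms: just (a value) piped into then (a function call)
#include <stdexec/execution.hpp>
#include <utility>

int main()
{
  auto work = stdexec::just(41)                           // "variable declaration"
            | stdexec::then([](int x) { return x + 1; }); // "function call"

  // sync_wait drives the lazy work to completion on this thread
  auto [result] = stdexec::sync_wait(std::move(work)).value();
  return result == 42 ? 0 : 1;
}
```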
---

# Example good

```cpp
task<void> async_echo(socket s)
{
  buffer buff;
  while (true)
  {
    std::size_t length = co_await async_read_some(s, buff);
    co_await async_write(s, buff, length);
  }
}
```

- we have local variables: `buff`, `length`
- RAII works: `buff` gets destroyed on scope exit
- but currently it can't be used for joining async work
- we have a `while (true)` loop
- `async_read_some` takes a reference to the buffer variable

---

# Basic: continuation
*(diagram: continuation)*
- do this, THEN do that

---

# Example bad: std::future

```cpp
std::future<int> f = go_async_work(some_arg);

// attach continuation (hypothetical, not in std)
f.then([](int x) {
  // use x
});
```

- a model where async work is started eagerly has the issue that attaching the continuation potentially races with the async work, therefore synchronization is required between:
  - the async work providing the result into the `std::future`
  - and the `std::future` setting a continuation to be called when a result is available
- a better model is to create lazy work which needs to be started later, but gives the opportunity to set the continuation before the async work is started, thus avoiding races
---

# Example good: lazy

```cpp
task<void> async_echo(socket s)
{
  buffer buff;
  while (true)
  {
    std::size_t length = co_await async_read_some(s, buff);
    co_await async_write(s, buff, length);
  }
}
```

- for example `co_await async_write(...)`
- `async_echo` runs uninterrupted until a `co_await`
- `async_write` is lazy, it does not introduce concurrency yet
- `co_await` introduces concurrency
- but `async_echo` only continues when the `co_await` completes
- concurrency introduced by `co_await` does not outlive the parent
- in this form it creates a parent-child relationship that makes `co_await` behave **logically** like a function call (except that, at that point, execution can be interleaved with other coroutines while this one waits for the child to complete)

---

# Chain of continuations
*(diagram: chain of continuations)*
- it is dynamic over time: further links are added to the right or they go away
- it has a root, it starts somewhere
- it has a terminal leaf
- only the leaf has, at a certain point in time, a pending operation that in effect the whole chain is waiting on
- only the leaf needs to register a `stop_callback` to react to cancellation
- if this is a chain of coroutines then cancellation can go to the root and destroy the root coroutine, which in turn via RAII will destroy its child, which in turn via RAII ...
---

# Example

```cpp
task<void> async_echo(socket s)
{
  buffer buff;
  while (true)
  {
    std::size_t length = co_await async_read_some(s, buff);
    co_await async_write(s, buff, length);
  }
}
```

- `async_echo` might have a parent
- the parent of `async_echo` (etc.), `async_echo` and e.g. `async_read_some` make a chain
- `async_read_some` and `async_write` are leaves (at different points in time)
- and they illustrate how a chain shortens and lengthens
---

# Chain root

```cpp
co<int> async_foo()
{
  co_return 42;
}

TEST(run_co_return_int)
{
  int result = run(async_foo());
  ASSERT_EQ(42, result);
}
```

- `run` blocks the thread until `async_foo` completes and returns whatever `async_foo` `co_return`s
- the chain root needs to have the callback that is called when the entry point of the chain eventually completes
- in the meantime this `run` needs to go through the list of ready tasks and timers
- there can be all sorts of such `run` functions that differ in what they actually do until the entry point completes

---

# Ready queue + timers heap
*(diagram: ready queue and timers heap)*
- a useful run loop uses a ready queue and a timers heap (directly or indirectly). The example shows a T1 model directly using them; a rough sketch follows below
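A rough, hedged sketch (not this library's actual implementation) of what a run loop directly driving a ready queue and a timers heap might look like:

```cpp
// sketch only: a run loop on one thread that drains a ready queue and a
// timers heap until the chain root's completion callback sets `done`
#include <chrono>
#include <functional>
#include <queue>
#include <thread>
#include <vector>

struct run_loop
{
  struct timer
  {
    std::chrono::steady_clock::time_point when;
    std::function<void()> resume;
    bool operator>(const timer& other) const { return when > other.when; }
  };

  std::queue<std::function<void()>> ready;
  std::priority_queue<timer, std::vector<timer>, std::greater<>> timers;
  bool done = false; // set by the root's completion callback

  void run()
  {
    while (!done)
    {
      if (!ready.empty())
      {
        auto resume = std::move(ready.front());
        ready.pop();
        resume(); // resume a coroutine that can make progress now
      }
      else if (!timers.empty())
      {
        auto next = timers.top();
        timers.pop();
        std::this_thread::sleep_until(next.when);
        next.resume(); // resume a coroutine whose sleep expired
      }
      // a real loop would otherwise block on I/O completions here
    }
  }
};
```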
---

# Run loops, sync_wait

Not all the run loops are the same:

- the simplest one, `sync_wait`, does not even "run" or "loop" anything
- it effectively blocks the thread waiting for something else to do the work
- used in `main` for "demo" examples
- if used in a thread from a thread pool it can/will cause the issue where the thread is taken out of the "active" ones in the pool
- some can run their own ready queue and timers heap directly
- others use (blocking) APIs that are required by yet other APIs (e.g. in Windows `ReadFileEx` requires a `WaitFor...Ex`)
- unfortunately different sets of APIs require different kinds of run loops

---

# Detached entry in main

- the whole point of this presentation is: avoid detached, and here is what to do instead

However:

- in an embedded environment it makes sense to set things up in `main`, and exit `main` while work continues, driven e.g. by interrupts, making the entry point detached

---

# Chain leaf

```cpp
task<void> async_echo(socket s)
{
  buffer buff;
  while (true)
  {
    std::size_t length = co_await async_read_some(s, buff);
    co_await async_write(s, buff, length);
  }
}
```

- the leaves: `async_read_some`, `async_write`
- they have to (eventually) provide an awaiter that implements `await_ready`, `await_suspend` and `await_resume`
- coroutines require a (nominal) allocation
- leaves are not coroutines, so this can avoid allocations (in most cases)
- as you think about the (temporally dynamic) tree of a shrinking and growing chain: there are a lot of leaves, therefore a lot of allocations are avoided
- from the programmer's point of view, writing a leaf is a different (and relatively difficult) world from using them in a coroutine (simple); a sketch of a leaf follows below
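A hedged sketch (not the library's actual code) of what a minimal leaf could look like: a plain awaitable struct, not a coroutine, so awaiting it needs no extra coroutine frame allocation. The global `g_ready` queue is a hypothetical stand-in for however the real library reaches its run loop:

```cpp
// sketch only: a yield-like leaf; the three awaiter functions are the
// customization points that co_await uses
#include <coroutine>
#include <queue>

// hypothetical stand-in for the run loop's ready queue; a real library
// would reach it through the awaiting coroutine's context instead
inline std::queue<std::coroutine_handle<>> g_ready;

struct yield_awaiter
{
  bool await_ready() const noexcept { return false; } // always suspend
  void await_suspend(std::coroutine_handle<> h) const
  {
    g_ready.push(h); // park the coroutine; the run loop resumes it later
  }
  void await_resume() const noexcept {} // nothing to return to the coroutine
};

// usable as: co_await async_yield_sketch();
inline yield_awaiter async_yield_sketch() { return {}; }
```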
---

# Elementary leaves

```cpp
// sleep for 10 seconds
co_await async_sleep_for(10s);

// give other chains a chance to run
co_await async_yield();

// check if cancelled
co_await async_noop();

// suspend forever (i.e. until cancelled)
co_await async_suspend_forever();
```

---

# wait_any

*(diagram: wait_any tree)*
- it is a leaf for its own chain, but then it creates roots for the children chains
- each of those roots has callbacks
- when the callback for a child chain is called:
  - if it's the first one, then cancel the others
  - when no error: returns the result of the first chain
  - and info: which one completed?
- also, if the chain for which `wait_any` is a leaf is cancelled, then the remaining children are cancelled

---

# Callback, callback, callback

The terminology is a little bit overloaded:

- a root of a chain has some work to do when it's cancelled (e.g. destroy the first coroutine in the chain, which will recursively destroy the chain): the **cancellation callback** for a root of a chain
- similarly it has some work to do when it completes successfully: the **completion callback**
- a good tool to implement cancellation is the triad `stop_source`/`stop_token`/**`stop_callback`**
- the `stop_callback` takes a functor of the work to do when cancelled (which eventually leads to the cancellation callback at the root of the chain being called)
- **our class `callback`** captures a `void*` and a function pointer and can be used:
  - as such a functor for `stop_callback`, though our `stop_callback` integrates better with it than the `std::[inplace_]stop_callback` does
  - or as a chain root cancellation or completion callback
- **a completion routine** for a C API such as the one for `WSARecv` is also called a callback
- we'll have a more complete picture when we talk about [cancellation](/presentations/2025-07-18-cancellation.html); a small standard-library example of the triad follows below
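For reference, the standard C++20 triad on its own (plain `std::` usage, independent of this presentation's `callback` class):

```cpp
#include <cstdio>
#include <stop_token>

int main()
{
  std::stop_source source;
  std::stop_token token = source.get_token();

  // the functor runs when stop is requested (or immediately, if it already was)
  std::stop_callback cb(token, [] { std::puts("cancelled"); });

  source.request_stop(); // prints "cancelled"
}
```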
---

# wait_any

```cpp
co<std::size_t> async_some_wait()
{
  auto result = co_await async_wait_any(
    async_suspend_forever(),
    async_sleep_for(std::chrono::seconds(0)));
  co_return result.index;
}

TEST(wait_any_inside_co)
{
  auto result = run(async_some_wait());
  ASSERT_EQ(1, result);
}
```

---

# wait_all
*(diagram: wait_all tree)*
- it is a leaf for its own chain, but then it creates roots for the children chains
- each of those roots has callbacks
- when the callback for a child chain is called:
  - if it's the first one to have an error, then cancel the others
- when no error: it returns a tuple of the results of each chain

---

# wait_all

```cpp
TEST(wait_all_tree)
{
  auto result = run(coro_st::async_wait_any(
    async_suspend_forever(),
    async_wait_all(
      std::invoke([]() -> co<int> { co_return 42; }),
      async_yield())));
  ASSERT_EQ(1, result.index);
  ASSERT_EQ(42, std::get<0>(result.value));
}
```
---

# wait_for

```cpp
co<int> async_some_int()
{
  co_await async_yield();
  co_return 42;
}

TEST(wait_for_int_has_value)
{
  auto result = run(async_wait_for(
    async_some_int(),
    std::chrono::hours(24)));
  ASSERT_TRUE(result.has_value());
  ASSERT_EQ(42, result.value());
}
```

- it's so common to do an operation, but only be willing to wait a certain amount of time
- we could use `wait_any`, but `wait_for` is easier to use for this common scenario
- it returns `std::nullopt` if the timeout was hit and the work was cancelled
---

# stop_when

```cpp
std::optional<int> result1 = co_await async_wait_for(
  async_some_int(),
  std::chrono::minutes(2));

std::optional<int> result2 = co_await async_stop_when(
  async_some_int(),
  async_sleep_for(std::chrono::minutes(2)));

std::optional<int> result3 = co_await async_stop_when(
  async_some_int(),
  async_ctrl_c_pressed());
```

- it is a generalisation of `wait_for`, where the "stopping" async activity is provided explicitly

---

# Nursery

- a dynamic version of `wait_[all|any]`
- nursery = "that's where the children live"
- partially relaxes structured concurrency principles to achieve functionality that's hard otherwise

---

# Nursery: the server case
*(diagram: server chain with connection chains)*
- the main chain loops waiting for connections
- each connection spawns a new chain
- orderly shutdown

---

# Nursery: the server case

```cpp
co<void> async_accept_loop(nursery& n)
{
  auto ep = co_await async_tcp_listen(8080);
  while (true)
  {
    auto socket = co_await async_tcp_accept(ep);
    n.spawn_child(async_echo, std::move(socket));
  }
}

co<void> async_server()
{
  nursery n;
  co_await n.async_run(
    async_accept_loop(n));
}
```

- the entry point usually takes the nursery by reference
- the nursery has `spawn_child` and `request_stop`
- `spawn_child` captures arguments similarly to `std::bind`
  - e.g. to pass a reference use `std::ref`
  - the captured arguments are allocated so they outlive the scope of the call

---

# Nursery: the proxy detection case
*(diagram: proxy 0, proxy 1, proxy 2)*
- try a proxy
- 100ms later try another one
- the first one to complete wins

---

# Nursery: the proxy detection case

```cpp
co<void> async_try_proxy(
  nursery& n,
  const std::vector<proxy>& proxies,
  size_t i,
  std::optional<size_t>& result)
{
  if (i != 0)
  {
    co_await async_sleep_for(100ms);
  }
  if (i + 1 < proxies.size())
  {
    n.spawn_child(async_try_proxy,
      std::ref(n), std::ref(proxies), i + 1, std::ref(result));
  }
  try
  {
    co_await async_actually_try_proxy(proxies[i]);
    result = i;
    n.request_stop();
  }
  catch (...)
  {
  }
}

co<std::optional<size_t>> async_detect_proxy(const std::vector<proxy>& proxies)
{
  std::optional<size_t> result;
  nursery n;
  co_await n.async_run(
    async_try_proxy(n, proxies, 0, result));
  co_return result;
}
```

---

# Mildly unstructured (again)

- we'll see in the cancellation presentation that the structured concurrency primitives actually imply a two step process: setting the continuation before starting the work
- but in the (good) examples I hid that detail and stuck to the structured concurrency principles that make it easy to reason about code
- we only relaxed it a bit for the nursery
- but not all the libraries are strict about it
- bad (in my opinion)
---

# Example bad

```cpp
co<void> async_should_not_compile()
{
  auto x = async_foo("some string");
  co_await std::move(x);
}
```

- some libraries allow the above to compile, but this creates issues similar to the generator
- we've seen that the C++ core guidelines give up on this, [CP.53: Parameters to coroutines should not be passed by reference](https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#cp53-parameters-to-coroutines-should-not-be-passed-by-reference)
- despite plenty of cases where things should be passed by reference, and the fact that "values" like `std::string_view` are still references in disguise
- my observation is that programmers, when taught about such issues, nod, but then end up learning the hard way (i.e. via bugs) and draw the wrong conclusion (i.e. don't use references for coroutine parameters) instead of using/creating libraries that stop bad patterns

---

# Questions?