Mircea Baja - 18 July 2025

# Cancellation at the leaves of chains
---

# Motivation

- This is in a way a continuation of the structured concurrency discussion
- We've seen that structured concurrency primitives like `when_any`, `when_all`, the ones involving timeouts etc. require cancellation support
- The topic of cancellation is complex enough that it deserves a few separate presentations
- This explores using a variant of `std::stop_source|stop_token|stop_callback` like e.g. `std::inplace_stop_source|inplace_stop_token|inplace_stop_callback` that has the desired `noexcept` behaviour
  - will informally drop the `inplace_` prefix, but mean it
  - other single-threaded options are also possible, e.g. for T1 threading models
- **Code is still slideware that demonstrates various specific issues, not production library quality code**

---

# Cancellation at the leaves of chains

- could poll `stop_requested`
- but that's unusual
- most register a stop callback
  - the pattern to use in multi-threaded environments
- **Slideware aspect: awaiters in the following samples should be made non-copyable and non-movable (same as, say, the sender/receiver operations)**

---

# async_noop

```cpp
co_await async_noop();
```

- noop continues immediately
- but if cancelled then it cancels the chain
- this is probably a primitive you rarely need to use, but it has the advantage that the core code fits comfortably on a slide

---

# async_noop: poll

```cpp
struct noop_awaiter
{
  context& ctx_;

  noop_awaiter(context& ctx) noexcept : ctx_{ ctx } {}

  bool await_ready() const noexcept {
    return !ctx_.get_stop_token().stop_requested();
  }

  void await_suspend(std::coroutine_handle<>) noexcept {
    ctx_.schedule_cancellation_callback();
  }

  constexpr void await_resume() const noexcept {
  }
}; // noop_awaiter
```

- cancellation callback is scheduled, not called directly

---

# Code dichotomy

- writing a low level primitive like `async_noop` is way harder than using it
- there is a separation between writing "library" functions and using them
- this is not new: it's harder to write a RAII type than to just use it
- but it is IMPORTANT that the language/library does not make writing such primitives prohibitively complex for mere mortals
- in particular my experience with `boost::asio` is that it was possible to use it, but actually implementing similar library functions was way harder (in practice I did not do it)

---

# Cancellation is multistep

- e.g. we've seen:

```cpp
ctx_.schedule_cancellation_callback();
```

- we did not call the chain root cancellation callback directly then and there, we deferred it via the re-schedule
- remember, as we discussed in [callback, callback, callback](/presentations/2025-06-07-structured-concurrency.html), the term is unfortunately overloaded

---

# Cancellation is multistep

- we're on a leaf and either:
  - polling on the stop token: `stop_requested()` returns true
  - a stop_callback is invoked (this is likely done on the caller of `request_stop()`)
- in more complex cases than `noop` we need to call some C API to initiate cancellation, which will complete later
- but even in simple cases like `noop` we don't call the cancellation callback immediately, we schedule it for later
- the cancellation callback will eventually do things like destroy the chain
- if we do that immediately:
  - we raise questions about stack size/overflow
  - lots of care is required because we destroy the very object (e.g. the awaitable) that implements the stop_callback invocation
- therefore, to avoid that, the cancellation is at least two steps:
  - the initiator calls `request_stop()`, which leads to a tree of stop callback invocations for the leaves
  - this then returns
  - the actual cancellation callback, indicating the completion of the cancellation work, is called later (e.g. by the scheduler/run loop, see the sketch on the next slide)
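---

# Cancellation is multistep: scheduling sketch

A minimal sketch of the kind of deferral I mean, assuming a T1 (single-threaded) run loop; the names, the queue and the single `cancellation_callback_` member are assumptions for illustration, not the actual `context` implementation (`callback` stands for any callable):

```cpp
class context {
  std::deque<callback> ready_queue_;  // work to run when control is back in the run loop
  callback cancellation_callback_;    // set by the chain root

public:
  void schedule_cancellation_callback() noexcept {
    // enqueue only; do NOT invoke here: the caller may be the stop_callback
    // living inside the very awaiter that the cancellation will destroy
    // (a production version would use an intrusive queue to really be noexcept)
    ready_queue_.push_back(cancellation_callback_);
  }

  void run() {
    while (!ready_queue_.empty()) {
      auto work = ready_queue_.front();
      ready_queue_.pop_front();
      work(); // runs close to the run loop: shallow stack, the awaiter is no longer on it
    }
  }
  // ...
};
```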
---

# Cancellation is multistep

- cancellation is not a single step synchronous activity
- someone calls `request_stop()` e.g. on a `stop_source` or a `nursery`
- that will initiate the cancellation (e.g. will often trigger work at the leaves of chains)
- but (typically) it returns before the whole cancellation completes
- the cancellation is really completed when the `co_await` for the `wait_any`, `wait_all`, `nursery.async_run` etc. returns
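---

# Cancellation is multistep: where do stop callbacks run?

As a standalone illustration of the first step using the standard types (this is a demo of standard behaviour, not slideware `context` code): the registered callback runs synchronously inside `request_stop()`, on the requesting thread, and `request_stop()` returns before any completion work that the callback merely schedules.

```cpp
#include <iostream>
#include <stop_token>
#include <thread>

int main() {
  std::stop_source source;

  // the "leaf" registers a stop callback
  std::stop_callback cb(source.get_token(), [] {
    // runs synchronously inside request_stop(), on the requesting thread
    std::cout << "stop callback on thread " << std::this_thread::get_id() << '\n';
  });

  std::thread requester([&] {
    std::cout << "requesting stop on thread " << std::this_thread::get_id() << '\n';
    source.request_stop(); // invokes the callback, then returns
    std::cout << "request_stop() returned\n";
  });
  requester.join();
}
```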
---

# async_yield

```cpp
co_await async_yield();
```

- give other chains a chance to run
- also cancel the chain if cancelled

---

# async_yield: poll

```cpp
struct yield_awaiter
{
  context& ctx_;

  yield_awaiter(context& ctx) noexcept : ctx_{ ctx } {}

  constexpr bool await_ready() const noexcept {
    return false;
  }

  void await_suspend(std::coroutine_handle<> handle) noexcept {
    if (ctx_.get_stop_token().stop_requested()) {
      ctx_.schedule_cancellation_callback();
      return;
    }
    ctx_.schedule_coroutine_resume(handle);
  }

  constexpr void await_resume() const noexcept {
  }
}; // yield_awaiter
```

---

# async_suspend_forever

```cpp
co_await async_suspend_forever();
```

- suspends forever
- nothing is forever, it can be cancelled
- really it's "suspend until stopped because it's cancelled by something else"
- name similar to other functions e.g. `serve_forever` for a Python socket server

---

# async_suspend_forever: callback

```cpp
struct suspend_forever_awaiter
{
  context& ctx_;
  std::optional<stop_callback<callback>> stop_cb_;

  suspend_forever_awaiter(context& ctx) noexcept : ctx_{ ctx } {}

  constexpr bool await_ready() const noexcept {
    return false;
  }

  void await_suspend(std::coroutine_handle<>) noexcept {
    configure_cancellation();
  }

  constexpr void await_resume() const noexcept {
  }
```

- notice the optional stop_callback
- this is our single threaded stop_callback, but it might be a `std::inplace_stop_callback`

---

# async_suspend_forever: callback

```cpp
private:
  void configure_cancellation() noexcept {
    stop_cb_.emplace(
      ctx_.get_stop_token(),
      make_member_callback<&suspend_forever_awaiter::on_cancel>(this));
  }

  void on_cancel() noexcept {
    stop_cb_.reset();
    ctx_.schedule_cancellation_callback();
  }
}; // suspend_forever_awaiter
```
- `std::optional<stop_callback<callback>> stop_cb_;` is used:
  - `emplace`: to register the callback (takes the `stop_token` and the actual callback)
  - `reset()`: to unregister the callback (in this case once it has fired)

---

# stop_callback as a member

This is a common pattern that we've seen in the `suspend_forever_awaiter`, where we had a `std::optional<stop_callback<callback>> stop_cb_` as a member.
This is different from the usual examples for `std::stop_token`:

```cpp
void in_some_function(std::stop_token st) {
  std::stop_callback cb(st, []() {
    // lambda body here
  });
  // do some stuff
}
```

In this example the size of the type of `cb` depends on the size of the initializing lambda. But we can't use this pattern for a member variable, where the declaration is separated from the initialization (and the size needs to be known at the declaration of the member variable). For the `suspend_forever_awaiter` I used the trick that the `callback` has a fixed size of two pointers (see the sketch on the next slide).
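---

# stop_callback as a member: a fixed-size callback sketch

I have not shown `callback` and `make_member_callback`; this is an illustrative sketch of the kind of fixed-size, type-erased callable I mean (two pointers: the object and a trampoline function), not necessarily the exact slideware types:

```cpp
// two pointers, fixed size regardless of which member function it binds to
struct callback {
  void* obj_ = nullptr;
  void (*fn_)(void*) noexcept = nullptr;

  void operator()() const noexcept { fn_(obj_); }
};

// binds a member function pointer known at compile time to an object pointer
template <auto MemberFn, typename T>
callback make_member_callback(T* self) noexcept {
  return callback{
    self,
    [](void* obj) noexcept { (static_cast<T*>(obj)->*MemberFn)(); }
  };
}
```

Because `callback` is a nameable type of known size, a member like `std::optional<stop_callback<callback>>` can be spelled out at declaration time.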
---

# stop_callback as a member

```cpp
struct suspend_forever_awaiter
{
  context& ctx_;
  std::optional<stop_callback<callback>> stop_cb_;
  // ...

  void configure_cancellation() noexcept {
    stop_cb_.emplace(
      ctx_.get_stop_token(),
      make_member_callback<&suspend_forever_awaiter::on_cancel>(this));
  }

  void on_cancel() noexcept {
    stop_cb_.reset();
    ctx_.schedule_cancellation_callback();
  }
}; // suspend_forever_awaiter
```

- instead of this you're more likely to see ...
---

# stop_callback as a member

```cpp
struct suspend_forever_awaiter
{
  struct cancel_cb {
    suspend_forever_awaiter& awaiter_;
    void operator()() noexcept { awaiter_.on_cancel(); }
  };

  context& ctx_;
  std::optional<stop_callback<cancel_cb>> stop_cb_;
  // ...

  void configure_cancellation() noexcept {
    stop_cb_.emplace(
      ctx_.get_stop_token(),
      cancel_cb{ *this });
  }

  void on_cancel() noexcept {
    stop_cb_.reset();
    ctx_.schedule_cancellation_callback();
  }
}; // suspend_forever_awaiter
```
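---

# stop_callback as a member: with the standard types

The same member pattern works with `std::stop_callback` directly; the point is that the callback type has to be nameable so it can appear in the member declaration. A standalone sketch (not the slideware awaiter, names are illustrative):

```cpp
#include <optional>
#include <stop_token>

struct some_operation {
  struct cancel_cb {
    some_operation& op_;
    void operator()() const noexcept { op_.on_cancel(); }
  };

  std::stop_token st_;
  // the template argument must be a nameable type: a lambda's type
  // could not be spelled out here
  std::optional<std::stop_callback<cancel_cb>> stop_cb_;

  void start() {
    stop_cb_.emplace(st_, cancel_cb{ *this }); // register
  }

  void complete() {
    stop_cb_.reset();                          // unregister
  }

  void on_cancel() noexcept {
    // initiate cancellation of this operation
  }
};
```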
---

# async_sleep_for

```cpp
co_await async_sleep_for(std::chrono::seconds(5));
```

- sleeps for a specified duration
- uses a node in a timer heap
- also cancels if cancelled
- it would not be efficient to poll for cancellation
- uses a stop callback to remove the node from the timer heap

---

# async_sleep_for (1/3)

```cpp
struct sleep_awaiter
{
  context& ctx_;
  timer_node timer_node_;
  std::coroutine_handle<> parent_handle_;
  std::optional<stop_callback<callback>> stop_cb_;

  sleep_awaiter(context& ctx, std::chrono::steady_clock::time_point deadline) noexcept
    : ctx_{ ctx }, timer_node_{ deadline } {}

  constexpr bool await_ready() const noexcept {
    return false;
  }

  void await_suspend(std::coroutine_handle<> handle) noexcept {
    parent_handle_ = handle;
    schedule_timer();
  }

  constexpr void await_resume() const noexcept {
  }
```

---

# async_sleep_for (2/3)

```cpp
private:
  void schedule_timer() noexcept {
    timer_node_.cb = make_member_callback<&sleep_awaiter::on_timer>(this);
    ctx_.insert_timer_node(timer_node_);
    stop_cb_.emplace(
      ctx_.get_stop_token(),
      make_member_callback<&sleep_awaiter::on_cancel>(this));
  }
```

- notice that when we actually do some work (e.g. `insert_timer_node`), that is an additional step besides creating the callback via `stop_cb_.emplace`, raising questions like: what happens if the timer fires before we call `stop_cb_.emplace()`? Is the order right, should we emplace and then insert? etc.
- this is a toy example that works for a T1 threading model where the timer can't fire until we return to the run loop
- will discuss later what to do in a multi-threaded model

---

# async_sleep_for (3/3)

```cpp
  void on_timer() noexcept {
    stop_cb_.reset();
    parent_handle_.resume();
  }

  void on_cancel() noexcept {
    stop_cb_.reset();
    ctx_.remove_timer_node(timer_node_);
    ctx_.schedule_cancellation_callback();
  }
}; // sleep_awaiter
```

- note that it is fine to call `parent_handle_.resume()` without stack overflow risks, as `on_timer` is called close to the run loop

---

# Call API, no cancellation

```cpp
struct api_awaiter
{
  std::coroutine_handle<> parent_handle_;
  // ...

  constexpr bool await_ready() const noexcept {
    return false;
  }

  void await_suspend(std::coroutine_handle<> handle) noexcept {
    parent_handle_ = handle;
    // call C API e.g. AsyncFileRead
  }

  int await_resume() const noexcept {
    // return result
  }
}; // api_awaiter
```

---

# Call API, no cancellation

- [As shown here](https://lewissbaker.github.io/2017/11/17/understanding-operator-co-await), a C API that starts concurrent behaviour can be called without any synchronization
  - **if cancellation is not implemented by the awaiter/operation**
- The issue is that when calling a C API that starts concurrent behaviour, another thread can do concurrent work, e.g. run the C API callback and resume the coroutine, which will continue from `await_resume` on that other thread
  - even before the C API returns in `await_suspend`
  - and the `resume()` could then even destroy the awaiter/operation while `await_suspend` is in progress
- But that can be made to work if done carefully (see the sketch on the next slide):
  - because the coroutine is suspended when the C API is called, resume has the correct state to resume from
  - if care is taken to not access members of the awaiter after the C API is called (by accessing them before, as done for `parent_handle_` in the example, or by storing copies on the stack of `await_suspend`)
- Also things are easier if cancellation is implemented, but we do not run in a multithreaded model (e.g. the timer example running in a T1 model)
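---

# Call API, no cancellation: sketch

To make the rules above concrete, here is a sketch against a made-up C API (`async_file_read` is hypothetical; `WSARecv`/`AsyncFileRead`-style APIs have the same shape): everything the completion path needs is stored before the call, and no member of the awaiter is touched after it.

```cpp
// Hypothetical C API, for illustration only: starts a read and later calls
// on_done(user_data, bytes_read), possibly from another thread.
void async_file_read(char* buffer, std::size_t size,
                     void (*on_done)(void* user_data, int bytes_read),
                     void* user_data);

struct file_read_awaiter
{
  char* buffer_;
  std::size_t size_;
  std::coroutine_handle<> parent_handle_;
  int result_ = 0;

  bool await_ready() const noexcept { return false; }

  void await_suspend(std::coroutine_handle<> handle) noexcept {
    // store everything the completion path needs BEFORE starting the operation
    parent_handle_ = handle;
    async_file_read(buffer_, size_, &file_read_awaiter::on_done, this);
    // from here on the completion may already have resumed (and destroyed)
    // this awaiter on another thread: do not touch members of *this
  }

  int await_resume() const noexcept { return result_; }

private:
  static void on_done(void* user_data, int bytes_read) {
    auto* self = static_cast<file_read_awaiter*>(user_data);
    self->result_ = bytes_read;
    self->parent_handle_.resume();
  }
};
```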
---

# Call API, multithreaded

- But what to do in a multithreaded model?
- The issue is that:
  - if calling a C API like `WSARecv`
  - we have to emplace the stop_callback optional
  - but that requires accessing a member of the awaiter/operation
  - while potentially the C API callback is called from another thread
- See [P2300 1.4. Asynchronous Windows socket recv](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2024/p2300r10.html#example-async-windows-socket-recv) where they solve this problem for a sender/receiver operation
- the idea is around using an atomic bool

---

# Call API, multithreaded

```cpp
struct api_awaiter : WSAOVERLAPPED // derived from WSAOVERLAPPED
{
  // ...
  std::atomic<bool> ready_{ false };
  // ...

  std::coroutine_handle<> await_suspend(std::coroutine_handle<> handle) noexcept {
    parent_handle_ = handle;
    // call WSARecv, or even call it in await_ready instead
    if (result_io_pending) {
      stop_cb_.emplace(...);
      if (ready_.exchange(true, std::memory_order_acq_rel)) {
        // handle completion here e.g. stop_cb_.reset() ...
      }
    }
  }

  void on_c_api_callback() {
    if (ready_.exchange(true, std::memory_order_acq_rel)) {
      // handle completion here e.g. stop_cb_.reset() ...
    }
  }
}; // api_awaiter
```
---

# Exchange logic

In the toy example I used just `exchange` to mark completion:

```cpp
if (ready_.exchange(true, std::memory_order_acq_rel)) {
  // ...
}
```

This caters for the scenario where there are potentially two threads. The first thread calls a C API taking a C callback, e.g. `WSARecv`, then it needs to do some work emplacing an optional called `stop_cb_`. Then it will set `ready_` via `exchange`, see the initial value of `false` and not execute the block. Then (usually) another thread runs the C callback, sees `true` out of the exchange and does execute the block that accesses `stop_cb_` (calling `.reset()`). If the timing is different, the C callback thread might be the one that skips the `if` block and the work will be done by the first thread instead.

But in production you'll do a variation of the code above.
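---

# Exchange logic: standalone demo

Stripped of the awaiter, the handshake can be seen in a standalone toy (my demo, not the P2300 code): whichever side sees `true` from the `exchange` knows it arrived second, so it is the one that does the completion work, exactly once.

```cpp
#include <atomic>
#include <iostream>
#include <thread>

std::atomic<bool> ready{ false };

// acq_rel so that whatever the first arrival wrote before its exchange
// is visible to the second arrival
void arrive(const char* who) {
  if (ready.exchange(true, std::memory_order_acq_rel)) {
    std::cout << who << " arrived second: does the completion work\n";
  } else {
    std::cout << who << " arrived first: leaves completion to the other side\n";
  }
}

int main() {
  // models the thread that called the C API and then emplaced stop_cb_
  std::thread initiator([] { arrive("initiator"); });
  // models the thread running the C completion callback
  std::thread completion([] { arrive("completion callback"); });
  initiator.join();
  completion.join();
}
```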
---

# Load and exchange

[P2300, the "1.4. Asynchronous Windows socket recv" example](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2024/p2300r10.html#example-async-windows-socket-recv) uses a slightly different pattern to mark completion, equivalent to:

```cpp
if (ready_.load(std::memory_order_acquire) ||
    ready_.exchange(true, std::memory_order_acq_rel)) {
  // ...
}
```

As far as I can see the option to do just `exchange` is sufficient, but then both threads will do an `exchange`. With the option of `load` followed by `exchange`: typically there will be a `load` and an `exchange` from one thread and only a `load` from the other (though it might be that both threads do `load` and `exchange`). This choice [seems to be an optimisation](https://stackoverflow.com/questions/79717187/why-load-and-exchange-an-stdatomicbool) along the lines of [test and test-and-set](https://en.wikipedia.org/wiki/Test_and_test-and-set) (the idea is that `exchange` is expensive).

There could also be the option to do just `exchange` on the code path calling the C API and `load` + `exchange` from the C callback, because when calling the API we typically end up doing the `exchange` anyway.

---

# Exchange and load and exchange

```cpp
struct api_awaiter : WSAOVERLAPPED // derived from WSAOVERLAPPED
{
  // ...
  std::atomic<bool> ready_{ false };
  // ...

  std::coroutine_handle<> await_suspend(std::coroutine_handle<> handle) noexcept {
    parent_handle_ = handle;
    // call WSARecv, or even call it in await_ready instead
    if (result_io_pending) {
      stop_cb_.emplace(...);
      if (ready_.exchange(true, std::memory_order_acq_rel)) {
        // handle completion here e.g. stop_cb_.reset() ...
      }
    }
  }

  void on_c_api_callback() {
    if (ready_.load(std::memory_order_acquire) ||
        ready_.exchange(true, std::memory_order_acq_rel)) {
      // handle completion here e.g. stop_cb_.reset() ...
    }
  }
}; // api_awaiter
```
---

# Exchange and why? (code)

```cpp
std::atomic<bool> ready;

int foo() {
  if (ready.load(std::memory_order_acquire) ||
      ready.exchange(true, std::memory_order_acq_rel)) {
    return 42;
  }
  return 52;
}
```

---

# Exchange and why? (assembly)

```asm
foo():
        movzx   ecx, byte ptr [rip + ready]
        mov     eax, 42
        test    cl, cl
        jne     .LBB0_2
        mov     al, 1
        xchg    byte ptr [rip + ready], al
        test    al, 1
        mov     ecx, 42
        mov     eax, 52
        cmovne  eax, ecx
.LBB0_2:
        ret
ready:
        .zero   1
```

`clang x86-64 -O3 -std=c++23`

---

# Exchange and why? (timeline)

```
Timeline view:
                    0123456789
Index     0123456789          012345

[0,0]     DeeeER    .    .    .    .   movzx  ecx, byte ptr [rip + ready]
[0,1]     DeE--R    .    .    .    .   mov    eax, 42
[0,2]     .D==eER   .    .    .    .   test   cl, cl
[0,3]     .D===eER  .    .    .    .   jne    .LBB0_2
[0,4]     . DeE--R  .    .    .    .   mov    al, 1
[0,5]     . DeeeeeeeeeeeeeeeeER   .    xchg   byte ptr [rip + ready], al
[0,6]     . D==========eE----R    .    test   al, 1
[0,7]     . DeE--------------R    .    mov    ecx, 42
[0,8]     . DeE--------------R    .    mov    eax, 52
[0,9]     . .D=========eE-----R   .    cmovne eax, ecx
[0,10]    . .D=============eeeeER      ret
```

`llvm-mca -timeline -iterations=1 -mcpu=btver2`

- on some architectures `xchg` does a lot more (`eeeeeeeeeeeeeeeeE`) than `movzx` (`eeeE`) because it has to coordinate cache access/flushes etc. between cores of the same processor

---

# Questions?