Coroutine Primitives

Namespace: dftracer::utils::coro

For usage guide and examples, see Coroutine API.

template<typename T>
class AsyncGenerator

AsyncGenerator<T> - Asynchronous lazy sequence generator.

Supports internal co_await via symmetric transfer. Combinators (map, flat_map, filter, take, concat) enable fluent composition.

Operators: gen > func map: T -> U gen >> func flat_map: T -> AsyncGenerator

Public Types

using value_type = T

Public Functions

inline explicit AsyncGenerator(std::coroutine_handle<promise_type> handle)
inline ~AsyncGenerator()
AsyncGenerator(const AsyncGenerator&) = delete
AsyncGenerator &operator=(const AsyncGenerator&) = delete
inline AsyncGenerator(AsyncGenerator &&other) noexcept
inline AsyncGenerator &operator=(AsyncGenerator &&other) noexcept
inline NextAwaitable next()
inline bool done() const
inline bool has_exception() const
inline void rethrow_if_exception()
template<typename F, std::enable_if_t<detail::is_map_fn_v<F, T>, int> = 0>
inline auto map(F &&func) &&
template<typename F, std::enable_if_t<detail::is_flat_map_fn_v<F, T>, int> = 0>
inline auto flat_map(F &&func) &&
template<typename F>
inline AsyncGenerator<T> filter(F &&pred) &&
inline AsyncGenerator<T> take(std::size_t n) &&
inline AsyncGenerator<T> concat(AsyncGenerator<T> other) &&
template<typename F, std::enable_if_t<detail::is_generator_factory_v<F, T>, int> = 0>
inline AsyncGenerator<T> concat(F &&factory) &&
template<typename F, std::enable_if_t<detail::is_map_fn_v<F, T>, int> = 0>
inline auto operator>(F &&func) &&

operator> : map &#8212; transform each element (T -> U).

template<typename F, std::enable_if_t<detail::is_flat_map_fn_v<F, T>, int> = 0>
inline auto operator>>(F &&func) &&

operator>> : flat_map &#8212; each element produces a sub-generator, results are flattened (T -> AsyncGenerator

Friends

inline friend AsyncGenerator<T> operator|(AsyncGenerator<T> &&lhs, AsyncGenerator<T> &&rhs)

operator| with generator : concat (lazy under the hood).

template<typename F, std::enable_if_t<detail::is_generator_factory_v<F, T>, int> = 0>
inline friend AsyncGenerator<T> operator|(AsyncGenerator<T> &&lhs, F &&factory)

operator| with factory : lazy concat factory() called after lhs is exhausted.

class NextAwaitable

Public Functions

inline explicit NextAwaitable(std::coroutine_handle<promise_type> handle)
inline bool await_ready() const noexcept
inline std::coroutine_handle await_suspend(std::coroutine_handle<> awaiting) noexcept
inline std::optional<T> await_resume()
struct promise_type

Public Functions

inline AsyncGenerator get_return_object()
inline std::suspend_always initial_suspend() noexcept
inline auto yield_value(T value) noexcept
inline auto final_suspend() noexcept
inline void return_void() noexcept
inline void unhandled_exception()

Public Members

std::optional<T> current_value_
std::exception_ptr exception_
std::coroutine_handle continuation_ = {}
class AsyncMutex

Lock-free async mutex for coroutines.

Ownership is not tied to any thread. A coroutine holding the lock can migrate across threads freely. Waiting coroutines suspend without blocking the thread and are resumed in approximate FIFO order.

Usage:

AsyncMutex mutex;

co_await mutex.lock();
co_await writer.write_line(data);
mutex.unlock();

// Or with scoped lock:
{
    auto guard = co_await mutex.scoped_lock();
    co_await writer.write_line(data);
}

Public Functions

inline AsyncMutex() noexcept
inline ~AsyncMutex()
AsyncMutex(const AsyncMutex&) = delete
AsyncMutex &operator=(const AsyncMutex&) = delete
inline bool try_lock() noexcept
inline AsyncMutexLockOperation lock() noexcept
inline void unlock()
class AsyncMutexGuard

RAII lock guard returned by AsyncMutex::scoped_lock().

Public Functions

inline explicit AsyncMutexGuard(AsyncMutex &mutex) noexcept
inline AsyncMutexGuard(AsyncMutexGuard &&other) noexcept
AsyncMutexGuard(const AsyncMutexGuard&) = delete
AsyncMutexGuard &operator=(const AsyncMutexGuard&) = delete
inline ~AsyncMutexGuard()
class AsyncMutexLockOperation

Awaitable returned by AsyncMutex::lock().

On co_await: if mutex is free, acquires immediately (no suspend). If contended, suspends and pushes onto the waiter stack.

Public Functions

inline explicit AsyncMutexLockOperation(AsyncMutex &mutex) noexcept
inline bool await_ready() const noexcept
inline bool await_suspend(std::coroutine_handle<> awaiter) noexcept
inline void await_resume() const noexcept
template<typename T>
class Channel : public std::enable_shared_from_this<Channel<T>>

Channel<T> - Producer-consumer queue for streaming data

Usage:

auto channel = make_channel<Chunk>(1000);

// channel->producer() increments the producer count immediately,
// so the channel never transiently sees zero producers while
// coroutines are still being scheduled.
auto task = make_task(
    [ch = channel->producer()](CoroScope& ctx) mutable
        -> CoroTask<void> {
        auto guard = ch.guard();  // adopts slot, RAII release
        for (auto chunk : read_chunks())
            co_await ch.send(std::move(chunk));
        // ~ProducerGuard auto-releases; channel closes when last exits
    });

Public Functions

inline explicit Channel(std::size_t capacity = 0)

Constructor

Parameters:

capacity – Maximum number of items in queue (0 = unlimited)

inline ~Channel()
Channel(const Channel&) = delete
Channel &operator=(const Channel&) = delete
Channel(Channel&&) = delete
Channel &operator=(Channel&&) = delete
inline ChannelProducer<T> producer()

Get a send-side handle with a pre-registered producer slot. Use this when capturing a channel into a coroutine lambda:

ch = channel->producer() mutable -> CoroTask<…> { auto guard = ch.guard(); co_await ch.send(…); }

The producer count is incremented immediately (on the caller’s thread), not when the coroutine starts.

inline ChannelConsumer<T> consumer()

Get a receive-side handle for capturing into a coroutine lambda:

ch = channel->consumer() -> CoroTask<…> { while (auto item = co_await ch.receive()) { … } }

inline bool try_send(const T &item)

Try to send without blocking

Parameters:

item – Item to send

Returns:

true if sent, false if queue full or closed

inline bool try_send(T &&item)
inline bool try_receive(T &item)

Try to receive without blocking

Parameters:

item – Output parameter for received item

Returns:

true if received, false if queue empty

inline ReceiveAwaitable receive()
inline std::optional<T> blocking_receive()
inline SendAwaitable send(const T &item)
inline SendAwaitable send(T &&item)
inline void close()

Close channel No more items can be sent after this

inline bool is_closed() const

Check if channel is closed

inline bool is_closed_and_done() const

Check if channel is closed and no producers remain

inline std::size_t num_producers() const

Get number of active producers

inline std::size_t size() const

Get current queue size (approximate, lock-free)

inline std::size_t capacity() const

Get channel capacity

inline bool empty() const

Check if queue is empty (approximate)

inline bool full() const

Check if queue is full (approximate)

Public Static Functions

static inline std::shared_ptr<Channel<T>> make(std::size_t capacity = 0)

Helper to create shared_ptr Channel<T>

class ProducerGuard

RAII guard for producer tracking Automatically closes channel when last producer exits

Public Functions

inline explicit ProducerGuard(Channel *ch)

Register a new producer slot.

inline ProducerGuard(Channel *ch, Adopt)

Adopt an existing producer registration (no increment).

inline ProducerGuard(Channel *ch, std::shared_ptr<Channel> shared, Adopt)

Adopt with shared_ptr to extend channel lifetime.

inline ~ProducerGuard()
ProducerGuard(const ProducerGuard&) = delete
ProducerGuard &operator=(const ProducerGuard&) = delete
inline ProducerGuard(ProducerGuard &&other) noexcept
inline ProducerGuard &operator=(ProducerGuard &&other) noexcept
struct Adopt

Tag type: adopt an already-registered producer slot (no increment).

class ReceiveAwaitable

Public Functions

inline explicit ReceiveAwaitable(Channel *channel)
ReceiveAwaitable(const ReceiveAwaitable&) = delete
ReceiveAwaitable &operator=(const ReceiveAwaitable&) = delete
ReceiveAwaitable(ReceiveAwaitable&&) = delete
ReceiveAwaitable &operator=(ReceiveAwaitable&&) = delete
inline bool await_ready()
template<typename Promise>
inline bool await_suspend(std::coroutine_handle<Promise> h)
inline std::optional<T> await_resume()
inline ~ReceiveAwaitable()
struct ReceiveWaiterNode

Public Members

std::coroutine_handle handle
std::optional<T> *result = {nullptr}
dftracer::utils::Executor *executor = {nullptr}
ReceiveWaiterNode *next = {nullptr}
class SendAwaitable

Public Functions

inline SendAwaitable(Channel *channel, const T &item)
inline SendAwaitable(Channel *channel, T &&item)
SendAwaitable(const SendAwaitable&) = delete
SendAwaitable &operator=(const SendAwaitable&) = delete
SendAwaitable(SendAwaitable&&) = delete
SendAwaitable &operator=(SendAwaitable&&) = delete
inline bool await_ready()
template<typename Promise>
inline bool await_suspend(std::coroutine_handle<Promise> h)
inline bool await_resume()
inline ~SendAwaitable()
struct SendWaiterNode

Public Members

std::coroutine_handle handle
std::optional<T> *item = {nullptr}
bool *sent = {nullptr}
dftracer::utils::Executor *executor = {nullptr}
SendWaiterNode *next = {nullptr}
template<typename T>
class ChannelConsumer

ChannelConsumer<T> - Receive-side handle for Channel<T>

Holds a raw pointer for operations and optionally a shared_ptr to keep the channel alive when created from a shared_ptr channel.

Usage:

[ch = channel->consumer()](CoroScope& ctx)
    -> CoroTask<void> {
    while (auto item = co_await ch.receive()) {
        process(*item);
    }
}

Public Functions

inline explicit ChannelConsumer(Channel<T> *ch)
inline explicit ChannelConsumer(std::shared_ptr<Channel<T>> ch)
~ChannelConsumer() = default
inline ChannelConsumer(ChannelConsumer &&other) noexcept
inline ChannelConsumer &operator=(ChannelConsumer &&other) noexcept
inline ChannelConsumer(const ChannelConsumer &other)
inline ChannelConsumer &operator=(const ChannelConsumer &other)
inline auto receive() const
inline std::optional<T> blocking_receive() const
inline bool is_closed() const
template<typename T>
class ChannelProducer

Send-side handle that pre-registers a producer slot on construction.

Create via channel->producer() or channel.producer() before spawning a coroutine, then capture by value into the lambda. The producer count is incremented eagerly (on the caller’s thread), so the channel never transiently sees zero producers while coroutines are still being scheduled.

Inside the coroutine, call guard() to get an RAII ProducerGuard that decrements the count when the coroutine finishes. If the ChannelProducer is destroyed without calling guard() (e.g. the task was never scheduled), it decrements automatically.

Usage:

auto channel = make_channel<Batch>(100);
auto task = make_task(
    [ch = channel->producer()](CoroScope& ctx) mutable
        -> CoroTask<void> {
        auto guard = ch.guard();
        co_await ch.send(Batch{...});
    });

Public Functions

inline explicit ChannelProducer(Channel<T> *ch)

Construct from raw pointer (stack-allocated channels).

inline explicit ChannelProducer(std::shared_ptr<Channel<T>> ch)

Construct from shared_ptr (heap-allocated channels).

inline ~ChannelProducer()
inline ChannelProducer(ChannelProducer &&other) noexcept
inline ChannelProducer &operator=(ChannelProducer &&other) noexcept
ChannelProducer(const ChannelProducer&) = delete
ChannelProducer &operator=(const ChannelProducer&) = delete
inline Channel<T>::ProducerGuard guard()

Adopt the pre-registered slot as an RAII guard. Call this once inside the coroutine body.

inline auto send(const T &item)

Send an item through the channel.

inline auto send(T &&item)

Send an item through the channel (move).

class Coro

Lightweight fire-and-forget coroutine type.

Unlike CoroTask<T>, Coro has no return value (communicate through channels), no continuation chain (flat scheduling), and integrates with JoinHandle for structured concurrency.

Coro is the internal execution primitive. Users interact with Task (the DAG API); internally, each ready Task is wrapped in a run_task() coroutine and enqueued to the Executor.

Public Types

using promise_type = CoroPromise

Public Functions

inline explicit Coro(std::coroutine_handle<CoroPromise> h)
inline ~Coro()
inline Coro(Coro &&o) noexcept
inline Coro &operator=(Coro &&o) noexcept
Coro(const Coro&) = delete
Coro &operator=(const Coro&) = delete
inline std::coroutine_handle<CoroPromise> handle() const
inline bool done() const
inline std::coroutine_handle<CoroPromise> release()

Release ownership of the coroutine handle. After this call, the Coro no longer owns the handle. FinalAwaiter will schedule deferred destruction for released handles (via Executor::schedule_destroy). Used when handing the handle to the Executor’s run queue.

struct CoroPromise

Public Functions

inline Coro get_return_object()
inline std::suspend_always initial_suspend() noexcept
inline FinalAwaiter final_suspend() noexcept
inline void return_void() noexcept
inline void unhandled_exception()

Public Members

std::exception_ptr exception = {nullptr}
std::atomic<std::size_t> *join_counter = {nullptr}

Join group: points to JoinHandle’s atomic counter. FinalAwaiter decrements on completion.

std::atomic<void*> *join_continuation = {nullptr}

Continuation to resume when join group reaches zero. Points to JoinHandle’s atomic<void*>.

Executor *executor = {nullptr}

Executor for global scheduling (set before enqueue).

TaskIndex task_id = {-1}
bool released = {false}

True when Coro::release() transferred ownership to the queue. FinalAwaiter uses this to schedule deferred destruction.

Public Static Functions

static inline void *operator new(std::size_t size)
static inline void operator delete(void *ptr, std::size_t size)
struct FinalAwaiter

Public Functions

inline bool await_ready() noexcept
std::coroutine_handle await_suspend(std::coroutine_handle<CoroPromise> h) noexcept
inline void await_resume() noexcept
template<typename T = void>
class CoroTask

CoroTask<T>

User-facing coroutine task type

Usage:

CoroTask<int> compute_async() {
    auto data = co_await read_file();
    co_return process(data);
}

// In another coroutine:
int result = co_await compute_async();

Public Types

using value_type = T
using result_type = T

Public Functions

inline explicit CoroTask(std::coroutine_handle<promise_type> h)

Constructor from coroutine handle Called by promise_type::get_return_object()

inline ~CoroTask()

Destructor - clean up coroutine state

CoroTask(const CoroTask&) = delete
CoroTask &operator=(const CoroTask&) = delete
inline CoroTask(CoroTask &&other) noexcept
inline CoroTask &operator=(CoroTask &&other) noexcept
inline bool await_ready() const noexcept

Check if coroutine already completed If true, no suspension needed (optimization)

template<typename Promise>
inline std::coroutine_handle await_suspend(std::coroutine_handle<Promise> awaiting_coro) noexcept
inline T await_resume()
inline bool done() const noexcept

Check if coroutine has completed

inline void resume()

Resume coroutine execution (manual control) Only resume if not already done

inline T get()

Get result (blocking, for non-coroutine callers) Resumes coroutine until completion

Throws:

Exception – if coroutine threw

Returns:

The result value

inline bool has_exception() const noexcept

Check if coroutine has pending exception

inline std::coroutine_handle<promise_type> handle() const noexcept

Get coroutine handle (for advanced use)

inline bool is_awaiting_async() const noexcept

Check if coroutine is suspended for async work If true, executor should NOT drive it synchronously

inline void set_awaiting_async(bool value) noexcept

Set/clear async await flag (used by TaskFuture)

template<typename Func>
inline auto then(Func &&func) && -> CoroTask<std::invoke_result_t<Func, T>>

Chain operation using then()

- transform result with a function

Usage:

auto result = co_await compute_async()
    .then([](int x) { return x * 2; })
    .then([](int x) { return std::to_string(x); });

Parameters:

func – Transformation function (T -> U)

Returns:

New CoroTask

template<typename Func>
inline auto tap(Func &&func) && -> CoroTask<T>

Tap operation - inspect value without transforming it

Usage:

auto result = co_await compute_async()
    .tap([](int x) { std::cout << "Got: " << x << "\n"; })
    .then([](int x) { return x * 2; });

Parameters:

func – Inspection function (T -> void)

Returns:

CoroTask<T> with same value

template<typename Func>
inline auto operator>(Func &&func) && -> CoroTask<std::invoke_result_t<Func, T>>

Operator> for chaining (same as then()

)

Usage:

auto result = co_await compute_async()
    > [](int x) { return x * 2; }
    > [](int x) { return std::to_string(x); };

Parameters:

func – Transformation function

Returns:

Transformed CoroTask

Friends

template<typename Func>
inline friend auto operator<(Func &&func, CoroTask<T> &&task) -> CoroTask<std::invoke_result_t<Func, T>>

Operator< for reverse composition (func receives this task’s result)

Usage:

auto result = co_await [](int x) { return std::to_string(x); }
    < compute_async();

Parameters:

func – Transformation function

Returns:

Transformed CoroTask

template<typename U>
inline friend auto operator&(CoroTask<T> &&lhs, CoroTask<U> &&rhs) -> CoroTask<std::tuple<T, U>>

Operator& for parallel composition (AND) - run both tasks, return tuple

Note: In the current synchronous execution model, these run sequentially. For true parallel execution, use

CoroScope::spawn().

Usage:

auto [result1, result2] = co_await (task1() & task2());

Parameters:

other – Second task to run in parallel

Returns:

CoroTask<std::tuple<T, U>> with both results

inline friend auto operator|(CoroTask<T> &&primary, CoroTask<T> &&fallback) -> CoroTask<T>

Operator| for OR/fallback composition - try first, fall back to second

Usage:

auto result = co_await (primary_task() | fallback_task());

Parameters:

fallback – Fallback task to run if this task fails

Returns:

CoroTask<T> with result from whichever succeeds

struct promise_type : public dftracer::utils::coro::PromiseBase

Public Functions

inline CoroTask<T> get_return_object()
inline std::suspend_always initial_suspend() noexcept
inline FinalAwaiter final_suspend() noexcept
inline void return_value(T value)
inline void unhandled_exception()
inline coro::YieldAwaitable await_transform(coro::YieldAwaitable y) noexcept
template<typename U>
inline auto await_transform(U &&awaitable) noexcept

Public Members

T result_
std::exception_ptr exception_
struct FinalAwaiter

Public Functions

inline bool await_ready() noexcept
inline std::coroutine_handle await_suspend(std::coroutine_handle<promise_type> h) noexcept
inline void await_resume() noexcept
struct FireAndForget

Coroutine that destroys its own frame on completion.

struct promise_type

Public Functions

inline FireAndForget get_return_object()
inline std::suspend_never initial_suspend()
inline std::suspend_never final_suspend() noexcept
inline void return_void()
inline void unhandled_exception()
template<typename T>
class Generator

Generator<T> - Synchronous lazy sequence generator

Usage:

Generator<int> fibonacci(int n) {
    int a = 0, b = 1;
    for (int i = 0; i < n; ++i) {
        co_yield a;
        auto next = a + b;
        a = b;
        b = next;
    }
}

for (int value : fibonacci(10)) {
    std::cout << value << " ";
}

Public Functions

inline explicit Generator(std::coroutine_handle<promise_type> handle)

Construct Generator from coroutine handle

inline ~Generator()

Destructor - clean up coroutine state

Generator(const Generator&) = delete

Move-only semantics

Generator &operator=(const Generator&) = delete
inline Generator(Generator &&other) noexcept
inline Generator &operator=(Generator &&other) noexcept
inline iterator begin()

Begin iterator

inline iterator end()

End iterator

inline bool next()

Manual control: advance to next value

Returns:

true if value available, false if done

inline const T &value() const

Get current value (call after next() returns true)

Throws:

std::runtime_error – if no value available

inline bool done() const

Check if generator is done

inline bool has_exception() const

Check if generator has pending exception

inline void rethrow_if_exception()

Rethrow pending exception

class iterator

Iterator

Public Types

using iterator_category = std::input_iterator_tag
using value_type = T
using difference_type = std::ptrdiff_t
using pointer = const T*
using reference = const T&

Public Functions

inline explicit iterator(std::coroutine_handle<promise_type> handle)

Construct iterator from coroutine handle

inline iterator &operator++()

Advance to next value

inline iterator operator++(int)

Post-increment

inline reference operator*() const

Dereference iterator

inline pointer operator->() const

Member access operator

inline bool operator==(const iterator &other) const

Compare iterators

inline bool operator!=(const iterator &other) const
struct promise_type

Promise type for coroutine

Public Functions

inline Generator get_return_object()

Create Generator from promise

inline std::suspend_always initial_suspend() noexcept

Suspend at start

inline std::suspend_always final_suspend() noexcept

Suspend at end

inline std::suspend_always yield_value(T value)

Store yielded value

inline void return_void() noexcept

Generator completed

inline void unhandled_exception()

Capture exception

Public Members

std::optional<T> current_value_
std::exception_ptr exception_
class JoinHandle

Stack-allocated join barrier for Coro instances.

Uses the cppcoro when_all_counter pattern. pending_ is initialised to 1 (the joiner’s slot) and incremented by 1 for every tracked Coro. Each Coro’s FinalAwaiter and the joiner’s await_suspend each call fetch_sub(1). Whichever decrement brings the counter to zero resumes the joiner &#8212; FinalAwaiter via symmetric transfer, await_suspend by returning the awaiting handle directly.

Initialising the joiner’s slot in the constructor (rather than in join()) ensures that a Coro completing before join() is called can never observe prev==1 and attempt to exchange on continuation_ before the joiner has published it.

Usage: JoinHandle jh; jh.track(coro1); jh.track(coro2); // … enqueue coroutines to executor … co_await jh.join(); // suspends until all complete

Public Functions

JoinHandle() = default
JoinHandle(const JoinHandle&) = delete
JoinHandle &operator=(const JoinHandle&) = delete
JoinHandle(JoinHandle&&) = delete
JoinHandle &operator=(JoinHandle&&) = delete
inline void track(Coro &c)

Register a Coro with this join group. Must be called BEFORE the coroutine is enqueued for execution.

inline JoinAwaitable join()
inline std::size_t pending() const
struct JoinAwaitable

Public Functions

inline bool await_ready() const noexcept
template<typename Promise>
inline std::coroutine_handle await_suspend(std::coroutine_handle<Promise> awaiting) noexcept
inline void await_resume() const noexcept

Public Members

JoinHandle *handle
struct PromiseBase

Subclassed by dftracer::utils::coro::CoroTask< T >::promise_type, dftracer::utils::coro::CoroTask< void >::promise_type

Public Functions

inline void set_awaited_task_id(TaskIndex id)
inline TaskIndex get_awaited_task_id() const
inline void set_scheduler(Scheduler *s)
inline Scheduler *get_scheduler() const
inline void set_executor(Executor *e)
inline Executor *get_executor() const
inline void set_root_promise(PromiseBase *p)
inline PromiseBase *get_root_promise()

Public Members

std::atomic<bool> awaiting_async_ = {false}
std::coroutine_handle continuation_ = {nullptr}
TaskIndex awaited_task_id_ = {-1}
Scheduler *scheduler_ = {nullptr}
Executor *executor_ = {nullptr}
std::atomic<bool> *cancellation_token_ = {nullptr}
PromiseBase *root_promise_ = {nullptr}

Public Static Functions

static inline void *operator new(std::size_t size)
static inline void operator delete(void *ptr, std::size_t size)
template<typename T>
class ReadyAwaitable

Public Functions

inline explicit ReadyAwaitable(T value)
inline bool await_ready() const noexcept
inline void await_suspend(std::coroutine_handle<>) const noexcept
inline T await_resume()
template<typename T>
struct SharedState

Lock-free shared state for SpawnFuture<T>.

Encodes a 3-state machine in a single atomic uintptr_t: 0 = EMPTY &#8212; neither result nor waiter yet 1 = DONE &#8212; result stored, no waiter was registered other &#8212; WAITING &#8212; value is the waiter’s coroutine_handle address

Completion path (producer): Store result, then exchange(DONE). If prev was a handle address, schedule that handle for resumption via the executor.

Await path (consumer): CAS(EMPTY → handle_addr). If CAS fails, state is DONE &#8212; don’t suspend.

One heap allocation per typed spawn (shared_ptr<SharedState<T>>).

Public Functions

inline void complete(T value)

Called by the spawned coroutine on completion.

inline void complete_with_exception(std::exception_ptr e)

Called by the spawned coroutine on exception.

inline bool is_done() const

Check if result is ready without blocking.

Public Members

std::atomic<std::uintptr_t> flag = {EMPTY}
std::optional<T> result
std::exception_ptr exception
Executor *executor = {nullptr}

Public Static Attributes

static constexpr std::uintptr_t EMPTY = 0
static constexpr std::uintptr_t DONE = 1
template<typename T>
class SpawnFuture

Typed future returned by CoroScope::spawn() for non-void coroutines.

Awaitable: co_await on a SpawnFuture<T> suspends the caller until the spawned coroutine completes, then returns the typed result.

Usage:

SpawnFuture<int> future = scope.spawn([](CoroScope& s) -> CoroTask<int> {
    co_return 42;
});
int result = co_await future;  // suspends until spawn completes

Public Types

using result_type = T

Public Functions

inline explicit SpawnFuture(std::shared_ptr<SharedState<T>> state)
SpawnFuture(SpawnFuture&&) noexcept = default
SpawnFuture &operator=(SpawnFuture&&) noexcept = default
SpawnFuture(const SpawnFuture&) = delete
SpawnFuture &operator=(const SpawnFuture&) = delete
inline bool await_ready() const noexcept
inline bool await_suspend(std::coroutine_handle<> awaiting) noexcept
inline T await_resume()
inline bool is_done() const

Check if the spawned coroutine has completed.

inline void detach() noexcept

Detach waiter &#8212; prevents completion from resuming a destroyed handle. Used by when_any to safely clean up losing wrappers. If a waiter handle was registered (via await_suspend), it is extracted and scheduled for deferred destruction since nobody will resume it.

struct SyncScope

RAII guard that suppresses the executor context and timeslice for the current scope, forcing nested async I/O into the synchronous pread/pwrite fallback. Used by CoroTask::get().

Public Functions

inline SyncScope() noexcept
inline ~SyncScope() noexcept
SyncScope(const SyncScope&) = delete
SyncScope &operator=(const SyncScope&) = delete

Public Members

void *saved_exec_
std::chrono::microseconds saved_ts_
template<typename Duration>
class TimeoutAwaitable

TimeoutAwaitable - Awaitable that completes after a duration

Can be used with when_any to implement timeouts:

auto result = co_await when_any({
    io::async_read(fd, buf, len),
    timeout(std::chrono::seconds(5))
});

if (result.index == 1) {
    // Timeout occurred
}

Public Types

using result_type = void

Public Functions

inline explicit TimeoutAwaitable(Duration duration, TimerService *timer_service)
inline ~TimeoutAwaitable()
inline bool await_ready() const noexcept
inline void await_suspend(std::coroutine_handle<> h)
inline void await_resume()
inline std::shared_ptr<std::atomic<bool>> get_cancellation_token() const
template<typename ...Awaitables>
class WhenAllTupleAwaitable

WhenAllTupleAwaitable - Lightweight awaitable wrapping heap-allocated state.

Accepts heterogeneous awaitable types; returns std::tuple of their results. Void result types are mapped to std::monostate in the tuple.

Usage:

auto [a, b, c] = co_await when_all(task_int(), task_str(), task_float());

Public Types

using result_type = std::tuple<when_all_result_t<typename Awaitables::result_type>...>

Public Functions

inline explicit WhenAllTupleAwaitable(Awaitables&&... awaitables)
inline bool await_ready()
template<typename Promise>
inline bool await_suspend(std::coroutine_handle<Promise> h)
inline result_type await_resume()
template<typename ...Awaitables>
struct WhenAllTupleState

Shared state for WhenAllTupleAwaitable. Heap-allocated so lifetime extends beyond await_suspend.

Public Functions

inline explicit WhenAllTupleState(Awaitables&&... awaitables)
inline void on_one_complete()
inline void on_exception(std::exception_ptr e)
inline void mark_suspended_and_check_completion()

Public Members

std::tuple<Awaitables...> awaitables_
std::tuple<std::optional<when_all_result_t<typename Awaitables::result_type>>...> results_
std::exception_ptr exception_
std::atomic<bool> has_exception_ = {false}
std::atomic<std::size_t> completed_count_ = {0}
std::coroutine_handle awaiting_coroutine_
Executor *executor_ = {nullptr}
std::atomic<std::uint8_t> sync_state_ = {0}

Public Static Attributes

static constexpr std::size_t total_ = sizeof...(Awaitables)
static constexpr std::uint8_t BIT_SUSPENDED = 1
static constexpr std::uint8_t BIT_COMPLETED = 2
template<typename Awaitable>
class WhenAllVectorAwaitable

WhenAllVectorAwaitable - Lightweight awaitable that wraps shared state

This is a thin handle that points to heap-allocated state. The state is kept alive by shared_ptr until all tasks complete.

Usage:

CoroTask<Data> read_ops;
for (int i = 0; i < 16000; i++) {
    read_ops.push_back(io::async_read(fds[i], bufs[i], lens[i]));
}
auto results = co_await when_all(read_ops);  // Vector<Data>

Public Types

using result_type = std::vector<typename Awaitable::result_type>

Public Functions

inline explicit WhenAllVectorAwaitable(std::vector<Awaitable> awaitables)
inline bool await_ready()
template<typename Promise>
inline bool await_suspend(std::coroutine_handle<Promise> h)
inline result_type await_resume()
template<typename Awaitable>
struct WhenAllVectorState

Shared state for WhenAllVectorAwaitable Heap-allocated to ensure lifetime extends beyond await_suspend

Public Functions

inline explicit WhenAllVectorState(std::vector<Awaitable> awaitables)
inline void on_one_complete()
inline void on_exception(std::exception_ptr e)
inline void mark_suspended_and_check_completion()

Public Members

std::vector<Awaitable> awaitables_
std::vector<typename Awaitable::result_type> results_
std::exception_ptr exception_
std::atomic<bool> has_exception_ = {false}
std::atomic<std::size_t> completed_count_ = {0}
std::coroutine_handle awaiting_coroutine_
std::size_t total_
Executor *executor_ = {nullptr}
std::atomic<std::uint8_t> sync_state_ = {0}

Public Static Attributes

static constexpr std::uint8_t BIT_SUSPENDED = 1
static constexpr std::uint8_t BIT_COMPLETED = 2
template<typename Awaitable>
class WhenAnyAwaitable

WhenAnyAwaitable - Completes when first awaitable completes

Usage:

auto result = co_await when_any({
    io::async_read(fd1, buf1, len1),
    io::async_read(fd2, buf2, len2),
    timeout(5s)
});

if (result.index == 2) {
    // Timeout occurred
} else {
    // Got data from index 0 or 1
    process(result.result);
}

Public Types

using result_type = WhenAnyResult<typename Awaitable::result_type>

Public Functions

inline explicit WhenAnyAwaitable(std::vector<Awaitable> awaitables)
~WhenAnyAwaitable() = default
inline bool await_ready()
template<typename Promise>
inline bool await_suspend(std::coroutine_handle<Promise> h)
inline result_type await_resume()
template<typename T>
struct WhenAnyResult

WhenAnyResult - Contains index and result of first-completed operation

Template Parameters:

T – Result type

Public Functions

inline void cancel_remaining() const

Helper to request cancellation on all remaining futures This is cooperative cancellation - tasks must check ctx.is_cancellation_requested()

Public Members

std::size_t index

Index of completed operation.

T result

Result from completed operation.

std::vector<std::shared_ptr<std::atomic<bool>>> remaining_cancellation_tokens
template<typename Awaitable>
struct WhenAnySharedState

Public Functions

inline explicit WhenAnySharedState(std::vector<Awaitable> aws)
inline ~WhenAnySharedState()
inline void on_first_complete()
inline void mark_suspended_and_check_completion()

Public Members

std::atomic<bool> completed = {false}
WhenAnyResult<typename Awaitable::result_type> result
std::exception_ptr exception
std::coroutine_handle awaiting_coroutine
std::vector<std::shared_ptr<std::atomic<bool>>> cancellation_tokens
std::atomic<std::size_t> wrappers_done = {0}
std::size_t total_wrappers = {0}
std::vector<Awaitable> awaitables
Executor *executor = {nullptr}
std::atomic<std::uint8_t> sync_state_ = {0}

Public Static Attributes

static constexpr std::uint8_t BIT_SUSPENDED = 1
static constexpr std::uint8_t BIT_COMPLETED = 2
template<typename ...Awaitables>
class WhenAnyTupleAwaitable

Public Types

using result_type = WhenAnyTupleResult<Awaitables...>

Public Functions

inline explicit WhenAnyTupleAwaitable(Awaitables&&... aws)
~WhenAnyTupleAwaitable() = default
inline bool await_ready()
template<typename Promise>
inline bool await_suspend(std::coroutine_handle<Promise> h)
inline result_type await_resume()
template<typename ...Awaitables>
struct WhenAnyTupleResult

WhenAnyTupleResult - Result of heterogeneous when_any.

Template Parameters:

Awaitables – Pack of distinct awaitable types.

Public Functions

template<std::size_t N>
inline auto &get() &

Type-safe index-based access to the winning result. N must equal index at runtime, otherwise std::bad_variant_access. Works correctly even when multiple awaitables share the same result type (e.g. variant<float, int, int, float>).

template<std::size_t N>
inline auto &&get() &&
template<std::size_t N>
inline const auto &get() const &
inline void cancel_remaining() const

Public Members

std::size_t index
template<typename ...Awaitables>
struct WhenAnyTupleState

Public Functions

inline explicit WhenAnyTupleState(Awaitables&&... aws)
inline ~WhenAnyTupleState()
inline void on_first_complete()
inline void mark_suspended_and_check_completion()

Public Members

std::tuple<Awaitables...> awaitables_
std::atomic<bool> completed = {false}
WhenAnyTupleResult<Awaitables...> result
std::exception_ptr exception
std::coroutine_handle awaiting_coroutine
std::vector<std::shared_ptr<std::atomic<bool>>> cancellation_tokens
Executor *executor = {nullptr}
std::atomic<std::uint8_t> sync_state_ = {0}

Public Static Attributes

static constexpr std::size_t total_ = sizeof...(Awaitables)
static constexpr std::uint8_t BIT_SUSPENDED = 1
static constexpr std::uint8_t BIT_COMPLETED = 2
struct YieldAwaitable

Tag type so await_transform can recognize and skip wrapping.

Public Functions

bool await_ready() noexcept
std::coroutine_handle await_suspend(std::coroutine_handle<> h) noexcept
void await_resume() noexcept

Public Members

bool force_ = false
template<typename T>
struct awaitable_result_type : public dftracer::utils::coro::detail::awaitable_result_type_impl<std::decay_t<T>>
template<typename T>
struct coro_task_value_type
template<typename T>
struct generator_value_type
template<typename T>
struct is_awaitable : public dftracer::utils::coro::detail::has_awaitable_interface<T>
template<typename T>
struct is_coro_task : public std::false_type
template<typename Func, typename = void>
struct is_coroutine_function : public std::false_type
template<typename T>
struct is_generator : public std::false_type
template<typename T>
struct unwrap_coro_task

Public Types

using type = T