Task System¶
See also
For complete class and member documentation, see the API Reference.
Task-based DAG execution and coroutine scoping for structured concurrency.
classDiagram
class dftracer__utils__CoroScope["CoroScope"]
dftracer__utils__CoroScope : +spawn() SpawnFuture
dftracer__utils__CoroScope : +spawn() SpawnFuture
dftracer__utils__CoroScope : +spawn() SpawnFuture
class dftracer__utils__NoOpTask["NoOpTask"]
class dftracer__utils__Task["Task"]
dftracer__utils__Task : +depends_on() shared_ptr
dftracer__utils__Task : +depends_on() shared_ptr
dftracer__utils__Task : +depends_on() shared_ptr
class dftracer__utils__TaskHandle["TaskHandle"]
dftracer__utils__TaskHandle : +get() void
dftracer__utils__TaskHandle : +wait() void
dftracer__utils__TaskHandle : +done() bool
class dftracer__utils__TaskInfo["TaskInfo"]
class dftracer__utils__TaskProgress["TaskProgress"]
class dftracer__utils__TaskResult["TaskResult"]
dftracer__utils__TaskResult : +set_value() void
dftracer__utils__TaskResult : +set_exception() void
dftracer__utils__TaskResult : +set_cancelled() void
dftracer__utils__Task <|-- dftracer__utils__NoOpTask
CoroScope¶
Lightweight structured concurrency scope using Coro + JoinHandle.
CoroScope is the primary context type passed to task lambdas. It provides:
spawn()returningSpawnFuture<T>for all coroutines (void and typed). The return value can be ignored for fire-and-forget usage, orco_await’d to wait for that specific coroutine.Channel operations (send/receive)
Producer-consumer patterns with helpers
Structured cancellation support
Automatic join() on scope exit
CoroScope replaces the old TaskScope with unified coroutine scheduling, eliminating the Task/Scheduler overhead for lightweight work.
Basic spawning:
Fire-and-forget (return value ignored):
scope.spawn([](CoroScope& s) -> CoroTask<void> {
// do work
co_return;
});
Await a void spawn:
co_await scope.spawn([](CoroScope& s) -> CoroTask<void> {
// caller suspends until this completes
co_return;
});
Typed result with SpawnFuture:
int result = co_await scope.spawn([](CoroScope& s) -> CoroTask<int> {
co_return 42;
});
Or capture the future for later:
auto future = scope.spawn([](CoroScope& s) -> CoroTask<int> {
co_return 42;
});
// ... do other work ...
int result = co_await future;
Channel patterns:
Spawn producers and consumers:
auto channel = make_channel<int>(100);
scope.spawn_producers(channel, 2, [](CoroScope& s, size_t id) -> CoroTask<void> {
for (int i = 0; i < 10; ++i)
co_await channel.send(i);
co_return;
});
scope.spawn_consumers(channel, 2, [](CoroScope& s, int value) -> CoroTask<void> {
process(value);
co_return;
});
Structured concurrency:
Must call co_await scope.join() before the scope is destroyed.
This waits for all spawned coroutines to complete.
Cancellation:
Check is_cancellation_requested() to support graceful cancellation:
while (!scope.is_cancellation_requested()) {
// do work
co_await maybe_yield();
}
TaskResult¶
Lightweight one-shot result holder for Task completion.
TaskResult supports both blocking wait (for tests) and co_await (for runtime). It is embedded in Task (~48 bytes) with no heap allocation for shared state.
States:
pending: Not started
running: Execution in progress
value: Completed with value
exception: Completed with exception
cancelled: Cancelled before completion
Write API (called once by executor):
set_value()- Mark task complete with resultset_exception()- Mark task complete with exceptionset_cancelled()- Mark task as cancelledmark_running()- Mark task as executing
Blocking read API (tests, scheduler, pipeline):
wait()- Block until ready (returns false on timeout)get()- Block until ready, return copy of value (throws on exception)get_ready()- Return value without blocking (asserts ready state)get_exception()- Return exception pointer without blockingis_ready()- Query ready state without blocking
Coroutine read API (runtime):
when_ready()- Awaitable that suspends until result is ready
Memory optimization:
Smart value release via reader tracking:
add_reader()- Register a consumer (called by depends_on())release_reader()- Signal consumer is done (called by Scheduler)
Value is automatically freed when last reader releases, except for terminal tasks (no children) where the value persists for user get().
Task¶
Self-contained DAG node with dependencies.
Task represents a single node in a directed acyclic graph (DAG) of work. Each task:
Owns a TaskResult for result retrieval
Knows its parents and children (DAG structure)
Is immutable after construction (blueprint pattern)
Supports automatic tuple packing for multiple parents
Validates types during edge creation
Fluent API for building DAGs:
Single parent dependency:
auto task2 = make_task([](CoroScope& ctx, const std::any& input) -> CoroTask<int> {
// process input
co_return 42;
})->depends_on(task1);
Multiple parent dependencies:
auto task3 = make_task([](CoroScope& ctx, const std::any& input) -> CoroTask<void> {
// input is a tuple packed by combiner
co_return;
})->depends_on(task1, task2, task3);
Custom combiner for typed inputs:
auto task3 = make_task([](CoroScope& ctx, const std::any& input) -> CoroTask<void> {
// combine task1 and task2 results
co_return;
})->depends_on(task1, task2)
->with_combiner([](int a, std::string b) -> std::any {
return static_cast<std::any>(a + b.size());
});
Task lifecycle:
Created via make_task()
Dependencies added via depends_on()
Optional combiner set via with_combiner()
Scheduled by Scheduler
Executor runs the task function with CoroScope and input
Result stored in TaskResult
Children are enqueued when all parents complete
make_task¶
Create a new Task with a given function.
Migration from Old API¶
The task system has been significantly redesigned:
TaskScope replaced by CoroScope: Lightweight coroutine scoping with JoinHandle
Old Pipeline API removed: Use coroutine + channel patterns instead
Task/Scheduler unified execution: Tasks dispatch to CoroScope internally
SpawnFuture for all spawns:
spawn()always returnsSpawnFuture<T>(includingSpawnFuture<void>), enablingco_awaiton any spawnChannel expanded: Bounded capacity, async send/receive, producer tracking
Breaking changes:
Old when_all() pattern no longer exists; use scope.spawn() for implicit synchronization
Tasks no longer support complex continuation chains; use channels for communication
Type safety is stricter: edge creation validates input/output types