Executor Classes¶
See also
For complete class and member documentation, see the API Reference.
Executor classes for running pipeline tasks.
graph TB
Executor["Executor"]
Workers["Worker Threads<br/>(N threads)"]
RunQueue["ConcurrentQueue<br/>(coroutine handles)"]
IoBack["IoBackend"]
SqlPool["SQLite ThreadPool"]
Timer["TimerService"]
Executor --> Workers
Executor --> RunQueue
Executor --> IoBack
Executor --> SqlPool
Executor --> Timer
Workers --> |dequeue + resume| RunQueue
Executor¶
The executor owns worker threads, I/O backends, a SQLite thread pool, and a
timer service. It is the runtime engine that pulls coroutine handles from a
lock-free queue and resumes them on its thread pool. An Executor is created
automatically by Pipeline, but can also be instantiated directly for testing
or embedding.
Creating and starting an executor:
ExecutorConfig config{
.num_threads = 8,
.io_backend_type = io::IoBackendType::AUTO
};
auto executor = std::make_unique<Executor>(config);
executor->start();
// ... submit work ...
executor->shutdown();
Submitting work:
The primary way to submit work is through coroutine handles or tracked coroutines:
// Low-level: enqueue a raw coroutine handle (~20ns, lock-free)
executor->enqueue(some_coroutine_handle);
// Tracked: enqueue a Coro with progress tracking
TaskIndex id = executor->enqueue_tracked(std::move(my_coro), "parse_file");
// DAG task: submit a Task with input and parent tracking
executor->submit_task(task, input, parent_task_id);
Thread-local access:
Worker threads can access their executor via the static current() method:
// Inside a coroutine running on the executor
Executor* exec = Executor::current();
if (exec && exec->has_io_backend()) {
auto& io = exec->io_backend();
// ... perform async I/O ...
}
Lifecycle methods:
start()– spawn worker threads and I/O backendsshutdown()– graceful shutdown, waits for in-flight work to completereset()– prepare for a new execution round after shutdownrequest_shutdown()– signal workers to stop accepting new tasksis_responsive()– used by watchdog to detect hung executors
Configuration¶
ExecutorConfig controls thread pool sizing, I/O backend selection, and
timeout thresholds. All fields have sensible defaults.
struct ExecutorConfig {
std::size_t num_threads = 0; // 0 = hardware_concurrency
std::chrono::seconds idle_timeout{5};
std::chrono::seconds deadlock_timeout{10};
std::size_t io_pool_size = 4;
io::IoBackendType io_backend_type = io::IoBackendType::AUTO;
unsigned io_batch_threshold = 16;
};
Key fields:
num_threads– Number of worker threads. Set to0(the default) to usestd::thread::hardware_concurrency().idle_timeout/deadlock_timeout– Thresholds for the watchdog to detect idle or hung workers.io_pool_size– Size of the dedicated I/O thread pool backing theIoBackend.io_backend_type– Selects the async I/O implementation (AUTO,IO_URING,THREAD_POOL).io_batch_threshold– Minimum number of I/O operations to batch before submitting to the backend.
Example – high-throughput configuration:
ExecutorConfig config{
.num_threads = 16,
.io_pool_size = 8,
.io_backend_type = io::IoBackendType::IO_URING,
.io_batch_threshold = 32,
};
Progress Tracking¶
The executor maintains a task registry that tracks the state, timing, and
parent-child relationships of every submitted task. This powers the
get_progress() API used for monitoring, debugging, and building progress
bars.
Querying progress:
ExecutorProgress progress = executor->get_progress();
std::cout << "submitted=" << progress.total_tasks_submitted
<< " running=" << progress.tasks_running
<< " completed=" << progress.tasks_completed
<< " failed=" << progress.tasks_failed << "\n";
for (auto& w : progress.workers) {
std::cout << "worker " << w.worker_id
<< (w.is_idle ? " idle" : " busy")
<< " queue=" << w.local_queue_depth << "\n";
}
Task states:
Each tracked task transitions through these states:
QUEUED– waiting in the run queueRUNNING– currently executing on a worker threadWAITING– suspended, waiting for child tasksCOMPLETED– finished successfullyFAILED– finished with an error
Progress tree:
ExecutorProgress::root_tasks contains a recursive TaskProgress tree.
Each node exposes:
progress_percentage– 0–100 based on completed subtasksqueued_duration_ms/execution_duration_ms– timing breakdownlocation– human-readable queue location (e.g."executing_on_worker_3")children– nested subtask progress
Checking recent errors:
for (auto& [task_id, msg] : progress.recent_errors) {
std::cerr << "task " << task_id << " failed: " << msg << "\n";
}
See also
For complete class and member documentation, see the API Reference.