Executor Classes
=================
.. seealso::
For complete class and member documentation, see the
:doc:`API Reference `.
Executor classes for running pipeline tasks.
.. mermaid::
graph TB
Executor["Executor"]
Workers["Worker Threads
(N threads)"]
RunQueue["ConcurrentQueue
(coroutine handles)"]
IoBack["IoBackend"]
SqlPool["SQLite ThreadPool"]
Timer["TimerService"]
Executor --> Workers
Executor --> RunQueue
Executor --> IoBack
Executor --> SqlPool
Executor --> Timer
Workers --> |dequeue + resume| RunQueue
Executor
--------
The executor owns worker threads, I/O backends, a SQLite thread pool, and a
timer service. It is the runtime engine that pulls coroutine handles from a
lock-free queue and resumes them on its thread pool. An ``Executor`` is created
automatically by ``Pipeline``, but can also be instantiated directly for testing
or embedding.
**Creating and starting an executor:**
.. code-block:: cpp
ExecutorConfig config{
.num_threads = 8,
.io_backend_type = io::IoBackendType::AUTO
};
auto executor = std::make_unique(config);
executor->start();
// ... submit work ...
executor->shutdown();
**Submitting work:**
The primary way to submit work is through coroutine handles or tracked
coroutines:
.. code-block:: cpp
// Low-level: enqueue a raw coroutine handle (~20ns, lock-free)
executor->enqueue(some_coroutine_handle);
// Tracked: enqueue a Coro with progress tracking
TaskIndex id = executor->enqueue_tracked(std::move(my_coro), "parse_file");
// DAG task: submit a Task with input and parent tracking
executor->submit_task(task, input, parent_task_id);
**Thread-local access:**
Worker threads can access their executor via the static ``current()`` method:
.. code-block:: cpp
// Inside a coroutine running on the executor
Executor* exec = Executor::current();
if (exec && exec->has_io_backend()) {
auto& io = exec->io_backend();
// ... perform async I/O ...
}
**Lifecycle methods:**
- ``start()`` -- spawn worker threads and I/O backends
- ``shutdown()`` -- graceful shutdown, waits for in-flight work to complete
- ``reset()`` -- prepare for a new execution round after shutdown
- ``request_shutdown()`` -- signal workers to stop accepting new tasks
- ``is_responsive()`` -- used by watchdog to detect hung executors
Configuration
-------------
``ExecutorConfig`` controls thread pool sizing, I/O backend selection, and
timeout thresholds. All fields have sensible defaults.
.. code-block:: cpp
struct ExecutorConfig {
std::size_t num_threads = 0; // 0 = hardware_concurrency
std::chrono::seconds idle_timeout{5};
std::chrono::seconds deadlock_timeout{10};
std::size_t io_pool_size = 4;
io::IoBackendType io_backend_type = io::IoBackendType::AUTO;
unsigned io_batch_threshold = 16;
};
**Key fields:**
- ``num_threads`` -- Number of worker threads. Set to ``0`` (the default) to
use ``std::thread::hardware_concurrency()``.
- ``idle_timeout`` / ``deadlock_timeout`` -- Thresholds for the watchdog to
detect idle or hung workers.
- ``io_pool_size`` -- Size of the dedicated I/O thread pool backing the
``IoBackend``.
- ``io_backend_type`` -- Selects the async I/O implementation
(``AUTO``, ``IO_URING``, ``THREAD_POOL``).
- ``io_batch_threshold`` -- Minimum number of I/O operations to batch before
submitting to the backend.
**Example -- high-throughput configuration:**
.. code-block:: cpp
ExecutorConfig config{
.num_threads = 16,
.io_pool_size = 8,
.io_backend_type = io::IoBackendType::IO_URING,
.io_batch_threshold = 32,
};
Progress Tracking
-----------------
The executor maintains a task registry that tracks the state, timing, and
parent-child relationships of every submitted task. This powers the
``get_progress()`` API used for monitoring, debugging, and building progress
bars.
**Querying progress:**
.. code-block:: cpp
ExecutorProgress progress = executor->get_progress();
std::cout << "submitted=" << progress.total_tasks_submitted
<< " running=" << progress.tasks_running
<< " completed=" << progress.tasks_completed
<< " failed=" << progress.tasks_failed << "\n";
for (auto& w : progress.workers) {
std::cout << "worker " << w.worker_id
<< (w.is_idle ? " idle" : " busy")
<< " queue=" << w.local_queue_depth << "\n";
}
**Task states:**
Each tracked task transitions through these states:
- ``QUEUED`` -- waiting in the run queue
- ``RUNNING`` -- currently executing on a worker thread
- ``WAITING`` -- suspended, waiting for child tasks
- ``COMPLETED`` -- finished successfully
- ``FAILED`` -- finished with an error
**Progress tree:**
``ExecutorProgress::root_tasks`` contains a recursive ``TaskProgress`` tree.
Each node exposes:
- ``progress_percentage`` -- 0--100 based on completed subtasks
- ``queued_duration_ms`` / ``execution_duration_ms`` -- timing breakdown
- ``location`` -- human-readable queue location (e.g. ``"executing_on_worker_3"``)
- ``children`` -- nested subtask progress
**Checking recent errors:**
.. code-block:: cpp
for (auto& [task_id, msg] : progress.recent_errors) {
std::cerr << "task " << task_id << " failed: " << msg << "\n";
}
.. seealso::
For complete class and member documentation, see the
:doc:`API Reference `.