Executor Classes

See also

For complete class and member documentation, see the API Reference.

This page covers the Executor class that runs pipeline tasks, its ExecutorConfig settings, and its progress-tracking API.

graph TB
    Executor["Executor"]
    Workers["Worker Threads<br/>(N threads)"]
    RunQueue["ConcurrentQueue<br/>(coroutine handles)"]
    IoBack["IoBackend"]
    SqlPool["SQLite ThreadPool"]
    Timer["TimerService"]

    Executor --> Workers
    Executor --> RunQueue
    Executor --> IoBack
    Executor --> SqlPool
    Executor --> Timer
    Workers --> |dequeue + resume| RunQueue
    

Executor

The executor owns worker threads, I/O backends, a SQLite thread pool, and a timer service. It is the runtime engine that pulls coroutine handles from a lock-free queue and resumes them on its thread pool. An Executor is created automatically by Pipeline, but can also be instantiated directly for testing or embedding.

Creating and starting an executor:

ExecutorConfig config{
    .num_threads = 8,
    .io_backend_type = io::IoBackendType::AUTO
};
auto executor = std::make_unique<Executor>(config);
executor->start();

// ... submit work ...

executor->shutdown();

Submitting work:

Work can be submitted in three ways: as a raw coroutine handle, as a tracked coroutine, or as a DAG task:

// Low-level: enqueue a raw coroutine handle (~20ns, lock-free)
executor->enqueue(some_coroutine_handle);

// Tracked: enqueue a Coro with progress tracking
TaskIndex id = executor->enqueue_tracked(std::move(my_coro), "parse_file");

// DAG task: submit a Task with input and parent tracking
executor->submit_task(task, input, parent_task_id);

Thread-local access:

Worker threads can access their executor via the static current() method:

// Inside a coroutine running on the executor
Executor* exec = Executor::current();
if (exec && exec->has_io_backend()) {
    auto& io = exec->io_backend();
    // ... perform async I/O ...
}
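One common way to provide a static current() accessor is a thread_local pointer that each worker sets when it starts. The sketch below assumes that approach; TlsExecutor and run_on_worker are hypothetical names, and the real Executor may bind the pointer differently. It also shows why the null check in the snippet above matters: on a thread that is not a worker, the pointer is never set.

```cpp
#include <thread>

// Hypothetical illustration of a thread-local "current executor" pointer.
class TlsExecutor {
public:
    // Returns the executor bound to the calling thread, or nullptr.
    static TlsExecutor* current() noexcept { return tls_current_; }

    // Runs fn on a fresh worker thread with current() bound to this object.
    template <class Fn>
    void run_on_worker(Fn fn) {
        std::thread worker([this, fn]() mutable {
            tls_current_ = this;     // bind for the lifetime of the worker
            fn();
            tls_current_ = nullptr;  // unbind before the thread exits
        });
        worker.join();
    }

private:
    static inline thread_local TlsExecutor* tls_current_ = nullptr;
};

// True if current() resolves to the owning executor on its worker thread.
bool current_is_set_on_worker() {
    TlsExecutor exec;
    bool seen = false;
    exec.run_on_worker([&] { seen = (TlsExecutor::current() == &exec); });
    return seen;
}

// True if current() is null on the calling (non-worker) thread.
bool current_is_null_off_worker() {
    return TlsExecutor::current() == nullptr;
}
```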

Lifecycle methods:

  • start() – spawn worker threads and I/O backends

  • shutdown() – graceful shutdown, waits for in-flight work to complete

  • reset() – prepare for a new execution round after shutdown

  • request_shutdown() – signal workers to stop accepting new tasks

  • is_responsive() – used by watchdog to detect hung executors
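When embedding an executor directly, pairing start() with shutdown() in a small RAII guard keeps the lifecycle balanced even if an exception escapes. The sketch below is one possible helper, not part of the documented API: ScopedRun and FakeExecutor are hypothetical, and the guard only assumes the start() and shutdown() methods listed above.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical guard: starts the executor on construction and shuts it
// down on destruction, including during stack unwinding.
template <class ExecutorT>
class ScopedRun {
public:
    explicit ScopedRun(ExecutorT& exec) : exec_(exec) { exec_.start(); }
    ~ScopedRun() { exec_.shutdown(); }
    ScopedRun(const ScopedRun&) = delete;
    ScopedRun& operator=(const ScopedRun&) = delete;

private:
    ExecutorT& exec_;
};

// Fake executor for demonstration only; it records the calls it receives.
struct FakeExecutor {
    std::string log;
    void start() { log += "start;"; }
    void shutdown() { log += "shutdown;"; }
};

// Returns the call log for a run that optionally throws midway.
std::string demo_log(bool throw_midway) {
    FakeExecutor exec;
    try {
        ScopedRun<FakeExecutor> guard(exec);
        exec.log += "work;";
        if (throw_midway) throw std::runtime_error("boom");
    } catch (const std::runtime_error&) {
        exec.log += "caught;";
    }
    return exec.log;
}
```

In both the normal and the throwing path, shutdown is recorded before control leaves the guarded scope.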

Configuration

ExecutorConfig controls thread pool sizing, I/O backend selection, and timeout thresholds. All fields have sensible defaults.

struct ExecutorConfig {
    std::size_t num_threads = 0;        // 0 = hardware_concurrency
    std::chrono::seconds idle_timeout{5};
    std::chrono::seconds deadlock_timeout{10};
    std::size_t io_pool_size = 4;
    io::IoBackendType io_backend_type = io::IoBackendType::AUTO;
    unsigned io_batch_threshold = 16;
};

Key fields:

  • num_threads – Number of worker threads. Set to 0 (the default) to use std::thread::hardware_concurrency().

  • idle_timeout / deadlock_timeout – Watchdog thresholds for detecting idle workers (default 5 seconds) and hung workers (default 10 seconds).

  • io_pool_size – Size of the dedicated I/O thread pool backing the IoBackend.

  • io_backend_type – Selects the async I/O implementation (AUTO, IO_URING, THREAD_POOL).

  • io_batch_threshold – Minimum number of I/O operations to batch before submitting to the backend.
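The num_threads = 0 convention can be resolved with a small helper like the one below. This is a sketch: resolve_num_threads is a hypothetical name, and the fallback to a single thread is an assumption. The standard allows std::thread::hardware_concurrency() to return 0 when the value cannot be determined, so some fallback is needed.

```cpp
#include <cstddef>
#include <thread>

// Hypothetical helper: maps the "0 = auto" convention to a concrete count.
std::size_t resolve_num_threads(std::size_t requested) {
    if (requested != 0) return requested;  // explicit value wins
    // hardware_concurrency() is only a hint and may be 0 if unknown.
    const unsigned hw = std::thread::hardware_concurrency();
    return hw != 0 ? hw : 1;  // assumed fallback: one worker
}
```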

Example – high-throughput configuration:

ExecutorConfig config{
    .num_threads = 16,
    .io_pool_size = 8,
    .io_backend_type = io::IoBackendType::IO_URING,
    .io_batch_threshold = 32,
};

Progress Tracking

The executor maintains a task registry that tracks the state, timing, and parent-child relationships of every submitted task. This powers the get_progress() API used for monitoring, debugging, and building progress bars.

Querying progress:

ExecutorProgress progress = executor->get_progress();

std::cout << "submitted=" << progress.total_tasks_submitted
          << " running=" << progress.tasks_running
          << " completed=" << progress.tasks_completed
          << " failed=" << progress.tasks_failed << "\n";

for (const auto& w : progress.workers) {
    std::cout << "worker " << w.worker_id
              << (w.is_idle ? " idle" : " busy")
              << " queue=" << w.local_queue_depth << "\n";
}

Task states:

Each tracked task transitions through these states:

  • QUEUED – waiting in the run queue

  • RUNNING – currently executing on a worker thread

  • WAITING – suspended, waiting for child tasks

  • COMPLETED – finished successfully

  • FAILED – finished with an error
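Monitoring code often needs to print a state or decide whether a task is finished. The sketch below models the five states as an enum with two helpers. The names TaskState, to_string, and is_terminal are assumptions for illustration; the library's actual enum name and helpers may differ.

```cpp
#include <string_view>

// Hypothetical enum mirroring the documented task states.
enum class TaskState { QUEUED, RUNNING, WAITING, COMPLETED, FAILED };

// Human-readable name for logging and progress output.
constexpr std::string_view to_string(TaskState s) {
    switch (s) {
        case TaskState::QUEUED:    return "QUEUED";
        case TaskState::RUNNING:   return "RUNNING";
        case TaskState::WAITING:   return "WAITING";
        case TaskState::COMPLETED: return "COMPLETED";
        case TaskState::FAILED:    return "FAILED";
    }
    return "UNKNOWN";  // unreachable for valid enumerators
}

// COMPLETED and FAILED are the two "finished" states listed above.
constexpr bool is_terminal(TaskState s) {
    return s == TaskState::COMPLETED || s == TaskState::FAILED;
}
```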

Progress tree:

ExecutorProgress::root_tasks contains a recursive TaskProgress tree. Each node exposes:

  • progress_percentage – 0–100 based on completed subtasks

  • queued_duration_ms / execution_duration_ms – timing breakdown

  • location – human-readable queue location (e.g. "executing_on_worker_3")

  • children – nested subtask progress
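A recursive tree like this is usually consumed with a depth-first walk. The sketch below renders one as indented text. TaskProgressSketch is a hypothetical mirror of the fields listed above, not the real TaskProgress type; the name field and every location string other than "executing_on_worker_3" are invented for the example.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical mirror of a progress node; the real TaskProgress may differ.
struct TaskProgressSketch {
    std::string name;                  // assumed field, for display only
    double progress_percentage = 0.0;  // 0 to 100
    std::string location;
    std::vector<TaskProgressSketch> children;
};

// Appends one line per node, indenting two spaces per tree level.
void render(const TaskProgressSketch& node, std::size_t depth,
            std::string& out) {
    out.append(depth * 2, ' ');
    out += node.name + " " +
           std::to_string(static_cast<int>(node.progress_percentage)) +
           "% [" + node.location + "]\n";
    for (const auto& child : node.children) render(child, depth + 1, out);
}

// Builds a small made-up tree and returns its rendering.
std::string demo_tree() {
    TaskProgressSketch root{
        "pipeline", 50.0, "waiting_for_children",
        {
            {"parse_file", 100.0, "completed", {}},
            {"index_file", 0.0, "executing_on_worker_3", {}},
        }};
    std::string out;
    render(root, 0, out);
    return out;
}
```

The same walk can feed a progress bar by replacing the string formatting with whatever widget update the UI needs.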

Checking recent errors:

for (const auto& [task_id, msg] : progress.recent_errors) {
    std::cerr << "task " << task_id << " failed: " << msg << "\n";
}
