Scheduler & Watchdog
See also
For complete class and member documentation, see the API Reference.
Task scheduling, dependency tracking, and timeout monitoring.
All classes are in the dftracer::utils namespace.
For executor configuration and progress tracking, see Executor Classes. For task creation and DAG building, see Task System.
graph TB
    subgraph Scheduling["Task Scheduling"]
        Scheduler["Scheduler"]
        Watchdog["Watchdog"]
        Executor["Executor"]
    end
    subgraph Policies["Error Handling"]
        ErrorPolicy["ErrorPolicy enum"]
        ErrorHandler["ErrorHandler callback"]
    end
    subgraph Monitoring["Monitoring"]
        TaskExec["TaskExecution"]
        TimeoutCB["TimeoutCallback"]
        WarningCB["WarningCallback"]
    end
    Scheduler --> Executor
    Scheduler --> Watchdog
    Scheduler --> ErrorPolicy
    Scheduler --> ErrorHandler
    Watchdog --> TaskExec
    Watchdog --> TimeoutCB
    Watchdog --> WarningCB
Scheduler
DAG task scheduler with dependency tracking and error handling.
The Scheduler manages the lifecycle of tasks in a directed acyclic graph (DAG):
Validates task dependencies and input types
Prepares inputs from parent task results (using combiners)
Submits ready tasks to the Executor
Tracks completion and propagates results to children
Handles errors according to the configured ErrorPolicy
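The lifecycle above can be illustrated with a minimal, single-threaded sketch of dependency-count (Kahn-style) scheduling. This is not the dftracer implementation: `SketchTask` and `run_dag` are hypothetical names, tasks take an `int` instead of `std::any`, the "combiner" is assumed to simply sum parent results, and a FIFO queue stands in for submission to the Executor.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <stdexcept>
#include <vector>

// Hypothetical task node: a body plus the indices of its parent tasks.
struct SketchTask {
    std::function<int(int)> fn;        // task body, receives the combined input
    std::vector<std::size_t> parents;  // tasks this one depends on
};

// Runs each task once all of its parents have finished and returns every
// task's result. Assumed combiner: a child's input is the sum of its
// parents' results; root tasks receive 0.
std::vector<int> run_dag(const std::vector<SketchTask>& tasks) {
    const std::size_t n = tasks.size();
    std::vector<std::size_t> pending(n, 0);             // unfinished parents per task
    std::vector<std::vector<std::size_t>> children(n);  // reverse edges for propagation

    // Validate dependencies and build the reverse edges.
    for (std::size_t i = 0; i < n; ++i) {
        for (std::size_t p : tasks[i].parents) {
            if (p >= n) throw std::invalid_argument("unknown dependency");
            children[p].push_back(i);
            ++pending[i];
        }
    }

    // Tasks with no unfinished parents are ready (stand-in for the Executor).
    std::queue<std::size_t> ready;
    for (std::size_t i = 0; i < n; ++i)
        if (pending[i] == 0) ready.push(i);

    std::vector<int> results(n, 0);
    std::size_t completed = 0;
    while (!ready.empty()) {
        const std::size_t i = ready.front();
        ready.pop();
        int input = 0;
        for (std::size_t p : tasks[i].parents) input += results[p];  // combine parent results
        results[i] = tasks[i].fn(input);
        ++completed;
        for (std::size_t c : children[i])  // propagate completion to children
            if (--pending[c] == 0) ready.push(c);
    }

    // Any task left unprocessed must be part of a cycle.
    if (completed != n) throw std::runtime_error("cycle detected: graph is not a DAG");
    return results;
}
```

Counting unfinished parents per task means a child is submitted exactly once, at the moment its last parent completes, and any task never reached signals a cycle.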
Error policies:
FAIL_FAST: Stop on first error (default)
CONTINUE: Continue processing remaining tasks
RETRY: Retry failed tasks (with backoff)
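A minimal sketch of how the three policies could differ when driving a sequence of tasks. `ErrorPolicySketch` mirrors the documented names but is a stand-in for the library's `ErrorPolicy`; `run_with_policy`, `max_attempts`, and the doubling backoff starting at `base_delay` are assumptions, since the actual retry count and backoff schedule are not documented here.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Stand-in for ErrorPolicy; the value names mirror the documented ones.
enum class ErrorPolicySketch { FAIL_FAST, CONTINUE, RETRY };

struct RunSummary {
    std::size_t succeeded = 0;
    std::size_t failed = 0;
    std::size_t attempts = 0;  // total task invocations, including retries
};

// Each task returns true on success. Under RETRY a failing task is re-run up
// to max_attempts times, sleeping base_delay, 2*base_delay, 4*base_delay, ...
// between attempts (hypothetical backoff schedule).
RunSummary run_with_policy(const std::vector<std::function<bool()>>& tasks,
                           ErrorPolicySketch policy,
                           int max_attempts = 3,
                           std::chrono::milliseconds base_delay = std::chrono::milliseconds(1)) {
    RunSummary summary;
    for (const auto& task : tasks) {
        const int limit = (policy == ErrorPolicySketch::RETRY) ? max_attempts : 1;
        bool ok = false;
        auto delay = base_delay;
        for (int attempt = 1; attempt <= limit && !ok; ++attempt) {
            ++summary.attempts;
            ok = task();
            if (!ok && attempt < limit) {
                std::this_thread::sleep_for(delay);  // back off before retrying
                delay *= 2;                          // exponential growth
            }
        }
        if (ok) {
            ++summary.succeeded;
        } else {
            ++summary.failed;
            if (policy == ErrorPolicySketch::FAIL_FAST) break;  // stop on first error
        }
    }
    return summary;
}
```

Only RETRY raises the attempt limit above one; FAIL_FAST and CONTINUE differ solely in whether a failure ends the loop.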
Timeout support:
Global timeout for entire pipeline execution
Per-task timeout with configurable defaults
Watchdog thread monitors for deadlocks and stalls
Usage example:
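#include <any>
#include <chrono>
#include <cstddef>
#include <iostream>
#include <memory>
// ...plus the dftracer headers that declare Executor, Scheduler, and make_task
using namespace dftracer::utils;

// Executor backed by 4 threads (C++20 designated initializer)
auto executor = std::make_unique<Executor>(ExecutorConfig{.num_threads = 4});
Scheduler scheduler(executor.get());
scheduler.set_error_policy(ErrorPolicy::FAIL_FAST);
scheduler.set_global_timeout(std::chrono::seconds(300));  // 5-minute pipeline budget
scheduler.set_progress_callback([](std::size_t completed, std::size_t total) {
    std::cout << completed << "/" << total << " tasks done\n";
});

// Schedule a root task (no parents, so it is given an empty std::any as input)
auto task = make_task([](CoroScope& ctx, const std::any& input) -> CoroTask<int> {
    co_return 42;
});
scheduler.schedule(task, std::any{});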
Watchdog
Independent monitoring thread for timeout detection and responsiveness checks.
The Watchdog runs on a separate thread and periodically checks:
Global timeout — Has the entire pipeline exceeded its time budget?
Per-task timeout — Has any individual task exceeded its timeout?
Responsiveness — Is the executor still making progress?
When a timeout is detected, the Watchdog invokes the configured callback and can request executor shutdown for graceful termination.
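A minimal sketch of such a monitoring thread, using `std::condition_variable::wait_for` with a predicate so that stopping does not have to wait out a full check interval. `MiniWatchdog`, `add_task`, and `finish_task` are hypothetical; only the per-task timeout check is shown, whereas the documented Watchdog also checks the global timeout and responsiveness and can request executor shutdown.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical watchdog: a background thread that wakes every `interval`
// and reports, once each, tasks that have run longer than their timeout.
class MiniWatchdog {
public:
    using Clock = std::chrono::steady_clock;
    using Callback = std::function<void(const std::string&)>;

    MiniWatchdog(std::chrono::milliseconds interval, Callback on_timeout)
        : interval_(interval), on_timeout_(std::move(on_timeout)) {}
    ~MiniWatchdog() { stop(); }

    void start() {
        {
            std::lock_guard<std::mutex> lock(mu_);
            if (running_) return;
            running_ = true;
        }
        thread_ = std::thread([this] { loop(); });
    }

    void stop() {
        {
            std::lock_guard<std::mutex> lock(mu_);
            running_ = false;
        }
        cv_.notify_all();  // wake the loop immediately
        if (thread_.joinable()) thread_.join();
    }

    // A timeout of 0 means "no timeout".
    void add_task(const std::string& name, std::chrono::milliseconds timeout) {
        std::lock_guard<std::mutex> lock(mu_);
        tasks_[name] = Entry{Clock::now(), timeout, false};
    }

    void finish_task(const std::string& name) {
        std::lock_guard<std::mutex> lock(mu_);
        tasks_.erase(name);
    }

private:
    struct Entry {
        Clock::time_point start;
        std::chrono::milliseconds timeout;
        bool reported;
    };

    void loop() {
        std::unique_lock<std::mutex> lock(mu_);
        while (running_) {
            // Sleep one interval; the predicate lets stop() cut the wait short.
            cv_.wait_for(lock, interval_, [this] { return !running_; });
            if (!running_) break;

            std::vector<std::string> expired;
            const auto now = Clock::now();
            for (auto& [name, entry] : tasks_) {
                if (entry.timeout.count() > 0 && !entry.reported &&
                    now - entry.start > entry.timeout) {
                    entry.reported = true;  // report each task at most once
                    expired.push_back(name);
                }
            }

            // Run callbacks without holding the lock so they may call add_task().
            lock.unlock();
            for (const auto& name : expired)
                on_timeout_("task '" + name + "' exceeded its timeout");
            lock.lock();
        }
    }

    std::chrono::milliseconds interval_;
    Callback on_timeout_;
    std::mutex mu_;
    std::condition_variable cv_;
    bool running_ = false;  // guarded by mu_
    std::thread thread_;
    std::map<std::string, Entry> tasks_;
};
```

Collecting expired names under the lock and invoking the callback after unlocking avoids a self-deadlock if the callback re-enters the watchdog.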
Timeout hierarchy:
Per-task timeout (set via set_default_task_timeout())
Global timeout (set via set_global_timeout())
Deadlock detection (configured at construction)
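One way to read this hierarchy is as an ordered check on each watchdog tick, narrowest limit first. The pure function below is a sketch under that assumption; `classify` and `TimeoutKind` are hypothetical names, deadlock is modeled as "no progress for longer than the threshold", and a zero duration disables a check, following the TaskExecution convention that a timeout of 0 means no timeout.

```cpp
#include <cassert>
#include <chrono>

using Ms = std::chrono::milliseconds;

// Hypothetical outcome of one watchdog check.
enum class TimeoutKind { None, TaskTimeout, GlobalTimeout, Deadlock };

// All elapsed values are measured up to "now". A zero limit disables that check.
TimeoutKind classify(Ms task_elapsed, Ms task_timeout,
                     Ms pipeline_elapsed, Ms global_timeout,
                     Ms since_last_progress, Ms deadlock_threshold) {
    const Ms zero{0};
    if (task_timeout > zero && task_elapsed > task_timeout)
        return TimeoutKind::TaskTimeout;    // 1. per-task limit
    if (global_timeout > zero && pipeline_elapsed > global_timeout)
        return TimeoutKind::GlobalTimeout;  // 2. whole-pipeline budget
    if (deadlock_threshold > zero && since_last_progress > deadlock_threshold)
        return TimeoutKind::Deadlock;       // 3. executor stopped making progress
    return TimeoutKind::None;
}
```

With the limits from the Watchdog example (10s per task, 30s global, 5s deadlock threshold), a task at 11s is reported as a task timeout even though the pipeline is still within budget.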
Usage example:
#include <chrono>
#include <cstdint>
#include <iostream>
#include <string>
using namespace dftracer::utils;
using namespace std::chrono_literals;

Watchdog watchdog(
    100ms,  // check interval
    30s,    // global timeout
    10s,    // default task timeout
    5s      // deadlock detection threshold
);
watchdog.set_timeout_callback([](const std::string& msg) {
    std::cerr << "TIMEOUT: " << msg << "\n";
});
watchdog.set_warning_callback([](const std::string& task, std::int64_t ms) {
    std::cerr << "WARNING: " << task << " running for " << ms << "ms\n";
});
watchdog.start();
// ... run pipeline ...
watchdog.stop();
TaskExecution
Active task execution metadata tracked by the Watchdog:
task: Shared pointer to the tracked Task
start_time: When the task started executing
timeout: Task-specific timeout (0 = no timeout)
warning_logged: Whether a slow-task warning has been logged
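A hypothetical mirror of these fields, showing how the `timeout == 0` convention and the `warning_logged` flag could be used so that a slow-task warning fires only once per task. `TaskExecutionSketch`, `timed_out`, and `should_warn` are illustrative names, and `Task` is an empty placeholder here; the real struct is defined in Core Runtime.

```cpp
#include <cassert>
#include <chrono>
#include <memory>

struct Task {};  // placeholder for the real Task type

// Hypothetical mirror of TaskExecution's documented fields.
struct TaskExecutionSketch {
    std::shared_ptr<Task> task;
    std::chrono::steady_clock::time_point start_time;
    std::chrono::milliseconds timeout{0};  // 0 = no timeout
    bool warning_logged = false;

    // True once the task has outlived its timeout; never true if timeout is 0.
    bool timed_out(std::chrono::steady_clock::time_point now) const {
        return timeout.count() > 0 && now - start_time > timeout;
    }

    // True only the first time the task runs longer than warn_after.
    bool should_warn(std::chrono::steady_clock::time_point now,
                     std::chrono::milliseconds warn_after) {
        if (warning_logged || now - start_time <= warn_after) return false;
        warning_logged = true;
        return true;
    }
};
```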
See Core Runtime for full struct definition.