Core Runtime¶
Namespace: dftracer::utils
-
template<typename T>
class BufferPool¶ Thread-safe typed buffer pool. Zero allocations after warmup.
- Template Parameters:
T – Buffer type. Must support move semantics.
Subclassed by dftracer::utils::BufferPoolImpl< T, Init, Reset >
-
template<typename T, typename Init, typename Reset = DefaultReset>
class BufferPoolImpl : public dftracer::utils::BufferPool<T>¶
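The pooling idea can be sketched roughly as follows. This is an illustration only, not the library's implementation: `MiniBufferPool`, `ClearReset`, `acquire()`, `release()`, and `idle()` are hypothetical names, and the real `Init`/`Reset` policies may look different.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical reset policy: drop the contents but keep the capacity.
struct ClearReset {
    template <typename T>
    void operator()(T& buf) const { buf.clear(); }
};

// Hypothetical sketch of a thread-safe typed pool (not the real BufferPool).
template <typename T, typename Reset = ClearReset>
class MiniBufferPool {
public:
    // Hand out a recycled buffer if one is idle, otherwise a fresh one.
    T acquire() {
        std::lock_guard<std::mutex> lock(mu_);
        if (free_.empty()) return T{};
        T buf = std::move(free_.back());
        free_.pop_back();
        return buf;
    }

    // Reset the buffer and put it back for later reuse.
    void release(T buf) {
        Reset{}(buf);
        std::lock_guard<std::mutex> lock(mu_);
        free_.push_back(std::move(buf));
    }

    std::size_t idle() const {
        std::lock_guard<std::mutex> lock(mu_);
        return free_.size();
    }

private:
    mutable std::mutex mu_;
    std::vector<T> free_;
};

// Usage: the second acquire() gets back the storage grown by the first user.
inline std::size_t reused_capacity() {
    MiniBufferPool<std::vector<char>> pool;
    auto buf = pool.acquire();
    buf.resize(4096);
    pool.release(std::move(buf));
    auto again = pool.acquire();
    assert(again.empty());
    return again.capacity();
}
```

Because the buffer is moved in and out of the pool, a type with move semantics (as the template parameter requires) is recycled without copying its storage.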
-
class ByteView¶
Non-owning view over a contiguous byte range.
16 bytes (pointer + size). All methods are trivial reinterpret_casts inlined by the compiler — zero overhead over raw pointer + length.
Public Functions
-
ByteView() = default¶
-
inline ByteView(const std::byte *data, std::size_t len)¶
-
inline ByteView(const unsigned char *data, std::size_t len)¶
-
inline ByteView(const char *data, std::size_t len)¶
-
inline ByteView(std::string_view sv)¶
-
inline ByteView(const std::vector<unsigned char> &v)¶
-
inline ByteView(const std::vector<std::byte> &v)¶
-
inline const std::byte *data() const¶
-
inline std::size_t size() const¶
-
inline bool empty() const¶
-
inline std::string_view as_string_view() const¶
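As a rough illustration of the pointer-plus-size design, the sketch below models a few of the members listed above. `MiniByteView` and `round_trips()` are hypothetical names, and only three of the documented constructors are mirrored.

```cpp
#include <cassert>
#include <cstddef>
#include <string_view>

// Hypothetical stand-in for ByteView: a pointer plus a length, nothing owned.
class MiniByteView {
public:
    MiniByteView() = default;
    MiniByteView(const std::byte* data, std::size_t len) : data_(data), size_(len) {}
    MiniByteView(const char* data, std::size_t len)
        : data_(reinterpret_cast<const std::byte*>(data)), size_(len) {}
    MiniByteView(std::string_view sv) : MiniByteView(sv.data(), sv.size()) {}

    const std::byte* data() const { return data_; }
    std::size_t size() const { return size_; }
    bool empty() const { return size_ == 0; }
    std::string_view as_string_view() const {
        return {reinterpret_cast<const char*>(data_), size_};
    }

private:
    const std::byte* data_ = nullptr;
    std::size_t size_ = 0;
};

// One pointer and one size; on a typical 64-bit target that is 16 bytes.
static_assert(sizeof(MiniByteView) == sizeof(void*) + sizeof(std::size_t));

// Usage: view a string's bytes without copying them.
inline bool round_trips(std::string_view text) {
    MiniByteView view(text);
    return view.as_string_view() == text &&
           view.data() == reinterpret_cast<const std::byte*>(text.data());
}
```

Since the view owns nothing, the caller has to keep the underlying storage alive for as long as the view is used.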
-
template<std::size_t MaxLen = 512>
struct ConstString¶ Compile-time string buffer for consteval string concatenation.
Enables building type signatures and display names entirely at compile time. The result lives in .rodata (zero runtime allocation).
- Template Parameters:
MaxLen – Maximum capacity of the buffer (default 512).
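The compile-time concatenation idea might look like the sketch below. `FixedString`, `append()`, `view()`, and `make_signature()` are hypothetical names, and the sketch uses `constexpr` functions evaluated in a constant expression rather than `consteval`, so it is an approximation of the technique and not the library's code.

```cpp
#include <cassert>
#include <cstddef>
#include <string_view>

// Hypothetical stand-in for ConstString: a fixed-capacity character buffer.
template <std::size_t MaxLen = 512>
struct FixedString {
    char buf[MaxLen + 1] = {};
    std::size_t len = 0;

    constexpr FixedString() = default;
    constexpr FixedString(std::string_view sv) { append(sv); }

    // Append characters, silently truncating once MaxLen is reached.
    constexpr FixedString& append(std::string_view sv) {
        for (std::size_t i = 0; i < sv.size() && len < MaxLen; ++i) buf[len++] = sv[i];
        return *this;
    }
    constexpr std::string_view view() const { return {buf, len}; }
};

// Build a display name piece by piece; everything here runs in the compiler.
constexpr FixedString<32> make_signature() {
    FixedString<32> s("Task<");
    s.append("int").append(">");
    return s;
}

inline constexpr auto kSignature = make_signature();
static_assert(kSignature.view() == "Task<int>");
```

The resulting object is a constant with static storage, so reading it at run time involves no allocation.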
-
class CoroScope¶
Lightweight structured concurrency scope using Coro + JoinHandle.
This is the single context type that task lambdas receive. It replaces two earlier context types with a unified API:
spawn() returning SpawnFuture<T> for all coroutines (void and typed)
io::async_read/write/open/close for async I/O
receive() for channel consumption
Cancellation support
All spawned work items are lightweight Coro instances enqueued directly to the Executor’s run_queue_ — no Task objects, no Scheduler overhead, no when_all().
spawn() always returns SpawnFuture<T>. For void coroutines the return value can be ignored (fire-and-forget) or co_await’d to wait for that specific coroutine to complete.
Usage:
auto task = make_task([](CoroScope& scope) -> coro::CoroTask<void> {
    // Fire-and-forget (return value ignored):
    scope.spawn([](CoroScope& s) -> coro::CoroTask<void> { co_return; });
    // Await a void spawn:
    co_await scope.spawn([](CoroScope& s) -> coro::CoroTask<void> {
        // do work
        co_return;
    });
    // Await a typed spawn:
    int result = co_await scope.spawn(
        [](CoroScope& s) -> coro::CoroTask<int> { co_return 42; });
    co_return;
});
Public Functions
Construct with an inherited cancellation token (for child scopes spawned from a parent — shares the parent’s cancellation signal).
-
inline ~CoroScope()¶
-
template<typename Func, typename R = typename std::invoke_result_t<Func, CoroScope&>::value_type, std::enable_if_t<std::is_void_v<R>, int> = 0>
inline coro::SpawnFuture<void> spawn(Func &&func)¶ Spawn a void coroutine on the executor’s run queue.
The lambda receives CoroScope& and returns CoroTask<void>. Internally wrapped in a lightweight Coro and enqueued directly.
Returns SpawnFuture<void> that can be co_await’d to wait for this specific coroutine to complete. The return value can be safely ignored for fire-and-forget usage.
The captureless-lambda-with-parameters pattern ensures coroutine parameters are copied into the coroutine frame, avoiding the dangling-capture bug.
Usage:
// Fire-and-forget (existing usage, still works):
scope.spawn([](CoroScope& s) -> coro::CoroTask<void> { co_return; });
// Awaitable (new):
co_await scope.spawn([](CoroScope& s) -> coro::CoroTask<void> {
    // do work
    co_return;
});
-
template<typename Func, typename R = typename std::invoke_result_t<Func, CoroScope&>::value_type, std::enable_if_t<!std::is_void_v<R>, int> = 0>
inline coro::SpawnFuture<R> spawn(Func &&func)¶ Spawn a coroutine that returns a typed result.
Returns SpawnFuture<T> that can be co_await’d to retrieve the result. One heap allocation (shared_ptr<SharedState<T>>).
Usage:
auto future = scope.spawn([](CoroScope& s) -> coro::CoroTask<int> { co_return 42; });
int val = co_await future;
-
template<typename UtilityT, typename InputT, typename DecayedUtility = std::remove_reference_t<UtilityT>, typename R = typename DecayedUtility::Output, std::enable_if_t<utilities::detail::has_process_v<DecayedUtility, InputT, R>, int> = 0>
inline coro::SpawnFuture<R> spawn(UtilityT &utility, InputT input)¶
Async receive from channel (shared_ptr).
-
template<typename T, typename Func>
inline void spawn_producer(coro::Channel<T> &channel, Func &&generator_func)¶ Spawn producer feeding Generator<T> into Channel<T> (reference).
Spawn producer feeding Generator<T> into Channel<T> (shared_ptr).
-
template<typename T, typename Func>
inline void spawn_async_producer(coro::Channel<T> &channel, Func &&async_generator_func)¶ Spawn async producer feeding AsyncGenerator<T> into Channel<T> (reference).
Spawn async producer feeding AsyncGenerator<T> into Channel<T> (shared_ptr).
Spawn N producers (shared_ptr).
-
template<typename T, typename Func>
inline void spawn_producers(coro::Channel<T> &channel, std::size_t count, Func &&producer_func)¶ Spawn N producers (reference).
-
template<typename T, typename Func>
inline void spawn_consumers(coro::Channel<T> &channel, std::size_t count, Func &&consumer_func)¶ Spawn N consumers draining Channel<T> (reference).
Spawn N consumers draining Channel<T> (shared_ptr).
Spawn N transform workers (shared_ptr).
-
template<typename TIn, typename TOut, typename Func>
inline void spawn_transforms(coro::Channel<TIn> &input, coro::Channel<TOut> &output, std::size_t count, Func &&transform_func)¶ Spawn N transform workers (reference).
-
inline coro::CoroTask<void> join()¶
Wait for all spawned coroutines to complete. Must be called before CoroScope is destroyed. Idempotent: calling join() a second time is a no-op.
-
inline coro::CoroTask<void> join_all()¶
Alias for join() — backward compatibility with old join_all() callers.
-
inline std::size_t size() const¶
-
inline bool is_joined() const¶
-
template<typename Func> requires std::is_invocable_r_v<coro::CoroTask<void>, Func, CoroScope&>
inline coro::CoroTask<void> coro_scope(Func &&scope_func)
Create a child scope, run the lambda, and auto-join. Compatibility bridge for code using ctx.coro_scope(…).
-
template<typename Func> requires std::is_invocable_r_v<coro::CoroTask<void>, Func, CoroScope&>
inline coro::CoroTask<void> scope(Func &&scope_func)
Compatibility bridge for code using ctx.scope(…). Same as coro_scope().
-
inline bool is_cancellation_requested() const¶
-
inline void request_cancellation()¶
-
inline std::shared_ptr<std::atomic<bool>> get_cancellation_token() const¶
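The shared cancellation signal between a parent scope and its children can be illustrated with the sketch below. `MiniScope` is a hypothetical type that models only the token plumbing (no coroutines, no executor), and the memory orderings are an assumption.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical scope type; only the cancellation-token handling is modelled.
class MiniScope {
public:
    // Root scope: creates a fresh token.
    MiniScope() : token_(std::make_shared<std::atomic<bool>>(false)) {}
    // Child scope: inherits (shares) the parent's token.
    explicit MiniScope(std::shared_ptr<std::atomic<bool>> inherited)
        : token_(std::move(inherited)) {}

    bool is_cancellation_requested() const {
        return token_->load(std::memory_order_acquire);
    }
    void request_cancellation() { token_->store(true, std::memory_order_release); }
    std::shared_ptr<std::atomic<bool>> get_cancellation_token() const { return token_; }

private:
    std::shared_ptr<std::atomic<bool>> token_;
};

// Usage: cancelling the parent is observed by a child built from its token.
inline bool child_sees_parent_cancellation() {
    MiniScope parent;
    MiniScope child(parent.get_cancellation_token());
    parent.request_cancellation();
    return child.is_cancellation_requested();
}
```

Cancellation in this model is cooperative: setting the flag stops nothing by itself, and spawned work has to poll `is_cancellation_requested()`.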
-
struct DefaultReset¶
-
class Env¶
-
class Executor¶
Executor - Executes tasks from queue using worker thread pool
Features:
Large thread pool for CPU/IO-bound work
Pulls tasks from queue
Executes task functions
Notifies scheduler on completion via callback
Thread pool size: N threads (default: hardware_concurrency)
Public Functions
-
explicit Executor(const ExecutorConfig &config = {})¶
Constructor
-
~Executor()¶
-
void start()¶
Start the executor (spawn worker threads)
-
void shutdown()¶
Shutdown the executor gracefully
-
void reset()¶
Reset the executor (prepare for new execution)
-
void set_completion_callback(CompletionCallback callback)¶
Set completion callback (called when task finishes)
-
inline TimerService &get_timer_service()¶
Get timer service for timeout operations
-
inline bool is_running() const¶
Check if executor is running
-
inline std::size_t get_num_threads() const¶
Get number of worker threads
-
inline std::size_t get_io_pool_size() const¶
-
inline bool has_io_backend() const noexcept¶
Check if an I/O backend is available
-
inline io::IoBackend &io_backend()¶
Get the I/O backend (must check has_io_backend() first)
-
void request_shutdown()¶
Request graceful shutdown. Stops accepting new tasks and waits for current tasks to complete.
-
inline bool is_shutdown_requested() const¶
Check if shutdown was requested
-
bool is_responsive() const¶
Check if executor is responsive (making progress)
Used by watchdog to detect if executor is hung. Returns false if executor appears to be stuck or unresponsive.
-
ExecutorProgress get_progress() const¶
Get full progress report
-
void schedule_coroutine_resumption(std::coroutine_handle<> handle)¶
Schedule a coroutine handle to be resumed on the executor’s thread pool. This is a lightweight operation that submits the resumption as work.
This is useful for when_all and other coroutine combinators that need to resume coroutines from completion callbacks without directly calling .resume()
- Parameters:
handle – The coroutine handle to resume
-
void enqueue(std::coroutine_handle<> handle)¶
Enqueue a coroutine handle for execution on the thread pool. This is the primary submission method for goroutines — all lightweight work funnels through here. Cost: ~20ns (lock-free queue push + atomic signal).
- Parameters:
handle – The coroutine handle to resume
Enqueue a Coro with progress tracking in task_registry_.
-
void mark_coro_completed(TaskIndex id)¶
Submit a Task for execution via a Coro (Phase 3 path). Creates a run_task() Coro, registers in task_registry_, and enqueues the released handle to run_queue_.
- Parameters:
task – The DAG task to execute
input – Task input
parent_task_id – Parent task ID for tracking (-1 for root)
-
void schedule_destroy(std::coroutine_handle<> handle)¶
Schedule a completed Coro handle for deferred destruction. Called from CoroPromise::FinalAwaiter for released handles.
- Parameters:
handle – The coroutine handle at final_suspend
Public Static Functions
Friends
- friend struct coro::CoroPromise
-
struct ExecutorConfig¶
-
struct ExecutorProgress¶
Executor progress report
Public Members
-
std::size_t total_tasks_submitted¶
-
std::size_t tasks_queued¶
-
std::size_t tasks_running¶
-
std::size_t tasks_completed¶
-
std::size_t tasks_failed¶
-
std::vector<std::size_t> worker_queue_depths¶
-
std::vector<TaskProgress> root_tasks¶
-
std::vector<WorkerStatus> workers¶
-
std::vector<std::pair<TaskIndex, std::string>> recent_errors¶
-
struct WorkerStatus¶
-
class MutableByteView¶
Non-owning mutable view over a contiguous byte range.
For buffers that need to be written into (e.g., inflate output buffers). Implicitly converts to ByteView for reading.
Public Functions
-
MutableByteView() = default¶
-
inline MutableByteView(std::byte *data, std::size_t len)¶
-
inline MutableByteView(unsigned char *data, std::size_t len)¶
-
inline MutableByteView(char *data, std::size_t len)¶
-
inline MutableByteView(std::vector<unsigned char> &v)¶
-
inline MutableByteView(std::string &s)¶
-
inline std::byte *data() const¶
-
inline std::size_t size() const¶
-
inline bool empty() const¶
-
struct NoOpReset¶
-
class NoOpTask : public dftracer::utils::Task¶
NoOpTask - Pass-through task for multiple source or destination nodes
This task is automatically created by Pipeline when multiple source or destination tasks are specified. It simply acts as a synchronization point.
Purpose:
Ensures every pipeline has exactly one source/destination node
Provides trackability for multiple independent starting/ending points
Returns void to bypass type validation (works with any parent types)
-
class ObjectPool¶
Public Functions
-
inline void *allocate(std::size_t size)¶
-
inline void deallocate(void *block, std::size_t size)¶
-
ObjectPool(const ObjectPool&) = delete¶
-
ObjectPool &operator=(const ObjectPool&) = delete¶
Public Static Functions
-
static inline ObjectPool &instance()¶
-
class Pipeline¶
Pipeline - DAG container and orchestrator
Features:
Holds source and destination tasks
Validates DAG before execution (reachability, types, cycles)
Delegates execution to scheduler/executor
Automatically creates NoOpTask for multiple sources
Public Functions
-
explicit Pipeline(const PipelineConfig &config = PipelineConfig::default_config())¶
Constructor with configuration manager
- Parameters:
config – Pipeline configuration (includes name, threads, etc.)
-
~Pipeline()¶
Set source task (single task)
Set multiple source tasks (initializer list - auto-creates NoOpTask as parent)
Set multiple source tasks (vector - auto-creates NoOpTask as parent)
Set multiple source tasks (variadic - auto-creates NoOpTask as parent)
Set destination task (single task - optional, if nullptr all terminal tasks are destinations)
Set multiple destination tasks (initializer list - auto-creates NoOpTask as child)
Set multiple destination tasks (vector - auto-creates NoOpTask as child)
Set multiple destination tasks (variadic - auto-creates NoOpTask as child)
-
bool validate()¶
Validate pipeline before execution
Check reachability from source to destination
Check type compatibility
Check for cycles
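Two of these checks, cycle detection and reachability, can be sketched with a depth-first search. The adjacency-list `Graph`, `has_cycle_from()`, and `validate_graph()` are hypothetical and say nothing about how Pipeline stores its tasks. Type compatibility is not modelled.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical graph form: g[i] lists the children of node i.
using Graph = std::vector<std::vector<std::size_t>>;

enum class Mark { Unvisited, InProgress, Done };

// Depth-first search; reaching an InProgress node means a back edge, i.e. a cycle.
inline bool has_cycle_from(const Graph& g, std::size_t node, std::vector<Mark>& marks) {
    marks[node] = Mark::InProgress;
    for (std::size_t child : g[node]) {
        if (marks[child] == Mark::InProgress) return true;
        if (marks[child] == Mark::Unvisited && has_cycle_from(g, child, marks)) return true;
    }
    marks[node] = Mark::Done;
    return false;
}

// Valid if no cycle is reachable from `source` and `destination` is reachable from it.
inline bool validate_graph(const Graph& g, std::size_t source, std::size_t destination) {
    std::vector<Mark> marks(g.size(), Mark::Unvisited);
    if (has_cycle_from(g, source, marks)) return false;
    // After an acyclic traversal every reachable node is marked Done.
    return marks[destination] == Mark::Done;
}
```

A diamond such as `{{1, 2}, {3}, {3}, {}}` passes with source 0 and destination 3, while a loop such as `{{1}, {2}, {0}}` is rejected.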
-
PipelineOutput execute(const std::any &input = std::any{})¶
Execute the pipeline
- Parameters:
input – Initial input for source task (defaults to empty)
- Returns:
Output from terminal tasks
-
template<typename T, typename = std::enable_if_t<!std::is_same_v<std::decay_t<T>, std::any>>>
inline PipelineOutput execute(T &&input)¶ Execute the pipeline with typed input
- Template Parameters:
T – Input type
- Parameters:
input – Initial input for source task
- Returns:
Output from terminal tasks
-
void set_error_policy(ErrorPolicy policy)¶
Set error handling policy
-
void set_progress_callback(std::function<void(std::size_t completed, std::size_t total)> callback)¶
Set progress callback
-
inline const std::string &get_name() const¶
Get pipeline name
-
struct PipelineConfig¶
Configuration for Pipeline execution
Thread Architecture:
Executor threads: Worker pool that executes task functions (compute threads)
Watchdog: Optional monitoring thread for hang detection
Usage (Fluent API):
auto config = PipelineConfig()
    .with_name("MyPipeline")
    .with_compute_threads(16)  // CPU-bound work
    .with_error_policy(ErrorPolicy::FAIL_FAST)
    .with_watchdog(true)
    .with_global_timeout(std::chrono::seconds(30))
    .with_task_timeout(std::chrono::seconds(10));
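The chaining works because each `with_*` setter returns a reference to the object it was called on. The sketch below shows that pattern on a trimmed-down, hypothetical `MiniConfig` that mirrors only three of the documented fields. It is not the real PipelineConfig.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <string>
#include <utility>

// Hypothetical, trimmed-down config used only to show the fluent pattern.
struct MiniConfig {
    std::string name;
    std::size_t executor_threads = 0;        // 0 = use hardware concurrency
    std::chrono::seconds global_timeout{0};  // 0 = wait forever

    // Each setter returns *this so that calls can be chained.
    MiniConfig& with_name(std::string n) { name = std::move(n); return *this; }
    MiniConfig& with_compute_threads(std::size_t t) { executor_threads = t; return *this; }
    MiniConfig& with_global_timeout(std::chrono::seconds t) { global_timeout = t; return *this; }
};

inline MiniConfig make_config() {
    // The whole chain mutates one temporary, which is copied into the result.
    return MiniConfig()
        .with_name("MyPipeline")
        .with_compute_threads(16)
        .with_global_timeout(std::chrono::seconds(30));
}
```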
Public Functions
-
inline PipelineConfig &with_name(std::string pipeline_name)¶
Set pipeline name
-
inline PipelineConfig &with_compute_threads(std::size_t threads)¶
Set number of compute threads (worker threads for CPU-bound work). 0 = hardware_concurrency (default).
-
inline PipelineConfig &with_error_policy(ErrorPolicy policy)¶
Set error handling policy
-
inline PipelineConfig &with_error_handler(ErrorHandler handler)¶
Set custom error handler (automatically sets policy to CUSTOM)
-
inline PipelineConfig &with_watchdog(bool enabled)¶
Enable/disable watchdog
-
inline PipelineConfig &with_global_timeout(std::chrono::seconds timeout)¶
Set global timeout (0 = wait forever)
-
inline PipelineConfig &with_task_timeout(std::chrono::seconds timeout)¶
Set default task timeout (0 = wait forever)
-
inline PipelineConfig &with_watchdog_interval(std::chrono::seconds interval)¶
Set watchdog check interval
-
inline PipelineConfig &with_warning_threshold(std::chrono::seconds threshold)¶
Set long-running task warning threshold
-
inline PipelineConfig &with_executor_idle_timeout(std::chrono::seconds timeout)¶
Set executor idle timeout
-
inline PipelineConfig &with_executor_deadlock_timeout(std::chrono::seconds timeout)¶
Set executor deadlock timeout
-
inline PipelineConfig &with_timeslice(std::chrono::microseconds duration)¶
Set coroutine timeslice duration (0 = disable automatic yielding)
-
inline PipelineConfig &with_io_threads(std::size_t count)¶
Set I/O thread pool size (used by thread pool and epoll backends)
-
inline PipelineConfig &with_io_backend(io::IoBackendType type)¶
Force a specific I/O backend (AUTO = runtime detection)
-
inline PipelineConfig &with_io_batch_size(unsigned threshold)¶
Set I/O batch threshold for batched SQE submission. When pending ops reach this count, they are flushed in one syscall. 0 = per-op submission (no batching).
Public Members
-
std::string name = ""¶
-
std::size_t executor_threads = 0¶
-
ErrorPolicy error_policy = ErrorPolicy::FAIL_FAST¶
-
ErrorHandler error_handler = nullptr¶
-
bool enable_watchdog = true¶
-
std::chrono::seconds global_timeout = {0}¶
-
std::chrono::seconds default_task_timeout = {0}¶
-
std::chrono::seconds watchdog_interval = {1}¶
-
std::chrono::seconds long_task_warning_threshold{300}¶
-
std::chrono::seconds executor_idle_timeout{300}¶
-
std::chrono::seconds executor_deadlock_timeout{600}¶
-
std::chrono::microseconds timeslice_duration{10'000}¶
-
std::size_t io_thread_count = 0¶
-
io::IoBackendType io_backend_type = io::IoBackendType::AUTO¶
-
unsigned io_batch_threshold = 16¶
Public Static Functions
-
static inline PipelineConfig sequential()¶
Create sequential execution configuration (1 thread)
-
static inline PipelineConfig parallel(std::size_t num_threads = 0)¶
Create parallel execution configuration
-
static inline PipelineConfig default_config()¶
Create default configuration
-
static inline PipelineConfig with_timeouts(std::size_t num_threads = 0, std::chrono::seconds global_timeout = std::chrono::seconds(60), std::chrono::seconds task_timeout = std::chrono::seconds(30))¶
Create configuration with timeouts
-
class PipelineError : public std::runtime_error¶
Public Types
-
enum Type¶
Values:
-
enumerator TYPE_MISMATCH¶
-
enumerator TYPE_MISMATCH_ERROR¶
-
enumerator VALIDATION_ERROR¶
-
enumerator EXECUTION_ERROR¶
-
enumerator INITIALIZATION_ERROR¶
-
enumerator OUTPUT_CONVERSION_ERROR¶
-
enumerator TIMEOUT_ERROR¶
-
enumerator INTERRUPTED¶
-
enumerator EXECUTOR_UNRESPONSIVE¶
-
enumerator UNKNOWN_ERROR¶
-
struct PipelineOutput : public std::unordered_map<TaskIndex, std::any>¶
-
class Runtime¶
Lightweight wrapper around Executor + Watchdog for running coroutines on a thread pool without Pipeline/Scheduler/DAG overhead. Intended for Python bindings and other non-DAG consumers.
Public Functions
-
explicit Runtime(std::size_t threads = 0)¶
-
explicit Runtime(const ExecutorConfig &config, bool enable_watchdog = true)¶
-
Runtime(const ExecutorConfig &config, std::unique_ptr<Watchdog> watchdog)¶
-
~Runtime()¶
-
TaskHandle submit(coro::CoroTask<void> task, std::string name = "")¶
Async submit returns immediately, task runs on executor.
-
template<typename T>
TypedTaskHandle<T> submit(coro::CoroTask<T> task, std::string name = "")¶ Async submit with typed result.
-
template<typename Func> requires std::is_invocable_r_v<coro::CoroTask<void>, Func, CoroScope&>
inline TaskHandle scope(std::string name, Func &&func)
Submit a scoped task (provides CoroScope to the lambda). Returns immediately, task runs on executor.
Usage:
auto handle = rt->scope("my_task", [](CoroScope& scope) -> CoroTask<void> {
    scope.spawn([](CoroScope& s) -> CoroTask<void> { co_return; });
    co_await scope.join();
});
handle.get();  // wait when needed
-
template<typename UtilityT, typename InputT, typename DecayedUtility = std::remove_reference_t<UtilityT>> requires utilities::has_tag_v<utilities::tags::NeedsContext, DecayedUtility>
inline TaskHandle scope(std::string name, UtilityT &utility, InputT input)
Submit a NeedsContext utility with automatic context injection.
Usage:
AggregatorUtility util;
rt->scope("aggregator", util, input).get();
-
void wait_all()¶
Wait for all outstanding tasks to complete.
-
ExecutorProgress get_progress() const¶
-
bool is_responsive() const¶
-
void set_global_timeout(std::chrono::milliseconds timeout)¶
-
void set_default_task_timeout(std::chrono::milliseconds timeout)¶
-
void shutdown()¶
-
std::size_t threads() const¶
-
std::size_t io_threads() const¶
-
class Scheduler¶
Scheduler - Manages task scheduling and dependency tracking
Tasks are submitted directly to the executor from the calling thread (source task) or from worker threads (child tasks on completion). No dedicated scheduling thread — fully event-driven, push-based.
Features:
Traverses DAG and checks dependencies
Submits ready tasks directly to executor
Tracks completed tasks
Handles error policies
Supports dynamic task submission
Public Functions
-
Scheduler(Executor *executor, Watchdog *watchdog, const PipelineConfig &config)¶
-
~Scheduler()¶
Schedule execution starting from source task
- Parameters:
source – Starting task
input – Initial input
Schedule execution starting from source task
- Parameters:
source – Starting task
input – Initial input
Called by executor when a task completes
- Parameters:
task – Completed task
Submit a dynamic task (for intra-task parallelism)
-
void set_error_policy(ErrorPolicy policy)¶
Set error handling policy
-
void set_error_handler(ErrorHandler handler)¶
Set custom error handler
-
void set_progress_callback(std::function<void(std::size_t completed, std::size_t total)> callback)¶
Set progress callback
-
void set_global_timeout(std::chrono::milliseconds timeout)¶
Set global timeout (0 = wait forever)
-
void set_default_task_timeout(std::chrono::milliseconds timeout)¶
Set default task timeout (0 = wait forever)
-
void request_shutdown()¶
Request graceful shutdown
-
inline bool is_shutdown_requested() const¶
Check if shutdown was requested
-
void reset()¶
Reset scheduler state
-
inline bool is_running() const¶
Check if scheduler is running
-
void register_task_completion_callback(TaskIndex task_id, std::function<void()> callback)¶
Register callback to be invoked when task completes
Uses ShardedMutex for minimal contention. Multiple tasks can register callbacks concurrently without blocking.
Thread-safe: Only locks the specific shard for this task_id. Other tasks registering callbacks concurrently don’t block.
Usage:
// Called from TaskFuture::await_suspend()
scheduler->register_task_completion_callback(task_id, [awaiting]() {
    awaiting.resume();  // Resume awaiting coroutine
});
- Parameters:
task_id – Task identifier
callback – Function to invoke on completion: []() { … }
-
void invoke_completion_callbacks(TaskIndex task_id)¶
Invoke completion callbacks for a task
Only locks the specific shard for this task_id. Other tasks completing concurrently don’t block.
Called by on_task_completed() when a task finishes. Invokes all registered callbacks (e.g., resuming awaiting coroutines).
- Parameters:
task_id – Task identifier
-
struct ScopedFd¶
Public Functions
-
ScopedFd() = default¶
-
inline explicit ScopedFd(int fd)¶
-
inline ~ScopedFd()¶
-
inline void reset()¶
-
inline int get() const¶
Public Members
-
int value = -1¶
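The members above suggest an RAII wrapper around a file descriptor. The sketch below is one plausible shape, assuming a POSIX system. `MiniScopedFd` is hypothetical, and making it non-copyable is an assumption of this sketch, not something the listing states.

```cpp
#include <cassert>
#include <unistd.h>  // POSIX close(), pipe(), read(), write()

// Hypothetical RAII wrapper modelled on the ScopedFd members listed above.
struct MiniScopedFd {
    int value = -1;

    MiniScopedFd() = default;
    explicit MiniScopedFd(int fd) : value(fd) {}
    ~MiniScopedFd() { reset(); }

    // Assumption: non-copyable, since two owners would close the same fd twice.
    MiniScopedFd(const MiniScopedFd&) = delete;
    MiniScopedFd& operator=(const MiniScopedFd&) = delete;

    // Close the descriptor if one is held, then mark the wrapper empty.
    void reset() {
        if (value >= 0) {
            ::close(value);
            value = -1;
        }
    }
    int get() const { return value; }
};

// Usage: both pipe ends are closed automatically, including on early return.
inline bool pipe_round_trip() {
    int fds[2];
    if (::pipe(fds) != 0) return false;
    MiniScopedFd reader(fds[0]);
    MiniScopedFd writer(fds[1]);
    char out = 'x';
    char in = 0;
    if (::write(writer.get(), &out, 1) != 1) return false;
    if (::read(reader.get(), &in, 1) != 1) return false;
    writer.reset();
    return in == 'x' && writer.get() == -1 && reader.get() >= 0;
}
```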
-
template<typename Map, typename Hash, std::size_t NUM_SHARDS = 64>
class ShardedMap : public dftracer::utils::ShardedMutex<Map, 64>¶ Thread-safe sharded map extending ShardedMutex with map ergonomics.
Adds key-based shard selection (via Hash), operator[], and emplace. The underlying ShardedMutex handles locking and shard storage.
- Template Parameters:
Map – The underlying map type per shard
Hash – Hash function for map keys
NUM_SHARDS – Number of shards (power of 2)
Public Types
-
template<typename T, std::size_t NUM_SHARDS = 64>
class ShardedMutex¶ ShardedMutex<T>
Generic sharded mutex pattern for reducing lock contention
Usage:
ShardedMutex<std::unordered_map<int, Callback>> callbacks;
// Write to shard
callbacks.with_shard(task_id, [&](auto& map) {
    map[task_id].push_back(callback);
});
// Read from shard
callbacks.with_shard(task_id, [&](const auto& map) {
    if (auto it = map.find(task_id); it != map.end()) {
        process(it->second);
    }
});
- Template Parameters:
T – Type of data stored per shard
NUM_SHARDS – Number of shards (must be power of 2)
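A minimal sketch of the sharding pattern follows. `MiniShardedMutex` is hypothetical, and selecting the shard with `key & (NumShards - 1)` is an assumption suggested by the power-of-2 requirement, not a statement about the real implementation.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <initializer_list>
#include <mutex>
#include <unordered_map>
#include <utility>

// Hypothetical sketch of the sharding pattern (not the real ShardedMutex).
template <typename T, std::size_t NumShards = 64>
class MiniShardedMutex {
    static_assert(NumShards != 0 && (NumShards & (NumShards - 1)) == 0,
                  "NumShards must be a power of 2");

    struct Shard {
        std::mutex mu;
        T data{};
    };
    std::array<Shard, NumShards> shards_;

public:
    // Lock only the shard selected by `key` and run `func` on its data.
    template <typename Func>
    void with_shard(std::size_t key, Func&& func) {
        Shard& shard = shards_[key & (NumShards - 1)];  // cheap modulo for powers of 2
        std::lock_guard<std::mutex> lock(shard.mu);
        std::forward<Func>(func)(shard.data);
    }

    // Visit every shard in index order, holding one lock at a time.
    template <typename Func>
    void for_each_shard(Func&& func) {
        for (Shard& shard : shards_) {
            std::lock_guard<std::mutex> lock(shard.mu);
            func(shard.data);
        }
    }
};

// Usage: keys 1 and 65 share a shard (65 & 63 == 1); key 2 lands in another.
inline std::size_t count_entries() {
    MiniShardedMutex<std::unordered_map<std::size_t, int>> table;
    for (std::size_t key : {std::size_t{1}, std::size_t{2}, std::size_t{65}}) {
        table.with_shard(key, [&](auto& map) { map[key] += 1; });
    }
    std::size_t total = 0;
    table.for_each_shard([&](auto& map) { total += map.size(); });
    return total;
}
```

Threads that touch different shards never contend on the same mutex, which is where the reduction in lock contention comes from.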
Public Functions
-
ShardedMutex() = default¶
-
inline ShardedMutex(ShardedMutex &&other) noexcept¶
-
inline ShardedMutex &operator=(ShardedMutex &&other) noexcept¶
-
ShardedMutex(const ShardedMutex&) = delete¶
-
ShardedMutex &operator=(const ShardedMutex&) = delete¶
-
template<typename Func>
inline void with_shard(std::size_t key, Func &&func)¶ Execute function with exclusive access to shard
Thread-safe: Only locks the specific shard, other threads can access different shards concurrently.
- Parameters:
key – Shard key (typically task ID, hash, etc.)
func – Function to execute: [](T& data) { … }
-
template<typename Func>
inline void with_shard(std::size_t key, Func &&func) const¶ Execute function with shared (const) access to shard
- Parameters:
key – Shard key
func – Function to execute: [](const T& data) { … }
-
template<typename Func>
inline void for_each_shard(Func &&func)¶ Execute function on ALL shards sequentially. Useful for cleanup, aggregation, or global operations.
Acquires locks sequentially to avoid deadlock. Not thread-safe with respect to other operations!
- Parameters:
func – Function to execute: [](T& data) { … }
-
template<typename Func>
inline void for_each_shard(Func &&func) const¶ Execute function on ALL shards sequentially (const version)
-
template<typename Func>
inline void for_each_shard_unlocked(Func &&func)¶ Execute function on ALL shards without locking. Only safe when no concurrent access is possible.
-
template<typename Func>
inline bool try_with_shard(std::size_t key, Func &&func)¶ Try to execute function with exclusive access to shard (non-blocking)
- Parameters:
key – Shard key
func – Function to execute if lock acquired
- Returns:
true if function was executed, false if lock not acquired
-
template<typename U = T>
inline auto clear() -> decltype(std::declval<U>().clear(), void())¶ Clear all shards (calls .clear() on each shard’s data). Only available if T has a .clear() method.
Public Static Functions
-
static inline constexpr std::size_t num_shards() noexcept¶
Get number of shards (compile-time constant)
-
class StringIntern¶
Public Functions
-
inline StringIntern()¶
-
inline ~StringIntern()¶
-
StringIntern(const StringIntern&) = delete¶
-
StringIntern &operator=(const StringIntern&) = delete¶
-
StringIntern(StringIntern&&) = delete¶
-
StringIntern &operator=(StringIntern&&) = delete¶
-
inline std::uint32_t get_or_insert(std::string_view sv)¶
-
inline void insert_at_id(std::uint32_t id, std::string_view sv)¶
Insert or look up a string at a specific id (for loading a persisted dictionary where ids must be preserved). If the id already holds a different string, the existing binding wins (caller error -> ignored). Safe to call concurrently with other inserts; must be called before any resolve(id) at that id.
-
inline std::string_view resolve(std::uint32_t id) const¶
-
inline std::string_view intern(std::string_view sv)¶
-
inline std::size_t size() const¶
-
inline void reserve_id_base(std::uint32_t base) noexcept¶
Shift the next-to-assign id counter to base. Subsequent get_or_insert calls allocate ids starting at base. Must be called before any get_or_insert on this instance. Lock-free: caller ensures no concurrent inserts.
-
inline void enable_deterministic_ids() noexcept¶
Enable deterministic-hash id assignment. When set, get_or_insert returns a stable id derived from the string’s content rather than a sequential counter. Same string -> same id in every process, regardless of insertion order. Intended for multi-process workflows (e.g. MPI ranks) where keys that include string ids must be identical across ranks so RocksDB merge operators can combine operands for the same logical key.
Collision handling: the 32-bit id is hash(str) & 0x7FFFFFFF to stay within FAST_CAPACITY-reachable range on lookup when id < FAST_CAPACITY. Different strings with the same id are chained in the bucket and lookup resolves by string equality, but resolve(id) can only return one of them. For the typical dftracer workload (cat/name/hhash/fhash dictionaries with O(1000) entries) birthday collisions are negligible.
Must be called before any get_or_insert.
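The two id-assignment modes can be contrasted with the sketch below. `MiniIntern` is a hypothetical single-threaded interner with sequential ids, and `deterministic_id()` uses FNV-1a as an assumed stand-in, since the description does not name the hash function. Only the 31-bit mask comes from the text above.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <string>
#include <string_view>
#include <unordered_map>

// Hypothetical single-threaded interner with sequential ids.
class MiniIntern {
public:
    std::uint32_t get_or_insert(std::string_view sv) {
        auto it = ids_.find(std::string(sv));
        if (it != ids_.end()) return it->second;
        auto id = static_cast<std::uint32_t>(strings_.size());
        strings_.emplace_back(sv);  // deque keeps existing strings in place
        ids_.emplace(strings_.back(), id);
        return id;
    }
    std::string_view resolve(std::uint32_t id) const { return strings_.at(id); }

private:
    std::deque<std::string> strings_;
    std::unordered_map<std::string, std::uint32_t> ids_;
};

// Content-derived id: FNV-1a (an assumption), masked to 31 bits as described.
inline std::uint32_t deterministic_id(std::string_view sv) {
    std::uint32_t h = 2166136261u;  // FNV-1a 32-bit offset basis
    for (unsigned char c : sv) {
        h ^= c;
        h *= 16777619u;             // FNV-1a 32-bit prime
    }
    return h & 0x7FFFFFFFu;
}
```

Sequential ids depend on insertion order, so two processes can disagree on them. A content-derived id does not have that problem, at the cost of possible collisions.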
Public Static Attributes
-
static constexpr std::size_t FAST_CAPACITY = 1u << 20¶
-
class Task : public std::enable_shared_from_this<Task>¶
Task - Self-contained DAG node with dependencies
Features:
Fluent API for building DAG (.depends_on())
Owns TaskResult for lightweight result retrieval
Knows parents and children (DAG structure)
Immutable after construction (blueprint pattern)
Type validation during edge creation
Supports automatic tuple packing for multiple parents
Subclassed by dftracer::utils::NoOpTask, dftracer::utils::TypedTask< I, O >
Public Functions
-
template<typename Func>
inline explicit Task(Func &&func, std::string_view name = "", std::source_location loc = std::source_location::current())¶ Constructor with function that takes input and CoroScope
-
virtual ~Task() = default¶
Add a single parent dependency
Add multiple parent dependencies (initializer list) Usage: task->depends_on({parent1, parent2, parent3, …})
Add multiple parent dependencies (variadic, 2+ parents) Usage: task->depends_on(parent1, parent2, parent3, …)
-
std::shared_ptr<Task> with_combiner(std::function<std::any(const std::vector<std::any>&)> combiner)¶
Set custom input combiner for multiple parents. Accepts a function that takes const std::vector<std::any>&.
Set custom input combiner for multiple parents (tuple-based with std::function). Accepts a function that takes specific types. Example: with_combiner(std::function<std::any(int, std::string)>([](int a, std::string b) { … }))
Set custom input combiner for multiple parents (lambda/callable). Automatically deduces argument types from the lambda. Example: task->with_combiner([](int a, std::string b) -> std::any { return a + b.size(); })
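For the `std::vector<std::any>` form, a combiner might be written as in the sketch below. The `Combiner` alias repeats the signature documented for `with_combiner()`. `make_length_sum_combiner()` and the assumption that parent outputs arrive in dependency order are hypothetical.

```cpp
#include <any>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Same signature as the with_combiner() overload taking std::vector<std::any>.
using Combiner = std::function<std::any(const std::vector<std::any>&)>;

// Hypothetical combiner: expects an int from the first parent and a std::string
// from the second, and merges them into a single std::size_t.
inline Combiner make_length_sum_combiner() {
    return [](const std::vector<std::any>& parents) -> std::any {
        const int a = std::any_cast<int>(parents.at(0));
        const auto& b = std::any_cast<const std::string&>(parents.at(1));
        return static_cast<std::size_t>(a) + b.size();
    };
}
```

A mismatch between the stored and requested types surfaces as `std::bad_any_cast`, and a missing parent as `std::out_of_range` from `at()`.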
Set initial input for this task This allows providing input directly to a task without using dependencies
- Template Parameters:
T – Input type (automatically deduced)
- Parameters:
input – The input value (will be converted to std::any internally)
-
inline std::shared_ptr<Task> with_timeout(std::chrono::milliseconds timeout)¶
Set timeout for this task
- Parameters:
timeout – Timeout duration (0 = no timeout, wait forever)
- Returns:
This task (for method chaining)
-
inline TaskIndex get_id() const¶
Get task ID (pointer address)
-
template<typename T>
inline T get() const¶ Get task result with automatic type casting
- Template Parameters:
T – The expected result type
- Throws:
std::bad_any_cast – if the result cannot be cast to T
- Returns:
The task result cast to type T
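The casting behaviour described here matches that of `std::any_cast`. The sketch below shows it on a plain `std::any`. `get_as()` and `try_get_as()` are hypothetical helpers, not Task members.

```cpp
#include <any>
#include <cassert>
#include <optional>

// Hypothetical helper mirroring get<T>(): cast the stored std::any to T and
// let std::bad_any_cast propagate on a type mismatch.
template <typename T>
T get_as(const std::any& stored) {
    return std::any_cast<T>(stored);
}

// Non-throwing variant: the pointer form of any_cast yields nullptr on mismatch.
template <typename T>
std::optional<T> try_get_as(const std::any& stored) {
    if (const T* value = std::any_cast<T>(&stored)) return *value;
    return std::nullopt;
}
```

The requested type has to match the stored type exactly. An `int` result cannot be read back as `double`, for example.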
-
inline bool wait(std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) const¶
Wait for task to complete without retrieving the result
- Parameters:
timeout – Timeout duration (0 = wait forever)
- Returns:
true if completed, false if timed out
-
inline const TaskResult &result() const¶
Access the underlying TaskResult
-
inline TaskResult &result()¶
-
inline TaskResult::WhenReadyAwaitable when_ready()¶
Coroutine-friendly wait — co_await task->when_ready()
-
inline std::vector<std::shared_ptr<Task>> get_parents() const¶
Get parent tasks (returns a copy for thread safety)
-
inline std::vector<std::shared_ptr<Task>> get_children() const¶
Get child tasks (returns a copy for thread safety)
-
inline bool is_ready() const¶
Check if all parents have completed
-
inline bool is_completed() const¶
Check if task has completed
-
inline std::type_index get_input_type() const¶
Get input type
-
inline std::type_index get_output_type() const¶
Get output type
-
inline const char *get_name() const¶
Get task name. Returns user name if set, otherwise caller’s function name from source_location (const char*, zero allocation).
-
inline const std::source_location &get_location() const¶
Get source location where the task was created.
-
inline bool has_combiner() const¶
Check if task has custom combiner
-
inline bool has_initial_input() const¶
Check if task has initial input set
-
inline const std::optional<std::any> &get_initial_input() const¶
Get initial input (if set)
-
inline void set_initial_input(std::any input)¶
Set initial input for this task
-
inline std::chrono::milliseconds get_timeout() const¶
Get task timeout (0 = no timeout)
-
inline bool has_timeout() const¶
Check if task has timeout set
Chain operation using then(): creates a dependent task with a transformation.
Usage:
auto task1 = make_task([](CoroScope& ctx) -> CoroTask<int> {
    co_return 42;
});
auto task2 = task1->then([](CoroScope& ctx, int x) -> CoroTask<std::string> {
    co_return std::to_string(x * 2);
});
- Parameters:
func – Transformation function
name – Optional name for the new task
- Returns:
New task that depends on this task
Tap operation: inspect or log a value without transforming it. Creates a pass-through task that executes a side effect.
Usage:
auto pipeline = task1
    ->tap([](CoroScope& ctx, int x) -> CoroTask<void> {
        std::cout << "Value: " << x << "\n";
        co_return;
    }, "log")
    ->then([](CoroScope& ctx, int x) -> CoroTask<int> {
        co_return x * 2;
    });
- Parameters:
func – Inspection function (doesn’t return a value or returns same type)
name – Optional name for the tap task
- Returns:
This task (for method chaining)
Operator& for parallel composition (AND): runs two independent tasks. Creates a combiner task that depends on both input tasks.
Usage:
auto combined = task1 & task2;  // Both run in parallel
auto [result1, result2] = combined->get<std::tuple<int, std::string>>();
- Parameters:
other – Second task to run in parallel
name – Optional name for the combiner task
- Returns:
Task that waits for both and returns tuple of results
Operator^ for tap/tee composition: sends output to both paths. Creates a tap task that receives this task’s output as a side effect.
Usage:
auto logger = make_task([](CoroScope& ctx, const std::any& x) -> CoroTask<void> {
    std::cout << "Value: " << std::any_cast<int>(x) << "\n";
    co_return;
});
auto result = task1 ^ logger;  // task1's output goes to logger as side effect
// result continues with task1's output type
- Parameters:
tap_task – Task to receive the output (runs as side effect)
- Returns:
This task (for method chaining), output continues from here
-
struct TaskHandle¶
Handle to a submitted void task. Non-blocking by default. Call .wait() to block (re-raises on error) or .done() to check.
-
struct TaskInfo¶
Task information for progress tracking
Public Types
Public Members
-
TaskIndex task_id¶
-
TaskIndex parent_task_id¶
-
std::string name¶
-
std::size_t worker_id¶
-
std::chrono::steady_clock::time_point queued_at¶
-
std::chrono::steady_clock::time_point started_at¶
-
std::chrono::steady_clock::time_point completed_at¶
-
std::vector<TaskIndex> child_task_ids¶
-
std::atomic<std::size_t> completed_children = {0}¶
-
std::string error_message¶
-
class TaskResult¶
Lightweight one-shot result holder for Task. Supports both blocking wait (tests, foreground) and co_await (runtime). ~48 bytes embedded in Task. No heap allocation for shared state.
Public Types
Public Functions
-
TaskResult() = default¶
-
~TaskResult() = default¶
-
TaskResult(const TaskResult&) = delete¶
-
TaskResult &operator=(const TaskResult&) = delete¶
-
TaskResult(TaskResult&&) = delete¶
-
TaskResult &operator=(TaskResult&&) = delete¶
-
void set_value(std::any value)¶
-
void set_exception(std::exception_ptr ex)¶
-
void set_cancelled()¶
-
void mark_running()¶
-
void add_reader()¶
Register that one more consumer will read the value. Called by Task::depends_on() — one per child edge.
-
void release_reader()¶
Signal that one consumer is done with the value. Called by Scheduler after preparing a child’s input. When last reader releases, value_ is cleared (memory freed). Terminal tasks (no children) have reader_count_==0 and are never auto-released — their value persists for user get().
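The reader-count rule described for add_reader() and release_reader() can be sketched as follows. This is a simplified illustration, assuming a mutex-guarded counter; the class CountedValue and its has_value() member are hypothetical and do not reproduce the real TaskResult layout.

```cpp
#include <any>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <utility>

// Hypothetical holder illustrating the documented reader-count rule.
class CountedValue {
public:
    void set_value(std::any value) {
        std::lock_guard<std::mutex> lock(mutex_);
        value_ = std::move(value);
    }

    // One call per consumer that will later read the value.
    void add_reader() {
        std::lock_guard<std::mutex> lock(mutex_);
        ++readers_;
    }

    // Clears the value only when the last registered reader releases.
    // With zero registered readers (a terminal task) nothing is cleared.
    void release_reader() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (readers_ > 0 && --readers_ == 0) {
            value_.reset();
        }
    }

    bool has_value() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return value_.has_value();
    }

private:
    mutable std::mutex mutex_;
    std::any value_;
    std::size_t readers_ = 0;
};
```

The guard on readers_ > 0 is what keeps a terminal task's value alive for a later user-side get(): a release without a matching registration is a no-op in this sketch.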
-
bool wait(std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) const¶
Block until ready. Returns false on timeout. timeout of 0ms means wait forever.
-
std::any get() const¶
Block until ready, return copy of value. Throws if exception/cancelled. Safe to call concurrently with release_reader() (serialized by mutex).
-
std::any get_ready() const¶
Return value without blocking. Asserts ready state. Use only when caller KNOWS task is complete (e.g., scheduler processing a ready child whose parent is guaranteed complete). Returns a copy (value may be released after this call).
-
std::exception_ptr get_exception() const¶
Return exception without blocking. nullptr if no exception.
-
bool is_ready() const¶
-
bool has_exception() const¶
-
bool is_cancelled() const¶
-
inline WhenReadyAwaitable when_ready()¶
-
class TimerService¶
-
Public Functions
-
TimerService() = default¶
-
inline ~TimerService()¶
-
TimerService(const TimerService&) = delete¶
-
TimerService &operator=(const TimerService&) = delete¶
-
TimerService(TimerService&&) = delete¶
-
TimerService &operator=(TimerService&&) = delete¶
-
inline void start()¶
-
inline void stop()¶
-
struct TransparentStringEqual¶
Public Types
-
using is_transparent = void¶
Public Functions
-
inline bool operator()(std::string_view a, std::string_view b) const noexcept¶
-
struct TransparentStringHash¶
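The is_transparent alias marks a functor as usable for heterogeneous lookup, where a std::string-keyed container is queried with a std::string_view without building a temporary std::string. The sketch below shows the idea using stand-in functors; SvHash, SvEqual, Counts, and lookup are hypothetical names, and the body of the library's TransparentStringHash is an assumption since the source does not show it.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <string_view>
#include <unordered_map>

// Hypothetical stand-in for TransparentStringHash.
struct SvHash {
    using is_transparent = void;
    std::size_t operator()(std::string_view s) const noexcept {
        return std::hash<std::string_view>{}(s);
    }
};

// Hypothetical stand-in for TransparentStringEqual.
struct SvEqual {
    using is_transparent = void;
    bool operator()(std::string_view a, std::string_view b) const noexcept {
        return a == b;
    }
};

using Counts = std::unordered_map<std::string, int, SvHash, SvEqual>;

// Returns the stored count, or -1 when the key is absent.
int lookup(const Counts& counts, std::string_view key) {
#if defined(__cpp_lib_generic_unordered_lookup)
    auto it = counts.find(key);               // C++20: no temporary std::string
#else
    auto it = counts.find(std::string(key));  // older standards: builds a key
#endif
    return it == counts.end() ? -1 : it->second;
}
```

Both functors must declare is_transparent for the C++20 heterogeneous overloads of find to participate; supplying only one of them silently falls back to key_type lookups.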
-
class TreiberStack¶
-
template<typename I, typename O>
class TypedTask : public dftracer::utils::Task¶ TypedTask - A type-safe task wrapper with compile-time type checking
This class provides a strongly-typed interface for tasks with known input and output types. It’s an alternative to using lambdas with Task directly.
Example usage:
class MyTask : public TypedTask<int, std::string> {
public:
    std::string apply(CoroScope& ctx, const int& input) override {
        return "Result: " + std::to_string(input * 2);
    }
};

auto task = std::make_shared<MyTask>();
Note: For simple cases, prefer using Task with lambdas:
auto task = make_task([](int x) { return "Result: " + std::to_string(x * 2); });
Public Functions
-
virtual ~TypedTask() = default¶
-
template<typename I_ = I, typename O_ = O>
inline std::enable_if_t<!std::is_void_v<I_> && !std::is_void_v<O_>, O_> apply(CoroScope &ctx, const I_ &input)¶ Type-safe execution method - override this in your subclass
Variants based on whether input/output are void:
O apply(CoroScope&, const I&) - Has input and output
void apply(CoroScope&, const I&) - Has input, no output
O apply(CoroScope&) - No input, has output
void apply(CoroScope&) - No input or output
-
template<typename I_ = I, typename O_ = O>
inline std::enable_if_t<!std::is_void_v<I_> && std::is_void_v<O_>, void> apply(CoroScope &ctx, const I_ &input)¶
-
template<typename T>
struct TypedTaskHandle¶ Typed handle that can return a value via .get(). .wait() re-raises stored exceptions (same as .get() but discards value).
-
class Watchdog¶
Watchdog - Monitors pipeline execution and enforces timeouts
Responsibilities:
Monitor global pipeline timeout
Monitor per-task timeouts
Detect when executor becomes unresponsive
Log warnings for long-running tasks
Trigger graceful shutdown on timeout
The watchdog runs in its own thread, independent of the executor and scheduler, so it can detect hangs even if the executor or scheduler is stuck.
Public Types
-
using TimeoutCallback = std::function<void(const std::string &reason)>¶
Callback invoked when a timeout is detected. Parameters: reason (string describing what timed out).
-
using WarningCallback = std::function<void(const std::string &task_name, int64_t elapsed_ms)>¶
Callback for warnings about long-running tasks. Parameters: task name, elapsed time in milliseconds.
Public Functions
-
explicit Watchdog(std::chrono::milliseconds check_interval = std::chrono::milliseconds(100), std::chrono::milliseconds global_timeout = std::chrono::milliseconds(0), std::chrono::milliseconds default_task_timeout = std::chrono::milliseconds(0), std::chrono::milliseconds warning_threshold = std::chrono::milliseconds(10000))¶
Constructor
- Parameters:
check_interval – How often to check for timeouts
global_timeout – Global pipeline timeout (0 = no timeout)
default_task_timeout – Default per-task timeout (0 = no timeout)
warning_threshold – Warn about tasks running longer than this
-
~Watchdog()¶
-
void start()¶
Start the watchdog thread
-
void stop()¶
Stop the watchdog thread
-
void mark_execution_start()¶
Mark the start of pipeline execution
Register a task as actively executing
- Parameters:
task_id – Task identifier
task – Shared pointer to the task
timeout – Task-specific timeout (0 = use default or no timeout)
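One way to read the "0 = use default or no timeout" rule is as a two-level fallback: a non-zero task timeout wins, otherwise the default applies, and a default of 0 disables the check. The sketch below encodes that reading; the function names and the strict "greater than" comparison are assumptions, not the Watchdog's actual logic.

```cpp
#include <cassert>
#include <chrono>

using Ms = std::chrono::milliseconds;

// Hypothetical: a task-specific timeout of 0 falls back to the default.
inline Ms effective_timeout(Ms task_timeout, Ms default_timeout) {
    return task_timeout.count() != 0 ? task_timeout : default_timeout;
}

// Hypothetical: an effective timeout of 0 means the task never expires.
inline bool is_expired(Ms elapsed, Ms task_timeout, Ms default_timeout) {
    const Ms limit = effective_timeout(task_timeout, default_timeout);
    return limit.count() != 0 && elapsed > limit;
}
```

Under this reading, a task registered with timeout 0 while the default is 500 ms is flagged once it has run for more than 500 ms, and is never flagged if the default is also 0.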
-
void unregister_task(TaskIndex task_id)¶
Unregister a task (called when it completes)
- Parameters:
task_id – Task identifier
-
void set_timeout_callback(TimeoutCallback callback)¶
Set callback for timeout events
-
void set_warning_callback(WarningCallback callback)¶
Set callback for warning events
-
void set_global_timeout(std::chrono::milliseconds timeout)¶
Set global timeout
-
void set_default_task_timeout(std::chrono::milliseconds timeout)¶
Set default task timeout
-
inline bool is_running() const¶
Check if watchdog is running
-
inline bool is_shutdown_requested() const¶
Check if shutdown was requested
-
size_t get_active_task_count() const¶
Get number of active tasks being monitored
-
struct TaskExecution¶
Information about an active task execution
-
class MPIUtils¶
MPIUtils - Singleton class for MPI utilities
Usage:
// Initialize (call once after MPI_Init)
MPIUtils::instance().initialize();
// Use throughout application
int rank = MPIUtils::instance().get_rank();
int size = MPIUtils::instance().get_world_size();
// Cleanup (call before MPI_Finalize if needed)
MPIUtils::instance().finalize();
Public Functions
-
bool initialize()¶
Initialize MPI utilities. Must be called after MPI_Init.
- Returns:
true if MPI is available and initialized
-
void finalize()¶
Finalize MPI utilities (cleanup internal state). Does NOT call MPI_Finalize; that’s the caller’s responsibility.
-
inline bool is_initialized() const¶
Check if MPI is initialized and available
-
bool is_mpi_enabled() const¶
Check if MPI is enabled (compiled with MPI support)
-
inline int get_rank() const¶
Get MPI rank (0 if MPI not initialized)
-
inline int get_world_size() const¶
Get MPI world size (1 if MPI not initialized)
-
inline bool is_root() const¶
Check if this is the root rank (rank 0)
-
void barrier()¶
Barrier - synchronize all ranks
-
void broadcast_string(std::string &str, int root = 0)¶
Broadcast a string from root to all ranks
- Parameters:
str – String to broadcast (modified on non-root ranks)
root – Root rank (default 0)
-
void broadcast_uint32_vector(std::vector<std::uint32_t> &values, int root = 0)¶
Broadcast a vector of uint32_t from root to all ranks
- Parameters:
values – Vector to broadcast (modified on non-root ranks)
root – Root rank (default 0)
-
void broadcast_int(int &value, int root = 0)¶
Broadcast a single integer from root to all ranks
- Parameters:
value – Integer to broadcast (modified on non-root ranks)
root – Root rank (default 0)
-
void gather_int(int send_value, std::vector<int> &recv_values, int root = 0)¶
Gather integers from all ranks to root
- Parameters:
send_value – Value to send from this rank
recv_values – Vector to receive values (only valid on root)
root – Root rank (default 0)
-
void gatherv_uint32(const std::vector<std::uint32_t> &send_data, std::vector<std::uint32_t> &recv_data, std::vector<int> &recv_counts, std::vector<int> &displacements, int root = 0)¶
Gatherv - gather variable-sized data from all ranks to root
- Parameters:
send_data – Data to send from this rank
recv_data – Buffer to receive data (only valid on root)
recv_counts – Number of elements from each rank (only valid on root)
displacements – Displacements for each rank (only valid on root)
root – Root rank (default 0)
-
void allgather_int(int send_value, std::vector<int> &recv_values)¶
Allgather - gather data from all ranks to all ranks
- Parameters:
send_value – Value to send from this rank
recv_values – Vector to receive all values (resized to world_size)
-
void allgatherv_char(const std::vector<char> &send_data, std::vector<char> &recv_data, std::vector<int> &recv_sizes, std::vector<int> &displacements)¶
Allgatherv - gather variable-sized data from all ranks to all ranks
- Parameters:
send_data – Data to send from this rank
recv_data – Buffer to receive all data
recv_sizes – Number of elements from each rank
displacements – Displacements for each rank
-
void reduce_sum_size_t(std::size_t send_value, std::size_t &recv_value, int root = 0)¶
Reduce to root - sum operation
- Parameters:
send_value – Value to reduce from this rank
recv_value – Result (only valid on root)
root – Root rank (default 0)
-
void reduce_max_double(double send_value, double &recv_value, int root = 0)¶
Reduce to root - max operation for double
- Parameters:
send_value – Value to reduce from this rank
recv_value – Result (only valid on root)
root – Root rank (default 0)