Core Runtime

Namespace: dftracer::utils

template<typename T>
class BufferPool

Thread-safe typed buffer pool. Zero allocations after warmup.

Template Parameters:

T – Buffer type. Must support move semantics.

Subclassed by dftracer::utils::BufferPoolImpl< T, Init, Reset >

Public Functions

virtual ~BufferPool() = default
virtual T acquire() = 0
virtual void release(T buf) = 0
template<typename T, typename Init, typename Reset = DefaultReset>
class BufferPoolImpl : public dftracer::utils::BufferPool<T>

Public Functions

inline BufferPoolImpl(std::size_t capacity, Init init, Reset reset = Reset{})
inline virtual T acquire() override
inline virtual void release(T buf) override
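BufferPoolImpl takes a capacity, an Init factory and a Reset functor. The sketch below shows one plausible shape for such a pool. It assumes that construction pre-fills `capacity` buffers (the warm-up), that acquire() falls back to Init when the pool is empty, and that release() resets a buffer and keeps it only while the pool is below capacity. `SimpleBufferPool` and `ClearReset` are hypothetical names, not the library's classes.

```cpp
#include <cstddef>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical reset functor: clear() empties a container but keeps its capacity.
struct ClearReset {
  template <typename T>
  void operator()(T& v) const { v.clear(); }
};

// Hypothetical mutex-protected pool; not dftracer::utils::BufferPoolImpl.
template <typename T, typename Init, typename Reset = ClearReset>
class SimpleBufferPool {
 public:
  SimpleBufferPool(std::size_t capacity, Init init, Reset reset = Reset{})
      : capacity_(capacity), init_(std::move(init)), reset_(std::move(reset)) {
    free_.reserve(capacity_);
    for (std::size_t i = 0; i < capacity_; ++i) free_.push_back(init_());  // warm-up
  }

  T acquire() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      if (!free_.empty()) {
        T buf = std::move(free_.back());
        free_.pop_back();
        return buf;
      }
    }
    return init_();  // pool exhausted: create a fresh buffer outside the lock
  }

  void release(T buf) {
    reset_(buf);  // reset outside the lock
    std::lock_guard<std::mutex> lock(mu_);
    if (free_.size() < capacity_) free_.push_back(std::move(buf));  // otherwise drop it
  }

  std::size_t available() const {
    std::lock_guard<std::mutex> lock(mu_);
    return free_.size();
  }

 private:
  std::size_t capacity_;
  Init init_;
  Reset reset_;
  mutable std::mutex mu_;
  std::vector<T> free_;  // reserved up front, so push_back never reallocates
};
```

Because the free list is reserved to `capacity` and buffers are moved rather than copied, steady-state acquire/release pairs in this sketch perform no heap allocation.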
class ByteView

Non-owning view over a contiguous byte range.

16 bytes on 64-bit platforms (pointer + size). The accessors are trivial reinterpret_casts that the compiler inlines, so there is no overhead compared with a raw pointer and length.

Public Functions

ByteView() = default
inline ByteView(const std::byte *data, std::size_t len)
inline ByteView(const unsigned char *data, std::size_t len)
inline ByteView(const char *data, std::size_t len)
inline ByteView(std::string_view sv)
inline ByteView(const std::vector<unsigned char> &v)
inline ByteView(const std::vector<std::byte> &v)
inline const std::byte *data() const
inline std::size_t size() const
inline bool empty() const
template<typename T>
inline const T *as() const
inline std::string_view as_string_view() const
inline ByteView subspan(std::size_t offset, std::size_t count) const
inline ByteView subspan(std::size_t offset) const
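A minimal sketch of a view with the same shape (one pointer plus one length), to illustrate the subspan() and as_string_view() semantics. `ByteViewSketch` is hypothetical, and clamping out-of-range offsets to the end of the view is an assumption; the real subspan() may assert or define a different contract.

```cpp
#include <cstddef>
#include <string_view>

// Hypothetical stand-in for ByteView: a pointer plus a length, nothing owned.
class ByteViewSketch {
 public:
  ByteViewSketch() = default;
  ByteViewSketch(const std::byte* data, std::size_t len) : data_(data), size_(len) {}
  ByteViewSketch(const char* data, std::size_t len)
      : data_(reinterpret_cast<const std::byte*>(data)), size_(len) {}
  ByteViewSketch(std::string_view sv) : ByteViewSketch(sv.data(), sv.size()) {}

  const std::byte* data() const { return data_; }
  std::size_t size() const { return size_; }
  bool empty() const { return size_ == 0; }

  template <typename T>
  const T* as() const { return reinterpret_cast<const T*>(data_); }
  std::string_view as_string_view() const { return {as<char>(), size_}; }

  // Assumption: out-of-range offsets and counts are clamped to the end of the view.
  ByteViewSketch subspan(std::size_t offset, std::size_t count) const {
    if (offset > size_) offset = size_;
    if (count > size_ - offset) count = size_ - offset;
    return ByteViewSketch(data_ + offset, count);
  }
  ByteViewSketch subspan(std::size_t offset) const { return subspan(offset, size_); }

 private:
  const std::byte* data_ = nullptr;
  std::size_t size_ = 0;
};

// Pointer + size and nothing else.
static_assert(sizeof(ByteViewSketch) == sizeof(const std::byte*) + sizeof(std::size_t));
```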
template<std::size_t MaxLen = 512>
struct ConstString

Compile-time string buffer for consteval string concatenation.

Enables building type signatures and display names entirely at compile time. The result lives in .rodata (zero runtime allocation).

Template Parameters:

MaxLen – Maximum capacity of the buffer (default 512).

Public Functions

consteval ConstString() = default
inline consteval ConstString(std::string_view sv)
inline consteval ConstString &append(std::string_view sv)
inline constexpr std::string_view view() const
inline constexpr operator std::string_view() const

Public Members

char data[MaxLen] = {}
std::size_t len = 0
class CoroScope

Lightweight structured concurrency scope using Coro + JoinHandle.

This is the single context type that task lambdas receive. It replaces the previous context types with a unified API:

  • spawn() returning SpawnFuture<T> for all coroutines (void and typed)

  • io::async_read/write/open/close for async I/O

  • receive() for channel consumption

  • Cancellation support

All spawned work items are lightweight Coro instances enqueued directly to the Executor’s run_queue_: no Task objects, no Scheduler overhead, no when_all().

spawn() always returns SpawnFuture<T>. For void coroutines the return value can be ignored (fire-and-forget) or co_await’d to wait for that specific coroutine to complete.

Usage:

auto task = make_task([](CoroScope& scope) -> coro::CoroTask<void> {
    // Fire-and-forget (return value ignored):
    scope.spawn([](CoroScope& s) -> coro::CoroTask<void> {
        co_return;
    });

    // Await a void spawn:
    co_await scope.spawn([](CoroScope& s) -> coro::CoroTask<void> {
        // do work
        co_return;
    });

    // Await a typed spawn:
    int result = co_await scope.spawn(
        [](CoroScope& s) -> coro::CoroTask<int> {
            co_return 42;
        });
    co_return;
});

Public Functions

inline explicit CoroScope(Executor *executor)
inline CoroScope(Executor *executor, std::shared_ptr<std::atomic<bool>> cancellation_token)

Construct with an inherited cancellation token (for child scopes spawned from a parent; the child shares the parent’s cancellation signal).

inline ~CoroScope()
CoroScope(const CoroScope&) = delete
CoroScope &operator=(const CoroScope&) = delete
CoroScope(CoroScope&&) = delete
CoroScope &operator=(CoroScope&&) = delete
template<typename Func, typename R = typename std::invoke_result_t<Func, CoroScope&>::value_type, std::enable_if_t<std::is_void_v<R>, int> = 0>
inline coro::SpawnFuture<void> spawn(Func &&func)

Spawn a void coroutine on the executor’s run queue.

The lambda receives CoroScope& and returns CoroTask<void>. Internally wrapped in a lightweight Coro and enqueued directly.

Returns SpawnFuture<void> that can be co_await’d to wait for this specific coroutine to complete. The return value can be safely ignored for fire-and-forget usage.

The captureless-lambda-with-parameters pattern ensures coroutine parameters are copied into the coroutine frame, avoiding the dangling-capture bug.

Usage:

// Fire-and-forget (return value ignored):
scope.spawn([](CoroScope& s) -> coro::CoroTask<void> {
    co_return;
});

// Await completion of this specific coroutine:
co_await scope.spawn([](CoroScope& s) -> coro::CoroTask<void> {
    // do work
    co_return;
});

template<typename Func, typename R = typename std::invoke_result_t<Func, CoroScope&>::value_type, std::enable_if_t<!std::is_void_v<R>, int> = 0>
inline coro::SpawnFuture<R> spawn(Func &&func)

Spawn a coroutine that returns a typed result.

Returns SpawnFuture<T> that can be co_await’d to retrieve the result. One heap allocation (shared_ptr<SharedState<T>>).

Usage:

auto future = scope.spawn([](CoroScope& s) -> coro::CoroTask<int> {
    co_return 42;
});
int val = co_await future;

template<typename UtilityT, typename InputT, typename DecayedUtility = std::remove_reference_t<UtilityT>, typename R = typename DecayedUtility::Output, std::enable_if_t<utilities::detail::has_process_v<DecayedUtility, InputT, R>, int> = 0>
inline coro::SpawnFuture<R> spawn(UtilityT &utility, InputT input)
template<typename T>
inline auto receive(coro::Channel<T> &channel)

Async receive from channel.

template<typename T>
inline auto receive(std::shared_ptr<coro::Channel<T>> channel)

Async receive from channel (shared_ptr).

template<typename T, typename Func>
inline void spawn_producer(coro::Channel<T> &channel, Func &&generator_func)

Spawn producer feeding Generator<T> into Channel<T> (reference).

template<typename T, typename Func>
inline void spawn_producer(std::shared_ptr<coro::Channel<T>> channel, Func &&generator_func)

Spawn producer feeding Generator<T> into Channel<T> (shared_ptr).

template<typename T, typename Func>
inline void spawn_async_producer(coro::Channel<T> &channel, Func &&async_generator_func)

Spawn async producer feeding AsyncGenerator<T> into Channel<T> (reference).

template<typename T, typename Func>
inline void spawn_async_producer(std::shared_ptr<coro::Channel<T>> channel, Func &&async_generator_func)

Spawn async producer feeding AsyncGenerator<T> into Channel<T> (shared_ptr).

template<typename T, typename Func>
inline void spawn_producers(std::shared_ptr<coro::Channel<T>> channel, std::size_t count, Func &&producer_func)

Spawn N producers (shared_ptr).

template<typename T, typename Func>
inline void spawn_producers(coro::Channel<T> &channel, std::size_t count, Func &&producer_func)

Spawn N producers (reference).

template<typename T, typename Func>
inline void spawn_consumers(coro::Channel<T> &channel, std::size_t count, Func &&consumer_func)

Spawn N consumers draining Channel<T> (reference).

template<typename T, typename Func>
inline void spawn_consumers(std::shared_ptr<coro::Channel<T>> channel, std::size_t count, Func &&consumer_func)

Spawn N consumers draining Channel<T> (shared_ptr).

template<typename TIn, typename TOut, typename Func>
inline void spawn_transforms(std::shared_ptr<coro::Channel<TIn>> input, std::shared_ptr<coro::Channel<TOut>> output, std::size_t count, Func &&transform_func)

Spawn N transform workers (shared_ptr).

template<typename TIn, typename TOut, typename Func>
inline void spawn_transforms(coro::Channel<TIn> &input, coro::Channel<TOut> &output, std::size_t count, Func &&transform_func)

Spawn N transform workers (reference).

inline coro::CoroTask<void> join()

Wait for all spawned coroutines to complete. Must be called before CoroScope is destroyed. Idempotent: calling join() a second time is a no-op.

inline coro::CoroTask<void> join_all()

Alias for join(), kept for backward compatibility with existing join_all() callers.

inline std::size_t size() const
inline bool is_joined() const
inline Executor *get_executor() const
template<typename Func> requires std::is_invocable_r_v<coro::CoroTask<void>, Func, CoroScope&> inline coro::CoroTask<void> coro_scope(Func &&scope_func)

Create a child scope, run the lambda, and auto-join. Compatibility bridge for code using ctx.coro_scope(…).

template<typename Func> requires std::is_invocable_r_v<coro::CoroTask<void>, Func, CoroScope&> inline coro::CoroTask<void> scope(Func &&scope_func)

Compatibility bridge for code using ctx.scope(…). Same as coro_scope().

inline bool is_cancellation_requested() const
inline void request_cancellation()
inline std::shared_ptr<std::atomic<bool>> get_cancellation_token() const
struct DefaultReset

Public Functions

template<typename T>
inline void operator()(T &v) const
class Env

Public Static Functions

template<typename T = std::string_view>
static std::optional<T> get(std::string_view name)
static int rocksdb_max_open_files()
template<>
static std::optional<int> get(std::string_view name)
class Executor

Executor - Executes tasks from queue using worker thread pool

Features:

  • Large thread pool for CPU/IO-bound work

  • Pulls tasks from queue

  • Executes task functions

  • Notifies scheduler on completion via callback

Thread pool size: N threads (default: hardware_concurrency)

Public Types

using CompletionCallback = std::function<void(std::shared_ptr<Task>)>

Public Functions

explicit Executor(const ExecutorConfig &config = {})

Constructor

~Executor()
Executor(const Executor&) = delete
Executor &operator=(const Executor&) = delete
Executor(Executor&&) = delete
Executor &operator=(Executor&&) = delete
void start()

Start the executor (spawn worker threads)

void shutdown()

Shutdown the executor gracefully

void reset()

Reset the executor (prepare for new execution)

void set_completion_callback(CompletionCallback callback)

Set completion callback (called when task finishes)

inline void set_scheduler(Scheduler *scheduler)

Set scheduler reference (for CoroScope)

inline TimerService &get_timer_service()

Get timer service for timeout operations

inline bool is_running() const

Check if executor is running

inline std::size_t get_num_threads() const

Get number of worker threads

inline std::size_t get_io_pool_size() const
inline bool has_io_backend() const noexcept

Check if an I/O backend is available

inline io::IoBackend &io_backend()

Get the I/O backend (must check has_io_backend() first)

inline const io::IoBackend &io_backend() const
void request_shutdown()

Request graceful shutdown. Stops accepting new tasks and waits for current tasks to complete.

inline bool is_shutdown_requested() const

Check if shutdown was requested

bool is_responsive() const

Check if executor is responsive (making progress)

Used by watchdog to detect if executor is hung. Returns false if executor appears to be stuck or unresponsive.

ExecutorProgress get_progress() const

Get full progress report

void schedule_coroutine_resumption(std::coroutine_handle<> handle)

Schedule a coroutine handle to be resumed on the executor’s thread pool. This is a lightweight operation that submits the resumption as work.

This is useful for when_all and other coroutine combinators that need to resume coroutines from completion callbacks without directly calling .resume()

Parameters:

handle – The coroutine handle to resume

void enqueue(std::coroutine_handle<> handle)

Enqueue a coroutine handle for execution on the thread pool. This is the primary submission path for lightweight (goroutine-style) coroutines; all lightweight work funnels through here. Cost: ~20ns (lock-free queue push + atomic signal).

Parameters:

handle – The coroutine handle to resume

TaskIndex enqueue_tracked(coro::Coro coro, std::string name, std::shared_ptr<std::atomic<TaskIndex>> tid_out = nullptr)

Enqueue a Coro with progress tracking in task_registry_.

void mark_coro_completed(TaskIndex id)
void submit_task(std::shared_ptr<Task> task, std::shared_ptr<std::any> input, TaskIndex parent_task_id = -1)

Submit a Task for execution via a Coro (Phase 3 path). Creates a run_task() Coro, registers in task_registry_, and enqueues the released handle to run_queue_.

Parameters:
  • task – The DAG task to execute

  • input – Task input

  • parent_task_id – Parent task ID for tracking (-1 for root)

void schedule_destroy(std::coroutine_handle<> handle)

Schedule a completed Coro handle for deferred destruction. Called from CoroPromise::FinalAwaiter for released handles.

Parameters:

handle – The coroutine handle at final_suspend

Public Static Functions

static Executor *current() noexcept

Get the executor running on the current worker thread (nullptr if the calling thread is not a worker). Thread-local.

static Executor *set_current(Executor *e) noexcept

Set the current-thread executor TLS, returning the old value. Used by CoroTask::get() to suppress async I/O submission when driving a coroutine synchronously.
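The save-and-restore contract of set_current() lends itself to an RAII guard. The sketch below is an assumption about how a caller such as CoroTask::get() might use it; `FakeExecutor`, the `sketch` namespace and `CurrentExecutorGuard` are hypothetical stand-ins, not the library’s code.

```cpp
#include <utility>

struct FakeExecutor {};  // hypothetical stand-in for Executor

namespace sketch {

thread_local FakeExecutor* tls_current = nullptr;

// Mirrors Executor::current(): nullptr on threads that are not workers.
FakeExecutor* current() noexcept { return tls_current; }

// Mirrors Executor::set_current(): installs `e` and returns the previous value.
FakeExecutor* set_current(FakeExecutor* e) noexcept { return std::exchange(tls_current, e); }

// Hypothetical RAII helper that restores the previous value on scope exit.
class CurrentExecutorGuard {
 public:
  explicit CurrentExecutorGuard(FakeExecutor* e) noexcept : prev_(set_current(e)) {}
  ~CurrentExecutorGuard() { set_current(prev_); }
  CurrentExecutorGuard(const CurrentExecutorGuard&) = delete;
  CurrentExecutorGuard& operator=(const CurrentExecutorGuard&) = delete;

 private:
  FakeExecutor* prev_;
};

}  // namespace sketch
```

Installing nullptr for the duration of a synchronous drive makes code that checks current() fall back to its non-async path, and the destructor restores the worker’s executor even if the driven coroutine throws.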

Friends

friend struct coro::CoroPromise
struct ExecutorConfig

Public Members

std::size_t num_threads = 0
std::chrono::seconds idle_timeout = {5}
std::chrono::seconds deadlock_timeout = {10}
std::size_t io_pool_size = 0
io::IoBackendType io_backend_type = io::IoBackendType::AUTO
unsigned io_batch_threshold = 16
struct ExecutorProgress

Executor progress report

Public Members

std::size_t total_tasks_submitted
std::size_t tasks_queued
std::size_t tasks_running
std::size_t tasks_completed
std::size_t tasks_failed
std::vector<std::size_t> worker_queue_depths
std::vector<TaskProgress> root_tasks
std::vector<WorkerStatus> workers
std::vector<std::pair<TaskIndex, std::string>> recent_errors
struct WorkerStatus

Public Members

std::size_t worker_id
bool is_idle
std::optional<TaskIndex> current_task_id
std::string current_task_name
std::size_t local_queue_depth
class MutableByteView

Non-owning mutable view over a contiguous byte range.

For buffers that need to be written into (e.g., inflate output buffers). Implicitly converts to ByteView for reading.

Public Functions

MutableByteView() = default
inline MutableByteView(std::byte *data, std::size_t len)
inline MutableByteView(unsigned char *data, std::size_t len)
inline MutableByteView(char *data, std::size_t len)
inline MutableByteView(std::vector<unsigned char> &v)
inline MutableByteView(std::string &s)
inline std::byte *data() const
inline std::size_t size() const
inline bool empty() const
template<typename T>
inline T *as() const
inline operator ByteView() const
struct NoOpReset

Public Functions

template<typename T>
inline void operator()(T&) const
class NoOpTask : public dftracer::utils::Task

NoOpTask - Pass-through task for multiple source or destination nodes

This task is automatically created by Pipeline when multiple source or destination tasks are specified. It simply acts as a synchronization point.

Purpose:

  • Ensures every pipeline has exactly one source/destination node

  • Provides trackability for multiple independent starting/ending points

  • Returns void to bypass type validation (works with any parent types)

Public Functions

inline explicit NoOpTask(std::string name = "__noop__")
virtual ~NoOpTask() = default
class ObjectPool

Public Functions

inline void *allocate(std::size_t size)
inline void deallocate(void *block, std::size_t size)
ObjectPool(const ObjectPool&) = delete
ObjectPool &operator=(const ObjectPool&) = delete

Public Static Functions

static inline ObjectPool &instance()
class Pipeline

Pipeline - DAG container and orchestrator

Features:

  • Holds source and destination tasks

  • Validates DAG before execution (reachability, types, cycles)

  • Delegates execution to scheduler/executor

  • Automatically creates NoOpTask for multiple sources

Public Functions

explicit Pipeline(const PipelineConfig &config = PipelineConfig::default_config())

Construct a pipeline from a configuration

Parameters:

config – Pipeline configuration (includes name, threads, etc.)

~Pipeline()
Pipeline(const Pipeline&) = delete
Pipeline &operator=(const Pipeline&) = delete
Pipeline(Pipeline&&) = delete
Pipeline &operator=(Pipeline&&) = delete
void set_source(std::shared_ptr<Task> source)

Set source task (single task)

void set_source(std::initializer_list<std::shared_ptr<Task>> sources)

Set multiple source tasks (initializer list - auto-creates NoOpTask as parent)

void set_source(const std::vector<std::shared_ptr<Task>> &sources)

Set multiple source tasks (vector - auto-creates NoOpTask as parent)

template<typename ...Tasks>
inline auto set_source(Tasks&&... sources) -> std::enable_if_t<(sizeof...(Tasks) > 1) && (std::is_convertible_v<Tasks, std::shared_ptr<Task>> && ...)>

Set multiple source tasks (variadic - auto-creates NoOpTask as parent)

void set_destination(std::shared_ptr<Task> destination)

Set destination task (single task - optional, if nullptr all terminal tasks are destinations)

void set_destination(std::initializer_list<std::shared_ptr<Task>> destinations)

Set multiple destination tasks (initializer list - auto-creates NoOpTask as child)

void set_destination(const std::vector<std::shared_ptr<Task>> &destinations)

Set multiple destination tasks (vector - auto-creates NoOpTask as child)

template<typename ...Tasks>
inline auto set_destination(Tasks&&... destinations) -> std::enable_if_t<(sizeof...(Tasks) > 1) && (std::is_convertible_v<Tasks, std::shared_ptr<Task>> && ...)>

Set multiple destination tasks (variadic - auto-creates NoOpTask as child)

bool validate()

Validate pipeline before execution

  • Check reachability from source to destination

  • Check type compatibility

  • Check for cycles
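One common way to implement a cycle check is Kahn’s topological sort: repeatedly remove nodes that have no remaining parents; any node that is never removed lies on a cycle or downstream of one. This is a sketch of that technique over a hypothetical adjacency-list graph (`is_acyclic` is an illustrative name), not necessarily what validate() does internally.

```cpp
#include <cstddef>
#include <queue>
#include <vector>

// children[i] lists the nodes that depend on node i (edges parent -> child).
// Returns true if the graph has no cycle.
bool is_acyclic(const std::vector<std::vector<std::size_t>>& children) {
  const std::size_t n = children.size();
  std::vector<std::size_t> indegree(n, 0);
  for (const auto& out : children)
    for (std::size_t c : out) ++indegree[c];

  std::queue<std::size_t> ready;  // nodes with no unprocessed parents
  for (std::size_t i = 0; i < n; ++i)
    if (indegree[i] == 0) ready.push(i);

  std::size_t visited = 0;
  while (!ready.empty()) {
    std::size_t u = ready.front();
    ready.pop();
    ++visited;
    for (std::size_t c : children[u])
      if (--indegree[c] == 0) ready.push(c);
  }
  return visited == n;  // unvisited nodes sit on, or behind, a cycle
}
```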

PipelineOutput execute(const std::any &input = std::any{})

Execute the pipeline

Parameters:

input – Initial input for source task (defaults to empty)

Returns:

Output from terminal tasks

template<typename T, typename = std::enable_if_t<!std::is_same_v<std::decay_t<T>, std::any>>>
inline PipelineOutput execute(T &&input)

Execute the pipeline with typed input

Template Parameters:

T – Input type

Parameters:

input – Initial input for source task

Returns:

Output from terminal tasks

void set_error_policy(ErrorPolicy policy)

Set error handling policy

void set_progress_callback(std::function<void(std::size_t completed, std::size_t total)> callback)

Set progress callback

inline const std::string &get_name() const

Get pipeline name

inline std::shared_ptr<Task> get_source() const

Get source task

inline std::shared_ptr<Task> get_destination() const

Get destination task

inline const std::vector<std::shared_ptr<Task>> &get_all_tasks() const

Get all tasks in the pipeline

struct PipelineConfig

Configuration for Pipeline execution

Thread Architecture:

  • Executor threads: Worker pool that executes task functions (compute threads)

  • Watchdog: Optional monitoring thread for hang detection

Usage (fluent API):

auto config = PipelineConfig()
    .with_name("MyPipeline")
    .with_compute_threads(16)  // CPU-bound work
    .with_error_policy(ErrorPolicy::FAIL_FAST)
    .with_watchdog(true)
    .with_global_timeout(std::chrono::seconds(30))
    .with_task_timeout(std::chrono::seconds(10));

Public Functions

inline PipelineConfig &with_name(std::string pipeline_name)

Set pipeline name

inline PipelineConfig &with_compute_threads(std::size_t threads)

Set number of compute threads (worker threads for CPU-bound work). 0 = hardware_concurrency (default).

inline PipelineConfig &with_error_policy(ErrorPolicy policy)

Set error handling policy

inline PipelineConfig &with_error_handler(ErrorHandler handler)

Set custom error handler (automatically sets policy to CUSTOM)

inline PipelineConfig &with_watchdog(bool enabled)

Enable/disable watchdog

inline PipelineConfig &with_global_timeout(std::chrono::seconds timeout)

Set global timeout (0 = wait forever)

inline PipelineConfig &with_task_timeout(std::chrono::seconds timeout)

Set default task timeout (0 = wait forever)

inline PipelineConfig &with_watchdog_interval(std::chrono::seconds interval)

Set watchdog check interval

inline PipelineConfig &with_warning_threshold(std::chrono::seconds threshold)

Set long-running task warning threshold

inline PipelineConfig &with_executor_idle_timeout(std::chrono::seconds timeout)

Set executor idle timeout

inline PipelineConfig &with_executor_deadlock_timeout(std::chrono::seconds timeout)

Set executor deadlock timeout

inline PipelineConfig &with_timeslice(std::chrono::microseconds duration)

Set coroutine timeslice duration (0 = disable automatic yielding)

inline PipelineConfig &with_io_threads(std::size_t count)

Set I/O thread pool size (used by thread pool and epoll backends)

inline PipelineConfig &with_io_backend(io::IoBackendType type)

Force a specific I/O backend (AUTO = runtime detection)

inline PipelineConfig &with_io_batch_size(unsigned threshold)

Set I/O batch threshold for batched SQE submission. When pending ops reach this count, they are flushed in one syscall. 0 = per-op submission (no batching).
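The threshold semantics can be modelled with a small counter: ops accumulate until `threshold` are pending, one submission then covers all of them, and a threshold of 0 submits every op immediately. `SubmitBatcher` is hypothetical and only counts submissions; with io_uring, each flush would correspond roughly to one io_uring_submit() call. The explicit flush() for leftover ops is an assumption.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical batcher that counts submissions instead of issuing syscalls.
class SubmitBatcher {
 public:
  explicit SubmitBatcher(unsigned threshold) : threshold_(threshold) {}

  void push(int op) {
    pending_.push_back(op);
    // 0 = per-op submission; otherwise wait until `threshold_` ops are pending.
    if (threshold_ == 0 || pending_.size() >= threshold_) flush();
  }

  void flush() {
    if (pending_.empty()) return;
    ++syscalls_;  // one submission covers every pending op
    pending_.clear();
  }

  std::size_t syscalls() const { return syscalls_; }

 private:
  unsigned threshold_;
  std::vector<int> pending_;
  std::size_t syscalls_ = 0;
};
```

With the default threshold of 16, 40 ops cost three submissions (after the 16th and 32nd op, plus a final flush of the remaining 8) instead of 40.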

Public Members

std::string name = ""
std::size_t executor_threads = 0
ErrorPolicy error_policy = ErrorPolicy::FAIL_FAST
ErrorHandler error_handler = nullptr
bool enable_watchdog = true
std::chrono::seconds global_timeout = {0}
std::chrono::seconds default_task_timeout = {0}
std::chrono::seconds watchdog_interval = {1}
std::chrono::seconds long_task_warning_threshold{300}
std::chrono::seconds executor_idle_timeout{300}
std::chrono::seconds executor_deadlock_timeout{600}
std::chrono::microseconds timeslice_duration{10'000}
std::size_t io_thread_count = 0
io::IoBackendType io_backend_type = io::IoBackendType::AUTO
unsigned io_batch_threshold = 16

Public Static Functions

static inline PipelineConfig sequential()

Create sequential execution configuration (1 thread)

static inline PipelineConfig parallel(std::size_t num_threads = 0)

Create parallel execution configuration

static inline PipelineConfig default_config()

Create default configuration

static inline PipelineConfig with_timeouts(std::size_t num_threads = 0, std::chrono::seconds global_timeout = std::chrono::seconds(60), std::chrono::seconds task_timeout = std::chrono::seconds(30))

Create configuration with timeouts

class PipelineError : public std::runtime_error

Public Types

enum Type

Values:

enumerator TYPE_MISMATCH
enumerator TYPE_MISMATCH_ERROR
enumerator VALIDATION_ERROR
enumerator EXECUTION_ERROR
enumerator INITIALIZATION_ERROR
enumerator OUTPUT_CONVERSION_ERROR
enumerator TIMEOUT_ERROR
enumerator INTERRUPTED
enumerator EXECUTOR_UNRESPONSIVE
enumerator UNKNOWN_ERROR

Public Functions

inline PipelineError(Type type, const std::string &message)
inline Type get_type() const
struct PipelineOutput : public std::unordered_map<TaskIndex, std::any>

Public Functions

inline operator std::any() const
inline std::any get() const
template<typename T>
inline T get() const
inline std::any get(TaskIndex id) const
template<typename T>
inline T get(TaskIndex id) const
inline std::any first() const
template<typename T>
inline T first() const
class Runtime

Lightweight wrapper around Executor + Watchdog for running coroutines on a thread pool without Pipeline/Scheduler/DAG overhead. Intended for Python bindings and other non-DAG consumers.

Public Functions

explicit Runtime(std::size_t threads = 0)
explicit Runtime(const ExecutorConfig &config, bool enable_watchdog = true)
Runtime(const ExecutorConfig &config, std::unique_ptr<Watchdog> watchdog)
~Runtime()
Runtime(const Runtime&) = delete
Runtime &operator=(const Runtime&) = delete
Runtime(Runtime&&) = delete
Runtime &operator=(Runtime&&) = delete
TaskHandle submit(coro::CoroTask<void> task, std::string name = "")

Asynchronous submit: returns immediately while the task runs on the executor.

template<typename T>
TypedTaskHandle<T> submit(coro::CoroTask<T> task, std::string name = "")

Async submit with typed result.

template<typename Func> requires std::is_invocable_r_v<coro::CoroTask<void>, Func, CoroScope&> inline TaskHandle scope(std::string name, Func &&func)

Submit a scoped task (provides CoroScope to the lambda). Returns immediately, task runs on executor.

Usage:

auto handle = rt->scope("my_task", [](CoroScope& scope) -> coro::CoroTask<void> {
    scope.spawn([](CoroScope& s) -> coro::CoroTask<void> { co_return; });
    co_await scope.join();
});
handle.get();  // wait when needed

template<typename UtilityT, typename InputT, typename DecayedUtility = std::remove_reference_t<UtilityT>> requires utilities::has_tag_v<utilities::tags::NeedsContext, DecayedUtility> inline TaskHandle scope(std::string name, UtilityT &utility, InputT input)

Submit a NeedsContext utility with automatic context injection.

Usage:

AggregatorUtility util;
rt->scope("aggregator", util, input).get();

void wait_all()

Wait for all outstanding tasks to complete.

ExecutorProgress get_progress() const
bool is_responsive() const
void set_global_timeout(std::chrono::milliseconds timeout)
void set_default_task_timeout(std::chrono::milliseconds timeout)
void shutdown()
std::size_t threads() const
std::size_t io_threads() const
inline Executor *executor()
inline Watchdog *watchdog()
class Scheduler

Scheduler - Manages task scheduling and dependency tracking

Tasks are submitted directly to the executor from the calling thread (source task) or from worker threads (child tasks on completion). There is no dedicated scheduling thread: scheduling is fully event-driven and push-based.

Features:

  • Traverses DAG and checks dependencies

  • Submits ready tasks directly to executor

  • Tracks completed tasks

  • Handles error policies

  • Supports dynamic task submission
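Push-based scheduling is typically built on per-task dependency counters: when a task finishes, the worker that ran it decrements each child’s count of outstanding parents, and whichever decrement reaches zero submits the child. A minimal sketch with hypothetical `Node`, `add_edge` and `on_completed` names (not the library’s Task or Scheduler API):

```cpp
#include <atomic>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical DAG node; not dftracer::utils::Task.
struct Node {
  std::string name;
  std::vector<Node*> children;
  std::atomic<std::size_t> pending_parents{0};
};

void add_edge(Node& parent, Node& child) {
  parent.children.push_back(&child);
  child.pending_parents.fetch_add(1, std::memory_order_relaxed);
}

// Runs on whichever worker finished `done`; there is no central scheduling thread.
void on_completed(Node& done, const std::function<void(Node&)>& submit) {
  for (Node* child : done.children) {
    // fetch_sub returns the previous value, so exactly one completing parent sees 1.
    if (child->pending_parents.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      submit(*child);
    }
  }
}
```

In a diamond a → {b, c} → d, node d is submitted only by whichever of b or c finishes last, even if both finish concurrently on different workers.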

Public Functions

explicit Scheduler(Executor *executor)
Scheduler(Executor *executor, Watchdog *watchdog, const PipelineConfig &config)
~Scheduler()
Scheduler(const Scheduler&) = delete
Scheduler &operator=(const Scheduler&) = delete
Scheduler(Scheduler&&) = delete
Scheduler &operator=(Scheduler&&) = delete
void schedule(std::shared_ptr<Task> source, const std::any &input = {})

Schedule execution starting from source task

Parameters:
  • source – Starting task

  • input – Initial input

template<typename T, typename = std::enable_if_t<!std::is_same_v<std::decay_t<T>, std::any>>>
inline void schedule(std::shared_ptr<Task> source, T &&input)

Schedule execution starting from source task

Parameters:
  • source – Starting task

  • input – Initial input

void on_task_completed(std::shared_ptr<Task> task)

Called by executor when a task completes

Parameters:

task – Completed task

void submit_dynamic_task(std::shared_ptr<Task> task, const std::any &input)

Submit a dynamic task (for intra-task parallelism)

Parameters:
  • task – Task to submit

  • input – Task input

void set_error_policy(ErrorPolicy policy)

Set error handling policy

void set_error_handler(ErrorHandler handler)

Set custom error handler

void set_progress_callback(std::function<void(std::size_t completed, std::size_t total)> callback)

Set progress callback

void set_global_timeout(std::chrono::milliseconds timeout)

Set global timeout (0 = wait forever)

void set_default_task_timeout(std::chrono::milliseconds timeout)

Set default task timeout (0 = wait forever)

void request_shutdown()

Request graceful shutdown

inline bool is_shutdown_requested() const

Check if shutdown was requested

inline Watchdog *get_watchdog()

Get watchdog (for configuration)

void reset()

Reset scheduler state

inline bool is_running() const

Check if scheduler is running

inline Executor *get_executor() const

Get executor reference (for TaskFuture async suspension)

void register_task_completion_callback(TaskIndex task_id, std::function<void()> callback)

Register callback to be invoked when task completes

Uses ShardedMutex for minimal contention. Multiple tasks can register callbacks concurrently without blocking.

Thread-safe: Only locks the specific shard for this task_id. Other tasks registering callbacks concurrently don’t block.

Usage:

// Called from TaskFuture::await_suspend()
scheduler->register_task_completion_callback(task_id, [awaiting]() {
    awaiting.resume();  // Resume awaiting coroutine
});

Parameters:
  • task_id – Task identifier

  • callback – Function to invoke on completion: []() { … }

void invoke_completion_callbacks(TaskIndex task_id)

Invoke completion callbacks for a task

Only locks the specific shard for this task_id. Other tasks completing concurrently don’t block.

Called by on_task_completed() when a task finishes. Invokes all registered callbacks (e.g., resuming awaiting coroutines).

Parameters:

task_id – Task identifier

struct ScopedFd

Public Functions

ScopedFd() = default
inline explicit ScopedFd(int fd)
ScopedFd(const ScopedFd&) = delete
ScopedFd &operator=(const ScopedFd&) = delete
inline ScopedFd(ScopedFd &&other) noexcept
inline ScopedFd &operator=(ScopedFd &&other) noexcept
inline ~ScopedFd()
inline void reset()
inline int get() const

Public Members

int value = -1
template<typename Map, typename Hash, std::size_t NUM_SHARDS = 64>
class ShardedMap : public dftracer::utils::ShardedMutex<Map, 64>

Thread-safe sharded map extending ShardedMutex with map ergonomics.

Adds key-based shard selection (via Hash), operator[], and emplace. The underlying ShardedMutex handles locking and shard storage.

Template Parameters:
  • Map – The underlying map type per shard

  • Hash – Hash function for map keys

  • NUM_SHARDS – Number of shards (power of 2)

Public Types

using key_type = typename Map::key_type
using mapped_type = typename Map::mapped_type

Public Functions

ShardedMap() = default
template<typename Fn>
inline void with_shard(const key_type &key, Fn &&fn)
template<typename Fn>
inline void with_shard(const key_type &key, Fn &&fn) const
inline mapped_type &operator[](const key_type &key)
template<typename ...Args>
inline auto emplace(const key_type &key, Args&&... args)
template<typename T, std::size_t NUM_SHARDS = 64>
class ShardedMutex

ShardedMutex<T>

Generic sharded mutex pattern for reducing lock contention

Usage:

ShardedMutex<std::unordered_map<int, std::vector<Callback>>> callbacks;

// Write to shard
callbacks.with_shard(task_id, [&](auto& map) {
    map[task_id].push_back(callback);
});

// Read from shard
callbacks.with_shard(task_id, [&](const auto& map) {
    if (auto it = map.find(task_id); it != map.end()) {
        process(it->second);
    }
});

Template Parameters:
  • T – Type of data stored per shard

  • NUM_SHARDS – Number of shards (must be power of 2)

Public Functions

ShardedMutex() = default
inline ShardedMutex(ShardedMutex &&other) noexcept
inline ShardedMutex &operator=(ShardedMutex &&other) noexcept
ShardedMutex(const ShardedMutex&) = delete
ShardedMutex &operator=(const ShardedMutex&) = delete
template<typename Func>
inline void with_shard(std::size_t key, Func &&func)

Execute function with exclusive access to shard

Thread-safe: Only locks the specific shard, other threads can access different shards concurrently.

Parameters:
  • key – Shard key (typically task ID, hash, etc.)

  • func – Function to execute: [](T& data) { … }
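A minimal sketch of the sharding idea: a fixed array of mutex-plus-data shards, with the shard chosen by masking the key, which is why NUM_SHARDS must be a power of 2. `ShardedMutexSketch` is hypothetical; the 64-byte alignment assumes a 64-byte cache line and is only one way to avoid false sharing between shards.

```cpp
#include <array>
#include <cstddef>
#include <mutex>
#include <unordered_map>

// Hypothetical stand-in for ShardedMutex<T, NUM_SHARDS>.
template <typename T, std::size_t NUM_SHARDS = 64>
class ShardedMutexSketch {
  static_assert(NUM_SHARDS > 0 && (NUM_SHARDS & (NUM_SHARDS - 1)) == 0,
                "NUM_SHARDS must be a power of 2");

  // Each shard on its own (assumed 64-byte) cache line to limit false sharing.
  struct alignas(64) Shard {
    std::mutex mutex;
    T data;
  };

 public:
  template <typename Func>
  void with_shard(std::size_t key, Func&& func) {
    Shard& s = shards_[key & (NUM_SHARDS - 1)];  // power of 2: mask instead of modulo
    std::lock_guard<std::mutex> lock(s.mutex);   // only this shard is locked
    func(s.data);
  }

  template <typename Func>
  void for_each_shard(Func&& func) {
    for (Shard& s : shards_) {  // one lock at a time, so no lock-ordering deadlock
      std::lock_guard<std::mutex> lock(s.mutex);
      func(s.data);
    }
  }

 private:
  std::array<Shard, NUM_SHARDS> shards_{};
};
```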

template<typename Func>
inline void with_shard(std::size_t key, Func &&func) const

Execute function with shared (const) access to shard

Parameters:
  • key – Shard key

  • func – Function to execute: [](const T& data) { … }

template<typename Func>
inline void for_each_shard(Func &&func)

Execute function on ALL shards sequentially. Useful for cleanup, aggregation, or global operations.

Acquires locks sequentially to avoid deadlock. Not thread-safe with respect to other operations!

Parameters:

func – Function to execute: [](T& data) { … }

template<typename Func>
inline void for_each_shard(Func &&func) const

Execute function on ALL shards sequentially (const version)

template<typename Func>
inline void for_each_shard_unlocked(Func &&func)

Execute function on ALL shards without locking. Only safe when no concurrent access is possible.

template<typename Func>
inline void for_each_shard_unlocked(Func &&func) const
template<typename Func>
inline bool try_with_shard(std::size_t key, Func &&func)

Try to execute function with exclusive access to shard (non-blocking)

Parameters:
  • key – Shard key

  • func – Function to execute if lock acquired

Returns:

true if the function was executed, false if the lock could not be acquired

template<typename U = T>
inline auto clear() -> decltype(std::declval<U>().clear(), void())

Clear all shards (calls .clear() on each shard’s data). Only available if T has a .clear() method.

template<typename U = T>
inline auto size() const -> decltype(std::declval<U>().size())

Get approximate total size across all shards. Only available if T has a .size() method.

Note: not thread-safe; the result is only approximate if shards are modified concurrently.

template<typename U = T>
inline auto empty() const -> decltype(std::declval<U>().empty())

Check if all shards are empty. Only available if T has an .empty() method.

Public Static Functions

static inline constexpr std::size_t num_shards() noexcept

Get number of shards (compile-time constant)

class StringIntern

Public Functions

inline StringIntern()
inline ~StringIntern()
StringIntern(const StringIntern&) = delete
StringIntern &operator=(const StringIntern&) = delete
StringIntern(StringIntern&&) = delete
StringIntern &operator=(StringIntern&&) = delete
inline std::uint32_t get_or_insert(std::string_view sv)
inline void insert_at_id(std::uint32_t id, std::string_view sv)

Insert or look up a string at a specific id (used when loading a persisted dictionary whose ids must be preserved). If the id is already bound to a different string, the existing binding wins; this is treated as a caller error and silently ignored. Safe to call concurrently with other inserts; must be called before any resolve(id) for that id.

inline std::string_view resolve(std::uint32_t id) const
inline std::string_view intern(std::string_view sv)
inline std::size_t size() const
inline void reserve_id_base(std::uint32_t base) noexcept

Shift the next-to-assign id counter to base, so that subsequent get_or_insert calls allocate ids starting at base. Must be called before any get_or_insert on this instance. Takes no lock: the caller must ensure there are no concurrent inserts.
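A single-threaded sketch of sequential interning helps show how reserve_id_base, get_or_insert, insert_at_id, and resolve relate. MiniIntern is hypothetical and omits the concurrency and the FAST_CAPACITY fast path of the real StringIntern; it only mirrors the documented rules that ids start at the reserved base and that an existing binding wins in insert_at_id.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <string_view>
#include <unordered_map>

// Hypothetical single-threaded sketch of sequential string interning;
// the real StringIntern is concurrent and uses a different layout.
class MiniIntern {
public:
    // Must be called before the first get_or_insert (mirrors reserve_id_base).
    void reserve_id_base(std::uint32_t base) { next_id_ = base; }

    std::uint32_t get_or_insert(std::string_view sv) {
        auto it = ids_.find(std::string(sv));
        if (it != ids_.end()) return it->second;  // already interned
        std::uint32_t id = next_id_++;
        bind(id, sv);
        return id;
    }

    // Bind a persisted id; if the id is already taken, the existing binding wins.
    void insert_at_id(std::uint32_t id, std::string_view sv) {
        if (names_.count(id) != 0) return;
        bind(id, sv);
    }

    // Returns an empty view for unknown ids.
    std::string_view resolve(std::uint32_t id) const {
        auto it = names_.find(id);
        return it == names_.end() ? std::string_view{} : it->second;
    }

private:
    void bind(std::uint32_t id, std::string_view sv) {
        // Node-based map: the stored key never moves, so the view below stays valid.
        auto it = ids_.emplace(std::string(sv), id).first;
        names_.emplace(id, std::string_view(it->first));
    }

    std::uint32_t next_id_ = 0;
    std::unordered_map<std::string, std::uint32_t> ids_;
    std::unordered_map<std::uint32_t, std::string_view> names_;
};
```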

inline void enable_deterministic_ids() noexcept

Enable deterministic-hash id assignment. When enabled, get_or_insert returns a stable id derived from the string’s content rather than from a sequential counter: the same string maps to the same id in every process, regardless of insertion order. This is intended for multi-process workflows (e.g. MPI ranks) where keys that include string ids must be identical across ranks, so that RocksDB merge operators can combine operands for the same logical key.

Collision handling: the 32-bit id is hash(str) & 0x7FFFFFFF (the hash with its top bit cleared), which keeps it within the FAST_CAPACITY-reachable range on lookup when id < FAST_CAPACITY. Different strings that map to the same id are chained in the bucket, and lookup resolves them by string equality, but resolve(id) can return only one of them. For the typical dftracer workload (cat/name/hhash/fhash dictionaries with O(1000) entries), birthday collisions are negligible.
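The claim that collisions are negligible can be checked with the standard birthday approximation, assuming the masked hash behaves like a uniform draw from the N = 2^31 values that survive the & 0x7FFFFFFF mask:

```latex
P(\text{collision}) \approx 1 - \exp\!\left(-\frac{n(n-1)}{2N}\right),
\qquad N = 2^{31},\; n = 1000
\;\Rightarrow\;
\frac{n(n-1)}{2N} = \frac{499\,500}{2\,147\,483\,648} \approx 2.3 \times 10^{-4}
```

So a dictionary of about 1000 strings has roughly a 0.02% chance of containing any collision at all.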

Must be called before any get_or_insert.
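Deterministic ids need a hash whose output is fixed by its definition; std::hash is only required to be consistent within a single run of a program. The sketch below assumes 32-bit FNV-1a purely for illustration (the hash StringIntern actually uses is not stated here), and deterministic_id is a hypothetical helper that applies the documented & 0x7FFFFFFF mask.

```cpp
#include <cassert>
#include <cstdint>
#include <string_view>

// Hypothetical sketch: 32-bit FNV-1a is an assumed stand-in for whatever
// hash dftracer actually uses. Its output is fixed by definition, so every
// process (e.g. every MPI rank) computes the same value for the same string.
constexpr std::uint32_t fnv1a_32(std::string_view s) {
    std::uint32_t h = 2166136261u;  // FNV offset basis
    for (char c : s) {
        h ^= static_cast<unsigned char>(c);
        h *= 16777619u;             // FNV prime (unsigned wrap-around is intended)
    }
    return h;
}

// Clear the top bit, as described above with & 0x7FFFFFFF.
constexpr std::uint32_t deterministic_id(std::string_view s) {
    return fnv1a_32(s) & 0x7FFFFFFFu;
}
```

Because the function depends only on the string's bytes, insertion order and process identity cannot influence the id.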

Public Static Attributes

static constexpr std::size_t FAST_CAPACITY = 1u << 20
class Task : public std::enable_shared_from_this<Task>

Task - Self-contained DAG node with dependencies

Features:

  • Fluent API for building DAG (.depends_on())

  • Owns TaskResult for lightweight result retrieval

  • Knows parents and children (DAG structure)

  • Immutable after construction (blueprint pattern)

  • Type validation during edge creation

  • Supports automatic tuple packing for multiple parents
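The blueprint idea (parents known at build time, each node run once after all of its parents) can be sketched with a memoized depth-first evaluation. MiniTask and make_mini_task are hypothetical: they hand parent outputs to the child as a std::vector<std::any>, similar in shape to the vector-based combiner, whereas the real Task runs coroutines on a scheduler and can pack parents into a tuple.

```cpp
#include <any>
#include <cassert>
#include <functional>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical sketch of a DAG node whose input is the list of parent outputs.
// It only illustrates the depends_on()/combiner idea, not the real scheduler.
struct MiniTask {
    using Fn = std::function<std::any(const std::vector<std::any>&)>;

    explicit MiniTask(Fn f) : fn(std::move(f)) {}

    Fn fn;
    std::vector<std::shared_ptr<MiniTask>> parents;
    std::optional<std::any> result;  // memoized output

    // Evaluate parents first (depth-first), then this node, each at most once.
    std::any run() {
        if (result) return *result;
        std::vector<std::any> inputs;
        for (auto& p : parents) inputs.push_back(p->run());
        result = fn(inputs);
        return *result;
    }
};

inline std::shared_ptr<MiniTask> make_mini_task(MiniTask::Fn f) {
    return std::make_shared<MiniTask>(std::move(f));
}
```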

Subclassed by dftracer::utils::NoOpTask, dftracer::utils::TypedTask< I, O >

Public Functions

template<typename Func>
inline explicit Task(Func &&func, std::string_view name = "", std::source_location loc = std::source_location::current())

Constructor with function that takes input and CoroScope

virtual ~Task() = default
Task(const Task&) = delete
Task &operator=(const Task&) = delete
Task(Task&&) = delete
Task &operator=(Task&&) = delete
std::shared_ptr<Task> depends_on(std::shared_ptr<Task> parent)

Add a single parent dependency

std::shared_ptr<Task> depends_on(std::initializer_list<std::shared_ptr<Task>> parents)

Add multiple parent dependencies (initializer list). Usage: task->depends_on({parent1, parent2, parent3, …})

template<typename T1, typename T2, typename ...Rest>
inline std::shared_ptr<Task> depends_on(T1 &&parent1, T2 &&parent2, Rest&&... rest)

Add multiple parent dependencies (variadic, 2+ parents). Usage: task->depends_on(parent1, parent2, parent3, …)

std::shared_ptr<Task> with_combiner(std::function<std::any(const std::vector<std::any>&)> combiner)

Set a custom input combiner for multiple parents. Accepts a function that takes a const std::vector<std::any>&.

template<typename ...Args>
std::shared_ptr<Task> with_combiner(std::function<std::any(Args...)> combiner)

Set a custom input combiner for multiple parents (tuple-based, with std::function). Accepts a function that takes specific types. Example: with_combiner(std::function<std::any(int, std::string)>([](int a, std::string b) { … }))

template<typename Func>
auto with_combiner(Func &&combiner) -> std::enable_if_t<!std::is_same_v<std::decay_t<Func>, std::function<std::any(const std::vector<std::any>&)>>, std::shared_ptr<Task>>

Set a custom input combiner for multiple parents (lambda/callable). Argument types are deduced automatically from the lambda. Example: task->with_combiner([](int a, std::string b) -> std::any { return a + b.size(); })

std::shared_ptr<Task> with_name(std::string name)

Set task name

template<typename T>
inline std::shared_ptr<Task> with_input(T &&input)

Set initial input for this task. This allows providing input directly to a task without using dependencies.

Template Parameters:

T – Input type (automatically deduced)

Parameters:

input – The input value (will be converted to std::any internally)

inline std::shared_ptr<Task> with_timeout(std::chrono::milliseconds timeout)

Set timeout for this task

Parameters:

timeout – Timeout duration (0 = no timeout, wait forever)

Returns:

This task (for method chaining)

inline TaskIndex get_id() const

Get task ID (pointer address)

template<typename T>
inline T get() const

Get task result with automatic type casting

Template Parameters:

T – The expected result type

Throws:

std::bad_any_cast – if the result cannot be cast to T

Returns:

The task result cast to type T

inline bool wait(std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) const

Wait for task to complete without retrieving the result

Parameters:

timeout – Timeout duration (0 = wait forever)

Returns:

true if completed, false if timed out

inline const TaskResult &result() const

Access the underlying TaskResult

inline TaskResult &result()
inline TaskResult::WhenReadyAwaitable when_ready()

Coroutine-friendly wait: co_await task->when_ready()

inline std::vector<std::shared_ptr<Task>> get_parents() const

Get parent tasks (returns a copy for thread safety)

inline std::vector<std::shared_ptr<Task>> get_children() const

Get child tasks (returns a copy for thread safety)

inline bool is_ready() const

Check if all parents have completed

inline bool is_completed() const

Check if task has completed

inline std::type_index get_input_type() const

Get input type

inline std::type_index get_output_type() const

Get output type

inline const char *get_name() const

Get task name. Returns the user-supplied name if set; otherwise the caller’s function name from source_location (as a const char*, with no allocation).

inline const std::source_location &get_location() const

Get source location where the task was created.

inline bool has_combiner() const

Check if task has custom combiner

inline bool has_initial_input() const

Check if task has initial input set

inline const std::optional<std::any> &get_initial_input() const

Get initial input (if set)

inline void set_initial_input(std::any input)

Set initial input for this task

inline std::chrono::milliseconds get_timeout() const

Get task timeout (0 = no timeout)

inline bool has_timeout() const

Check if task has timeout set

template<typename Func>
inline std::shared_ptr<Task> then(Func &&func, std::string name = "")

Chain operation using then(): creates a dependent task that applies a transformation

Usage:

auto task1 = make_task([](CoroScope& ctx) -> CoroTask<int> {
    co_return 42;
});

auto task2 = task1->then([](CoroScope& ctx, int x) -> CoroTask<std::string> {
    co_return std::to_string(x * 2);
});

Parameters:
  • func – Transformation function

  • name – Optional name for the new task

Returns:

New task that depends on this task

template<typename Func>
inline std::shared_ptr<Task> tap(Func &&func, std::string name = "")

Tap operation: inspect or log a value without transforming it. Creates a pass-through task that executes a side effect.

Usage:

auto pipeline = task1
    ->tap([](CoroScope& ctx, int x) -> CoroTask<void> {
        std::cout << "Value: " << x << "\n";
        co_return;
    }, "log")
    ->then([](CoroScope& ctx, int x) -> CoroTask<int> {
        co_return x * 2;
    });

Parameters:
  • func – Inspection function (doesn’t return a value or returns same type)

  • name – Optional name for the tap task

Returns:

This task (for method chaining)

std::shared_ptr<Task> operator&(std::shared_ptr<Task> other)

Operator& for parallel composition (AND). Creates a combiner task that depends on both input tasks; the two inputs are independent and can run in parallel.

Usage:

auto combined = task1 & task2;  // Both run in parallel
auto [result1, result2] = combined->get<std::tuple<int, std::string>>();

Parameters:

other – Second task to run in parallel

Returns:

Task that waits for both and returns tuple of results

std::shared_ptr<Task> operator^(std::shared_ptr<Task> tap_task)

Operator^ for tap/tee composition: sends output down both paths. Creates a tap task that receives this task’s output as a side effect.

Usage:

auto logger = make_task([](CoroScope& ctx, const std::any& x) -> CoroTask<void> {
    std::cout << "Value: " << std::any_cast<int>(x) << "\n";
    co_return;
});

auto result = task1 ^ logger;  // task1's output goes to logger as a side effect
// result continues with task1's output type

Parameters:

tap_task – Task to receive the output (runs as a side effect)

Returns:

This task (for method chaining), output continues from here

struct TaskHandle

Handle to a submitted void task. Non-blocking by default: call .wait() to block (it re-raises any stored exception) or .done() to check for completion without blocking.

Public Functions

inline void get()
inline void wait()
inline bool done() const

Public Members

std::shared_future<void> future
TaskIndex id = {-1}
std::string name
struct TaskInfo

Task information for progress tracking

Public Types

enum State

Values:

enumerator QUEUED
enumerator RUNNING
enumerator WAITING
enumerator COMPLETED
enumerator FAILED
enum Location

Values:

enumerator SHARED_QUEUE
enumerator LOCAL_QUEUE
enumerator EXECUTING
enumerator DONE

Public Members

TaskIndex task_id
TaskIndex parent_task_id
std::string name
std::size_t worker_id
enum dftracer::utils::TaskInfo::State state
std::chrono::steady_clock::time_point queued_at
std::chrono::steady_clock::time_point started_at
std::chrono::steady_clock::time_point completed_at
std::vector<TaskIndex> child_task_ids
std::atomic<std::size_t> completed_children = {0}
std::string error_message
enum dftracer::utils::TaskInfo::Location location
struct TaskProgress

Task progress information

Public Members

TaskIndex task_id
std::string name
std::string state
double queued_duration_ms
double execution_duration_ms
std::size_t total_subtasks
std::size_t completed_subtasks
double progress_percentage
std::string location
std::vector<TaskProgress> children
class TaskResult

Lightweight one-shot result holder for Task. Supports both blocking waits (tests, foreground code) and co_await (runtime). About 48 bytes, embedded directly in Task; no heap allocation is needed for the shared state.

Public Types

enum class State : std::uint8_t

Values:

enumerator pending
enumerator running
enumerator value
enumerator exception
enumerator cancelled

Public Functions

TaskResult() = default
~TaskResult() = default
TaskResult(const TaskResult&) = delete
TaskResult &operator=(const TaskResult&) = delete
TaskResult(TaskResult&&) = delete
TaskResult &operator=(TaskResult&&) = delete
void set_value(std::any value)
void set_exception(std::exception_ptr ex)
void set_cancelled()
void mark_running()
void add_reader()

Register that one more consumer will read the value. Called by Task::depends_on(), once per child edge.

void release_reader()

Signal that one consumer is done with the value. Called by the Scheduler after preparing a child’s input. When the last reader releases, value_ is cleared and its memory freed. Terminal tasks (no children) have reader_count_ == 0 and are never auto-released; their value persists for the user’s get().
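The reader-counting rule above can be sketched as a counter guarded by the same mutex as the value. CountedResult is a hypothetical illustration, not the real TaskResult: the last release_reader() frees the value, and a result that never had readers (a terminal task) keeps it.

```cpp
#include <any>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <utility>

// Hypothetical sketch of reader-counted result release (not the real TaskResult).
class CountedResult {
public:
    void set_value(std::any v) {
        std::lock_guard<std::mutex> lock(mtx_);
        value_ = std::move(v);
    }

    // One call per child edge, made while the DAG is built.
    void add_reader() {
        std::lock_guard<std::mutex> lock(mtx_);
        ++readers_;
    }

    // Called once a child's input has been prepared; the last reader frees the value.
    void release_reader() {
        std::lock_guard<std::mutex> lock(mtx_);
        if (readers_ > 0 && --readers_ == 0) value_.reset();
    }

    bool has_value() const {
        std::lock_guard<std::mutex> lock(mtx_);
        return value_.has_value();
    }

private:
    mutable std::mutex mtx_;
    std::any value_;
    std::size_t readers_ = 0;  // stays 0 for terminal tasks, so their value is kept
};
```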

bool wait(std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) const

Block until ready. Returns false on timeout; a timeout of 0 ms means wait forever.
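The "0 ms means wait forever" convention maps naturally onto a condition variable: an unbounded wait for 0, a bounded wait_for otherwise. OneShot is a hypothetical sketch of that behavior, not the TaskResult implementation.

```cpp
#include <any>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <utility>

// Hypothetical one-shot result with a blocking wait; a timeout of 0 ms waits forever.
class OneShot {
public:
    void set_value(std::any v) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            value_ = std::move(v);
            ready_ = true;
        }
        cv_.notify_all();  // wake every waiter once the value is published
    }

    bool wait(std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) const {
        std::unique_lock<std::mutex> lock(mtx_);
        auto is_ready = [this] { return ready_; };
        if (timeout.count() == 0) {
            cv_.wait(lock, is_ready);  // no deadline
            return true;
        }
        return cv_.wait_for(lock, timeout, is_ready);  // false on timeout
    }

private:
    mutable std::mutex mtx_;
    mutable std::condition_variable cv_;
    std::any value_;
    bool ready_ = false;
};
```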

std::any get() const

Block until ready and return a copy of the value. Throws if the task ended with an exception or was cancelled. Safe to call concurrently with release_reader() (serialized by a mutex).

std::any get_ready() const

Return value without blocking. Asserts ready state. Use only when caller KNOWS task is complete (e.g., scheduler processing a ready child whose parent is guaranteed complete). Returns a copy (value may be released after this call).

std::exception_ptr get_exception() const

Return exception without blocking. nullptr if no exception.

bool is_ready() const
bool has_exception() const
bool is_cancelled() const
State state() const
inline WhenReadyAwaitable when_ready()
struct WhenReadyAwaitable

Awaitable that suspends caller until result is ready. If already ready, resumes immediately.

Public Functions

bool await_ready() const noexcept
bool await_suspend(std::coroutine_handle<> h)
void await_resume()

Public Members

TaskResult &result
class TimerService

Public Types

using Callback = std::function<void()>
using TimerId = uint64_t

Public Functions

TimerService() = default
inline ~TimerService()
TimerService(const TimerService&) = delete
TimerService &operator=(const TimerService&) = delete
TimerService(TimerService&&) = delete
TimerService &operator=(TimerService&&) = delete
inline void start()
inline void stop()
template<typename Duration>
inline TimerId register_timeout(Duration duration, Callback callback)
inline void cancel_timeout(TimerId id)
struct TransparentStringEqual

Public Types

using is_transparent = void

Public Functions

inline bool operator()(std::string_view a, std::string_view b) const noexcept
struct TransparentStringHash

Public Types

using is_transparent = void
using is_avalanching = void

Public Functions

inline std::size_t operator()(std::string_view sv) const noexcept
inline std::size_t operator()(const std::string &s) const noexcept
inline std::size_t operator()(const char *s) const noexcept
class TreiberStack

Public Functions

inline TreiberStack() noexcept
inline void push(void *block) noexcept
inline void *pop() noexcept
template<typename I, typename O>
class TypedTask : public dftracer::utils::Task

TypedTask - A type-safe task wrapper with compile-time type checking

This class provides a strongly-typed interface for tasks with known input and output types. It’s an alternative to using lambdas with Task directly.

Example usage:

class MyTask : public TypedTask<int, std::string> {
public:
    std::string apply(CoroScope& ctx, const int& input) override {
        return "Result: " + std::to_string(input * 2);
    }
};

auto task = std::make_shared<MyTask>();

Note: For simple cases, prefer using Task with lambdas:

auto task = make_task([](int x) { return "Result: " + std::to_string(x * 2); });

Public Functions

virtual ~TypedTask() = default
template<typename I_ = I, typename O_ = O>
inline std::enable_if_t<!std::is_void_v<I_> && !std::is_void_v<O_>, O_> apply(CoroScope &ctx, const I_ &input)

Type-safe execution method - override this in your subclass

Variants based on whether input/output are void:

  • O apply(CoroScope&, const I&) - Has input and output

  • void apply(CoroScope&, const I&) - Has input, no output

  • O apply(CoroScope&) - No input, has output

  • void apply(CoroScope&) - No input or output
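The four variants are selected with std::enable_if on defaulted template parameters, so exactly one overload survives substitution for a given <I, O> pair. ApplyShape below is a hypothetical stand-in that applies the same pattern to a describe() function returning the matching signature as a string.

```cpp
#include <cassert>
#include <cstring>
#include <string>
#include <type_traits>

// Hypothetical illustration of the enable_if pattern used by TypedTask::apply:
// for any <I, O>, only one describe() overload is well-formed.
template <typename I, typename O>
struct ApplyShape {
    template <typename I_ = I, typename O_ = O>
    static std::enable_if_t<!std::is_void_v<I_> && !std::is_void_v<O_>, const char*>
    describe() { return "O apply(CoroScope&, const I&)"; }

    template <typename I_ = I, typename O_ = O>
    static std::enable_if_t<!std::is_void_v<I_> && std::is_void_v<O_>, const char*>
    describe() { return "void apply(CoroScope&, const I&)"; }

    template <typename I_ = I, typename O_ = O>
    static std::enable_if_t<std::is_void_v<I_> && !std::is_void_v<O_>, const char*>
    describe() { return "O apply(CoroScope&)"; }

    template <typename I_ = I, typename O_ = O>
    static std::enable_if_t<std::is_void_v<I_> && std::is_void_v<O_>, const char*>
    describe() { return "void apply(CoroScope&)"; }
};
```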

template<typename I_ = I, typename O_ = O>
inline std::enable_if_t<!std::is_void_v<I_> && std::is_void_v<O_>, void> apply(CoroScope &ctx, const I_ &input)
template<typename I_ = I, typename O_ = O>
inline std::enable_if_t<std::is_void_v<I_> && !std::is_void_v<O_>, O_> apply(CoroScope &ctx)
template<typename I_ = I, typename O_ = O>
inline std::enable_if_t<std::is_void_v<I_> && std::is_void_v<O_>, void> apply(CoroScope &ctx)
template<typename T>
struct TypedTaskHandle

Typed handle whose .get() returns the task’s value. .wait() re-raises stored exceptions just like .get(), but discards the value.

Public Functions

inline T get()
inline void wait()
inline bool done() const

Public Members

std::shared_future<T> future
TaskIndex id = {-1}
std::string name
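Both handle types expose a std::shared_future member, whose get() rethrows a stored exception. MiniHandle is a hypothetical sketch that assumes the real handles build on it in this way: .wait() calls get() and drops the value, and .done() checks readiness with a zero-length wait_for.

```cpp
#include <cassert>
#include <chrono>
#include <exception>
#include <future>
#include <stdexcept>
#include <string>

// Hypothetical handle mirroring TaskHandle/TypedTaskHandle semantics on top of
// std::shared_future; not the library's definition.
template <typename T>
struct MiniHandle {
    std::shared_future<T> future;
    std::string name;

    // Blocks; rethrows a stored exception, otherwise returns the value.
    T get() { return future.get(); }

    // Blocks; rethrows a stored exception and discards the value.
    void wait() { future.get(); }

    // Non-blocking completion check.
    bool done() const {
        return future.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
    }
};
```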
class Watchdog

Watchdog - Monitors pipeline execution and enforces timeouts

Responsibilities:

  • Monitor global pipeline timeout

  • Monitor per-task timeouts

  • Detect when executor becomes unresponsive

  • Log warnings for long-running tasks

  • Trigger graceful shutdown on timeout

The watchdog runs in its own thread, independent of the executor and scheduler, so it can detect hangs even when those components are stuck.
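One polling step of such a watchdog can be sketched as a scan over registered start times. MiniWatchdog is hypothetical: it uses std::int64_t in place of TaskIndex, omits the thread loop (which would call check(steady_clock::now()) every check_interval), and applies the rule documented for register_task_start that a per-task timeout of 0 falls back to the default, where a default of 0 means no timeout.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Hypothetical sketch of one watchdog polling step (not the real Watchdog).
class MiniWatchdog {
public:
    using Clock = std::chrono::steady_clock;

    explicit MiniWatchdog(std::chrono::milliseconds default_task_timeout)
        : default_timeout_(default_task_timeout) {}

    // timeout == 0 falls back to the default; a default of 0 means "no timeout".
    void register_task_start(std::int64_t id, Clock::time_point start,
                             std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) {
        active_[id] = Entry{start, timeout.count() != 0 ? timeout : default_timeout_};
    }

    void unregister_task(std::int64_t id) { active_.erase(id); }

    // Return ids of tasks whose elapsed time exceeds their effective timeout.
    std::vector<std::int64_t> check(Clock::time_point now) const {
        std::vector<std::int64_t> expired;
        for (const auto& [id, e] : active_) {
            if (e.timeout.count() != 0 && now - e.start > e.timeout) expired.push_back(id);
        }
        return expired;
    }

private:
    struct Entry {
        Clock::time_point start;
        std::chrono::milliseconds timeout;
    };
    std::chrono::milliseconds default_timeout_;
    std::unordered_map<std::int64_t, Entry> active_;
};
```

Taking `now` as a parameter keeps the check deterministic and easy to test; the monitoring thread only supplies the current time.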

Public Types

using TimeoutCallback = std::function<void(const std::string &reason)>

Callback invoked when a timeout is detected. Parameters: reason (a string describing what timed out).

using WarningCallback = std::function<void(const std::string &task_name, int64_t elapsed_ms)>

Callback for warnings about long-running tasks. Parameters: task name, elapsed time in milliseconds.

Public Functions

explicit Watchdog(std::chrono::milliseconds check_interval = std::chrono::milliseconds(100), std::chrono::milliseconds global_timeout = std::chrono::milliseconds(0), std::chrono::milliseconds default_task_timeout = std::chrono::milliseconds(0), std::chrono::milliseconds warning_threshold = std::chrono::milliseconds(10000))

Constructor

Parameters:
  • check_interval – How often to check for timeouts

  • global_timeout – Global pipeline timeout (0 = no timeout)

  • default_task_timeout – Default per-task timeout (0 = no timeout)

  • warning_threshold – Warn about tasks running longer than this

~Watchdog()
Watchdog(const Watchdog&) = delete
Watchdog &operator=(const Watchdog&) = delete
Watchdog(Watchdog&&) = delete
Watchdog &operator=(Watchdog&&) = delete
void start()

Start the watchdog thread

void stop()

Stop the watchdog thread

void mark_execution_start()

Mark the start of pipeline execution

void register_task_start(TaskIndex task_id, std::shared_ptr<Task> task, std::chrono::milliseconds timeout = std::chrono::milliseconds(0))

Register a task as actively executing

Parameters:
  • task_id – Task identifier

  • task – Shared pointer to the task

  • timeout – Task-specific timeout (0 = use default or no timeout)

void unregister_task(TaskIndex task_id)

Unregister a task (called when it completes)

Parameters:

task_id – Task identifier

void set_timeout_callback(TimeoutCallback callback)

Set callback for timeout events

void set_warning_callback(WarningCallback callback)

Set callback for warning events

void set_executor(Executor *executor)

Set executor reference for responsiveness checks

void set_global_timeout(std::chrono::milliseconds timeout)

Set global timeout

void set_default_task_timeout(std::chrono::milliseconds timeout)

Set default task timeout

inline bool is_running() const

Check if watchdog is running

inline bool is_shutdown_requested() const

Check if shutdown was requested

size_t get_active_task_count() const

Get number of active tasks being monitored

struct TaskExecution

Information about an active task execution

Public Members

std::shared_ptr<Task> task
std::chrono::steady_clock::time_point start_time
std::chrono::milliseconds timeout
bool warning_logged = {false}
class MPIUtils

MPIUtils - Singleton class for MPI utilities

Usage:

// Initialize (call once after MPI_Init)
MPIUtils::instance().initialize();

// Use throughout the application
int rank = MPIUtils::instance().get_rank();
int size = MPIUtils::instance().get_world_size();

// Cleanup (call before MPI_Finalize if needed)
MPIUtils::instance().finalize();

Public Functions

MPIUtils(const MPIUtils&) = delete
MPIUtils &operator=(const MPIUtils&) = delete
MPIUtils(MPIUtils&&) = delete
MPIUtils &operator=(MPIUtils&&) = delete
bool initialize()

Initialize MPI utilities. Must be called after MPI_Init.

Returns:

true if MPI is available and initialized

void finalize()

Finalize MPI utilities (cleans up internal state). Does NOT call MPI_Finalize; that is the caller’s responsibility.

inline bool is_initialized() const

Check if MPI is initialized and available

bool is_mpi_enabled() const

Check if MPI is enabled (compiled with MPI support)

inline int get_rank() const

Get MPI rank (0 if MPI not initialized)

inline int get_world_size() const

Get MPI world size (1 if MPI not initialized)

inline bool is_root() const

Check if this is the root rank (rank 0)

void barrier()

Barrier - synchronize all ranks

void broadcast_string(std::string &str, int root = 0)

Broadcast a string from root to all ranks

Parameters:
  • str – String to broadcast (modified on non-root ranks)

  • root – Root rank (default 0)

void broadcast_uint32_vector(std::vector<std::uint32_t> &values, int root = 0)

Broadcast a vector of uint32_t from root to all ranks

Parameters:
  • values – Vector to broadcast (modified on non-root ranks)

  • root – Root rank (default 0)

void broadcast_int(int &value, int root = 0)

Broadcast a single integer from root to all ranks

Parameters:
  • value – Integer to broadcast (modified on non-root ranks)

  • root – Root rank (default 0)

void gather_int(int send_value, std::vector<int> &recv_values, int root = 0)

Gather integers from all ranks to root

Parameters:
  • send_value – Value to send from this rank

  • recv_values – Vector to receive values (only valid on root)

  • root – Root rank (default 0)

void gatherv_uint32(const std::vector<std::uint32_t> &send_data, std::vector<std::uint32_t> &recv_data, std::vector<int> &recv_counts, std::vector<int> &displacements, int root = 0)

Gatherv - gather variable-sized data from all ranks to root

Parameters:
  • send_data – Data to send from this rank

  • recv_data – Buffer to receive data (only valid on root)

  • recv_counts – Number of elements from each rank (only valid on root)

  • displacements – Displacements for each rank (only valid on root)

  • root – Root rank (default 0)
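In the MPI_Gatherv/MPI_Allgatherv contract, displacements[r] is the offset in the receive buffer where rank r's block begins, so a tightly packed layout is the exclusive prefix sum of the per-rank counts. The helper name exclusive_prefix_sum is hypothetical, and whether gatherv_uint32 fills displacements itself is not stated above; the sketch only shows the arithmetic.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: rank r's data starts right after the data of ranks 0..r-1.
// `total` receives the required receive-buffer size in elements.
inline std::vector<int> exclusive_prefix_sum(const std::vector<int>& counts, int& total) {
    std::vector<int> displs(counts.size());
    int offset = 0;
    for (std::size_t r = 0; r < counts.size(); ++r) {
        displs[r] = offset;
        offset += counts[r];
    }
    total = offset;
    return displs;
}
```

For counts {3, 0, 2}, a rank that sends nothing still gets a displacement (3), equal to its successor's, and the root needs a 5-element receive buffer.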

void allgather_int(int send_value, std::vector<int> &recv_values)

Allgather - gather data from all ranks to all ranks

Parameters:
  • send_value – Value to send from this rank

  • recv_values – Vector to receive all values (resized to world_size)

void allgatherv_char(const std::vector<char> &send_data, std::vector<char> &recv_data, std::vector<int> &recv_sizes, std::vector<int> &displacements)

Allgatherv - gather variable-sized data from all ranks to all ranks

Parameters:
  • send_data – Data to send from this rank

  • recv_data – Buffer to receive all data

  • recv_sizes – Number of elements from each rank

  • displacements – Displacements for each rank

void reduce_sum_size_t(std::size_t send_value, std::size_t &recv_value, int root = 0)

Reduce to root - sum operation

Parameters:
  • send_value – Value to reduce from this rank

  • recv_value – Result (only valid on root)

  • root – Root rank (default 0)

void reduce_max_double(double send_value, double &recv_value, int root = 0)

Reduce to root - max operation for double

Parameters:
  • send_value – Value to reduce from this rank

  • recv_value – Result (only valid on root)

  • root – Root rank (default 0)

Public Static Functions

static MPIUtils &instance()

Get the singleton instance