Task Graph¶
Namespace: dftracer::utils::task_graph
For a usage guide and examples, see Task Graph API.
-
class TaskGraph¶
Builder for constructing task DAGs.
Features:
Fluent API for building task graphs
Optional auto-registration with Pipeline
Support for fan-in, fan-out, transform, reduce patterns
Bi-directional connectivity with external tasks via wrap()
Configurable max_concurrency to limit in-flight parallel tasks
Usage:
Pipeline pipeline(config);
auto graph = TaskGraph::builder({.name = "MyGraph", .max_concurrency = 128});
auto readers = graph.parallel<int>(8, reader_fn, {.name = "Reader"});
auto reduced = graph.reduce<int>(readers, split_every{2}, reducer, {.name = "Merge"});
pipeline.set_source(readers.tasks()[0]);
pipeline.execute();
Public Functions
Wrap an external task as entry point to the graph
Use this to connect TaskGraph operations to existing barebone tasks.
Add an external task to the graph
Use this to add utility-adapter tasks or other externally created tasks that should be tracked by the graph but weren’t created via graph methods.
-
template<typename T, typename Func>
inline TaskGroup<T> source(Func &&func, TaskGraphSourceConfig opts = {})¶
Create a single source task
-
template<typename T, typename Func>
inline TaskGroup<T> parallel(std::size_t count, Func &&func, TaskGraphParallelConfig opts = {})¶
Create N parallel tasks (no dependencies between them)
Each task receives its index (0 to count-1) as parameter.
When max_concurrency is set (via options or graph-level default), a sliding window dependency chain limits in-flight tasks: task[i] depends on task[i - max_concurrency], so at most max_concurrency tasks execute simultaneously.
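The sliding-window rule can be illustrated without the library. The following is a minimal sketch, not the dftracer implementation: `sliding_window_deps` is a hypothetical helper that only computes which earlier task index each task would wait on under the rule task[i] depends on task[i - max_concurrency].

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical helper (not part of dftracer): for each task index, return the
// index of the task it must wait for, or std::nullopt if it may start at once.
std::vector<std::optional<std::size_t>> sliding_window_deps(
    std::size_t count, std::size_t max_concurrency) {
    std::vector<std::optional<std::size_t>> deps(count);
    if (max_concurrency == 0) {
        return deps;  // 0 = unlimited: no dependency chain is added
    }
    for (std::size_t i = max_concurrency; i < count; ++i) {
        deps[i] = i - max_concurrency;
        assert(*deps[i] < i);  // a task only ever waits on an earlier task
    }
    return deps;
}
```

With count = 5 and max_concurrency = 2, tasks 0 and 1 have no dependency, task 2 waits on task 0, task 3 on task 1, and task 4 on task 2, so no more than two tasks can be running at any moment.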
-
template<typename U, typename T, typename Func>
inline TaskGroup<U> fan_out(const TaskGroup<T> &source, num_outputs count, Func &&mapper, TaskGraphFanOutConfig opts = {})¶
Fan-out: 1 -> N (one input produces N outputs)
Each output task receives the source output and its index.
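As a rough illustration of the fan-out data flow, the sketch below applies a mapper to one source value once per output index. It runs sequentially on plain values rather than tasks; `fan_out_values` is a hypothetical name, and the `(source, index)` argument order is an assumption, since the reference does not spell out the mapper signature.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for fan-out on plain values: one input, `count`
// outputs. The mapper is assumed to take (source, index).
template <typename U, typename T, typename Mapper>
std::vector<U> fan_out_values(const T& source, std::size_t count, Mapper mapper) {
    std::vector<U> out;
    out.reserve(count);
    for (std::size_t i = 0; i < count; ++i) {
        out.push_back(mapper(source, i));  // each output sees the same source
    }
    assert(out.size() == count);
    return out;
}
```

Calling `fan_out_values<int>(10, 3, ...)` with a mapper that adds the index to the source yields three values derived from the same input.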
When max_concurrency is set, a sliding window dependency chain limits in-flight fan-out tasks.
-
template<typename U, typename T, typename Combiner>
inline TaskGroup<U> fan_in(const TaskGroup<T> &group, Combiner &&combiner, TaskGraphFanInConfig opts = {})¶
Fan-in: M -> 1 (all inputs combine to single output)
-
template<typename U, typename T, typename Combiner>
inline TaskGroup<U> fan_in(const TaskGroup<T> &group, split_every count, Combiner &&combiner, TaskGraphFanInConfig opts = {})¶
Fan-in with grouping: M -> ceil(M/N) (group every N inputs)
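The M -> ceil(M/N) grouping can be made concrete with index ranges. This is a sketch under the assumption that groups are formed from consecutive inputs and that only the last group may be short; `group_ranges` is a hypothetical helper, not a library function.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper: split indices [0, m) into consecutive half-open ranges
// [begin, end) of at most n elements. Produces ceil(m / n) ranges.
std::vector<std::pair<std::size_t, std::size_t>> group_ranges(std::size_t m,
                                                              std::size_t n) {
    assert(n > 0);
    std::vector<std::pair<std::size_t, std::size_t>> ranges;
    for (std::size_t begin = 0; begin < m; begin += n) {
        ranges.emplace_back(begin, std::min(begin + n, m));
    }
    return ranges;
}
```

For 7 inputs grouped every 3, this gives the ranges [0,3), [3,6), and [6,7), that is, ceil(7/3) = 3 combiner tasks.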
-
template<typename U, typename T, typename Mapper>
inline TaskGroup<U> map(const TaskGroup<T> &group, Mapper &&mapper, TaskGraphMapConfig opts = {})¶
Map: 1-to-1 mapping
When max_concurrency is set, a sliding window dependency chain limits in-flight map tasks.
-
template<typename U, typename T, typename Reducer>
inline TaskGroup<U> reduce(const TaskGroup<T> &group, split_every count, Reducer &&reducer, TaskGraphReduceConfig opts = {})¶
Tree reduce: M -> 1 with O(log M) depth, where the logarithm base is the split_every count
-
template<typename T, typename BinaryOp>
inline TaskGroup<T> fold(const TaskGroup<T> &group, T init, split_every count, BinaryOp &&op, TaskGraphFoldConfig opts = {})¶
Fold: tree reduction with initial value and binary operation
Each node folds its inputs: op(op(op(init, a), b), c)
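The nesting op(op(op(init, a), b), c) is a left fold, the same shape that `std::accumulate` computes. The sketch below shows what a single node would compute under that reading; `fold_node` and `fold_example` are hypothetical names, and how the library seeds init at each level of the tree is not covered here.

```cpp
#include <cassert>
#include <functional>
#include <numeric>
#include <utility>
#include <vector>

// Hypothetical model of one fold node: a left fold over its inputs,
// i.e. op(op(op(init, a), b), c).
template <typename T, typename BinaryOp>
T fold_node(const std::vector<T>& inputs, T init, BinaryOp op) {
    return std::accumulate(inputs.begin(), inputs.end(), std::move(init), op);
}

// Subtraction is not associative, so it makes the evaluation order visible:
// ((10 - 1) - 2) - 3 = 4.
inline int fold_example() {
    const int result = fold_node<int>({1, 2, 3}, 10, std::minus<int>{});
    assert(result == 4);
    return result;
}
```

Because the grouping in a tree fold depends on split_every, an operation that is not associative can produce different results for different split_every values, so associative operations are the safer choice.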
-
template<typename U, typename Intermediate, typename T, typename MapFn, typename ReduceFn>
inline TaskGroup<U> aggregate(const TaskGroup<T> &group, MapFn &&map_fn, split_every count, ReduceFn &&reduce_fn, TaskGraphAggregateConfig opts = {})¶
Aggregate: map-reduce pattern
Applies mapper to each task (1:1), then reduces the results.
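The map-then-reduce shape can be sketched on plain values. This is a simplified, sequential model, not the library's implementation: `aggregate_values` is a hypothetical name, the intermediate and final types are collapsed into one type `U`, and the reducer is assumed to take a vector of up to split_every values.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical sequential model of aggregate: map every input 1:1, then
// repeatedly reduce groups of up to `split_every` values until one remains.
template <typename U, typename T, typename MapFn, typename ReduceFn>
U aggregate_values(const std::vector<T>& inputs, MapFn map_fn,
                   std::size_t split_every, ReduceFn reduce_fn) {
    assert(split_every >= 2);
    assert(!inputs.empty());

    std::vector<U> level;
    level.reserve(inputs.size());
    for (const T& x : inputs) {
        level.push_back(map_fn(x));  // map stage, one output per input
    }

    while (level.size() > 1) {  // reduce stage, one tree level per iteration
        std::vector<U> next;
        for (std::size_t i = 0; i < level.size(); i += split_every) {
            const std::size_t end = std::min(i + split_every, level.size());
            std::vector<U> group(level.begin() + static_cast<std::ptrdiff_t>(i),
                                 level.begin() + static_cast<std::ptrdiff_t>(end));
            next.push_back(reduce_fn(group));
        }
        level = std::move(next);
    }
    return level.front();
}
```

For example, mapping each of 1..7 to its square and reducing by summation with split_every 2 computes a sum of squares in three reduce levels.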
-
template<typename T>
inline TaskGroup<std::vector<T>> partition(const std::vector<T> &data, num_partitions count, TaskGraphPartitionConfig opts = {})¶
Partition: split data into N contiguous chunks
Each task produces its chunk as std::vector<T>.
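A contiguous split can be sketched as follows. `partition_contiguous` is a hypothetical helper working on plain vectors, and the handling of sizes that do not divide evenly (the first chunks receive one extra element) is an assumption; the reference does not state how the library distributes a remainder.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: split `data` into `count` contiguous chunks.
// Assumption: when data.size() is not divisible by count, the first
// (size % count) chunks get one extra element.
template <typename T>
std::vector<std::vector<T>> partition_contiguous(const std::vector<T>& data,
                                                 std::size_t count) {
    assert(count > 0);
    std::vector<std::vector<T>> chunks;
    chunks.reserve(count);
    const std::size_t base = data.size() / count;
    const std::size_t extra = data.size() % count;
    std::size_t begin = 0;
    for (std::size_t p = 0; p < count; ++p) {
        const std::size_t len = base + (p < extra ? 1 : 0);
        auto first = data.begin() + static_cast<std::ptrdiff_t>(begin);
        chunks.emplace_back(first, first + static_cast<std::ptrdiff_t>(len));
        begin += len;
    }
    return chunks;
}
```

Concatenating the chunks in order reproduces the original vector, which is the property that concat_partitions() relies on.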
-
template<typename T>
inline TaskGroup<std::vector<T>> concat_partitions(const TaskGroup<std::vector<T>> &group, split_every count = split_every{2}, TaskGraphConcatConfig opts = {})¶
Concat partitions: combine vector partitions into a single vector
Uses tree reduction to efficiently concatenate.
-
inline std::shared_ptr<Task> first_task() const¶
Get first task in the graph (convenience for setting Pipeline source)
-
inline std::shared_ptr<Task> last_task() const¶
Get last task in the graph (convenience for setting Pipeline destination)
-
inline const std::string &name() const¶
Get graph name
Public Static Functions
-
static inline TaskGraph builder(TaskGraphConfig opts = {})¶
Create a TaskGraph builder with options
-
struct TaskGraphAggregateConfig¶
Config for TaskGraph::aggregate()
Public Members
-
std::string name = "Aggregate"¶
-
struct TaskGraphConcatConfig¶
Config for TaskGraph::concat_partitions()
Public Members
-
std::string name = "Concat"¶
-
struct TaskGraphConfig¶
Config for TaskGraph builder.
max_concurrency controls how many parallel/fan-out/map tasks can be in-flight simultaneously. 0 = unlimited (all tasks start as soon as their dependencies are met). When set, individual method configs can override this default.
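The precedence between the graph-level default and a per-method setting can be written down as a small function. This is a sketch of the rule as described here (0 means unlimited, a non-zero method value overrides the graph default); `effective_max_concurrency` and `in_flight_limit` are hypothetical names, not library functions.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical resolution rule: a non-zero per-method value wins,
// otherwise the graph-level default applies. 0 means "unlimited".
constexpr std::size_t effective_max_concurrency(std::size_t graph_default,
                                                std::size_t method_override) {
    return method_override != 0 ? method_override : graph_default;
}

// Upper bound on simultaneously running tasks for a stage of `task_count` tasks.
inline std::size_t in_flight_limit(std::size_t task_count,
                                   std::size_t graph_default,
                                   std::size_t method_override) {
    const std::size_t limit =
        effective_max_concurrency(graph_default, method_override);
    const std::size_t result =
        (limit == 0) ? task_count : std::min(limit, task_count);
    assert(result <= task_count);
    return result;
}

static_assert(effective_max_concurrency(128, 0) == 128, "graph default applies");
static_assert(effective_max_concurrency(128, 4) == 4, "method value overrides");
static_assert(effective_max_concurrency(0, 0) == 0, "unlimited");
```

One consequence of this reading is that a method-level value of 0 cannot lift a non-zero graph-level limit, since 0 is treated as "not set" at the method level.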
-
struct TaskGraphFanInConfig¶
Config for TaskGraph::fan_in() (single and grouped overloads)
Public Members
-
std::string name = "FanIn"¶
-
struct TaskGraphFanOutConfig¶
Config for TaskGraph::fan_out()
max_concurrency overrides the graph-level default when non-zero.
-
struct TaskGraphFoldConfig¶
Config for TaskGraph::fold()
Public Members
-
std::string name = "Fold"¶
-
struct TaskGraphMapConfig¶
Config for TaskGraph::map()
max_concurrency overrides the graph-level default when non-zero.
-
struct TaskGraphParallelConfig¶
Config for TaskGraph::parallel()
max_concurrency overrides the graph-level default when non-zero.
-
struct TaskGraphPartitionConfig¶
Config for TaskGraph::partition()
Public Members
-
std::string name = "Partition"¶
-
struct TaskGraphReduceConfig¶
Config for TaskGraph::reduce()
Public Members
-
std::string name = "Reduce"¶
-
struct TaskGraphSourceConfig¶
Config for TaskGraph::source()
Public Members
-
std::string name = "Source"¶
-
template<typename T>
class TaskGroup¶
TaskGroup<T> - Handle to a collection of tasks that produce type T
Returned by TaskGraph methods to provide access to underlying tasks. Used for:
Chaining operations (map, reduce, fan_out, fan_in)
Accessing individual tasks for external dependencies
Getting terminal tasks for output
The type parameter T represents the output type of tasks in this group. It’s used for compile-time type checking when chaining operations.
Public Functions
-
TaskGroup() = default¶
Default constructor - creates empty group
Construct from vector of tasks
Construct from single task
-
inline std::shared_ptr<Task> task() const¶
Get single task (asserts size == 1)
Use this when you know the group contains exactly one task, e.g., after reduce() which produces a single output.
-
inline std::shared_ptr<Task> operator[](std::size_t index) const¶
Get task at index (no bounds checking)
-
inline std::size_t size() const¶
Number of tasks in group
-
inline bool empty() const¶
Check if group is empty
Add a task to the group
-
inline void reserve(std::size_t capacity)¶
Reserve capacity
-
inline auto begin()¶
-
inline auto end()¶
-
inline auto begin() const¶
-
inline auto end() const¶
-
inline auto cbegin() const¶
-
inline auto cend() const¶
-
template<typename T>
class TaskResult¶
TaskResult<T> - Container for task outputs
Provides a consistent API for task data passing:
Zero-copy reads via get()
Explicit copies via copy()
Future-proof for disk spilling without API changes
Design rationale:
Wraps data in shared_ptr for zero-copy between tasks
Consumer explicitly copies if mutation needed
size_bytes() enables smart memory management decisions
Future: can add spill_to_disk() without changing user code
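The design points above can be illustrated with a small stand-alone wrapper. This is a sketch of the general idea (shared immutable storage, copy on request), not the TaskResult source; the class name `SharedValue` is hypothetical and only mirrors the documented `make()`, `get()`, `copy()`, and `empty()` members.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical wrapper modelling the design: data lives behind a shared_ptr,
// readers get a const reference, and mutation requires an explicit copy.
template <typename T>
class SharedValue {
public:
    SharedValue() = default;  // empty result

    static SharedValue make(T value) {
        SharedValue r;
        r.data_ = std::make_shared<const T>(std::move(value));
        return r;
    }

    // Zero-copy read: every copy of the handle refers to the same object.
    const T& get() const {
        assert(data_ != nullptr);
        return *data_;
    }

    // Explicit copy: the caller owns the returned value and may mutate it.
    T copy() const {
        assert(data_ != nullptr);
        return *data_;
    }

    bool empty() const { return data_ == nullptr; }

private:
    std::shared_ptr<const T> data_;
};
```

Copying a `SharedValue` handle only bumps a reference count, so passing results between tasks stays cheap regardless of payload size, while a consumer that calls `copy()` cannot affect what other consumers read.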
Public Functions
-
TaskResult() = default¶
Default constructor - creates empty result
-
inline const T &get() const¶
Zero-copy read access
Returns const reference to underlying data. Future: may trigger load from disk if spilled.
-
inline T copy() const¶
Explicit copy for when mutation is needed
Consumer calls this when they need to modify the data. Returns owned copy that can be mutated.
Get shared ownership of data
Use when you need to extend lifetime or share with others.
-
inline bool is_ready() const¶
Check if data is ready (not empty, not spilled)
-
inline bool empty() const¶
Check if result is empty
-
inline std::size_t size_bytes() const¶
Estimate size in bytes for memory tracking
Default implementation uses sizeof(T). Specialize for types with dynamic allocation.
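To see why `sizeof(T)` alone under-reports containers, consider the free-function sketch below. It uses overloads rather than a class specialization, and `estimate_size_bytes` is a hypothetical name; the actual customization point in the library is not shown in this reference.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical default: only the object's own footprint is counted.
template <typename T>
std::size_t estimate_size_bytes(const T&) {
    return sizeof(T);
}

// Vectors own heap storage, so add the allocated element buffer.
template <typename T>
std::size_t estimate_size_bytes(const std::vector<T>& v) {
    return sizeof(v) + v.capacity() * sizeof(T);
}

// Strings likewise; this may over-count short strings stored inline.
inline std::size_t estimate_size_bytes(const std::string& s) {
    return sizeof(s) + s.capacity();
}
```

For a vector holding a million integers, the default would report only the few dozen bytes of the vector object itself, which is why types with dynamic allocation need their own estimate.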
-
inline explicit operator bool() const¶
Boolean conversion - true if not empty
Public Static Functions
-
static inline TaskResult<T> make(T &&value)¶
Create TaskResult by moving value into shared storage
-
static inline TaskResult<T> make(const T &value)¶
Create TaskResult by copying value into shared storage
Create TaskResult from existing shared_ptr
-
struct num_outputs¶
Strong type for fan-out operations: produce N outputs from one input
Used with fan_out() to specify how many output tasks to create from a single input task.
Example: num_outputs{8} creates 8 parallel tasks from one source
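The benefit of a strong type with an explicit constructor is that a bare integer cannot be passed by accident and that counts with different meanings cannot be confused. The sketch below demonstrates the pattern with hypothetical types `fanout_count` and `group_size`, which merely mirror the shape of num_outputs and split_every.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <type_traits>

// Hypothetical strong types with the same shape as the documented ones.
struct fanout_count {
    explicit constexpr fanout_count(std::size_t n) : count(n) {}
    std::size_t count;
};

struct group_size {
    explicit constexpr group_size(std::size_t n) : count(n) {}
    std::size_t count;
};

// Overloads are selected by the wrapper type, not by the integer inside.
inline std::string describe(fanout_count n) {
    return "fan-out into " + std::to_string(n.count) + " tasks";
}
inline std::string describe(group_size n) {
    return "group every " + std::to_string(n.count) + " inputs";
}

// `explicit` blocks implicit conversion, so describe(8) does not compile.
static_assert(!std::is_convertible<std::size_t, fanout_count>::value,
              "a bare integer must be wrapped explicitly");
static_assert(!std::is_convertible<fanout_count, group_size>::value,
              "different strong types do not convert into each other");
```

This is how the same integer argument can select between overloads such as the single and grouped forms of fan_in() without ambiguity at the call site.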
Public Functions
-
inline explicit constexpr num_outputs(std::size_t n)¶
Public Members
-
std::size_t count¶
-
struct num_partitions¶
Strong type for partition operations: split data into N chunks
Used with partition() to specify how many partitions to create from input data. Each partition receives a contiguous chunk.
Example: num_partitions{4} splits [1,2,3,4,5,6,7,8] into:
Partition 0: [1,2]
Partition 1: [3,4]
Partition 2: [5,6]
Partition 3: [7,8]
Public Functions
-
inline explicit constexpr num_partitions(std::size_t n)¶
Public Members
-
std::size_t count¶
-
struct split_every¶
Strong type for fan-in operations: group every N inputs together
Used with fan_in() and reduce() to specify how many inputs to combine at each step. Follows Dask’s split_every semantics.
Example: split_every{2} with 7 inputs:
Level 0: [0,1,2,3,4,5,6]
Level 1: [reduce(0,1), reduce(2,3), reduce(4,5), 6] -> 4 items
Level 2: [reduce(01,23), reduce(45,6)] -> 2 items
Level 3: [reduce(0123,456)] -> 1 item
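The level sizes in this example follow from repeatedly taking the ceiling of the previous size divided by the split. A minimal sketch, using a hypothetical helper `tree_level_sizes` that models only the item counts and not the tasks themselves:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: item count at each level of a tree reduction that
// combines up to `split` items per node. The first entry is the input count.
std::vector<std::size_t> tree_level_sizes(std::size_t inputs, std::size_t split) {
    assert(split >= 2);  // a split of 1 would never shrink the level
    std::vector<std::size_t> sizes{inputs};
    while (sizes.back() > 1) {
        sizes.push_back((sizes.back() + split - 1) / split);  // ceil division
    }
    return sizes;
}
```

For 7 inputs and a split of 2 the sizes are 7, 4, 2, 1, so the tree has 3 reduce levels. A larger split gives a shallower tree with more work per node.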
Public Functions
-
inline explicit constexpr split_every(std::size_t n)¶
Public Members
-
std::size_t count¶