Task Graph

Namespace: dftracer::utils::task_graph

For a usage guide and examples, see Task Graph API.

class TaskGraph

Builder for constructing task DAGs.

Features:

  • Fluent API for building task graphs

  • Optional auto-registration with Pipeline

  • Support for fan-in, fan-out, transform, reduce patterns

  • Bi-directional connectivity with external tasks via wrap()

  • Configurable max_concurrency to limit in-flight parallel tasks

Usage:

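Pipeline pipeline(config);
// Build a graph that caps in-flight parallel tasks at 128.
auto graph = TaskGraph::builder({.name = "MyGraph",
                                 .max_concurrency = 128});
// Eight independent reader tasks; each receives its index (0 to 7).
auto readers = graph.parallel<int>(8, reader_fn,
                                   {.name = "Reader"});
// Tree-reduce the reader outputs, combining two inputs per step.
auto reduced = graph.reduce<int>(readers, split_every{2}, reducer,
                                 {.name = "Merge"});
pipeline.set_source(readers.tasks()[0]);
pipeline.execute();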

Public Functions

template<typename T>
inline TaskGroup<T> wrap(std::shared_ptr<Task> external_task)

Wrap an external task as an entry point to the graph

Use this to connect TaskGraph operations to existing bare-bones tasks.

inline void add(std::shared_ptr<Task> task)

Add an external task to the graph

Use this to add utility-adapter tasks or other externally created tasks that should be tracked by the graph but weren’t created via graph methods.

template<typename T, typename Func>
inline TaskGroup<T> source(Func &&func, TaskGraphSourceConfig opts = {})

Create a single source task

template<typename T, typename Func>
inline TaskGroup<T> parallel(std::size_t count, Func &&func, TaskGraphParallelConfig opts = {})

Create N parallel tasks (no dependencies between them)

Each task receives its index (0 to count-1) as its parameter.

When max_concurrency is set (via options or graph-level default), a sliding window dependency chain limits in-flight tasks: task[i] depends on task[i - max_concurrency], so at most max_concurrency tasks execute simultaneously.
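The sliding-window rule can be illustrated without the library. The sketch below is a minimal stand-in, not the dftracer implementation: sliding_window_deps and sliding_window_example are hypothetical names, and the code only computes which earlier task each task would wait for under the rule "task[i] depends on task[i - max_concurrency]".

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical helper (not part of dftracer): for each of `count` tasks,
// return the index of the task it must wait for, or nullopt if it may
// start immediately. A limit of 0 means "unlimited".
std::vector<std::optional<std::size_t>> sliding_window_deps(
    std::size_t count, std::size_t max_concurrency) {
  std::vector<std::optional<std::size_t>> deps(count);
  if (max_concurrency == 0) return deps;  // no dependency chain at all
  for (std::size_t i = max_concurrency; i < count; ++i) {
    deps[i] = i - max_concurrency;  // task[i] waits for task[i - limit]
  }
  return deps;
}

// With 5 tasks and a limit of 2, tasks 0 and 1 start freely; every later
// task waits for the task two positions before it.
void sliding_window_example() {
  const auto deps = sliding_window_deps(5, 2);
  assert(!deps[0].has_value() && !deps[1].has_value());
  assert(deps[2].value() == 0 && deps[3].value() == 1 &&
         deps[4].value() == 2);
}
```

Because task[i] cannot start before task[i - limit] has finished, no more than `limit` tasks from the group can be running at any moment.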

template<typename U, typename T, typename Func>
inline TaskGroup<U> fan_out(const TaskGroup<T> &source, num_outputs count, Func &&mapper, TaskGraphFanOutConfig opts = {})

Fan-out: 1 -> N (one input produces N outputs)

Each output task receives the source output and its index.

When max_concurrency is set, a sliding window dependency chain limits in-flight fan-out tasks.

template<typename U, typename T, typename Combiner>
inline TaskGroup<U> fan_in(const TaskGroup<T> &group, Combiner &&combiner, TaskGraphFanInConfig opts = {})

Fan-in: M -> 1 (all inputs combine to single output)

template<typename U, typename T, typename Combiner>
inline TaskGroup<U> fan_in(const TaskGroup<T> &group, split_every count, Combiner &&combiner, TaskGraphFanInConfig opts = {})

Fan-in with grouping: M -> ceil(M/N) (group every N inputs)
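To make the M -> ceil(M/N) shape concrete, here is a small sketch of how input indices could be grouped. The helper group_every is hypothetical, and the assumption that groups are formed from consecutive inputs with a shorter final group is not confirmed by this reference.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper (not part of dftracer): split input indices
// 0..m-1 into consecutive groups of at most n, returned as half-open
// [begin, end) ranges. The number of groups is ceil(m / n).
std::vector<std::pair<std::size_t, std::size_t>> group_every(
    std::size_t m, std::size_t n) {
  assert(n > 0);
  std::vector<std::pair<std::size_t, std::size_t>> groups;
  for (std::size_t begin = 0; begin < m; begin += n) {
    const std::size_t end = begin + n < m ? begin + n : m;
    groups.emplace_back(begin, end);
  }
  return groups;
}

void group_every_example() {
  const auto groups = group_every(7, 3);
  assert(groups.size() == 3);  // ceil(7 / 3)
  assert(groups[2].first == 6 && groups[2].second == 7);  // short last group
}
```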

template<typename U, typename T, typename Mapper>
inline TaskGroup<U> map(const TaskGroup<T> &group, Mapper &&mapper, TaskGraphMapConfig opts = {})

Map: 1-to-1 mapping

When max_concurrency is set, a sliding window dependency chain limits in-flight map tasks.

template<typename U, typename T, typename Reducer>
inline TaskGroup<U> reduce(const TaskGroup<T> &group, split_every count, Reducer &&reducer, TaskGraphReduceConfig opts = {})

Tree reduce: M -> 1 with O(log M) depth
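The logarithmic depth follows from shrinking the input count by a factor of split_every at each level. The sketch below is an illustrative stand-in rather than the library's code: tree_reduce_sketch is a hypothetical name, it works on plain int values instead of tasks, and it assumes a binary reducer applied within each group.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Illustrative stand-in (not the dftracer implementation): combine up to
// `every` neighbouring values per level until a single value remains.
// The number of levels used is written to *depth_out when provided.
template <typename Reducer>
int tree_reduce_sketch(std::vector<int> values, std::size_t every,
                       Reducer reducer, std::size_t* depth_out = nullptr) {
  assert(!values.empty() && every >= 2);
  std::size_t depth = 0;
  while (values.size() > 1) {
    std::vector<int> next;
    for (std::size_t i = 0; i < values.size(); i += every) {
      const std::size_t end = std::min(i + every, values.size());
      int acc = values[i];  // a lone trailing input passes through as-is
      for (std::size_t j = i + 1; j < end; ++j) acc = reducer(acc, values[j]);
      next.push_back(acc);
    }
    values = std::move(next);
    ++depth;
  }
  if (depth_out != nullptr) *depth_out = depth;
  return values[0];
}

void tree_reduce_example() {
  std::size_t depth = 0;
  const int sum = tree_reduce_sketch({1, 2, 3, 4, 5, 6, 7}, 2,
                                     [](int a, int b) { return a + b; },
                                     &depth);
  assert(sum == 28);
  assert(depth == 3);  // group counts per level: 7 -> 4 -> 2 -> 1
}
```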

template<typename T, typename BinaryOp>
inline TaskGroup<T> fold(const TaskGroup<T> &group, T init, split_every count, BinaryOp &&op, TaskGraphFoldConfig opts = {})

Fold: tree reduction with initial value and binary operation

Each node folds its inputs: op(op(op(init, a), b), c)
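The nesting order op(op(op(init, a), b), c) is a left fold, which is what std::accumulate computes. The sketch below shows the order for a single node only; fold_node is a hypothetical name, and the string-building operation exists purely to make the evaluation order visible.

```cpp
#include <cassert>
#include <numeric>
#include <string>
#include <utility>
#include <vector>

// Left fold over one node's inputs: op(op(op(init, a), b), c).
// std::accumulate applies the operation in exactly this order.
std::string fold_node(const std::vector<std::string>& inputs,
                      const std::string& init) {
  return std::accumulate(
      inputs.begin(), inputs.end(), init,
      [](std::string acc, const std::string& x) {
        return "(" + std::move(acc) + "+" + x + ")";
      });
}

void fold_node_example() {
  assert(fold_node({"a", "b", "c"}, "init") == "(((init+a)+b)+c)");
}
```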

template<typename U, typename Intermediate, typename T, typename MapFn, typename ReduceFn>
inline TaskGroup<U> aggregate(const TaskGroup<T> &group, MapFn &&map_fn, split_every count, ReduceFn &&reduce_fn, TaskGraphAggregateConfig opts = {})

Aggregate: map-reduce pattern

Applies mapper to each task (1:1), then reduces the results.
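As a rough illustration of the map-then-reduce shape, the sketch below runs both stages over a plain vector. It is not the library's implementation: aggregate_sketch is a hypothetical name, the reduce function is assumed to be binary, and a flat left-to-right reduction stands in for the tree reduction.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Illustrative stand-in for the shape of aggregate(): map_fn turns each
// T into an Intermediate (1:1), reduce_fn merges two Intermediates.
template <typename Intermediate, typename T, typename MapFn,
          typename ReduceFn>
Intermediate aggregate_sketch(const std::vector<T>& inputs, MapFn map_fn,
                              ReduceFn reduce_fn) {
  assert(!inputs.empty());
  Intermediate acc = map_fn(inputs[0]);
  for (std::size_t i = 1; i < inputs.size(); ++i) {
    acc = reduce_fn(acc, map_fn(inputs[i]));
  }
  return acc;
}

// Map each line to its length, then sum the lengths.
void aggregate_example() {
  const std::vector<std::string> lines = {"alpha", "be", "gam"};
  const std::size_t total = aggregate_sketch<std::size_t>(
      lines, [](const std::string& s) { return s.size(); },
      [](std::size_t a, std::size_t b) { return a + b; });
  assert(total == 10);
}
```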

template<typename T>
inline TaskGroup<std::vector<T>> partition(const std::vector<T> &data, num_partitions count, TaskGraphPartitionConfig opts = {})

Partition: split data into N contiguous chunks

Each task produces its chunk as std::vector<T>.
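A contiguous split can be sketched as follows. The helper partition_sketch is hypothetical, and the handling of sizes that do not divide evenly (the first chunks receive one extra element) is an assumption; this reference does not say how the library distributes the remainder.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper (not part of dftracer): split `data` into `count`
// contiguous chunks. Assumption: when the size is not divisible by
// `count`, the first (size % count) chunks get one extra element.
template <typename T>
std::vector<std::vector<T>> partition_sketch(const std::vector<T>& data,
                                             std::size_t count) {
  assert(count > 0);
  std::vector<std::vector<T>> chunks;
  chunks.reserve(count);
  const std::size_t base = data.size() / count;
  const std::size_t extra = data.size() % count;
  std::size_t begin = 0;
  for (std::size_t i = 0; i < count; ++i) {
    const std::size_t len = base + (i < extra ? 1 : 0);
    chunks.emplace_back(data.begin() + begin, data.begin() + begin + len);
    begin += len;
  }
  return chunks;
}

void partition_example() {
  const std::vector<int> data = {1, 2, 3, 4, 5, 6, 7, 8};
  const auto chunks = partition_sketch(data, 4);
  assert(chunks.size() == 4);
  assert((chunks[0] == std::vector<int>{1, 2}));
  assert((chunks[3] == std::vector<int>{7, 8}));
}
```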

template<typename T>
inline TaskGroup<std::vector<T>> concat_partitions(const TaskGroup<std::vector<T>> &group, split_every count = split_every{2}, TaskGraphConcatConfig opts = {})

Concat partitions: combine vector partitions into a single vector

Uses tree reduction to efficiently concatenate.

inline std::shared_ptr<Task> first_task() const

Get first task in the graph (convenience for setting Pipeline source)

inline std::shared_ptr<Task> last_task() const

Get last task in the graph (convenience for setting Pipeline destination)

inline const std::vector<std::shared_ptr<Task>> &tasks() const

Get all tasks in the graph

inline const std::string &name() const

Get graph name

Public Static Functions

static inline TaskGraph builder(TaskGraphConfig opts = {})

Create a TaskGraph builder with options

struct TaskGraphAggregateConfig

Config for TaskGraph::aggregate()

Public Members

std::string name = "Aggregate"

struct TaskGraphAggregateConfig

Config for TaskGraph::concat_partitions()

Public Members

std::string name = "Concat"

struct TaskGraphConfig

Config for TaskGraph builder.

max_concurrency controls how many parallel/fan-out/map tasks can be in-flight simultaneously. 0 = unlimited (all tasks start as soon as their dependencies are met). When set, individual method configs can override this default.
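The override rule can be written as a one-line function. This is a sketch of the documented behaviour only: effective_max_concurrency is a hypothetical name, not a library function.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper (not part of dftracer): a non-zero per-method
// value wins; otherwise the graph-level default applies. A result of 0
// means unlimited.
constexpr std::size_t effective_max_concurrency(std::size_t graph_level,
                                                std::size_t method_level) {
  return method_level != 0 ? method_level : graph_level;
}

void effective_max_concurrency_example() {
  assert(effective_max_concurrency(128, 0) == 128);  // graph default used
  assert(effective_max_concurrency(128, 16) == 16);  // method override
  assert(effective_max_concurrency(0, 0) == 0);      // unlimited
}
```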

Public Members

std::string name
std::size_t max_concurrency = 0

struct TaskGraphFanInConfig

Config for TaskGraph::fan_in() (single and grouped overloads)

Public Members

std::string name = "FanIn"

struct TaskGraphFanOutConfig

Config for TaskGraph::fan_out()

max_concurrency overrides the graph-level default when non-zero.

Public Members

std::string name = "FanOut"
std::size_t max_concurrency = 0

struct TaskGraphFoldConfig

Config for TaskGraph::fold()

Public Members

std::string name = "Fold"

struct TaskGraphMapConfig

Config for TaskGraph::map()

max_concurrency overrides the graph-level default when non-zero.

Public Members

std::string name = "Map"
std::size_t max_concurrency = 0

struct TaskGraphParallelConfig

Config for TaskGraph::parallel()

max_concurrency overrides the graph-level default when non-zero.

Public Members

std::string name = "Parallel"
std::size_t max_concurrency = 0

struct TaskGraphPartitionConfig

Config for TaskGraph::partition()

Public Members

std::string name = "Partition"

struct TaskGraphReduceConfig

Config for TaskGraph::reduce()

Public Members

std::string name = "Reduce"

struct TaskGraphSourceConfig

Config for TaskGraph::source()

Public Members

std::string name = "Source"

template<typename T>
class TaskGroup

TaskGroup<T> - Handle to a collection of tasks that produce type T

Returned by TaskGraph methods to provide access to underlying tasks. Used for:

  • Chaining operations (map, reduce, fan_out, fan_in)

  • Accessing individual tasks for external dependencies

  • Getting terminal tasks for output

The type parameter T represents the output type of tasks in this group. It’s used for compile-time type checking when chaining operations.

Public Functions

TaskGroup() = default

Default constructor - creates empty group

inline explicit TaskGroup(std::vector<std::shared_ptr<Task>> tasks)

Construct from vector of tasks

inline explicit TaskGroup(std::shared_ptr<Task> task)

Construct from single task

inline const std::vector<std::shared_ptr<Task>> &tasks() const

Get all tasks in this group

inline std::shared_ptr<Task> task() const

Get single task (asserts size == 1)

Use this when you know the group contains exactly one task, e.g., after reduce() which produces a single output.

inline std::shared_ptr<Task> at(std::size_t index) const

Get task at index

inline std::shared_ptr<Task> operator[](std::size_t index) const

Get task at index (no bounds checking)

inline std::size_t size() const

Number of tasks in group

inline bool empty() const

Check if group is empty

inline void add(std::shared_ptr<Task> task)

Add a task to the group

inline void reserve(std::size_t capacity)

Reserve capacity

inline auto begin()
inline auto end()
inline auto begin() const
inline auto end() const
inline auto cbegin() const
inline auto cend() const

template<typename T>
class TaskResult

TaskResult<T> - Container for task outputs

Provides a consistent API for task data passing:

  • Zero-copy reads via get()

  • Explicit copies via copy()

  • Future-proof for disk spilling without API changes

Design rationale:

  • Wraps data in shared_ptr for zero-copy between tasks

  • Consumer explicitly copies if mutation needed

  • size_bytes() enables smart memory management decisions

  • Future: can add spill_to_disk() without changing user code
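The design points above can be modelled in a few lines. The class below is a minimal sketch of the pattern, not the dftracer TaskResult: ResultSketch is a hypothetical name that mirrors only make(), get(), copy(), share() and empty(), and it ignores size tracking and spilling.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Minimal model of the pattern (not the dftracer class): the value lives
// behind a shared_ptr, so handing a result to another task copies only
// the pointer, never the data.
template <typename T>
class ResultSketch {
 public:
  ResultSketch() = default;

  static ResultSketch make(T value) {
    ResultSketch r;
    r.data_ = std::make_shared<T>(std::move(value));
    return r;
  }

  const T& get() const { assert(data_); return *data_; }  // zero-copy read
  T copy() const { assert(data_); return *data_; }        // owned copy
  std::shared_ptr<T> share() const { return data_; }      // shared ownership
  bool empty() const { return data_ == nullptr; }

 private:
  std::shared_ptr<T> data_;
};

void result_sketch_example() {
  auto produced = ResultSketch<std::vector<int>>::make({1, 2, 3});
  ResultSketch<std::vector<int>> consumer = produced;  // shares storage
  assert(&consumer.get() == &produced.get());          // same object
  std::vector<int> mine = consumer.copy();
  mine.push_back(4);                                   // mutate the copy
  assert(produced.get().size() == 3);                  // original intact
}
```

Keeping get() const and forcing mutation through copy() means several consumers can read the same result concurrently without seeing each other's changes.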

Public Functions

TaskResult() = default

Default constructor - creates empty result

inline const T &get() const

Zero-copy read access

Returns const reference to underlying data. Future: may trigger load from disk if spilled.

inline T copy() const

Explicit copy for when mutation is needed

Consumer calls this when they need to modify the data. Returns owned copy that can be mutated.

inline std::shared_ptr<T> share() const

Get shared ownership of data

Use when you need to extend lifetime or share with others.

inline bool is_ready() const

Check if data is ready (not empty, not spilled)

inline bool empty() const

Check if result is empty

inline std::size_t size_bytes() const

Estimate size in bytes for memory tracking

Default implementation uses sizeof(T). Specialize for types with dynamic allocation.
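To show why sizeof(T) undercounts heap-backed types, the sketch below uses a free-function estimator with an overload for std::vector. The name estimate_bytes is hypothetical, and this reference does not specify the mechanism TaskResult itself expects for specialization.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical estimator (not part of dftracer): sizeof(T) by default.
template <typename T>
std::size_t estimate_bytes(const T&) {
  return sizeof(T);
}

// More specialized overload: add the heap buffer owned by the vector.
template <typename T>
std::size_t estimate_bytes(const std::vector<T>& v) {
  return sizeof(v) + v.capacity() * sizeof(T);
}

void estimate_bytes_example() {
  assert(estimate_bytes(42) == sizeof(int));
  const std::vector<int> v(1000);
  // sizeof(v) alone ignores the 1000 ints stored on the heap.
  assert(estimate_bytes(v) >= sizeof(v) + 1000 * sizeof(int));
}
```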

inline explicit operator bool() const

Boolean conversion - true if not empty

Public Static Functions

static inline TaskResult<T> make(T &&value)

Create TaskResult by moving value into shared storage

static inline TaskResult<T> make(const T &value)

Create TaskResult by copying value into shared storage

static inline TaskResult<T> from_shared(std::shared_ptr<T> ptr)

Create TaskResult from existing shared_ptr

struct num_outputs

Strong type for fan-out operations: produce N outputs from one input

Used with fan_out() to specify how many output tasks to create from a single input task.

Example: num_outputs{8} creates 8 parallel tasks from one source

Public Functions

inline explicit constexpr num_outputs(std::size_t n)

Public Members

std::size_t count

struct num_partitions

Strong type for partition operations: split data into N chunks

Used with partition() to specify how many partitions to create from input data. Each partition receives a contiguous chunk.

Example: num_partitions{4} splits [1,2,3,4,5,6,7,8] into:

Partition 0: [1,2]
Partition 1: [3,4]
Partition 2: [5,6]
Partition 3: [7,8]

Public Functions

inline explicit constexpr num_partitions(std::size_t n)

Public Members

std::size_t count

struct split_every

Strong type for fan-in operations: group every N inputs together

Used with fan_in() and reduce() to specify how many inputs to combine at each step. Follows Dask’s split_every semantics.

Example: split_every{2} with 7 inputs:

Level 0: [0,1,2,3,4,5,6]
Level 1: [reduce(0,1), reduce(2,3), reduce(4,5), 6] -> 4 items
Level 2: [reduce(01,23), reduce(45,6)] -> 2 items
Level 3: [reduce(0123,456)] -> 1 item

Public Functions

inline explicit constexpr split_every(std::size_t n)

Public Members

std::size_t count