Task Graph API¶
See also
For complete class and member documentation, see the API Reference.
DAG-based task graph builder for constructing parallel computation graphs. All classes are in the dftracer::utils::task_graph namespace.
graph LR
subgraph parallel["parallel(N)"]
T1["Task 0"]
T2["Task 1"]
Tn["Task N-1"]
end
subgraph reduce["reduce()"]
R1["Reduce 0-1"]
R2["Reduce 2-3"]
RF["Final"]
end
T1 --> R1
T2 --> R1
Tn --> R2
R1 --> RF
R2 --> RF
subgraph fanout["fan_out(1 to N)"]
Src["Source"] --> S1["Shard 0"]
Src --> S2["Shard 1"]
Src --> Sn["Shard N-1"]
end
subgraph fanin["fan_in(N to 1)"]
I1["Input 0"] --> Merge["Merge"]
I2["Input 1"] --> Merge
In["Input N-1"] --> Merge
end
Usage Examples¶
Creating a TaskGraph¶
Build a graph and execute it with Pipeline:
#include <dftracer/utils/core/pipeline/pipeline.h>
#include <dftracer/utils/core/task_graph/task_graph.h>
// Create graph builder (unqualified names assume dftracer::utils::task_graph is in scope)
auto graph = TaskGraph::builder({
.name = "ProcessingGraph",
.max_concurrency = 128
});
// Add operations (see examples below); OutputType, file_count, and process_item are caller-defined placeholders
auto results = graph.parallel<OutputType>(
file_count,
[](CoroScope& scope, std::size_t idx) -> coro::CoroTask<OutputType> {
co_return process_item(idx);
},
{.name = "ProcessItem"}
);
// Execute with Pipeline (pipeline_config is a PipelineConfig, as in the complete example)
Pipeline pipeline(pipeline_config);
pipeline.set_source(graph.first_task());
pipeline.execute();
parallel() - Process N Independent Tasks¶
Spawn N parallel tasks that each produce a result. Each task receives its index (0 to count-1):
auto graph = TaskGraph::builder({.name = "Parallel"});
// Process 8 files in parallel
auto file_results = graph.parallel<FileMetadata>(
8,
[](CoroScope& scope, std::size_t idx) -> coro::CoroTask<FileMetadata> {
std::string filename = "file_" + std::to_string(idx) + ".pfw.gz";
// Load file, parse header, return metadata
auto metadata = load_file_metadata(filename);
co_return metadata;
},
{.name = "LoadMetadata"}
);
With max_concurrency set, a sliding window limits in-flight tasks:
// Allow only 4 tasks in-flight at once
auto results = graph.parallel<int>(
100,
[](CoroScope& scope, std::size_t idx) -> coro::CoroTask<int> {
co_return expensive_computation(idx);
},
{.name = "Compute", .max_concurrency = 4}
);
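To make the sliding-window behavior concrete, here is a minimal standalone simulation. It is not the library's scheduler; `sliding_window_makespan` is a hypothetical helper that assumes each task's duration is known up front and that a new task starts as soon as any in-flight task finishes. A window of 0 is treated as unlimited, following the convention used by `max_concurrency` on this page.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <vector>

// Hypothetical helper (not part of dftracer): simulates a sliding window
// over tasks with known, non-negative durations and returns the time at
// which the last task finishes. window == 0 means "unlimited".
inline long sliding_window_makespan(const std::vector<long>& durations,
                                    std::size_t window) {
    if (window == 0) window = durations.size();
    // Min-heap of finish times for the tasks currently in flight.
    std::priority_queue<long, std::vector<long>, std::greater<long>> in_flight;
    long now = 0;
    long last_finish = 0;
    for (long d : durations) {
        if (in_flight.size() == window) {
            // Window is full: wait for the earliest task to finish and
            // start the next task in the slot it frees.
            now = in_flight.top();
            in_flight.pop();
        }
        const long finish = now + d;
        in_flight.push(finish);
        last_finish = std::max(last_finish, finish);
    }
    return last_finish;
}
```

With durations `{3, 1, 1, 1}` and a window of 2, the three short tasks run back to back beside the long one, so everything finishes at time 3. Running the same tasks in fixed batches of two would take 4.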
reduce() - Combine N Results Into One¶
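Reduce parallel results using a tree reduction. With split_every{k}, each reducer combines up to k results, so N inputs need about ceil(log_k N) levels (O(log N) depth). This example combines results in pairs: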
auto graph = TaskGraph::builder({.name = "Reduce"});
// Create 8 parallel tasks
auto file_results = graph.parallel<Metadata>(/*...*/);
// Reduce: combine pairs, then combine those results
auto combined = graph.reduce<CombinedMetadata>(
file_results,
split_every{2}, // Combine 2 results at each level
[](CoroScope& scope, std::vector<Metadata> batch)
-> coro::CoroTask<CombinedMetadata> {
// Merge batch into single result
CombinedMetadata merged;
for (const auto& meta : batch) {
merged.merge(meta);
}
co_return merged;
},
{.name = "MergeMetadata"}
);
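The shape of the reduction tree can be worked out by hand. The sketch below is a standalone illustration, not library code: `reduction_levels` is a hypothetical helper that assumes each level groups its inputs into ceil(n / k) batches, matching the "M -> ceil(M/2)" note in the grouped fan_in example on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper (not part of dftracer): returns the number of
// reducer tasks at each level of a tree reduction over n inputs when
// every reducer consumes up to k inputs.
inline std::vector<std::size_t> reduction_levels(std::size_t n, std::size_t k) {
    assert(k >= 2 && "a group size below 2 would never shrink the input");
    std::vector<std::size_t> levels;
    while (n > 1) {
        n = (n + k - 1) / k;  // ceil(n / k) groups at this level
        levels.push_back(n);
    }
    return levels;
}
```

For 8 inputs and `split_every{2}` this gives levels of 4, 2, and 1 reducers, which is the three-level tree the O(log N) depth refers to.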
fan_out() - Split One Input Into N Outputs¶
Fan-out distributes a single input to N downstream tasks by index:
auto graph = TaskGraph::builder({.name = "FanOut"});
// Source produces one large dataset
auto source = graph.source<Dataset>(
[](CoroScope& scope) -> coro::CoroTask<Dataset> {
co_return load_full_dataset();
},
{.name = "LoadDataset"}
);
// Fan-out: split into 4 shards
auto shards = graph.fan_out<DatasetShard>(
source,
num_outputs{4},
[](CoroScope& scope, const Dataset& input, std::size_t shard_idx)
-> coro::CoroTask<DatasetShard> {
// Extract shard_idx-th quarter of dataset
auto shard = input.partition(shard_idx, 4);
co_return shard;
},
{.name = "Shard"}
);
fan_in() - Merge N Inputs Into One¶
Combine multiple task outputs into a single result:
// Fan-in: combine all shards back together
auto merged = graph.fan_in<Dataset>(
shards,
[](CoroScope& scope, std::vector<DatasetShard> inputs)
-> coro::CoroTask<Dataset> {
// Combine all shards back to full dataset
Dataset full;
for (const auto& shard : inputs) {
full.append(shard);
}
co_return full;
},
{.name = "Merge"}
);
Fan-in with grouping (combine every N inputs):
// Combine every 2 shards (reduce M -> ceil(M/2) outputs)
auto grouped = graph.fan_in<DatasetShard>(
shards,
split_every{2},
[](CoroScope& scope, std::vector<DatasetShard> inputs)
-> coro::CoroTask<DatasetShard> {
DatasetShard result = inputs[0];
for (std::size_t i = 1; i < inputs.size(); ++i) {
result.merge(inputs[i]);
}
co_return result;
},
{.name = "GroupedMerge"}
);
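As a rough picture of what grouping does to the inputs, the following standalone sketch batches a vector into consecutive groups of at most k elements. `group_every` is a hypothetical name, and the assumption that groups are consecutive and that the last group may be smaller is inferred from the ceil(M/2) comment above rather than from the library source.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper (not part of dftracer): splits inputs into
// consecutive groups of at most k elements; the last group may be short.
template <typename T>
std::vector<std::vector<T>> group_every(const std::vector<T>& inputs,
                                        std::size_t k) {
    assert(k >= 1);
    std::vector<std::vector<T>> groups;
    for (std::size_t start = 0; start < inputs.size(); start += k) {
        const std::size_t end = std::min(start + k, inputs.size());
        groups.emplace_back(
            inputs.begin() + static_cast<std::ptrdiff_t>(start),
            inputs.begin() + static_cast<std::ptrdiff_t>(end));
    }
    return groups;
}
```

Five inputs grouped by two yield three groups of sizes 2, 2, and 1, so a merge callback should be written to handle a batch with a single element.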
map() - Transform Each Item 1:1¶
Transform each task’s output (streaming 1:1 mapping):
auto graph = TaskGraph::builder({.name = "Map"});
// Create parallel tasks
auto file_results = graph.parallel<RawData>(/*...*/);
// Map: transform each result
auto normalized = graph.map<NormalizedData>(
file_results,
[](CoroScope& scope, const RawData& raw) -> coro::CoroTask<NormalizedData> {
// Process raw data
auto norm = normalize(raw);
co_return norm;
},
{.name = "Normalize", .max_concurrency = 8}
);
fold() - Accumulate With Initial Value¶
Reduce with an initial accumulator value using a binary operation:
auto graph = TaskGraph::builder({.name = "Fold"});
// Create parallel task outputs
auto counts = graph.parallel<int>(/*...*/);
// Fold: sum all counts with initial value 0
auto total = graph.fold<int>(
counts,
0, // Initial accumulator
split_every{2},
[](int acc, int next) -> int { return acc + next; },
{.name = "Sum"}
);
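Because the fold is grouped with split_every, the order in which the binary operation is applied differs from a plain left-to-right loop. The standalone sketch below shows one way such a grouped fold could be evaluated. `tree_fold` is a hypothetical helper, and the choice to apply the initial value once at the root is an assumption of this sketch; this page does not say how the library applies it. Using an associative operation with an identity element as the initial value (such as `+` with 0) gives the same answer either way.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper (not part of dftracer): folds values in groups of
// up to k per level, then combines the single remaining value with init.
template <typename T, typename Op>
T tree_fold(std::vector<T> values, T init, std::size_t k, Op op) {
    assert(k >= 2);
    while (values.size() > 1) {
        std::vector<T> next;
        for (std::size_t start = 0; start < values.size(); start += k) {
            const std::size_t end = std::min(start + k, values.size());
            T partial = values[start];
            for (std::size_t i = start + 1; i < end; ++i) {
                partial = op(partial, values[i]);
            }
            next.push_back(partial);
        }
        values = std::move(next);
    }
    // Assumption of this sketch: init is applied exactly once, at the root.
    return values.empty() ? init : op(init, values.front());
}
```

Summing `{1, 2, 3, 4, 5}` in pairs produces partial sums 3, 7, 5, then 10, 5, then 15, the same total a sequential sum would give.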
aggregate() - Map-Reduce Pattern¶
Apply a mapper to each item, then reduce the results:
auto graph = TaskGraph::builder({.name = "Aggregate"});
// Create parallel tasks
auto events = graph.parallel<Event>(/*...*/);
// Aggregate: map events to statistics, then combine
auto stats = graph.aggregate<Statistics, EventStats>(
events,
// Mapper: Event -> EventStats
[](CoroScope& scope, const Event& evt) -> coro::CoroTask<EventStats> {
auto stat = compute_event_stats(evt);
co_return stat;
},
split_every{4},
// Reducer: combine EventStats
[](CoroScope& scope, std::vector<EventStats> batch)
-> coro::CoroTask<Statistics> {
Statistics merged;
for (const auto& stat : batch) {
merged.combine(stat);
}
co_return merged;
},
{.name = "AggregateEvents"}
);
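The map-then-reduce flow can be sketched sequentially in plain C++. This is an illustration of the pattern only, not the library's implementation: `map_then_reduce` is a hypothetical helper, it runs everything on one thread, and it simplifies the types by having the reducer return the same type the mapper produces (the example above uses distinct `EventStats` and `Statistics` types).

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical helper (not part of dftracer): applies mapper to every
// item, then reduces the mapped values in batches of up to k until a
// single value remains.
template <typename In, typename Mapper, typename Reducer>
auto map_then_reduce(const std::vector<In>& items, Mapper mapper,
                     std::size_t k, Reducer reducer) {
    assert(k >= 2 && !items.empty());
    using Mapped = std::decay_t<decltype(mapper(items.front()))>;
    std::vector<Mapped> level;
    level.reserve(items.size());
    for (const auto& item : items) {
        level.push_back(mapper(item));  // map phase: one output per input
    }
    while (level.size() > 1) {  // reduce phase: one tree level per pass
        std::vector<Mapped> next;
        for (std::size_t start = 0; start < level.size(); start += k) {
            const std::size_t end = std::min(start + k, level.size());
            std::vector<Mapped> batch(
                level.begin() + static_cast<std::ptrdiff_t>(start),
                level.begin() + static_cast<std::ptrdiff_t>(end));
            next.push_back(reducer(batch));
        }
        level = std::move(next);
    }
    return level.front();
}
```

For example, mapping five words to their lengths and summing in batches of four reduces `[1, 2, 3, 4, 5]` to `[10, 5]` and then to 15.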
partition() - Split Data Into Groups¶
Partition static data into N contiguous chunks (no user-written coroutine is required):
auto graph = TaskGraph::builder({.name = "Partition"});
std::vector<int> data = {0, 1, 2, 3, 4, 5, 6, 7};
// Split into 4 partitions
auto partitions = graph.partition<int>(
data,
num_partitions{4},
{.name = "Split"}
);
// partitions contains 4 tasks, each producing std::vector<int>
// Task 0: [0, 1], Task 1: [2, 3], Task 2: [4, 5], Task 3: [6, 7]
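The chunk layout in the comment above can be reproduced with a few lines of standard C++. `partition_contiguous` below is a hypothetical standalone helper, not the library's code. How the library distributes a remainder when the size is not divisible by N is not stated on this page; this sketch assumes the first chunks each take one extra element.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper (not part of dftracer): splits data into n
// contiguous chunks whose sizes differ by at most one element.
template <typename T>
std::vector<std::vector<T>> partition_contiguous(const std::vector<T>& data,
                                                 std::size_t n) {
    assert(n >= 1);
    std::vector<std::vector<T>> parts;
    parts.reserve(n);
    const std::size_t base = data.size() / n;
    const std::size_t extra = data.size() % n;  // assumption: first chunks get +1
    std::size_t start = 0;
    for (std::size_t i = 0; i < n; ++i) {
        const std::size_t len = base + (i < extra ? 1 : 0);
        parts.emplace_back(
            data.begin() + static_cast<std::ptrdiff_t>(start),
            data.begin() + static_cast<std::ptrdiff_t>(start + len));
        start += len;
    }
    return parts;
}
```

Eight elements in four partitions give `[0, 1]`, `[2, 3]`, `[4, 5]`, `[6, 7]`, as in the example. Ten elements in four partitions give sizes 3, 3, 2, 2 under the remainder assumption.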
concat_partitions() - Merge Partitions Back Together¶
Recombine partitioned vectors into a single vector:
auto graph = TaskGraph::builder({.name = "Concat"});
// Create partitions (see above)
auto partitions = graph.partition<int>(data, num_partitions{4});
// Concatenate back into single vector
auto full = graph.concat_partitions<int>(
partitions,
split_every{2},
{.name = "Join"}
);
// full.task() produces std::vector<int> with all elements
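For intuition about why order is preserved when partitions are joined in groups, here is a small standalone sketch. `concat_in_groups` is a hypothetical helper, and it assumes that adjacent partitions are joined left to right at every level; the library's actual merge strategy is not described on this page.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper (not part of dftracer): repeatedly joins adjacent
// groups of up to k partitions until one vector remains.
template <typename T>
std::vector<T> concat_in_groups(std::vector<std::vector<T>> parts,
                                std::size_t k) {
    assert(k >= 2);
    while (parts.size() > 1) {
        std::vector<std::vector<T>> next;
        for (std::size_t start = 0; start < parts.size(); start += k) {
            const std::size_t end = std::min(start + k, parts.size());
            std::vector<T> joined;
            for (std::size_t i = start; i < end; ++i) {
                joined.insert(joined.end(), parts[i].begin(), parts[i].end());
            }
            next.push_back(std::move(joined));
        }
        parts = std::move(next);
    }
    return parts.empty() ? std::vector<T>{} : parts.front();
}
```

Because each level only joins neighbors, the four partitions from the earlier example come back as `0, 1, ..., 7` in their original order.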
Complete Real-World Example: Split & Reduce¶
This example shows a 3-phase pipeline similar to dftracer_split:
#include <dftracer/utils/core/pipeline/pipeline.h>
#include <dftracer/utils/core/task_graph/task_graph.h>
#include <iostream>
#include <string>
#include <vector>
struct FileMetadata { /* file info */ };
struct ChunkManifest { /* chunk boundaries */ };
struct ExtractResult { /* chunk data */ };
int main() {
// Configure pipeline
PipelineConfig config;
config.executor_threads = 8;
// Build task graph
auto graph = TaskGraph::builder({
.name = "SplitPipeline",
.max_concurrency = 0 // Unlimited
});
// Phase 1: Load metadata for all files in parallel
auto file_metadata = graph.parallel<FileMetadata>(
10, // 10 files
[](CoroScope& scope, std::size_t idx) -> coro::CoroTask<FileMetadata> {
std::string filename = "trace_" + std::to_string(idx) + ".pfw.gz";
auto meta = load_file_header(filename);
co_return meta;
},
{.name = "LoadMetadata"}
);
// Phase 2: Reduce metadata to build global manifests
auto manifests = graph.reduce<std::vector<ChunkManifest>>(
file_metadata,
split_every{2},
[](CoroScope& scope, std::vector<FileMetadata> batch)
-> coro::CoroTask<std::vector<ChunkManifest>> {
std::vector<ChunkManifest> result;
for (const auto& meta : batch) {
auto chunks = meta.create_chunks(1 << 20); // 1 MiB chunks
result.insert(result.end(), chunks.begin(), chunks.end());
}
co_return result;
},
{.name = "CreateManifests"}
);
// Phase 3: Extract chunks (a single task that walks the reduced manifest list)
auto task_extract = make_task(
[](CoroScope& scope, std::vector<ChunkManifest> manifests)
-> coro::CoroTask<std::vector<ExtractResult>> {
std::vector<ExtractResult> results;
for (const auto& manifest : manifests) {
auto chunk = extract_chunk(manifest);
results.push_back(chunk);
}
co_return results;
},
"ExtractChunks"
);
task_extract->depends_on(manifests.task());
graph.add(task_extract);
// Execute graph
Pipeline pipeline(config);
pipeline.set_source(graph.first_task());
pipeline.execute();
std::cout << "Split complete!" << std::endl;
return 0;
}
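The example leaves `create_chunks` undefined. As a rough idea of what computing chunk boundaries might involve, the sketch below cuts a byte count into fixed-size ranges. Everything here is hypothetical: `ByteRange` and `make_chunk_ranges` are illustrative names, and a real splitter for trace files would likely also need to respect record boundaries, which this sketch ignores.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical type (not part of dftracer): one chunk's byte boundaries.
struct ByteRange {
    std::uint64_t offset;
    std::uint64_t length;
};

// Hypothetical helper: cuts [0, total_bytes) into ranges of at most
// chunk_bytes each; the final range holds whatever is left over.
inline std::vector<ByteRange> make_chunk_ranges(std::uint64_t total_bytes,
                                                std::uint64_t chunk_bytes) {
    assert(chunk_bytes > 0);
    std::vector<ByteRange> ranges;
    for (std::uint64_t off = 0; off < total_bytes; off += chunk_bytes) {
        ranges.push_back({off, std::min(chunk_bytes, total_bytes - off)});
    }
    return ranges;
}
```

A 2.5 MiB input with a chunk size of `1 << 20` yields two full 1 MiB ranges followed by a 512 KiB remainder.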
Configuration¶
TaskGraphConfig¶
Global configuration for task graph execution.
Controls graph-level settings including:
name: Identifier for the task graph
max_concurrency: Maximum number of parallel tasks in flight (0 = unlimited)
Every operation has its own config struct. The structs for parallel(), fan_out(), and map()
also carry a max_concurrency field that overrides the graph-level value when set to a non-zero value.
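The override rule amounts to a one-line selection. The function below is a hypothetical standalone restatement of the rule as described on this page, not code taken from the library.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper (not part of dftracer): picks the concurrency
// limit for one operation. A non-zero per-operation value wins;
// otherwise the graph-level value applies, where 0 means unlimited.
inline std::size_t effective_max_concurrency(std::size_t graph_max,
                                             std::size_t op_max) {
    return op_max != 0 ? op_max : graph_max;
}
```

With a graph-level limit of 128, an operation configured with `.max_concurrency = 4` runs at most 4 tasks at once, while one that leaves the field at 0 inherits 128.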
TaskGraphSourceConfig¶
Configuration for source() operations.
name: Identifier for the source task (default: "Source")
TaskGraphParallelConfig¶
Configuration for parallel() task execution.
name: Identifier for the parallel group (default: "Parallel")
max_concurrency: Override graph-level concurrency for this group (0 = use graph default)
TaskGraphFanOutConfig¶
Configuration for fan_out() operations.
name: Identifier for the fan-out group (default: "FanOut")
max_concurrency: Override graph-level concurrency for this group (0 = use graph default)
TaskGraphFanInConfig¶
Configuration for fan_in() operations.
name: Identifier for the fan-in task (default: "FanIn")
TaskGraphMapConfig¶
Configuration for map() operations.
name: Identifier for the map group (default: "Map")
max_concurrency: Override graph-level concurrency for this group (0 = use graph default)
TaskGraphReduceConfig¶
Configuration for reduce() operations.
name: Identifier for the reduce task (default: "Reduce")
TaskGraphFoldConfig¶
Configuration for fold() operations.
name: Identifier for the fold task (default: "Fold")
TaskGraphAggregateConfig¶
Configuration for aggregate() operations.
name: Identifier for the aggregate task (default: "Aggregate")
TaskGraphPartitionConfig¶
Configuration for partition() operations.
name: Identifier for the partition task (default: "Partition")
TaskGraphConcatConfig¶
Configuration for concat_partitions() operations.
name: Identifier for the concat task (default: "Concat")
Tip
All config structs share a common name field for identifying the operation in
logs and diagnostics. Operations that support parallelism (parallel, fan_out,
map) additionally expose max_concurrency to override the graph-level setting.
For complete struct definitions including default values, see the API Reference.