Utilities

Namespace: dftracer::utils::utilities

template<typename I, typename Batch, typename ...Tags>
class StreamingUtility : public dftracer::utils::utilities::UtilityBase<I, Tags...>

Streaming utility: process() yields batches via AsyncGenerator<Batch>.

Use when a single input produces an unbounded or large sequence of output batches that should be consumed lazily rather than materialized all at once.

Template Parameters:
  • I – Input type

  • Batch – Element type yielded per iteration

  • Tags – Variadic tag types for opt-in features

Public Types

using BatchType = Batch

Public Functions

inline StreamingUtility()
template<typename Dummy = void, typename = std::enable_if_t<(sizeof...(Tags) > 0) && std::is_void_v<Dummy>>>
inline explicit StreamingUtility(Tags... tags)
virtual coro::AsyncGenerator<Batch> process(const I &input) = 0
inline void bind_context(CoroScope &ctx)

Bind a context for streaming utilities that carry the NeedsContext tag. Unlike Utility::process, which is wrapped by CoroScope::spawn, streaming utilities need explicit context binding because their AsyncGenerator cannot be spawned directly.

inline void unbind_context()

Unbind the context after streaming completes (the counterpart to bind_context()).

Public Static Functions

static inline constexpr std::string_view get_type_signature()
static inline constexpr std::string_view get_name()
template<typename I, typename O, typename ...Tags>
class Utility : public dftracer::utils::utilities::UtilityBase<I, Tags...>

Materialized utility: process() returns CoroTask<O>.

Template Parameters:
  • I – Input type

  • O – Output type

  • Tags – Variadic tag types for opt-in features

Public Types

using Output = O

Public Functions

inline Utility()
template<typename Dummy = void, typename = std::enable_if_t<(sizeof...(Tags) > 0) && std::is_void_v<Dummy>>>
inline explicit Utility(Tags... tags)
virtual coro::CoroTask<O> process(const I &input) = 0
inline coro::CoroTask<O> process(I &&input)

Public Static Functions

static inline constexpr std::string_view get_type_signature()
static inline constexpr std::string_view get_name()

Friends

friend class behaviors::UtilityExecutor< I, O, Tags... >
template<typename I, typename O, typename ...Tags>
class UtilityAdapter

Adapter that wraps Utility as a Task with behavior support.

This adapter provides a fluent interface for converting utilities into tasks with optional behaviors. The primary operation is as_task(), which returns a std::shared_ptr<Task> that can be used with the standard Task API.

Design Philosophy:

  • Utilities are just specialized functions that can be wrapped as tasks

  • Behaviors are opt-in: they are created automatically from the utility's tags, and additional ones are added explicitly via with_behavior()

  • The Task API handles dependencies, scheduling, and execution

  • No coupling to Pipeline - work directly with Scheduler

Usage:

// Basic: Convert utility to task
auto task = use(utility).as_task();
scheduler.schedule(task, input);

// With dependency (using Task API)
auto task = use(utility).as_task();
task->depends_on(parent_task);
scheduler.schedule(root_task, initial_input);

// With explicit behavior
auto task = use(utility)
    .with_behavior(std::make_shared<TimingBehavior<I, O>>())
    .as_task();

// Implicit conversion to Task
std::shared_ptr<Task> task = use(utility);  // Calls as_task() implicitly

// Dynamic submission from within a task
auto outer_task = make_task([&](CoroScope& ctx) {
    auto inner = use(utility).as_task();
    auto future = ctx.submit_task(inner, data);
    return future.get();
});

Public Functions

inline explicit UtilityAdapter(std::shared_ptr<Utility<I, O, Tags...>> utility)

Construct adapter from utility.

Automatically creates behaviors from the utility’s tags using the default BehaviorFactory.

Parameters:

utility – Shared pointer to the utility to adapt

inline UtilityAdapter &with_behavior(std::shared_ptr<behaviors::UtilityBehavior<I, O>> behavior)

Add a custom behavior to the chain.

Manually adds a behavior beyond those automatically created from tags. Useful for custom behaviors or when not using tags.

Usage:

use(utility)
    .with_behavior(std::make_shared<MyCustomBehavior<I, O>>())
    .as_task();

Parameters:

behavior – Shared pointer to behavior to add

Returns:

Reference to this adapter for chaining

inline std::shared_ptr<Task> as_task()
inline operator std::shared_ptr<Task>()

Implicit conversion to Task for convenience.

Allows using UtilityAdapter directly where a std::shared_ptr<Task> is expected:

std::shared_ptr<Task> task = use(utility);  // Implicit conversion

Public Static Functions

static inline constexpr bool needs_context()

Check at compile time whether the utility needs a CoroScope.

template<typename I, typename ...Tags>
class UtilityBase

Shared machinery for all utility variants.

Holds the tags and the context pointer. The type signature is generated at compile time and stored as a static constexpr string_view.

Template Parameters:
  • I – Input type

  • Tags – Variadic tag types for opt-in features

Subclassed by dftracer::utils::utilities::StreamingUtility< ViewReaderInput, ViewReaderBatch, tags::Parallelizable >, dftracer::utils::utilities::StreamingUtility< AggregatorInput, AggregationBatch, tags::NeedsContext >, dftracer::utils::utilities::Utility< ChunkIndexerInput, ChunkIndexerOutput, utilities::tags::Parallelizable >, dftracer::utils::utilities::Utility< FileMergerUtilityInput, FileMergerUtilityOutput >, dftracer::utils::utilities::Utility< std::string, Hash >, dftracer::utils::utilities::Utility< FileChunkMapperInput, FileChunkMapperOutput >, dftracer::utils::utilities::Utility< AggregatorSummaryInput, AggregatorSummaryOutput >, dftracer::utils::utilities::Utility< Lines, Lines, utilities::tags::Parallelizable >, dftracer::utils::utilities::Utility< ChunkPrunerInput, ChunkPrunerOutput, utilities::tags::Parallelizable >, dftracer::utils::utilities::Utility< FileCompressionUtilityInput, FileCompressionUtilityOutput, utilities::tags::Parallelizable >, dftracer::utils::utilities::Utility< DirectoryScannerUtilityInput, std::vector< FileEntry >, utilities::tags::Parallelizable, utilities::tags::NeedsContext >, dftracer::utils::utilities::Utility< StatisticsQueryInput, StatisticsQueryOutput, utilities::tags::Parallelizable >, dftracer::utils::utilities::Utility< ChunkDetailScanInput, ChunkDetailScanOutput, utilities::tags::Parallelizable >, dftracer::utils::utilities::Utility< EventHashInput, EventHashOutput >, dftracer::utils::utilities::Utility< ChunkManifestMapperUtilityInput, ChunkManifestMapperUtilityOutput >, dftracer::utils::utilities::Utility< FilterableLine, std::optional< Line > >, dftracer::utils::utilities::Utility< ViewBuilderInput, ViewBuilderOutput, tags::Parallelizable >, dftracer::utils::utilities::Utility< EventCollectorFromMetadataCollectorUtilityInput, EventCollectorUtilityOutput >, dftracer::utils::utilities::Utility< LineBatchProcessUtilityInput, LineBatchProcessUtilityOutput< LineOutput > >, dftracer::utils::utilities::Utility< 
FileDecompressionUtilityInput, FileDecompressionUtilityOutput, utilities::tags::Parallelizable >, dftracer::utils::utilities::Utility< DirectoryProcessInput, BatchFileProcessOutput< FileOutput >, utilities::tags::NeedsContext >, dftracer::utils::utilities::Utility< FileMergeValidatorUtilityInput, FileMergeValidatorUtilityOutput >, dftracer::utils::utilities::Utility< Text, Lines, utilities::tags::Parallelizable >, dftracer::utils::utilities::Utility< EventIdExtractionInput, EventIdExtractionOutput >, dftracer::utils::utilities::Utility< ChunkExtractorUtilityInput, ChunkExtractorUtilityOutput, utilities::tags::Parallelizable >, dftracer::utils::utilities::Utility< SimpleLineBatchProcessUtilityInput, SimpleLineBatchProcessUtilityOutput< LineOutput > >, dftracer::utils::utilities::Utility< IndexBuildConfig, IndexBuildResult, tags::NeedsContext >, dftracer::utils::utilities::Utility< ReconstructorInput, ReconstructorResult, utilities::tags::NeedsContext >, dftracer::utils::utilities::Utility< PatternDirectoryScannerUtilityInput, std::vector< FileEntry >, utilities::tags::Parallelizable, utilities::tags::NeedsContext >, dftracer::utils::utilities::Utility< ReorganizationPlannerInput, ExtractionPlan, utilities::tags::NeedsContext >, dftracer::utils::utilities::Utility< ResolverInput, ResolverResult, utilities::tags::Parallelizable, utilities::tags::NeedsContext >, dftracer::utils::utilities::Utility< ComparisonUtilityInput, ComparisonUtilityOutput >, dftracer::utils::utilities::Utility< ChunkAggregatorInput, ChunkAggregationOutput, utilities::tags::Parallelizable >, dftracer::utils::utilities::Utility< std::vector< ItemInput >, std::vector< ItemOutput >, utilities::tags::NeedsContext >, dftracer::utils::utilities::Utility< StreamReadInput, ChunkRange, utilities::tags::Parallelizable >, dftracer::utils::utilities::Utility< AssociationResolverInput, AssociationResolverOutput >, dftracer::utils::utilities::Utility< IncrementalEventHashInput, std::size_t >, 
dftracer::utils::utilities::Utility< JsonParserInput, JsonParserOutput >, dftracer::utils::utilities::Utility< filesystem::FileEntry, text::Text, utilities::tags::Parallelizable >, dftracer::utils::utilities::Utility< ChunkVerificationUtilityInput< ChunkType, MetadataType >, ChunkVerificationUtilityOutput, utilities::tags::NeedsContext >, dftracer::utils::utilities::Utility< ChunkMapperInput, ChunkMapperOutput >, dftracer::utils::utilities::Utility< IndexedReadInput, std::shared_ptr< reader::internal::Reader > >, dftracer::utils::utilities::Utility< ReconstructionPlannerInput, ReconstructionPlan >, dftracer::utils::utilities::Utility< MetadataCollectorUtilityInput, MetadataCollectorUtilityOutput, utilities::tags::Parallelizable >, dftracer::utils::utilities::Utility< StatisticsAggregatorInput, TraceStatistics, utilities::tags::Parallelizable >, dftracer::utils::utilities::Utility< PerfettoTraceWriterInput, PerfettoTraceWriterOutput, utilities::tags::Parallelizable, utilities::tags::NeedsContext >, dftracer::utils::utilities::Utility< StringJsonParserInput, JsonParserOutput >

Public Types

using Input = I
using TagsTuple = std::tuple<Tags...>

Public Functions

UtilityBase() = default
template<typename Dummy = void, typename = std::enable_if_t<(sizeof...(Tags) > 0) && std::is_void_v<Dummy>>>
inline explicit UtilityBase(Tags... tags)
virtual ~UtilityBase() = default
UtilityBase(const UtilityBase&) = default
UtilityBase &operator=(const UtilityBase&) = default
UtilityBase(UtilityBase&&) = default
UtilityBase &operator=(UtilityBase&&) = default
template<typename Tag>
inline const Tag &get_tag() const
template<typename Tag>
inline Tag &get_tag()
template<typename Tag>
inline void set_tag(Tag tag)

Public Static Functions

template<typename Tag>
static inline constexpr bool has_tag()
static inline constexpr std::string_view get_type_signature()
static inline constexpr std::string_view get_name()

Friends

friend class ::dftracer::utils::CoroScope