Behaviors

Namespace: dftracer::utils::utilities::behaviors

template<typename I, typename O>
class BehaviorChain

Chains multiple behaviors together for sequential execution.

BehaviorChain manages a collection of behaviors and invokes their hooks in order. Behaviors are executed in the order they were added.

Hook execution order:

Template Parameters:
  • I – Input type

  • O – Output type

Public Functions

BehaviorChain() = default
inline void add_behavior(std::shared_ptr<UtilityBehavior<I, O>> behavior)

Add a behavior to the chain.

Behaviors are executed in the order they are added.

Parameters:

behavior – Shared pointer to behavior to add

inline void before_process(const I &input)

Invoke before_process() on all behaviors.

Calls behaviors in forward order (first added, first executed).

Parameters:

input – Input that will be passed to utility

inline O after_process(const I &input, O result)

Invoke after_process() on all behaviors.

Calls behaviors in reverse order (last added, first executed). This allows behaviors to wrap results like middleware layers.

Parameters:
  • input – Input that was passed to utility

  • result – Initial result from utility.process()

Returns:

Final transformed result after all behaviors

inline std::variant<BehaviorErrorResult, std::optional<O>> on_error(const I &input, const std::exception &e, std::size_t attempt)

Invoke on_error() on behaviors until one handles it.

Calls behaviors in forward order. Processing stops when a behavior:

  • Returns BehaviorErrorResult (retry or rethrow decision made)

  • Returns recovery value (std::optional<O> with value)

If all behaviors return std::nullopt, returns rethrow by default.

Parameters:
  • input – Input that was passed to utility

  • e – Exception that was thrown

  • attempt – Current attempt number (0-indexed)

Returns:

BehaviorErrorResult, recovery value, or rethrow if unhandled

inline coro::CoroTask<O> process(const I &input, std::function<coro::CoroTask<O>(const I&)> core)

Execute the async middleware chain.

Runs synchronous behavior hooks around an async core callable. Behaviors stay synchronous (they don’t do I/O) &#8212; only the core utility process() is async.

Parameters:
  • input – Input to process

  • core – Async core function to execute

Returns:

Final result after all middleware

inline bool empty() const

Check if chain is empty.

Returns:

true if no behaviors in chain

inline std::size_t size() const

Get number of behaviors in chain.

Returns:

Number of behaviors

inline void clear()

Clear all behaviors from chain.

struct BehaviorErrorResult

Result of behavior error handling indicating action to take.

Used by behaviors to signal how the executor should handle an error:

  • Retry: Continue retry loop

  • Rethrow: Stop and rethrow the exception

Public Functions

inline bool should_retry() const

Check if this result indicates a retry should occur.

Returns:

true if action is Retry

inline bool should_rethrow() const

Check if this result indicates the exception should be rethrown.

Returns:

true if action is Rethrow

Public Members

BehaviorErrorAction action
std::exception_ptr exception

Public Static Functions

static inline BehaviorErrorResult retry(std::exception_ptr ex = std::current_exception())

Create a retry result.

Signals that the operation should be retried.

Parameters:

ex – The exception that occurred (defaults to current exception)

Returns:

BehaviorErrorResult with Retry action

static inline BehaviorErrorResult rethrow(std::exception_ptr ex = std::current_exception())

Create a rethrow result.

Signals that the exception should be rethrown to stop execution.

Parameters:

ex – The exception to rethrow (defaults to current exception)

Returns:

BehaviorErrorResult with Rethrow action

template<typename I, typename O>
class BehaviorFactory

Factory that creates behaviors based on tags (type-erased mapping).

This factory allows registering tag-to-behavior mappings, enabling generic behavior creation without hardcoding specific tag types.

Key design principle: The executor/wrapper doesn’t know about specific tags. It just asks the factory “give me a behavior for this tag” and the factory handles the type-erased creation.

Template Parameters:
  • I – Input type

  • O – Output type

Public Functions

BehaviorFactory() = default
template<typename Tag>
inline void register_behavior(std::function<std::shared_ptr<UtilityBehavior<I, O>>(const Tag&)> creator)

Register a behavior creator for a specific tag type.

The creator function receives the tag configuration and returns the corresponding behavior instance.

Usage:

factory.register_behavior<tags::Cacheable>(
    [](const tags::Cacheable& tag) {
        return std::make_shared<CachingBehavior<I, O>>(tag);
    }
);

Template Parameters:

Tag – The tag type to register

Parameters:

creator – Function that creates behavior from tag config

template<typename Tag>
inline std::shared_ptr<UtilityBehavior<I, O>> create(const Tag &tag) const

Create a behavior for a specific tag.

Looks up the registered creator for the tag type and invokes it.

Usage:

tags::Cacheable cache_tag;
cache_tag.max_cache_size = 500;
auto behavior = factory.create(cache_tag);
if (behavior) {
    chain.add_behavior(behavior);
}

Template Parameters:

Tag – The tag type

Parameters:

tag – The tag configuration instance

Returns:

Shared pointer to created behavior, or nullptr if not registered

template<typename Tag>
inline bool has() const

Check if a behavior is registered for a tag type.

Template Parameters:

Tag – The tag type to check

Returns:

true if behavior creator is registered

inline std::size_t size() const

Get number of registered behaviors.

Returns:

Number of tag-to-behavior mappings

inline void clear()

Clear all registered behaviors.

template<typename I, typename O>
struct CacheEntry

LRU cache entry with TTL support.

Template Parameters:
  • I – Input type (cache key)

  • O – Output type (cache value)

Public Members

O value
std::chrono::steady_clock::time_point insertion_time
std::list<I>::iterator lru_iterator
template<typename I, typename O>
class CachingBehavior : public dftracer::utils::utilities::behaviors::UtilityBehavior<I, O>

Thread-safe caching behavior with LRU eviction and TTL support.

Implements caching using:

  • LRU (Least Recently Used) eviction when cache is full

  • TTL (Time To Live) to invalidate stale entries

  • Optional hash function for custom input types

  • Read-write lock for thread-safety

The cache intercepts utility execution in after_process to store results, and can return cached results on future calls via get().

Thread-safety: Uses std::shared_mutex for concurrent access:

Note: get() requires exclusive lock because LRU tracking updates access order.

Template Parameters:
  • I – Input type (must be hashable or provide custom hash)

  • O – Output type (must be copyable)

Public Functions

inline CachingBehavior(std::size_t max_size, std::chrono::seconds ttl, bool use_lru = true)

Construct caching behavior.

Parameters:
  • max_size – Maximum number of entries to cache

  • ttl – Time-to-live for cache entries

  • use_lru – Whether to use LRU eviction

inline O process(const I &input, typename UtilityBehavior<I, O>::NextFunction next) override

Middleware process - handles caching transparently.

Implements automatic caching:

  1. Check cache for cached result

  2. If cache HIT: return cached value (skip execution!)

  3. If cache MISS: call next() to execute, then store result

This makes caching completely transparent - just add the behavior and re-running the pipeline will use cached results automatically.

Parameters:
  • input – Input to process

  • next – Next function in chain (the actual utility or next behavior)

Returns:

Cached or freshly computed result

inline virtual void before_process([[maybe_unused]] const I &input) override

Check cache before processing.

inline O after_process(const I &input, O result) override

Cache the result after processing.

With the new middleware pattern, caching happens in process(). This hook is kept for compatibility when behaviors don’t override process().

Parameters:
  • input – The input key

  • result – The output to cache

Returns:

The unmodified result

inline std::variant<BehaviorErrorResult, std::optional<O>> on_error(const I &input, [[maybe_unused]] const std::exception &e, [[maybe_unused]] std::size_t attempt) override

Attempt to retrieve from cache on error.

If the utility fails but we have a (possibly stale) cached value, return it as a fallback recovery value. Otherwise, pass through to let retry behavior handle it.

Parameters:
  • input – The input key

  • e – The exception that occurred

  • attempt – Current retry attempt

Returns:

Cached value if available, nullopt to pass through

inline std::optional<O> get(const I &input)

Get cached value if available and not expired.

Parameters:

input – The input key

Returns:

Cached output if available, nullopt otherwise

inline void clear()

Clear all cached entries.

inline std::size_t size() const

Get current cache size.

class MaxRetriesExceeded : public std::runtime_error

Exception thrown when max retry attempts are exceeded.

Public Functions

inline MaxRetriesExceeded(std::size_t attempts, std::exception_ptr original = nullptr)

Construct with attempt count and original exception.

Parameters:
  • attempts – Number of attempts made

  • original – The original exception that caused failures

inline std::size_t get_attempts() const

Get number of attempts made.

inline std::exception_ptr get_original_exception() const

Get the original exception that caused the failures.

template<typename I, typename O>
class MonitoringBehavior : public dftracer::utils::utilities::behaviors::UtilityBehavior<I, O>

Monitoring behavior for logging and metrics.

Provides hooks for:

  • Logging before/after processing

  • Timing execution duration

  • Error tracking and logging

  • Custom callbacks for metrics collection

The behavior uses a callback function to report events, allowing flexible integration with different logging/metrics systems.

Template Parameters:
  • I – Input type

  • O – Output type

Public Functions

inline explicit MonitoringBehavior(std::function<void(const std::string&)> log_callback, std::string utility_name = "Utility")

Construct monitoring behavior.

Parameters:
  • log_callback – Callback function for log messages

  • utility_name – Optional name of the utility being monitored

inline virtual void before_process([[maybe_unused]] const I &input) override

Log before processing and start timing.

Parameters:

input – The input to be processed

inline virtual O after_process([[maybe_unused]] const I &input, O result) override

Log after processing with execution time.

Parameters:
  • input – The input that was processed

  • result – The processing result

Returns:

Unmodified result

inline std::variant<BehaviorErrorResult, std::optional<O>> on_error([[maybe_unused]] const I &input, const std::exception &e, std::size_t attempt) override

Log error occurrence.

Logs the error but doesn’t influence retry or recovery behavior. Returns nullopt to pass through to next behavior in chain.

Parameters:
  • input – The input that caused the error

  • e – The exception that occurred

  • attempt – Current retry attempt

Returns:

nullopt (pass through to next behavior)

inline void set_log_callback(std::function<void(const std::string&)> callback)

Set new log callback.

Parameters:

callback – New callback function

inline const std::function<void(const std::string&)> &get_log_callback() const

Get current log callback.

template<typename I, typename O>
class RetryBehavior : public dftracer::utils::utilities::behaviors::UtilityBehavior<I, O>

Retry behavior with exponential backoff support.

Implements retry logic with configurable:

  • Maximum retry attempts

  • Base delay between retries

  • Optional exponential backoff

The behavior intercepts errors in on_error and signals to the executor whether to retry by returning nullopt (retry) or a value (stop retrying with that value).

Template Parameters:
  • I – Input type

  • O – Output type

Public Functions

inline RetryBehavior(std::size_t max_retries, std::chrono::milliseconds base_delay, bool exponential_backoff = true)

Construct retry behavior.

Parameters:
  • max_retries – Maximum number of retry attempts

  • base_delay – Base delay between retries

  • exponential_backoff – Whether to use exponential backoff

inline O process(const I &input, typename UtilityBehavior<I, O>::NextFunction next) override

Middleware process with retry loop.

Wraps execution with try/catch and retry logic:

  1. Try to call next(input)

  2. On success: return result

  3. On error: check if should retry using on_error()

  4. If retry: wait and try again (up to max_retries)

  5. If max retries exceeded: throw MaxRetriesExceeded

Parameters:
  • input – Input to process

  • next – Next function in chain (could be another behavior or utility)

Throws:

MaxRetriesExceeded – if all retries fail

Returns:

Successful result (after retries if needed)

inline virtual void before_process([[maybe_unused]] const I &input) override

Hook before processing (no-op for retry).

inline virtual O after_process([[maybe_unused]] const I &input, O result) override

Hook after processing (no-op for retry).

inline std::variant<BehaviorErrorResult, std::optional<O>> on_error([[maybe_unused]] const I &input, [[maybe_unused]] const std::exception &e, std::size_t attempt) override

Handle error and determine whether to retry.

If we haven’t exceeded max_retries, wait for calculated delay and return Retry action. Otherwise, return Rethrow action with MaxRetriesExceeded exception.

Parameters:
  • input – The input that caused the error

  • e – The exception that occurred

  • attempt – Current attempt number (0-indexed)

Returns:

BehaviorErrorResult with Retry or Rethrow action

inline std::size_t get_max_retries() const

Get maximum retry attempts.

inline std::chrono::milliseconds get_base_delay() const

Get base delay.

inline bool is_exponential_backoff() const

Check if exponential backoff is enabled.

template<typename I, typename O>
class UtilityBehavior

Base interface for utility behaviors.

Behaviors are composable middleware that can be applied to utilities to add cross-cutting concerns like caching, retry logic, monitoring, etc.

Behaviors can intercept execution at multiple points:

  1. process() - Wraps the execution (middleware pattern) - can skip, transform, or retry

  2. before_process() - Called before execution (hook)

  3. after_process() - Called after execution (hook + transform)

  4. on_error() - Called on exception (error handling)

The process() method is the primary interception point using middleware pattern. Override it to wrap execution with custom logic (e.g., caching, retry).

Template Parameters:
  • I – Input type for the utility

  • O – Output type for the utility

Subclassed by dftracer::utils::utilities::behaviors::CachingBehavior< I, O >, dftracer::utils::utilities::behaviors::MonitoringBehavior< I, O >, dftracer::utils::utilities::behaviors::RetryBehavior< I, O >

Public Types

using NextFunction = std::function<O(const I&)>

Public Functions

virtual ~UtilityBehavior() = default
inline virtual O process(const I &input, NextFunction next)

Middleware-style process wrapper.

This is the primary interception point. Override to wrap execution:

  • Caching: Check cache, call next() on miss, store result

  • Retry: Try/catch next(), retry on failure

  • Monitoring: Time execution around next()

  • Transform: Modify input before next(), output after next()

The ‘next’ function represents the rest of the execution chain. Call next(input) to continue execution, or skip it to short-circuit.

Default implementation calls before_process, then next, then after_process.

Parameters:
  • input – Input to process

  • next – Next function in the chain (could be another behavior or the utility)

Returns:

Processed output

inline virtual void before_process([[maybe_unused]] const I &input)

Hook called before utility.process().

Use this for:

  • Pre-validation

  • Logging start time

  • Rate limiting

  • Pre-processing

Parameters:

input – Input that will be passed to utility

inline virtual O after_process([[maybe_unused]] const I &input, O result)

Hook called after utility.process() succeeds.

Use this for:

  • Result transformation

  • Caching results

  • Logging completion

  • Post-processing

Parameters:
  • input – Input that was passed to utility

  • result – Result from utility.process()

Returns:

Potentially transformed result

inline virtual std::variant<BehaviorErrorResult, std::optional<O>> on_error([[maybe_unused]] const I &input, [[maybe_unused]] const std::exception &e, [[maybe_unused]] std::size_t attempt)

Hook called when utility.process() throws exception.

Can return one of three results:

  • BehaviorErrorResult: Explicit retry or rethrow action

  • std::optional<O> with value: Recovery value to use instead

  • std::nullopt: Pass through to next behavior in chain

Use this for:

  • Error logging (return nullopt to pass through)

  • Retry logic (return BehaviorErrorResult::retry() or ::rethrow())

  • Fallback values (return std::optional<O> with value)

  • Error recovery (return recovery value)

Parameters:
  • input – Input that was passed to utility

  • e – Exception that was thrown

  • attempt – Current attempt number (0-indexed)

Returns:

BehaviorErrorResult, recovery value, or nullopt

template<typename I, typename O, typename ...Tags>
class UtilityExecutor

Executes a utility with behavior chain wrapping.

UtilityExecutor orchestrates the execution of a utility by:

  1. Running before_process hooks on all behaviors

  2. Executing the utility’s process() method

  3. Running after_process hooks on all behaviors

  4. Handling errors through behavior on_error hooks

This class bridges utilities and behaviors, providing a unified execution path regardless of which process() overload the utility implements.

Template Parameters:
  • I – Input type

  • O – Output type

  • Tags – Variadic tag types

Public Functions

inline UtilityExecutor(std::shared_ptr<Utility<I, O, Tags...>> utility, BehaviorChain<I, O> chain)

Construct executor with utility and behavior chain.

Parameters:
  • utility – The utility to execute

  • chain – Behavior chain to wrap execution

inline coro::CoroTask<O> execute(const I &input)

Execute utility without context using middleware pattern.

Builds a middleware chain where each behavior wraps execution. Behaviors can intercept, skip, transform, retry, or cache execution.

Parameters:

input – Input to process

Throws:

Any – exception from utility or behaviors

Returns:

Output result

inline coro::CoroTask<O> execute_with_context(CoroScope &ctx, const I &input)

Execute utility with context using middleware pattern.

Sets context reference before calling process(), then clears it after.

Parameters:
  • input – Input to process

  • ctxTask context for dynamic task emission

Throws:

Any – exception from utility or behaviors

Returns:

Output result

inline std::shared_ptr<Utility<I, O, Tags...>> get_utility() const

Get reference to underlying utility.

inline BehaviorChain<I, O> &get_behavior_chain()

Get reference to behavior chain.

inline const BehaviorChain<I, O> &get_behavior_chain() const

Get const reference to behavior chain.