File I/O

Namespace: dftracer::utils::utilities::fileio

struct ChunkInfo

Public Members

std::string path
std::size_t bytes_written = 0
std::size_t events_written = 0
int chunk_index = 0
class ChunkIterator

Iterator that lazily reads file chunks on demand.

Reads chunks from a file only when dereferenced. Only one chunk is kept in memory at a time. Exposes chunks as ByteView into the internal read buffer — zero copy on dereference.

Public Types

using iterator_category = std::input_iterator_tag
using value_type = ByteView
using difference_type = std::ptrdiff_t
using pointer = const ByteView*
using reference = const ByteView&

Public Functions

inline ChunkIterator()
inline ChunkIterator(fs::path path, std::size_t chunk_size = 64 * 1024)
inline reference operator*() const
inline pointer operator->() const
inline ChunkIterator &operator++()
inline ChunkIterator operator++(int)
inline bool operator==(const ChunkIterator &other) const
inline bool operator!=(const ChunkIterator &other) const
struct ChunkManifest

Manifest of multiple chunk specifications.

Represents a logical chunk manifest that describes multiple source files or file ranges to be processed together.

Public Functions

inline ChunkManifest()
inline ChunkManifest(std::vector<ChunkSpec> chunk_specs, double total_mb)
inline bool operator==(const ChunkManifest &other) const
inline bool operator!=(const ChunkManifest &other) const

Public Members

std::vector<ChunkSpec> specs
double total_size_mb
class ChunkRange

Range wrapper for chunk iteration (enables range-based for).

Public Functions

inline ChunkRange()
inline ChunkRange(fs::path path, std::size_t chunk_size = 64 * 1024)
inline ChunkIterator begin() const
inline ChunkIterator end() const
struct ChunkSpec

Specification for a chunk to read from a file.

Describes which file and byte range to read from. Used for chunked file processing and splitting operations.

Subclassed by dftracer::utils::utilities::composites::dft::internal::DFTracerChunkSpec

Public Functions

inline ChunkSpec()
inline ChunkSpec(std::string path, std::string idx, double mb, std::size_t start, std::size_t end)
inline bool operator==(const ChunkSpec &other) const
inline bool operator!=(const ChunkSpec &other) const
inline std::size_t size_bytes() const

Public Members

std::string file_path
std::string index_path
double size_mb
std::size_t start_byte
std::size_t end_byte
class ChunkWriter

Public Functions

explicit ChunkWriter(ChunkWriterConfig config)
~ChunkWriter()
ChunkWriter(const ChunkWriter&) = delete
ChunkWriter &operator=(const ChunkWriter&) = delete
coro::CoroTask<void> open()
coro::CoroTask<void> write_line(ByteView line)
coro::CoroTask<void> write_bytes(ByteView data)
coro::CoroTask<void> close()
inline std::size_t total_bytes_written() const
inline std::size_t total_events_written() const
inline int current_chunk_index() const
inline const std::vector<ChunkInfo> &chunks() const
inline bool is_open() const
struct ChunkWriterConfig

Public Types

using ChunkRotationCallback = std::function<void(std::size_t chunk_index, const std::string &chunk_path, std::size_t event_count, std::size_t byte_count)>

Public Functions

inline ChunkWriterConfig &with_output_dir(std::string dir)
inline ChunkWriterConfig &with_base_name(std::string name)
inline ChunkWriterConfig &with_chunk_size(std::size_t bytes)
inline ChunkWriterConfig &with_compression(bool enabled)
inline ChunkWriterConfig &with_compression_level(int level)
inline ChunkWriterConfig &with_json_array_wrapper(bool enabled)
inline ChunkWriterConfig &with_on_chunk_complete(ChunkRotationCallback callback)

Public Members

std::string output_dir
std::string base_name
std::size_t chunk_size_bytes = 256 * 1024 * 1024
bool compress = true
int compression_level = Z_DEFAULT_COMPRESSION
bool json_array_wrapper = true
ChunkRotationCallback on_chunk_complete
class FileReaderUtility : public dftracer::utils::utilities::Utility<filesystem::FileEntry, text::Text, utilities::tags::Parallelizable>

Utility that reads a file and returns its text content.

This utility takes a FileEntry (from DirectoryScanner or manually created) and reads the file content as Text. It composes with existing types.

Composition examples:

  • DirectoryScanner → FileReader → Text

  • FileEntry → FileReader → Text → LineSplitter → Lines

  • FileEntry → FileReader → Text → TextHasher → Hash

Features:

  • Reads entire file into memory as text

  • Can be tagged with Cacheable, Retryable, Monitored behaviors

  • Composes with text utilities for processing

Usage:

auto reader = std::make_shared<FileReader>();

FileEntry file{"/path/to/file.txt"};
text::Text content = co_await reader->process(file);

std::cout << "Read " << content.size() << " bytes\n";

Composition with DirectoryScanner:

auto scanner = std::make_shared<DirectoryScanner>();
auto reader = std::make_shared<FileReader>();

auto files = scanner->process(Directory{"."});
for (const auto& file : files) {
    if (file.is_regular_file) {
        text::Text content = co_await reader->process(file);
        // Process content...
    }
}

Public Functions

FileReaderUtility() = default
~FileReaderUtility() = default
inline virtual coro::CoroTask<text::Text> process(const filesystem::FileEntry &input) override

Read file content as text.

Parameters:

input – FileEntry representing the file to read

Throws:

std::runtime_error – if file cannot be read

Returns:

Text containing file content

struct StreamReadInput

Configuration for streaming file read operations.

Public Functions

StreamReadInput() = default
inline explicit StreamReadInput(fs::path p, std::size_t cs = 64 * 1024)
inline bool operator==(const StreamReadInput &other) const
inline bool operator!=(const StreamReadInput &other) const

Public Members

fs::path path
std::size_t chunk_size = 64 * 1024
struct StreamWriteResult

Result of a streaming write operation.

Public Functions

StreamWriteResult() = default

Public Members

fs::path path
std::size_t bytes_written = 0
std::size_t chunks_written = 0
bool success = false

Public Static Functions

static inline StreamWriteResult success_result(fs::path p, std::size_t bytes, std::size_t chunks)
class StreamingFileReaderUtility : public dftracer::utils::utilities::Utility<StreamReadInput, ChunkRange, utilities::tags::Parallelizable>

Streaming file reader utility that returns lazy iterator.

This utility provides a ChunkRange for lazy iteration over file chunks. Only ONE chunk is in memory at a time - true streaming!

Composable utility pattern:

Usage:

auto reader = std::make_shared<StreamingFileReader>();

StreamReadInput input{"/path/to/large/file.txt", 64 * 1024};
ChunkRange chunks = co_await reader->process(input);

// Only one chunk in memory at a time!
for (const auto& chunk : chunks) {
    // Process chunk immediately
    compressor.process_chunk(chunk);
}

With streaming compression:

auto reader = std::make_shared<StreamingFileReader>();
StreamingCompressor compressor(&writer);

for (const auto& chunk : co_await reader->process(StreamReadInput{"input.txt"})) {
    compressor.process_chunk(chunk);  // True streaming - constant memory!
}
compressor.finalize();

Public Functions

StreamingFileReaderUtility() = default
~StreamingFileReaderUtility() = default
inline coro::CoroTask<ChunkRange> process(const StreamReadInput &input) override

Get lazy chunk iterator for file.

Parameters:

input – StreamReadInput with file path and chunk size

Throws:

std::runtime_error – if file cannot be accessed

Returns:

ChunkRange for iterating over chunks

class StreamingFileWriterUtility

Streaming file writer that accepts ByteView chunks.

Usage:

StreamingFileWriterUtility writer("/output.gz");
co_await writer.process(ByteView(data, len));
writer.close();

Public Functions

inline explicit StreamingFileWriterUtility(fs::path path, bool append = false, bool create_dirs = true)

Open file for streaming write.

Parameters:
  • path – Output file path

  • append – Append to existing file (default: false)

  • create_dirs – Create parent directories (default: true)

inline ~StreamingFileWriterUtility()
StreamingFileWriterUtility(const StreamingFileWriterUtility&) = delete
StreamingFileWriterUtility &operator=(const StreamingFileWriterUtility&) = delete
inline coro::CoroTask<StreamWriteResult> process(ByteView chunk)

Write a single chunk immediately.

Parameters:

chunk – Data chunk to write

Returns:

StreamWriteResult with current write status

inline void close()

Flush and close the file.

inline bool append_mode() const
inline bool create_dirs_mode() const
inline std::size_t total_bytes() const
inline std::size_t total_chunks() const
inline const fs::path &path() const
inline bool is_opened() const
inline bool is_closed() const