File I/O

File reading, writing, and streaming utilities supporting both synchronous and asynchronous operations.

Synchronous I/O:

#include <dftracer/utils/utilities/fileio/file_reader.h>
#include <dftracer/utils/utilities/fileio/streaming_file_reader.h>
#include <dftracer/utils/utilities/fileio/streaming_file_writer.h>
#include <dftracer/utils/utilities/fileio/lines/streaming_line_reader.h>

Asynchronous Generators:

#include <dftracer/utils/utilities/fileio/lines/sources/async_plain_file_line_generator.h>
#include <dftracer/utils/utilities/fileio/lines/sources/async_plain_file_bytes_generator.h>
#include <dftracer/utils/utilities/fileio/lines/sources/async_indexed_file_line_generator.h>
#include <dftracer/utils/utilities/fileio/lines/sources/async_indexed_file_bytes_generator.h>
#include <dftracer/utils/utilities/fileio/lines/sources/async_streaming_gz_line_generator.h>

Types

// Zero-copy byte span (see core_infrastructure)
class ByteView {
    const void* data();
    std::size_t size();
    template <typename T> const T* as() const;
};

// Text content
struct Text {
    std::string content;
    bool empty() const;
    std::size_t size() const;
};

// Line with position
struct Line {
    std::string_view content;
    std::size_t line_number;  // 1-based
};

FileReaderUtility

Reads entire file into memory as text.

FileReaderUtility reader;
Text content = reader.process(FileEntry{"/path/to/file.txt"});

BinaryFileReaderUtility

Streaming binary file reader yielding zero-copy ByteView chunks.

auto gen = read_binary_file("/path/to/file.bin");
while (auto chunk = co_await gen.next()) {
    process(chunk->as<char>(), chunk->size());
}

StreamingFileReaderUtility

Reads file in chunks with lazy evaluation.

Input:

struct StreamReadInput {
    fs::path path;
    std::size_t chunk_size = 64 * 1024;  // 64KB default
};

Example:

StreamingFileReaderUtility reader;
auto input = StreamReadInput{"/path/to/large_file.dat", 1024 * 1024};

// Returns lazy iterator
ChunkRange chunks = reader.process(input);

for (const auto& chunk : chunks) {
    process(chunk);
}

StreamingFileWriterUtility

Writes data in chunks.

StreamingFileWriterUtility writer("/output/file.dat");

for (const auto& chunk : data_chunks) {
    writer.process(chunk);
}

writer.close();
std::cout << "Wrote " << writer.total_bytes() << " bytes\n";

StreamingLineReader

Lazy line-by-line reading for both plain and indexed files.

Plain text files:

// Read all lines
LineRange lines = StreamingLineReader::read_plain("/path/to/file.txt");

for (const auto& line : lines) {
    std::cout << line.line_number << ": " << line.content << "\n";
}

// Read specific line range
LineRange subset = StreamingLineReader::read_plain(
    "/path/to/file.txt",
    100,   // start_line
    200    // end_line
);

Indexed (compressed) files:

IndexedFileLineIteratorConfig config;
config.archive_path = "/path/to/file.pfw.gz";
config.index_path = "/path/to/file.pfw.gz.idx";
config.start_line = 0;
config.end_line = 1000;

LineRange lines = StreamingLineReader::read_indexed(config);

// Collect all into vector
std::vector<Line> all_lines = lines.collect();

// Or take first N
std::vector<Line> first_100 = lines.take(100);

// Or filter
 auto filtered = lines.filter([](const Line& l) {
     return l.content.find("error") != std::string_view::npos;
 });

Asynchronous File I/O

Async generators provide non-blocking line and byte reading using C++20 coroutines. They are ideal for high-concurrency scenarios and integrating with async task pipelines.

Plain Text Files

Read lines from uncompressed files with async I/O:

#include <dftracer/utils/utilities/fileio/lines/sources/async_plain_file_line_generator.h>

// Read all lines asynchronously
auto gen = async_plain_file_lines("data.txt");
while (auto line = co_await gen.next()) {
    std::cout << line->line_number << ": " << line->content << "\n";
}

// Read specific line range
auto gen = async_plain_file_lines("data.txt", 100, 200);  // lines 100-200
while (auto line = co_await gen.next()) {
    process(*line);
}

Plain Text Files by Byte Range

Read lines within a byte range from plain files, with automatic line-boundary alignment:

#include <dftracer/utils/utilities/fileio/lines/sources/async_plain_file_bytes_generator.h>

auto gen = async_plain_file_bytes("data.txt", 1000, 5000);  // bytes 1000-5000
while (auto line = co_await gen.next()) {
    process(*line);  // Yields complete lines within the byte range
}

Indexed (Compressed) Files

Read lines from .gz.idx indexed archive files asynchronously:

#include <dftracer/utils/utilities/fileio/lines/sources/async_indexed_file_line_generator.h>

auto config = IndexedFileLineIteratorConfig()
    .with_file("trace.pfw.gz", "trace.pfw.gz.idx")
    .with_line_range(1, 1000);

auto gen = async_indexed_file_lines(config);
while (auto line = co_await gen.next()) {
    process(*line);
}

Indexed Files by Byte Range

Read lines within a byte range from indexed archives:

#include <dftracer/utils/utilities/fileio/lines/sources/async_indexed_file_bytes_generator.h>

auto reader = ReaderFactory::create("trace.pfw.gz", "trace.pfw.gz.idx");
auto gen = async_indexed_file_bytes(reader, 1000, 5000);  // bytes 1000-5000
while (auto line = co_await gen.next()) {
    process(*line);
}

Streaming Gzip Decompression

Read lines from .gz files without building an index, using streaming decompression:

#include <dftracer/utils/utilities/fileio/lines/sources/async_streaming_gz_line_generator.h>

// Decompress and stream lines without building a sidecar index
auto gen = async_streaming_gz_lines("data.pfw.gz");
while (auto line = co_await gen.next()) {
    process(*line);
}

// With line range filtering
auto gen = async_streaming_gz_lines("data.pfw.gz", 100, 200);  // lines 100-200
while (auto line = co_await gen.next()) {
    process(*line);
}

Parallel Writers

Layout-aware parallel writers for multi-worker output. The ParallelWriter interface is implemented by three concrete layouts under fileio/parallel/:

  • StripedWriter — single output file, atomic-offset pwrite per worker. Used on local FS and PFS without padded stripes.

  • PaddedStripedWriter — single output file where each worker chunk is padded to a full PFS stripe so per-stripe writes never cross workers. Recommended for Lustre/GPFS when the stripe size is at least MIN_PADDED_STRIPE_BYTES (1 MiB).

  • ShardedWriter — N output files, one per worker, glob-named by ordinal. Used on NFS where atomic-offset pwrite is not reliable.

#include <dftracer/utils/utilities/fileio/parallel/parallel_writer.h>
#include <dftracer/utils/utilities/fileio/parallel/layout.h>

using namespace dftracer::utils::utilities::fileio::parallel;

auto info   = detect_layout("/lustre/.../output.pfw.gz");
auto sizing = compute_writer_sizing(info, /*baseline_workers=*/64,
                                    /*default_flush=*/4 << 20,
                                    /*headroom=*/1 << 20,
                                    /*padded=*/true);

WriterConfig cfg{
    .layout = info.layout,
    .stripe_size = info.stripe_size,
    .gzip = true,
};
auto writer = make_writer(cfg);
co_await writer->open("output.pfw.gz", sizing.num_workers,
                      /*gzip_extension=*/true, scope);

co_await writer->write_header(header_bytes);
co_await writer->write_chunk(worker_id, chunk_bytes);
auto member = writer->last_member(worker_id);  // offset+length of the gzip member
co_await writer->write_footer(footer_bytes);
co_await writer->close();

The writer collects per-chunk MemberSpan entries (offset + length of each independently decompressable gzip member) and exposes them via member_layout() after close. shard_base_offsets() remaps shard-local offsets to merged-file offsets for sharded layouts.

Layout detection (detect_layout) classifies a path’s filesystem as Lustre, GPFS, BeeGFS, NFS, or LOCAL and picks SHARDED on NFS, STRIPED elsewhere; compute_writer_sizing caps worker count at the PFS stripe count and sets flush_threshold to the stripe size for padded layouts so each compressed flush coalesces into one stripe.

Note

Compressor generators consumed by the parallel writer are wrapped in smart pointers (std::unique_ptr<ManualStreamingCompressorUtility>) so they can be moved across coroutine frames without leaking the underlying zlib stream.

Async vs Synchronous

Use async generators when:

  • Integrating with coroutine-based pipelines (TaskGraph, Channel-based streaming)

  • Processing multiple files concurrently without blocking threads

  • Operating in high-concurrency environments (many tasks sharing thread pools)

Use synchronous readers when:

  • Sequential file processing is acceptable

  • Working outside of coroutine contexts

  • Simpler error handling is preferred (no need to handle resumable failures)