Reader

See also

For complete class and member documentation, see the API Reference.

Streaming reader for compressed trace files with support for line-based and byte-based access, zero-copy reads, and async I/O.

#include <dftracer/utils/utilities/reader/trace_reader.h>

Overview

The reader provides random access into indexed compressed files (.pfw.gz + .dftindex). It supports multiple stream types for different access patterns and both synchronous and asynchronous reads.
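Random access depends on the index recording points where decompression can restart. This section does not describe the .dftindex layout, so the following is a minimal sketch under an assumed, hypothetical schema. Each Checkpoint stores the first (1-based) line it covers and the compressed byte offset to resume from. Finding the start point for a given line is then a binary search.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <optional>
#include <vector>

// Hypothetical checkpoint record; the real .dftindex schema may differ.
struct Checkpoint {
    std::size_t first_line;           // 1-based line number of the first line covered
    std::uint64_t compressed_offset;  // where decompression can restart
};

// Returns the checkpoint to start decompressing from in order to reach
// `line`: the last checkpoint whose first_line <= line.
// `index` must be sorted by first_line.
std::optional<Checkpoint> find_checkpoint(const std::vector<Checkpoint>& index,
                                          std::size_t line) {
    auto it = std::upper_bound(
        index.begin(), index.end(), line,
        [](std::size_t l, const Checkpoint& c) { return l < c.first_line; });
    if (it == index.begin()) return std::nullopt;  // line precedes every checkpoint
    return *std::prev(it);
}
```

After seeking to compressed_offset, a reader would still inflate forward and discard lines until it reaches the requested one.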

The high-level dftracer::utils::utilities::reader::TraceReader exposes:

  • Directory input (Python binding): when constructed with a directory, all matching .pfw.gz files share one .dftindex root and are processed in parallel (each file becomes one or more checkpoint-level work items routed across the runtime thread pool).

  • JSON streaming (read_json): each line is parsed once with a reused JsonParser built on simdjson's On-Demand API; the yielded JsonLine borrows that parser and is valid only until the next call to next().

  • Arrow streaming (read_arrow, Python iter_arrow_stream): yields native ArrowExportResult record batches of up to batch_size rows. The Python binding exposes this as an Arrow C Data Interface stream, so rows are never materialized as Python objects.

  • Query filtering: an optional query DSL string is compiled, when possible, into a conjunction of equality probes (AND-of-EQ). The compiled probes evaluate directly against simdjson fields; when every candidate chunk fully matches the predicate, a uniform-match shortcut skips per-event re-evaluation.

  • Line-range work items: the dispatcher splits a file’s checkpoints into independent line-range work items that the runtime executes in parallel; ReadConfig::skip_pruning lets the dispatcher avoid re-running the chunk pruner per work item.

  • Batch chunk pruning: a single pruner pass per file feeds all work items, using ChunkPrunerUtility over Bloom filters, chunk statistics, and the manifest CF.

  • flatten_objects: when set, top-level JSON object values (e.g. args) are expanded one level into parent.child columns with native Arrow types; deeper nesting still round-trips as a JSON text column.
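The query-filtering and chunk-pruning bullets can be illustrated together. The sketch below is hypothetical throughout. Events are a std::map of string fields rather than simdjson values, ChunkStats is an invented per-chunk min/max summary, and the uniform-match check is applied chunk by chunk, which may not match the library's granularity. It shows why an AND of equality probes is cheap to reason about at chunk level. A probe value outside a field's range rules the chunk out, and a field whose min and max both equal the probe value matches every event.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// One equality probe (field == value); a compiled query is an AND of these.
struct EqProbe {
    std::string field;
    std::string value;
};

// Stand-in for a parsed JSON line: field name -> value as text.
using Event = std::map<std::string, std::string>;

// Hypothetical per-chunk summary: lexicographic min/max of each field.
// Assumes an entry exists only when every event in the chunk has that field.
struct FieldStats {
    std::string min;
    std::string max;
};
using ChunkStats = std::map<std::string, FieldStats>;

enum class ChunkVerdict { Skip, AllMatch, MustScan };

// Per-event evaluation: every probe must find its field with an equal value.
bool matches(const std::vector<EqProbe>& probes, const Event& e) {
    for (const auto& p : probes) {
        auto it = e.find(p.field);
        if (it == e.end() || it->second != p.value) return false;
    }
    return true;
}

// Chunk-level decision made from statistics alone.
ChunkVerdict classify(const std::vector<EqProbe>& probes, const ChunkStats& stats) {
    bool all_uniform = true;
    for (const auto& p : probes) {
        auto it = stats.find(p.field);
        if (it == stats.end()) { all_uniform = false; continue; }  // no stats: undecided
        const FieldStats& s = it->second;
        if (p.value < s.min || p.value > s.max) return ChunkVerdict::Skip;  // cannot match
        if (s.min != p.value || s.max != p.value) all_uniform = false;
    }
    return all_uniform ? ChunkVerdict::AllMatch : ChunkVerdict::MustScan;
}

// Counts matching events, evaluating per event only when statistics cannot decide.
std::size_t count_matches(const std::vector<EqProbe>& probes,
                          const ChunkStats& stats,
                          const std::vector<Event>& chunk) {
    switch (classify(probes, stats)) {
        case ChunkVerdict::Skip: return 0;
        case ChunkVerdict::AllMatch: return chunk.size();  // uniform match shortcut
        case ChunkVerdict::MustScan: break;
    }
    std::size_t n = 0;
    for (const auto& e : chunk) n += matches(probes, e) ? 1 : 0;
    return n;
}
```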

ReaderFactory

Creates readers with automatic format detection.

using namespace dftracer::utils::utilities::reader;

auto reader = ReaderFactory::create(
    "trace.pfw.gz",       // Compressed file
    "trace.pfw.gz.idx"    // Index file
);

// Query file metadata
std::size_t num_lines = reader->get_num_lines();
std::size_t max_bytes = reader->get_max_bytes();
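This section does not document how ReaderFactory detects the format. A common technique, assumed here purely for illustration, is to sniff magic bytes: every gzip member begins with 0x1f 0x8b (RFC 1952). The detect_format and detect_file_format helpers below are hypothetical, and treating a leading '[' or '{' as uncompressed JSON trace text is likewise an assumption.

```cpp
#include <cstddef>
#include <fstream>
#include <string>

enum class FileFormat { Gzip, PlainText, Unknown };

// Classifies a buffer by its leading bytes. Gzip members start with the
// two-byte magic number 0x1f 0x8b (RFC 1952, section 2.3.1).
FileFormat detect_format(const unsigned char* data, std::size_t size) {
    if (size >= 2 && data[0] == 0x1f && data[1] == 0x8b) return FileFormat::Gzip;
    // Assumption: uncompressed JSON-lines traces start with '[' or '{'.
    if (size >= 1 && (data[0] == '[' || data[0] == '{')) return FileFormat::PlainText;
    return FileFormat::Unknown;
}

// Reads up to two bytes from the file and classifies them. A missing or
// unreadable file yields zero bytes and therefore FileFormat::Unknown.
FileFormat detect_file_format(const std::string& path) {
    std::ifstream in(path, std::ios::binary);
    unsigned char head[2] = {0, 0};
    in.read(reinterpret_cast<char*>(head), sizeof(head));
    return detect_format(head, static_cast<std::size_t>(in.gcount()));
}
```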

Stream Types

enum class StreamType {
    RAW,         // Raw compressed bytes
    BYTES,       // Decompressed bytes (no line splitting)
    LINE,        // One complete line per read()
    LINE_BYTES,  // Lines within a byte range
    LINES        // Multiple lines per read()
};

enum class RangeType {
    LINE_RANGE,  // Range specified as line numbers (1-based)
    BYTE_RANGE   // Range specified as byte offsets
};

StreamConfig

Fluent builder for configuring streams:

StreamConfig config;
config.stream_type(StreamType::LINE)
      .range_type(RangeType::LINE_RANGE)
      .from(100)    // Start line (1-based)
      .to(200);     // End line (inclusive)
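Chaining works because each setter returns a reference to the same builder. The following is a minimal sketch of that pattern. It uses a hypothetical StreamConfigSketch class rather than the real StreamConfig, whose internals are not shown here. Its illustrative validate() enforces the 1-based, inclusive line-range convention described above.

```cpp
#include <cstddef>
#include <stdexcept>

enum class StreamType { RAW, BYTES, LINE, LINE_BYTES, LINES };
enum class RangeType { LINE_RANGE, BYTE_RANGE };

// Hypothetical stand-in for StreamConfig: each setter stores a value and
// returns *this so calls can be chained.
class StreamConfigSketch {
public:
    StreamConfigSketch& stream_type(StreamType t) { type_ = t; return *this; }
    StreamConfigSketch& range_type(RangeType r) { range_ = r; return *this; }
    StreamConfigSketch& from(std::size_t v) { from_ = v; return *this; }
    StreamConfigSketch& to(std::size_t v) { to_ = v; return *this; }

    // Illustrative check: line ranges are 1-based and to() is inclusive.
    void validate() const {
        if (range_ == RangeType::LINE_RANGE && from_ == 0)
            throw std::invalid_argument("line ranges are 1-based");
        if (to_ < from_)
            throw std::invalid_argument("empty range: to < from");
    }

    StreamType get_stream_type() const { return type_; }
    std::size_t get_from() const { return from_; }
    std::size_t get_to() const { return to_; }

private:
    // Defaults are arbitrary choices for this sketch.
    StreamType type_ = StreamType::LINE;
    RangeType range_ = RangeType::LINE_RANGE;
    std::size_t from_ = 1;
    std::size_t to_ = 1;
};
```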

Reading Lines

Read lines one at a time:

auto reader = ReaderFactory::create("trace.pfw.gz", "trace.pfw.gz.idx");

StreamConfig config;
config.stream_type(StreamType::LINE)
      .range_type(RangeType::LINE_RANGE)
      .from(1)
      .to(100);

auto stream = reader->stream(config);
std::string buffer(1024 * 1024, '\0');

while (!stream->done()) {
    std::size_t bytes = stream->read(buffer.data(), buffer.size());
    if (bytes == 0) break;

    std::string_view line(buffer.data(), bytes);
    process(line);
}

Async reading (this loop uses co_await, so it must run inside a coroutine):

auto stream = reader->stream(config);
std::string buffer(1024 * 1024, '\0');

while (!stream->done()) {
    std::size_t bytes = co_await stream->read_async(
        buffer.data(), buffer.size());
    if (bytes == 0) break;

    std::string_view line(buffer.data(), bytes);
    process(line);
}
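Both loops rely on the LINE stream returning whole lines, even though decompression yields arbitrarily sized blocks. A standard way to achieve that is to emit lines in place and carry any trailing partial line over to the next block. The LineSplitter class below is a hypothetical sketch of that technique, not the library's implementation:

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <string_view>
#include <utility>

// Splits a sequence of arbitrary byte blocks into complete '\n'-terminated
// lines. Bytes after the last newline of a block are held until more data
// arrives or finish() is called.
class LineSplitter {
public:
    using Sink = std::function<void(std::string_view)>;

    explicit LineSplitter(Sink sink) : sink_(std::move(sink)) {}

    void feed(std::string_view block) {
        std::size_t start = 0;
        for (std::size_t i = 0; i < block.size(); ++i) {
            if (block[i] != '\n') continue;
            if (carry_.empty()) {
                sink_(block.substr(start, i - start));  // zero-copy: view into block
            } else {
                carry_.append(block.substr(start, i - start));  // complete the carried line
                sink_(carry_);
                carry_.clear();
            }
            start = i + 1;
        }
        carry_.append(block.substr(start));  // keep the partial trailing line, if any
    }

    // Emits a final line that was not newline-terminated.
    void finish() {
        if (!carry_.empty()) {
            sink_(carry_);
            carry_.clear();
        }
    }

private:
    Sink sink_;
    std::string carry_;
};
```

Lines that fit in one block are passed as views into that block with no copy; only lines that straddle a block boundary are assembled in the carry buffer.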

Reading Byte Ranges

Read lines within a byte range (useful for parallel splitting):

StreamConfig config;
config.stream_type(StreamType::LINE_BYTES)
      .range_type(RangeType::BYTE_RANGE)
      .from(0)
      .to(1024 * 1024);  // First 1 MiB

auto stream = reader->stream(config);
// Each read() returns one complete line within the byte range
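For parallel splitting, one option is to cut [0, total) into contiguous byte ranges, one per worker, where total might be the value of get_max_bytes() (assuming it reports the decompressed size). This section does not say which range owns a line that straddles a boundary. The sketch therefore assumes the common convention that a line belongs to the range containing its first byte, which gives every line exactly one owner. split_byte_ranges and owner_of are hypothetical helpers that use half-open ranges; whether .to() is inclusive for BYTE_RANGE should be checked in the API Reference.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

using ByteRange = std::pair<std::size_t, std::size_t>;  // half-open [begin, end)

// Cuts [0, total) into `parts` contiguous ranges whose sizes differ by at most one byte.
std::vector<ByteRange> split_byte_ranges(std::size_t total, std::size_t parts) {
    std::vector<ByteRange> ranges;
    if (parts == 0) return ranges;
    std::size_t base = total / parts, extra = total % parts, begin = 0;
    for (std::size_t i = 0; i < parts; ++i) {
        std::size_t len = base + (i < extra ? 1 : 0);  // first `extra` ranges get one more byte
        ranges.emplace_back(begin, begin + len);
        begin += len;
    }
    return ranges;
}

// Index of the range owning a line that starts at byte `line_start`
// (assumed convention: the range containing the line's first byte).
// Returns ranges.size() if the offset lies outside every range.
std::size_t owner_of(const std::vector<ByteRange>& ranges, std::size_t line_start) {
    for (std::size_t i = 0; i < ranges.size(); ++i)
        if (line_start >= ranges[i].first && line_start < ranges[i].second) return i;
    return ranges.size();
}
```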

Line Processing Callbacks

Process lines without materializing them into a container:

class MyProcessor : public LineProcessor {
public:
    coro::CoroTask<bool> process(const char* data,
                                  std::size_t length) override {
        std::string_view line(data, length);
        // Process line...
        co_return true;  // Continue processing
    }
};

MyProcessor processor;
reader->process_lines(processor, 1, 1000);  // Lines 1-1000
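The callback style inverts control: the reader pushes each line to the processor, and the processor's return value decides whether reading continues. For brevity, the sketch below replaces the coroutine interface with a synchronous, bool-returning one. SyncLineProcessor, drive_lines, and GrepFirstN are hypothetical names, and the rule that returning false stops early is inferred from the "Continue processing" comment above.

```cpp
#include <cstddef>
#include <string_view>

// Synchronous stand-in for LineProcessor; return false to stop.
class SyncLineProcessor {
public:
    virtual ~SyncLineProcessor() = default;
    virtual bool process(const char* data, std::size_t length) = 0;
};

// Feeds each '\n'-separated line of `text` to `p` as a pointer/length pair
// (no copy). Returns how many lines were delivered before stopping.
std::size_t drive_lines(std::string_view text, SyncLineProcessor& p) {
    std::size_t delivered = 0, start = 0;
    while (start < text.size()) {
        std::size_t end = text.find('\n', start);
        if (end == std::string_view::npos) end = text.size();
        ++delivered;
        if (!p.process(text.data() + start, end - start)) break;
        start = end + 1;
    }
    return delivered;
}

// Example processor: counts lines containing `needle`, stopping after `limit` hits.
class GrepFirstN : public SyncLineProcessor {
public:
    GrepFirstN(std::string_view needle, std::size_t limit) : needle_(needle), limit_(limit) {}

    bool process(const char* data, std::size_t length) override {
        std::string_view line(data, length);
        if (line.find(needle_) != std::string_view::npos) ++hits_;
        return hits_ < limit_;  // keep going until the limit is reached
    }

    std::size_t hits() const { return hits_; }

private:
    std::string_view needle_;
    std::size_t limit_;
    std::size_t hits_ = 0;
};
```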

C API

Opaque handle-based interface for C interoperability:

#include <dftracer/utils/utilities/reader/reader.h>

/* Create reader */
dft_reader_handle_t reader = dft_reader_create(
    "trace.pfw.gz", "trace.pfw.gz.idx");

/* Query metadata */
size_t num_lines = dft_reader_get_num_lines(reader);

/* Create stream */
dft_reader_stream_handle_t stream = dft_reader_create_stream(
    reader, DFT_STREAM_LINE, DFT_RANGE_LINE, 1, 100);

/* Read lines */
static char buffer[1024 * 1024]; /* static: a 1 MiB array can overflow a default thread stack */
while (!dft_reader_stream_done(stream)) {
    size_t bytes = dft_reader_stream_read(
        stream, buffer, sizeof(buffer));
    if (bytes == 0) break;
    /* process buffer[0..bytes-1] */
}

/* Cleanup */
dft_reader_stream_destroy(stream);
dft_reader_destroy(reader);
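When the C API is called from C++, wrapping each handle in std::unique_ptr with a custom deleter ensures the destroy function runs on every exit path. To keep the sketch self-contained, it uses hypothetical fake_reader_create and fake_reader_destroy stand-ins. Adapting it to dft_reader_create and dft_reader_destroy would also assume that dft_reader_handle_t is a pointer type, which this section does not state.

```cpp
#include <memory>

// Hypothetical stand-ins for an opaque C API; they only count live handles.
struct fake_reader { int id; };
int g_live_handles = 0;

fake_reader* fake_reader_create(int id) {
    ++g_live_handles;
    return new fake_reader{id};
}

void fake_reader_destroy(fake_reader* r) {
    if (r != nullptr) {
        --g_live_handles;
        delete r;
    }
}

// Deleter that forwards to the C-style destroy function.
struct FakeReaderDeleter {
    void operator()(fake_reader* r) const noexcept { fake_reader_destroy(r); }
};

// Owning handle: destroy runs when the ReaderPtr goes out of scope,
// including on early returns and exceptions.
using ReaderPtr = std::unique_ptr<fake_reader, FakeReaderDeleter>;

ReaderPtr make_reader(int id) { return ReaderPtr(fake_reader_create(id)); }
```

Because locals are destroyed in reverse order of declaration, declaring the stream's owner after the reader's releases the stream first, matching the cleanup order in the C example.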

Python API

from dftracer.utils import TraceReader

reader = TraceReader("trace.pfw.gz")

# Read all lines
lines = reader.read_lines()
for line in lines:
    print(line)

# Get metadata (requires index sidecar)
print(f"Total lines: {reader.get_num_lines()}")
print(f"Total bytes: {reader.get_max_bytes()}")

See Also