TraceReader Module

The TraceReader is the recommended way to read trace files. It auto-selects sequential or indexed reading based on whether a root-local .dftindex RocksDB store exists.

TraceReader Class

class dftracer.utils.TraceReader(path: str, index_dir: str = '', checkpoint_size: int = 33554432, auto_build_index: bool = False, runtime: Runtime | None = None)[source]

Bases: object

property path: str
property index_dir: str
property has_index: bool
property num_lines: int
read_lines(start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, buffer_size: int = 4194304, query: str | None = None) List[memoryview][source]
iter_lines(start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, buffer_size: int = 4194304, query: str | None = None, memory_budget: int = 0) Iterator[memoryview][source]
iter_json(start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, buffer_size: int = 4194304, query: str | None = None, batch_size: int = 1024, memory_budget: int = 0) JsonDictValue][source]
read_json(start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, buffer_size: int = 4194304, query: str | None = None, batch_size: int = 1024) JsonDictValue][source]
read_raw(start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, line_aligned: bool = True, multi_line: bool = True, buffer_size: int = 4194304, query: str | None = None) List[memoryview][source]
iter_raw(start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, line_aligned: bool = True, multi_line: bool = True, buffer_size: int = 4194304, query: str | None = None, memory_budget: int = 0) Iterator[memoryview][source]
iter_arrow(batch_size: int = 10000, start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, buffer_size: int = 4194304, query: str | None = None, flatten_objects: bool = False, normalize: bool = False, memory_budget: int = 0) _ArrowBatchCapsule][source]
iter_arrow_stream(batch_size: int = 10000, start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, buffer_size: int = 4194304, query: str | None = None, flatten_objects: bool = False, normalize: bool = False, memory_budget: int = 0) Any[source]
read_arrow(batch_size: int = 10000, start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, buffer_size: int = 4194304, query: str | None = None, flatten_objects: bool = False, normalize: bool = False) ArrowTable[source]
iter_lines_json(batch_size: int = 10000, start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, buffer_size: int = 4194304, query: str | None = None) Iterator[Dict[str, Any]][source]
read_lines_json(batch_size: int = 10000, start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, buffer_size: int = 4194304, query: str | None = None) List[Dict[str, Any]][source]
get_max_bytes() int[source]
get_num_lines() int[source]
write_arrow(path: str, views: List[str | Dict[str, Any]] | None = None, chunk_size_mb: int = 32, compression: str = 'zstd', batch_size: int = 10000) Dict[str, Any][source]
get_view_chunks(view: str | Dict[str, Any] | None = None) Dict[str, Any][source]
write_view_chunk(output_file: str, checkpoint_idx: int, start_byte: int, end_byte: int, view: str | Dict[str, Any] | None = None, compression: str = 'zstd', batch_size: int = 10000) Dict[str, Any][source]
write_view_chunks(chunks: List[Dict[str, Any]], output_dir: str, view: str | Dict[str, Any] | None = None, compression: str = 'zstd', batch_size: int = 10000) Dict[str, Any][source]
__enter__() TraceReader[source]
__exit__(exc_type: Type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) None[source]

The path argument may be either a single trace file (.pfw / .pfw.gz) or a directory. When a directory is given, all iter_* / read_* methods discover .pfw and .pfw.gz files recursively and process them in parallel on the Runtime thread pool.

Streaming Iterators

iter_lines(), iter_raw(), and iter_json() return Python iterators backed by a bounded producer-consumer queue. The C++ coroutine runs on the Runtime’s thread pool and pushes items; Python’s __next__ pops.

iter_lines(), iter_raw(), and read_lines() / read_raw() yield memoryview objects (zero-copy views over the C++ buffer). Wrap with bytes(mv) if you need an owned copy.

reader = TraceReader("trace.pfw.gz")

# Stream decoded lines (memoryview)
for line in reader.iter_lines():
    process(bytes(line))

# Stream raw byte chunks (one line per chunk)
for chunk in reader.iter_raw(multi_line=False):
    process(chunk)  # memoryview

# Stream parsed JSON events (zero-copy JsonDictValue wrappers)
for obj in reader.iter_json():
    print(obj["name"], obj["dur"])

# Materialize to list
lines = reader.read_lines()        # list[memoryview]
chunks = reader.read_raw()         # list[memoryview]
objects = reader.read_json()       # list[JsonDictValue]

Arrow Output

iter_arrow() and read_arrow() parse JSON events into columnar Arrow record batches using dynamic schema discovery. Each JSON key becomes a column; types are inferred from values (int64, uint64, double, string, bool). Nested objects/arrays are serialized as JSON strings by default; pass flatten_objects=True to expand args into top-level columns, or normalize=True to coerce mixed-type columns into a canonical form.

The returned objects implement the Arrow PyCapsule protocol (__arrow_c_array__) for zero-copy interchange with pyarrow, polars, and DuckDB.

reader = TraceReader("trace.pfw.gz")

# Stream Arrow batches (memory-efficient)
for batch in reader.iter_arrow(batch_size=10000):
    pa_batch = pyarrow.record_batch(batch)
    df = pa_batch.to_pandas()

# Single C-side stream drain via Arrow C Data Interface
stream = reader.iter_arrow_stream(batch_size=10000)
rbr = pyarrow.RecordBatchReader.from_stream(stream)
for batch in rbr:
    ...

# Materialize all events as ArrowTable
table = reader.read_arrow()
df = table.to_pandas()    # requires pyarrow
df = table.to_polars()    # requires polars

# With range parameters and object flattening
table = reader.read_arrow(start_line=100, end_line=200, flatten_objects=True)

Writing Arrow IPC Files

write_arrow() writes trace data to Arrow IPC files with optional view-based partitioning. For finer control, get_view_chunks() returns the candidate chunks after bloom-filter pruning, and write_view_chunk / write_view_chunks write individual or batched chunks (the batched variant runs all chunks concurrently on the Runtime).

reader = TraceReader("trace.pfw.gz")

# Partition by predefined views
result = reader.write_arrow(
    "out/",
    views=["io", "compute"],
    chunk_size_mb=32,
    compression="zstd",
)

# Custom view + explicit chunk plan
info = reader.get_view_chunks({"name": "posix", "query": 'cat == "POSIX"'})
reader.write_view_chunks(info["chunks"], "out/", view="io")

File Metadata

get_max_bytes() and get_num_lines() return file metadata without reading the full file (when a .dftindex RocksDB index store exists):

reader = TraceReader("trace.pfw.gz")

max_bytes = reader.get_max_bytes()  # 0 if no index for compressed files
num_lines = reader.get_num_lines()  # 0 if no index

# Partition for parallel processing
if max_bytes > 0:
    chunk_size = max_bytes // num_workers
    for i in range(num_workers):
        start = i * chunk_size
        end = min((i + 1) * chunk_size, max_bytes)
        process(reader.read_json(start_byte=start, end_byte=end))

Query Filtering

All line-based reading methods (read_lines, iter_lines, iter_json, read_json, iter_arrow, iter_arrow_stream, read_arrow) accept an optional query parameter for event filtering:

reader = TraceReader("trace.pfw.gz")

# Filter by category
for line in reader.iter_lines(query='cat == "POSIX"'):
    process(line)

# Combine filters with AND/OR
lines = reader.read_lines(query='cat == "POSIX" and dur > 1000')

# Use IN for multiple values
lines = reader.read_lines(query='name in ["read", "write", "open"]')

# NOT queries
lines = reader.read_lines(query='not cat == "MPI"')

# Nested field paths
lines = reader.read_lines(query='args.level == "DEBUG"')

# Combine with range parameters
lines = reader.read_lines(start_line=1, end_line=1000,
                           query='cat == "POSIX"')

# Arrow output with query
table = reader.read_arrow(query='cat == "POSIX" and dur > 100')
df = table.to_pandas()

When an index exists, the query is used for chunk pruning (skipping entire chunks that cannot match) before per-event filtering. This provides significant speedups on large indexed traces.

Query DSL Syntax

The query language supports:

  • Comparison: ==, !=, >, <, >=, <=

  • Logical: and, or, not (case-insensitive)

  • Membership: in [...], not in [...]

  • Grouping: parentheses (...)

  • Field paths: dotted notation for nested JSON (args.level)

  • Values: strings ("POSIX"), integers (1000), floats (3.14), booleans (true/false)

Keywords (and, or, not, in, true, false) are case-insensitive: AND, and, And all work. String values are case-sensitive: cat == "POSIX" does not match "posix".

Python Field DSL

For programmatic query construction, use the Field class:

from dftracer.utils.query import Field

cat = Field("cat")
dur = Field("dur")
name = Field("name")

# Operators: ==, !=, >, <, >=, <=
q = cat == "POSIX"

# AND (&), OR (|), NOT (~)
q = (cat == "POSIX") & (dur > 1000)
q = (cat == "POSIX") | (cat == "STDIO")
q = ~(cat == "MPI")

# IN / NOT IN
q = name.is_in(["read", "write", "open"])
q = cat.not_in(["MPI"])

# Pass to TraceReader
reader = TraceReader("trace.pfw.gz")
lines = reader.read_lines(query=str(q))

ReadConfig Parameters

All reading methods accept these keyword arguments:

  • start_line / end_line – line range (1-indexed; 0 = no limit)

  • start_byte / end_byte – byte range (0 = no limit)

  • buffer_size – internal buffer size in bytes (default 4 MB)

  • query – query DSL string for event filtering (default None)

Streaming methods (iter_lines, iter_raw, iter_json, iter_arrow, iter_arrow_stream) additionally accept:

  • memory_budget – soft cap on in-flight bytes queued from the C++ producer (0 = default)

iter_raw and read_raw additionally accept:

  • line_aligned – if True, chunks are aligned to line boundaries (default True)

  • multi_line – if True, chunks may contain multiple lines (default True)

iter_json and read_json additionally accept:

  • batch_size – events per parse batch (default 1024)

iter_arrow, iter_arrow_stream, and read_arrow additionally accept:

  • batch_size – maximum rows per Arrow batch (default 10000)

  • flatten_objects – expand object fields into top-level columns (default False)

  • normalize – coerce mixed-type columns into a canonical form (default False)

Out-of-range values are clamped to the actual file bounds (no errors thrown).