TraceReader Module
The TraceReader is the recommended way to read trace files. It auto-selects
sequential or indexed reading based on whether a root-local .dftindex
RocksDB store exists.
TraceReader Class
- class dftracer.utils.TraceReader(path: str, index_dir: str = '', checkpoint_size: int = 33554432, auto_build_index: bool = False, runtime: Runtime | None = None)
Bases: object
- read_lines(start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, buffer_size: int = 4194304, query: str | None = None) -> List[memoryview]
- iter_lines(start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, buffer_size: int = 4194304, query: str | None = None, memory_budget: int = 0) -> Iterator[memoryview]
- iter_json(start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, buffer_size: int = 4194304, query: str | None = None, batch_size: int = 1024, memory_budget: int = 0) -> Iterator[JsonDictValue]
- read_json(start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, buffer_size: int = 4194304, query: str | None = None, batch_size: int = 1024) -> List[JsonDictValue]
- read_raw(start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, line_aligned: bool = True, multi_line: bool = True, buffer_size: int = 4194304, query: str | None = None) -> List[memoryview]
- iter_raw(start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, line_aligned: bool = True, multi_line: bool = True, buffer_size: int = 4194304, query: str | None = None, memory_budget: int = 0) -> Iterator[memoryview]
- iter_arrow(batch_size: int = 10000, start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, buffer_size: int = 4194304, query: str | None = None, flatten_objects: bool = False, normalize: bool = False, memory_budget: int = 0) -> Iterator[_ArrowBatchCapsule]
- iter_arrow_stream(batch_size: int = 10000, start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, buffer_size: int = 4194304, query: str | None = None, flatten_objects: bool = False, normalize: bool = False, memory_budget: int = 0) -> Any
- read_arrow(batch_size: int = 10000, start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, buffer_size: int = 4194304, query: str | None = None, flatten_objects: bool = False, normalize: bool = False) -> ArrowTable
- iter_lines_json(batch_size: int = 10000, start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, buffer_size: int = 4194304, query: str | None = None) -> Iterator[Dict[str, Any]]
- read_lines_json(batch_size: int = 10000, start_line: int = 0, end_line: int = 0, start_byte: int = 0, end_byte: int = 0, buffer_size: int = 4194304, query: str | None = None) -> List[Dict[str, Any]]
- write_arrow(path: str, views: List[str | Dict[str, Any]] | None = None, chunk_size_mb: int = 32, compression: str = 'zstd', batch_size: int = 10000) -> Dict[str, Any]
- write_view_chunk(output_file: str, checkpoint_idx: int, start_byte: int, end_byte: int, view: str | Dict[str, Any] | None = None, compression: str = 'zstd', batch_size: int = 10000) -> Dict[str, Any]
- write_view_chunks(chunks: List[Dict[str, Any]], output_dir: str, view: str | Dict[str, Any] | None = None, compression: str = 'zstd', batch_size: int = 10000) -> Dict[str, Any]
- __enter__() -> TraceReader
- __exit__(exc_type: Type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None
The path argument may be either a single trace file (.pfw / .pfw.gz)
or a directory. When a directory is given, all iter_* / read_* methods
discover .pfw and .pfw.gz files recursively and process them in parallel
on the Runtime thread pool.
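To preview which files a directory path would cover, the matching rule described above (recursive search for .pfw and .pfw.gz) can be approximated with the standard library. This is only a sketch: discover_trace_files is a hypothetical helper, not part of dftracer, and the reader's actual traversal order and handling of symlinks or hidden files are not covered here.

```python
from pathlib import Path
from typing import List


def discover_trace_files(root: str) -> List[Path]:
    """Return .pfw and .pfw.gz files under root, searched recursively.

    Hypothetical helper for illustration; not part of dftracer.
    """
    root_path = Path(root)
    if root_path.is_file():
        # A single trace file is returned as-is
        return [root_path]
    matches = [
        p
        for p in root_path.rglob("*")
        if p.is_file() and p.name.endswith((".pfw", ".pfw.gz"))
    ]
    # Sort for a deterministic order; the reader's own order may differ
    return sorted(matches)
```

Calling discover_trace_files("traces/") before constructing the reader gives a quick sanity check that the directory contains the files you expect.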
Streaming Iterators
iter_lines(), iter_raw(), and iter_json() return Python
iterators backed by a bounded producer-consumer queue. The C++ coroutine runs
on the Runtime’s thread pool and pushes items; Python’s __next__ pops.
iter_lines() and iter_raw() yield memoryview objects (zero-copy views over
the C++ buffer); read_lines() and read_raw() return lists of them. Wrap with
bytes(mv) if you need an owned copy.
from dftracer.utils import TraceReader

reader = TraceReader("trace.pfw.gz")

# Stream decoded lines (memoryview)
for line in reader.iter_lines():
    process(bytes(line))  # process() stands in for your own handler

# Stream raw byte chunks (one line per chunk)
for chunk in reader.iter_raw(multi_line=False):
    process(chunk)  # memoryview

# Stream parsed JSON events (zero-copy JsonDictValue wrappers)
for obj in reader.iter_json():
    print(obj["name"], obj["dur"])

# Materialize to list
lines = reader.read_lines()   # list[memoryview]
chunks = reader.read_raw()    # list[memoryview]
objects = reader.read_json()  # list[JsonDictValue]
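The bounded producer-consumer hand-off described in this section can be pictured with a pure-Python analogue. The sketch below is not how dftracer is implemented (there the producer is a C++ coroutine); bounded_iter and its max_items parameter are hypothetical names, and a Python thread stands in for the Runtime's thread pool.

```python
import queue
import threading
from typing import Iterable, Iterator

_DONE = object()  # sentinel marking the end of the stream


def bounded_iter(source: Iterable[bytes], max_items: int = 4) -> Iterator[memoryview]:
    """Yield memoryviews from source through a bounded queue fed by a worker thread."""
    q: queue.Queue = queue.Queue(maxsize=max_items)

    def produce() -> None:
        for item in source:
            q.put(memoryview(item))  # blocks while the queue is full
        q.put(_DONE)

    threading.Thread(target=produce, daemon=True).start()
    while True:
        item = q.get()  # blocks until the producer has pushed something
        if item is _DONE:
            return
        yield item


lines = [b'{"name": "read"}', b'{"name": "write"}']
views = list(bounded_iter(lines))
owned = [bytes(v) for v in views]  # bytes(mv) makes an owned copy
```

Because the queue is bounded, a slow consumer throttles the producer instead of letting buffered items grow without limit.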
Arrow Output
iter_arrow() and read_arrow() parse JSON events into columnar Arrow
record batches using dynamic schema discovery. Each JSON key becomes a column;
types are inferred from values (int64, uint64, double, string, bool). Nested
objects/arrays are serialized as JSON strings by default; pass
flatten_objects=True to expand args into top-level columns, or
normalize=True to coerce mixed-type columns into a canonical form.
The returned objects implement the Arrow PyCapsule protocol
(__arrow_c_array__) for zero-copy interchange with pyarrow, polars, and
DuckDB.
import pyarrow
from dftracer.utils import TraceReader

reader = TraceReader("trace.pfw.gz")

# Stream Arrow batches (memory-efficient)
for batch in reader.iter_arrow(batch_size=10000):
    pa_batch = pyarrow.record_batch(batch)
    df = pa_batch.to_pandas()

# Single C-side stream drain via Arrow C Data Interface
stream = reader.iter_arrow_stream(batch_size=10000)
rbr = pyarrow.RecordBatchReader.from_stream(stream)
for batch in rbr:
    ...

# Materialize all events as ArrowTable
table = reader.read_arrow()
df = table.to_pandas()  # requires pyarrow
df = table.to_polars()  # requires polars

# With range parameters and object flattening
table = reader.read_arrow(start_line=100, end_line=200, flatten_objects=True)
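To make the idea of dynamic schema discovery concrete, here is a simplified pure-Python illustration of turning JSON events into columns. It is a sketch under stated assumptions, not the library's implementation: to_columns and infer_type are hypothetical names, the dotted naming of flattened columns (args.level) is assumed, uint64 detection is omitted, and mixed-type handling (what normalize=True addresses) is left out.

```python
import json
from typing import Any, Dict, List, Tuple


def infer_type(value: Any) -> str:
    # bool must be tested before int because bool is a subclass of int
    if isinstance(value, bool):
        return "bool"
    if isinstance(value, int):
        return "int64"
    if isinstance(value, float):
        return "double"
    return "string"


def _cell(value: Any) -> Any:
    # Nested objects/arrays become JSON strings; scalars pass through
    return json.dumps(value) if isinstance(value, (dict, list)) else value


def to_columns(
    events: List[Dict[str, Any]], flatten_objects: bool = False
) -> Tuple[Dict[str, list], Dict[str, str]]:
    """Build {column: values} and {column: type} from a list of JSON events."""
    rows = []
    for event in events:
        row = {}
        for key, value in event.items():
            if flatten_objects and isinstance(value, dict):
                # Assumed naming scheme for flattened fields: parent.child
                for sub_key, sub_value in value.items():
                    row[f"{key}.{sub_key}"] = _cell(sub_value)
            else:
                row[key] = _cell(value)
        rows.append(row)
    # Column order follows first appearance across events
    names = list(dict.fromkeys(k for row in rows for k in row))
    columns = {n: [row.get(n) for row in rows] for n in names}
    # The type is taken from the first non-null value in each column
    schema = {
        n: infer_type(next((v for v in columns[n] if v is not None), None))
        for n in names
    }
    return columns, schema


events = [
    {"name": "read", "dur": 12, "args": {"level": "DEBUG"}},
    {"name": "write", "dur": 3.5, "ok": True},
]
columns, schema = to_columns(events)
flat_columns, flat_schema = to_columns(events, flatten_objects=True)
```

Events that lack a key get a null in that column, which is why the set of columns can only be known after the keys of every event in a batch have been seen.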
Writing Arrow IPC Files
write_arrow() writes trace data to Arrow IPC files with optional
view-based partitioning. For finer control, get_view_chunks() returns
the candidate chunks after bloom-filter pruning, and write_view_chunk /
write_view_chunks write individual or batched chunks (the batched variant
runs all chunks concurrently on the Runtime).
from dftracer.utils import TraceReader

reader = TraceReader("trace.pfw.gz")

# Partition by predefined views
result = reader.write_arrow(
    "out/",
    views=["io", "compute"],
    chunk_size_mb=32,
    compression="zstd",
)

# Custom view + explicit chunk plan
info = reader.get_view_chunks({"name": "posix", "query": 'cat == "POSIX"'})
reader.write_view_chunks(info["chunks"], "out/", view="io")
File Metadata
get_max_bytes() and get_num_lines() return file metadata without
reading the full file (when a .dftindex RocksDB index store exists):
from dftracer.utils import TraceReader

reader = TraceReader("trace.pfw.gz")
max_bytes = reader.get_max_bytes()  # 0 if no index for compressed files
num_lines = reader.get_num_lines()  # 0 if no index

# Partition for parallel processing
num_workers = 4  # example value
if max_bytes > 0:
    # Ceiling division so the last range reaches max_bytes
    chunk_size = -(-max_bytes // num_workers)
    for i in range(num_workers):
        start = i * chunk_size
        end = min((i + 1) * chunk_size, max_bytes)
        if start >= end:
            break  # fewer bytes than workers
        process(reader.read_json(start_byte=start, end_byte=end))
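When max_bytes is not a multiple of the worker count, the split needs some care so that no trailing bytes are left out and no empty ranges are produced. A small helper along these lines can compute the ranges up front; byte_ranges is a hypothetical name, not part of dftracer, and treating end as exclusive is an assumption to check against the reader's behavior.

```python
from typing import List, Tuple


def byte_ranges(max_bytes: int, num_workers: int) -> List[Tuple[int, int]]:
    """Split [0, max_bytes) into at most num_workers contiguous ranges."""
    if max_bytes <= 0 or num_workers <= 0:
        return []
    chunk_size = -(-max_bytes // num_workers)  # ceiling division
    return [
        (start, min(start + chunk_size, max_bytes))
        for start in range(0, max_bytes, chunk_size)
    ]
```

Each (start, end) pair can then be passed as start_byte and end_byte to a read_* or iter_* call in a separate worker.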
Query Filtering
All line-based reading methods (read_lines, iter_lines,
iter_json, read_json, iter_arrow, iter_arrow_stream,
read_arrow) accept an optional query parameter for event filtering:
from dftracer.utils import TraceReader

reader = TraceReader("trace.pfw.gz")

# Filter by category
for line in reader.iter_lines(query='cat == "POSIX"'):
    process(line)

# Combine filters with AND/OR
lines = reader.read_lines(query='cat == "POSIX" and dur > 1000')

# Use IN for multiple values
lines = reader.read_lines(query='name in ["read", "write", "open"]')

# NOT queries
lines = reader.read_lines(query='not cat == "MPI"')

# Nested field paths
lines = reader.read_lines(query='args.level == "DEBUG"')

# Combine with range parameters
lines = reader.read_lines(start_line=1, end_line=1000,
                          query='cat == "POSIX"')

# Arrow output with query
table = reader.read_arrow(query='cat == "POSIX" and dur > 100')
df = table.to_pandas()
When an index exists, the query is used for chunk pruning (skipping entire chunks that cannot match) before per-event filtering. This provides significant speedups on large indexed traces.
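The two-stage evaluation (prune chunks first, then filter events) can be illustrated with a toy example. Everything here is an assumption made for illustration: the per-chunk summaries are plain Python sets standing in for whatever the index actually stores, and filter_posix is a hypothetical function hard-wired to the query cat == "POSIX" and dur > min_dur.

```python
from typing import Any, Dict, List, Set, Tuple

# Hypothetical per-chunk summaries: the set of "cat" values in each chunk.
# A bloom filter would serve the same role, with possible false positives
# but no false negatives; exact sets keep the sketch simple.
chunk_cats: List[Set[str]] = [{"POSIX", "STDIO"}, {"MPI"}, {"POSIX"}]
chunks: List[List[Dict[str, Any]]] = [
    [{"cat": "POSIX", "dur": 1500}, {"cat": "STDIO", "dur": 20}],
    [{"cat": "MPI", "dur": 9000}],
    [{"cat": "POSIX", "dur": 10}],
]


def filter_posix(min_dur: int) -> Tuple[List[Dict[str, Any]], int]:
    """Evaluate cat == "POSIX" and dur > min_dur, skipping chunks that cannot match."""
    matches: List[Dict[str, Any]] = []
    scanned = 0
    for summary, events in zip(chunk_cats, chunks):
        if "POSIX" not in summary:
            continue  # pruned: no event in this chunk can match
        scanned += 1
        for event in events:  # per-event filtering
            if event["cat"] == "POSIX" and event["dur"] > min_dur:
                matches.append(event)
    return matches, scanned


matches, scanned = filter_posix(1000)
```

Here only two of the three chunks are scanned; the saving grows with the share of chunks whose summary rules out a match.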
Query DSL Syntax
The query language supports:
- Comparison: ==, !=, >, <, >=, <=
- Logical: and, or, not (case-insensitive)
- Membership: in [...], not in [...]
- Grouping: parentheses (...)
- Field paths: dotted notation for nested JSON (args.level)
- Values: strings ("POSIX"), integers (1000), floats (3.14), booleans (true/false)
Keywords (and, or, not, in, true, false) are
case-insensitive: AND, and, And all work.
String values are case-sensitive: cat == "POSIX" does not match "posix".
Python Field DSL
For programmatic query construction, use the Field class:
from dftracer.utils import TraceReader
from dftracer.utils.query import Field

cat = Field("cat")
dur = Field("dur")
name = Field("name")

# Operators: ==, !=, >, <, >=, <=
q = cat == "POSIX"

# AND (&), OR (|), NOT (~); parenthesize each comparison
q = (cat == "POSIX") & (dur > 1000)
q = (cat == "POSIX") | (cat == "STDIO")
q = ~(cat == "MPI")

# IN / NOT IN
q = name.is_in(["read", "write", "open"])
q = cat.not_in(["MPI"])

# Pass to TraceReader
reader = TraceReader("trace.pfw.gz")
lines = reader.read_lines(query=str(q))
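Each comparison has to be parenthesized when combined with & or |, because those operators bind more tightly than == and > in Python. To show how operator overloading can turn such expressions into a DSL string, here is a toy re-implementation. MiniField and Expr are hypothetical names, and the exact string that the real Field class renders may differ.

```python
import json
from typing import Any, List


class Expr:
    """A query fragment that renders to a DSL string via str()."""

    def __init__(self, text: str) -> None:
        self.text = text

    def __and__(self, other: "Expr") -> "Expr":
        return Expr(f"({self.text}) and ({other.text})")

    def __or__(self, other: "Expr") -> "Expr":
        return Expr(f"({self.text}) or ({other.text})")

    def __invert__(self) -> "Expr":
        return Expr(f"not ({self.text})")

    def __str__(self) -> str:
        return self.text


class MiniField:
    """Toy stand-in for a field reference; comparisons build Expr objects."""

    def __init__(self, path: str) -> None:
        self.path = path

    def _cmp(self, op: str, value: Any) -> Expr:
        # json.dumps renders "POSIX", 1000, 3.14, true/false and [...] lists
        return Expr(f"{self.path} {op} {json.dumps(value)}")

    def __eq__(self, value: Any) -> Expr:  # type: ignore[override]
        return self._cmp("==", value)

    def __ne__(self, value: Any) -> Expr:  # type: ignore[override]
        return self._cmp("!=", value)

    def __gt__(self, value: Any) -> Expr:
        return self._cmp(">", value)

    def __lt__(self, value: Any) -> Expr:
        return self._cmp("<", value)

    def __ge__(self, value: Any) -> Expr:
        return self._cmp(">=", value)

    def __le__(self, value: Any) -> Expr:
        return self._cmp("<=", value)

    def is_in(self, values: List[Any]) -> Expr:
        return self._cmp("in", list(values))

    def not_in(self, values: List[Any]) -> Expr:
        return self._cmp("not in", list(values))


combined = str((MiniField("cat") == "POSIX") & (MiniField("dur") > 1000))
```

The resulting string uses only constructs listed under Query DSL Syntax, so it has the same shape as a hand-written query.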
ReadConfig Parameters
All reading methods accept these keyword arguments:
- start_line / end_line – line range (1-indexed; 0 = no limit)
- start_byte / end_byte – byte range (0 = no limit)
- buffer_size – internal buffer size in bytes (default 4 MB)
- query – query DSL string for event filtering (default None)
Streaming methods (iter_lines, iter_raw, iter_json,
iter_arrow, iter_arrow_stream) additionally accept:
- memory_budget – soft cap on in-flight bytes queued from the C++ producer (0 = default)
iter_raw and read_raw additionally accept:
- line_aligned – if True, chunks are aligned to line boundaries (default True)
- multi_line – if True, chunks may contain multiple lines (default True)
iter_json and read_json additionally accept:
- batch_size – events per parse batch (default 1024)
iter_arrow, iter_arrow_stream, and read_arrow additionally accept:
- batch_size – maximum rows per Arrow batch (default 10000)
- flatten_objects – expand object fields into top-level columns (default False)
- normalize – coerce mixed-type columns into a canonical form (default False)
Out-of-range values are clamped to the actual file bounds (no errors thrown).
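As a mental model of clamping combined with the "0 = no limit" convention, the following sketch resolves a requested byte range against a known file size. It is an assumption-laden illustration, not the library's code: clamp_byte_range is a hypothetical name, and the handling of negative values and of a start beyond the end (collapsed to an empty range here) is a guess.

```python
from typing import Tuple


def clamp_byte_range(start_byte: int, end_byte: int, file_size: int) -> Tuple[int, int]:
    """Resolve a requested byte range against file_size without raising."""
    start = min(max(start_byte, 0), file_size)
    # 0 means "no limit", so an end of 0 reads through to the end of the file
    end = file_size if end_byte <= 0 else min(end_byte, file_size)
    # Assumed behavior: a start past the end yields an empty range
    return start, max(end, start)
```

Under these assumptions, clamp_byte_range(50, 1000, 100) resolves to (50, 100) rather than raising an error.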