"""Arrow data interchange and I/O for DFTracer.
Provides:
- ArrowBatch and ArrowTable classes that wrap Arrow C Data Interface
objects (PyCapsules) with convenience methods for conversion to pandas
and polars DataFrames.
- write_arrow() and read_arrow() for Arrow IPC file I/O with Runtime
parallelization.
These wrappers are pure Python. The actual Arrow data is produced by the
C extension (TraceReader.iter_arrow, utility to_arrow methods). Conversion
to pandas requires pyarrow; conversion to polars requires polars. Neither
is a required dependency.
"""
from __future__ import annotations
import os
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Tuple, Union
if TYPE_CHECKING:
import pyarrow as pa
import pyarrow.ipc as ipc
from dftracer.utils.dftracer_utils_ext import (
read_arrow_files_parallel as _cpp_read_parallel,
)
_HAS_PYARROW = False
_HAS_CPP_READER = False
try:
import pyarrow as pa
import pyarrow.ipc as ipc
_HAS_PYARROW = True
except ImportError:
pass
try:
from dftracer.utils.dftracer_utils_ext import (
read_arrow_files_parallel as _cpp_read_parallel,
)
_HAS_CPP_READER = True
except ImportError:
pass
class ArrowBatch:
    """Wrapper around an Arrow RecordBatch from the C extension.

    Supports the Arrow PyCapsule protocol (``__arrow_c_array__``) for
    zero-copy interchange with pyarrow, polars, and DuckDB.

    The underlying C data is exported via ownership transfer on first
    use. The pyarrow RecordBatch is cached so subsequent calls to
    ``to_pandas()``, ``to_polars()``, or ``__arrow_c_array__()`` are safe.
    """

    def __init__(self, capsule: Any) -> None:
        """Store the batch object handed over by the C extension.

        Args:
            capsule: Batch object from the C extension. It is passed to
                ``pyarrow.record_batch()`` on first conversion, and its
                ``num_rows`` / ``num_columns`` attributes are read until a
                pyarrow batch has been cached.
        """
        self._capsule = capsule
        self._pa_batch: Any = None  # cached pyarrow.RecordBatch

    def _to_pa_batch(self) -> Any:
        """Convert to pyarrow RecordBatch, caching the result.

        Returns:
            pyarrow.RecordBatch: The converted batch.

        Raises:
            ImportError: If pyarrow is not installed.
        """
        if self._pa_batch is not None:
            return self._pa_batch
        try:
            import pyarrow as pa
        except ImportError:
            raise ImportError("pyarrow is required. Install with: pip install pyarrow") from None
        # Convert exactly once; every later export goes through the cached
        # pyarrow batch instead of the original capsule.
        self._pa_batch = pa.record_batch(self._capsule)
        return self._pa_batch

    def __arrow_c_array__(self, requested_schema: Any = None) -> Tuple[Any, Any]:
        """Export via Arrow C Data Interface.

        Delegates to the cached pyarrow RecordBatch, so it can be called
        repeatedly.

        Raises:
            ImportError: If pyarrow is not installed.
        """
        return self._to_pa_batch().__arrow_c_array__(requested_schema)

    @property
    def num_rows(self) -> int:
        """Number of rows in this batch."""
        # Prefer the cached pyarrow batch once it exists; otherwise ask the
        # C-extension object directly (no pyarrow needed).
        if self._pa_batch is not None:
            return self._pa_batch.num_rows
        return self._capsule.num_rows

    @property
    def num_columns(self) -> int:
        """Number of columns in this batch."""
        if self._pa_batch is not None:
            return self._pa_batch.num_columns
        return self._capsule.num_columns

    def to_pandas(self) -> Any:
        """Convert to pandas DataFrame.

        Returns:
            pandas.DataFrame: The converted DataFrame.

        Raises:
            ImportError: If pyarrow is not installed.
        """
        return self._to_pa_batch().to_pandas()

    def to_polars(self) -> Any:
        """Convert to polars DataFrame.

        The conversion goes through the cached pyarrow RecordBatch, so both
        polars and pyarrow must be importable.

        Returns:
            polars.DataFrame: The converted DataFrame.

        Raises:
            ImportError: If polars or pyarrow is not installed.
        """
        try:
            import polars as pl  # type: ignore[import-not-found]  # ty: ignore[unresolved-import]
        except ImportError:
            raise ImportError(
                "polars is required for to_polars(). Install with: pip install polars"
            ) from None
        return pl.from_arrow(self._to_pa_batch())
class ArrowTable:
    """Wrapper around a collection of Arrow RecordBatches.

    Returned by utility ``process()`` methods. Supports the Arrow PyCapsule
    stream protocol (``__arrow_c_stream__``) for zero-copy interchange.

    Accepts a pre-built list of batches, an object implementing
    ``__arrow_c_stream__``, or any other iterable of batches (consumed
    lazily). When constructed from an iterator, batches are not
    materialized until data access (to_pandas, to_polars, batches, etc.).

    ``num_rows``, ``num_batches`` and ``empty`` also trigger
    materialization: a lazy iterator is drained into a list and every batch
    is retained. Once the table has been converted to pyarrow, the cached
    ``pyarrow.Table`` is used instead, and ``batch()`` / ``batches()``
    return ``pyarrow.RecordBatch`` objects taken from that table.
    """

    def __init__(
        self,
        batches: Any,
        schema_capsule: Optional[Any] = None,
    ) -> None:
        """Record the batch source without touching the data.

        Args:
            batches: A list of batches, an object exposing
                ``__arrow_c_stream__``, or an iterable of batches.
            schema_capsule: Optional schema object, used only to build an
                empty pyarrow table when there are no batches.
        """
        self._stream: Any = None
        if isinstance(batches, list):
            self._batches: Optional[list[Any]] = batches
            self._iter: Optional[Iterator[Any]] = None
        elif hasattr(batches, "__arrow_c_stream__"):
            self._batches = None
            self._iter = None
            self._stream = batches
        else:
            self._batches = None
            self._iter = iter(batches)
        self._schema_capsule = schema_capsule
        self._pa_table: Any = None  # cached pyarrow.Table

    def _materialize(self) -> list[Any]:
        """Return the batches as a list, building it on first use.

        Sources are tried in order: the existing list, the Arrow stream
        (imported through pyarrow), the cached pyarrow table, the lazy
        iterator, and finally an empty list.

        Raises:
            ImportError: If the source is an Arrow stream and pyarrow is
                not installed.
        """
        if self._batches is not None:
            return self._batches
        if self._stream is not None:
            self._to_pa_table()
        # Also reached after an earlier _to_pa_table() call dropped the batch
        # list: rebuild the list from the cached pyarrow table.
        if self._pa_table is not None:
            self._batches = list(self._pa_table.to_batches())
            return self._batches
        if self._iter is not None:
            self._batches = list(self._iter)
            self._iter = None
            return self._batches
        self._batches = []
        return self._batches

    def _to_pa_table(self) -> Any:
        """Convert to pyarrow Table, caching the result.

        Arrow C Data Interface export is single-use (ownership transfer),
        so we cache the pyarrow table on first conversion. After
        conversion the batch references are cleared since pyarrow now owns
        the underlying buffers.

        Returns:
            pyarrow.Table: The converted table.

        Raises:
            ImportError: If pyarrow is not installed.
        """
        if self._pa_table is not None:
            return self._pa_table
        try:
            import pyarrow as pa
        except ImportError:
            raise ImportError("pyarrow is required. Install with: pip install pyarrow") from None
        if self._stream is not None:
            self._pa_table = pa.table(self._stream)
            self._stream = None
            return self._pa_table
        batches = self._materialize()
        pa_batches = [pa.record_batch(b) for b in batches]
        self._batches = None
        if pa_batches:
            self._pa_table = pa.Table.from_batches(pa_batches)
            return self._pa_table
        # No batches: build a zero-row table that still carries the schema.
        if self._schema_capsule is None:
            schema = pa.schema([])
        elif hasattr(self._schema_capsule, "__arrow_c_schema__"):
            # Documented pyarrow entry point for PyCapsule-protocol schemas.
            schema = pa.schema(self._schema_capsule)
        else:
            # NOTE(review): original import path, kept for objects that do not
            # implement __arrow_c_schema__ -- confirm Schema.from_arrow exists
            # in the supported pyarrow versions.
            schema = pa.Schema.from_arrow(self._schema_capsule)
        # pa.table({}, schema=schema) fails for a non-empty schema because the
        # mapping lacks the schema's columns; empty_table() builds the
        # zero-row table directly.
        self._pa_table = schema.empty_table()
        return self._pa_table

    def __arrow_c_stream__(self, requested_schema: Any = None) -> Any:
        """Arrow C Stream Interface -- yields batches to consumers.

        Raises:
            ImportError: If pyarrow is not installed.
        """
        return self._to_pa_table().__arrow_c_stream__(requested_schema)

    @property
    def num_batches(self) -> int:
        """Number of batches (materializes a lazy source)."""
        return len(self._materialize())

    @property
    def num_rows(self) -> int:
        """Total number of rows across all batches.

        Uses the cached pyarrow table when available; otherwise materializes
        the batches and sums their ``num_rows``.
        """
        if self._pa_table is not None:
            return self._pa_table.num_rows
        return sum(b.num_rows for b in self._materialize())

    @property
    def empty(self) -> bool:
        """Whether the table holds no data.

        With a cached pyarrow table this is True when it has zero rows;
        otherwise it is True when there are no batches (a zero-row batch
        still counts as a batch).
        """
        if self._pa_table is not None:
            return self._pa_table.num_rows == 0
        return len(self._materialize()) == 0

    def batch(self, i: int) -> Any:
        """Get the i-th batch.

        Raises:
            IndexError: If ``i`` is out of range.
        """
        return self._materialize()[i]

    def batches(self) -> Iterator[Any]:
        """Iterate over batches."""
        return iter(self._materialize())

    def to_pandas(self) -> Any:
        """Convert all batches to a single pandas DataFrame.

        Returns:
            pandas.DataFrame: The converted DataFrame.

        Raises:
            ImportError: If pyarrow is not installed.
        """
        return self._to_pa_table().to_pandas()

    def to_polars(self) -> Any:
        """Convert all batches to a single polars DataFrame.

        A table with zero rows is returned as a bare ``polars.DataFrame()``
        (no columns).

        Returns:
            polars.DataFrame: The converted DataFrame.

        Raises:
            ImportError: If polars or pyarrow is not installed.
        """
        try:
            import polars as pl  # type: ignore[import-not-found]  # ty: ignore[unresolved-import]
        except ImportError:
            raise ImportError(
                "polars is required for to_polars(). Install with: pip install polars"
            ) from None
        table = self._to_pa_table()
        if table.num_rows == 0:
            return pl.DataFrame()
        return pl.from_arrow(table)
def write_arrow(
    file_path: str,
    output_dir: str,
    view: Optional[Union[str, Dict]] = None,
    index_dir: str = "",
    checkpoint_size: int = 32 * 1024 * 1024,
    compression: str = "zstd",
    batch_size: int = 10000,
    chunks: Optional[List[Dict]] = None,
    parallel: bool = True,
) -> Dict:
    """Export trace data as Arrow IPC files.

    When ``chunks`` is omitted, the candidate chunks are obtained from
    ``TraceReader.get_view_chunks`` for the requested view; otherwise exactly
    the supplied chunks are written.

    Args:
        file_path: Path to the trace file.
        output_dir: Directory for output Arrow IPC files (created if missing).
        view: View definition - string ('io', 'compute', 'dlio') or
            dict with 'name' and optional 'query'.
        index_dir: Directory for index files.
        checkpoint_size: Checkpoint size for indexing.
        compression: 'zstd' or 'none'.
        batch_size: Events per batch.
        chunks: Optional list of specific chunks to write. If None,
            all candidate chunks of the file are used.
        parallel: If True (default) and there is more than one chunk, each
            chunk is submitted as its own task to the default Runtime.

    Returns:
        dict with:
            - files: Output paths of the results that wrote at least one row
            - total_chunks: Number of chunks processed
            - skipped_chunks: Skipped checkpoint count reported by the chunk
              lookup (0 when ``chunks`` was supplied)
            - total_rows: Total rows written
            - total_events_matched: Total events matched

    Example:
        >>> from dftracer.utils.arrow import write_arrow
        >>> result = write_arrow(
        ...     "trace.pfw.gz",
        ...     "/output/io_view",
        ...     view="io",
        ... )
        >>> print(f"Wrote {len(result['files'])} files")
    """
    from dftracer.utils import TraceReader, get_default_runtime

    def _summary(paths: List, n_chunks: int, n_skipped: Any, n_rows: Any, n_matched: Any) -> Dict:
        # Single place that fixes the shape of the returned report.
        return {
            "files": paths,
            "total_chunks": n_chunks,
            "skipped_chunks": n_skipped,
            "total_rows": n_rows,
            "total_events_matched": n_matched,
        }

    os.makedirs(output_dir, exist_ok=True)
    reader = TraceReader(file_path, index_dir=index_dir, checkpoint_size=checkpoint_size)

    if chunks is not None:
        skipped_chunks = 0
    else:
        lookup = reader.get_view_chunks(view=view)
        if not lookup["file_may_match"]:
            # The lookup ruled the whole file out: nothing to write.
            return _summary([], 0, lookup["skipped_checkpoints"], 0, 0)
        chunks = lookup["chunks"]
        skipped_chunks = lookup["skipped_checkpoints"]

    if not chunks:
        return _summary([], 0, skipped_chunks, 0, 0)

    if not (parallel and len(chunks) > 1):
        # Serial path: one call covers every chunk.
        outcomes = [
            reader.write_view_chunks(
                chunks=chunks,
                output_dir=output_dir,
                view=view,
                compression=compression,
                batch_size=batch_size,
            )
        ]
    else:
        runtime = get_default_runtime()

        def write_chunk(chunk: Dict) -> Dict:
            # Each task opens its own reader rather than sharing `reader`.
            r = TraceReader(file_path, index_dir=index_dir, checkpoint_size=checkpoint_size)
            return r.write_view_chunks(
                chunks=[chunk],
                output_dir=output_dir,
                view=view,
                compression=compression,
                batch_size=batch_size,
            )

        pending = []
        for i, chunk in enumerate(chunks):
            pending.append(runtime.submit(write_chunk, chunk, name=f"write:chunk_{i}"))
        outcomes = [handle.get() for handle in pending]

    # Only results that actually wrote rows contribute an output file.
    written = [
        entry["output_file"]
        for outcome in outcomes
        for entry in outcome.get("results", [])
        if entry.get("rows_written", 0) > 0
    ]
    rows_total = sum(outcome.get("total_rows", 0) for outcome in outcomes)
    matched_total = sum(outcome.get("total_events_matched", 0) for outcome in outcomes)
    return _summary(written, len(chunks), skipped_chunks, rows_total, matched_total)
def read_arrow(
    files: List[str],
    parallel: bool = True,
) -> Optional[Union["pa.Table", List[Any]]]:
    """Read Arrow IPC files and return their combined contents.

    Uses pyarrow for reading with optional parallelization via Runtime.
    Falls back to the C++ reader if pyarrow is not available.

    Paths that do not exist or point to zero-byte files are skipped
    silently.

    Args:
        files: List of Arrow IPC file paths.
        parallel: If True (default), read files in parallel using Runtime
            when there is more than one readable file. Only affects the
            pyarrow path; the C++ reader is always given the runtime.

    Returns:
        - ``None`` if ``files`` is empty or no path refers to a non-empty
          existing file.
        - ``pyarrow.Table`` with all data combined when pyarrow is installed.
        - list of batch objects (from the successfully read files) when only
          the C++ reader is available.

    Raises:
        ImportError: If neither pyarrow nor the C++ Arrow IPC reader is
            available.

    Example:
        >>> from dftracer.utils.arrow import read_arrow
        >>> table = read_arrow(["file1.arrow", "file2.arrow"])
        >>> print(f"Read {table.num_rows} rows")
    """
    if not files:
        return None
    valid_files = [f for f in files if os.path.exists(f) and os.path.getsize(f) > 0]
    if not valid_files:
        return None

    # Imported only after the trivial cases are handled, so empty input does
    # not require the runtime to be importable.
    from dftracer.utils import get_default_runtime

    if _HAS_PYARROW:

        def read_one(path: str) -> "pa.Table":
            return ipc.open_file(path).read_all()

        if parallel and len(valid_files) > 1:
            runtime = get_default_runtime()
            handles = [
                runtime.submit(read_one, f, name=f"read:{os.path.basename(f)}")
                for f in valid_files
            ]
            tables = [h.get() for h in handles]
        else:
            tables = [read_one(f) for f in valid_files]
        # valid_files is non-empty here, so there is always at least one table.
        return pa.concat_tables(tables)

    if not _HAS_CPP_READER:
        raise ImportError(
            "Neither pyarrow nor C++ Arrow IPC reader available. "
            "Install pyarrow or build dftracer-utils with Arrow IPC support."
        )

    from dftracer.utils import Runtime as PyRuntime

    runtime = get_default_runtime()
    # The C++ reader takes the native runtime; unwrap the Python wrapper if
    # that is what get_default_runtime() returned.
    native_rt = runtime._native if isinstance(runtime, PyRuntime) else runtime
    result = _cpp_read_parallel(valid_files, runtime=native_rt)
    batches = []
    for fr in result.get("file_results", []):
        # Files the reader could not process are dropped, not raised.
        if fr.get("success"):
            batches.extend(fr.get("batches", []))
    return batches