# Source code for dftracer.utils.trace_reader

"""Python TraceReader wrapping the native C extension.

Delegates all native methods and adds iter_lines_json / read_lines_json
as shims over iter_arrow via pyarrow.
"""

from __future__ import annotations

from types import TracebackType
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Type, Union

from .dftracer_utils_ext import (
    JsonDictValue,
    _ArrowBatchCapsule,
)
from .dftracer_utils_ext import (
    TraceReader as _NativeTraceReader,
)

if TYPE_CHECKING:
    from .arrow import ArrowTable
    from .runtime import Runtime


class TraceReader:
    """Python facade over the native ``TraceReader`` from ``dftracer_utils_ext``.

    Nearly every method forwards its arguments, by keyword and unchanged, to
    the wrapped native reader stored in ``self._native``. The only logic that
    lives in Python is ``iter_lines_json`` / ``read_lines_json``, which turn
    the native Arrow batches into plain ``dict`` rows via pyarrow.

    Conventions shared by the reading methods (as seen from this wrapper):

    * ``start_line`` / ``end_line`` / ``start_byte`` / ``end_byte`` select the
      range to read and all default to ``0``. What ``0`` means (presumably
      "from the beginning" / "to the end") is decided by the native
      extension -- TODO confirm.
    * ``buffer_size`` defaults to 4194304 bytes (4 MiB).
    * ``query`` is an optional string passed straight through; its syntax is
      defined by the native extension.
    * ``memory_budget`` (streaming ``iter_*`` methods only) defaults to ``0``;
      its meaning (presumably "no limit") is defined natively -- TODO confirm.

    Instances are context managers; entering and exiting delegate to the
    native object's ``__enter__`` / ``__exit__``.
    """

    __slots__ = ("_native",)

    def __init__(
        self,
        path: str,
        index_dir: str = "",
        checkpoint_size: int = 33554432,
        auto_build_index: bool = False,
        runtime: Optional[Runtime] = None,
    ) -> None:
        """Create the wrapped native reader for ``path``.

        Args:
            path: Path passed positionally to the native constructor.
            index_dir: Forwarded to the native reader (``""`` by default).
            checkpoint_size: Forwarded to the native reader; default is
                33554432 bytes (32 MiB).
            auto_build_index: Forwarded to the native reader.
            runtime: Optional runtime object. It is forwarded only when not
                ``None``, so the native constructor otherwise keeps its own
                default instead of receiving an explicit ``None``.
        """
        options: Dict[str, Any] = {
            "index_dir": index_dir,
            "checkpoint_size": checkpoint_size,
            "auto_build_index": auto_build_index,
        }
        if runtime is not None:
            options["runtime"] = runtime
        self._native = _NativeTraceReader(path, **options)

    # -- properties --

    @property
    def path(self) -> str:
        """The native reader's ``path``."""
        return self._native.path

    @property
    def index_dir(self) -> str:
        """The native reader's ``index_dir``."""
        return self._native.index_dir

    @property
    def has_index(self) -> bool:
        """The native reader's ``has_index`` flag."""
        return self._native.has_index

    @property
    def num_lines(self) -> int:
        """The native reader's ``num_lines``."""
        return self._native.num_lines

    # -- lines --

    def read_lines(
        self,
        start_line: int = 0,
        end_line: int = 0,
        start_byte: int = 0,
        end_byte: int = 0,
        buffer_size: int = 4194304,
        query: Optional[str] = None,
    ) -> List[memoryview]:
        """Forward to the native ``read_lines`` and return its list of line views."""
        return self._native.read_lines(
            start_line=start_line,
            end_line=end_line,
            start_byte=start_byte,
            end_byte=end_byte,
            buffer_size=buffer_size,
            query=query,
        )

    def iter_lines(
        self,
        start_line: int = 0,
        end_line: int = 0,
        start_byte: int = 0,
        end_byte: int = 0,
        buffer_size: int = 4194304,
        query: Optional[str] = None,
        memory_budget: int = 0,
    ) -> Iterator[memoryview]:
        """Forward to the native ``iter_lines`` and return its iterator of line views."""
        return self._native.iter_lines(
            start_line=start_line,
            end_line=end_line,
            start_byte=start_byte,
            end_byte=end_byte,
            buffer_size=buffer_size,
            query=query,
            memory_budget=memory_budget,
        )

    # -- json --

    def iter_json(
        self,
        start_line: int = 0,
        end_line: int = 0,
        start_byte: int = 0,
        end_byte: int = 0,
        buffer_size: int = 4194304,
        query: Optional[str] = None,
        batch_size: int = 1024,
        memory_budget: int = 0,
    ) -> Iterator[JsonDictValue]:
        """Forward to the native ``iter_json`` (native ``JsonDictValue`` items)."""
        return self._native.iter_json(
            start_line=start_line,
            end_line=end_line,
            start_byte=start_byte,
            end_byte=end_byte,
            buffer_size=buffer_size,
            query=query,
            batch_size=batch_size,
            memory_budget=memory_budget,
        )

    def read_json(
        self,
        start_line: int = 0,
        end_line: int = 0,
        start_byte: int = 0,
        end_byte: int = 0,
        buffer_size: int = 4194304,
        query: Optional[str] = None,
        batch_size: int = 1024,
    ) -> List[JsonDictValue]:
        """Forward to the native ``read_json`` (list of native ``JsonDictValue``)."""
        return self._native.read_json(
            start_line=start_line,
            end_line=end_line,
            start_byte=start_byte,
            end_byte=end_byte,
            buffer_size=buffer_size,
            query=query,
            batch_size=batch_size,
        )

    # -- raw --

    def read_raw(
        self,
        start_line: int = 0,
        end_line: int = 0,
        start_byte: int = 0,
        end_byte: int = 0,
        line_aligned: bool = True,
        multi_line: bool = True,
        buffer_size: int = 4194304,
        query: Optional[str] = None,
    ) -> List[memoryview]:
        """Forward to the native ``read_raw`` and return its list of buffers."""
        return self._native.read_raw(
            start_line=start_line,
            end_line=end_line,
            start_byte=start_byte,
            end_byte=end_byte,
            line_aligned=line_aligned,
            multi_line=multi_line,
            buffer_size=buffer_size,
            query=query,
        )

    def iter_raw(
        self,
        start_line: int = 0,
        end_line: int = 0,
        start_byte: int = 0,
        end_byte: int = 0,
        line_aligned: bool = True,
        multi_line: bool = True,
        buffer_size: int = 4194304,
        query: Optional[str] = None,
        memory_budget: int = 0,
    ) -> Iterator[memoryview]:
        """Forward to the native ``iter_raw`` and return its iterator of buffers."""
        return self._native.iter_raw(
            start_line=start_line,
            end_line=end_line,
            start_byte=start_byte,
            end_byte=end_byte,
            line_aligned=line_aligned,
            multi_line=multi_line,
            buffer_size=buffer_size,
            query=query,
            memory_budget=memory_budget,
        )

    # -- arrow --

    def iter_arrow(
        self,
        batch_size: int = 10000,
        start_line: int = 0,
        end_line: int = 0,
        start_byte: int = 0,
        end_byte: int = 0,
        buffer_size: int = 4194304,
        query: Optional[str] = None,
        flatten_objects: bool = False,
        normalize: bool = False,
        memory_budget: int = 0,
    ) -> Iterator[_ArrowBatchCapsule]:
        """Forward to the native ``iter_arrow``; yields native Arrow batch capsules."""
        return self._native.iter_arrow(
            batch_size=batch_size,
            start_line=start_line,
            end_line=end_line,
            start_byte=start_byte,
            end_byte=end_byte,
            buffer_size=buffer_size,
            query=query,
            flatten_objects=flatten_objects,
            normalize=normalize,
            memory_budget=memory_budget,
        )

    def iter_arrow_stream(
        self,
        batch_size: int = 10000,
        start_line: int = 0,
        end_line: int = 0,
        start_byte: int = 0,
        end_byte: int = 0,
        buffer_size: int = 4194304,
        query: Optional[str] = None,
        flatten_objects: bool = False,
        normalize: bool = False,
        memory_budget: int = 0,
    ) -> Any:
        """Forward to the native ``iter_arrow_stream`` and return its result as-is."""
        return self._native.iter_arrow_stream(
            batch_size=batch_size,
            start_line=start_line,
            end_line=end_line,
            start_byte=start_byte,
            end_byte=end_byte,
            buffer_size=buffer_size,
            query=query,
            flatten_objects=flatten_objects,
            normalize=normalize,
            memory_budget=memory_budget,
        )

    def read_arrow(
        self,
        batch_size: int = 10000,
        start_line: int = 0,
        end_line: int = 0,
        start_byte: int = 0,
        end_byte: int = 0,
        buffer_size: int = 4194304,
        query: Optional[str] = None,
        flatten_objects: bool = False,
        normalize: bool = False,
    ) -> ArrowTable:
        """Forward to the native ``read_arrow`` and return its ``ArrowTable``."""
        return self._native.read_arrow(
            batch_size=batch_size,
            start_line=start_line,
            end_line=end_line,
            start_byte=start_byte,
            end_byte=end_byte,
            buffer_size=buffer_size,
            query=query,
            flatten_objects=flatten_objects,
            normalize=normalize,
        )

    # -- JSON shims via arrow --

    def iter_lines_json(
        self,
        batch_size: int = 10000,
        start_line: int = 0,
        end_line: int = 0,
        start_byte: int = 0,
        end_byte: int = 0,
        buffer_size: int = 4194304,
        query: Optional[str] = None,
        flatten_objects: bool = False,
        normalize: bool = False,
        memory_budget: int = 0,
    ) -> Iterator[Dict[str, Any]]:
        """Yield one ``dict`` per row, built from the native Arrow batches.

        This is a shim over the native ``iter_arrow``: each batch it yields is
        wrapped with ``pyarrow.record_batch`` and expanded with
        ``RecordBatch.to_pylist()``. All arguments are forwarded to
        ``iter_arrow``. ``flatten_objects``, ``normalize`` and
        ``memory_budget`` are trailing optional parameters whose defaults
        match ``iter_arrow``'s.

        Raises:
            ImportError: If pyarrow is not installed. This is raised when the
                method is called, not deferred until the first ``next()``.
        """
        # Import before building the generator so a missing dependency fails
        # fast; inside a generator body the check would only run on first
        # iteration.
        try:
            import pyarrow as pa
        except ImportError:
            raise ImportError(
                "pyarrow is required for iter_lines_json. "
                "Install with: pip install pyarrow"
            ) from None
        return self._iter_rows(
            pa,
            batch_size=batch_size,
            start_line=start_line,
            end_line=end_line,
            start_byte=start_byte,
            end_byte=end_byte,
            buffer_size=buffer_size,
            query=query,
            flatten_objects=flatten_objects,
            normalize=normalize,
            memory_budget=memory_budget,
        )

    def _iter_rows(self, pa: Any, **arrow_kwargs: Any) -> Iterator[Dict[str, Any]]:
        """Lazily stream native Arrow batches as row dicts (helper for ``iter_lines_json``)."""
        # Being a generator, the native iterator is only created once the
        # caller starts consuming rows.
        for capsule in self._native.iter_arrow(**arrow_kwargs):
            yield from pa.record_batch(capsule).to_pylist()

    def read_lines_json(
        self,
        batch_size: int = 10000,
        start_line: int = 0,
        end_line: int = 0,
        start_byte: int = 0,
        end_byte: int = 0,
        buffer_size: int = 4194304,
        query: Optional[str] = None,
        flatten_objects: bool = False,
        normalize: bool = False,
    ) -> List[Dict[str, Any]]:
        """Collect every row produced by ``iter_lines_json`` into a list.

        ``flatten_objects`` and ``normalize`` are forwarded through
        ``iter_lines_json``. Like the other ``read_*`` methods, this one does
        not expose ``memory_budget``.

        Raises:
            ImportError: If pyarrow is not installed.
        """
        return list(
            self.iter_lines_json(
                batch_size=batch_size,
                start_line=start_line,
                end_line=end_line,
                start_byte=start_byte,
                end_byte=end_byte,
                buffer_size=buffer_size,
                query=query,
                flatten_objects=flatten_objects,
                normalize=normalize,
            )
        )

    # -- metadata --

    def get_max_bytes(self) -> int:
        """Forward to the native ``get_max_bytes``."""
        return self._native.get_max_bytes()

    def get_num_lines(self) -> int:
        """Forward to the native ``get_num_lines``."""
        return self._native.get_num_lines()

    # -- arrow IPC writing --

    def write_arrow(
        self,
        path: str,
        views: Optional[List[Union[str, Dict[str, Any]]]] = None,
        chunk_size_mb: int = 32,
        compression: str = "zstd",
        batch_size: int = 10000,
    ) -> Dict[str, Any]:
        """Forward to the native ``write_arrow`` (``path`` positional, rest by keyword)."""
        return self._native.write_arrow(
            path,
            views=views,
            chunk_size_mb=chunk_size_mb,
            compression=compression,
            batch_size=batch_size,
        )

    def get_view_chunks(
        self,
        view: Optional[Union[str, Dict[str, Any]]] = None,
    ) -> Dict[str, Any]:
        """Forward to the native ``get_view_chunks`` and return its result dict."""
        return self._native.get_view_chunks(view=view)

    def write_view_chunk(
        self,
        output_file: str,
        checkpoint_idx: int,
        start_byte: int,
        end_byte: int,
        view: Optional[Union[str, Dict[str, Any]]] = None,
        compression: str = "zstd",
        batch_size: int = 10000,
    ) -> Dict[str, Any]:
        """Forward to the native ``write_view_chunk`` and return its result dict."""
        return self._native.write_view_chunk(
            output_file=output_file,
            checkpoint_idx=checkpoint_idx,
            start_byte=start_byte,
            end_byte=end_byte,
            view=view,
            compression=compression,
            batch_size=batch_size,
        )

    def write_view_chunks(
        self,
        chunks: List[Dict[str, Any]],
        output_dir: str,
        view: Optional[Union[str, Dict[str, Any]]] = None,
        compression: str = "zstd",
        batch_size: int = 10000,
    ) -> Dict[str, Any]:
        """Forward to the native ``write_view_chunks`` and return its result dict."""
        return self._native.write_view_chunks(
            chunks=chunks,
            output_dir=output_dir,
            view=view,
            compression=compression,
            batch_size=batch_size,
        )

    # -- context manager --

    def __enter__(self) -> TraceReader:
        """Enter the native context and return this wrapper (not the native object)."""
        self._native.__enter__()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Delegate to the native ``__exit__``.

        The native return value is discarded and ``None`` is returned, so this
        wrapper never suppresses an in-flight exception.
        """
        self._native.__exit__(exc_type, exc_val, exc_tb)