Quick Start Guide
=================

This guide will help you get started with dftracer utilities quickly.

Python Quick Start
------------------

Reading Trace Files
~~~~~~~~~~~~~~~~~~~

The most common use case is reading trace files:

.. code-block:: python

    from dftracer.utils import TraceReader

    # Open a compressed trace file (auto-detects index sidecar)
    reader = TraceReader("trace.pfw.gz")

    # ...or pass a directory; TraceReader scans for .pfw / .pfw.gz files
    # and streams them transparently as a single logical input.
    reader = TraceReader("./traces")

    # Read all lines
    lines = reader.read_lines()
    for line in lines:
        print(line)

    # Read lines as JSON objects
    json_objects = reader.read_lines_json()
    for obj in json_objects:
        print(obj["name"], obj["dur"])

    # Stream for memory efficiency
    # (process() stands in for your own per-event handler)
    for obj in reader.iter_lines_json():
        process(obj)

Streaming with TraceReader
~~~~~~~~~~~~~~~~~~~~~~~~~~

``TraceReader`` is the recommended way to read trace files. It automatically
selects sequential or indexed reading and supports streaming iterators:

.. code-block:: python

    from dftracer.utils import Runtime, TraceReader

    reader = TraceReader("trace.pfw.gz")

    # Stream lines (memory-efficient, uses iterator)
    for line in reader.iter_lines():
        process(line)

    # Stream raw byte chunks
    for chunk in reader.iter_raw(multi_line=False):
        process(chunk)  # one line per chunk as bytes

    # Materialize all lines (convenience wrapper)
    lines = reader.read_lines()

    # With explicit Runtime for thread pool control
    with Runtime(threads=8) as rt:
        reader = TraceReader("trace.pfw.gz", runtime=rt)
        for line in reader.iter_lines():
            process(line)

        # Check progress
        print(rt.get_progress())

Async Task Submission
~~~~~~~~~~~~~~~~~~~~~

``Runtime.submit()`` runs tasks asynchronously and returns a ``TaskHandle``:

.. code-block:: python

    from dftracer.utils import Runtime

    # process_file stands in for your own callable
    with Runtime(threads=8, python_threads=4) as rt:
        # Submit Python callables -- runs on Python thread pool
        h1 = rt.submit(process_file, "trace1.pfw.gz", name="proc-1")
        h2 = rt.submit(process_file, "trace2.pfw.gz", name="proc-2")

        # Wait for all tasks
        rt.wait_all()

        # Or get individual results
        result = h1.get()  # blocks until h1 completes

Task names are auto-derived from the callable when not provided:

.. code-block:: python

    rt.submit(my_function)   # name = "my_function"
    rt.submit(obj.method)    # name = "MyClass.method"
    rt.submit(lambda: None)  # name = "<lambda>"

Composing tasks with dependency chains:

.. code-block:: python

    def compose(filename):
        h1 = rt.submit(index_file, filename)
        result = h1.get()                     # wait for index
        h2 = rt.submit(query_index, result)   # use result
        return h2.get()

    h = rt.submit(compose, "trace.pfw.gz", name="compose")
    print(h.get())

Error handling:

.. code-block:: python

    import logging

    log = logging.getLogger(__name__)

    # Per-task: .get() re-raises the original exception
    try:
        h.get()
    except ValueError as e:
        print(f"Task failed: {e}")

    # Batch: wait_all(raise_on_error=True) raises after all complete
    rt.wait_all(raise_on_error=True)

    # Callback: async notification on failure
    rt.set_error_callback(lambda h, e: log.error(f"{h.name}: {e}"))

    # Inspect failures
    for h in rt.get_failed():
        print(f"{h.name}: {h.exception}")

Arrow Data Interchange
~~~~~~~~~~~~~~~~~~~~~~

``TraceReader`` and several utilities support Arrow output for efficient
columnar data access. Arrow batches implement the PyCapsule protocol for
zero-copy interchange with pyarrow, polars, and DuckDB.

.. code-block:: python

    import pyarrow

    from dftracer.utils import TraceReader

    reader = TraceReader("trace.pfw.gz")

    # Stream Arrow batches
    for batch in reader.iter_arrow(batch_size=10000):
        df = pyarrow.record_batch(batch).to_pandas()

    # Materialize as ArrowTable
    table = reader.read_arrow()
    df = table.to_pandas()

Query Filtering
~~~~~~~~~~~~~~~

``TraceReader`` supports a query DSL for filtering events.
When an index exists, chunk pruning skips non-matching chunks automatically.

.. code-block:: python

    from dftracer.utils import TraceReader

    reader = TraceReader("trace.pfw.gz")

    # Filter by category
    for line in reader.iter_lines(query='cat == "POSIX"'):
        process(line)

    # Combine filters
    lines = reader.read_lines(query='cat == "POSIX" and dur > 1000')

    # Arrow output with query
    table = reader.read_arrow(query='name in ["read", "write"]')
    df = table.to_pandas()

    # Programmatic query building
    from dftracer.utils.query import Field

    cat = Field("cat")
    dur = Field("dur")
    q = (cat == "POSIX") & (dur > 1000)
    lines = reader.read_lines(query=str(q))

Utility Bindings
~~~~~~~~~~~~~~~~

The ``dftracer.utils.utilities`` module provides Python bindings for
DFTracer's C++ utility classes. Utilities that produce tabular results return
Arrow tables; utilities that produce scalar results return dicts.

.. code-block:: python

    from dftracer.utils.utilities import (
        AggregatorUtility,
        StatisticsQueryUtility,
        MetadataCollectorUtility,
    )

    # Aggregation pipeline (returns Arrow)
    agg = AggregatorUtility()
    table = agg.process("./traces", time_interval_ms=1000.0)
    df = table.to_pandas()

    # Optional: aggregate extra numeric args fields
    table = agg.process(
        "./traces",
        custom_metric_fields=["bytes"],
        compute_percentiles=True,
    )

    # Statistics query (returns dict)
    sq = StatisticsQueryUtility()
    stats = sq.process("trace.pfw.gz", query_type="summary")
    print(f"Events: {stats['total_events']}")

    # File metadata (returns dict)
    mc = MetadataCollectorUtility()
    meta = mc.process("trace.pfw.gz")
    print(f"Size: {meta['size_mb']:.2f} MB")

Using with Dask
~~~~~~~~~~~~~~~

For distributed processing with ``dask.distributed``:

.. code-block:: python

    from dask.distributed import Client
    from dftracer.utils.dask import DFTracerUtilsDaskWorkerPlugin

    client = Client("scheduler:8786")
    client.register_plugin(DFTracerUtilsDaskWorkerPlugin(threads=48))

    def count_lines(path):
        # Import inside the task so it resolves on the worker
        from dftracer.utils import TraceReader
        return sum(1 for _ in TraceReader(path).iter_lines())

    # Example input list; replace with your own trace paths
    file_paths = ["trace1.pfw.gz", "trace2.pfw.gz"]
    futures = client.map(count_lines, file_paths)
    results = client.gather(futures)

Working with Indexer
~~~~~~~~~~~~~~~~~~~~

Create and use indexes for faster access:

.. code-block:: python

    from dftracer.utils import Indexer

    # Create an indexer
    indexer = Indexer("trace.pfw.gz")

    # Build the index if needed
    if indexer.need_rebuild():
        indexer.build()

    # Get index information
    print(f"Max bytes: {indexer.get_max_bytes()}")
    print(f"Num lines: {indexer.get_num_lines()}")

    # Get checkpoints
    checkpoints = indexer.get_checkpoints()
    for cp in checkpoints:
        print(f"Checkpoint {cp.checkpoint_idx}: {cp.num_lines} lines")

C++ Quick Start
---------------

Building Parallel Pipelines
~~~~~~~~~~~~~~~~~~~~~~~~~~~

Create and execute parallel data processing tasks using coroutines:

.. code-block:: cpp

    // Also include the dftracer-utils pipeline and coroutine headers
    // from include/dftracer/utils/.
    #include <iostream>
    #include <string>
    #include <utility>

    using namespace dftracer::utils;
    using namespace dftracer::utils::coro;

    int main() {
        // Configure executor
        auto config = PipelineConfig()
                          .with_name("MyPipeline")
                          .with_compute_threads(4);

        // Create channel for data streaming (string items, capacity 100)
        auto channel = make_channel<std::string>(100);

        // Producer task - reads data and sends through channel
        auto producer = make_task(
            [ch = channel->producer()](CoroScope& scope) mutable
                -> CoroTask<void> {
                auto guard = ch.guard();
                for (int i = 0; i < 100; i++) {
                    std::string data = "item-" + std::to_string(i);
                    co_await ch.send(std::move(data));
                }
                co_return;
            },
            "Producer");

        // Consumer task - reads from channel and processes
        auto consumer = make_task(
            [channel](CoroScope& scope) -> CoroTask<void> {
                while (auto item = co_await channel->receive()) {
                    std::cout << "Processing: " << *item << std::endl;
                }
                co_return;
            },
            "Consumer");

        // Execute pipeline
        Pipeline pipeline(config);
        pipeline.set_source({producer});
        pipeline.set_destination(consumer);
        pipeline.execute();

        return 0;
    }

Spawning Parallel Work
~~~~~~~~~~~~~~~~~~~~~~

Spawn multiple tasks to run in parallel:

.. code-block:: cpp

    // Also include the dftracer-utils coroutine headers
    // from include/dftracer/utils/.
    #include <iostream>
    #include <utility>
    #include <vector>

    auto task = make_task([](CoroScope& scope) -> CoroTask<void> {
        // One future per spawned worker
        std::vector</* future type returned by scope.spawn() */> futures;

        // Spawn 10 parallel workers
        for (int i = 0; i < 10; ++i) {
            auto future = scope.spawn([i](CoroScope& s) -> CoroTask<int> {
                // Each worker processes item i
                int result = compute(i);
                co_return result;
            });
            futures.push_back(std::move(future));
        }

        // Collect results
        int total = 0;
        for (auto& fut : futures) {
            total += co_await fut;
        }

        std::cout << "Total: " << total << std::endl;
        co_return;
    });

Creating a Reader (Legacy)
~~~~~~~~~~~~~~~~~~~~~~~~~~

Use the factory pattern to create a reader:

.. code-block:: cpp

    // Also include the dftracer-utils indexer and reader headers
    // from include/dftracer/utils/.
    #include <cstddef>
    #include <iostream>
    #include <memory>
    #include <string>

    int main() {
        // Create indexer first
        auto indexer = dftracer::utils::IndexerFactory::create(
            "trace.pfw.gz",
            "trace.pfw.gz.idx"
        );

        // Create reader with indexer (transfers ownership)
        auto reader = dftracer::utils::ReaderFactory::create(indexer.release());

        // Simple: Read lines by line range (returns string with all lines)
        std::string lines = reader->read_lines(1, 100);  // Lines 1-100
        std::cout << lines;

        // Advanced: Buffer-based reading for large files
        const size_t read_buffer_size = 1024 * 1024;  // 1MB buffer
        auto buffer = std::make_unique<char[]>(read_buffer_size);

        size_t start_bytes = 0;
        size_t end_bytes = reader->get_max_bytes();
        size_t bytes_written = 0;

        // Read in chunks
        while (start_bytes < end_bytes &&
               (bytes_written = reader->read_line_bytes(
                    start_bytes, end_bytes, buffer.get(),
                    read_buffer_size)) > 0) {
            // Process the chunk
            std::cout.write(buffer.get(), bytes_written);
            start_bytes += bytes_written;  // Advance for next read
        }

        return 0;
    }

Using Async Generators for Streaming Data
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Process data lazily without materializing everything in memory:

.. code-block:: cpp

    // Also include the dftracer-utils coroutine headers
    // from include/dftracer/utils/.
    #include <string>

    // Streaming data source (lazy evaluation)
    AsyncGenerator<std::string> read_lines_async(
        const std::string& file_path) {
        // Opens file, yields lines on-demand
        // ... streaming implementation ...
    }

    auto task = make_task([](CoroScope& scope) -> CoroTask<void> {
        auto gen = read_lines_async("trace.pfw.gz");

        // Iterate asynchronously - each call to next() yields one item
        while (auto line = co_await gen.next()) {
            // Process line lazily as it's read
            parse_and_process(*line);
        }

        co_return;
    });

Reading with Line Processor (Legacy)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Use a custom line processor for efficient line-by-line processing:

.. code-block:: cpp

    // Also include the dftracer-utils indexer and reader headers
    // from include/dftracer/utils/.
    #include <cstddef>
    #include <iostream>

    // Custom line processor
    class MyLineProcessor : public dftracer::utils::LineProcessor {
    public:
        void process_line(const char* line, size_t length) override {
            // Process each line
            std::cout.write(line, length);
        }
    };

    int main() {
        auto indexer = dftracer::utils::IndexerFactory::create(
            "trace.pfw.gz",
            "trace.pfw.gz.idx"
        );

        auto reader = dftracer::utils::ReaderFactory::create(indexer.release());

        MyLineProcessor processor;

        // Process lines 1-1000 with custom processor
        reader->read_lines_with_processor(1, 1000, processor);

        return 0;
    }

Working with Indexer (Legacy)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Use the factory pattern to create an indexer:

.. code-block:: cpp

    // Also include the dftracer-utils indexer header
    // from include/dftracer/utils/.
    #include <iostream>

    int main() {
        // Create an indexer using the factory
        auto indexer = dftracer::utils::IndexerFactory::create(
            "trace.pfw.gz",      // Archive path
            "trace.pfw.gz.idx",  // Index path
            true                 // Force rebuild
        );

        // Build the index
        indexer->build();

        // Get index information
        std::cout << "Max bytes: " << indexer->get_max_bytes() << std::endl;
        std::cout << "Num lines: " << indexer->get_num_lines() << std::endl;

        return 0;
    }

C Quick Start
-------------

Reading Trace Files
~~~~~~~~~~~~~~~~~~~

Using the C API for reading trace files:

.. code-block:: c

    /* Also include the dftracer-utils C reader header
       from include/dftracer/utils/. */
    #include <stdio.h>
    #include <stdlib.h>

    int main() {
        // Create reader
        dft_reader_handle_t reader = dft_reader_create(
            "trace.pfw.gz",
            "trace.pfw.gz.idx",
            1048576  // checkpoint_size
        );

        // Allocate buffer
        const size_t buffer_size = 1024 * 1024;  // 1MB buffer
        char *buffer = malloc(buffer_size);
        if (buffer == NULL) {
            dft_reader_destroy(reader);
            return 1;
        }

        // Read lines 1-100
        int result = dft_reader_read_lines(
            reader,
            1, 100,  // start_line, end_line
            buffer,
            buffer_size
        );

        if (result == 0) {
            printf("%s", buffer);
        }

        // Cleanup
        free(buffer);
        dft_reader_destroy(reader);

        return 0;
    }

Working with Indexer
~~~~~~~~~~~~~~~~~~~~

Creating and using an indexer:

.. code-block:: c

    /* Also include the dftracer-utils C indexer header
       from include/dftracer/utils/. */
    #include <stdio.h>

    int main() {
        // Create indexer
        dft_indexer_handle_t indexer = dft_indexer_create(
            "trace.pfw.gz",
            "trace.pfw.gz.idx",
            1048576,  // checkpoint_size
            0         // force_rebuild
        );

        // Build index if needed
        if (dft_indexer_need_rebuild(indexer)) {
            printf("Building index...\n");
            dft_indexer_build(indexer);
        }

        // Get index information
        size_t max_bytes, num_lines;
        dft_indexer_get_max_bytes(indexer, &max_bytes);
        dft_indexer_get_num_lines(indexer, &num_lines);

        printf("Max bytes: %zu\n", max_bytes);
        printf("Num lines: %zu\n", num_lines);

        // Cleanup
        dft_indexer_destroy(indexer);

        return 0;
    }

Next Steps
----------

- Read :doc:`pipeline` for a comprehensive guide to coroutine pipelines
- Check :doc:`api/index` for detailed Python API documentation
- See :doc:`cpp_api/index` for the C++ API reference
- Visit :doc:`utilities` for built-in composable components
- Read :doc:`developers` for development guidelines

Key Resources:

- **Pipeline patterns**: :doc:`pipeline` covers CoroScope, channels, fan-out/fan-in, async generators
- **CLI tools**: :doc:`cli` lists all available command-line utilities
- **Python bindings**: Use ``from dftracer.utils import TraceReader, Indexer`` for Python scripts
- **C++ integration**: Link against the ``dftracer-utils`` library and include headers from ``include/dftracer/utils/``
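
The Python examples in this guide hand each line or event to a ``process`` placeholder. As a rough sketch of what such a handler might do, the snippet below sums ``dur`` per ``cat`` over an iterable of JSON text lines. It is an illustration only: ``summarize_by_category`` is a hypothetical helper, not part of dftracer utilities, and it assumes each line is a JSON object carrying ``cat`` and ``dur`` fields, as the reading and query examples suggest. Lines that do not parse as a JSON object are skipped.

```python
import json
from collections import defaultdict


def summarize_by_category(lines):
    """Sum event durations per category from an iterable of text lines.

    Hypothetical helper: assumes each usable line is a JSON object with
    "cat" and "dur" fields; anything else is ignored.
    """
    totals = defaultdict(int)
    for line in lines:
        # Tolerate surrounding whitespace and a trailing comma
        text = line.strip().rstrip(",")
        try:
            event = json.loads(text)
        except json.JSONDecodeError:
            continue  # not JSON (e.g. an array bracket or an empty line)
        if not isinstance(event, dict) or "dur" not in event:
            continue  # not an event with a duration
        totals[event.get("cat", "unknown")] += event["dur"]
    return dict(totals)


# Made-up sample lines standing in for trace file contents
sample = [
    '{"name": "read", "cat": "POSIX", "dur": 1200}',
    '{"name": "write", "cat": "POSIX", "dur": 800}',
    '{"name": "open", "cat": "STDIO", "dur": 50}',
    "[",
]
print(summarize_by_category(sample))
```

In a real script the sample list would presumably be replaced by the lines streamed from ``reader.iter_lines()``, so that only the running totals are kept in memory.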