Runtime Module

The Runtime manages a thread pool for executing coroutines and Python callables. It wraps the C++ Executor and Watchdog without Pipeline/DAG overhead.

Runtime Class

class dftracer.utils.Runtime(threads: int = 0, io_threads: int = 0, python_threads: int = 0)

Bases: object

Runtime with async task submission and Python callable support.

Wraps the C++ Runtime and adds:

  • submit() for both C++ coroutine tasks and Python callables

  • wait_all() across both C++ and Python tasks

  • Error tracking and callbacks

Example:

with Runtime(threads=8, io_threads=8, python_threads=4) as rt:
    h = rt.submit(lambda x: x * 2, 21)
    assert h.get() == 42

Parameters:
  • threads (int) – Number of C++ executor threads (0 = hardware_concurrency).

  • io_threads (int) – Number of C++ I/O threads (0 = hardware_concurrency).

  • python_threads (int) – Number of Python ThreadPoolExecutor threads (0 = min(32, threads)).

submit(task_or_fn: TaskHandle, *args: Any, name: str | None = None, **kwargs: Any) -> TaskHandle[Any]
submit(task_or_fn: Callable[..., T], *args: Any, name: str | None = None, **kwargs: Any) -> TaskHandle[T]

Submit a task for async execution.

Accepts either a C++ TaskHandle (pass-through) or a Python callable.

Parameters:
  • task_or_fn (Any) – Python callable or a C++ _NativeTaskHandle to wrap.

  • *args (Any) – Positional arguments for the callable (ignored for a C++ TaskHandle).

  • name (str | None) –

    Task name for tracking. If None, auto-derived:

    • C++ TaskHandle: uses name from the handle

    • callable: its qualified name (e.g. "my_function" or "Pipeline.run")

  • **kwargs (Any) – Keyword arguments for callable.

Returns:

TaskHandle that can be waited on or used to get the result.

Raises:

TypeError – If task_or_fn is not callable or a TaskHandle.

Return type:

TaskHandle[Any]

Example:

h = rt.submit(lambda x, y: x + y, 3, 4, name="add")
result = h.get()  # 7
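The dispatch rule for submit() is to pass a native handle through, run a callable on a pool, and reject anything else. It can be sketched with concurrent.futures. This is a hypothetical stand-in: NativeHandle plays the role of the C++ _NativeTaskHandle and a plain ThreadPoolExecutor plays the Python worker pool. Name handling is omitted, and neither piece is the dftracer.utils implementation.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Union


class NativeHandle:
    """Hypothetical stand-in for a C++ _NativeTaskHandle."""

    def __init__(self, name: str) -> None:
        self.name = name


def submit_sketch(pool: ThreadPoolExecutor,
                  task_or_fn: Union[NativeHandle, Callable[..., Any]],
                  *args: Any, **kwargs: Any) -> Union[NativeHandle, Future]:
    # Already-scheduled native tasks pass through untouched;
    # any extra arguments are ignored for them.
    if isinstance(task_or_fn, NativeHandle):
        return task_or_fn
    # Python callables are scheduled on the worker pool with their arguments.
    if callable(task_or_fn):
        return pool.submit(task_or_fn, *args, **kwargs)
    raise TypeError(
        f"expected a callable or native task handle, got {type(task_or_fn).__name__}"
    )


with ThreadPoolExecutor(max_workers=2) as pool:
    fut = submit_sketch(pool, lambda x, y: x + y, 3, 4)
    print(fut.result())  # 7
    native = NativeHandle("index")
    print(submit_sketch(pool, native) is native)  # True
    try:
        submit_sketch(pool, 42)
    except TypeError as exc:
        print(exc)  # expected a callable or native task handle, got int
```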
wait(handle: TaskHandle[Any]) -> None

Block until a specific task completes.

wait_all(raise_on_error: bool = False) -> None

Block until all submitted tasks complete.

Parameters:

raise_on_error (bool) – If True, raise RuntimeError once all tasks have completed if any of them failed. The message lists the names of all failed tasks. If False (default), failures are collected without raising. Inspect them with get_failed() or by calling .get() on individual handles.
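The raise_on_error contract can be sketched over plain futures. The sketch waits for everything first, then either raises one RuntimeError naming every failed task or returns the failures. wait_all_sketch and the (name, future) pairs are hypothetical. The message format imitates the "1 task(s) failed: ..." text shown under Error Handling, but it is not guaranteed to match the runtime's exact wording.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import List, Tuple


def wait_all_sketch(tasks: List[Tuple[str, Future]],
                    raise_on_error: bool = False) -> List[str]:
    # Block until every future has finished, successfully or not.
    wait([fut for _, fut in tasks])
    # exception() does not block here because every future is already done.
    failed = [name for name, fut in tasks if fut.exception() is not None]
    if raise_on_error and failed:
        raise RuntimeError(f"{len(failed)} task(s) failed: {', '.join(failed)}")
    # Lenient mode: failures are only collected, never raised.
    return failed


def boom() -> None:
    raise ValueError("bad input")


with ThreadPoolExecutor(max_workers=2) as pool:
    tasks = [("ok", pool.submit(lambda: 1)), ("fail", pool.submit(boom))]
    print(wait_all_sketch(tasks))  # ['fail']
    try:
        wait_all_sketch(tasks, raise_on_error=True)
    except RuntimeError as exc:
        print(exc)  # 1 task(s) failed: fail
```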

get_failed() -> List[TaskHandle[Any]]

Return handles of tasks that failed since the last call to clear_failed() (or since the runtime was created).

Call after wait_all() to inspect failures.

clear_failed() -> None

Clear the list of failed task handles.

set_error_callback(callback: Callable[[TaskHandle[Any], BaseException], None] | None) -> None

Set callback invoked when any task fails.

The callback runs on the thread that executed the failing task, so it must be thread-safe. Pass None to remove it.

Example:

rt.set_error_callback(
    lambda h, e: print(f"FAILED {h.name}: {e}")
)
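The callback runs on whichever thread finished the failing task, so any shared state it touches needs a lock. The sketch below shows that pattern using concurrent.futures.Future.add_done_callback as a stand-in for set_error_callback. FailureLog and watch are hypothetical helpers, not part of dftracer.utils.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List, Tuple


class FailureLog:
    """Hypothetical thread-safe sink for (task name, exception) pairs."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.entries: List[Tuple[str, BaseException]] = []

    def record(self, name: str, exc: BaseException) -> None:
        # Callbacks may fire concurrently from several worker threads.
        with self._lock:
            self.entries.append((name, exc))


def watch(fut: Future, name: str, log: FailureLog) -> None:
    # add_done_callback runs the hook on the thread that completes the future,
    # or immediately in the caller if the future is already done.
    def on_done(f: Future) -> None:
        exc = f.exception()
        if exc is not None:
            log.record(name, exc)
    fut.add_done_callback(on_done)


def fail(i: int) -> None:
    raise RuntimeError(f"task {i} broke")


log = FailureLog()
with ThreadPoolExecutor(max_workers=4) as pool:
    for i in range(8):
        watch(pool.submit(fail, i), f"task-{i}", log)
print(len(log.entries))  # 8
```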
shutdown(wait: bool = True) -> None

Shut down the runtime.

Parameters:

wait (bool) – If True (default), wait for all tasks to complete first.

get_progress() -> Dict[str, Any]

Return a snapshot of progress from the C++ executor as a dict, including task counts, per-worker state, per-task state, and errors.

is_responsive() -> bool

Return True if the runtime is making progress.

set_timeout(global_ms: int = 0) -> None

Set global timeout in milliseconds.

set_default_task_timeout(ms: int = 0) -> None

Set default per-task timeout in milliseconds.
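A per-task timeout in milliseconds maps naturally onto Future.result(timeout=seconds). The sketch assumes that 0 means "no timeout", which the descriptions above do not state. get_with_timeout_ms is a hypothetical helper. The real runtime enforces timeouts in C++ and may cancel or report timed-out tasks differently.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor, TimeoutError
from typing import Any


def get_with_timeout_ms(fut: Future, timeout_ms: int = 0) -> Any:
    # Assumption: 0 (the documented default) disables the timeout.
    timeout_s = None if timeout_ms <= 0 else timeout_ms / 1000.0
    # Raises concurrent.futures.TimeoutError if the task is still running.
    return fut.result(timeout=timeout_s)


with ThreadPoolExecutor(max_workers=1) as pool:
    slow = pool.submit(time.sleep, 0.5)
    try:
        get_with_timeout_ms(slow, timeout_ms=50)
    except TimeoutError:
        print("timed out after 50 ms")
    # With no timeout the call blocks until the task finishes.
    print(get_with_timeout_ms(slow))  # None
```

In this sketch the timeout only stops the caller from waiting. The Python worker thread keeps sleeping until the task returns.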

property threads: int

Number of C++ worker threads.

property io_threads: int

Number of C++ I/O threads.

property python_threads: int

Number of Python worker threads (0 if pool not yet created).

__enter__() -> Runtime
__exit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None

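The threads argument sizes the compute pool, and io_threads sizes a separate pool dedicated to blocking I/O tasks. python_threads sizes the Python ThreadPoolExecutor that runs Python callables. All three default to 0. With 0, threads and io_threads use the host's hardware concurrency, and python_threads uses min(32, threads).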
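The sizing rules above can be sketched as a small resolver. resolve_pool_sizes is a hypothetical helper, not part of dftracer.utils. It uses os.cpu_count() as a stand-in for C++ hardware_concurrency. It also assumes the min(32, threads) rule applies to the already-resolved compute-pool size.

```python
import os
from typing import Tuple


def resolve_pool_sizes(threads: int = 0, io_threads: int = 0,
                       python_threads: int = 0) -> Tuple[int, int, int]:
    """Hypothetical helper mirroring the documented 0-means-auto rules."""
    # Stand-in for std::thread::hardware_concurrency(); cpu_count() may be None.
    hw = os.cpu_count() or 1
    resolved_threads = threads if threads > 0 else hw
    resolved_io = io_threads if io_threads > 0 else hw
    # python_threads=0 -> min(32, threads), using the resolved compute size.
    resolved_py = python_threads if python_threads > 0 else min(32, resolved_threads)
    return resolved_threads, resolved_io, resolved_py


print(resolve_pool_sizes(threads=8, io_threads=4))  # (8, 4, 8)
print(resolve_pool_sizes(threads=64))               # python pool capped at 32
```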

TaskHandle Class

class dftracer.utils.TaskHandle(native: TaskHandle | None = None, future: Future[T] | None = None, name: str = '', task_id: int = -1)

Bases: Generic[T]

Unified handle for both C++ and Python tasks.

Wraps either a C++ _NativeTaskHandle or a concurrent.futures.Future.

Example:

h = rt.submit(lambda: 42)
result = h.get()  # 42 (inferred type: int)
h.wait()
assert h.done()

get() -> T

Block until task completes and return result. Raises on error.

wait() -> None

Block until task completes. Raises on error.

done() -> bool

Return True if task has completed (success or failure).

property name: str

Task name (auto-derived or user-provided).

property task_id: int

Unique task identifier.

property exception: BaseException | None

Stored exception if task failed, None otherwise.
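The Future-backed half of TaskHandle can be sketched as a thin generic wrapper. FutureHandle is hypothetical and covers only the concurrent.futures.Future path. The real class also wraps a C++ _NativeTaskHandle and receives its task_id from the runtime.

```python
import itertools
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Generic, Optional, TypeVar

T = TypeVar("T")
_ids = itertools.count()


class FutureHandle(Generic[T]):
    """Hypothetical Future-only analogue of dftracer.utils.TaskHandle."""

    def __init__(self, future: "Future[T]", name: str = "") -> None:
        self._future = future
        self._name = name
        self._task_id = next(_ids)  # unique per handle within this process

    def get(self) -> T:
        # Blocks, then returns the result or re-raises the task's exception.
        return self._future.result()

    def wait(self) -> None:
        # Same blocking and error behaviour as get(); the result is discarded.
        self._future.result()

    def done(self) -> bool:
        return self._future.done()

    @property
    def name(self) -> str:
        return self._name

    @property
    def task_id(self) -> int:
        return self._task_id

    @property
    def exception(self) -> Optional[BaseException]:
        # Non-blocking: report None while the task is still running.
        return self._future.exception() if self._future.done() else None


with ThreadPoolExecutor(max_workers=1) as pool:
    h = FutureHandle(pool.submit(lambda: 42), name="answer")
    print(h.get(), h.done(), h.exception)  # 42 True None
```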

Module-level Functions

dftracer.utils.get_default_runtime() -> Runtime

Return the module-level default Runtime, creating it on first use.

dftracer.utils.set_default_runtime(runtime: Runtime | None) -> None

Replace the module-level default Runtime (pass None to clear).

Submitting Tasks

submit() is always async – it returns a TaskHandle immediately. Use .get() to block and retrieve the result, or .wait() to block without a return value.

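import dftracer.utils as dft

rt = dft.Runtime(threads=8, io_threads=4)

# Submit a Python callable
h = rt.submit(lambda x, y: x + y, 3, 4, name="add")
result = h.get()  # blocks, returns 7

# Fire multiple tasks (process and files are placeholders for your own
# function and inputs)
def process(path):
    return path.upper()

files = ["a.pfw", "b.pfw"]
handles = [rt.submit(process, f, name=f"proc-{f}") for f in files]
rt.wait_all()  # blocks until all complete

# Check results
for h in handles:
    print(h.name, h.get())

rt.shutdown()  # wait=True by default: outstanding tasks finish first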

Name Auto-derivation

When name is not provided, it is derived from the callable:

def my_function(): ...
rt.submit(my_function)           # name = "my_function"

class Pipeline:
    def run(self): ...
p = Pipeline()
rt.submit(p.run)                 # name = "Pipeline.run"

rt.submit(lambda: None)          # name = "<lambda>"

# Explicit override
rt.submit(my_function, name="custom-name")

Composing Tasks

Tasks can be composed by submitting further tasks from inside a task, passing either the resolved value or the handle itself:

# index and process are placeholders for your own functions
# Pattern 1: resolve the value, then pass it to the next task
def compose(filename):
    h1 = rt.submit(index, filename)
    result = h1.get()                    # blocks for index result
    h2 = rt.submit(process, result)      # uses resolved value
    return h2.get()

h = rt.submit(compose, "trace.pfw.gz", name="compose")
print(h.get())

# Pattern 2: pass the handle itself; the callee unwraps it
def compose(filename):
    h1 = rt.submit(index, filename)
    h2 = rt.submit(process, h1)          # process must call h1.get()
    return h2.get()

Error Handling

Per-task errors are propagated via .get() and .wait():

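# failing_function is a placeholder task that always raises ValueError
def failing_function():
    raise ValueError("bad input")

h = rt.submit(failing_function, name="fail")
try:
    h.get()
except ValueError as e:
    print(f"Task failed: {e}")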

Batch error handling with wait_all():

# Default: wait_all() does NOT raise on errors
rt.submit(failing_function, name="fail")
rt.wait_all()

# Check failures
for h in rt.get_failed():
    print(f"{h.name} failed: {h.exception}")

# Strict mode: raise after all tasks complete
rt.wait_all(raise_on_error=True)  # RuntimeError: 1 task(s) failed: ...

Error callbacks for async notification:

rt.set_error_callback(lambda h, e: print(f"FAILED {h.name}: {e}"))
rt.submit(failing_function, name="monitored")
rt.wait_all()

Progress Tracking

get_progress() returns a dict shaped like the following (values are illustrative):

{
    "total": 10,
    "completed": 8,
    "running": 2,
    "queued": 0,
    "failed": 0,
    "workers": [
        {"id": 0, "idle": False, "task": "iter_lines", "queue_depth": 0},
        {"id": 1, "idle": True, "task": "", "queue_depth": 0},
    ],
    "tasks": [
        {"name": "iter_lines", "state": "completed",
         "execution_duration_ms": 12.3, "queued_duration_ms": 0.1,
         "progress_pct": 100.0, ...},
    ],
    "errors": [],
}
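A progress snapshot with the keys above can be reduced to a one-line status. summarize_progress is a hypothetical helper. It relies only on the top-level keys and worker fields shown in the example and ignores anything else the runtime may include. With a live runtime you would pass it rt.get_progress().

```python
from typing import Any, Dict


def summarize_progress(progress: Dict[str, Any]) -> str:
    total = progress.get("total", 0)
    completed = progress.get("completed", 0)
    # Treat an empty runtime (total == 0) as fully done.
    pct = 100.0 * completed / total if total else 100.0
    busy = [str(w["id"]) for w in progress.get("workers", []) if not w.get("idle", True)]
    return (
        f"{completed}/{total} done ({pct:.0f}%), "
        f"{progress.get('running', 0)} running, {progress.get('queued', 0)} queued, "
        f"{progress.get('failed', 0)} failed; busy workers: {', '.join(busy) or 'none'}"
    )


snapshot = {
    "total": 10, "completed": 8, "running": 2, "queued": 0, "failed": 0,
    "workers": [
        {"id": 0, "idle": False, "task": "iter_lines", "queue_depth": 0},
        {"id": 1, "idle": True, "task": "", "queue_depth": 0},
    ],
}
print(summarize_progress(snapshot))
# 8/10 done (80%), 2 running, 0 queued, 0 failed; busy workers: 0
```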

Dask Integration

For dask.distributed, use DFTracerUtilsDaskWorkerPlugin to create a per-worker Runtime:

from dask.distributed import Client
from dftracer.utils.dask import DFTracerUtilsDaskWorkerPlugin

client = Client("scheduler:8786")
client.register_plugin(DFTracerUtilsDaskWorkerPlugin(threads=48))

Dask is an optional dependency – the plugin module is only importable when dask.distributed is installed.
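Because the plugin module imports only when dask.distributed is present, code that must also run without Dask can guard the import. This is a common optional-dependency pattern, not something dftracer.utils requires. The HAVE_DASK_PLUGIN flag and the register_if_available helper are hypothetical. threads=48 simply mirrors the example above.

```python
from typing import Any

try:
    # Raises ImportError when dask.distributed (or dftracer itself) is missing.
    from dftracer.utils.dask import DFTracerUtilsDaskWorkerPlugin
    HAVE_DASK_PLUGIN = True
except ImportError:
    DFTracerUtilsDaskWorkerPlugin = None
    HAVE_DASK_PLUGIN = False


def register_if_available(client: Any, threads: int = 48) -> bool:
    """Register the per-worker Runtime plugin only when Dask support exists."""
    if not HAVE_DASK_PLUGIN:
        return False
    client.register_plugin(DFTracerUtilsDaskWorkerPlugin(threads=threads))
    return True
```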