Runtime Module
The Runtime manages a thread pool for executing coroutines and Python
callables. It wraps the C++ Executor and Watchdog without
Pipeline/DAG overhead.
Runtime Class
- class dftracer.utils.Runtime(threads: int = 0, io_threads: int = 0)
Bases: object
Runtime with async task submission and Python callable support.
Wraps the C++ Runtime and adds:
- submit() for both C++ coroutine tasks and Python callables
- wait_all() across both C++ and Python tasks
- Error tracking and callbacks
Example:
with Runtime(threads=8, io_threads=8) as rt:
    h = rt.submit(lambda x: x * 2, 21)
    assert h.get() == 42
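- Parameters:
threads (int) – Size of the compute thread pool (0 = auto-size).
io_threads (int) – Size of the separate blocking-I/O thread pool (0 = auto-size).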
- submit(task_or_fn: TaskHandle, *args: Any, name: str | None = None, **kwargs: Any) → TaskHandle[Any]
- submit(task_or_fn: Callable[[...], T], *args: Any, name: str | None = None, **kwargs: Any) → TaskHandle[T]
Submit a task for async execution.
Accepts either a C++ TaskHandle (pass-through) or a Python callable.
- Parameters:
task_or_fn (Any) – Python callable or a C++ _NativeTaskHandle to wrap.
*args (Any) – Arguments for callable (ignored for C++ TaskHandle).
name (str | None) – Task name for tracking. If None, it is auto-derived:
- C++ TaskHandle: the name stored in the handle
- callable: its qualified name (e.g. "Pipeline.run")
**kwargs (Any) – Keyword arguments for callable.
- Returns:
TaskHandle that can be waited on or used to get the result.
- Raises:
TypeError – If task_or_fn is not callable or a TaskHandle.
- Return type:
TaskHandle[T] (TaskHandle[Any] when a C++ TaskHandle is passed)
Example:
h = rt.submit(lambda x, y: x + y, 3, 4, name="add")
result = h.get()  # 7
- wait(handle: TaskHandle[Any]) → None
Block until a specific task completes.
- wait_all(raise_on_error: bool = False) → None
Block until all submitted tasks complete.
- Parameters:
raise_on_error (bool) – If True, raise RuntimeError after all tasks complete if any task failed; the error message lists the names of all failed tasks. If False (default), failures are collected silently; call .get() on individual handles (or use get_failed()) to see the errors.
- get_failed() → List[TaskHandle[Any]]
Return handles of tasks that failed since the last clear.
Call after wait_all() to inspect failures.
- set_error_callback(callback: Callable[[TaskHandle[Any], BaseException], None] | None) → None
Set callback invoked when any task fails.
Called from the task’s thread. Must be thread-safe. Set to None to clear.
Example:
rt.set_error_callback(
    lambda h, e: print(f"FAILED {h.name}: {e}")
)
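Because the callback may fire on several worker threads at once, any state it shares must be synchronized. The sketch below shows one way to do that with a lock. It uses a plain `ThreadPoolExecutor` instead of the Runtime, and `FailureLog` and `submit_watched` are hypothetical helpers; the callback here receives a task name where the real one receives a TaskHandle.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, List, Tuple


class FailureLog:
    """Hypothetical error callback that is safe to call from many threads."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.failures: List[Tuple[str, BaseException]] = []

    def __call__(self, name: str, exc: BaseException) -> None:
        # Several worker threads may fail at once, so guard the shared list.
        with self._lock:
            self.failures.append((name, exc))


def submit_watched(pool: ThreadPoolExecutor, fn: Callable[[int], int], arg: int,
                   name: str, on_error: Callable[[str, BaseException], None]) -> Future:
    fut = pool.submit(fn, arg)

    def _notify(f: Future) -> None:
        # Runs on the worker thread that finished the task (or on the caller
        # if the task had already finished when the callback was attached).
        exc = f.exception()
        if exc is not None:
            on_error(name, exc)

    fut.add_done_callback(_notify)
    return fut


def check_positive(x: int) -> int:
    if x < 0:
        raise ValueError(f"negative input: {x}")
    return x


log = FailureLog()
with ThreadPoolExecutor(max_workers=4) as pool:
    for name, value in {"a": 1, "b": -2, "c": 3, "d": -4}.items():
        submit_watched(pool, check_positive, value, name, log)
# Leaving the with-block joins the workers, so every callback has run.
print(sorted(n for n, _ in log.failures))  # ['b', 'd']
```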
- shutdown(wait: bool = True) → None
Shut down the runtime.
- Parameters:
wait (bool) – If True (default), wait for all tasks to complete first.
- __exit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) → None
The threads constructor argument sizes the compute pool; io_threads sizes a
separate pool dedicated to blocking I/O tasks. Both default to 0,
which lets the runtime auto-size based on the host.
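As a rough illustration of why blocking I/O gets its own pool, the sketch below routes tasks to one of two executors. `TwoPoolRuntime` and its `blocking_io` flag are hypothetical, and falling back to `os.cpu_count()` when a size is 0 is an assumption; the real auto-sizing policy is not documented here.

```python
import os
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable


def resolve_pool_size(requested: int, fallback: int) -> int:
    # 0 means "auto-size"; the fallback used here is an assumption.
    return requested if requested > 0 else fallback


class TwoPoolRuntime:
    """Hypothetical sketch: a compute pool plus a separate blocking-I/O pool."""

    def __init__(self, threads: int = 0, io_threads: int = 0) -> None:
        cpus = os.cpu_count() or 1
        self.threads = resolve_pool_size(threads, cpus)
        self.io_threads = resolve_pool_size(io_threads, cpus)
        self._compute = ThreadPoolExecutor(self.threads, thread_name_prefix="compute")
        self._io = ThreadPoolExecutor(self.io_threads, thread_name_prefix="io")

    def submit(self, fn: Callable[..., Any], *args: Any, blocking_io: bool = False) -> Future:
        # Blocking I/O goes to its own pool so it cannot starve compute workers.
        pool = self._io if blocking_io else self._compute
        return pool.submit(fn, *args)

    def shutdown(self) -> None:
        self._compute.shutdown(wait=True)
        self._io.shutdown(wait=True)


rt = TwoPoolRuntime(threads=2)  # io_threads=0, so it is auto-sized
compute_job = rt.submit(sum, [1, 2, 3])
io_job = rt.submit(os.path.exists, ".", blocking_io=True)
print(compute_job.result(), io_job.result())  # 6 True
rt.shutdown()
```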
TaskHandle Class
- class dftracer.utils.TaskHandle(native: TaskHandle | None = None, future: Future[T] | None = None, name: str = '', task_id: int = -1)
Bases: Generic[T]
Unified handle for both C++ and Python tasks.
Wraps either a C++ _NativeTaskHandle or a concurrent.futures.Future.
Example:
h = rt.submit(lambda: 42)
result = h.get()  # int
h.wait()
assert h.done()
- property exception: BaseException | None
Stored exception if task failed, None otherwise.
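The handle's contract (get() returns or re-raises, wait() blocks and propagates errors, done() polls, exception exposes the stored error) maps naturally onto `concurrent.futures.Future`. Here is a minimal sketch of the Python-side wrapping, assuming a Future-backed task; `SketchHandle` is a hypothetical name, and making `exception` non-blocking is a design choice of this sketch.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class SketchHandle(Generic[T]):
    """Hypothetical Future-backed handle with get/wait/done/exception."""

    def __init__(self, future: "Future[T]", name: str = "") -> None:
        self._future = future
        self.name = name

    def get(self) -> T:
        # Block until finished; return the value or re-raise the task's error.
        return self._future.result()

    def wait(self) -> None:
        # Block until finished; errors still propagate, but no value is returned.
        self._future.result()

    def done(self) -> bool:
        return self._future.done()

    @property
    def exception(self) -> Optional[BaseException]:
        # Non-blocking: None while running or on success, the error otherwise.
        if not self._future.done() or self._future.cancelled():
            return None
        return self._future.exception()


with ThreadPoolExecutor(max_workers=2) as pool:
    h = SketchHandle(pool.submit(lambda: 42), name="answer")
    print(h.get())  # 42
    h.wait()
    assert h.done() and h.exception is None
```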
Module-level Functions
Submitting Tasks
submit() is always async – it returns a TaskHandle immediately.
Use .get() to block and retrieve the result, or .wait() to block
without a return value.
import dftracer.utils as dft
rt = dft.Runtime(threads=8, io_threads=4)
# Submit a Python callable
h = rt.submit(lambda x, y: x + y, 3, 4, name="add")
result = h.get()  # blocks, returns 7
# Fire multiple tasks (process and files are placeholders for your own work)
def process(path):
    return len(path)
files = ["a.pfw", "b.pfw"]
handles = [rt.submit(process, f, name=f"proc-{f}") for f in files]
rt.wait_all()  # blocks until all complete
# Check results
for h in handles:
    print(h.name, h.get())
rt.shutdown()
Name Auto-derivation
When name is not provided, it is derived from the callable:
def my_function(): ...
rt.submit(my_function) # name = "my_function"
class Pipeline:
def run(self): ...
p = Pipeline()
rt.submit(p.run) # name = "Pipeline.run"
rt.submit(lambda: None) # name = "<lambda>"
# Explicit override
rt.submit(my_function, name="custom-name")
Composing Tasks
Tasks can be composed by passing resolved values or handles:
# Pattern 1: resolve value, pass to next task
def compose(filename):
h1 = rt.submit(index, filename)
result = h1.get() # blocks for index result
h2 = rt.submit(process, result) # uses resolved value
return h2.get()
h = rt.submit(compose, "trace.pfw.gz", name="compose")
print(h.get())
# Pattern 2: pass handle directly (utility unwraps internally)
def compose(filename):
h1 = rt.submit(index, filename)
h2 = rt.submit(process, h1) # process calls h1.get()
return h2.get()
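Both patterns can be reproduced with a plain `ThreadPoolExecutor`, which also makes their cost visible: the outer task holds a worker while it blocks on inner tasks. `index`, `process`, and `unwrap` below are hypothetical stand-ins, and whether the dftracer Runtime guards against pool exhaustion from nested waits is not covered here.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any


def unwrap(value: Any) -> Any:
    """Resolve a Future argument to its result; pass other values through."""
    return value.result() if isinstance(value, Future) else value


def index(filename: str) -> dict:
    # Hypothetical stand-in for an indexing step.
    return {"file": filename, "lines": 3}


def process(index_or_handle: Any) -> str:
    # Accepts either a resolved value (Pattern 1) or a Future (Pattern 2).
    idx = unwrap(index_or_handle)
    return f"{idx['file']}: {idx['lines']} lines"


pool = ThreadPoolExecutor(max_workers=4)


def compose_by_value(filename: str) -> str:
    # Pattern 1: block on the first result, then pass the value on.
    h1 = pool.submit(index, filename)
    return pool.submit(process, h1.result()).result()


def compose_by_handle(filename: str) -> str:
    # Pattern 2: pass the Future itself; process() unwraps it.
    h1 = pool.submit(index, filename)
    return pool.submit(process, h1).result()


# Each outer task occupies a worker while it blocks on inner tasks; if every
# worker ends up blocked this way, the pool deadlocks.
print(pool.submit(compose_by_value, "trace.pfw.gz").result())   # trace.pfw.gz: 3 lines
print(pool.submit(compose_by_handle, "trace.pfw.gz").result())  # trace.pfw.gz: 3 lines
pool.shutdown()
```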
Error Handling
Per-task errors are propagated via .get() and .wait():
h = rt.submit(failing_function, name="fail")
try:
h.get()
except ValueError as e:
print(f"Task failed: {e}")
Batch error handling with wait_all():
# Default: wait_all() does NOT raise on errors
rt.submit(failing_function, name="fail")
rt.wait_all()
# Check failures
for h in rt.get_failed():
print(f"{h.name} failed: {h.exception}")
# Strict mode: raise after all tasks complete
rt.wait_all(raise_on_error=True) # RuntimeError: 1 task(s) failed: ...
Error callbacks for async notification:
rt.set_error_callback(lambda h, e: print(f"FAILED {h.name}: {e}"))
rt.submit(failing_function, name="monitored")
rt.wait_all()
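The batch semantics of `wait_all()` (wait for everything, then optionally raise one aggregate error) can be sketched over named futures as below. `wait_all` here is a hypothetical free function, and the exact wording after `failed:` is an assumption, since the real message is elided above.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Dict, List


def wait_all(named: Dict[str, Future], raise_on_error: bool = False) -> List[str]:
    """Wait for every future; return failed names, optionally raising."""
    wait(list(named.values()))  # blocks until all are done
    failed = [n for n, f in named.items() if f.exception() is not None]
    if raise_on_error and failed:
        raise RuntimeError(f"{len(failed)} task(s) failed: {', '.join(failed)}")
    return failed


def failing_function() -> None:
    raise ValueError("boom")


with ThreadPoolExecutor(max_workers=2) as pool:
    tasks = {"ok": pool.submit(sum, [1, 2]), "fail": pool.submit(failing_function)}
    print(wait_all(tasks))  # ['fail']
    try:
        wait_all(tasks, raise_on_error=True)
    except RuntimeError as e:
        print(e)  # 1 task(s) failed: fail
```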
Progress Tracking
get_progress() returns a dict of the following form (values are illustrative):
{
"total": 10,
"completed": 8,
"running": 2,
"queued": 0,
"failed": 0,
"workers": [
{"id": 0, "idle": False, "task": "iter_lines", "queue_depth": 0},
{"id": 1, "idle": True, "task": "", "queue_depth": 0},
],
"tasks": [
{"name": "iter_lines", "state": "completed",
"execution_duration_ms": 12.3, "queued_duration_ms": 0.1,
"progress_pct": 100.0, ...},
],
"errors": [],
}
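A progress dict of this shape is easy to turn into a one-line status. The helper below is a hypothetical sketch that relies only on the keys shown above and uses `.get()` defaults in case a key is absent.

```python
from typing import Any, Dict


def summarize_progress(progress: Dict[str, Any]) -> str:
    """Render a one-line status from a get_progress()-style dict."""
    total = progress.get("total", 0)
    done = progress.get("completed", 0)
    pct = 100.0 * done / total if total else 0.0
    # List only workers that are currently running a task.
    busy = [
        f"{w['id']}:{w['task']}"
        for w in progress.get("workers", [])
        if not w.get("idle", True)
    ]
    return (
        f"{done}/{total} done ({pct:.0f}%), "
        f"{progress.get('running', 0)} running, "
        f"{progress.get('queued', 0)} queued, "
        f"{progress.get('failed', 0)} failed; busy: {', '.join(busy) or 'none'}"
    )


sample = {
    "total": 10, "completed": 8, "running": 2, "queued": 0, "failed": 0,
    "workers": [
        {"id": 0, "idle": False, "task": "iter_lines", "queue_depth": 0},
        {"id": 1, "idle": True, "task": "", "queue_depth": 0},
    ],
}
print(summarize_progress(sample))
# 8/10 done (80%), 2 running, 0 queued, 0 failed; busy: 0:iter_lines
```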
Dask Integration
For dask.distributed, use DFTracerUtilsDaskWorkerPlugin to create
a per-worker Runtime:
from dask.distributed import Client
from dftracer.utils.dask import DFTracerUtilsDaskWorkerPlugin
client = Client("scheduler:8786")
client.register_plugin(DFTracerUtilsDaskWorkerPlugin(threads=48))
Dask is an optional dependency – the plugin module is only importable
when dask.distributed is installed.