Skip to content

Tracing and Debugging Guide

Flujo provides rich internal tracing and visualization capabilities (FSD-12) that help you debug and analyze pipeline execution. This guide shows you how to use these features effectively.

Overview

The tracing system captures: - Hierarchical execution flow with parent-child relationships - Precise timing for each step and the overall pipeline - Status tracking (running, completed, failed) - Metadata including attempts, costs, and token counts - Error details with sanitized feedback

Enabling Tracing

Tracing is enabled by default when you create a Flujo instance:

from flujo import Flujo, Pipeline, Step

# Tracing is enabled by default
flujo = Flujo(pipeline=pipeline)

# Or explicitly enable it
flujo = Flujo(pipeline=pipeline, enable_tracing=True)

Accessing Trace Information

After running a pipeline, you can access the trace tree:

async for result in flujo.run_async("input_data"):
    pass

# Access the trace tree
if result.trace_tree:
    print(f"Root span: {result.trace_tree.name}")
    print(f"Status: {result.trace_tree.status}")
    print(f"Duration: {result.trace_tree.end_time - result.trace_tree.start_time:.3f}s")
    print(f"Children: {len(result.trace_tree.children)}")

# Access step history
for step in result.step_history:
    print(f"{step.name}: {'✅' if step.success else '❌'} ({step.latency_s:.3f}s)")

CLI Debugging Tools

Flujo provides powerful CLI tools for inspecting traces:

List All Runs

flujo lens list

Shows all pipeline runs with basic information: - Run ID - Pipeline name - Status - Start time - Duration

View Run Details

flujo lens show <run_id>

Shows detailed information about a specific run: - Pipeline configuration - Step results - Final output - Error details (if any)

View Hierarchical Trace

flujo lens trace <run_id>

Displays a tree-based view of the execution trace:

pipeline_root (completed, 1.234s)
├── step1 (completed, 0.123s)
├── loop1 (completed, 0.456s)
│   ├── loop_step (completed, 0.234s)
│   └── loop_step (completed, 0.222s)
├── conditional1 (completed, 0.345s)
│   └── high_branch (completed, 0.111s)
└── final_step (completed, 0.234s)

List Individual Spans

flujo lens spans <run_id>

Shows all spans with filtering options: - --status completed - Only show completed spans - --name step1 - Only show spans with specific name - --limit 10 - Limit number of results

View Statistics

flujo lens stats

Shows aggregated statistics: - Total runs - Success/failure rates - Average duration - Most common step types

Advanced Usage

Custom Pipeline with Tracing

from flujo import Pipeline, Step, Flujo
from flujo.domain.models import PipelineContext

async def simple_step(input_data: str, context: PipelineContext) -> str:
    return f"processed_{input_data}"

async def another_step(input_data: str, context: PipelineContext) -> str:
    return f"enhanced_{input_data}"

# Create pipeline
pipeline = Pipeline(steps=[
    Step.from_callable(simple_step, name="step1"),
    Step.from_callable(another_step, name="step2"),
])

# Run with tracing
flujo = Flujo(pipeline=pipeline, enable_tracing=True)
async for result in flujo.run_async("test_input"):
    pass

# Analyze trace
if result.trace_tree:
    print(f"Pipeline completed in {result.trace_tree.end_time - result.trace_tree.start_time:.3f}s")

    for child in result.trace_tree.children:
        print(f"  {child.name}: {child.status} ({child.end_time - child.start_time:.3f}s)")

Error Handling

The tracing system gracefully handles errors:

async def failing_step(input_data: str, context: PipelineContext) -> str:
    raise ValueError("Intentional failure")

pipeline = Pipeline(steps=[
    Step.from_callable(failing_step, name="failing_step"),
])

flujo = Flujo(pipeline=pipeline, enable_tracing=True)
async for result in flujo.run_async("test_input"):
    pass

# Even failed pipelines generate traces
if result.trace_tree:
    failed_step = None
    for child in result.trace_tree.children:
        if child.name == "failing_step":
            failed_step = child
            break

    if failed_step and failed_step.status == "failed":
        print(f"Step failed: {failed_step.attributes.get('feedback', 'Unknown error')}")

Performance Analysis

Use traces to identify performance bottlenecks:

# After running a pipeline
if result.trace_tree:
    # Find the slowest step
    slowest_step = max(result.step_history, key=lambda s: s.latency_s)
    print(f"Slowest step: {slowest_step.name} ({slowest_step.latency_s:.3f}s)")

    # Find failed steps
    failed_steps = [s for s in result.step_history if not s.success]
    if failed_steps:
        print(f"Failed steps: {[s.name for s in failed_steps]}")

Best Practices

1. Use Descriptive Step Names

# Good
Step.from_callable(process_data, name="data_processing")

# Avoid
Step.from_callable(process_data, name="step1")

2. Handle Trace Access Gracefully

if result.trace_tree:
    # Access trace information
    pass
else:
    # Tracing might be disabled or failed
    print("No trace information available")

3. Use CLI for Complex Analysis

For large pipelines, use the CLI tools instead of programmatic access:

# Find slow runs
flujo lens list | grep "failed"

# Analyze specific run
flujo lens trace <run_id> | grep "completed"

# Get statistics
flujo lens stats

4. Monitor Performance Overhead

Tracing adds minimal overhead (< 50% increase), but monitor in production:

import time

# Test without tracing
start = time.time()
flujo_no_trace = Flujo(pipeline=pipeline, enable_tracing=False)
# ... run pipeline
no_trace_time = time.time() - start

# Test with tracing
start = time.time()
flujo_with_trace = Flujo(pipeline=pipeline, enable_tracing=True)
# ... run pipeline
with_trace_time = time.time() - start

overhead = (with_trace_time / no_trace_time - 1) * 100
print(f"Tracing overhead: {overhead:.1f}%")

Troubleshooting

No Trace Information

If result.trace_tree is None: 1. Check that tracing is enabled: Flujo(pipeline, enable_tracing=True) 2. Verify the pipeline completed (even if it failed) 3. Check for any trace serialization errors in logs

CLI Commands Not Working

If CLI commands fail: 1. Ensure you have a SQLite database: flujo_ops.db 2. Check that the run_id exists: flujo lens list 3. Verify database permissions and connectivity

Performance Issues

If tracing causes performance problems: 1. Monitor overhead with the test above 2. Consider disabling tracing for high-frequency pipelines 3. Use filtering in CLI commands to reduce data transfer

Integration with Other Tools

SQLite Backend

Traces are automatically persisted to the SQLite backend: - spans table stores hierarchical trace data - runs table stores pipeline metadata - Audit logging tracks trace access

OTLP Export

For production environments, enable OTLP export:

export OTLP_EXPORT_ENABLED=true
export OTLP_ENDPOINT=https://your-otlp-endpoint

This sends traces to external observability platforms like: - OpenTelemetry Collector - Honeycomb - Datadog - Jaeger

Examples

See the following examples for practical usage: - examples/test_tracing_demo.py - Simple tracing demo - examples/fsd_12_tracing_demo.py - Comprehensive FSD-12 showcase - tests/integration/test_fsd_12_tracing_complete.py - Complete test suite