Telemetry and Observability

This guide explains how to use the built-in telemetry and observability features of flujo.

Overview

The orchestrator includes comprehensive telemetry for:

  • Performance monitoring
  • Usage tracking
  • Error reporting
  • Distributed tracing
  • Cost tracking

Quick Start

Enable basic telemetry with one line:

from flujo import init_telemetry

# Initialize with default settings
init_telemetry()

Configuration

Environment Variables

# Enable telemetry export (default: false)
TELEMETRY_EXPORT_ENABLED=false

# Enable OTLP export (default: false)
OTLP_EXPORT_ENABLED=false

# OTLP endpoint URL (optional)
OTLP_ENDPOINT=https://your-otlp-endpoint

# Optional: Logfire API key
LOGFIRE_API_KEY=your_key_here

Python Configuration

from flujo import init_telemetry

# Initialize with custom settings
init_telemetry(
    service_name="my-app",
    environment="production",
    version="1.0.0",
    sampling_rate=0.1,  # Sample 10% of requests
    export_telemetry=True,
    export_otlp=True,
    otlp_endpoint="https://your-otlp-endpoint"
)

Metrics

The orchestrator collects several key metrics:

Performance Metrics

  • Request latency
  • Token usage
  • Model response times
  • Pipeline step durations

Usage Metrics

  • Number of requests
  • Model usage by type
  • Success/failure rates
  • Cost per request

Quality Metrics

  • Checklist pass rates
  • Score distributions
  • Retry frequencies
  • Validation results

Tracing

Distributed Tracing

Enable OTLP export for distributed tracing:

import os

from flujo import init_telemetry
os.environ["OTLP_EXPORT_ENABLED"] = "true"
os.environ["OTLP_ENDPOINT"] = "https://your-otlp-endpoint"

# Initialize telemetry
init_telemetry()

Trace Attributes

Each trace includes:

  • Request ID
  • Pipeline configuration
  • Model information
  • Step details
  • Performance data
  • Error information

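You can attach your own spans carrying the same kinds of attributes via the logfire object described in the next section. A minimal sketch; the span name and attribute names here are illustrative, not a fixed schema:

from flujo.infra.telemetry import logfire

# Wrap custom work in a span; keyword arguments become span attributes
# recorded alongside the ones flujo sets automatically.
with logfire.span("custom_step", request_id="abc123", pipeline="default"):
    ...  # your code here
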
Logging

flujo uses logfire for structured logging and tracing. If logfire is not installed or telemetry export is disabled, flujo falls back to standard Python logging.

logfire Object

The logfire object (imported via from flujo.infra.telemetry import logfire) is the primary interface for logging and creating spans within flujo. Its behavior depends on whether logfire is successfully initialized:

  • When logfire is enabled: The logfire object will be the actual logfire library instance, providing full tracing and logging capabilities.
  • When logfire is disabled or not installed: The logfire object will be a mock implementation that redirects calls to standard Python logging. This ensures that your application continues to log messages even if the full telemetry setup is not active.

This allows you to use logfire.info(), logfire.error(), and logfire.span() consistently throughout your code, regardless of the telemetry configuration.
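
For example, the following works unchanged whether full telemetry is active or the standard-logging fallback is in use:

from flujo.infra.telemetry import logfire

# Logged through logfire when telemetry is enabled, and through the
# standard logging module otherwise.
logfire.info("Starting pipeline run")

with logfire.span("generate_solution"):
    try:
        ...  # run a pipeline step
    except Exception:
        logfire.error("Step failed")
        raise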

Log Levels

import logging

# Configure logging
logging.basicConfig(level=logging.INFO)

# The orchestrator will log:
# - INFO: Normal operation
# - WARNING: Retries, fallbacks
# - ERROR: Failures, timeouts
# - DEBUG: Detailed tracing
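
To get detailed tracing from flujo without making the rest of your application verbose, raise the level for its logger alone. This sketch assumes the fallback logging uses the standard logger hierarchy under the "flujo" name:

import logging

# Enable DEBUG output for flujo only (assumes its loggers live under "flujo").
logging.getLogger("flujo").setLevel(logging.DEBUG)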

Log Format

# Example log entry
{
    "timestamp": "2024-03-14T12:00:00Z",
    "level": "INFO",
    "request_id": "abc123",
    "event": "pipeline_start",
    "pipeline": "default",
    "models": ["gpt-4", "gpt-3.5-turbo"],
    "duration_ms": 1500
}
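
With logfire enabled, you can emit structured entries like this yourself by passing fields as keyword arguments. The event name and fields below are illustrative:

from flujo.infra.telemetry import logfire

# Keyword arguments become structured fields on the log entry.
logfire.info(
    "pipeline_start",
    request_id="abc123",
    pipeline="default",
    models=["gpt-4", "gpt-3.5-turbo"],
)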

Cost Tracking

Cost Metrics

The orchestrator tracks:

  • Tokens used per model
  • Cost per request
  • Cost by pipeline step
  • Monthly usage

Cost Reports

from flujo import get_cost_report

# Get cost report
report = get_cost_report(
    start_date="2024-03-01",
    end_date="2024-03-14"
)

print(f"Total cost: ${report.total_cost}")
print("Cost by model:")
for model, cost in report.cost_by_model.items():
    print(f"- {model}: ${cost}")
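
The report fields shown above are enough to compute simple breakdowns, for example each model's share of total spend:

# Share of spend per model, using the fields from the report above.
for model, cost in report.cost_by_model.items():
    print(f"- {model}: {cost / report.total_cost:.1%} of total")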

Integration

OpenTelemetry

The orchestrator uses OpenTelemetry for tracing:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Set up OpenTelemetry
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Add OTLP exporter
otlp_exporter = OTLPSpanExporter(endpoint="https://your-otlp-endpoint")
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
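
With the provider configured as above, spans you create yourself are exported to the same endpoint as flujo's. The span name and attribute here are illustrative:

# Create a custom span around a run, reusing the tracer from above.
with tracer.start_as_current_span("pipeline_run") as span:
    span.set_attribute("pipeline", "default")
    ...  # run your pipeline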

Prometheus

Export metrics to Prometheus:

from prometheus_client import start_http_server
from flujo import init_telemetry

# Start Prometheus metrics server
start_http_server(8000)

# Initialize telemetry
init_telemetry(export_prometheus=True)
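
prometheus_client serves the collected metrics over HTTP on the given port. A quick way to confirm the endpoint responds before pointing Prometheus at it:

import urllib.request

# Sanity-check the scrape target (localhost:8000).
print(urllib.request.urlopen("http://localhost:8000/metrics").read().decode()[:200])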

Production Monitoring

Integrate Flujo's telemetry with your existing observability stack:

  • OpenTelemetry Export – Add OpenTelemetryHook to your Flujo runner to stream spans to any OTLP endpoint:

    from flujo.telemetry import OpenTelemetryHook

    runner = Flujo(pipeline, hooks=[OpenTelemetryHook(mode="otlp", endpoint="http://collector:4318")])

  • Prometheus Metrics – Expose aggregated run metrics for scraping:

    from flujo.telemetry import start_prometheus_server

    start_prometheus_server(8000, backend)

Best Practices

  1. Production Setup
     • Enable telemetry in production
     • Use appropriate sampling rates
     • Configure secure endpoints
     • Monitor costs

  2. Development
     • Use debug logging
     • Enable detailed tracing
     • Monitor performance
     • Track costs

  3. Security
     • Secure API keys
     • Use HTTPS endpoints
     • Implement access control
     • Monitor for anomalies

  4. Performance
     • Use appropriate sampling
     • Configure batch sizes
     • Monitor resource usage
     • Optimize exports

Troubleshooting

Common Issues

  1. Missing Metrics
     • Verify telemetry is enabled
     • Check export configuration
     • Verify endpoint accessibility
     • Check permissions

  2. High Latency
     • Check network connectivity
     • Verify endpoint performance
     • Adjust batch sizes
     • Monitor resource usage

  3. Cost Issues
     • Review sampling rates
     • Check model usage
     • Monitor token usage
     • Set up alerts
