Cookbook: Running Budget-Aware Workflows with the Usage Governor
The Problem
Complex AI pipelines with loops, parallel execution, or iterative refinement can quickly spiral out of control in terms of cost and token usage. When you're running production workloads, you need confidence that your pipelines won't create runaway processes that exceed your budget or hit API rate limits.
Traditional approaches often require manual monitoring or external systems, but flujo provides a built-in Usage Governor that automatically enforces cost and token limits across all control flow primitives.
The Solution
The UsageGovernor is a first-class feature in flujo that works seamlessly with all pipeline constructs, including LoopStep and ParallelStep. It provides:
- Automatic cost tracking across all steps and iterations
- Immediate halting when limits are breached
- Task cancellation of parallel branches to save resources
- Detailed error reporting with complete execution history
Core Concepts
UsageLimits Model
The UsageLimits model defines your budget constraints:
from flujo.domain import UsageLimits

# Set cost and token limits
limits = UsageLimits(
    total_cost_usd_limit=0.50,  # Maximum $0.50 spent
    total_tokens_limit=1000,    # Maximum 1000 tokens used
)
UsageLimitExceededError
When limits are breached, flujo raises this exception with complete execution context:
from flujo.exceptions import UsageLimitExceededError

try:
    result = await runner.run(input_data)
except UsageLimitExceededError as e:
    print(f"Pipeline halted: {e}")
    print(f"Total cost: ${e.result.total_cost_usd}")
    print(f"Steps completed: {len(e.result.step_history)}")
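To make these two concepts concrete, here is a minimal, library-independent sketch of how a budget check of this kind might work. It does not use flujo: SketchLimits, SketchResult, BudgetExceeded, and record_step are hypothetical names invented for illustration, and the strict greater-than comparison is an assumption, not documented flujo behavior.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SketchLimits:
    """Hypothetical stand-in for a limits model; None means 'no limit'."""
    total_cost_usd_limit: Optional[float] = None
    total_tokens_limit: Optional[int] = None


@dataclass
class SketchResult:
    """Hypothetical running totals plus a record of the steps that ran."""
    total_cost_usd: float = 0.0
    total_tokens: int = 0
    step_history: list = field(default_factory=list)


class BudgetExceeded(Exception):
    """Hypothetical exception that carries the partial result."""

    def __init__(self, message: str, result: SketchResult):
        super().__init__(message)
        self.result = result


def record_step(result, limits, name, cost_usd, tokens):
    """Add one step's usage to the totals, then check both limits."""
    result.total_cost_usd += cost_usd
    result.total_tokens += tokens
    result.step_history.append(name)
    cost_limit = limits.total_cost_usd_limit
    token_limit = limits.total_tokens_limit
    # Assumption: a breach means the total is strictly greater than the limit.
    if cost_limit is not None and result.total_cost_usd > cost_limit:
        raise BudgetExceeded(f"Cost limit of ${cost_limit} exceeded", result)
    if token_limit is not None and result.total_tokens > token_limit:
        raise BudgetExceeded(f"Token limit of {token_limit} exceeded", result)


def run_demo():
    """Run steps costing $0.20 each until a limit stops them."""
    limits = SketchLimits(total_cost_usd_limit=0.50, total_tokens_limit=1000)
    result = SketchResult()
    try:
        for i in range(20):
            record_step(result, limits, f"step_{i}", cost_usd=0.2, tokens=100)
    except BudgetExceeded as exc:
        return str(exc), len(exc.result.step_history)
    return "completed", len(result.step_history)


print(run_demo())
```

The point of attaching the partial result to the exception is that the caller can still see what ran and what it cost, which mirrors how e.result is used in the snippet above.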
Example 1: Halting a Runaway Loop
Loops are particularly dangerous for cost control. The UsageGovernor checks limits after each iteration, allowing you to safely use Step.loop_until without fear of infinite loops.
import asyncio

from pydantic import BaseModel
from flujo import Flujo, Step, Pipeline, UsageLimits, UsageLimitExceededError


class MockAgentOutput(BaseModel):
    """An agent output that includes cost and token metrics."""

    value: int
    cost_usd: float = 0.1
    token_counts: int = 100


class FixedMetricAgent:
    """An agent that returns a fixed cost and token count on each call."""

    async def run(self, data: int | MockAgentOutput) -> MockAgentOutput:
        val = data.value if isinstance(data, MockAgentOutput) else data
        return MockAgentOutput(value=val + 1)


# Create a simple pipeline that increments a value
metric_pipeline = Pipeline.from_step(
    Step.model_validate({"name": "metric_step", "agent": FixedMetricAgent()})
)

# Create a loop that will be halted by the governor
loop_step = Step.loop_until(
    name="governed_loop",
    loop_body_pipeline=metric_pipeline,
    # Would exit naturally at value 10 (about $1.00), so the governor halts first
    exit_condition_callable=lambda out, ctx: out.value >= 10,
    max_loops=20,  # High max to ensure governor triggers first
)

# Set a cost limit that will be breached after a few iterations
limits = UsageLimits(total_cost_usd_limit=0.25, total_tokens_limit=None)
runner = Flujo(loop_step, usage_limits=limits)
async def main() -> None:
    # `await` is only valid inside a coroutine, so the run is wrapped in main()
    try:
        print("Running loop with cost limit...")
        result = await runner.run(0)
        print("Loop completed successfully!")
    except UsageLimitExceededError as e:
        loop_result = e.result.step_history[0]
        print("\n✅ Loop halted by governor as expected!")
        print(f" Error: {e}")
        print(f" Iterations completed: {loop_result.attempts}")
        if hasattr(loop_result.output, "value"):
            print(f" Final value: {loop_result.output.value}")
        else:
            print(" Final value: None (loop halted mid-iteration)")
        print(f" Total cost: ${e.result.total_cost_usd:.2f}")


asyncio.run(main())
Expected Output:
Running loop with cost limit...
✅ Loop halted by governor as expected!
Error: Cost limit of $0.25 exceeded
Iterations completed: 3
Final value: None (loop halted mid-iteration)
Total cost: $0.30
How the Loop Governor Works
- Per-Iteration Checking: After each loop iteration, the governor checks the cumulative cost
- Immediate Halting: When the limit is breached, the loop stops immediately, even mid-iteration
- Complete Context: The exception contains the full execution history, including how many iterations completed
- Safe Output Access: The code safely handles cases where the output might not be available due to mid-iteration halting
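The numbers in the expected output follow from simple accumulation: each iteration adds $0.10, and the third iteration pushes the total past $0.25. The sketch below reproduces that arithmetic without flujo. The function simulate_loop is hypothetical, and it assumes the check runs after each iteration with a strict greater-than comparison.

```python
from decimal import Decimal


def simulate_loop(cost_per_iteration: str, cost_limit: str, max_loops: int):
    """Return (iterations_run, total_cost, halted_by_limit) for a governed loop.

    Assumption: the check runs after each iteration and uses a strict '>'.
    Decimal avoids binary floating-point rounding in the running total.
    """
    total = Decimal("0")
    limit = Decimal(cost_limit)
    for iteration in range(1, max_loops + 1):
        total += Decimal(cost_per_iteration)
        if total > limit:
            return iteration, total, True
    return max_loops, total, False


iterations, total, halted = simulate_loop("0.10", "0.25", max_loops=20)
print(iterations, total, halted)  # 3 0.30 True
```

Running the same function with your own per-iteration cost is a quick way to sanity-check a limit before attaching it to a real loop.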
Example 2: Proactive Cancellation in Parallel Steps
The UsageGovernor also optimizes parallel execution: when one branch breaches the limit, it cancels the other in-flight branches to save time and resources.
import asyncio
import time
from typing import Any

from pydantic import BaseModel
from flujo import Flujo, Step, UsageLimits, UsageLimitExceededError


class CostlyAgent:
    """An agent that reports high cost and takes time to simulate expensive operations."""

    def __init__(self, cost: float = 0.1, tokens: int = 100, delay: float = 0.1):
        self.cost = cost
        self.tokens = tokens
        self.delay = delay

    async def run(self, data: Any) -> Any:
        await asyncio.sleep(self.delay)  # Simulate expensive operation

        # Built per call so the defaults reflect this agent's cost and tokens
        class Output(BaseModel):
            value: Any
            cost_usd: float = self.cost
            token_counts: int = self.tokens

        return Output(value=data)


# Create branches with different costs and execution times
branches = {
    "fast_expensive": Step.model_validate({
        "name": "fast_expensive",
        "agent": CostlyAgent(cost=0.15, delay=0.05),  # Breaches limit quickly
    }),
    "slow_cheap": Step.model_validate({
        "name": "slow_cheap",
        "agent": CostlyAgent(cost=0.01, delay=0.5),  # Takes longer but is cheap
    }),
}
parallel = Step.parallel("parallel_cancellation", branches)
limits = UsageLimits(total_cost_usd_limit=0.10)  # Limit that will be breached by fast_expensive
runner = Flujo(parallel, usage_limits=limits)
async def main() -> None:
    # `await` is only valid inside a coroutine, so the run is wrapped in main()
    start_time = time.monotonic()
    try:
        print("Running parallel steps with cost limit...")
        result = await runner.run("input")
        print("Parallel execution completed!")
    except UsageLimitExceededError as e:
        execution_time = time.monotonic() - start_time
        print("\n✅ Parallel execution halted by governor!")
        print(f" Error: {e}")
        print(f" Execution time: {execution_time:.2f}s")
        print(f" Total cost: ${e.result.total_cost_usd:.2f}")
        print(" Note: slow_cheap branch was cancelled")


asyncio.run(main())
Expected Output (the exact execution time will vary from run to run):
Running parallel steps with cost limit...
✅ Parallel execution halted by governor!
Error: Cost limit of $0.1 exceeded
Execution time: 0.06s
Total cost: $0.15
Note: slow_cheap branch was cancelled
How Parallel Cancellation Works
- Concurrent Execution: Both branches start executing simultaneously
- Fast Branch Breaches: The fast_expensive branch completes quickly and breaches the limit
- Task Cancellation: The governor cancels the slow_cheap branch that was still running
- Time Savings: Execution completes quickly instead of waiting for the slow branch
This optimization is valuable in production scenarios where you might have expensive API calls running in parallel.
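The cancellation pattern described above can be sketched with plain asyncio. This is not flujo's implementation: branch and run_with_budget are hypothetical helpers, and the sketch assumes a breach means the running total is strictly greater than the limit.

```python
import asyncio


async def branch(name: str, cost: float, delay: float):
    """Simulate a branch that takes `delay` seconds and reports `cost`."""
    await asyncio.sleep(delay)
    return name, cost


async def run_with_budget(specs: dict, cost_limit: float):
    """Run branches concurrently; cancel the rest once the limit is breached.

    Returns (total_cost, finished_names, cancelled_names).
    """
    tasks = {
        asyncio.create_task(branch(name, cost, delay)): name
        for name, (cost, delay) in specs.items()
    }
    pending = set(tasks)
    total_cost = 0.0
    finished = []
    while pending:
        # Wake up as soon as any branch finishes, not when all of them do.
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            name, cost = task.result()
            finished.append(name)
            total_cost += cost
        if total_cost > cost_limit:
            break
    cancelled = sorted(tasks[task] for task in pending)
    for task in pending:
        task.cancel()
    if pending:
        # Let the cancellations propagate so no task is left dangling.
        await asyncio.gather(*pending, return_exceptions=True)
    return total_cost, finished, cancelled


specs = {"fast_expensive": (0.15, 0.05), "slow_cheap": (0.01, 0.5)}
print(asyncio.run(run_with_budget(specs, cost_limit=0.10)))
```

Waiting with FIRST_COMPLETED is what makes the early exit possible: the coordinator sees each branch's cost as soon as it lands instead of after the slowest branch returns.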
Example 3: Complex Nested Workflows
The UsageGovernor works seamlessly with complex nested structures, providing protection at every level.
import asyncio

from flujo import Flujo, Step, Pipeline, UsageLimits, UsageLimitExceededError

# Reuses FixedMetricAgent from Example 1 ($0.10 and 100 tokens per call)

# Create a nested workflow: loop containing parallel steps
inner_branches = {
    "a": Step.model_validate({"name": "a", "agent": FixedMetricAgent()}),
    "b": Step.model_validate({"name": "b", "agent": FixedMetricAgent()}),
}
inner_parallel = Step.parallel("inner_parallel", inner_branches)
outer_loop = Step.loop_until(
    name="outer_loop",
    loop_body_pipeline=Pipeline.from_step(inner_parallel),
    exit_condition_callable=lambda _out, _ctx: False,  # Never exit naturally
    iteration_input_mapper=lambda _out, _ctx, _i: 0,
    max_loops=10,
)
limits = UsageLimits(total_cost_usd_limit=0.5)
runner = Flujo(outer_loop, usage_limits=limits)


async def main() -> None:
    # `await` is only valid inside a coroutine, so the run is wrapped in main()
    try:
        print("Running complex nested workflow...")
        result = await runner.run(0)
        print("Nested workflow completed!")
    except UsageLimitExceededError as e:
        print("\n✅ Complex workflow halted by governor!")
        print(f" Error: {e}")
        print(f" Loop iterations: {e.result.step_history[0].attempts}")
        print(f" Total cost: ${e.result.total_cost_usd:.2f}")


asyncio.run(main())
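A rough estimate of where this workflow stops: each loop iteration runs two branches at $0.10 each, so the total grows by $0.20 per iteration (0.20, 0.40, 0.60) and first exceeds the $0.50 limit during the third iteration. The helper below is a hypothetical back-of-the-envelope calculator, not part of flujo. It assumes every branch's cost counts toward the total and that a breach means strictly greater than the limit.

```python
from decimal import Decimal


def iterations_until_breach(branch_costs, cost_limit):
    """Estimate the loop iteration in which a cost limit is first exceeded.

    Assumptions: every branch's cost counts toward the total, and a breach
    means the running total is strictly greater than the limit.
    """
    per_iteration = sum(Decimal(cost) for cost in branch_costs)
    limit = Decimal(cost_limit)
    # Smallest n such that n * per_iteration > limit.
    return int(limit // per_iteration) + 1


print(iterations_until_breach(["0.10", "0.10"], "0.5"))  # 3
```

An estimate like this is useful for nested workflows because the per-iteration cost multiplies with the number of parallel branches, which makes limits easy to underestimate.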
Best Practices
1. Always Set Limits for Production
# Good: Always set limits for production pipelines
limits = UsageLimits(
    total_cost_usd_limit=1.00,  # $1.00 budget
    total_tokens_limit=5000,    # 5000 token limit
)
runner = Flujo(pipeline, usage_limits=limits)

# Bad: No limits can lead to runaway costs
runner = Flujo(pipeline)  # No protection!
2. Use Conservative Limits
# Start with conservative limits and adjust based on monitoring
limits = UsageLimits(
total_cost_usd_limit=0.10, # Start small
total_tokens_limit=1000 # Conservative token limit
)
3. Handle Exceptions Gracefully
import logging

logger = logging.getLogger(__name__)

try:
    result = await runner.run(input_data)
    # Process successful result
except UsageLimitExceededError as e:
    # Log the breach for monitoring
    logger.warning(f"Pipeline halted due to limit breach: {e}")
    # Optionally retry with different parameters,
    # or fall back to a simpler pipeline
    # (fallback_runner is a cheaper runner defined elsewhere)
    result = await fallback_runner.run(input_data)
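The fallback pattern can be exercised end to end without flujo. In the sketch below, BudgetExceeded, expensive_pipeline, cheap_pipeline, and run_with_fallback are all hypothetical stand-ins, and the primary pipeline is hard-wired to breach its budget so that the fallback path runs.

```python
import asyncio
import logging

logger = logging.getLogger("budget_fallback_sketch")


class BudgetExceeded(Exception):
    """Hypothetical stand-in for a limit-breach exception."""


async def expensive_pipeline(data: str) -> str:
    # Always breaches in this sketch, to exercise the fallback path.
    raise BudgetExceeded("Cost limit of $0.1 exceeded")


async def cheap_pipeline(data: str) -> str:
    # A simpler pipeline that stays within budget.
    return f"summary({data})"


async def run_with_fallback(data: str) -> str:
    """Try the primary pipeline; on a budget breach, log and fall back once."""
    try:
        return await expensive_pipeline(data)
    except BudgetExceeded as exc:
        logger.warning("Primary pipeline halted: %s", exc)
        return await cheap_pipeline(data)


print(asyncio.run(run_with_fallback("input")))  # summary(input)
```

In a real deployment the fallback runner should carry its own limits too, otherwise the fallback path becomes the unprotected one.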
4. Monitor and Adjust
# Use the result data to understand usage patterns
try:
    result = await runner.run(input_data)
except UsageLimitExceededError as e:
    result = e.result
    print(f"Pipeline used ${result.total_cost_usd} in {len(result.step_history)} steps")
    # Adjust limits based on actual usage
    # Consider implementing dynamic limits based on step complexity
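One way to turn monitoring data into a limit is to take a high percentile of observed run costs and add headroom. The helper below is a hypothetical sketch of that idea using only the standard library. The 95th percentile and the 1.25 headroom factor are illustrative assumptions, not flujo recommendations.

```python
import statistics


def suggest_cost_limit(observed_costs_usd, headroom=1.25):
    """Suggest a limit: the 95th percentile of observed run costs times headroom."""
    if len(observed_costs_usd) < 2:
        raise ValueError("need at least two observations")
    # quantiles(n=20) returns 19 cut points; the last one is the 95th percentile.
    # method="inclusive" keeps the estimate within the observed range.
    p95 = statistics.quantiles(observed_costs_usd, n=20, method="inclusive")[-1]
    return round(p95 * headroom, 2)


print(suggest_cost_limit([0.10, 0.20, 0.30, 0.40, 0.50]))  # 0.6
```

The resulting value would be passed as total_cost_usd_limit. Recomputing it periodically from recent runs keeps the limit tight without halting normal traffic.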
Summary
The UsageGovernor transforms cost control from a manual, error-prone process into an automatic, reliable safety net. By integrating seamlessly with all flujo control flow primitives, it enables you to:
- Run complex pipelines confidently without fear of runaway costs
- Optimize resource usage through task cancellation
- Maintain predictable budgets in production environments
- Scale safely with automatic protection at every level
This makes flujo well suited for production AI workloads where cost predictability is critical.