Usage
Copy `.env.example` to `.env` and add your API keys before running the CLI. Environment variables are loaded automatically from this file.
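For example, a minimal `.env` might contain just the keys for the providers you actually use (the full list of supported variables is under Environment Variables below; the values shown here are placeholders):

```bash
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=...
GOOGLE_API_KEY=...
```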
CLI
flujo solve "Write a summary of this document."
flujo show-config
flujo bench "hi" --rounds 3
flujo explain path/to/pipeline.py
flujo add-eval-case -d my_evals.py -n new_case -i "input"
flujo --profile
Debugging and Observability
Flujo provides rich tracing and debugging capabilities:
# List all pipeline runs
flujo lens list
# Show detailed information about a specific run
flujo lens show <run_id>
# View hierarchical execution trace
flujo lens trace <run_id>
# List individual spans with filtering
flujo lens spans <run_id> --status completed
# Show aggregated span statistics
flujo lens stats
Use `flujo improve --improvement-model MODEL` to override the model powering the self-improvement agent when generating suggestions.
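For example, to have suggestions generated by GPT-4o (using the same model-string format as elsewhere in this guide, and keeping any other arguments your `flujo improve` invocation already uses):

```bash
flujo improve --improvement-model openai:gpt-4o
```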
`flujo bench` depends on `numpy`. Install it with the optional `[bench]` extra:
pip install flujo[bench]
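Note for zsh users: square brackets are glob characters in that shell, so quote the requirement:

```bash
pip install "flujo[bench]"
```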
API
Note: The class-based `Default` recipe is deprecated. Use the new `make_default_pipeline` factory function for full transparency, composability, and future YAML/AI support.
from flujo.recipes.factories import make_default_pipeline, run_default_pipeline
from flujo.infra.agents import make_review_agent, make_solution_agent, make_validator_agent
from flujo import (
Flujo,
Task,
)
from flujo.infra import init_telemetry
# Initialize telemetry (optional)
init_telemetry()
# Create the default pipeline using the factory
pipeline = make_default_pipeline(
review_agent=make_review_agent(),
solution_agent=make_solution_agent(),
validator_agent=make_validator_agent(),
)
# Run the pipeline with tracing enabled (await this inside an async function,
# or wrap it with asyncio.run(...) in a script)
result = await run_default_pipeline(pipeline, Task(prompt="Write a poem."))
print(result)
# Access trace information
if result.trace_tree:
print(f"Pipeline trace: {result.trace_tree.name}")
print(f"Status: {result.trace_tree.status}")
print(f"Duration: {result.trace_tree.end_time - result.trace_tree.start_time:.3f}s")
The `make_default_pipeline` factory creates a Review → Solution → Validate pipeline. It does not include a reflection step by default, but you can pass a `reflection_agent` to enable one, as sketched below. For fully custom workflows or more complex reflection logic, use the `Step` API with the `Flujo` engine.
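A minimal sketch of enabling reflection, assuming only the `reflection_agent` parameter named above; `my_reflection_agent` is a placeholder for an agent you construct yourself:

```python
# Same factory call as above, plus the optional reflection step.
# my_reflection_agent is a placeholder; build it the same way you build your other agents.
pipeline_with_reflection = make_default_pipeline(
    review_agent=make_review_agent(),
    solution_agent=make_solution_agent(),
    validator_agent=make_validator_agent(),
    reflection_agent=my_reflection_agent,
)
```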
Call `init_telemetry()` once at startup to configure logging and tracing for your application.
Pipeline DSL with Tracing
You can define custom workflows using the `Step` class and execute them with `Flujo`:
from flujo import Step, Flujo
from flujo.plugins.sql_validator import SQLSyntaxValidator
from flujo.testing.utils import StubAgent
solution_step = Step.solution(StubAgent(["SELECT FROM"]))
validate_step = Step.validate(StubAgent([None]), plugins=[SQLSyntaxValidator()])
pipeline = solution_step >> validate_step
# Run with tracing enabled
flujo = Flujo(pipeline, enable_tracing=True)
result = flujo.run("SELECT FROM")
# Analyze execution trace
if result.trace_tree:
print(f"Steps executed: {len(result.step_history)}")
for step in result.step_history:
print(f" {step.name}: {'✅' if step.success else '❌'} ({step.latency_s:.3f}s)")
Environment Variables
- `OPENAI_API_KEY` (optional, for OpenAI models)
- `GOOGLE_API_KEY` (optional, for Gemini models)
- `ANTHROPIC_API_KEY` (optional, for Claude models)
- `LOGFIRE_API_KEY` (optional)
- `REFLECTION_ENABLED` (default: true)
- `REWARD_ENABLED` (default: true; toggles the reward model scorer on/off)
- `MAX_ITERS`, `K_VARIANTS`
- `TELEMETRY_EXPORT_ENABLED` (default: false)
- `OTLP_EXPORT_ENABLED` (default: false)
- `OTLP_ENDPOINT` (optional, e.g. https://otlp.example.com)
Cost Tracking and Usage Limits
Flujo provides integrated cost and token usage tracking to help you monitor and control spending on LLM operations.
Basic Cost Tracking
Once you've configured pricing in your `flujo.toml` file, cost tracking is enabled automatically.
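A pricing entry takes the shape used in the troubleshooting section below; the rates here are illustrative, so substitute your provider's current pricing:

```toml
[cost.providers.openai.gpt-4o]
prompt_tokens_per_1k = 0.005
completion_tokens_per_1k = 0.015
```

With pricing configured, each step result exposes its cost and token counts: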
from flujo import Step, Flujo
from flujo.infra.agents import make_agent_async
# Create agents
solution_agent = make_agent_async("openai:gpt-4o", "You are a helpful assistant.", str)
validator_agent = make_agent_async("openai:gpt-4o", "You are a validator.", str)
# Create pipeline
pipeline = Step.solution(solution_agent) >> Step.validate(validator_agent)
runner = Flujo(pipeline)
# Run and access cost information
result = runner.run("Write a short story about a robot.")
# Print cost details for each step
total_cost = 0
total_tokens = 0
for step_result in result.step_history:
cost = step_result.cost_usd
tokens = step_result.token_counts
total_cost += cost
total_tokens += tokens
print(f"{step_result.name}:")
print(f" Cost: ${cost:.4f}")
print(f" Tokens: {tokens}")
print(f" Success: {step_result.success}")
print(f"\nTotal cost: ${total_cost:.4f}")
print(f"Total tokens: {total_tokens}")
Setting Usage Limits
Prevent excessive spending by setting cost and token limits:
from flujo import Flujo, Step, UsageLimits
from flujo import UsageLimitExceededError  # raised when a limit is hit; adjust the import path if your version exposes it elsewhere
# Define limits
limits = UsageLimits(
total_cost_usd_limit=0.50, # Maximum $0.50 total cost
total_tokens_limit=2000 # Maximum 2,000 tokens
)
# Apply limits to pipeline
runner = Flujo(pipeline, usage_limits=limits)
try:
result = runner.run("Write a comprehensive analysis.")
print("Pipeline completed successfully!")
except UsageLimitExceededError as e:
print(f"Pipeline stopped due to usage limits: {e}")
# Access partial results
partial_result = e.partial_result
print(f"Completed {len(partial_result.step_history)} steps before stopping")
Step-Level Limits
Set limits on individual steps for fine-grained control:
from flujo import Step, UsageLimits
# Set limits for specific steps
solution_limits = UsageLimits(
total_cost_usd_limit=0.20, # Maximum $0.20 for solution step
total_tokens_limit=800 # Maximum 800 tokens for solution step
)
validation_limits = UsageLimits(
total_cost_usd_limit=0.10, # Maximum $0.10 for validation step
total_tokens_limit=400 # Maximum 400 tokens for validation step
)
pipeline = (
Step.solution(solution_agent, usage_limits=solution_limits)
>> Step.validate(validator_agent, usage_limits=validation_limits)
)
Parallel Execution with Limits
When using parallel steps, Flujo can cancel sibling branches when limits are exceeded:
from flujo import Flujo, Step, Pipeline, UsageLimits
from flujo import UsageLimitExceededError  # adjust the import path if your version exposes it elsewhere
# Create parallel branches (costly_agent and cheap_agent are placeholders for agents you define elsewhere)
expensive_branch = Pipeline.from_step(Step("expensive", costly_agent))
cheap_branch = Pipeline.from_step(Step("cheap", cheap_agent))
parallel = Step.parallel_branch(expensive_branch, cheap_branch)
# If expensive_branch exceeds limits, cheap_branch will be cancelled
limits = UsageLimits(total_cost_usd_limit=0.10)
runner = Flujo(parallel, usage_limits=limits)
try:
result = runner.run("Process this data.")
except UsageLimitExceededError as e:
print("One or more branches exceeded limits")
Monitoring Costs in Production
Log cost information for analysis and monitoring:
import logging
from flujo import Flujo, Step
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def log_costs(result):
"""Log cost information for monitoring."""
total_cost = sum(step.cost_usd for step in result.step_history)
total_tokens = sum(step.token_counts for step in result.step_history)
logger.info(f"Pipeline completed - Cost: ${total_cost:.4f}, Tokens: {total_tokens}")
# Log per-step details
for step in result.step_history:
logger.info(f" {step.name}: ${step.cost_usd:.4f} ({step.token_counts} tokens)")
# Run pipeline with cost logging
pipeline = Step.solution(my_agent) >> Step.validate(validator_agent)
runner = Flujo(pipeline)
result = runner.run("Your prompt")
log_costs(result)
Cost-Effective Pipeline Design
Design pipelines with cost efficiency in mind:
from flujo import Step, Flujo, UsageLimits
from flujo.infra.agents import make_agent_async
# Use cheaper models for simple tasks
simple_agent = make_agent_async("openai:gpt-3.5-turbo", "Simple task agent.", str)
complex_agent = make_agent_async("openai:gpt-4o", "Complex task agent.", str)
# Design pipeline with cost considerations
pipeline = (
Step.solution(simple_agent, usage_limits=UsageLimits(total_cost_usd_limit=0.05))
>> Step.validate(complex_agent, usage_limits=UsageLimits(total_cost_usd_limit=0.15))
)
# Set overall pipeline limits
runner = Flujo(pipeline, usage_limits=UsageLimits(total_cost_usd_limit=0.25))
Troubleshooting Cost Tracking
If cost tracking isn't working as expected:
- Check your `flujo.toml` configuration:

      [cost.providers.openai.gpt-4o]
      prompt_tokens_per_1k = 0.005
      completion_tokens_per_1k = 0.015

- Enable debug logging:

      import logging

      logging.getLogger("flujo.cost").setLevel(logging.DEBUG)

- Verify agent usage information:

      # Check if your agent returns usage information
      result = my_agent.run("test")
      if hasattr(result, 'usage'):
          usage = result.usage()
          print(f"Prompt tokens: {usage.prompt_tokens}")
          print(f"Completion tokens: {usage.completion_tokens}")
For more detailed configuration information, see the Configuration Guide.
OTLP Exporter (Tracing/Telemetry)
If you want to export traces to an OTLP-compatible backend (such as OpenTelemetry Collector, Honeycomb, or Datadog), set the following environment variables:
- `OTLP_EXPORT_ENABLED=true`: enable OTLP trace exporting
- `OTLP_ENDPOINT=https://your-otlp-endpoint`: (optional) custom OTLP endpoint URL
When enabled, the orchestrator will send traces using the OTLP HTTP exporter. This is useful for distributed tracing and observability in production environments.
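For example, set them in your shell (or in the `.env` file described above):

```bash
export OTLP_EXPORT_ENABLED=true
export OTLP_ENDPOINT=https://your-otlp-endpoint
```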
Scoring Utilities
Functions like `ratio_score` and `weighted_score` are available for custom workflows. The default orchestrator always returns a score of `1.0`.
Reflection
Add a reflection step by composing your own pipeline with `Step` and running it with `Flujo`, as sketched below.
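A minimal sketch, assuming the generic `Step(name, agent)` constructor shown in the parallel-branch example above; `solution_agent`, `validator_agent`, and `reflection_agent` are placeholders for agents you construct yourself:

```python
from flujo import Step, Flujo

# Sketch only: append a reflection step after solution and validation.
pipeline = (
    Step.solution(solution_agent)
    >> Step.validate(validator_agent)
    >> Step("reflect", reflection_agent)
)
result = Flujo(pipeline).run("Draft a project plan.")
```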
Running Custom Pipelines from the CLI: flujo run
The `flujo run` command lets you execute any custom pipeline directly from the command line, with no need to write an `if __name__ == "__main__":` script. This makes rapid iteration and testing of your workflows much easier.
Basic Usage
flujo run my_pipeline.py --input "Hello world" --context-model MyContext
- `my_pipeline.py` should define a top-level variable (default: `pipeline`) of type `Pipeline`.
- `--input` provides the initial input to the pipeline.
- `--context-model` (optional) specifies the name of a context model class defined in the file.
Passing Context Data
You can pass initial context data as a JSON string or from a file (JSON or YAML):
flujo run my_pipeline.py --input "Prompt" --context-model MyContext --context-data '{"counter": 5}'
flujo run my_pipeline.py --input "Prompt" --context-model MyContext --context-file context.json
flujo run my_pipeline.py --input "Prompt" --context-model MyContext --context-file context.yaml
Customizing the Pipeline Variable Name
If your pipeline variable is not named `pipeline`, use `--pipeline-name`:
flujo run my_pipeline.py --input "Prompt" --pipeline-name my_custom_pipeline
Output
By default, the CLI prints a summary table and the final context. For machine-readable output, use `--json`:
flujo run my_pipeline.py --input "Prompt" --context-model MyContext --json
Example Pipeline File
from flujo import step, Pipeline
from flujo.models import PipelineContext
from pydantic import Field
class MyContext(PipelineContext):
counter: int = Field(default=0)
@step
async def inc(data: str, *, context: MyContext | None = None) -> str:
if context:
context.counter += 1
return data.upper()
pipeline = inc >> inc
Example Context File (YAML)
counter: 5
Example Command
flujo run my_pipeline.py --input "hello" --context-model MyContext --context-file context.yaml
Why Use `flujo run`?
- No boilerplate needed for quick experiments.
- Test and debug pipelines interactively.
- Pass context and input flexibly.
- Integrates with the full DSL and context system.
See also: Pipeline DSL Guide, Typed Pipeline Context
Full CLI Demo Example
Below is a complete example pipeline file you can run directly with the CLI:
from flujo import step, Pipeline
from flujo.models import PipelineContext
from pydantic import Field
class DemoContext(PipelineContext):
counter: int = Field(default=0)
log: list[str] = Field(default_factory=list)
@step
async def greet(data: str, *, context: DemoContext | None = None) -> str:
msg = f"Hello, {data}!"
if context:
context.counter += 1
context.log.append(msg)
return msg
@step
async def emphasize(data: str, *, context: DemoContext | None = None) -> str:
msg = data.upper() + "!!!"
if context:
context.counter += 1
context.log.append(msg)
return msg
@step
async def summarize(data: str, *, context: DemoContext | None = None) -> str:
summary = f"Summary: {data} (steps: {context.counter if context else 0})"
if context:
context.counter += 1
context.log.append(summary)
return summary
pipeline = greet >> emphasize >> summarize
You can run this file with:
flujo run examples/10_cli_run_demo.py --input "quickstart" --context-model DemoContext
Or with context data:
flujo run examples/10_cli_run_demo.py --input "with context" --context-model DemoContext --context-data '{"counter": 10}'