Tutorial: From Simple Orchestration to Custom AI Pipelines
Welcome! This tutorial will guide you through using the flujo library, from your very first request to building advanced, custom AI workflows. We'll start with the basics and progressively build up to more complex examples.
Before You Begin:
* You should have a basic understanding of Python.
* Make sure you have set up your API keys (e.g., OPENAI_API_KEY) in a .env file in your project directory. The orchestrator will automatically find and use them.
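For example, a minimal .env file might contain nothing more than the key the examples below rely on (the value shown is a placeholder):

# 📂 .env
OPENAI_API_KEY=sk-your-key-here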
Key Concepts: The Building Blocks
Before we write any code, let's understand the main components you'll be working with. Think of it like a chef learning about their ingredients before cooking.
- AgenticLoop (Recipe): A dynamic planner-loop pattern. A planner agent decides which AgentCommand to execute next (run a tool agent, ask a human, run Python, or finish). This is the recommended starting point for exploratory workflows.
- Flujo, Pipeline, Step: When you need more control than the standard Default workflow, you'll use the Pipeline DSL. A Pipeline is a sequence of Step objects executed by Flujo to build fully custom multi-agent workflows.
- Candidate: The final result produced by the Default recipe. It contains the solution itself and the checklist used to grade it.
Now that we know the players, let's see them in action!
1. Your First AI Task: AgenticLoop
We'll begin with the make_agentic_loop_pipeline pattern. A planner agent decides which tool agent to run and when to finish.
# 📂 step_1_agentic_loop.py
from flujo.recipes.factories import make_agentic_loop_pipeline, run_agentic_loop_pipeline
from flujo.infra.agents import make_agent_async
from flujo.infra import init_telemetry
from flujo.domain.commands import AgentCommand

init_telemetry()

# A stand-in "tool agent" the planner can invoke by name.
async def search_agent(query: str) -> str:
    return "Rainy days are peaceful" if "rain" in query else "No info"

PLANNER_PROMPT = """
You are a poet assistant. Use `search_agent` to gather inspiration.
When ready, reply with `FinishCommand` containing the final haiku.
"""

# The planner emits AgentCommand objects that drive the loop.
planner = make_agent_async(
    "openai:gpt-4o",
    PLANNER_PROMPT,
    AgentCommand,
)

pipeline = make_agentic_loop_pipeline(
    planner_agent=planner,
    agent_registry={"search_agent": search_agent},
)

result = run_agentic_loop_pipeline(pipeline, "Write a short, optimistic haiku about a rainy day.")

# Inspect every command the planner issued during the run.
for entry in result.final_pipeline_context.command_log:
    print(entry)
2. Building Your First Custom Step with @step
The easiest way to extend a pipeline is to decorate an async function with @step.
from flujo import step

@step
async def shout(text: str) -> str:
    return text.upper()
shout is now a typed Step[str, str] that you can compose with others.
3. Composing Your First Custom Pipeline
from flujo import Flujo, step, PipelineResult, PipelineRegistry

@step
async def to_upper(text: str) -> str:
    return text.upper()

@step
async def add_excitement(text: str) -> str:
    return f"{text}!"

# Chain the two steps into a pipeline with >>.
pipeline = to_upper >> add_excitement

# Register the pipeline under a name and version, then run it.
registry = PipelineRegistry()
registry.register(pipeline, "demo", "1.0.0")
runner = Flujo(
    registry=registry,
    pipeline_name="demo",
    pipeline_version="1.0.0",
)
result: PipelineResult[str] = runner.run(
    "hello", initial_context_data={"run_id": "example"}
)
print(result.step_history[-1].output)  # HELLO!
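Since each decorated function is a typed Step (both are Step[str, str] here), chaining with >> keeps the input and output types aligned from one step to the next.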
4. The Budget-Aware Workflow: Customizing Agents for Default
The Default recipe is still handy for simple, fixed workflows, and you can customize its agents to balance cost, speed, and quality. Here, we'll use a cheaper, faster model for the initial draft (the solution agent) but keep the smarter models for the critical-thinking roles (planning, quality control, and strategy).
# 📂 step_3_mixing_models.py
from flujo.recipes.factories import make_default_pipeline, run_default_pipeline
from flujo.infra.agents import make_agent_async, make_review_agent, make_validator_agent
from flujo.infra import init_telemetry
from flujo.models import Task

init_telemetry()

print("🚀 Building a workflow with a custom Solution Agent for the Default recipe...")

# A cheaper, faster model handles the first draft.
FAST_SOLUTION_PROMPT = "You are a creative but junior marketing copywriter. Write a catchy and concise slogan. Be quick and creative."
fast_copywriter_agent = make_agent_async("openai:gpt-4o-mini", FAST_SOLUTION_PROMPT, str)

# The default review and validator agents keep the smarter models.
pipeline = make_default_pipeline(
    review_agent=make_review_agent(),
    solution_agent=fast_copywriter_agent,
    validator_agent=make_validator_agent(),
)

task = Task(prompt="Write a slogan for a new brand of ultra-durable luxury coffee mugs.")
result = run_default_pipeline(pipeline, task)
# ... (printing logic)
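As a sketch of that elided printing logic: the Default recipe returns a Candidate (see Key Concepts above), and section 6 below accesses its .solution attribute, so you might report the outcome like this:

# A minimal sketch of the elided printing logic, assuming the
# Candidate's `solution` attribute as used in section 6 below.
if result:
    print("\n🏆 Winning slogan:")
    print(result.solution)
else:
    print("\n❌ The workflow did not produce a candidate.")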
5. Outputting Structured Data with a Custom Pipeline
So far, our agents have only returned simple strings. What if we need structured data, like JSON? The underlying pydantic-ai library excels at this. You can specify a Pydantic BaseModel as the output_type for an agent.
Let's build a workflow that extracts information from a block of text into a structured ContactCard model.
# 📂 step_4_structured_output.py
from pydantic import BaseModel, Field
from flujo import Step, Flujo, PipelineRegistry
from flujo.infra.agents import make_agent_async
from flujo.infra import init_telemetry
from flujo.models import Checklist
init_telemetry()
# 1. Define our desired output structure using a Pydantic model
class ContactCard(BaseModel):
    name: str = Field(..., description="The full name of the person.")
    email: str | None = Field(None, description="The person's email address.")
    company: str | None = Field(None, description="The company they work for.")
# 2. Define Agents for our custom pipeline
print("🛠️ Creating a data-extraction agent...")
EXTRACTION_PROMPT = "You are a data-entry expert. Extract contact information from the user's text and format it precisely according to the ContactCard schema. If a field is not present, omit it."
extraction_agent = make_agent_async("openai:gpt-4o", EXTRACTION_PROMPT, ContactCard)
REVIEW_PROMPT_FOR_EXTRACTION = "Generate a checklist to verify the extracted contact details. Check for name correctness, email validity, and company presence."
review_agent_for_extraction = make_agent_async("openai:gpt-4o", REVIEW_PROMPT_FOR_EXTRACTION, Checklist)
VALIDATE_PROMPT_FOR_EXTRACTION = "You are a QA for data extraction. Use the checklist to verify the ContactCard."
validator_agent_for_extraction = make_agent_async("openai:gpt-4o", VALIDATE_PROMPT_FOR_EXTRACTION, Checklist)
# 3. Define the custom pipeline
data_extraction_pipeline = (
    Step.review(review_agent_for_extraction, name="PlanExtraction")
    >> Step.solution(extraction_agent, name="ExtractContactInfo")
    >> Step.validate_step(validator_agent_for_extraction, name="ValidateCard")
)

registry = PipelineRegistry()
registry.register(data_extraction_pipeline, "extract", "1.0.0")
pipeline_runner = Flujo(
    registry=registry,
    pipeline_name="extract",
    pipeline_version="1.0.0",
)
unstructured_text = "Reach out to Jane Doe. She works at Innovate Corp and her email is jane.doe@example.com."
print(f"📄 Running custom pipeline to extract from: '{unstructured_text}'")
pipeline_result = pipeline_runner.run(unstructured_text)
if len(pipeline_result.step_history) > 1 and pipeline_result.step_history[1].success:
    contact_card_solution = pipeline_result.step_history[1].output
    if isinstance(contact_card_solution, ContactCard):
        print("\n✅ Successfully extracted structured data (ContactCard object):")
        print(contact_card_solution.model_dump_json(indent=2))
    else:
        print(f"\n⚠️ Expected ContactCard, got: {type(contact_card_solution)}")
else:
    print("\n❌ Custom pipeline failed to extract contact info.")
Expected Output:
✅ Successfully extracted structured data (ContactCard object):
{
  "name": "Jane Doe",
  "email": "jane.doe@example.com",
  "company": "Innovate Corp"
}
💡 Pro Tip: Beyond Basic Types. An agent's output_type can be str, int, float, or any Pydantic BaseModel. This is incredibly powerful for forcing the LLM to return clean, validated JSON that you can immediately use in your application.
6. The Grand Finale: A Fully Custom Pipeline with Tools
Now for the ultimate challenge. Let's build a workflow where every agent is customized, and our solution_agent can use external tools to get information it doesn't have.
Scenario: We need to write a factual report on a public company's stock price. The LLM doesn't know real-time stock prices, so it will need a tool.
- Custom Planner: A review_agent that knows what a good financial report looks like.
- Tool-Using Doer: A solution_agent that can call a get_stock_price function.
- Custom Quality Analyst: A validator_agent that is hyper-critical about financial data.
# 📂 step_5_advanced_tools.py
import random
from pydantic import BaseModel
from pydantic_ai import Tool
from flujo.recipes.factories import make_default_pipeline, run_default_pipeline
from flujo.infra.agents import make_agent_async
from flujo.models import Task, Checklist

# --- 1. Define the Tool ---
# This is a fake stock price function for our example.
def get_stock_price(symbol: str) -> float:
    """Gets the current stock price for a given ticker symbol."""
    print(f"TOOL USED: Getting stock price for {symbol}...")
    # In a real app, this would make an API call. We'll fake it.
    if symbol.upper() == "AAPL":
        return round(random.uniform(150, 250), 2)
    return round(random.uniform(50, 500), 2)
# --- 2. Create the Fully Custom Agent Team ---
print("👑 Assembling a fully custom, tool-using agent team...")
# The Planner: Focused on financial report quality
review_agent = make_agent_async(
    "openai:gpt-4o",
    "You are a financial analyst. Create a checklist for a brief, factual company report. Key items must include the company name, its stock symbol, the current price, and a concluding sentence.",
    Checklist,
)
# The Doer: Equipped with the stock price tool
class Report(BaseModel):
    company: str
    symbol: str
    current_price: float
    summary: str
# To expose a function to the agent, wrap it in a Tool object.
# The tool's name is taken from the function name.
stock_tool = Tool(get_stock_price)
solution_agent = make_agent_async(
    "openai:gpt-4o-mini",  # A cheaper model is enough for drafting
    "You are a junior analyst. Write a one-paragraph report on the requested company. Use the provided tools to get live data. Your final output must be a structured Report.",
    Report,
    # The magic happens here: we give the agent its tools.
    tools=[stock_tool],
)
# The Quality Analyst: Hyper-critical of data
validator_agent = make_agent_async(
    "openai:gpt-4o",
    "You are a senior auditor. Meticulously check the report against the checklist. Be extremely strict about factual data. If the price is a placeholder, fail it.",
    Checklist,
)
# --- 3. Assemble and Run the Default Recipe ---
pipeline = make_default_pipeline(
    review_agent=review_agent,
    solution_agent=solution_agent,
    validator_agent=validator_agent,
)
task = Task(prompt="Generate a stock report for Apple Inc. (AAPL).")
print("🧠 Running advanced tool-based workflow...")
best_candidate = run_default_pipeline(pipeline, task)
if best_candidate:
    print("\n🎉 Advanced workflow complete!")
    print(best_candidate.solution.model_dump_json(indent=2))
What You'll See:
During the execution, you will see a message from our tool function:
TOOL USED: Getting stock price for AAPL...
This confirms that the solution_agent recognized it needed information, called the function you provided, and used the result in its answer. The final output will be a perfectly structured report with the "live" data.
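Because a tool is just a Python function, swapping the fake lookup for a real one is straightforward. A minimal sketch, assuming a hypothetical quote API (the URL and response shape are invented for illustration):

import httpx

def get_stock_price(symbol: str) -> float:
    """Fetch a live price from a (hypothetical) quote API."""
    resp = httpx.get(f"https://api.example.com/quote/{symbol.upper()}")
    resp.raise_for_status()
    # Assumes the API returns JSON like {"price": 187.32}.
    return float(resp.json()["price"])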
This concludes our tour! You've journeyed from a simple prompt to a sophisticated, tool-using AI system. You've learned to:
- Understand the core concepts of Default recipes and Agents.
- Run a basic multi-agent task and interpret its self-correction process.
- Control the definition of quality using weighted scoring.
- Optimize workflows by mixing different AI models.
- Generate clean, structured JSON using Pydantic models.
- Empower agents with external tools to overcome their knowledge limitations.
7. Building Custom Pipelines
The new Pipeline DSL lets you compose your own workflow using Step objects. Execute the pipeline with Flujo:
from flujo import Step, Flujo, PipelineRegistry
from flujo.plugins.sql_validator import SQLSyntaxValidator
from flujo.testing.utils import StubAgent

registry = PipelineRegistry()

# StubAgent returns canned outputs, so we can exercise the pipeline
# without calling a real model. "SELECT FROM" is deliberately invalid SQL.
sql_step = Step.solution(StubAgent(["SELECT FROM"]))
check_step = Step.validate_step(StubAgent([None]), plugins=[SQLSyntaxValidator()])
pipeline_example = sql_step >> check_step
registry.register(pipeline_example, "sql", "1.0.0")
runner = Flujo(
    registry=registry,
    pipeline_name="sql",
    pipeline_version="1.0.0",
)
result = runner.run("SELECT FROM")
print(result.step_history[-1].feedback)
Using a Shared Typed Context
Flujo can share a Pydantic model instance across steps. This lets you accumulate data or pass configuration during a run. See Typed Pipeline Context for more details.
from flujo import Flujo, PipelineRegistry, step
from flujo.models import PipelineContext

class Stats(PipelineContext):
    calls: int = 0

# Each step receives the shared context via the keyword-only `context` argument.
@step
async def record(data: str, *, context: Stats | None = None) -> str:
    if context:
        context.calls += 1
    return data

pipeline = record >> record
registry = PipelineRegistry()
registry.register(pipeline, "ctx", "1.0.0")
runner = Flujo(
    registry=registry,
    pipeline_name="ctx",
    pipeline_version="1.0.0",
    context_model=Stats,
)
final = runner.run("hi", initial_context_data={"run_id": "ctx-run"})
print(final.final_pipeline_context.calls)  # 2
Iterative Loops with LoopStep
Some workflows require repeating a set of steps until a condition is met. LoopStep lets you express this directly in the DSL.
from flujo import Step, Flujo, Pipeline, PipelineRegistry, step

@step
async def fixer(data: str) -> str:
    return data + "!"

body = Pipeline.from_step(fixer)

# Repeat the body until the output ends with "!!!", at most three times.
loop = Step.loop_until(
    name="add_exclamation",
    loop_body_pipeline=body,
    exit_condition_callable=lambda out, ctx: out.endswith("!!!"),
    max_loops=3,
)

registry = PipelineRegistry()
registry.register(loop, "loop", "1.0.0")
runner = Flujo(
    registry=registry,
    pipeline_name="loop",
    pipeline_version="1.0.0",
)
result = runner.run("hi", initial_context_data={"run_id": "loop-run"})
print(result.step_history[-1].output)  # 'hi!!!'
Conditional Branching with ConditionalStep
Sometimes a pipeline should take different actions depending on earlier results. ConditionalStep lets you define that logic declaratively.
# The router inspects the previous step's output and picks a branch key.
def choose(out, ctx):
    return "positive" if "!" in out else "neutral"

branches = {
    "positive": Pipeline.from_step(
        step(name="yay")(lambda x: x + " 😊")
    ),
    "neutral": Pipeline.from_step(
        step(name="meh")(lambda x: x)
    ),
}

branch = Step.branch_on(
    name="sentiment_router",
    condition_callable=choose,
    branches=branches,
)

# Reuse `fixer` from the LoopStep example above.
pipeline = fixer >> branch
registry = PipelineRegistry()
registry.register(pipeline, "branch", "1.0.0")
runner = Flujo(
    registry=registry,
    pipeline_name="branch",
    pipeline_version="1.0.0",
)
print(
    runner.run("ok", initial_context_data={"run_id": "branch-run"}).step_history[-1].output
)
You're now ready to build powerful and intelligent AI applications. Happy orchestrating!