Core Concepts

This guide explains the fundamental concepts that power flujo. Understanding these concepts will help you build more effective AI workflows.

AgenticLoop

AgenticLoop is the primary pattern for building exploratory agent workflows. On each turn, the planner agent emits an AgentCommand: run a tool agent, execute Python, ask a human, or finish. The loop executes the command and records it in the PipelineContext.

from flujo.recipes.factories import make_agentic_loop_pipeline
from flujo.infra.agents import make_agent_async
from flujo.domain.commands import AgentCommand

planner = make_agent_async(
    "openai:gpt-4o",
    "Plan the next command and finish when done.",
    AgentCommand,
)
pipeline = make_agentic_loop_pipeline(
    planner_agent=planner,
    agent_registry={},
)

The Default Recipe (Simplified)

The Default recipe is a convenient helper that runs a fixed Review → Solution → Validate workflow. It's useful when you want a quick, opinionated pipeline without planning logic. Under the hood it uses the same Pipeline DSL described later.

from flujo.recipes.factories import make_default_pipeline
from flujo.infra.agents import (
    make_review_agent,
    make_solution_agent,
    make_validator_agent,
)

pipeline = make_default_pipeline(
    review_agent=make_review_agent(),
    solution_agent=make_solution_agent(),
    validator_agent=make_validator_agent(),
)

Agents

Agents are specialized AI models with specific roles. Each agent has:

  • A system prompt that defines its role
  • An output type (string, Pydantic model, etc.)
  • Optional tools for external interactions

Default Agents

The library provides four default agents:

  1. Review Agent (review_agent)
     • Role: Creates a quality checklist
     • Output: Checklist model
     • Purpose: Defines what "good" looks like

  2. Solution Agent (solution_agent)
     • Role: Generates the actual solution
     • Output: String or custom model
     • Purpose: Does the main work

  3. Validator Agent (validator_agent)
     • Role: Evaluates the solution
     • Output: Checklist model
     • Purpose: Quality control

  4. Reflection Agent (reflection_agent)
     • Role: Provides improvement suggestions and meta-analysis
     • Output: String
     • Purpose: Self-improvement and iteration guidance

Creating Custom Agents

from flujo.infra.agents import make_agent_async

custom_agent = make_agent_async(
    "openai:gpt-4",  # Model
    "You are a Python expert.",  # System prompt
    str  # Output type
)

Tasks

A Task represents a single request to the orchestrator. It contains:

  • The prompt (what you want to achieve)
  • Optional metadata for additional context

from flujo import Task

task = Task(
    prompt="Write a function to calculate prime numbers",
    metadata={"language": "python", "complexity": "medium"}
)

Candidates

A Candidate is a potential solution produced by the orchestrator. It includes:

  • The solution itself
  • A quality score (0.0 to 1.0)
  • A quality checklist evaluation

result = orch.run_sync(task)  # orch is a previously configured orchestrator
if result:  # result is a Candidate
    print(f"Solution: {result.solution}")
    print(f"Quality Score: {result.score}")
    if result.checklist:
        print("Checklist:")
        for item in result.checklist.items:
            print(f"- {item.description}: {'✅' if item.passed else '❌'}")

The Pipeline DSL

The Pipeline domain-specific language (DSL) builds workflows from Step objects and executes them with the Flujo engine. It is the primary way to create flexible, custom multi-agent workflows, giving you full control over the sequence of operations, the agents used at each stage, and the integration of plugins.

Flujo can also maintain a shared, typed context object for each run. Steps declare a context parameter to access or modify this object. See Typed Pipeline Context for full documentation.

The Pipeline Context: Built-in Memory

Every run automatically receives a PipelineContext instance. It includes a run_id, the initial prompt, a mutable scratchpad dictionary, and a record of all human interactions (hitl_history). This lets agents share state without additional setup.
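
Steps access this object by declaring a context parameter. As a minimal sketch (the exact step signature may differ in your version; see Typed Pipeline Context for the authoritative form):

async def remember_topic(data: str, *, context) -> str:
    # `context` is the run's PipelineContext; stash a value in the
    # scratchpad so later steps can read it back.
    context.scratchpad["topic"] = data
    return data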

The built-in Default recipe uses this DSL under the hood. When you need different logic, you can use the same tools directly through the Flujo engine. The DSL also supports advanced constructs like LoopStep for iteration and ConditionalStep for branching workflows.

from flujo import Step, Flujo
from flujo.infra.agents import make_review_agent, make_solution_agent, make_validator_agent

# Define a pipeline
pipeline = (
    Step.review(make_review_agent())
    >> Step.solution(make_solution_agent())
    >> Step.validate(make_validator_agent())
)

# Run it
runner = Flujo(pipeline)
pipeline_result = runner.run("Your prompt here")
for step_res in pipeline_result.step_history:
    print(f"Step: {step_res.name}, Success: {step_res.success}")

Step Types

  1. Review Steps
     • Create quality checklists
     • Define success criteria

  2. Solution Steps
     • Generate the main output
     • Can use tools and external services

  3. Validation Steps
     • Verify the solution
     • Apply custom validation rules using plugins

  4. Custom Steps
     • Any agent can be used in a step
     • Flexible configuration and tool integration

Advanced Pipeline Constructs

Loop Steps

For iterative refinement:

from flujo import Step, Pipeline
from flujo.infra.agents import make_solution_agent

loop_step = Step.loop_until(
    name="refinement_loop",
    loop_body_pipeline=Pipeline.from_step(Step.solution(make_solution_agent())),
    exit_condition_callable=lambda output, context: "done" in output.lower(),
)

Conditional Steps

For branching logic:

from flujo import Step, Pipeline

# code_agent and text_agent are agents created earlier with make_agent_async
router_step = Step.branch_on(
    name="content_router",
    condition_callable=lambda output, context: "code" if "function" in output else "text",
    branches={
        "code": Pipeline.from_step(Step.solution(code_agent)),
        "text": Pipeline.from_step(Step.solution(text_agent)),
    },
)

Human-in-the-Loop Steps

Use Step.human_in_the_loop() to pause execution and wait for structured human input. The step can optionally validate the response against a Pydantic model, and all interactions are saved to the PipelineContext (hitl_history).
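
A minimal sketch, assuming a Pydantic model for the reply (the keyword names message_for_user and input_schema are illustrative, not the exact signature; consult the HITL reference):

from pydantic import BaseModel
from flujo import Step

class Approval(BaseModel):
    approved: bool
    comment: str = ""

# Pause the run and wait for a structured human reply.
approval_gate = Step.human_in_the_loop(
    name="approval_gate",
    message_for_user="Approve the generated draft?",
    input_schema=Approval,
)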

Composing Workflows with .as_step()

High-level runners, such as the pipeline created with make_agentic_loop_pipeline() or even another Flujo instance, can be embedded into a larger pipeline. Call .as_step() on the configured runner to obtain a Step object:

from flujo import Step
from flujo.recipes.factories import make_agentic_loop_pipeline

# planner and tools are the planner agent and agent registry from earlier
loop_pipeline = make_agentic_loop_pipeline(planner_agent=planner, agent_registry=tools)

pipeline = (
    loop_pipeline.as_step(name="discover") >>
    Step.mapper(
        lambda r: r.final_pipeline_context.command_log[-1].execution_result,
        name="extract",
    )
)

This enables a "pipeline of pipelines" pattern where complex sub-workflows remain modular yet easy to chain together.

Scoring

The orchestrator uses scoring to evaluate and select the best solution. Scoring strategies include:

  • Ratio-based: Simple pass/fail ratio from checklist items
  • Weighted: Different criteria have different importance
  • Model-based: Using an AI model to evaluate quality

from flujo.domain.scoring import ratio_score, weighted_score

# Simple ratio scoring (default)
score = ratio_score(checklist)

# Weighted scoring with custom weights
weights = {
    "correctness": 0.5,
    "readability": 0.3,
    "efficiency": 0.2
}
score = weighted_score(checklist, weights)

Tools

Tools allow agents to interact with external systems. They can:

  • Fetch data from APIs
  • Execute code
  • Interact with databases
  • Call other services

from pydantic_ai import Tool

def get_weather(city: str) -> str:
    """Get current weather for a city."""
    # Implementation here
    return f"Weather in {city}: Sunny"

# Create a tool
weather_tool = Tool(get_weather)

# Give it to an agent
agent = make_agent_async(
    "openai:gpt-4",
    "You are a weather assistant.",
    str,
    tools=[weather_tool]
)

Plugins

Plugins extend pipeline functionality, particularly for validation:

from typing import Any

from flujo.domain import ValidationPlugin, PluginOutcome
from flujo.plugins import SQLSyntaxValidator

# Use built-in SQL validator
sql_validator = SQLSyntaxValidator()

# Create a custom plugin; is_valid() is a placeholder for your own check
class CustomValidator(ValidationPlugin):
    def validate(self, output: Any, context: Any) -> PluginOutcome:
        if self.is_valid(output):
            return PluginOutcome(passed=True)
        return PluginOutcome(passed=False, feedback="Validation failed")

# Use in pipeline
pipeline = Step.validate(make_validator_agent(), plugins=[sql_validator, CustomValidator()])

Self-Improvement & Evaluation

The library includes intelligent evaluation capabilities:

from flujo.application import evaluate_and_improve, SelfImprovementAgent
from flujo.infra.agents import make_self_improvement_agent

# Create improvement agent
improvement_agent = SelfImprovementAgent(make_self_improvement_agent())

# Generate improvement suggestions (call from within an async function)
report = await evaluate_and_improve(
    task_fn=your_task_function,
    dataset=your_evaluation_dataset,
    agent=improvement_agent,
    pipeline_definition=your_pipeline
)

# Review suggestions
for suggestion in report.suggestions:
    print(f"Issue: {suggestion.failure_pattern_summary}")
    print(f"Fix: {suggestion.detailed_explanation}")

Telemetry

The orchestrator includes built-in telemetry for:

  • Performance monitoring
  • Usage tracking
  • Error reporting
  • Distributed tracing

from flujo import init_telemetry

# Initialize telemetry
init_telemetry()

# Configure via environment variables:
# TELEMETRY_EXPORT_ENABLED=true
# OTLP_EXPORT_ENABLED=true
# OTLP_ENDPOINT=https://your-otlp-endpoint

Configuration

Settings can be controlled via environment variables or the settings object:

from flujo import settings

# Access current settings
print(f"Default solution model: {settings.default_solution_model}")
print(f"Reflection enabled: {settings.reflection_enabled}")

# Environment variables (in .env file):
# DEFAULT_SOLUTION_MODEL=openai:gpt-4o
# REFLECTION_ENABLED=true
# AGENT_TIMEOUT=60

Managed Resources

Flujo supports an optional resources container that is passed to every step and plugin during a pipeline run. This is a convenient place to keep shared objects like database connections or API clients.

Create your own container by inheriting from AppResources and pass an instance to the runner:

from typing import Any

from flujo import Flujo

class MyResources(AppResources):  # AppResources is provided by flujo
    db_pool: Any

resources = MyResources(db_pool=make_pool())  # make_pool() is your own factory
runner = Flujo(pipeline, resources=resources)

Any agent or plugin can declare a keyword-only argument named resources to receive this object.
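
For example, a step or plugin function can borrow the container, as in this sketch (the asyncpg-style pool calls are an assumption; substitute your own client):

async def fetch_user_name(user_id: int, *, resources: MyResources) -> str:
    # Flujo injects the container through the keyword-only
    # `resources` argument during the run.
    async with resources.db_pool.acquire() as conn:
        return await conn.fetchval(
            "SELECT name FROM users WHERE id = $1", user_id
        )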

Lifecycle Hooks

Lifecycle hooks let you run custom code before and after key events such as pre_run, post_run, pre_step, post_step, and on_step_failure.

async def log_hook(**kwargs):
    print("event", kwargs.get("event_name"))

runner = Flujo(pipeline, hooks=[log_hook])

Raise PipelineAbortSignal from a hook to stop execution.
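
For instance, a guard hook might look like this sketch (the import path for PipelineAbortSignal is an assumption; adjust it to your installation):

from flujo.exceptions import PipelineAbortSignal  # path may differ

async def guard_hook(**kwargs):
    # Stop the run before any step executes (event names as listed above).
    if kwargs.get("event_name") == "pre_step":
        raise PipelineAbortSignal("aborted by guard_hook")

runner = Flujo(pipeline, hooks=[guard_hook])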

Best Practices

  1. Agent Design
     • Give clear, specific system prompts
     • Use appropriate output types
     • Include relevant tools when needed

  2. Pipeline Design
     • Start simple, add complexity as needed
     • Use validation steps for quality control
     • Consider cost and performance implications

  3. Error Handling
     • Implement proper retry logic
     • Handle API failures gracefully
     • Log errors for debugging

  4. Performance
     • Use appropriate models for each step
     • Implement caching where possible
     • Monitor and optimize costs

  5. Quality Control
     • Use reflection agents for self-improvement
     • Implement custom validation plugins
     • Monitor quality metrics over time

Usage Governor

Flujo can enforce cost and token limits during a pipeline run. Provide a UsageLimits object when creating the runner. If the pipeline exceeds either limit, it stops and raises UsageLimitExceededError with the partial result.

from flujo import Flujo, UsageLimits

limits = UsageLimits(total_cost_usd_limit=1.0, total_tokens_limit=5000)
runner = Flujo(my_pipeline, usage_limits=limits)  # my_pipeline defined earlier
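
Catching the error lets you recover whatever completed before the breach; a sketch (the import path for UsageLimitExceededError is an assumption):

from flujo.exceptions import UsageLimitExceededError  # path may differ

try:
    result = runner.run("Summarize this document")
except UsageLimitExceededError as exc:
    # The exception carries the partial result produced before the
    # limit was hit; inspect it for completed steps.
    print("Usage limit exceeded:", exc)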

Pluggable Execution Back-Ends

Advanced users can control where each step executes by implementing the ExecutionBackend protocol. The default LocalBackend runs steps in the current process, so behaviour is unchanged for typical usage. Custom back-ends can delegate work to remote services or task queues while the orchestration logic in Flujo stays the same.

from flujo.infra.backends import LocalBackend
from flujo import Flujo

runner = Flujo(pipeline, backend=LocalBackend())

See Creating a Custom Execution Backend for guidance on building your own.

Streaming

Flujo can stream output from the final step of a pipeline. Use stream_async to iterate over chunks as they are produced. The last item yielded is the full PipelineResult object.

async for chunk in runner.stream_async("hello"):
    if isinstance(chunk, str):
        print(chunk, end="")
    else:
        result = chunk

Next Steps