
Pipeline DSL Guide

The Pipeline Domain-Specific Language (DSL) is a powerful way to create custom AI workflows in flujo. This guide explains how to use it effectively.

Overview

The Pipeline DSL lets you:

  • Compose complex workflows from simple steps and from other pipelines
  • Mix and match different agents
  • Add custom validation and scoring
  • Create reusable pipeline components

Steps vs. Agents

A Step is a declarative node in the pipeline. It holds configuration and a reference to the agent that performs the actual work. During execution the runner iterates over the steps and invokes their agents in order.

graph LR
    S[Step] -->|uses| A[Agent]
    A --> O[Output]

Step objects do not execute anything themselves—they simply describe what should happen. The agent may be an async function, an AsyncAgentWrapper created with make_agent_async, or any object implementing run().
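As a rough mental model of this separation, the following is a toy sketch and not flujo's implementation. `ToyStep` and `run_steps` are hypothetical names introduced only for illustration: the step is a small record that points at an agent, and the runner does the invoking.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

# Hypothetical stand-in for a step: a name plus a reference to an agent.
@dataclass
class ToyStep:
    name: str
    agent: Callable[[Any], Awaitable[Any]]

async def shout(text: str) -> str:
    return text.upper()

async def exclaim(text: str) -> str:
    return text + "!"

async def run_steps(steps: list, data: Any) -> Any:
    # The runner, not the step, invokes each agent in order.
    for current in steps:
        data = await current.agent(data)
    return data

result = asyncio.run(run_steps([ToyStep("shout", shout), ToyStep("exclaim", exclaim)], "hi"))
print(result)  # HI!
```

The step objects never call anything themselves; swapping an agent only means pointing the record at a different callable.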

Basic Usage

Recommended Pattern

For creating pipeline steps from your own async functions, the @step decorator is the simplest and most powerful approach. It automatically infers types and reduces boilerplate, making your code cleaner and safer.

Creating a Pipeline

from flujo import Flujo, step

@step
async def add_one(x: int) -> int:
    return x + 1

@step
async def add_two(x: int) -> int:
    return x + 2

pipeline = add_one >> add_two
runner = Flujo(pipeline)
result = runner.run(1)

The @step decorator infers the input and output types from the function's signature, so each step is typed as Step[int, int] and the composed pipeline takes an int and returns an int.

Pipeline Composition

The >> operator chains steps together:

@step
async def multiply(x: int) -> int:
    return x * 2

@step
async def add_three(x: int) -> int:
    return x + 3

pipeline1 = multiply >> add_three
pipeline2 = add_three >> multiply

Chaining Pipelines with Pipelines: Modular Multi-Stage Workflows

New: you can now compose entire pipelines from other pipelines using the >> operator. This allows you to break complex workflows into logical, independent pipelines and then chain them together in a clean, readable sequence.

How it Works

  • Step >> Step → Pipeline
  • Pipeline >> Step → Pipeline
  • Pipeline >> Pipeline → Pipeline (new!)

When you chain two pipelines, their steps are concatenated into a single, flat pipeline. The output of the first pipeline becomes the input to the second.

Example: Chaining Pipelines

from flujo import Pipeline, Step

# Define two independent pipelines
pipeline_a = Step("A1") >> Step("A2")
pipeline_b = Step("B1") >> Step("B2")

# Chain them together
master_pipeline = pipeline_a >> pipeline_b

# master_pipeline.steps == [A1, A2, B1, B2]
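To illustrate how chaining can produce one flat list of steps, here is a minimal sketch that overloads `>>` on two toy classes. `ToyStep` and `ToyPipeline` are hypothetical stand-ins, and flujo's real classes may be implemented differently.

```python
from dataclasses import dataclass, field

@dataclass
class ToyStep:
    name: str

    def __rshift__(self, other):
        # Step >> (Step or Pipeline): start a pipeline, then delegate.
        return ToyPipeline([self]) >> other

@dataclass
class ToyPipeline:
    steps: list = field(default_factory=list)

    def __rshift__(self, other):
        # Concatenate instead of nesting, so the result stays flat.
        tail = other.steps if isinstance(other, ToyPipeline) else [other]
        return ToyPipeline(self.steps + tail)

pipeline_a = ToyStep("A1") >> ToyStep("A2")
pipeline_b = ToyStep("B1") >> ToyStep("B2")
master = pipeline_a >> pipeline_b

names = [s.name for s in master.steps]
print(names)  # ['A1', 'A2', 'B1', 'B2']
```

Because `__rshift__` always returns a new `ToyPipeline`, the two original pipelines are left untouched and can be reused elsewhere.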

Real-World Example: Multi-Stage Data Processing

Suppose you want to process text in two stages: first, resolve concepts; then, generate and validate SQL.

from flujo import Pipeline, Step

# 1. Build each independent pipeline
concept_pipeline = Step("resolve_concepts", agent=concept_agent)
sql_pipeline = (
    Step("generate_sql", agent=sql_gen_agent) >>
    Step("validate_sql", agent=sql_val_agent)
)

# 2. Chain them together using the >> operator
master_pipeline = concept_pipeline >> sql_pipeline

# The resulting pipeline takes text and outputs validated SQL

This approach:

  • Keeps each stage modular and testable
  • Produces a single, flat pipeline for unified context and observability
  • Is fully type-safe and backward compatible

Tip: You can chain as many pipelines as you want: p1 >> p2 >> p3.

Why This Matters

  • True Sequencing: Models a sequence of operations, not just nested sub-pipelines.
  • Unified Context: All steps share a single context and are visible to the tracer.
  • Simplicity: No need for special sub-pipeline steps or wrappers.

Creating Steps from Functions

Use the @step decorator to wrap your own async functions. The decorator infers both the input and output types:

@step
async def to_upper(text: str) -> str:
    return text.upper()

upper_step = to_upper

The resulting upper_step has the type Step[str, str] and can be composed like any other step.

Step Types

Review Steps

Review steps create quality checklists:

# Basic review step
review_step = Step.review(review_agent)

# With custom timeout
review_step = Step.review(review_agent, timeout=30)

# With custom retry logic
review_step = Step.review(
    review_agent,
    retries=3,
    backoff_factor=2
)

Solution Steps

Solution steps generate the main output:

# Basic solution step
solution_step = Step.solution(solution_agent)

# With structured output
from pydantic import BaseModel

class CodeSnippet(BaseModel):
    language: str
    code: str
    explanation: str

code_agent = make_agent_async(
    "openai:gpt-4",
    "You are a programming expert.",
    CodeSnippet
)

solution_step = Step.solution(code_agent)

# With tools
from pydantic_ai import Tool

def get_weather(city: str) -> str:
    return f"Weather in {city}: Sunny"

weather_tool = Tool(get_weather)
solution_step = Step.solution(
    solution_agent,
    tools=[weather_tool]
)

Validation Steps

Validation steps verify the solution:

# Basic validation
validate_step = Step.validate_step(validator_agent)

# With strict validation (default) - step fails if validation fails
strict_step = Step.validate_step(validator_agent, validators=[...], strict=True)

# With non-strict validation - step passes but records validation failure in metadata
audit_step = Step.validate_step(validator_agent, validators=[...], strict=False)

Strict vs Non-Strict Validation:

  • strict=True (default): If any validation fails, the entire step fails and the pipeline stops or retries.
  • strict=False: The step always reports success=True, but validation failures are recorded in StepResult.metadata_['validation_passed'] = False. This is useful for creating "warning" or "auditing" steps that don't block the pipeline.

# Example: Audit step that warns but doesn't fail
audit_step = Step.validate_step(
    validator_agent,
    validators=[WordCountValidator()],
    strict=False  # Will pass even if validation fails
)

# Later in your pipeline, you can check the metadata
if result.step_history[-1].metadata_.get('validation_passed') is False:
    print("Warning: Validation failed but pipeline continued")
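The difference between the two modes can be pictured with a small stand-alone sketch. `ToyStepResult` and `apply_validators` are hypothetical names, and this models only the behavior described above rather than flujo's internals.

```python
from dataclasses import dataclass, field

@dataclass
class ToyStepResult:
    output: str
    success: bool = True
    metadata_: dict = field(default_factory=dict)

def apply_validators(output, validators, strict=True):
    passed = all(check(output) for check in validators)
    if passed:
        return ToyStepResult(output)
    if strict:
        # strict=True: a failed validation fails the step.
        return ToyStepResult(output, success=False)
    # strict=False: the step still succeeds; the failure is only recorded.
    return ToyStepResult(output, success=True, metadata_={"validation_passed": False})

def fewer_than_five_words(text):
    return len(text.split()) < 5

long_text = "one two three four five six"
strict_result = apply_validators(long_text, [fewer_than_five_words], strict=True)
audit_result = apply_validators(long_text, [fewer_than_five_words], strict=False)
print(strict_result.success, audit_result.success, audit_result.metadata_)
# False True {'validation_passed': False}
```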

# With custom scoring
from flujo.domain.scoring import weighted_score

weights = {
    "correctness": 0.6,
    "readability": 0.4
}

validate_step = Step.validate_step(
    validator_agent,
    scorer=lambda c: weighted_score(c, weights)
)

# With plugins
from flujo.plugins import SQLSyntaxValidator

validate_step = Step.validate_step(
    validator_agent,
    plugins=[SQLSyntaxValidator()]
)

# With programmatic validators
from flujo.validation import BaseValidator, ValidationResult

class WordCountValidator(BaseValidator):
    async def validate(self, output_to_check: str, *, context=None) -> ValidationResult:
        word_count = len(output_to_check.split())
        return ValidationResult(
            is_valid=word_count < 5,
            validator_name=self.name,
            feedback="Too many words" if word_count >= 5 else None,
        )

validate_step = Step.validate_step(
    validator_agent,
    validators=[WordCountValidator()]
)

See Hybrid Validation Cookbook for a complete example.

All step factories also accept a `processors: Optional[AgentProcessors]` parameter to run pre-processing and post-processing hooks. See [Using Processors](cookbook/using_processors.md) for details.

For complex data shaping before calling another step, consider using an [Adapter Step](cookbook/adapter_step.md).

Advanced Features

Looping and Iteration

Repeat a sub-pipeline until a condition is met using `Step.loop_until()`.
See [LoopStep documentation](pipeline_looping.md) for full details.

loop_step = Step.loop_until(
    name="refine",
    loop_body_pipeline=Pipeline.from_step(Step.solution(solution_agent)),
    exit_condition_callable=lambda out, ctx: "done" in out,
)

pipeline = Step.review(review_agent) >> loop_step >> Step.validate_step(validator_agent)

Typed Pipeline Context

A Flujo runner can share a mutable Pydantic model instance across all steps in a single run. Pass a context model when creating the runner and declare a context parameter in your step functions or agents. See Typed Pipeline Context for a full explanation.

from flujo.models import PipelineContext

class MyContext(PipelineContext):
    counter: int = 0

@step
async def increment(data: str, *, context: MyContext | None = None) -> str:
    if context:
        context.counter += 1
    return data

pipeline = increment >> increment
runner = Flujo(pipeline, context_model=MyContext)
result = runner.run("hi")
print(result.final_pipeline_context.counter)  # 2

Each run() call gets a fresh context instance. Access the final state via PipelineResult.final_pipeline_context.

You can also have a step return a partial context object and mark it with updates_context=True to automatically merge those fields into the running context:

@step(updates_context=True)
async def bootstrap(_: str) -> MyContext:
    return MyContext(counter=42)

pipeline = bootstrap >> increment
runner = Flujo(pipeline, context_model=MyContext)
result = runner.run("hi")
print(result.final_pipeline_context.counter)  # 43

When a step marked with updates_context=True returns a dictionary or a Pydantic model, the new data is merged into the current pipeline context. This merge is validation-safe: Pydantic recursively reconstructs all nested models and the entire context is revalidated. If the update would result in an invalid context, the step fails and the previous state is restored, preventing data corruption in later steps.
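The merge-then-validate-then-roll-back idea can be sketched with the standard library alone. This is an illustration under stated assumptions: flujo relies on Pydantic for validation, whereas the hypothetical `ToyContext` below uses a hand-written `validate()` method, and `merge_update` is not a flujo function.

```python
import copy
from dataclasses import dataclass

@dataclass
class ToyContext:
    counter: int = 0
    label: str = ""

    def validate(self) -> None:
        # Stand-in for model validation.
        if not isinstance(self.counter, int) or self.counter < 0:
            raise ValueError("counter must be a non-negative int")

def merge_update(ctx: ToyContext, update: dict) -> bool:
    """Merge update into ctx; restore the previous state if validation fails."""
    snapshot = copy.deepcopy(ctx.__dict__)
    try:
        for key, value in update.items():
            if key not in snapshot:
                raise ValueError(f"unknown field: {key}")
            setattr(ctx, key, value)
        ctx.validate()
        return True
    except ValueError:
        ctx.__dict__.update(snapshot)  # roll back every field
        return False

ctx = ToyContext(counter=1)
accepted = merge_update(ctx, {"counter": 42})
rejected = merge_update(ctx, {"label": "draft", "counter": -5})
print(accepted, rejected, ctx)
# True False ToyContext(counter=42, label='')
```

Note that the rejected update also reverts `label`, even though only `counter` was invalid: the whole update is applied or none of it is.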

Managed Resources

You can also pass a long-lived resources container to the runner. Declare a keyword-only resources argument in your agents or plugins to use it.

from typing import Any

class MyResources(AppResources):
    db_pool: Any

@step
async def query(data: int, *, resources: MyResources) -> str:
    return resources.db_pool.get_user(data)

runner = Flujo(query, resources=my_resources)

Conditional Branching

Use Step.branch_on() to route to different sub-pipelines at runtime. See ConditionalStep for full details.

def choose_branch(out, ctx):
    return "a" if "important" in out else "b"

branch_step = Step.branch_on(
    name="router",
    condition_callable=choose_branch,
    branches={
        "a": Pipeline.from_step(Step("a_step", agent_a)),
        "b": Pipeline.from_step(Step("b_step", agent_b)),
    },
)

pipeline = Step.solution(solution_agent) >> branch_step >> Step.validate_step(validator_agent)

Custom Step Factories

Create reusable step factories:

def create_code_step(agent, **config):
    """Create a solution step with code validation."""
    # Named code_step so it does not shadow the @step decorator
    code_step = Step.solution(agent, **config)
    code_step.add_plugin(SQLSyntaxValidator())
    return code_step

# Use the factory
pipeline = (
    Step.review(review_agent)
    >> create_code_step(solution_agent)
    >> Step.validate_step(validator_agent)
)

Error Handling

Retry Logic

# Configure retries at the step level
solution_step = Step.solution(
    solution_agent,
    retries=3,
    backoff_factor=2,
    retry_on_error=True
)

# Configure retries at the pipeline level
runner = Flujo(
    pipeline,
    max_retries=3,
    retry_on_error=True
)

Best Practices

  1. Pipeline Design
       • Keep pipelines focused and simple
       • Use meaningful step names
       • Document complex pipelines
       • Test thoroughly

  2. Error Handling
       • Add appropriate retries
       • Log errors properly
       • Monitor performance

  3. Performance
       • Optimize step order
       • Cache results when possible
       • Monitor resource usage

  4. Maintenance
       • Create reusable components
       • Version your pipelines
       • Document dependencies
       • Test regularly

Examples

Code Generation Pipeline

from flujo import Step, Flujo
from flujo.plugins import (
    SQLSyntaxValidator,
    CodeStyleValidator
)

# Create a code generation pipeline
pipeline = (
    Step.review(review_agent)  # Define requirements
    >> Step.solution(code_agent)  # Generate code
    >> Step.validate_step(
        validator_agent,
        plugins=[
            SQLSyntaxValidator(),
            CodeStyleValidator()
        ]
    )
)

# Run it
runner = Flujo(pipeline)
result = runner.run("Write a SQL query to find active users")

Content Generation Pipeline

# Create a content generation pipeline
pipeline = (
    Step.review(review_agent)  # Define content guidelines
    >> Step.solution(writer_agent)  # Generate content
    >> Step.validate_step(
        validator_agent,
        scorer=lambda c: weighted_score(c, {
            "grammar": 0.3,
            "style": 0.3,
            "tone": 0.4
        })
    )
)

# Run it
runner = Flujo(pipeline)
result = runner.run("Write a blog post about AI")

Troubleshooting

Common Issues

  1. Pipeline Errors
       • Check step order
       • Verify agent compatibility
       • Review error messages
       • Check configuration

  2. Performance Issues
       • Monitor step durations
       • Check resource usage
       • Optimize step order

  3. Quality Issues
       • Review scoring weights
       • Check validation rules
       • Monitor success rates
       • Adjust agents

Getting Help

Next Steps

  • Read the Usage Guide
  • Explore Advanced Topics
  • Check out Use Cases
  • Future work: a pipeline.visualize() helper will output a Mermaid graph so you can instantly diagram your pipeline.

Pipeline DSL

The Pipeline DSL provides a fluent interface for building complex workflows. It supports sequential execution, conditional branching, parallel execution, and looping.

Steps

Steps are the basic building blocks of pipelines. Each step has a name and an agent that performs the actual work.

Creating Steps

from flujo import Step

# Create a step with an agent
step = Step("my_step", my_agent)

# Create a step with configuration
step = Step("my_step", my_agent, max_retries=3, timeout_s=30.0)

Step Configuration

Steps can be configured with various options:

  • max_retries: Number of retry attempts (default: 1)
  • timeout_s: Timeout in seconds (default: None)
  • temperature: Temperature for LLM agents (default: None)

Fallback Steps

Use .fallback(other_step) to specify an alternate step to run if the primary step fails after exhausting its retries. The fallback receives the same input as the original step.

from flujo import Step

primary = Step("generate", primary_agent, max_retries=2)
backup = Step("backup", backup_agent)
primary.fallback(backup)

If the fallback succeeds, the overall step is marked successful and StepResult.metadata_['fallback_triggered'] is set to True. Metrics like latency, cost, and token counts from the fallback step are merged into the primary result. Circular fallback references raise InfiniteFallbackError.
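The retry-then-fallback flow can be pictured with the sketch below. Everything in it is hypothetical (`ToyStep`, `run_step`, `ToyInfiniteFallbackError`), results are plain dicts, and `max_retries` is treated as the total number of attempts, which is an assumption about the semantics. Metric merging is omitted.

```python
from dataclasses import dataclass
from typing import Callable, Optional

class ToyInfiniteFallbackError(RuntimeError):
    """Raised when a fallback chain would loop back on itself."""

@dataclass
class ToyStep:
    name: str
    agent: Callable[[str], str]
    max_retries: int = 1  # assumption: total attempts, not extra retries
    fallback_step: Optional["ToyStep"] = None

    def fallback(self, other: "ToyStep") -> "ToyStep":
        # Walk the proposed chain; reaching self again means a cycle.
        node = other
        while node is not None:
            if node is self:
                raise ToyInfiniteFallbackError(self.name)
            node = node.fallback_step
        self.fallback_step = other
        return self

def run_step(current: ToyStep, data: str) -> dict:
    for _ in range(current.max_retries):
        try:
            return {"output": current.agent(data), "success": True, "metadata_": {}}
        except Exception:
            continue  # retry until attempts are exhausted
    if current.fallback_step is not None:
        result = run_step(current.fallback_step, data)  # same input as the primary
        if result["success"]:
            result["metadata_"]["fallback_triggered"] = True
        return result
    return {"output": None, "success": False, "metadata_": {}}

def unavailable(_text: str) -> str:
    raise RuntimeError("model unavailable")

primary = ToyStep("generate", unavailable, max_retries=2)
backup = ToyStep("backup", lambda text: f"backup:{text}")
primary.fallback(backup)

result = run_step(primary, "hello")
print(result)
# {'output': 'backup:hello', 'success': True, 'metadata_': {'fallback_triggered': True}}
```

Checking for cycles at registration time, rather than at run time, means a misconfigured chain fails as soon as it is built.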

Pipelines

Pipelines are sequences of steps that execute in order.

Creating Pipelines

from flujo import Pipeline, Step

# Create a pipeline from steps
pipeline = Step("step1", agent1) >> Step("step2", agent2) >> Step("step3", agent3)

# Or create a pipeline directly
pipeline = Pipeline([step1, step2, step3])

Pipeline Composition

Pipelines can be composed using the >> operator:

pipeline1 = Step("a", agent_a) >> Step("b", agent_b)
pipeline2 = Step("c", agent_c) >> Step("d", agent_d)
combined = pipeline1 >> pipeline2

Conditional Steps

Conditional steps execute different branches based on a condition.

from flujo import Pipeline, Step

def route_by_type(data, context):
    if "code" in str(data):
        return "code"
    return "text"

conditional = Step.branch_on(
    name="router",
    condition_callable=route_by_type,
    branches={
        "code": Pipeline.from_step(Step("code_gen", code_agent)),
        "text": Pipeline.from_step(Step("text_gen", text_agent)),
    }
)

Parallel Steps

Parallel steps execute multiple branches concurrently and aggregate their outputs.

Basic Parallel Execution

from flujo import Pipeline, Step

parallel = Step.parallel(
    name="parallel_processing",
    branches={
        "analysis": Pipeline.from_step(Step("analyze", analysis_agent)),
        "summary": Pipeline.from_step(Step("summarize", summary_agent)),
    }
)

Optimized Context Copying

For pipelines with large context objects, you can optimize performance by specifying which context fields each branch needs:

from flujo import Pipeline, Step

# Only copy specific context fields to each branch
parallel = Step.parallel(
    name="parallel_optimized",
    branches={
        "analysis": Pipeline.from_step(Step("analyze", analysis_agent)),
        "summary": Pipeline.from_step(Step("summarize", summary_agent)),
    },
    context_include_keys=["user_id", "document_id"]  # Only copy these fields
)

This feature provides significant performance improvements when:

  • Your context contains large data structures (documents, images, etc.)
  • You have many parallel branches
  • Each branch only needs a subset of the context data
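One way to picture the saving is the minimal sketch below, with a plain dict standing in for the context. `copy_context_for_branch` is a hypothetical helper and not a flujo function.

```python
import copy

def copy_context_for_branch(context: dict, include_keys=None) -> dict:
    """Deep-copy the whole context, or only the requested keys."""
    if include_keys is None:
        return copy.deepcopy(context)
    return {key: copy.deepcopy(context[key]) for key in include_keys if key in context}

context = {"user_id": 7, "document_id": "doc-1", "document_body": "x" * 1_000_000}
branch_context = copy_context_for_branch(context, ["user_id", "document_id"])
print(sorted(branch_context))  # ['document_id', 'user_id']
```

The megabyte-sized `document_body` is never copied for the branch, which is where the time and memory are saved when there are many branches.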

Context Merging and Failure Handling

Step.parallel can merge context updates from its branches back into the main pipeline context. Use the merge_strategy parameter to control how merging is performed and on_branch_failure to define failure behavior.

from flujo import Pipeline, Step
from flujo.domain import MergeStrategy, BranchFailureStrategy

parallel = Step.parallel(
    name="parallel_merge",
    branches={"a": Pipeline.from_step(Step("a", a_agent)), "b": Pipeline.from_step(Step("b", b_agent))},
    merge_strategy=MergeStrategy.MERGE_SCRATCHPAD,
    on_branch_failure=BranchFailureStrategy.IGNORE,
)

Available MergeStrategy values:

  • NO_MERGE (default) – discard branch context modifications.
  • OVERWRITE – context from the last declared successful branch overwrites matching fields.
  • MERGE_SCRATCHPAD – merge scratchpad dictionaries from all successful branches. Branch scratchpads are merged in alphabetical order and key collisions raise a ValueError.

on_branch_failure accepts PROPAGATE (default) or IGNORE. When set to IGNORE, the parallel step succeeds as long as one branch succeeds and the output dictionary includes the failed StepResult objects for inspection.
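The MERGE_SCRATCHPAD rule described above (alphabetical branch order, collisions raise ValueError) can be sketched as follows. `merge_scratchpads` is a hypothetical helper that works on plain dicts and is not flujo's implementation.

```python
def merge_scratchpads(branch_scratchpads: dict) -> dict:
    """Merge per-branch scratchpads in alphabetical branch order."""
    merged = {}
    for branch_name in sorted(branch_scratchpads):
        for key, value in branch_scratchpads[branch_name].items():
            if key in merged:
                raise ValueError(f"scratchpad key collision on {key!r} in branch {branch_name!r}")
            merged[key] = value
    return merged

merged = merge_scratchpads({"b": {"summary": "short"}, "a": {"analysis": "deep"}})
print(merged)  # {'analysis': 'deep', 'summary': 'short'}
```

Raising on collisions, rather than letting the later branch win, makes conflicting writes visible instead of silently dropping one of them.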

Proactive Governor Cancellation

Parallel steps now support proactive cancellation when usage limits are breached. When any branch exceeds cost or token limits, sibling branches are immediately cancelled to prevent unnecessary resource consumption:

from flujo import Flujo, Pipeline, Step, UsageLimits

parallel = Step.parallel(
    name="parallel_governed",
    branches={
        "fast_expensive": Pipeline.from_step(Step("expensive", costly_agent)),
        "slow_cheap": Pipeline.from_step(Step("cheap", cheap_agent)),
    }
)

# If fast_expensive breaches the limit, slow_cheap will be cancelled immediately
limits = UsageLimits(total_cost_usd_limit=0.10)
runner = Flujo(parallel, usage_limits=limits)

This feature is particularly beneficial when:

  • You have branches with varying costs and execution times
  • You want to minimize wasted resources when limits are exceeded
  • You need predictable execution times under resource constraints
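The cancellation pattern itself can be sketched with plain asyncio. This is an illustration only: `ToyLimitBreached`, `branch`, and `run_governed` are hypothetical names, and the costs and delays are made up. It shows sibling tasks being cancelled as soon as one branch breaches a shared budget.

```python
import asyncio

class ToyLimitBreached(Exception):
    """Raised by a branch when the shared budget is exceeded (hypothetical)."""

async def branch(name, cost_per_tick, ticks, delay, spent, limit):
    for _ in range(ticks):
        await asyncio.sleep(delay)
        spent[name] = spent.get(name, 0.0) + cost_per_tick
        if sum(spent.values()) > limit:
            raise ToyLimitBreached(name)
    return name

async def run_governed(limit):
    spent = {}
    tasks = {
        "fast_expensive": asyncio.create_task(branch("fast_expensive", 0.06, 5, 0.01, spent, limit)),
        "slow_cheap": asyncio.create_task(branch("slow_cheap", 0.001, 5, 0.5, spent, limit)),
    }
    # Wake up as soon as any branch raises instead of waiting for all of them.
    done, pending = await asyncio.wait(tasks.values(), return_when=asyncio.FIRST_EXCEPTION)
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    breached = sorted(n for n, t in tasks.items() if t in done and t.exception() is not None)
    cancelled = sorted(n for n, t in tasks.items() if t.cancelled())
    return breached, cancelled

breached, cancelled = asyncio.run(run_governed(limit=0.10))
print(breached, cancelled)  # ['fast_expensive'] ['slow_cheap']
```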

Dynamic Parallel Router

Use Step.dynamic_parallel_branch() when an agent selects which parallel branches to run at runtime. The router agent returns a list of branch names.

router = Step.dynamic_parallel_branch(
    name="router",
    router_agent=my_router_agent,
    branches={"billing": billing_pipe, "support": support_pipe},
)

The step behaves like Step.parallel and records executed branches in StepResult.metadata_["executed_branches"].

Loop Steps

Loop steps execute a pipeline repeatedly until a condition is met.

from flujo import Pipeline, Step

def is_long_enough(output, context):
    # Exit condition: return True to stop looping
    return len(str(output)) >= 100

loop = Step.loop_until(
    name="refinement_loop",
    loop_body_pipeline=Pipeline.from_step(Step("refine", refine_agent)),
    exit_condition_callable=is_long_enough,
    max_loops=5
)
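The control flow of a bounded loop can be sketched in a few lines. `run_loop` and `pad` are hypothetical helpers; what flujo does when the cap is reached is not specified here, so this toy version simply returns the last output.

```python
def run_loop(body, exit_condition, data, max_loops=5):
    """Run body repeatedly; stop when exit_condition is true or the cap is hit."""
    for iteration in range(1, max_loops + 1):
        data = body(data)
        if exit_condition(data, None):  # second argument stands in for the context
            return data, iteration
    return data, max_loops

def pad(text):
    return text + "." * 40

final_text, iterations = run_loop(pad, lambda out, ctx: len(out) >= 100, "")
print(iterations, len(final_text))  # 3 120
```

The exit condition returns True to stop, so a predicate that expresses "keep going" has to be inverted before it is passed in.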

Human-in-the-Loop Steps

Human-in-the-loop steps pause execution for human input.

from flujo import Step

hitl = Step.human_in_the_loop(
    name="approval",
    message_for_user="Please review and approve the generated content"
)

Map Steps

Map steps apply a pipeline to each item in an iterable from the context.

from typing import List

from pydantic import BaseModel

from flujo import Pipeline, Step

class Context(BaseModel):
    items: List[str]

map_step = Step.map_over(
    name="process_items",
    pipeline_to_run=Pipeline.from_step(Step("process", process_agent)),
    iterable_input="items"
)

Step Factories

Flujo provides several factory methods for creating specialized steps.

From Callable

from pydantic import BaseModel

from flujo import Step

async def my_function(data: str, *, context: BaseModel | None = None) -> str:
    return data.upper()

step = Step.from_callable(my_function, name="uppercase")

From Mapper

from flujo import Step

async def double(x: int) -> int:
    return x * 2

step = Step.from_mapper(double, name="double")

Caching Step Results

Use Step.cached() to store the result of an expensive step in a cache backend.

from flujo import Step
from flujo.caching import InMemoryCache

expensive = Step("slow", agent)
cached = Step.cached(expensive, cache_backend=InMemoryCache())

On a cache hit, StepResult.metadata_["cache_hit"] will be True. The cache key includes a stable hash of the step's full definition (agent, config, plugins, etc.), the step input data, and any context or resources provided.
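A stable key of this kind can be derived by serializing the inputs deterministically and hashing the result. The sketch below is one possible approach under stated assumptions: `cache_key` is a hypothetical helper, the step definition is a plain dict, and flujo's actual hashing scheme may differ.

```python
import hashlib
import json

def cache_key(step_definition: dict, step_input, context=None, resources=None) -> str:
    """Hash the step definition, input, context, and resources into one key."""
    payload = {
        "step": step_definition,
        "input": step_input,
        "context": context,
        "resources": resources,
    }
    # sort_keys makes the serialization independent of dict insertion order;
    # default=repr is a fallback for values json cannot encode.
    blob = json.dumps(payload, sort_keys=True, default=repr)
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()

key_a = cache_key({"name": "slow", "max_retries": 1}, "hello")
key_b = cache_key({"max_retries": 1, "name": "slow"}, "hello")
key_c = cache_key({"name": "slow", "max_retries": 1}, "goodbye")
print(key_a == key_b, key_a == key_c)  # True False
```

Including the step definition in the key means that changing a step's configuration invalidates its old cache entries automatically.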

Validation and Error Handling

Steps can include validation plugins and error handlers.

from flujo import Step

step = Step("validated", agent).add_plugin(validator, priority=1)

Context Updates

Steps can update the pipeline context.

from flujo import Step

step = Step("updater", agent, updates_context=True)

Step Metadata

Steps can carry arbitrary metadata.

from flujo import Step

step = Step("metadata", agent, meta={"version": "1.0", "author": "team"})

Tip: Rapid Iteration: You can now run any custom pipeline directly from the command line using flujo run my_pipeline.py --input "your prompt". This is the fastest way to test and debug your pipelines, with no need for a custom script. See usage.md for details.

Tip: Pipeline Composition: For advanced pipeline composition patterns, including wrapping entire pipelines as steps within other pipelines, see the Pipeline as a Step cookbook guide.