Skip to content

Extending flujo

Adding a Custom Agent

from pydantic_ai import Agent
class MyAgent(Agent):
    ...

Adding a Reflection Step

The simplified orchestrator no longer performs reflection automatically. To incorporate strategic feedback, build a custom pipeline using Step:

from flujo import Step, Flujo, get_reflection_agent
from flujo.infra.agents import make_review_agent, make_solution_agent, make_validator_agent

reflection_agent = get_reflection_agent(model="anthropic:claude-3-haiku")

pipeline = (
    Step.review(make_review_agent())
    >> Step.solution(make_solution_agent())
    >> Step.validate(make_validator_agent())
    >> Step.validate(reflection_agent)
)

result = Flujo(pipeline).run("Write a poem")

Creating Custom Step Factories with Pre-configured Plugins

If you frequently use a step with the same set of plugins, you can create your own factory function:

from flujo import Step
from my_app.plugins import MyCustomValidator

def ReusableSQLStep(agent, **config) -> Step:
    '''A solution step that always includes MyCustomValidator.'''
    step = Step.solution(agent, **config)
    step.add_plugin(MyCustomValidator(), priority=10)
    return step

# Usage:
pipeline = ReusableSQLStep(my_sql_agent) >> Step.validate(...)

Creating a Custom Execution Backend

Execution back-ends allow you to control how and where pipeline steps run. Implement the ExecutionBackend protocol and pass your implementation to Flujo.

from flujo.domain.backends import ExecutionBackend, StepExecutionRequest
from flujo.models import StepResult

class LoggingBackend(ExecutionBackend):
    def __init__(self, registry: dict[str, Any]):
        self.agent_registry = registry

    async def execute_step(self, request: StepExecutionRequest) -> StepResult:
        print(f"Executing {request.step.name}")
        agent = request.step.agent
        if request.stream and hasattr(agent, "stream"):
            chunks: list[Any] = []
            async for c in agent.stream(request.input_data):
                if request.on_chunk is not None:
                    await request.on_chunk(c)
                chunks.append(c)
        else:
            chunks = [await agent.run(request.input_data)]
        output = (
            "".join(chunks)
            if chunks and all(isinstance(c, str) for c in chunks)
            else chunks
        )
        return StepResult(name=request.step.name, output=output)

custom_backend = LoggingBackend({})
runner = Flujo(pipeline, backend=custom_backend)

For remote back-ends, use the agent_registry to safely map agent names to trusted objects.

Automatic Context and Resource Injection

flujo can automatically inject PipelineContext and AppResources into your custom functions and methods if they are type-hinted as keyword-only arguments. This allows you to write cleaner, more reusable code without having to manually pass these objects around.

How it Works

When you use a custom function as a mapper in a Step, flujo analyzes its signature to determine if it needs context or resources. If it finds a keyword-only argument named context that is a subclass of BaseModel, or a keyword-only argument named resources that is a subclass of AppResources, it will automatically inject the corresponding object at runtime.

Example

from flujo import Step, Flujo
from flujo.domain.models import PipelineContext
from flujo.domain.resources import AppResources

class MyContext(PipelineContext):
    counter: int = 0

class MyResources(AppResources):
    db_pool: Any

async def my_mapper(text: str, *, context: MyContext, resources: MyResources) -> str:
    context.counter += 1
    # Access the database pool from resources
    db_conn = await resources.db_pool.acquire()
    # ... do something with the database connection ...
    await resources.db_pool.release(db_conn)
    return text.upper()

# Create a pipeline with the custom mapper
custom_pipeline = Step.from_mapper(my_mapper)

# Initialize Flujo with the context and resources
runner = Flujo(
    custom_pipeline,
    context_model=MyContext,
    initial_context_data={"counter": 0},
    resources=MyResources(db_pool=make_pool()),
)

# Run the pipeline
result = runner.run("some input")

# The counter in the context will be incremented
assert result.final_pipeline_context.counter == 1

In this example, flujo automatically injects the MyContext and MyResources objects into the my_mapper function because they are type-hinted as keyword-only arguments named context and resources.

Transactional resources (per-attempt)

If your AppResources object implements a sync or async context manager, Flujo will enter it for each step attempt and exit it afterward. This lets you bind a database session/transaction to a single attempt (including retries and parallel branches) without holding connections across the entire run. Make sure the resource is re-entrant or can hand out per-attempt handles (e.g., via a pool) so parallel steps do not trample shared state.

State Providers for External Data

For large datasets (knowledge graphs, vector stores, conversation history), use StateProvider and ContextReference instead of storing data directly in context. This keeps your context lightweight and serializable.

Defining a StateProvider

from flujo.domain.interfaces import StateProvider
from typing import List

class KnowledgeGraphProvider(StateProvider):
    """Manages external knowledge graph data."""

    def __init__(self, db_connection):
        self._db = db_connection

    async def load(self, key: str) -> List[dict]:
        """Load graph data from database."""
        return await self._db.fetch_all(
            "SELECT * FROM nodes WHERE graph_key = ?", key
        )

    async def save(self, key: str, data: List[dict]) -> None:
        """Persist graph updates to database."""
        await self._db.execute_many(
            "INSERT OR REPLACE INTO nodes (graph_key, data) VALUES (?, ?)",
            [(key, node) for node in data]
        )

Using ContextReference

from flujo.domain.models import PipelineContext, ContextReference
from pydantic import Field

class MyContext(PipelineContext):
    # Lightweight pointer - not the actual data
    knowledge: ContextReference[List[dict]] = Field(
        default_factory=lambda: ContextReference(
            provider_id="kg_provider",  # Maps to state_providers key
            key="main_graph"            # Passed to provider.load/save
        )
    )

Registering Providers

runner = Flujo(
    pipeline=my_pipeline,
    context_model=MyContext,
    state_providers={
        "kg_provider": KnowledgeGraphProvider(my_db)
    }
)

Accessing Data in Steps

from datetime import datetime, timezone

@step
async def search_graph(query: str, *, context: MyContext) -> str:
    # Automatically hydrated from provider
    graph_data = context.knowledge.get()

    # Modify and set back (persisted after step completes)
    graph_data.append({"query": query, "timestamp": datetime.now(timezone.utc).isoformat()})
    context.knowledge.set(graph_data)

    return f"Processed with {len(graph_data)} nodes"

See examples/state_providers_demo.py for a complete working example.