Typing Guide
flujo
uses Python type hints to help you build robust pipelines. This guide shows how the @step
decorator, ContextAware
protocols and static type checkers work together.
The @step
Decorator: Effortless Typed Steps
Wrapping an async function with @step
automatically creates a Step[In, Out]
object using the function's signature. No manual generics are required.
from flujo import step
async def legacy_process(data: str) -> int:
return len(data)
process_step = step(legacy_process) # Step[str, int]
The decorator can also be used directly on your function:
from flujo import step
@step
async def to_upper(text: str) -> str:
return text.upper()
Here to_upper
is already a Step[str, str]
ready to be composed with other steps.
Type Safety in Pipelines
Pipelines are strongly typed. If you try to chain incompatible steps, static analyzers such as mypy
will flag an error.
from flujo import step
@step
async def first(x: str) -> int:
return len(x)
@step
async def second(x: str) -> str:
return x
pipeline = first >> second # ❌ mypy: incompatible types
Because first
outputs an int
while second
expects a str
, mypy
warns that the composition is invalid.
Stateful Pipelines: The ContextAware
Protocols
To share state across steps, define a Pydantic model and have your agents or plugins implement one of the context aware protocols. They receive a typed context instance automatically.
Parameter Naming: Steps, agents, and plugins can declare a
context
parameter to receive the shared context.
from flujo.models import PipelineContext
from flujo.domain.agent_protocol import ContextAwareAgentProtocol
from flujo.domain.plugins import ContextAwarePluginProtocol, PluginOutcome
class MyContext(PipelineContext):
user_query: str
counter: int = 0
class CountingAgent(ContextAwareAgentProtocol[str, str, MyContext]):
async def run(self, data: str, *, context: MyContext, **_: object) -> str:
context.counter += 1
return data
class MyPlugin(ContextAwarePluginProtocol[MyContext]):
async def validate(self, data: dict[str, object], *, context: MyContext, **_: object) -> PluginOutcome:
return PluginOutcome(success=True)
Every call to Flujo.run()
creates a fresh context instance. Mutations are visible to all subsequent steps.
A Complete Example
from flujo import Flujo, step, PipelineResult
from flujo.models import PipelineContext
class Ctx(PipelineContext):
history: list[str] = []
@step
async def record(text: str, *, context: Ctx) -> str:
context.history.append(text)
return text.upper()
@step
async def cheer(text: str) -> str:
return f"{text}!"
pipeline = record >> cheer
runner = Flujo(pipeline, context_model=Ctx)
result: PipelineResult[str] = runner.run("hello")
print(result.final_pipeline_context.history) # ['hello']
print(result.step_history[-1].output) # 'HELLO!'
This pipeline records each input in the context while producing an enthusiastic response.