Modular Workflows
flujo
provides factory methods for building reusable workflow components. Step.from_mapper
wraps simple async functions as steps, while Step.map_over
and Step.parallel
compose pipelines for iterative or concurrent execution.
Step.from_mapper
Step.from_mapper
is a convenience around Step.from_callable
. It infers input and output types from the annotated async function and creates a Step
.
async def to_upper(text: str) -> str:
return text.upper()
upper_step = Step.from_mapper(to_upper)
Step.map_over
Step.map_over
runs a pipeline for each item in an iterable stored in the pipeline context. The collected outputs are returned as a list.
from flujo.domain.models import PipelineContext
class Ctx(PipelineContext):
nums: list[int]
body = Pipeline.from_step(Step.from_mapper(lambda x: x * 2, name="double"))
mapper = Step.map_over("mapper", body, iterable_input="nums")
runner = Flujo(mapper, context_model=Ctx)
result = runner.run(None, initial_context_data={"nums": [1, 2, 3]})
print(result.step_history[-1].output) # [2, 4, 6]
The mapping pipeline can also run in parallel when its steps are free of side effects.
class SleepAgent:
async def run(self, data: int) -> int:
await asyncio.sleep(0.01)
return data
body = Pipeline.from_step(Step("sleep", SleepAgent()))
mapper = Step.map_over("mapper_par", body, iterable_input="nums")
runner = Flujo(mapper, context_model=Ctx)
result = runner.run(None, initial_context_data={"nums": [0, 1, 2, 3]})
print(result.step_history[-1].output) # [0, 1, 2, 3]
Step.parallel
Step.parallel
executes multiple branch pipelines concurrently and aggregates their outputs in a dictionary keyed by branch name.
from flujo.domain.models import PipelineContext
class Ctx(PipelineContext):
val: int = 0
class AddAgent:
def __init__(self, inc: int) -> None:
self.inc = inc
async def run(self, data: int, *, context: Ctx | None = None) -> int:
if context is not None:
context.val += self.inc
await asyncio.sleep(0)
return data + self.inc
branches = {
"a": Step("a", AddAgent(1)),
"b": Step("b", AddAgent(2)),
}
parallel = Step.parallel("par", branches)
runner = Flujo(parallel, context_model=Ctx)
result = runner.run(0)
print(result.step_history[-1].output) # {"a": 1, "b": 2}
print(result.final_pipeline_context.val) # 0
Step.parallel
also supports merging branch contexts and flexible failure
handling via the merge_strategy
and on_branch_failure
parameters. See
pipeline_dsl.md
for details.
See pipeline_dsl.md for general DSL usage.