First‑Class Extensible Step Primitives
This document explains how Flujo supports first‑class, extensible Step primitives via a framework registry and policy‑driven execution. It also shows how to add your own high‑level orchestration steps and make them available in the Python DSL, YAML loader, and the AI Architect.
Overview
Flujo’s executor is policy‑driven: concrete step types resolve to execution policies that implement their semantics. FSD‑025 introduces a public registry for adding new Step
subclasses as first‑class primitives, with full YAML and Architect integration.
Key components:
flujo.framework.registry
register_step_type(step_class)
: Map akind
string →Step
subclassregister_policy(step_class, policy_instance)
: MapStep
subclass → policy instanceget_step_class(kind)
: Resolve a customkind
during YAML load- YAML loader: delegates custom
kind
to the registry; instantiates the registered model viamodel_validate()
- ExecutorCore: initializes its
PolicyRegistry
with the framework policy registry and routes execution accordingly - Architect skill:
flujo.builtins.get_framework_schema
exposes JSON Schemas for all registered step primitives
Registry API
from typing import Any
from flujo.framework import registry
from flujo.domain.dsl.step import Step
class MyStep(Step[Any, Any]):
kind = "MyStep"
class MyPolicy:
async def execute(self, core, frame):
# Return a StepOutcome (Success/Failure/Paused)
from flujo.domain.models import StepResult, Success
return Success(step_result=StepResult(name="MyStep", output=None, success=True))
# Register at import time (e.g., in your package __init__.py)
registry.register_step_type(MyStep)
registry.register_policy(MyStep, MyPolicy())
Rules:
- step_class
must be a subclass of Step
and declare a class attr kind: str
- Duplicate kind
registrations raise ConfigurationError
- Policies can be callables that accept an ExecutionFrame
or objects exposing execute(core, frame)
YAML Loader Integration
Custom steps become first‑class YAML primitives:
version: "0.1"
steps:
- kind: MyStep
name: X
# ... custom fields supported by your Pydantic model
During load, the loader checks built‑in kinds; otherwise it calls registry.get_step_class(kind)
and instantiates the Pydantic model via model_validate(your_yaml_dict)
.
ExecutorCore Dispatch
ExecutorCore
bootstraps its PolicyRegistry
with the global policy registry from flujo.framework.registry
. If a policy object exposes execute(core, frame)
, the core binds it into a callable so the dispatch signature remains uniform.
AI Architect Integration
Flujo’s Architect learns about registered primitives via the builtin skill:
flujo.builtins.get_framework_schema
returns{"steps": {kind: json_schema}}
wherejson_schema
is produced by your step model’smodel_json_schema()
.
You can import flujo.builtins
(or run the CLI) to ensure skills are registered, then query the skill registry to fetch the schema at runtime.
Best Practices
- Keep complex orchestration logic in policies; keep
Step
models declarative and strongly typed - Use Pydantic validators to coerce nested YAML (e.g., converting lists of steps into Pipelines)
- Maintain context isolation and merging via
ContextManager
helpers from the core (see Team Guide) - Add tests: registry behavior, YAML parsing for your
kind
, and execution (policy path)