API Reference
Welcome to the Flujo API reference. This documentation is automatically generated from the source code using mkdocstrings.
Core Modules
flujo.application
Application-level components for flujo.
Flujo
Bases: Generic[RunnerInT, RunnerOutT, ContextT]
Execute a pipeline sequentially.
Parameters
pipeline : Pipeline | Step | None, optional
Pipeline object to run directly. Deprecated when using registry
.
registry : PipelineRegistry, optional
Registry holding named pipelines.
pipeline_name : str, optional
Name of the pipeline registered in registry
.
pipeline_version : str, default "latest"
Version to load from the registry when the run starts.
state_backend : StateBackend, optional
Backend used to persist :class:WorkflowState
for durable execution.
delete_on_completion : bool, default False
If True
remove persisted state once the run finishes.
disable_tracing
disable_tracing() -> None
Disable tracing by removing the TraceManager hook.
run_async
async
run_async(
initial_input: RunnerInT,
*,
run_id: str | None = None,
initial_context_data: Optional[Dict[str, Any]] = None,
) -> AsyncIterator[PipelineResult[ContextT]]
Run the pipeline asynchronously.
Parameters
run_id: Optional identifier for this run. When provided the runner will load and persist state under this ID, enabling durable execution without embedding the ID in the context model.
This method should be used when an asyncio event loop is already running, such as within Jupyter notebooks or async web frameworks.
It yields any streaming output from the final step and then the final
PipelineResult
object.
run
run(
initial_input: RunnerInT,
*,
run_id: str | None = None,
initial_context_data: Optional[Dict[str, Any]] = None,
) -> PipelineResult[ContextT]
Run the pipeline synchronously.
This helper should only be called from code that is not already running
inside an asyncio event loop. If a running loop is detected a
TypeError
is raised instructing the user to use run_async
instead.
resume_async
async
resume_async(
paused_result: PipelineResult[ContextT],
human_input: Any,
) -> PipelineResult[ContextT]
Resume a paused pipeline with human input.
as_step
as_step(
name: str,
*,
inherit_context: bool = True,
**kwargs: Any,
) -> Step[RunnerInT, PipelineResult[ContextT]]
Return this Flujo
runner as a composable :class:Step
.
Parameters
name:
Name of the resulting step.
**kwargs:
Additional Step
configuration passed to :class:Step
.
Returns
Step Step that executes this runner when invoked inside another pipeline.
SelfImprovementAgent
Agent that analyzes failures and suggests improvements.
run_pipeline_async
async
run_pipeline_async(
inputs: Any, *, runner: Flujo[Any, Any, Any]
) -> PipelineResult[Any]
Adapter to run a :class:Flujo
engine as a pydantic-evals task.
evaluate_and_improve
async
evaluate_and_improve(
task_function: Callable[
[Any], Awaitable[PipelineResult[Any]]
],
dataset: Any,
improvement_agent: SelfImprovementAgent,
pipeline_definition: Optional[
Pipeline[Any, Any] | Step[Any, Any]
] = None,
) -> ImprovementReport
Run dataset evaluation and return improvement suggestions.
flujo.domain
Domain components for flujo.
Step
Bases: BaseModel
, Generic[StepInT, StepOutT]
Declarative node in a pipeline.
A Step
holds a reference to the agent that will execute, configuration
such as retries and timeout, and optional plugins. It does not execute
anything by itself. Steps are composed into :class:Pipeline
objects and
run by the :class:~flujo.application.runner.Flujo
engine.
Use :meth:arun
to invoke the underlying agent directly during unit tests.
__call__
__call__(*args: Any, **kwargs: Any) -> Any
Disallow direct invocation of a Step.
__getattr__
__getattr__(item: str) -> Any
Raise a helpful error when trying to access non-existent attributes.
arun
async
arun(data: StepInT, **kwargs: Any) -> StepOutT
Run this step's agent directly for testing purposes.
fallback
fallback(
fallback_step: "Step[Any, Any]",
) -> "Step[StepInT, StepOutT]"
Set a fallback step to execute if this step fails.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fallback_step
|
'Step[Any, Any]'
|
The step to execute if this step fails |
required |
Returns:
Type | Description |
---|---|
'Step[StepInT, StepOutT]'
|
self for method chaining |
add_plugin
add_plugin(
plugin: "ValidationPlugin",
) -> "Step[StepInT, StepOutT]"
Add a validation plugin to this step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
plugin
|
'ValidationPlugin'
|
The validation plugin to add |
required |
Returns:
Type | Description |
---|---|
'Step[StepInT, StepOutT]'
|
self for method chaining |
review
classmethod
review(
agent: AsyncAgentProtocol[Any, Any],
*,
plugins: Optional[
list[tuple[ValidationPlugin, int]]
] = None,
validators: Optional[List[Validator]] = None,
processors: Optional[AgentProcessors] = None,
persist_feedback_to_context: Optional[str] = None,
persist_validation_results_to: Optional[str] = None,
**config: Any,
) -> "Step[Any, Any]"
Construct a review step using the provided agent.
solution
classmethod
solution(
agent: AsyncAgentProtocol[Any, Any],
*,
plugins: Optional[
list[tuple[ValidationPlugin, int]]
] = None,
validators: Optional[List[Validator]] = None,
processors: Optional[AgentProcessors] = None,
persist_feedback_to_context: Optional[str] = None,
persist_validation_results_to: Optional[str] = None,
**config: Any,
) -> "Step[Any, Any]"
Construct a solution step using the provided agent.
validate_step
classmethod
validate_step(
agent: AsyncAgentProtocol[Any, Any],
*,
plugins: Optional[
list[tuple[ValidationPlugin, int]]
] = None,
validators: Optional[List[Validator]] = None,
processors: Optional[AgentProcessors] = None,
persist_feedback_to_context: Optional[str] = None,
persist_validation_results_to: Optional[str] = None,
strict: bool = True,
**config: Any,
) -> "Step[Any, Any]"
Construct a validation step using the provided agent.
from_callable
classmethod
from_callable(
callable_: Callable[
Concatenate[StepInT, P],
Coroutine[Any, Any, StepOutT],
],
name: str | None = None,
updates_context: bool = False,
validate_fields: bool = False,
processors: Optional[AgentProcessors] = None,
persist_feedback_to_context: Optional[str] = None,
persist_validation_results_to: Optional[str] = None,
is_adapter: bool = False,
**config: Any,
) -> "Step[StepInT, StepOutT]"
Create a Step from an async callable.
from_mapper
classmethod
from_mapper(
mapper: Callable[
Concatenate[StepInT, P],
Coroutine[Any, Any, StepOutT],
],
name: str | None = None,
updates_context: bool = False,
processors: Optional[AgentProcessors] = None,
persist_feedback_to_context: Optional[str] = None,
persist_validation_results_to: Optional[str] = None,
**config: Any,
) -> "Step[StepInT, StepOutT]"
Alias for :meth:from_callable
to improve readability.
human_in_the_loop
classmethod
human_in_the_loop(
name: str,
message_for_user: str | None = None,
input_schema: Type[BaseModel] | None = None,
) -> "HumanInTheLoopStep"
Construct a HumanInTheLoop step.
refine_until
classmethod
refine_until(
name: str,
generator_pipeline: "Pipeline[Any, Any]",
critic_pipeline: "Pipeline[Any, RefinementCheck]",
max_refinements: int = 5,
feedback_mapper: Optional[
Callable[[Any, RefinementCheck], Any]
] = None,
**config_kwargs: Any,
) -> "LoopStep[ContextModelT]"
Convenience for the generator -> critic refinement loop pattern.
use_input
use_input(key: str) -> 'Pipeline[Any, StepOutT]'
Create a small adapter pipeline that selects a key from a dict input.
This is a common pattern when working with :meth:parallel
branches
where each branch only needs a portion of the upstream output.
gather
classmethod
gather(
name: str, *, wait_for: List[str], **config_kwargs: Any
) -> "Step[Any, Dict[str, Any]]"
Collect outputs from multiple parallel branches.
The step expects a dictionary input (e.g. from :meth:parallel
) and
returns a dictionary containing only the specified keys.
StepConfig
Bases: BaseModel
Configuration options applied to every step.
Parameters
max_retries: How many times the step should be retried on failure. timeout_s: Optional timeout in seconds for the agent execution. temperature: Optional temperature setting for LLM based agents.
Task
Bases: BaseModel
Represents a task to be solved by the orchestrator.
Candidate
Bases: BaseModel
Represents a potential solution and its evaluation metadata.
Checklist
Bases: BaseModel
A checklist for evaluating a solution.
ChecklistItem
Bases: BaseModel
A single item in a checklist for evaluating a solution.
PipelineResult
Bases: BaseModel
, Generic[ContextT]
Aggregated result of running a pipeline.
StepResult
Bases: BaseModel
Result of executing a single pipeline step.
UsageLimits
Bases: BaseModel
Defines resource consumption limits for a pipeline run.
ExecutedCommandLog
Bases: BaseModel
Structured log entry for a command executed in the loop.
ExecutionBackend
Bases: Protocol
Protocol for executing pipeline steps.
execute_step
async
execute_step(request: StepExecutionRequest) -> StepResult
Execute a single step and return the result.
StepExecutionRequest
dataclass
Serializable request for executing a single step.
Contains the step to execute, input data, context object, resources, and execution configuration.
AgentProcessors
Bases: BaseModel
Collections of prompt and output processors.
PluginOutcome
Bases: BaseModel
Result returned by a validation plugin.
ValidationPlugin
Bases: Protocol
Protocol that all validation plugins must implement.
Validator
Bases: Protocol
A generic, stateful protocol for any component that can validate a step's output.
validate
async
validate(
output_to_check: Any,
*,
context: Optional[BaseModel] = None,
) -> ValidationResult
Validates the given output.
ValidationResult
Bases: BaseModel
The standard output from any validator, providing a clear pass/fail signal and feedback.
ValidationFinding
Bases: BaseModel
Represents a single validation finding.
ValidationReport
Bases: BaseModel
Aggregated validation results for a pipeline.
AppResources
Bases: BaseModel
Base class for user-defined resource containers.
adapter_step
adapter_step(
func: Callable[
Concatenate[StepInT, P],
Coroutine[Any, Any, StepOutT],
],
*,
name: str | None = None,
updates_context: bool = False,
processors: Optional[AgentProcessors] = None,
persist_feedback_to_context: Optional[str] = None,
persist_validation_results_to: Optional[str] = None,
**config_kwargs: Any,
) -> "Step[StepInT, StepOutT]"
adapter_step(
*,
name: str | None = None,
updates_context: bool = False,
processors: Optional[AgentProcessors] = None,
persist_feedback_to_context: Optional[str] = None,
persist_validation_results_to: Optional[str] = None,
**config_kwargs: Any,
) -> Callable[
[
Callable[
Concatenate[StepInT, P],
Coroutine[Any, Any, StepOutT],
]
],
"Step[StepInT, StepOutT]",
]
adapter_step(
func: Callable[
Concatenate[StepInT, P],
Coroutine[Any, Any, StepOutT],
]
| None = None,
**kwargs: Any,
) -> Any
Alias for :func:step
that marks the created step as an adapter.
flujo.exceptions
Custom exceptions for the orchestrator.
OrchestratorError
Bases: Exception
Base exception for the application.
SettingsError
Bases: OrchestratorError
Raised for configuration-related errors.
OrchestratorRetryError
Bases: OrchestratorError
Raised when an agent operation fails after all retries.
RewardModelUnavailable
Bases: OrchestratorError
Raised when the reward model is required but unavailable.
FeatureDisabled
Bases: OrchestratorError
Raised when a disabled feature is invoked.
ConfigurationError
Bases: SettingsError
Raised when a required configuration for a provider is missing.
PricingNotConfiguredError
Bases: ConfigurationError
Raised when strict pricing mode is enabled but pricing is not configured for a model.
InfiniteRedirectError
Bases: OrchestratorError
Raised when a redirect loop is detected in pipeline execution.
InfiniteFallbackError
Bases: OrchestratorError
Raised when a fallback loop is detected during execution.
PipelineContextInitializationError
Bases: OrchestratorError
Raised when a typed pipeline context fails to initialize.
ContextInheritanceError
UsageLimitExceededError
PipelineAbortSignal
Bases: Exception
Special exception hooks can raise to stop a pipeline gracefully.
PausedException
ImproperStepInvocationError
Bases: OrchestratorError
Raised when a Step
object is invoked directly.
MissingAgentError
Bases: ConfigurationError
Raised when a pipeline step is missing its agent.
TypeMismatchError
Bases: ConfigurationError
Raised when consecutive steps have incompatible types.
AgentIOValidationError
Bases: OrchestratorError
Raised when an agent's input or output validation fails.
FlujoFrameworkError
Bases: Exception
Base exception for Flujo framework with enhanced error messages.
ContextFieldError
StepInvocationError
Bases: FlujoFrameworkError
Unified exception for step invocation errors.
This exception replaces ImproperStepInvocationError for consistency and provides enhanced error messages for better debugging.
Note: ImproperStepInvocationError is deprecated and will be removed in a future version. Use StepInvocationError for new code.
ParallelStepError
Infrastructure
flujo.infra
Infrastructure components for flujo.
LocalBackend
init_telemetry
init_telemetry(
settings_obj: Optional[Settings] = None,
) -> None
Configure global logging and tracing for the process.
Call once at application startup. If settings_obj
is not provided the
default :class:~flujo.infra.settings.Settings
object is used. When telemetry
is enabled the real logfire
library is configured, otherwise a fallback
logger that proxies to logging
is provided.
make_review_agent
make_review_agent(
model: str | None = None,
) -> AsyncAgentWrapper[Any, Checklist]
Create a review agent with default settings.
make_solution_agent
make_solution_agent(
model: str | None = None,
) -> AsyncAgentWrapper[Any, str]
Create a solution agent with default settings.
make_validator_agent
make_validator_agent(
model: str | None = None,
) -> AsyncAgentWrapper[Any, Checklist]
Create a validator agent with default settings.
get_reflection_agent
get_reflection_agent(
model: str | None = None,
) -> AsyncAgentProtocol[Any, Any] | NoOpReflectionAgent
Returns a new instance of the reflection agent, or a no-op if disabled.
make_agent_async
make_agent_async(
model: str,
system_prompt: str,
output_type: Type[Any],
max_retries: int = 3,
timeout: int | None = None,
processors: Optional[AgentProcessors] = None,
auto_repair: bool = True,
**kwargs: Any,
) -> AsyncAgentWrapper[Any, Any]
Creates a pydantic_ai.Agent and returns an AsyncAgentWrapper exposing .run_async.
Parameters
model : str The model identifier (e.g., "openai:gpt-4o") system_prompt : str The system prompt for the agent output_type : Type[Any] The expected output type max_retries : int, optional Maximum number of retries for failed calls timeout : int, optional Timeout in seconds for agent calls processors : Optional[AgentProcessors], optional Custom processors for the agent auto_repair : bool, optional Whether to enable automatic repair of failed outputs **kwargs : Any Additional arguments to pass to the underlying pydantic_ai.Agent (e.g., temperature, model_settings, max_tokens, etc.)
load_settings
load_settings(force_reload: bool = False) -> Any
Load settings with configuration file overrides. If force_reload is True, reload config/settings.
get_cli_defaults
get_cli_defaults(
command: str, force_reload: bool = False
) -> Dict[str, Any]
Get CLI defaults for a specific command. If force_reload is True, reload config/settings.
get_state_uri
get_state_uri(force_reload: bool = False) -> Optional[str]
Get the state URI from configuration. If force_reload is True, reload config/settings.
flujo.state
WorkflowState
Bases: BaseModel
Serialized snapshot of a running workflow.
StateBackend
Bases: ABC
Abstract base class for state backends.
State backends are responsible for persisting and retrieving workflow state. They handle serialization of complex objects automatically using the enhanced serialization utilities.
save_state
abstractmethod
async
save_state(run_id: str, state: Dict[str, Any]) -> None
Save workflow state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id
|
str
|
Unique identifier for the workflow run |
required |
state
|
Dict[str, Any]
|
Dictionary containing workflow state data |
required |
load_state
abstractmethod
async
load_state(run_id: str) -> Optional[Dict[str, Any]]
Load workflow state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id
|
str
|
Unique identifier for the workflow run |
required |
Returns:
Type | Description |
---|---|
Optional[Dict[str, Any]]
|
Dictionary containing workflow state data, or None if not found |
delete_state
abstractmethod
async
delete_state(run_id: str) -> None
Delete workflow state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id
|
str
|
Unique identifier for the workflow run |
required |
list_workflows
async
list_workflows(
status: Optional[str] = None,
pipeline_id: Optional[str] = None,
limit: Optional[int] = None,
offset: int = 0,
) -> List[Dict[str, Any]]
List workflows with optional filtering and pagination.
get_workflow_stats
async
get_workflow_stats() -> Dict[str, Any]
Get statistics about stored workflows.
cleanup_old_workflows
async
cleanup_old_workflows(days_old: int = 30) -> int
Delete workflows older than specified days. Returns number of deleted workflows.
get_failed_workflows
async
get_failed_workflows(
hours_back: int = 24,
) -> List[Dict[str, Any]]
Get failed workflows from the last N hours with error details.
get_trace
abstractmethod
async
get_trace(run_id: str) -> Any
Retrieve and deserialize the trace tree for a given run_id.
save_trace
abstractmethod
async
save_trace(run_id: str, trace: Dict[str, Any]) -> None
Save trace data for a given run_id.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id
|
str
|
Unique identifier for the workflow run |
required |
trace
|
Dict[str, Any]
|
Dictionary containing trace tree data |
required |
get_spans
async
get_spans(
run_id: str,
status: Optional[str] = None,
name: Optional[str] = None,
) -> List[Dict[str, Any]]
Get individual spans with optional filtering.
get_span_statistics
async
get_span_statistics(
pipeline_name: Optional[str] = None,
time_range: Optional[Tuple[float, float]] = None,
) -> Dict[str, Any]
Get aggregated span statistics.
save_run_start
async
save_run_start(run_data: Dict[str, Any]) -> None
Persist initial run metadata.
save_step_result
async
save_step_result(step_data: Dict[str, Any]) -> None
Persist a single step execution record.
save_run_end
async
save_run_end(run_id: str, end_data: Dict[str, Any]) -> None
Update run metadata when execution finishes.
get_run_details
async
get_run_details(run_id: str) -> Optional[Dict[str, Any]]
Retrieve stored metadata for a run.
list_run_steps
async
list_run_steps(run_id: str) -> List[Dict[str, Any]]
Return all step records for a run ordered by step index.
InMemoryBackend
Bases: StateBackend
Simple in-memory backend for testing and defaults.
This backend mirrors the serialization logic of the persistent backends by
storing a serialized copy of the workflow state. Values are serialized with
safe_serialize
on save and reconstructed with safe_deserialize
when
loaded.
get_trace
async
get_trace(run_id: str) -> Any
Retrieve trace data for a given run_id.
save_trace
async
save_trace(run_id: str, trace: Dict[str, Any]) -> None
Save trace data for a given run_id.
get_spans
async
get_spans(
run_id: str,
status: Optional[str] = None,
name: Optional[str] = None,
) -> List[Dict[str, Any]]
Get individual spans with optional filtering.
get_span_statistics
async
get_span_statistics(
pipeline_name: Optional[str] = None,
time_range: Optional[Tuple[float, float]] = None,
) -> Dict[str, Any]
Get aggregated span statistics.
FileBackend
Bases: StateBackend
Persist workflow state to JSON files.
get_trace
async
get_trace(run_id: str) -> Optional[Dict[str, Any]]
Retrieve trace data for a given run_id.
save_trace
async
save_trace(run_id: str, trace: Dict[str, Any]) -> None
Save trace data for a given run_id.
get_spans
async
get_spans(
run_id: str,
status: Optional[str] = None,
name: Optional[str] = None,
) -> List[Dict[str, Any]]
Get individual spans with optional filtering.
get_span_statistics
async
get_span_statistics(
pipeline_name: Optional[str] = None,
time_range: Optional[Tuple[float, float]] = None,
) -> Dict[str, Any]
Get aggregated span statistics.
SQLiteBackend
Bases: StateBackend
SQLite-backed persistent storage for workflow state with optimized schema.
close
async
close() -> None
Close database connections and cleanup resources.
__aenter__
async
__aenter__() -> 'SQLiteBackend'
Async context manager entry.
__aexit__
async
__aexit__(exc_type: Any, exc_val: Any, exc_tb: Any) -> None
Async context manager exit with cleanup.
save_state
async
save_state(run_id: str, state: Dict[str, Any]) -> None
Save workflow state to the database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id
|
str
|
Unique identifier for the workflow run |
required |
state
|
Dict[str, Any]
|
Dictionary containing workflow state data |
required |
delete_state
async
delete_state(run_id: str) -> None
Delete workflow state.
list_states
async
list_states(
status: Optional[str] = None,
) -> List[Dict[str, Any]]
List workflow states with optional status filter.
list_workflows
async
list_workflows(
status: Optional[str] = None,
pipeline_id: Optional[str] = None,
limit: Optional[int] = None,
offset: int = 0,
) -> List[Dict[str, Any]]
Enhanced workflow listing with additional filters and metadata.
list_runs
async
list_runs(
status: Optional[str] = None,
pipeline_name: Optional[str] = None,
limit: Optional[int] = None,
offset: int = 0,
) -> List[Dict[str, Any]]
List runs from the new structured schema for lens CLI.
get_workflow_stats
async
get_workflow_stats() -> Dict[str, Any]
Get comprehensive workflow statistics.
get_failed_workflows
async
get_failed_workflows(
hours_back: int = 24,
) -> List[Dict[str, Any]]
Get failed workflows from the last N hours.
cleanup_old_workflows
async
cleanup_old_workflows(days_old: float = 30) -> int
Delete workflows older than specified days. Returns number of deleted workflows.
save_trace
async
save_trace(run_id: str, trace: Dict[str, Any]) -> None
Persist a trace tree as normalized spans for a given run_id.
get_trace
async
get_trace(run_id: str) -> Optional[Dict[str, Any]]
Retrieve and reconstruct the trace tree for a given run_id. Audit log access.
get_spans
async
get_spans(
run_id: str,
status: Optional[str] = None,
name: Optional[str] = None,
) -> List[Dict[str, Any]]
Get individual spans with optional filtering. Audit log export.
get_span_statistics
async
get_span_statistics(
pipeline_name: Optional[str] = None,
time_range: Optional[Tuple[float, float]] = None,
) -> Dict[str, Any]
Get aggregated span statistics.
delete_run
async
delete_run(run_id: str) -> None
Delete a run from the runs table (cascades to traces). Audit log deletion.
Agents and Processing
flujo.agents
Agent utilities including validation and monitoring decorators.
validated_agent
validated_agent(
input_model: Type[BaseModel],
output_model: Type[BaseModel],
) -> Callable[[Type[_Any]], Type[_Any]]
Class decorator that validates the run
method's input and output.
monitored_agent
monitored_agent(
agent_name: str,
) -> Callable[[Type[_Any]], Type[_Any]]
Class decorator that records telemetry for the agent's run
method.
flujo.processors
Processor
Bases: Protocol
Generic processor interface.
process
async
process(
data: Any, context: Optional[BaseModel] = None
) -> Any
Process data with optional pipeline context.
AddContextVariables
Prepend context variables to a prompt.
StripMarkdownFences
Extract content from a fenced code block.
EnforceJsonResponse
Ensure the output is valid JSON.
SerializePydantic
Serialize any object with a model_dump
method to a plain dict
.
DeterministicRepairProcessor
Tier-1 deterministic fixer for malformed JSON emitted by LLMs.
flujo.steps
Utilities and Tools
flujo.utils
Flujo utilities.
format_prompt
format_prompt(template: str, **kwargs: Any) -> str
Convenience wrapper around :class:AdvancedPromptFormatter
.
Parameters
template: Template string to render. **kwargs: Values referenced inside the template.
Returns
str The rendered template.
summarize_and_redact_prompt
summarize_and_redact_prompt(
prompt_text: str,
max_length: int = 200,
settings: Optional[Settings] = None,
) -> str
Return a truncated and redacted version of a prompt.
create_field_serializer
create_field_serializer(
field_name: str, serializer_func: Callable[[Any], Any]
) -> Callable[[Any], Any]
Create a field_serializer method for a specific field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
field_name
|
str
|
Name of the field to serialize |
required |
serializer_func
|
Callable[[Any], Any]
|
Function that serializes the field value |
required |
Returns:
Type | Description |
---|---|
Callable[[Any], Any]
|
A serializer function that can be used within field_serializer methods |
create_serializer_for_type
create_serializer_for_type(
obj_type: Type[Any],
serializer_func: Callable[[Any], Any],
) -> Callable[[Any], Any]
Create a serializer function that handles a specific type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj_type
|
Type[Any]
|
The type to create a serializer for |
required |
serializer_func
|
Callable[[Any], Any]
|
Function that serializes the type |
required |
Returns:
Type | Description |
---|---|
Callable[[Any], Any]
|
A serializer function that handles the specific type |
lookup_custom_serializer
lookup_custom_serializer(
value: Any,
) -> Optional[Callable[[Any], Any]]
Look up a registered serializer for a value's type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value
|
Any
|
The value to find a serializer for |
required |
Returns:
Type | Description |
---|---|
Optional[Callable[[Any], Any]]
|
The registered serializer function, or None if not found |
Example
serializer = lookup_custom_serializer(some_value) if serializer: ... result = serializer(some_value)
lookup_custom_deserializer
lookup_custom_deserializer(
obj_type: Type[Any],
) -> Optional[Callable[[Any], Any]]
Look up a registered deserializer for a type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj_type
|
Type[Any]
|
The type to find a deserializer for |
required |
Returns:
Type | Description |
---|---|
Optional[Callable[[Any], Any]]
|
The registered deserializer function, or None if not found |
Example
deserializer = lookup_custom_deserializer(MyCustomType) if deserializer: ... result = deserializer(serialized_data)
register_custom_serializer
register_custom_serializer(
obj_type: Type[Any],
serializer_func: Callable[[Any], Any],
) -> None
Register a custom serializer for a specific type globally.
This function registers a serializer that will be used by safe_serialize
and
other serialization functions when encountering objects of the specified type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj_type
|
Type[Any]
|
The type to register a serializer for |
required |
serializer_func
|
Callable[[Any], Any]
|
Function that converts the type to a serializable format |
required |
Example
from datetime import datetime def serialize_datetime(dt: datetime) -> str: ... return dt.strftime("%Y-%m-%d %H:%M:%S") register_custom_serializer(datetime, serialize_datetime)
register_custom_deserializer
register_custom_deserializer(
obj_type: Type[Any],
deserializer_func: Callable[[Any], Any],
) -> None
Register a custom deserializer for a specific type globally.
This function registers a deserializer that will be used by reconstruction functions when encountering serialized data that should be converted back to the original type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj_type
|
Type[Any]
|
The type to register a deserializer for |
required |
deserializer_func
|
Callable[[Any], Any]
|
Function that converts serialized data back to the original type |
required |
Example
from datetime import datetime def deserialize_datetime(data: str) -> datetime: ... return datetime.fromisoformat(data) register_custom_deserializer(datetime, deserialize_datetime)
safe_deserialize
safe_deserialize(
serialized_data: Any,
target_type: Optional[Type[Any]] = None,
default_deserializer: Optional[
Callable[[Any], Any]
] = None,
) -> Any
Safely deserialize an object with intelligent fallback handling.
This function provides robust deserialization for: - Pydantic models (v1 and v2) - Dataclasses - Lists, tuples, sets, frozensets, dicts - Enums - Special float values (inf, -inf, nan) - Primitives (str, int, bool, None) - Datetime objects (datetime, date, time) - Bytes and memoryview objects - Complex numbers - Custom types registered via register_custom_deserializer
Parameters:
Name | Type | Description | Default |
---|---|---|---|
serialized_data
|
Any
|
The serialized data to deserialize |
required |
target_type
|
Optional[Type[Any]]
|
Optional type hint for the expected result type |
None
|
default_deserializer
|
Optional[Callable[[Any], Any]]
|
Optional custom deserializer for unknown types |
None
|
Returns:
Type | Description |
---|---|
Any
|
The deserialized object |
Raises:
Type | Description |
---|---|
TypeError
|
If object cannot be deserialized and no default_deserializer is provided |
Note
- Special float values (inf, -inf, nan) are converted from strings
- Datetime objects are converted from ISO format strings
- Bytes are converted from base64 strings
- Complex numbers are converted from dict with 'real' and 'imag' keys
- Custom types registered via register_custom_deserializer are automatically handled
safe_serialize
safe_serialize(
obj: Any,
default_serializer: Optional[
Callable[[Any], Any]
] = None,
_seen: Optional[Set[int]] = None,
_recursion_depth: int = 0,
circular_ref_placeholder: Any = "<circular-ref>",
) -> Any
Safely serialize an object with intelligent fallback handling.
Handles circular references robustly by only clearing the _seen set at the top-level call.
The circular_ref_placeholder controls what is returned for circular references (default '
serialize_to_json
serialize_to_json(obj: Any, **kwargs: Any) -> str
Serialize an object to a JSON string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj
|
Any
|
The object to serialize |
required |
**kwargs
|
Any
|
Additional arguments to pass to json.dumps |
{}
|
Returns:
Type | Description |
---|---|
str
|
JSON string representation of the object |
Raises:
Type | Description |
---|---|
TypeError
|
If the object cannot be serialized to JSON |
serialize_to_json_robust
serialize_to_json_robust(obj: Any, **kwargs: Any) -> str
Serialize an object to a JSON string using robust_serialize. Ensures the output is always valid JSON for roundtrip.
flujo.tracing
flujo.telemetry
Telemetry integrations for production monitoring.
OpenTelemetryHook
Fallback when OpenTelemetry is not available.
CLI and Testing
flujo.cli
CLI package.
flujo.testing
Testing utilities for flujo.
StubAgent
Simple agent for testing that returns preset outputs.
DummyPlugin
A validation plugin used for testing.
override_agent
override_agent(step: Any, new_agent: Any) -> Iterator[None]
Temporarily override the agent of a Step within a context.
assert_validator_failed
assert_validator_failed(
result: PipelineResult[Any],
validator_name: str,
expected_feedback_part: Optional[str] = None,
) -> None
Assert that a specific validator failed during the run.
assert_context_updated
assert_context_updated(
result: PipelineResult[Any], **expected_updates: Any
) -> None
Assert that the final context contains the expected updates.
Recipes and Plugins
flujo.recipes
Recipe modules for common workflow patterns.
This module provides factory functions for common workflow patterns.
RECOMMENDED: Use the factory functions for better transparency, composability, and future YAML/AI support: - make_default_pipeline() - Creates a Review → Solution → Validate pipeline - make_agentic_loop_pipeline() - Creates an explorative agent workflow - run_default_pipeline() - Executes a default pipeline - run_agentic_loop_pipeline() - Executes an agentic loop pipeline
Default
Default recipe for Review → Solution → Validate workflows.
DEPRECATED: This class-based approach is deprecated. Use the new factory functions for better transparency, composability, and future YAML/AI support:
- Use
make_default_pipeline()
to create a Pipeline object - Use
run_default_pipeline()
to execute the pipeline
See flujo.recipes.factories
for the new approach.
make_default_pipeline
make_default_pipeline(
review_agent: "AsyncAgentProtocol[Any, Any]",
solution_agent: "AsyncAgentProtocol[Any, Any]",
validator_agent: "AsyncAgentProtocol[Any, Any]",
reflection_agent: Optional[
"AsyncAgentProtocol[Any, Any]"
] = None,
max_retries: int = 3,
k_variants: int = 1,
max_iters: int = 3,
reflection_limit: Optional[int] = None,
) -> Pipeline[str, Checklist]
Create a default Review → Solution → Validate pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
review_agent
|
'AsyncAgentProtocol[Any, Any]'
|
Agent that creates a checklist of requirements |
required |
solution_agent
|
'AsyncAgentProtocol[Any, Any]'
|
Agent that generates a solution |
required |
validator_agent
|
'AsyncAgentProtocol[Any, Any]'
|
Agent that validates the solution against requirements |
required |
reflection_agent
|
Optional['AsyncAgentProtocol[Any, Any]']
|
Optional agent for reflection/improvement |
None
|
max_retries
|
int
|
Maximum retries for each step |
3
|
k_variants
|
int
|
Number of solution variants to generate per iteration |
1
|
max_iters
|
int
|
Maximum number of iterations for improvement |
3
|
reflection_limit
|
Optional[int]
|
Optional limit on reflection iterations |
None
|
Returns:
Type | Description |
---|---|
Pipeline[str, Checklist]
|
A Pipeline object that can be inspected, composed, and executed |
make_state_machine_pipeline
make_state_machine_pipeline(
*,
nodes: Dict[str, Step[Any, Any] | Pipeline[Any, Any]],
context_model: type[PipelineContext],
router_field: str = "next_state",
end_state_field: str = "is_complete",
max_loops: int = 50,
) -> Pipeline[Any, Any]
Create a simple state machine pipeline.
Each iteration runs the state pipeline specified in context.<router_field>
.
The loop exits when context.<end_state_field>
evaluates to True
.
make_agentic_loop_pipeline
make_agentic_loop_pipeline(
planner_agent: "AsyncAgentProtocol[Any, Any]",
agent_registry: Dict[
str, "AsyncAgentProtocol[Any, Any]"
],
max_loops: int = 10,
max_retries: int = 3,
) -> Pipeline[str, Any]
Create an agentic loop pipeline for explorative workflows.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
planner_agent
|
'AsyncAgentProtocol[Any, Any]'
|
Agent that decides what command to run next |
required |
agent_registry
|
Dict[str, 'AsyncAgentProtocol[Any, Any]']
|
Dictionary of available agents to execute |
required |
max_loops
|
int
|
Maximum number of loop iterations |
10
|
max_retries
|
int
|
Maximum retries for each step |
3
|
Returns:
Type | Description |
---|---|
Pipeline[str, Any]
|
A Pipeline object with a LoopStep containing the agentic logic |
run_default_pipeline
async
run_default_pipeline(
pipeline: Pipeline[str, Checklist], task: Task
) -> Optional[Candidate]
Run a default pipeline and return the result.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline
|
Pipeline[str, Checklist]
|
Pipeline created by make_default_pipeline |
required |
task
|
Task
|
Task to execute |
required |
Returns:
Type | Description |
---|---|
Optional[Candidate]
|
Candidate with solution and checklist, or None if failed |
run_agentic_loop_pipeline
async
run_agentic_loop_pipeline(
pipeline: Pipeline[str, Any],
initial_goal: str,
resume_from: Optional[
PipelineResult[PipelineContext]
] = None,
human_input: Optional[str] = "human",
) -> PipelineResult[PipelineContext]
Run an agentic loop pipeline and return the result.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline
|
Pipeline[str, Any]
|
Pipeline created by make_agentic_loop_pipeline |
required |
initial_goal
|
str
|
Initial goal for the agentic loop |
required |
resume_from
|
Optional[PipelineResult[PipelineContext]]
|
Optional paused result to resume from |
None
|
human_input
|
Optional[str]
|
Human input to provide when resuming (default: "human") |
'human'
|
Returns:
Type | Description |
---|---|
PipelineResult[PipelineContext]
|
PipelineResult with the execution context and results |
flujo.plugins
Built-in plugins for flujo.
SQLSyntaxValidator
Validation plugin that checks SQL syntax.
Models and Prompts
flujo.models
Domain models for flujo.
Task
Bases: BaseModel
Represents a task to be solved by the orchestrator.
Candidate
Bases: BaseModel
Represents a potential solution and its evaluation metadata.
Checklist
Bases: BaseModel
A checklist for evaluating a solution.
ChecklistItem
Bases: BaseModel
A single item in a checklist for evaluating a solution.
PipelineResult
Bases: BaseModel
, Generic[ContextT]
Aggregated result of running a pipeline.
StepResult
Bases: BaseModel
Result of executing a single pipeline step.
UsageLimits
Bases: BaseModel
Defines resource consumption limits for a pipeline run.
RefinementCheck
Bases: BaseModel
Standardized output from a critic pipeline in a refinement loop.
ImprovementSuggestion
Bases: BaseModel
A single suggestion from the SelfImprovementAgent.
ImprovementReport
Bases: BaseModel
Aggregated improvement suggestions returned by the agent.
HumanInteraction
Bases: BaseModel
Records a single human interaction in a HITL conversation.
PipelineContext
Bases: BaseModel
Runtime context shared by all steps in a pipeline run.
The base PipelineContext
tracks essential execution metadata and is
automatically created for every call to :meth:Flujo.run
. Custom context
models should inherit from this class to add application specific fields
while retaining the built in ones.
Attributes
run_id:
Unique identifier for the pipeline run.
initial_prompt:
First input provided to the run. Useful for logging and telemetry.
scratchpad:
Free form dictionary for transient state between steps.
hitl_history:
Records each human interaction when using HITL steps.
command_log:
Stores commands executed by an :class:~flujo.recipes.AgenticLoop
.
flujo.prompts
System prompts for Flujo agents.
This module contains all the default system prompts used by Flujo agents. Users can import and override these prompts for their own use cases.
Core Utilities
flujo.caching
InMemoryCache
Bases: CacheBackend
Simple in-memory cache for step results.
flujo.console_tracer
ConsoleTracer
Configurable tracer that prints rich output to the console.
__init__
__init__(
*,
level: Literal["info", "debug"] = "debug",
log_inputs: bool = True,
log_outputs: bool = True,
colorized: bool = True,
) -> None
Create the tracer.
Parameters
level:
Output verbosity; either "info"
or "debug"
.
log_inputs:
Whether to print step inputs when level
is "debug"
.
log_outputs:
Whether to print step outputs when level
is "debug"
.
colorized:
If True
use colored output via Rich.
hook
async
hook(payload: HookPayload) -> None
Dispatch hook payloads to the appropriate handler.
flujo.monitor
FlujoMonitor
Simple in-memory monitor for agent calls.
flujo.registry
PipelineRegistry
Simple in-memory registry for pipeline objects.
register
register(
pipeline: Pipeline[Any, Any], name: str, version: str
) -> None
Register pipeline
under name
and version
.
get
get(
name: str, version: str
) -> Optional[Pipeline[Any, Any]]
Return the pipeline registered for name
and version
if present.
get_latest_version
get_latest_version(name: str) -> Optional[str]
Return the latest registered version for name
.
get_latest
get_latest(name: str) -> Optional[Pipeline[Any, Any]]
Return the latest registered pipeline for name
if any.
flujo.signature_tools
analyze_signature
analyze_signature(
func: Callable[..., Any],
) -> SignatureAnalysis
Inspect func
and determine its pipeline injection requirements.
Parameters
func: Callable to inspect. It may be a standard function or a callable object.
Returns
SignatureAnalysis
Named tuple describing whether context
or resources
keyword
parameters are required and the inferred input/output types.
Raises
ConfigurationError
If context
or resources
parameters are annotated with invalid
types.
flujo.validation
BaseValidator
validator
validator(
func: Callable[[Any], Tuple[bool, Optional[str]]],
) -> Validator
Decorator to create a stateless Validator from a function.