API Reference
Welcome to the Flujo API reference. This documentation is automatically generated from the source code using mkdocstrings.
Core Modules
flujo.application
Application-level components for flujo.
Flujo
Bases: Generic[RunnerInT, RunnerOutT, ContextT]
Execute a pipeline sequentially.
Parameters
pipeline : Pipeline | Step | None, optional
Pipeline object to run directly. Deprecated when using registry.
registry : PipelineRegistry, optional
Registry holding named pipelines.
pipeline_name : str, optional
Name of the pipeline registered in registry.
pipeline_version : str, default "latest"
Version to load from the registry when the run starts.
state_backend : StateBackend, optional
Backend used to persist WorkflowState for durable execution.
delete_on_completion : bool, default False
If True, remove persisted state once the run finishes.
persist_state : bool, default True
When False, disable state persistence entirely (useful for ephemeral runs).
state_providers : Dict[str, StateProvider], optional
External state providers for ContextReference hydration. Ignored when a
custom executor_factory is supplied; pass providers directly to the factory
instead.
disable_tracing
disable_tracing() -> None
Disable tracing by removing trace hooks and clearing active manager.
aclose
async
aclose() -> None
Asynchronously release runner-owned resources.
close
close() -> None
Synchronously release runner-owned resources (best-effort in async contexts).
run_async
run_async(initial_input: RunnerInT, *, run_id: str | None = None, initial_context_data: Optional[JSONObject] = None) -> _RunAsyncHandle[ContextT]
Run the pipeline asynchronously.
Returns an object that is both async-iterable (streaming) and awaitable (returns the final PipelineResult), preserving legacy convenience.
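The dual behaviour described above (a handle you can either `await` for the final result or iterate with `async for` to stream events) can be sketched in plain asyncio. This is a minimal illustration under assumptions, not Flujo's `_RunAsyncHandle`: the class name `RunHandle` and the string "events" and "final result" are hypothetical stand-ins for StepOutcome events and the PipelineResult.

```python
import asyncio
from typing import Any, AsyncIterator, Generator, List


class RunHandle:
    """Hypothetical handle that is both awaitable and async-iterable."""

    def __init__(self, events: List[str], final: str) -> None:
        self._events = events  # stand-ins for streamed StepOutcome events
        self._final = final    # stand-in for the final PipelineResult

    async def _stream(self) -> AsyncIterator[str]:
        for event in self._events:
            await asyncio.sleep(0)  # yield control, as a real event stream would
            yield event

    def __aiter__(self) -> AsyncIterator[str]:
        # `async for ev in handle` streams the intermediate events.
        return self._stream()

    async def _result(self) -> str:
        # Awaiting drains the stream and returns only the final value.
        async for _ in self._stream():
            pass
        return self._final

    def __await__(self) -> Generator[Any, None, str]:
        # `await handle` delegates to a coroutine that returns the final value.
        return self._result().__await__()


async def main() -> None:
    streamed = [ev async for ev in RunHandle(["step1", "step2"], "done")]
    final = await RunHandle(["step1", "step2"], "done")
    print(streamed, final)  # ['step1', 'step2'] done


asyncio.run(main())
```

Keeping the stream as the single source of truth means the awaitable form cannot disagree with what a streaming consumer would have seen.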
run_result_async
async
run_result_async(initial_input: RunnerInT, *, run_id: str | None = None, initial_context_data: Optional[JSONObject] = None) -> PipelineResult[ContextT]
Run the pipeline asynchronously and return the final PipelineResult.
run_stream
async
run_stream(initial_input: RunnerInT, *, run_id: str | None = None, initial_context_data: Optional[JSONObject] = None) -> AsyncIterator[StepOutcome[StepResult]]
Run the pipeline and stream StepOutcome events as they complete.
run_outcomes
async
run_outcomes(initial_input: RunnerInT, *, run_id: str | None = None, initial_context_data: Optional[JSONObject] = None) -> AsyncIterator[StepOutcome[StepResult]]
Alias for run_stream() with clearer naming.
replay_from_trace
async
replay_from_trace(run_id: str) -> PipelineResult[ContextT]
Replay a prior run deterministically using recorded trace and responses (FSD-013).
get_failed_background_tasks
async
get_failed_background_tasks(parent_run_id: Optional[str] = None, hours_back: int = 24) -> list[BackgroundTaskInfo]
List failed background tasks from the last hours_back hours, optionally filtered by parent_run_id.
resume_background_task
async
resume_background_task(task_id: str, new_data: object | None = None) -> PipelineResult[ContextT]
Resume a failed background task synchronously.
retry_failed_background_tasks
async
retry_failed_background_tasks(parent_run_id: str, max_retries: int = 3) -> list[PipelineResult[ContextT]]
Retry failed background tasks for a given parent run.
cleanup_stale_background_tasks
async
cleanup_stale_background_tasks(stale_hours: int = 24) -> int
Mark stale running background tasks as failed.
run_with_events
async
run_with_events(initial_input: RunnerInT, *, run_id: str | None = None, initial_context_data: Optional[JSONObject] = None) -> AsyncIterator[object]
Run pipeline yielding lifecycle events (StepOutcome/Chunk) and final PipelineResult.
SelfImprovementAgent
Agent that analyzes failures and suggests improvements.
run_pipeline_async
async
run_pipeline_async(inputs: Any, *, runner: Flujo[Any, Any, Any]) -> PipelineResult[Any]
Adapter to run a Flujo engine as a pydantic-evals task.
evaluate_and_improve
async
evaluate_and_improve(task_function: Callable[[Any], Awaitable[PipelineResult[Any]]], dataset: Any, improvement_agent: SelfImprovementAgent, pipeline_definition: Optional[Pipeline[Any, Any] | Step[Any, Any]] = None) -> ImprovementReport
Run dataset evaluation and return improvement suggestions.
flujo.domain
Domain components for flujo.
Step
Bases: BaseModel, Generic[StepInT, StepOutT]
Declarative node in a pipeline.
A Step holds a reference to the agent that will execute, configuration
such as retries and timeout, and optional plugins. It does not execute
anything by itself. Steps are composed into Pipeline objects and
run by the Flujo engine (flujo.application.runner.Flujo).
Use arun to invoke the underlying agent directly during unit tests.
input
property
input: object | None
Return the declarative input value (legacy alias).
__call__
__call__(*args: Any, **kwargs: Any) -> NoReturn
Disallow direct invocation of a Step.
__getattr__
__getattr__(item: str) -> Any
Raise a helpful error when trying to access non-existent attributes.
arun
arun(data: StepInT, **kwargs: Any) -> Coroutine[object, object, StepOutT]
Return the agent coroutine to run this step directly in tests.
fallback
fallback(fallback_step: 'Step[Any, Any]') -> 'Step[StepInT, StepOutT]'
Set a fallback step to execute if this step fails.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| fallback_step | 'Step[Any, Any]' | The step to execute if this step fails | required |
Returns:
| Type | Description |
|---|---|
| 'Step[StepInT, StepOutT]' | self for method chaining |
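The fallback contract (run the primary step; if it fails, run fallback_step on the same input instead) can be modelled with plain callables. This is a simplified sketch under assumptions: `run_with_fallback`, the `Outcome` record and the feedback wording are hypothetical, and the real engine additionally applies retries, plugins and fallback-loop detection.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Outcome:
    """Hypothetical, simplified step result."""
    output: Optional[str]
    success: bool
    feedback: Optional[str] = None


def run_with_fallback(
    primary: Callable[[str], str],
    fallback: Optional[Callable[[str], str]],
    data: str,
) -> Outcome:
    """Hypothetical model: try the primary step, then the fallback on failure."""
    try:
        return Outcome(output=primary(data), success=True)
    except Exception as exc:  # the primary step failed
        if fallback is None:
            return Outcome(output=None, success=False, feedback=str(exc))
        # Keep the primary's error as diagnostic feedback on the fallback result.
        return Outcome(output=fallback(data), success=True, feedback=f"primary failed: {exc}")


def flaky(text: str) -> str:
    raise RuntimeError("model unavailable")


print(run_with_fallback(flaky, str.upper, "hello"))
# Outcome(output='HELLO', success=True, feedback='primary failed: model unavailable')
```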
add_plugin
add_plugin(plugin: 'ValidationPlugin') -> 'Step[StepInT, StepOutT]'
Add a validation plugin to this step.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| plugin | 'ValidationPlugin' | The validation plugin to add | required |
Returns:
| Type | Description |
|---|---|
| 'Step[StepInT, StepOutT]' | self for method chaining |
review
classmethod
review(agent: AsyncAgentProtocol[Any, Any], *, plugins: Optional[list[tuple[ValidationPlugin, int]]] = None, validators: Optional[List[Validator]] = None, processors: Optional[AgentProcessors] = None, persist_feedback_to_context: Optional[str] = None, persist_validation_results_to: Optional[str] = None, **config: Any) -> 'Step[Any, Any]'
Construct a review step using the provided agent.
Also infers input/output types from the agent's signature for pipeline validation.
solution
classmethod
solution(agent: AsyncAgentProtocol[Any, Any], *, plugins: Optional[list[tuple[ValidationPlugin, int]]] = None, validators: Optional[List[Validator]] = None, processors: Optional[AgentProcessors] = None, persist_feedback_to_context: Optional[str] = None, persist_validation_results_to: Optional[str] = None, **config: Any) -> 'Step[Any, Any]'
Construct a solution step using the provided agent.
Also infers input/output types from the agent's signature for pipeline validation.
validate_step
classmethod
validate_step(agent: AsyncAgentProtocol[Any, Any], *, plugins: Optional[list[tuple[ValidationPlugin, int]]] = None, validators: Optional[List[Validator]] = None, processors: Optional[AgentProcessors] = None, persist_feedback_to_context: Optional[str] = None, persist_validation_results_to: Optional[str] = None, strict: bool = True, **config: Any) -> 'Step[Any, Any]'
Construct a validation step using the provided agent.
Also infers input/output types from the agent's signature for pipeline validation.
from_callable
classmethod
from_callable(callable_: Callable[Concatenate[StepInT, P], Coroutine[object, object, StepOutT]], name: str | None = None, updates_context: bool = False, validate_fields: bool = False, sink_to: str | None = None, processors: Optional[AgentProcessors] = None, persist_feedback_to_context: Optional[str] = None, persist_validation_results_to: Optional[str] = None, is_adapter: bool = False, adapter_id: str | None = None, adapter_allow: str | None = None, config: StepConfig | None = None, **config_kwargs: Any) -> 'Step[StepInT, StepOutT]'
Create a Step from an async callable.
from_mapper
classmethod
from_mapper(mapper: Callable[Concatenate[StepInT, P], Coroutine[object, object, StepOutT]], name: str | None = None, updates_context: bool = False, sink_to: str | None = None, processors: Optional[AgentProcessors] = None, persist_feedback_to_context: Optional[str] = None, persist_validation_results_to: Optional[str] = None, **config: Any) -> 'Step[StepInT, StepOutT]'
Alias for from_callable to improve readability.
human_in_the_loop
classmethod
human_in_the_loop(name: str, message_for_user: str | None = None, input_schema: Type[BaseModel] | None = None) -> 'HumanInTheLoopStep'
Construct a HumanInTheLoop step.
refine_until
classmethod
refine_until(name: str, generator_pipeline: 'Pipeline[Any, Any]', critic_pipeline: 'Pipeline[Any, RefinementCheck]', max_refinements: int = 5, feedback_mapper: Optional[Callable[[Any, RefinementCheck], Any]] = None, **config_kwargs: Any) -> 'Pipeline[Any, Any]'
Build a refinement loop pipeline: generator >> capture >> critic >> post-mapper.
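The generator, critic and post-mapper loop described above can be sketched with plain functions. This is an illustration under assumptions, not the pipeline refine_until builds: the `Check` class stands in for RefinementCheck, and its `is_complete`/`feedback` field names are hypothetical, as is the default way feedback is folded into the next input.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Check:
    """Hypothetical stand-in for RefinementCheck (field names assumed)."""
    is_complete: bool
    feedback: Optional[str] = None


def refine_until(
    generate: Callable[[str], str],
    critic: Callable[[str], Check],
    initial: str,
    max_refinements: int = 5,
    feedback_mapper: Optional[Callable[[str, Check], str]] = None,
) -> str:
    """Generate, critique, and feed the critique back until done or out of budget."""
    next_input = initial
    candidate = initial
    for _ in range(max_refinements):
        candidate = generate(next_input)  # generator step
        check = critic(candidate)         # critic step on the captured output
        if check.is_complete:
            break
        # Post-mapper: turn the critique into the next generator input.
        next_input = (
            feedback_mapper(candidate, check) if feedback_mapper
            else f"{candidate}\n# feedback: {check.feedback}"
        )
    return candidate


# Toy example: keep appending "!" until the text contains three of them.
result = refine_until(
    generate=lambda s: s + "!",
    critic=lambda s: Check(is_complete=s.count("!") >= 3, feedback="more emphasis"),
    initial="hi",
    feedback_mapper=lambda cand, chk: cand,
)
print(result)  # hi!!!
```

Note that max_refinements caps the loop even if the critic never approves, in which case the last candidate is returned.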
granular
classmethod
granular(name: str, agent: Any, input_: Any = None, *, max_turns: int = 20, history_max_tokens: int = 128000, blob_threshold_bytes: int = 20000, enforce_idempotency: bool = False, resume_fingerprint_mode: Literal['strict', 'compat'] | None = None, **config_kwargs: Any) -> 'Pipeline[Any, Any]'
Build a granular execution pipeline for crash-safe, resumable agent runs.
Wraps a GranularStep in a LoopStep with abort-on-failure semantics. Each turn is persisted atomically with CAS guards to prevent double-execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Step name for identification | required |
| agent | Any | The agent to execute | required |
| input_ | Any | Optional input data or template | None |
| max_turns | int | Maximum number of turns before forced completion (default 20) | 20 |
| history_max_tokens | int | Token budget for message history (default 128K) | 128000 |
| blob_threshold_bytes | int | Payload size triggering blob offload (default 20KB) | 20000 |
| enforce_idempotency | bool | Require idempotency keys on tool calls (default False) | False |
| resume_fingerprint_mode | Literal['strict', 'compat'] \| None | Optional per-step fingerprint mode override | None |
| **config_kwargs | Any | Additional step configuration | {} |
Returns:
| Type | Description |
|---|---|
| 'Pipeline[Any, Any]' | Pipeline containing LoopStep(GranularStep) with on_failure="abort" |
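The "persisted atomically with CAS guards" idea can be illustrated with a compare-and-swap on a per-run version counter: a turn is committed only if the stored version still matches the version the writer read, so a replayed or duplicated turn is rejected. This is a conceptual sketch, not Flujo's persistence code; `TurnStore`, `commit_turn` and `ConflictError` are hypothetical, and a real backend must perform the check and the write as one atomic operation (for example a conditional database update), which this in-process dict does not guarantee under concurrency.

```python
from dataclasses import dataclass, field
from typing import Dict, List


class ConflictError(Exception):
    """Raised when another writer already committed this turn."""


@dataclass
class TurnStore:
    """Hypothetical store: one version counter per run, bumped on every commit."""
    versions: Dict[str, int] = field(default_factory=dict)
    turns: Dict[str, List[str]] = field(default_factory=dict)

    def commit_turn(self, run_id: str, expected_version: int, turn: str) -> int:
        current = self.versions.get(run_id, 0)
        # Compare-and-swap: only commit if nobody advanced the run since we read it.
        if current != expected_version:
            raise ConflictError(f"expected v{expected_version}, found v{current}")
        self.turns.setdefault(run_id, []).append(turn)
        self.versions[run_id] = current + 1
        return current + 1


store = TurnStore()
v = store.commit_turn("run-1", 0, "turn 1")  # succeeds, version is now 1
try:
    store.commit_turn("run-1", 0, "turn 1")  # stale replay of the same turn
except ConflictError as err:
    print("skipped duplicate:", err)  # skipped duplicate: expected v0, found v1
```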
use_input
use_input(key: str) -> 'Pipeline[dict[str, Any], StepOutT]'
Create a small adapter pipeline that selects a key from a dict input.
This is a common pattern when working with parallel branches
where each branch only needs a portion of the upstream output.
gather
classmethod
gather(name: str, *, wait_for: List[str], **config_kwargs: Any) -> 'Step[Any, JSONObject]'
Collect outputs from multiple parallel branches.
The step expects a dictionary input (e.g. from parallel) and
returns a dictionary containing only the specified keys.
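The data shapes behind use_input and gather can be shown with plain dictionaries: a parallel stage produces a dict keyed by branch name, use_input picks out one entry, and gather keeps only the keys listed in wait_for. Both functions below are hypothetical sketches of that selection logic, not Flujo's implementations; in particular, silently skipping absent keys in `gather` is an assumption.

```python
from typing import Any, Callable, Dict, List


def use_input(key: str) -> Callable[[Dict[str, Any]], Any]:
    """Hypothetical adapter: select one key from a dict-shaped input."""
    def select(data: Dict[str, Any]) -> Any:
        return data[key]
    return select


def gather(data: Dict[str, Any], wait_for: List[str]) -> Dict[str, Any]:
    """Hypothetical gather: keep only the listed branch outputs."""
    # Assumption: keys that are absent are skipped; the real step may differ.
    return {k: data[k] for k in wait_for if k in data}


parallel_output = {"summary": "short text", "keywords": ["a", "b"], "debug": "trace-123"}
print(use_input("keywords")(parallel_output))                     # ['a', 'b']
print(gather(parallel_output, wait_for=["summary", "keywords"]))  # {'summary': 'short text', 'keywords': ['a', 'b']}
```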
Pipeline
Bases: BaseModel, Generic[PipeInT, PipeOutT]
Ordered collection of Step objects.
Pipeline instances are immutable containers that define the execution
graph. They can be composed with the >> operator and validated before
running. Execution is handled by the Flujo class
(flujo.application.runner.Flujo).
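How an immutable pipeline can support >> composition is sketched below with two hypothetical classes, `MiniStep` and `MiniPipeline`; they are not Flujo's Step and Pipeline, and the `run` helper is only a stand-in for the runner. The key point is that >> always returns a new pipeline instead of mutating an existing one, which matches the immutability described above.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable, Tuple, Union


@dataclass(frozen=True)
class MiniStep:
    """Hypothetical declarative step: a name plus the function it wraps."""
    name: str
    fn: Callable[[Any], Any]

    def __rshift__(self, other: Union[MiniStep, MiniPipeline]) -> MiniPipeline:
        return MiniPipeline((self,)) >> other


@dataclass(frozen=True)
class MiniPipeline:
    """Immutable ordered collection of steps; >> returns a new pipeline."""
    steps: Tuple[MiniStep, ...]

    def __rshift__(self, other: Union[MiniStep, MiniPipeline]) -> MiniPipeline:
        extra = other.steps if isinstance(other, MiniPipeline) else (other,)
        return MiniPipeline(self.steps + extra)


def run(pipeline: MiniPipeline, data: Any) -> Any:
    """Stand-in for the runner: execute steps in order, piping outputs along."""
    for step in pipeline.steps:
        data = step.fn(data)
    return data


clean = MiniStep("clean", str.strip)
upper = MiniStep("upper", str.upper)
exclaim = MiniStep("exclaim", lambda s: s + "!")

pipeline = clean >> upper >> exclaim
print([s.name for s in pipeline.steps])  # ['clean', 'upper', 'exclaim']
print(run(pipeline, "  hello "))          # HELLO!
```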
input_type
property
input_type: Any
Head input type for the pipeline (derived from the first step).
output_type
property
output_type: Any
Tail output type for the pipeline (derived from the last step).
model_validate
classmethod
model_validate(obj: Any, *args: Any, **kwargs: Any) -> 'Pipeline[Any, Any]'
Preserve concrete Step subclasses (e.g., ParallelStep) when instances are provided.
When callers pass already-constructed Step/ParallelStep objects, bypass Pydantic re-validation/coercion to avoid losing subclass-specific fields (like branches).
model_post_init
model_post_init(__context: Any) -> None
Ensure head/tail types are initialized after construction.
from_yaml
classmethod
from_yaml(yaml_source: str, *, is_path: bool = True) -> 'Pipeline[object, object]'
Load a Pipeline from YAML. When is_path=True, yaml_source is treated as a file path.
validate_graph
validate_graph(*, raise_on_error: bool = False, include_imports: bool = False, _visited_pipelines: Optional[set[int]] = None, _visited_paths: Optional[set[str]] = None, _report_cache: Optional[dict[str, 'ValidationReport']] = None) -> ValidationReport
Validate that all steps have agents, compatible types, and static lints.
Adds advanced static checks:
- V-P1: Parallel context merge conflict detection for default CONTEXT_UPDATE without field_mapping
- V-A5: Unbound output warning when a step's output is unused and it does not update context
- V-F1: Incompatible fallback signature between step and fallback_step
as_step
as_step(name: str, *, inherit_context: bool = True, **kwargs: Any) -> 'Step[PipeInT, PipelineResult[Any]]'
Wrap this pipeline as a composable Step, delegating to the Flujo runner's as_step.
StepConfig
Bases: BaseModel
Configuration options applied to every step.
Parameters
max_retries: How many times the step should be retried on failure.
timeout_s: Optional timeout in seconds for the agent execution.
temperature: Optional temperature setting for LLM-based agents.
top_k: Optional top-k sampling parameter for LLM-based agents.
top_p: Optional nucleus sampling parameter for LLM-based agents.
preserve_fallback_diagnostics: Whether to preserve diagnostic feedback from fallback executions. When True, successful fallbacks retain feedback for monitoring/debugging. When False, successful fallbacks clear feedback for backward compatibility.
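The interplay of max_retries and timeout_s can be sketched with asyncio.wait_for. This is a minimal model under assumptions, not Flujo's executor: `RetryConfig` and `run_with_config` are hypothetical, and the sketch reads max_retries as extra attempts after the first one (the real engine may count attempts differently).

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


@dataclass
class RetryConfig:
    """Hypothetical subset of StepConfig: only the execution-control fields."""
    max_retries: int = 1
    timeout_s: Optional[float] = None


async def run_with_config(agent: Callable[[], Awaitable[T]], cfg: RetryConfig) -> T:
    """Make one attempt plus up to max_retries retries, each bounded by timeout_s."""
    last_error: Optional[Exception] = None
    for _ in range(cfg.max_retries + 1):
        try:
            # timeout=None means "no limit", mirroring an unset timeout_s.
            return await asyncio.wait_for(agent(), timeout=cfg.timeout_s)
        except Exception as exc:  # includes asyncio.TimeoutError
            last_error = exc
    raise RuntimeError(f"step failed after {cfg.max_retries + 1} attempts") from last_error


calls = 0


async def flaky_agent() -> str:
    global calls
    calls += 1
    if calls < 3:
        raise RuntimeError("transient failure")
    return "ok"


print(asyncio.run(run_with_config(flaky_agent, RetryConfig(max_retries=2, timeout_s=1.0))))  # ok
```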
MapStep
Bases: LoopStep[TContext]
Map a pipeline over an iterable stored on the context.
MapStep wraps LoopStep to iterate over context.<iterable_input>
and run pipeline_to_run for each item. The collected outputs are returned
as a list.
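The mapping behaviour (run the body once per item of context.<iterable_input> and collect the outputs in order) reduces to the sketch below. It is not MapStep itself: `Ctx`, its `documents` field and `map_over_context` are hypothetical, and the real step runs a full pipeline per item through the LoopStep machinery rather than calling a plain function.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List


@dataclass
class Ctx:
    """Hypothetical pipeline context with an iterable field."""
    documents: List[str] = field(default_factory=list)


def map_over_context(body: Callable[[Any], Any], context: Any, iterable_input: str) -> List[Any]:
    """Run `body` once per item of context.<iterable_input>, collecting outputs in order."""
    items = list(getattr(context, iterable_input))
    # One iteration per item: the iterable's size bounds the loop count.
    return [body(item) for item in items]


ctx = Ctx(documents=["a b", "c", "d e f"])
print(map_over_context(lambda doc: len(doc.split()), ctx, "documents"))  # [2, 1, 3]
```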
get_loop_body_pipeline
get_loop_body_pipeline() -> Pipeline[Any, Any]
Return the configured pipeline to run per item (the original).
get_max_loops
get_max_loops() -> int
Get the maximum number of loops based on iterable size.
get_exit_condition_callable
get_exit_condition_callable() -> Callable[[object, Optional[TContext]], bool]
Get the exit condition callable for mapping.
get_initial_input_to_loop_body_mapper
get_initial_input_to_loop_body_mapper() -> Callable[[object, Optional[TContext]], object]
Get the initial input mapper for mapping.
get_iteration_input_mapper
get_iteration_input_mapper() -> Callable[[object, Optional[TContext], int], object]
Get the iteration input mapper for mapping.
get_loop_output_mapper
get_loop_output_mapper() -> Callable[[object, Optional[TContext]], List[object]]
Get the loop output mapper for mapping.
model_post_init
model_post_init(__context: object) -> None
Ensure transient runtime state is cleared and defaults initialized.
from_pipeline
classmethod
from_pipeline(*, name: str, pipeline: Pipeline[Any, Any], iterable_input: str, **kwargs: object) -> 'MapStep[TContext]'
Create a MapStep from a pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the step | required |
| pipeline | Pipeline[Any, Any] | Pipeline to execute for each item | required |
| iterable_input | str | Name of the context field containing the iterable | required |
| **kwargs | object | Additional configuration options | {} |
Returns:
| Type | Description |
|---|---|
| 'MapStep[TContext]' | Configured MapStep instance |
ParallelStep
Bases: Step[object, object], Generic[TContext]
Execute multiple branch pipelines concurrently.
Each entry in branches is run in parallel and the outputs are returned
as a dictionary keyed by branch name. Context fields can be selectively
copied to branches via context_include_keys and merged back using
merge_strategy.
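The concurrency pattern described above (every branch runs at the same time, each on its own copy of the selected context fields, and outputs come back keyed by branch name) can be sketched with asyncio.gather. This is an illustration under assumptions: `run_parallel` and the dict-based context are hypothetical, copying all fields when no include list is given is assumed, and merging branch contexts back (merge_strategy) is omitted.

```python
import asyncio
import copy
from typing import Any, Awaitable, Callable, Dict, Iterable, Optional

Branch = Callable[[Any, Dict[str, Any]], Awaitable[Any]]


async def run_parallel(
    branches: Dict[str, Branch],
    data: Any,
    context: Dict[str, Any],
    context_include_keys: Optional[Iterable[str]] = None,
) -> Dict[str, Any]:
    """Run every branch concurrently and return outputs keyed by branch name."""
    keys = list(context_include_keys) if context_include_keys is not None else list(context)

    def branch_context() -> Dict[str, Any]:
        # Each branch gets its own deep copy of the selected context fields.
        return {k: copy.deepcopy(context[k]) for k in keys if k in context}

    names = list(branches)
    results = await asyncio.gather(*(branches[n](data, branch_context()) for n in names))
    return dict(zip(names, results))


async def count_words(text: str, ctx: Dict[str, Any]) -> int:
    await asyncio.sleep(0.01)
    return len(text.split())


async def shout(text: str, ctx: Dict[str, Any]) -> str:
    await asyncio.sleep(0.01)
    return text.upper() + ctx.get("suffix", "")


out = asyncio.run(run_parallel(
    {"words": count_words, "loud": shout},
    "hi there",
    {"suffix": "!", "secret": 1},
    context_include_keys=["suffix"],
))
print(out)  # {'words': 2, 'loud': 'HI THERE!'}
```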
TreeSearchStep
Bases: Step[object, object], Generic[TContext]
Run a quota-aware tree search with proposer/evaluator agents.
MergeStrategy
Bases: Enum
Strategies for merging branch contexts back into the main context.
The CONTEXT_UPDATE strategy is recommended for most use cases as it provides proper validation and handles complex context structures safely. Use NO_MERGE for performance-critical scenarios where context updates are not needed.
BranchFailureStrategy
Bases: Enum
Policies for handling branch failures in ParallelStep.
BaseContext
Task
Bases: BaseModel
Represents a task to be solved by the orchestrator.
Candidate
Bases: BaseModel
Represents a potential solution and its evaluation metadata.
Checklist
Bases: BaseModel
A checklist for evaluating a solution.
ChecklistItem
Bases: BaseModel
A single item in a checklist for evaluating a solution.
SearchNode
Bases: BaseModel
A single node in a tree search frontier.
attach_context
attach_context(context: Optional['PipelineContext']) -> None
Attach runtime context and capture a snapshot for persistence.
rehydrate_context
rehydrate_context(context_type: type['PipelineContext']) -> Optional['PipelineContext']
Rebuild runtime context from the stored snapshot if needed.
SearchState
Bases: BaseModel
Persistent tree search state stored in the pipeline context.
sorted_open_nodes
sorted_open_nodes() -> list[SearchNode]
Return open nodes sorted by A* priority.
pop_best_open
pop_best_open() -> SearchNode | None
Pop the best node from the open set.
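The source says open nodes are ordered by A* priority but does not list the scoring fields, so the sketch below assumes the textbook priority f = g + h (cost so far plus heuristic estimate, lower is better). `Node` and `OpenSet` are hypothetical and only mirror the two documented operations, sorted_open_nodes and pop_best_open, using a binary heap with an insertion counter to break ties.

```python
import heapq
import itertools
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Node:
    """Hypothetical search node: g = cost so far, h = heuristic estimate."""
    node_id: str
    g: float
    h: float

    @property
    def f(self) -> float:
        # A* priority: the node with the lowest f = g + h is expanded first.
        return self.g + self.h


class OpenSet:
    """Open set backed by a binary heap; ties broken by insertion order."""

    def __init__(self) -> None:
        self._heap: List[Tuple[float, int, Node]] = []
        self._counter = itertools.count()

    def push(self, node: Node) -> None:
        heapq.heappush(self._heap, (node.f, next(self._counter), node))

    def sorted_open_nodes(self) -> List[Node]:
        return [entry[2] for entry in sorted(self._heap)]

    def pop_best_open(self) -> Optional[Node]:
        return heapq.heappop(self._heap)[2] if self._heap else None


frontier = OpenSet()
frontier.push(Node("a", g=2.0, h=3.0))  # f = 5.0
frontier.push(Node("b", g=1.0, h=1.5))  # f = 2.5
frontier.push(Node("c", g=0.5, h=4.0))  # f = 4.5
print([n.node_id for n in frontier.sorted_open_nodes()])  # ['b', 'c', 'a']
print(frontier.pop_best_open().node_id)                   # b
```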
PipelineResult
Bases: BaseModel, Generic[ContextT]
Aggregated result of running a pipeline.
For backward compatibility, this object exposes a top-level success flag
that reflects overall pipeline status (computed by callers/runners). Some
older tests and integrations expect result.success to exist.
status
property
status: str
Best-effort status indicator for backward compatibility.
output
property
output: Any | None
Return the output of the last step in the pipeline.
This is a convenience property for backward compatibility with tests and code that expects result.output.
StepResult
Bases: BaseModel
Result of executing a single pipeline step.
metadata
property
metadata: dict[str, Any]
Alias for metadata_ for backward compatibility and test expectations.
UsageLimits
Bases: BaseModel
Defines resource consumption limits for a pipeline run.
ExecutedCommandLog
Bases: BaseModel
Structured log entry for a command executed in the loop.
FlujoAgentResult
Flujo-controlled agent result that abstracts vendor specifics.
This class provides a vendor-agnostic representation of agent execution results. It encapsulates the output, usage metrics, and cost information in a way that Flujo's orchestration layer can consume without knowing about the underlying agent backend implementation.
Attributes:
| Name | Type | Description |
|---|---|---|
| output | Any | The actual output from the agent (any type). |
| cost_usd | Optional[float] | Optional explicit cost in USD (for cases where cost is set directly on the result object rather than in usage). |
| token_counts | Optional[int] | Optional total token count (for explicit cost reporting when detailed usage breakdown is not available). |
__init__
__init__(output: Any, usage: Optional[FlujoAgentUsage] = None, cost_usd: Optional[float] = None, token_counts: Optional[int] = None) -> None
Initialize FlujoAgentResult.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| output | Any | The actual output from the agent. | required |
| usage | Optional[FlujoAgentUsage] | Optional usage metrics conforming to FlujoAgentUsage protocol. | None |
| cost_usd | Optional[float] | Optional explicit cost in USD. | None |
| token_counts | Optional[int] | Optional total token count. | None |
usage
usage() -> Optional[FlujoAgentUsage]
Get usage metrics.
This method provides backward compatibility with code that expects usage to be accessed as a method call (e.g., result.usage()).
Returns:
| Type | Description |
|---|---|
| Optional[FlujoAgentUsage] | Usage metrics if available, None otherwise. |
FlujoAgentUsage
Bases: Protocol
Vendor-agnostic usage metrics interface.
This protocol defines the standard interface for usage metrics that Flujo expects from any agent backend. Implementations should adapt their vendor-specific usage objects to match this interface.
input_tokens
instance-attribute
input_tokens: int
Number of input/prompt tokens used.
output_tokens
instance-attribute
output_tokens: int
Number of output/completion tokens used.
cost_usd
instance-attribute
cost_usd: Optional[float]
Cost in USD, if available. None if not provided by the agent backend.
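Adapting a vendor-specific usage object to this interface means exposing the three documented attributes under the expected names. The sketch below defines a local Protocol that mirrors those attributes (it does not import Flujo), plus a hypothetical vendor object whose `prompt_tokens`/`completion_tokens` names are assumptions chosen to show the renaming.

```python
from dataclasses import dataclass
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class UsageLike(Protocol):
    """Local mirror of the three attributes FlujoAgentUsage documents."""
    input_tokens: int
    output_tokens: int
    cost_usd: Optional[float]


@dataclass
class VendorUsage:
    """Hypothetical vendor object with different field names."""
    prompt_tokens: int
    completion_tokens: int


@dataclass
class AdaptedUsage:
    """Adapter exposing the vendor numbers under the expected names."""
    input_tokens: int
    output_tokens: int
    cost_usd: Optional[float] = None

    @classmethod
    def from_vendor(cls, usage: VendorUsage) -> "AdaptedUsage":
        # This hypothetical vendor reports no cost, so cost_usd stays None.
        return cls(input_tokens=usage.prompt_tokens, output_tokens=usage.completion_tokens)


adapted = AdaptedUsage.from_vendor(VendorUsage(prompt_tokens=120, completion_tokens=45))
print(isinstance(adapted, UsageLike), adapted.input_tokens + adapted.output_tokens)  # True 165
```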
ExecutionBackend
Bases: Protocol
Protocol for executing pipeline steps.
execute_step
async
execute_step(request: StepExecutionRequest) -> StepOutcome[StepResult]
Execute a single step and return a typed outcome.
StepExecutionRequest
dataclass
Serializable request for executing a single step.
Contains the step to execute, input data, context object, resources, and execution configuration.
AgentProcessors
Bases: BaseModel
Collections of prompt and output processors.
PluginOutcome
Bases: BaseModel
Result returned by a validation plugin.
ValidationPlugin
Bases: Protocol
Protocol that all validation plugins must implement.
Validator
Bases: Protocol
A generic, stateful protocol for any component that can validate a step's output.
validate
async
validate(output_to_check: Any, *, context: Optional[BaseModel] = None) -> ValidationResult
Validates the given output.
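A class satisfies the Validator shape by providing an async validate(output_to_check, *, context=None) method. The sketch below is illustrative only: `Result` stands in for ValidationResult and its `is_valid`, `feedback` and `score` field names are assumptions, and the context parameter is typed as Any here (instead of Optional[BaseModel]) so the example runs without pydantic.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Result:
    """Hypothetical stand-in for ValidationResult (field names assumed)."""
    is_valid: bool
    feedback: Optional[str] = None
    score: float = 0.0


class MaxLengthValidator:
    """A Validator-shaped class: async validate(output_to_check, *, context=None)."""

    def __init__(self, max_chars: int) -> None:
        self.max_chars = max_chars

    async def validate(self, output_to_check: Any, *, context: Optional[Any] = None) -> Result:
        text = str(output_to_check)
        ok = len(text) <= self.max_chars
        return Result(
            is_valid=ok,
            feedback=None if ok else f"{len(text)} chars exceeds limit of {self.max_chars}",
            score=1.0 if ok else 0.0,
        )


print(asyncio.run(MaxLengthValidator(5).validate("hello world")).feedback)
# 11 chars exceeds limit of 5
```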
ValidationResult
Bases: BaseModel
The standard output from any validator, providing a pass/fail signal and scoring.
EvaluationReport
EvaluationScore
Bases: BaseModel
Structured evaluation returned by the shadow judge.
MultiSignalEvaluator
Evaluate solutions with rubric and objective signals.
ValidationFinding
Bases: BaseModel
Represents a single validation finding.
ValidationReport
Bases: BaseModel
Aggregated validation results for a pipeline.
AppResources
Bases: BaseModel
Base class for user-defined resource containers.
adapter_step
adapter_step(func: Callable[Concatenate[StepInT, P], Coroutine[object, object, StepOutT]], *, name: str | None = None, updates_context: bool = False, adapter_id: str, adapter_allow: str, processors: Optional['AgentProcessors'] = None, persist_feedback_to_context: Optional[str] = None, persist_validation_results_to: Optional[str] = None, **config_kwargs: object) -> 'Step[StepInT, StepOutT]'
adapter_step(func: None = None, *, name: str | None = None, updates_context: bool = False, adapter_id: str, adapter_allow: str, processors: Optional['AgentProcessors'] = None, persist_feedback_to_context: Optional[str] = None, persist_validation_results_to: Optional[str] = None, **config_kwargs: object) -> Callable[[Callable[Concatenate[StepInT, P], Coroutine[object, object, StepOutT]]], 'Step[StepInT, StepOutT]']
adapter_step(func: Callable[..., Coroutine[object, object, object]] | None = None, *, name: str | None = None, updates_context: bool = False, adapter_id: str, adapter_allow: str, processors: Optional['AgentProcessors'] = None, persist_feedback_to_context: Optional[str] = None, persist_validation_results_to: Optional[str] = None, **config_kwargs: object) -> object
Alias for step that marks the created step as an adapter.
typed_context
typed_context(context_cls: Type[CtxT]) -> Type[CtxT]
Declare a typed context class for pipelines.
Usage

    class MyContext(BaseContext):
        counter: int = 0
        result: str | None = None

    TypedCtx = typed_context(MyContext)
flujo.exceptions
Custom exceptions for the orchestrator.
FlujoError
Bases: Exception
Unified base exception for all Flujo errors.
This is the new base class for all Flujo exceptions. It provides enhanced error messages with suggestions and error codes.
Examples:
>>> raise FlujoError("Something went wrong", suggestion="Try X", code="E001")
>>> raise ConfigurationError("Missing config", suggestion="Set FLUJO_API_KEY")
ConfigurationError
ExecutionError
ControlFlowError
Bases: FlujoError
Non-retryable control flow signals (NEVER catch and swallow).
These exceptions are used for orchestration and should always be re-raised to let the runner handle workflow control.
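The "never catch and swallow" rule translates into an except clause that re-raises control-flow signals before any generic handler turns failures into a result. The classes below are local stand-ins that mirror the documented hierarchy (ControlFlowError derives from FlujoError, PausedException from ControlFlowError); `run_step_safely` and its dict result are hypothetical.

```python
from typing import Any, Callable, Dict


class FlujoError(Exception):
    """Local stand-in for flujo.exceptions.FlujoError."""


class ControlFlowError(FlujoError):
    """Local stand-in: orchestration signals that must propagate."""


class PausedException(ControlFlowError):
    """Local stand-in for the pause signal."""


def run_step_safely(step: Callable[[], Any]) -> Dict[str, Any]:
    """Convert ordinary failures to a result, but never swallow control flow."""
    try:
        return {"success": True, "output": step()}
    except ControlFlowError:
        raise  # let the runner see pauses, aborts and redirect/fallback loops
    except Exception as exc:  # must come after the ControlFlowError clause
        return {"success": False, "feedback": str(exc)}


def boom() -> None:
    raise ValueError("bad input")


def pause() -> None:
    raise PausedException("waiting for human")


print(run_step_safely(boom))  # {'success': False, 'feedback': 'bad input'}
try:
    run_step_safely(pause)
except PausedException as exc:
    print("propagated:", exc)  # propagated: waiting for human
```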
ContextError
ValidationError
SettingsError
OrchestratorRetryError
RewardModelUnavailable
FeatureDisabled
PricingNotConfiguredError
Bases: ConfigurationError
Raised when strict pricing mode is enabled but pricing is not configured for a model.
InfiniteRedirectError
Bases: ControlFlowError
Raised when a redirect loop is detected in pipeline execution.
This is a control flow exception and should NEVER be caught and converted to a StepResult. Always re-raise it.
InfiniteFallbackError
Bases: ControlFlowError
Raised when a fallback loop is detected during execution.
This is a control flow exception and should NEVER be caught and converted to a StepResult. Always re-raise it.
PipelineContextInitializationError
ContextInheritanceError
UsageLimitExceededError
PipelineAbortSignal
Bases: ControlFlowError
Special exception that hooks can raise to stop a pipeline gracefully.
This is a control flow exception and should NEVER be caught and converted to a StepResult. Always re-raise it.
PausedException
Bases: ControlFlowError
Internal exception used to pause a pipeline.
This is a control flow exception and should NEVER be caught and converted to a StepResult. Always re-raise it.
ImproperStepInvocationError
Bases: ExecutionError
DEPRECATED: Raised when a Step object is invoked directly.
Deprecated since version 0.4.0: use StepInvocationError instead. Will be removed in v1.0.0.
MissingAgentError
TypeMismatchError
AgentIOValidationError
NonRetryableError
HitlPolicyError
ResumeError
ReplayError
PluginError
ContextFieldError
StepInvocationError
Bases: ExecutionError
Unified exception for step invocation errors.
This exception replaces ImproperStepInvocationError for consistency and provides enhanced error messages for better debugging.
Note: ImproperStepInvocationError is deprecated and will be removed in a future version. Use StepInvocationError for new code.
ParallelStepError
TemplateResolutionError
ContextIsolationError
ContextMergeError
ContextMutationError
Infrastructure
flujo.infra
Infrastructure components for flujo.
LocalBackend
Bases: Generic[TContext]
Backend that executes steps in the current process.
InMemoryLRUCache
dataclass
LRU cache implementation with TTL support.
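An LRU cache with TTL combines two eviction rules: least-recently-used entries are dropped when the cache is full, and any entry older than the TTL is treated as a miss. The sketch below uses collections.OrderedDict and is not Flujo's InMemoryLRUCache; the class name, the `max_size`/`ttl_s` parameters and the injectable clock are assumptions, and returning None for a miss means a cached None cannot be told apart from an absent key.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Hashable, Optional, Tuple


class LRUCacheTTL:
    """Minimal LRU cache whose entries also expire ttl_s seconds after being written."""

    def __init__(self, max_size: int, ttl_s: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.max_size = max_size
        self.ttl_s = ttl_s
        self._clock = clock
        self._data: "OrderedDict[Hashable, Tuple[float, Any]]" = OrderedDict()

    def get(self, key: Hashable) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if self._clock() - stored_at > self.ttl_s:
            del self._data[key]  # expired: drop it and report a miss
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return value

    def set(self, key: Hashable, value: Any) -> None:
        self._data[key] = (self._clock(), value)
        self._data.move_to_end(key)
        while len(self._data) > self.max_size:
            self._data.popitem(last=False)  # evict the least recently used entry


now = [0.0]
cache = LRUCacheTTL(max_size=2, ttl_s=10.0, clock=lambda: now[0])
cache.set("a", 1)
cache.set("b", 2)
cache.get("a")     # touch "a" so "b" becomes least recently used
cache.set("c", 3)  # evicts "b"
print(cache.get("b"), cache.get("a"))  # None 1
now[0] = 11.0
print(cache.get("a"))  # None (expired)
```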
init_telemetry
init_telemetry(settings_obj: Optional[Settings] = None) -> None
Configure global logging and tracing for the process.
Call once at application startup. If settings_obj is not provided, the
default Settings object (flujo.infra.settings.Settings) is used. When telemetry
is enabled, the real logfire library is configured; otherwise, a fallback
logger that proxies to the standard logging module is provided.
load_settings
load_settings(force_reload: bool = False) -> Any
Load settings with configuration file overrides. If force_reload is True, reload config/settings.
get_cli_defaults
get_cli_defaults(command: str, force_reload: bool = False) -> JSONObject
Get CLI defaults for a specific command. If force_reload is True, reload config/settings.
get_state_uri
get_state_uri(force_reload: bool = False) -> Optional[str]
Get the state URI from configuration. If force_reload is True, reload config/settings.
flujo.state
WorkflowState
Bases: BaseModel
Serialized snapshot of a running workflow.
StateBackend
Bases: ABC
Abstract base class for state backends.
State backends are responsible for persisting and retrieving workflow state. They handle serialization of complex objects automatically using the enhanced serialization utilities.
save_state
abstractmethod
async
save_state(run_id: str, state: JSONObject) -> None
Save workflow state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| run_id | str | Unique identifier for the workflow run | required |
| state | JSONObject | Dictionary containing workflow state data | required |
load_state
abstractmethod
async
load_state(run_id: str) -> Optional[JSONObject]
Load workflow state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| run_id | str | Unique identifier for the workflow run | required |
Returns:
| Type | Description |
|---|---|
| Optional[JSONObject] | Dictionary containing workflow state data, or None if not found |
delete_state
abstractmethod
async
delete_state(run_id: str) -> None
Delete workflow state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| run_id | str | Unique identifier for the workflow run | required |
list_workflows
async
list_workflows(status: Optional[str] = None, pipeline_id: Optional[str] = None, limit: Optional[int] = None, offset: int = 0) -> List[JSONObject]
List workflows with optional filtering and pagination.
get_workflow_stats
async
get_workflow_stats() -> JSONObject
Get statistics about stored workflows.
cleanup_old_workflows
async
cleanup_old_workflows(days_old: int = 30) -> int
Delete workflows older than days_old days. Returns the number of deleted workflows.
get_failed_workflows
async
get_failed_workflows(hours_back: int = 24) -> List[JSONObject]
Get failed workflows from the last hours_back hours, with error details.
list_background_tasks
async
list_background_tasks(parent_run_id: Optional[str] = None, status: Optional[str] = None, limit: Optional[int] = None, offset: int = 0) -> List[JSONObject]
List background tasks with optional filtering and pagination.
get_failed_background_tasks
async
get_failed_background_tasks(parent_run_id: Optional[str] = None, hours_back: int = 24) -> List[JSONObject]
Get failed background tasks within a time window.
get_trace
abstractmethod
async
get_trace(run_id: str) -> Any
Retrieve and deserialize the trace tree for a given run_id.
save_trace
abstractmethod
async
save_trace(run_id: str, trace: JSONObject) -> None
Save trace data for a given run_id.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| run_id | str | Unique identifier for the workflow run | required |
| trace | JSONObject | Dictionary containing trace tree data | required |
save_spans
async
save_spans(run_id: str, spans: list[JSONObject]) -> None
Persist normalized spans for a run.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| run_id | str | Unique identifier for the workflow run | required |
| spans | list[JSONObject] | List of span dictionaries (span_id, parent_span_id, name, etc.) | required |
get_spans
async
get_spans(run_id: str, status: Optional[str] = None, name: Optional[str] = None) -> List[JSONObject]
Get individual spans with optional filtering.
get_span_statistics
async
get_span_statistics(pipeline_name: Optional[str] = None, time_range: Optional[Tuple[float, float]] = None) -> JSONObject
Get aggregated span statistics.
cleanup_stale_background_tasks
async
cleanup_stale_background_tasks(stale_hours: int = 24) -> int
Mark stale background tasks as failed (timeout).
persist_evaluation
async
persist_evaluation(run_id: str, score: float, feedback: str | None = None, step_name: str | None = None, metadata: JSONObject | None = None) -> None
Persist shadow evaluation result (default: no-op).
list_evaluations
async
list_evaluations(limit: int = 20, run_id: str | None = None) -> list[JSONObject]
List persisted shadow evaluation results (default: not implemented).
save_run_start
async
save_run_start(run_data: JSONObject) -> None
Persist initial run metadata.
save_step_result
async
save_step_result(step_data: JSONObject) -> None
Persist a single step execution record.
save_run_end
async
save_run_end(run_id: str, end_data: JSONObject) -> None
Update run metadata when execution finishes.
get_run_details
async
get_run_details(run_id: str) -> Optional[JSONObject]
Retrieve stored metadata for a run.
list_runs
async
list_runs(status: Optional[str] = None, pipeline_name: Optional[str] = None, limit: Optional[int] = None, offset: int = 0, metadata_filter: Optional[JSONObject] = None) -> List[JSONObject]
Return stored runs with optional filtering.
list_run_steps
async
list_run_steps(run_id: str) -> List[JSONObject]
Return all step records for a run ordered by step index.
set_system_state
async
set_system_state(key: str, value: JSONObject) -> None
Upsert a global key/value pair used for connector watermarks.
get_system_state
async
get_system_state(key: str) -> Optional[JSONObject]
Fetch a previously stored global key/value pair.
shutdown
async
shutdown() -> None
Gracefully release any resources held by the backend.
Default is a no-op. Concrete backends should override when they hold threads, file handles, or async connections that need closing.
InMemoryBackend
Bases: StateBackend
Simple in-memory backend for testing and defaults.
This backend mirrors the serialization logic of the persistent backends by storing a serialized copy of the workflow state. Values are serialized with _serialize_for_json on save and reconstructed with safe_deserialize when loaded.
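Below is a minimal sketch of the save/load round-trip described above. It assumes plain json in place of _serialize_for_json and safe_deserialize. ToyInMemoryBackend and its load_state method are hypothetical names for illustration, not Flujo's implementation. Because a serialized copy is stored, later mutations of the caller's dict cannot leak into the saved state.

```python
import asyncio
import json
from typing import Any, Dict, Optional


class ToyInMemoryBackend:
    """Hypothetical stand-in that stores JSON text instead of live objects."""

    def __init__(self) -> None:
        self._store: Dict[str, str] = {}

    async def save_state(self, run_id: str, state: Dict[str, Any]) -> None:
        # Serialize on save: the stored copy is detached from the caller's dict.
        self._store[run_id] = json.dumps(state)

    async def load_state(self, run_id: str) -> Optional[Dict[str, Any]]:
        # Rebuild a fresh object on every load (json.loads stands in for safe_deserialize).
        raw = self._store.get(run_id)
        return None if raw is None else json.loads(raw)

    async def delete_state(self, run_id: str) -> None:
        self._store.pop(run_id, None)


async def demo() -> Optional[Dict[str, Any]]:
    backend = ToyInMemoryBackend()
    state = {"status": "running", "step_index": 1}
    await backend.save_state("run-1", state)
    state["status"] = "mutated"  # does not affect the stored copy
    return await backend.load_state("run-1")


print(asyncio.run(demo()))  # {'status': 'running', 'step_index': 1}
```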
get_trace
async
get_trace(run_id: str) -> Any
Retrieve trace data for a given run_id.
save_trace
async
save_trace(run_id: str, trace: JSONObject) -> None
Save trace data for a given run_id.
get_spans
async
get_spans(run_id: str, status: Optional[str] = None, name: Optional[str] = None) -> List[JSONObject]
Get individual spans with optional filtering.
get_span_statistics
async
get_span_statistics(pipeline_name: Optional[str] = None, time_range: Optional[Tuple[float, float]] = None) -> JSONObject
Get aggregated span statistics.
FileBackend
Bases: StateBackend
Persist workflow state to JSON files.
get_trace
async
get_trace(run_id: str) -> Optional[JSONObject]
Retrieve trace data for a given run_id.
save_trace
async
save_trace(run_id: str, trace: JSONObject) -> None
Save trace data for a given run_id.
get_spans
async
get_spans(run_id: str, status: Optional[str] = None, name: Optional[str] = None) -> List[JSONObject]
Get individual spans with optional filtering.
get_span_statistics
async
get_span_statistics(pipeline_name: Optional[str] = None, time_range: Optional[Tuple[float, float]] = None) -> JSONObject
Get aggregated span statistics.
SQLiteBackend
Bases: SQLiteTraceMixin, SQLiteBackendBase
save_state
async
save_state(run_id: str, state: JSONObject) -> None
Save workflow state to the database.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| run_id | str | Unique identifier for the workflow run | required |
| state | JSONObject | Dictionary containing workflow state data | required |
delete_state
async
delete_state(run_id: str) -> None
Delete workflow state.
list_states
async
list_states(status: Optional[str] = None) -> List[JSONObject]
List workflow states with optional status filter.
Uses the pooled connection when available to minimize connection overhead.
list_background_tasks
async
list_background_tasks(parent_run_id: Optional[str] = None, status: Optional[str] = None, limit: Optional[int] = None, offset: int = 0) -> List[JSONObject]
List background tasks with optional filtering.
get_failed_background_tasks
async
get_failed_background_tasks(parent_run_id: Optional[str] = None, hours_back: int = 24) -> List[JSONObject]
Return failed background tasks within a time window.
cleanup_stale_background_tasks
async
cleanup_stale_background_tasks(stale_hours: int = 24) -> int
Mark running background tasks older than the cutoff as failed.
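The cutoff rule can be sketched over plain dictionaries. This is a hypothetical illustration. The status, started_at, and error keys and the mark_stale_tasks_failed name are assumptions and do not reflect the SQLiteBackend schema.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional


def mark_stale_tasks_failed(
    tasks: List[Dict[str, Any]],
    stale_hours: int = 24,
    now: Optional[datetime] = None,
) -> int:
    """Hypothetical sketch: fail 'running' tasks that started before the cutoff."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(hours=stale_hours)
    marked = 0
    for task in tasks:
        if task["status"] == "running" and task["started_at"] < cutoff:
            task["status"] = "failed"
            task["error"] = "timeout"  # mirrors the "(timeout)" wording above
            marked += 1
    return marked


now = datetime(2024, 1, 2, tzinfo=timezone.utc)
tasks = [
    {"status": "running", "started_at": now - timedelta(hours=30)},  # stale
    {"status": "running", "started_at": now - timedelta(hours=1)},  # recent
    {"status": "completed", "started_at": now - timedelta(hours=48)},  # finished
]
print(mark_stale_tasks_failed(tasks, now=now))  # 1
```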
list_workflows
async
list_workflows(status: Optional[str] = None, pipeline_id: Optional[str] = None, limit: Optional[int] = None, offset: int = 0) -> List[JSONObject]
Enhanced workflow listing with additional filters and metadata.
list_runs
async
list_runs(status: Optional[str] = None, pipeline_name: Optional[str] = None, limit: Optional[int] = None, offset: int = 0, metadata_filter: Optional[JSONObject] = None) -> List[JSONObject]
List runs with optional filtering and include workflow metadata.
get_workflow_stats
async
get_workflow_stats() -> JSONObject
Get comprehensive workflow statistics.
get_workflow_stats_sync
get_workflow_stats_sync() -> JSONObject
Synchronous stats snapshot for observability collectors.
Prometheus collectors are synchronous and can be invoked from async runtimes (running event loop threads). This method avoids run_sync() by querying SQLite via the standard library driver.
get_failed_workflows
async
get_failed_workflows(hours_back: int = 24) -> List[JSONObject]
Get failed workflows from the last N hours.
cleanup_old_workflows
async
cleanup_old_workflows(days_old: float = 30) -> int
Delete workflows older than the specified number of days. Returns the number of deleted workflows.
delete_run
async
delete_run(run_id: str) -> None
Delete a run from the runs table (cascades to traces). Audit log deletion.
Agents and Processing
flujo.agents
Agent factory utilities and wrapper classes.
This module provides factory functions for creating agents and wrapper classes for async execution. It focuses on agent creation and resilience wrapping, while system prompts are now in the flujo.prompts module.
This is the public API for the flujo agents package.
AsyncAgentWrapper
Bases: Generic[AgentInT, AgentOutT], AsyncAgentProtocol[AgentInT, AgentOutT]
Wraps a pydantic_ai.Agent to provide an asynchronous interface with retry and timeout capabilities.
run_async
async
run_async(*args: Any, **kwargs: Any) -> Any
Run the agent asynchronously with retry and timeout support.
Returns:
| Type | Description |
|---|---|
| Any | FlujoAgentResult: vendor-agnostic result containing output and usage metrics. |
Note: The return type is annotated as Any for AsyncAgentProtocol compatibility, but this method always returns FlujoAgentResult at runtime.
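Retry with timeout can be sketched with asyncio.wait_for. This illustration rests on several assumptions and is not the wrapper's code. The names call_with_retry and FlakyAgent are hypothetical. The choice to retry on any Exception, and to count max_retries as total attempts, is also assumed. This reference does not document the real wrapper's backoff policy or its retryable errors.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def call_with_retry(
    make_call: Callable[[], Awaitable[Any]],
    max_retries: int = 3,
    timeout: Optional[float] = None,
) -> Any:
    """Hypothetical sketch: each attempt gets a fresh awaitable and its own timeout.

    Here max_retries counts total attempts (an assumption).
    """
    last_exc: Optional[BaseException] = None
    for _attempt in range(max_retries):
        try:
            # timeout=None means wait indefinitely.
            return await asyncio.wait_for(make_call(), timeout=timeout)
        except Exception as exc:  # includes asyncio.TimeoutError
            last_exc = exc
    raise RuntimeError(f"agent call failed after {max_retries} attempts") from last_exc


class FlakyAgent:
    """Hypothetical agent that fails a fixed number of times before succeeding."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0

    async def run(self, prompt: str) -> str:
        self.calls += 1
        if self.calls <= self.failures:
            raise ValueError("transient failure")
        return f"echo: {prompt}"


agent = FlakyAgent(failures=2)
print(asyncio.run(call_with_retry(lambda: agent.run("hi"))))  # echo: hi
```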
run
async
run(*args: Any, **kwargs: Any) -> Any
Run the agent (alias for run_async).
Returns:
| Type | Description |
|---|---|
| Any | FlujoAgentResult: vendor-agnostic result containing output and usage metrics. |
Note: The return type is annotated as Any for AsyncAgentProtocol compatibility, but this method always returns FlujoAgentResult at runtime.
enable_structured_output
enable_structured_output(*, json_schema: dict[str, Any] | None = None, name: str | None = None) -> None
Enable structured output with JSON schema when supported.
This sets a best-effort response_format hint that pydantic-ai or the provider may ignore when unsupported; it is non-fatal by design.
TemplatedAsyncAgentWrapper
Bases: AsyncAgentWrapper[AgentInT, AgentOutT]
Async wrapper that supports just-in-time system prompt rendering from a template using runtime context and previous step output.
The wrapper temporarily overrides the underlying agent's system_prompt for a single run and restores it afterwards to keep agent instances stateless.
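The override-and-restore pattern can be sketched as follows. DummyAgent and run_with_rendered_prompt are hypothetical. str.format stands in for Flujo's template syntax, which this reference does not specify. The try/finally restores the original prompt even if the run raises. This naive swap is not safe when the same instance runs concurrently.

```python
import asyncio
from typing import Any, Dict


class DummyAgent:
    """Hypothetical agent whose reply shows which system prompt it saw."""

    def __init__(self, system_prompt: str) -> None:
        self.system_prompt = system_prompt

    async def run(self, user_input: str) -> str:
        return f"[{self.system_prompt}] {user_input}"


async def run_with_rendered_prompt(
    agent: DummyAgent, template: str, variables: Dict[str, Any], user_input: str
) -> str:
    """Render the template, use it for exactly one run, then restore the original."""
    original = agent.system_prompt
    # str.format is a stand-in; Flujo's own template syntax is not shown here.
    agent.system_prompt = template.format(**variables)
    try:
        return await agent.run(user_input)
    finally:
        agent.system_prompt = original  # restored even if run() raises


agent = DummyAgent("default")
print(asyncio.run(run_with_rendered_prompt(agent, "You help {user}.", {"user": "Ada"}, "hello")))
# [You help Ada.] hello
print(agent.system_prompt)  # default
```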
DeterministicRepairProcessor
Tier-1 deterministic fixer for malformed JSON emitted by LLMs.
LoggingReviewAgent
NoOpReflectionAgent
Bases: AsyncAgentProtocol[Any, str]
A stub agent that does nothing, used when reflection is disabled.
NoOpChecklistAgent
Bases: AsyncAgentProtocol[Any, Checklist]
A stub agent that returns an empty Checklist, used as a fallback for checklist agents.
AsyncAgentProtocol
Bases: Protocol[AgentInT, AgentOutT]
Generic asynchronous agent interface.
Note: For agents requiring a typed pipeline context, implementing the :class:flujo.domain.ContextAwareAgentProtocol is now the recommended and type-safe approach.
monitored_agent
monitored_agent(agent_name: str) -> Callable[[Type[_Any]], Type[_Any]]
Class decorator that records telemetry for the agent's run method.
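Such a class decorator might look like the sketch below. It replaces run with a wrapper that times each awaited call and records the outcome. The names toy_monitored_agent, EchoAgent, and the in-memory EVENTS list are hypothetical. Flujo's decorator reports to its own telemetry, and this reference does not show that format.

```python
import asyncio
import functools
import time
from typing import Any, Callable, Dict, List, Type, TypeVar

T = TypeVar("T")

# Hypothetical telemetry sink standing in for a real exporter.
EVENTS: List[Dict[str, Any]] = []


def toy_monitored_agent(agent_name: str) -> Callable[[Type[T]], Type[T]]:
    """Class decorator that times every awaited call to cls.run."""

    def decorator(cls: Type[T]) -> Type[T]:
        original_run = cls.run  # type: ignore[attr-defined]

        @functools.wraps(original_run)
        async def run(self: Any, *args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            succeeded = False
            try:
                result = await original_run(self, *args, **kwargs)
                succeeded = True
                return result
            finally:
                # Recorded on success and on failure alike.
                EVENTS.append(
                    {
                        "agent": agent_name,
                        "success": succeeded,
                        "duration_s": time.perf_counter() - start,
                    }
                )

        cls.run = run  # type: ignore[attr-defined]
        return cls

    return decorator


@toy_monitored_agent("echo")
class EchoAgent:
    async def run(self, text: str) -> str:
        return text.upper()


print(asyncio.run(EchoAgent().run("hi")))  # HI
print(EVENTS[-1]["agent"], EVENTS[-1]["success"])  # echo True
```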
make_agent
make_agent(model: str, system_prompt: str, output_type: Type[Any], tools: list[Any] | None = None, processors: Optional[AgentProcessors] = None, **kwargs: Any) -> tuple[AgentLike, AgentProcessors]
Creates a pydantic_ai.Agent, injecting the correct API key, and returns it together with its processors.
make_agent_async
make_agent_async(model: str, system_prompt: str, output_type: Type[Any], max_retries: int = 3, timeout: int | None = None, processors: Optional[AgentProcessors] = None, auto_repair: bool = True, **kwargs: Any) -> AsyncAgentWrapper[Any, Any]
Creates a pydantic_ai.Agent and returns an AsyncAgentWrapper exposing .run_async.
The wrapper uses an internal adapter pattern to convert pydantic-ai responses to FlujoAgentResult, providing vendor-agnostic results to Flujo's orchestration layer.
Parameters
model : str
The model identifier (e.g., "openai:gpt-4o").
system_prompt : str
The system prompt for the agent.
output_type : Type[Any]
The expected output type.
max_retries : int, optional
Maximum number of retries for failed calls.
timeout : int, optional
Timeout in seconds for agent calls.
processors : Optional[AgentProcessors], optional
Custom processors for the agent.
auto_repair : bool, optional
Whether to enable automatic repair of failed outputs.
**kwargs : Any
Additional arguments to pass to the underlying pydantic_ai.Agent (e.g., temperature, model_settings, max_tokens).
Returns
AsyncAgentWrapper
Wrapper that returns FlujoAgentResult (vendor-agnostic interface). The adapter pattern is used internally to isolate pydantic-ai specifics.
make_templated_agent_async
make_templated_agent_async(model: str, template_string: str, variables_spec: Optional[dict[str, Any]], output_type: Type[Any], max_retries: int = 3, timeout: int | None = None, processors: Optional[AgentProcessors] = None, auto_repair: bool = True, **kwargs: Any) -> TemplatedAsyncAgentWrapper[Any, Any]
Create an agent and wrap it with TemplatedAsyncAgentWrapper to enable just-in-time system prompt rendering.
The wrapper uses an internal adapter pattern to convert pydantic-ai responses to FlujoAgentResult, providing vendor-agnostic results to Flujo's orchestration layer.
Parameters
model : str
The model identifier (e.g., "openai:gpt-4o").
template_string : str
Template string for the system prompt with variable placeholders.
variables_spec : Optional[dict[str, Any]]
Variable specifications for template rendering.
output_type : Type[Any]
The expected output type.
max_retries : int, optional
Maximum number of retries for failed calls.
timeout : int, optional
Timeout in seconds for agent calls.
processors : Optional[AgentProcessors], optional
Custom processors for the agent.
auto_repair : bool, optional
Whether to enable automatic repair of failed outputs.
**kwargs : Any
Additional arguments to pass to the underlying pydantic_ai.Agent.
Returns
TemplatedAsyncAgentWrapper
Wrapper that returns FlujoAgentResult (vendor-agnostic interface). The adapter pattern is used internally to isolate pydantic-ai specifics.
make_repair_agent
make_repair_agent(model: str | None = None) -> 'AsyncAgentWrapper[Any, str]'
Create the internal JSON repair agent.
get_repair_agent
get_repair_agent() -> 'AsyncAgentWrapper[Any, str]'
Lazily create the internal repair agent.
get_raw_output_from_exception
get_raw_output_from_exception(exc: Exception) -> str
Best-effort extraction of raw output from validation-related exceptions.
make_review_agent
make_review_agent(model: str | None = None) -> 'AsyncAgentWrapper[Any, Checklist]'
Create a review agent with default settings.
make_solution_agent
make_solution_agent(model: str | None = None) -> 'AsyncAgentWrapper[Any, str]'
Create a solution agent with default settings.
make_validator_agent
make_validator_agent(model: str | None = None) -> 'AsyncAgentWrapper[Any, Checklist]'
Create a validator agent with default settings.
get_reflection_agent
get_reflection_agent(model: str | None = None) -> AsyncAgentProtocol[Any, Any] | NoOpReflectionAgent
Returns a new instance of the reflection agent, or a no-op if disabled.
make_self_improvement_agent
make_self_improvement_agent(model: str | None = None) -> 'AsyncAgentWrapper[Any, ImprovementReport]'
Create the SelfImprovementAgent.
__getattr__
__getattr__(name: str) -> Any
Handle access to removed global agent instances.
flujo.processors
Processor
Bases: Protocol
Generic processor interface.
process
async
process(data: Any, context: Optional[BaseModel] = None) -> Any
Process data with optional pipeline context.
AddContextVariables
Prepend context variables to a prompt.
StripMarkdownFences
Extract content from a fenced code block.
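A processor only needs an async process(data, context=None) method. The sketch below is a hypothetical re-implementation of fence stripping. ToyStripFences is not Flujo's StripMarkdownFences. Here the context parameter is typed as Any to avoid a pydantic dependency. The regex is an assumption, and it extracts only the first fenced block.

```python
import asyncio
import re
from typing import Any, Optional, Protocol


class ProcessorLike(Protocol):
    """Structural type mirroring the documented process() signature."""

    async def process(self, data: Any, context: Optional[Any] = None) -> Any: ...


# Three backticks, an optional language tag, then a lazily matched body.
_FENCE = re.compile(r"`{3}[\w+-]*\n(.*?)\n?`{3}", re.DOTALL)


class ToyStripFences:
    """Hypothetical processor: return the body of the first fenced block."""

    async def process(self, data: Any, context: Optional[Any] = None) -> Any:
        if not isinstance(data, str):
            return data  # non-text payloads pass through unchanged
        match = _FENCE.search(data)
        return match.group(1) if match else data


def as_processor(p: ProcessorLike) -> ProcessorLike:
    # A type checker accepts ToyStripFences here because it matches structurally.
    return p


fence = "`" * 3
reply = "Here you go:\n" + fence + 'json\n{"a": 1}\n' + fence + "\nDone."
print(asyncio.run(as_processor(ToyStripFences()).process(reply)))  # {"a": 1}
```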
EnforceJsonResponse
Ensure the output is valid JSON.
SerializePydantic
Serialize any object with a model_dump method to a plain dict.
DiffProcessor
Compute JSON patch operations between two JSON-like payloads.
DeterministicRepairProcessor
Tier-1 deterministic fixer for malformed JSON emitted by LLMs.
JsonRegionExtractorProcessor
Bases: Processor
Stage 0: Extract the largest balanced JSON object/array and bounded-unescape.
- Scans the input text to find the maximal balanced {...} or [...] region, ignoring braces/brackets inside quoted strings.
- If both object and array candidates exist, prefers the one matching the schema root when provided via expected_root.
- Applies bounded unescape for double-encoded JSON (up to max_unescape_depth).
This is a safe, deterministic pre-stage that often enables plain JSON decoding to succeed without LLM-based repair.
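The three steps can be sketched in plain Python. This version is simplified and hypothetical, and balanced_regions, bounded_unescape, and extract_json_region are illustrative names. It peels JSON string layers before scanning and tracks quoted strings only once inside a candidate region. It returns the extracted text, so a plain json.loads can run next. The step order and the tie-breaking rules are assumptions.

```python
import json
from typing import List, Optional, Tuple

_CLOSER = {"{": "}", "[": "]"}


def balanced_regions(text: str) -> List[Tuple[int, int]]:
    """Return (start, end) spans of top-level balanced {...} or [...] regions."""
    spans: List[Tuple[int, int]] = []
    stack: List[str] = []
    start = 0
    in_string = False
    escaped = False
    for i, ch in enumerate(text):
        if in_string:
            # Inside a quoted string: brackets are ignored, escapes are honored.
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"' and stack:
            in_string = True
        elif ch in _CLOSER:
            if not stack:
                start = i
            stack.append(_CLOSER[ch])
        elif stack and ch == stack[-1]:
            stack.pop()
            if not stack:
                spans.append((start, i + 1))
        elif stack and ch in "}]":
            stack.clear()  # mismatched closer: abandon this candidate
    return spans


def bounded_unescape(text: str, max_unescape_depth: int = 2) -> str:
    """Peel off at most max_unescape_depth layers of JSON string encoding."""
    for _ in range(max_unescape_depth):
        try:
            decoded = json.loads(text)
        except ValueError:
            break
        if not isinstance(decoded, str):
            break
        text = decoded
    return text


def extract_json_region(
    text: str, expected_root: Optional[str] = None, max_unescape_depth: int = 2
) -> str:
    text = bounded_unescape(text.strip(), max_unescape_depth)
    spans = balanced_regions(text)
    if not spans:
        return text
    if expected_root in ("object", "array"):
        opener = "{" if expected_root == "object" else "["
        # Prefer candidates matching the expected root; fall back to all of them.
        spans = [s for s in spans if text[s[0]] == opener] or spans
    start, end = max(spans, key=lambda s: s[1] - s[0])
    return text[start:end]


print(extract_json_region('Sure! Result: {"a": {"b": "}"}} Thanks'))  # {"a": {"b": "}"}}
```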
SmartTypeCoercionProcessor
Bases: Processor
Stage 3: Schema-aware, safe type coercion (Ajv-style, minimal version).
This minimal implementation applies whitelisted, unambiguous conversions without requiring a full JSON Schema. It supports per-type allowlists:
- integer: ["str->int"]
- number: ["str->float"]
- boolean: ["str->bool"]
- array: ["str->array"] (string that looks like JSON array)
A future iteration can accept and use a JSON Schema to narrow conversions by field paths and support anyOf/oneOf branch selection.
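The following hypothetical sketch applies allowlisted coercion to a single value with a known target type. The names coerce and DEFAULT_ALLOW are illustrative, and the real processor's configuration keys may differ. Any value that is not an exact, unambiguous match is returned unchanged, such as "3.5" for an integer or "yes" for a boolean.

```python
import json
from typing import Any, Dict, List, Optional

# Allowed conversions per target JSON type, mirroring the allowlists above.
DEFAULT_ALLOW: Dict[str, List[str]] = {
    "integer": ["str->int"],
    "number": ["str->float"],
    "boolean": ["str->bool"],
    "array": ["str->array"],
}


def coerce(value: Any, target: str, allow: Optional[Dict[str, List[str]]] = None) -> Any:
    """Apply one allowlisted str->X conversion, or return value unchanged."""
    rules = (DEFAULT_ALLOW if allow is None else allow).get(target, [])
    if not isinstance(value, str):
        return value
    text = value.strip()
    try:
        if target == "integer" and "str->int" in rules:
            return int(text)  # "3.5" raises ValueError and is left alone
        if target == "number" and "str->float" in rules:
            return float(text)
        if target == "boolean" and "str->bool" in rules and text.lower() in ("true", "false"):
            return text.lower() == "true"
        if target == "array" and "str->array" in rules and text.startswith("["):
            parsed = json.loads(text)
            if isinstance(parsed, list):
                return parsed
    except ValueError:
        pass  # malformed input: keep the original value
    return value


print(coerce("42", "integer"), coerce("3.5", "integer"), coerce("TRUE", "boolean"))  # 42 3.5 True
```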
TolerantJsonDecoderProcessor
Bases: Processor
Stage 1: Tiered tolerant JSON decode.
Order:
- orjson.loads (fast path) → fallback to json.loads
- if tolerant_level >= 1: try json5/pyjson5
- if tolerant_level >= 2: try json-repair (log patch preview)
Returns decoded Python object on success; otherwise returns the original input.
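The following sketch illustrates the tiered order under stated assumptions. orjson, json5, and json_repair are optional third-party packages, and the sketch skips any that are not installed. tolerant_decode is a hypothetical name, and the real processor's patch-preview logging is omitted.

```python
import json
from typing import Any


def _fast_loads(text: str) -> Any:
    # Prefer orjson when installed; its decode error subclasses ValueError.
    try:
        import orjson
    except ImportError:
        return json.loads(text)
    return orjson.loads(text)


def tolerant_decode(text: Any, tolerant_level: int = 0) -> Any:
    """Hypothetical sketch of a tiered decoder; returns the input on failure."""
    if not isinstance(text, str):
        return text
    try:
        return _fast_loads(text)
    except ValueError:
        pass
    if tolerant_level >= 1:
        try:
            import json5  # optional: accepts comments, trailing commas, bare keys
            return json5.loads(text)
        except (ImportError, ValueError):
            pass
    if tolerant_level >= 2:
        try:
            from json_repair import repair_json  # optional heuristic repair
            return json.loads(repair_json(text))
        except (ImportError, ValueError):
            pass
    return text


print(tolerant_decode('{"a": 1}'), tolerant_decode("not json"))  # {'a': 1} not json
```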
flujo.steps
Utilities and Tools
flujo.utils
Flujo utilities.
run_sync
run_sync(coro: Coroutine[object, object, T], *, running_loop_error: str | None = None) -> T
Run an async coroutine from synchronous code using anyio.
Raises:
| Type | Description |
|---|---|
| TypeError | If called from a running event loop thread. |
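The following sketch of the guard uses asyncio instead of anyio, a deliberate swap for brevity. toy_run_sync is a hypothetical name. The sketch starts a fresh event loop only when the current thread has none, and otherwise it raises TypeError, as documented.

```python
import asyncio
from typing import Any, Coroutine, Optional, TypeVar

T = TypeVar("T")


def toy_run_sync(coro: Coroutine[Any, Any, T], *, running_loop_error: Optional[str] = None) -> T:
    """Run coro to completion from synchronous code (hypothetical sketch)."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so starting one is safe.
        return asyncio.run(coro)
    coro.close()  # prevent a "coroutine was never awaited" warning
    raise TypeError(running_loop_error or "cannot run_sync() inside a running event loop")


async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)
    return a + b


print(toy_run_sync(add(1, 2)))  # 3
```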
format_prompt
format_prompt(template: str, **kwargs: Any) -> str
Convenience wrapper around :class:AdvancedPromptFormatter.
This helper respects the global template configuration from flujo.toml, including strict mode and resolution logging.
Parameters
template : str
Template string to render.
**kwargs : Any
Values referenced inside the template.
Returns
str
The rendered template.
summarize_and_redact_prompt
summarize_and_redact_prompt(prompt_text: str, max_length: int = 200, settings: Optional[Settings] = None) -> str
Return a truncated and redacted version of a prompt.
create_field_serializer
create_field_serializer(field_name: str, serializer_func: Callable[[Any], Any]) -> Callable[[Any], Any]
Helper for Pydantic @field_serializer style usage.
create_serializer_for_type
create_serializer_for_type(obj_type: type[Any], serializer_func: Callable[[Any], Any]) -> Callable[[Any], Any]
Return a serializer that dispatches to serializer_func when type matches.
lookup_custom_serializer
lookup_custom_serializer(value: Any) -> Optional[Callable[[Any], Any]]
Return the first matching serializer for value's type (exact or subclass).
lookup_custom_deserializer
lookup_custom_deserializer(obj_type: type[Any]) -> Optional[Callable[[Any], Any]]
Return the first matching deserializer for obj_type (exact or subclass).
register_custom_serializer
register_custom_serializer(obj_type: type[Any], serializer_func: Callable[[Any], Any]) -> None
Register a serializer for a specific type.
register_custom_deserializer
register_custom_deserializer(obj_type: type[Any], deserializer_func: Callable[[Any], Any]) -> None
Register a deserializer for a specific type.
reset_custom_serializer_registry
reset_custom_serializer_registry() -> None
Reset serializer/deserializer registries (testing helper).
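The following is a minimal sketch of a type-keyed registry with subclass fallback. It walks the value's MRO, checking the exact type first and then its base classes, which is one way to get "exact or subclass" matching. Flujo may not resolve matches the same way, so that part is an assumption. The names below are hypothetical.

```python
from typing import Any, Callable, Dict, Optional

_SERIALIZERS: Dict[type, Callable[[Any], Any]] = {}


def register_serializer(obj_type: type, serializer_func: Callable[[Any], Any]) -> None:
    _SERIALIZERS[obj_type] = serializer_func


def lookup_serializer(value: Any) -> Optional[Callable[[Any], Any]]:
    # type(value).__mro__ starts with the exact type, then its bases in order.
    for cls in type(value).__mro__:
        if cls in _SERIALIZERS:
            return _SERIALIZERS[cls]
    return None


def reset_registry() -> None:
    """Testing helper: forget every registration."""
    _SERIALIZERS.clear()


class Point:
    def __init__(self, x: int, y: int) -> None:
        self.x, self.y = x, y


class Point3D(Point):
    def __init__(self, x: int, y: int, z: int) -> None:
        super().__init__(x, y)
        self.z = z


register_serializer(Point, lambda p: {"x": p.x, "y": p.y})
fn = lookup_serializer(Point3D(1, 2, 3))  # no exact entry: falls back to Point's
print(fn(Point3D(1, 2, 3)) if fn else None)  # {'x': 1, 'y': 2}
```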
safe_deserialize
safe_deserialize(serialized_data: Any, target_type: Optional[type[Any]] = None, default_deserializer: Optional[Callable[[Any], Any]] = None) -> Any
Best-effort deserialization counterpart for _serialize_for_json.
flujo.tracing
flujo.telemetry
Telemetry integrations for production monitoring.
OpenTelemetryHook
Fallback when OpenTelemetry is not available.
StateBackendSpanExporter
Fallback when OpenTelemetry is not available.
CLI and Testing
flujo.cli
CLI package.
flujo.testing
Testing utilities for flujo.
StubAgent
Simple agent for testing that returns preset outputs.
DummyPlugin
override_agent
override_agent(step: Any, new_agent: Any) -> Iterator[None]
Temporarily override the agent of a Step within a context.
assert_validator_failed
assert_validator_failed(result: PipelineResult[Any], validator_name: str, expected_feedback_part: Optional[str] = None) -> None
Assert that a specific validator failed during the run.
assert_context_updated
assert_context_updated(result: PipelineResult[Any], **expected_updates: Any) -> None
Assert that the final context contains the expected updates.
Recipes and Plugins
flujo.recipes
Recipe modules for common workflow patterns.
This module provides factory functions for common workflow patterns.
RECOMMENDED: Use the factory functions for better transparency, composability, and future YAML/AI support:
- make_default_pipeline() - Creates a Review → Solution → Validate pipeline
- make_agentic_loop_pipeline() - Creates an explorative agent workflow
- run_default_pipeline() - Executes a default pipeline
- run_agentic_loop_pipeline() - Executes an agentic loop pipeline
make_default_pipeline
make_default_pipeline(review_agent: 'AsyncAgentProtocol[Any, Any]', solution_agent: 'AsyncAgentProtocol[Any, Any]', validator_agent: 'AsyncAgentProtocol[Any, Any]', reflection_agent: Optional['AsyncAgentProtocol[Any, Any]'] = None, max_retries: int = 3, k_variants: int = 1, max_iters: int = 3, reflection_limit: Optional[int] = None) -> Pipeline[str, Checklist]
Create a default Review → Solution → Validate pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| review_agent | 'AsyncAgentProtocol[Any, Any]' | Agent that creates a checklist of requirements | required |
| solution_agent | 'AsyncAgentProtocol[Any, Any]' | Agent that generates a solution | required |
| validator_agent | 'AsyncAgentProtocol[Any, Any]' | Agent that validates the solution against requirements | required |
| reflection_agent | Optional['AsyncAgentProtocol[Any, Any]'] | Optional agent for reflection/improvement | None |
| max_retries | int | Maximum retries for each step | 3 |
| k_variants | int | Number of solution variants to generate per iteration | 1 |
| max_iters | int | Maximum number of iterations for improvement | 3 |
| reflection_limit | Optional[int] | Optional limit on reflection iterations | None |
Returns:
| Type | Description |
|---|---|
| Pipeline[str, Checklist] | A Pipeline object that can be inspected, composed, and executed |
make_state_machine_pipeline
make_state_machine_pipeline(*, nodes: dict[str, Step[Any, Any] | Pipeline[Any, Any]], context_model: type[PipelineContext], router_field: str = 'next_state', end_state_field: str = 'is_complete', max_loops: int = 50) -> Pipeline[Any, Any]
Create a simple state machine pipeline.
Each iteration runs the state pipeline specified in context.<router_field>.
The loop exits when context.<end_state_field> evaluates to True.
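The control flow can be sketched as a plain loop, assuming each node is a callable that mutates the context and sets the next state. run_state_machine and Ctx are hypothetical, as is raising once max_loops is exhausted. The real factory builds a Pipeline from Step or Pipeline nodes instead of calling functions directly.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class Ctx:
    """Hypothetical context with the default router and end-state fields."""

    next_state: str = "start"
    is_complete: bool = False
    counter: int = 0


def run_state_machine(
    nodes: Dict[str, Callable[[Ctx], None]],
    ctx: Ctx,
    router_field: str = "next_state",
    end_state_field: str = "is_complete",
    max_loops: int = 50,
) -> int:
    """Run the node named by ctx.<router_field> until ctx.<end_state_field> is truthy."""
    for iteration in range(1, max_loops + 1):
        nodes[getattr(ctx, router_field)](ctx)
        if getattr(ctx, end_state_field):
            return iteration  # number of iterations executed
    raise RuntimeError(f"no end state reached within {max_loops} loops")


def start(ctx: Ctx) -> None:
    ctx.next_state = "work"


def work(ctx: Ctx) -> None:
    ctx.counter += 1
    if ctx.counter >= 3:
        ctx.next_state = "finish"


def finish(ctx: Ctx) -> None:
    ctx.is_complete = True


ctx = Ctx()
print(run_state_machine({"start": start, "work": work, "finish": finish}, ctx))  # 5
```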
make_agentic_loop_pipeline
make_agentic_loop_pipeline(planner_agent: 'AsyncAgentProtocol[Any, Any]', agent_registry: dict[str, 'AsyncAgentProtocol[Any, Any]'], max_loops: int = 10, max_retries: int = 3) -> Pipeline[str, Any]
Create an agentic loop pipeline for explorative workflows.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| planner_agent | 'AsyncAgentProtocol[Any, Any]' | Agent that decides what command to run next | required |
| agent_registry | dict[str, 'AsyncAgentProtocol[Any, Any]'] | Dictionary of available agents to execute | required |
| max_loops | int | Maximum number of loop iterations | 10 |
| max_retries | int | Maximum retries for each step | 3 |
Returns:
| Type | Description |
|---|---|
| Pipeline[str, Any] | A Pipeline object with a LoopStep containing the agentic logic |
run_default_pipeline
async
run_default_pipeline(pipeline: Pipeline[str, Checklist], task: Task) -> Optional[Candidate]
Run a default pipeline and return the result.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pipeline | Pipeline[str, Checklist] | Pipeline created by make_default_pipeline | required |
| task | Task | Task to execute | required |
Returns:
| Type | Description |
|---|---|
| Optional[Candidate] | Candidate with solution and checklist, or None if failed |
run_agentic_loop_pipeline
async
run_agentic_loop_pipeline(pipeline: Pipeline[str, Any], initial_goal: str, resume_from: Optional[PipelineResult[PipelineContext]] = None, human_input: Optional[str] = 'human') -> PipelineResult[PipelineContext]
Run an agentic loop pipeline and return the result.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pipeline | Pipeline[str, Any] | Pipeline created by make_agentic_loop_pipeline | required |
| initial_goal | str | Initial goal for the agentic loop | required |
| resume_from | Optional[PipelineResult[PipelineContext]] | Optional paused result to resume from | None |
| human_input | Optional[str] | Human input to provide when resuming (default: "human") | 'human' |
Returns:
| Type | Description |
|---|---|
| PipelineResult[PipelineContext] | PipelineResult with the execution context and results |
flujo.plugins
Built-in plugins for flujo.
Models and Prompts
flujo.models
Public facade for Flujo domain models.
This module is a stable import path for user-facing types.
Task
Bases: BaseModel
Represents a task to be solved by the orchestrator.
Candidate
Bases: BaseModel
Represents a potential solution and its evaluation metadata.
Checklist
Bases: BaseModel
A checklist for evaluating a solution.
ChecklistItem
Bases: BaseModel
A single item in a checklist for evaluating a solution.
SearchNode
Bases: BaseModel
A single node in a tree search frontier.
attach_context
attach_context(context: Optional['PipelineContext']) -> None
Attach runtime context and capture a snapshot for persistence.
rehydrate_context
rehydrate_context(context_type: type['PipelineContext']) -> Optional['PipelineContext']
Rebuild runtime context from the stored snapshot if needed.
SearchState
Bases: BaseModel
Persistent tree search state stored in the pipeline context.
sorted_open_nodes
sorted_open_nodes() -> list[SearchNode]
Return open nodes sorted by A* priority.
pop_best_open
pop_best_open() -> SearchNode | None
Pop the best node from the open set.
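The following sketch shows A* ordering. It assumes each node carries a path cost g and a heuristic h, with priority f = g + h and ties broken by the smaller h. The Node and Frontier classes and their field names are hypothetical and are not the SearchNode/SearchState fields. A heap would avoid re-sorting on every pop, but sorting keeps the sketch short.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Node:
    node_id: str
    g_cost: float  # cost accumulated so far (hypothetical field name)
    h_cost: float  # heuristic estimate to the goal (hypothetical field name)

    @property
    def f_cost(self) -> float:
        return self.g_cost + self.h_cost


@dataclass
class Frontier:
    open_nodes: List[Node] = field(default_factory=list)

    def sorted_open_nodes(self) -> List[Node]:
        # Lowest f = g + h first; ties broken by smaller h, then by id for determinism.
        return sorted(self.open_nodes, key=lambda n: (n.f_cost, n.h_cost, n.node_id))

    def pop_best_open(self) -> Optional[Node]:
        if not self.open_nodes:
            return None
        best = self.sorted_open_nodes()[0]
        self.open_nodes.remove(best)
        return best


frontier = Frontier([Node("a", 1, 5), Node("b", 2, 2), Node("c", 3, 1)])
print([n.node_id for n in frontier.sorted_open_nodes()])  # ['c', 'b', 'a']
```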
PipelineResult
Bases: BaseModel, Generic[ContextT]
Aggregated result of running a pipeline.
For backward compatibility, this object exposes a top-level success flag that reflects overall pipeline status (computed by callers/runners). Some older tests and integrations expect result.success to exist.
status
property
status: str
Best-effort status indicator for backward compatibility.
output
property
output: Any | None
Return the output of the last step in the pipeline.
This is a convenience property for backward compatibility with tests and code that expects result.output.
StepResult
Bases: BaseModel
Result of executing a single pipeline step.
metadata
property
metadata: dict[str, Any]
Alias for metadata_ for backward compatibility and test expectations.
UsageLimits
Bases: BaseModel
Defines resource consumption limits for a pipeline run.
RefinementCheck
Bases: BaseModel
Standardized output from a critic pipeline in a refinement loop.
ImprovementSuggestion
Bases: BaseModel
A single suggestion from the SelfImprovementAgent.
ImprovementReport
Bases: BaseModel
Aggregated improvement suggestions returned by the agent.
HumanInteraction
Bases: BaseModel
Records a single human interaction in a HITL conversation.
PipelineContext
Bases: BaseModel
Runtime context shared by all steps in a pipeline run.
The base PipelineContext tracks essential execution metadata and is automatically created for every call to :meth:Flujo.run. Custom context models should inherit from this class to add application-specific fields while retaining the built-in ones.
steps
property
steps: JSONObject
Expose recorded step outputs.
sandbox
property
sandbox: SandboxProtocol | None
Return the sandbox handle attached to this context when available.
retrieve
async
retrieve(query_text: str | None = None, *, query_vector: list[float] | None = None, limit: int = 5) -> list[ScoredMemory]
Retrieve memories using the configured memory store.
If query_vector is not provided, an embedding model will be used when available. Returns an empty list when no memory store is configured.
EvaluationReport
EvaluationScore
Bases: BaseModel
Structured evaluation returned by the shadow judge.
flujo.prompts
System prompts for Flujo agents.
This module contains all the default system prompts used by Flujo agents. Users can import and override these prompts for their own use cases.
flujo.signature_tools
analyze_signature
analyze_signature(func: Callable[..., Any]) -> SignatureAnalysis
Inspect func and determine its pipeline injection requirements.
Parameters
func : Callable[..., Any]
Callable to inspect. It may be a standard function or a callable object.
Returns
SignatureAnalysis
Named tuple describing whether context or resources keyword parameters are required and the inferred input/output types.
Raises
ConfigurationError
If context or resources parameters are annotated with invalid types.
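The following is a simplified sketch of the inspection using inspect.signature. ToySignatureAnalysis and its field names are hypothetical, and the sketch omits the ConfigurationError check on invalid annotations. It also assumes the input type comes from the first positional parameter that is not injected.

```python
import inspect
from typing import Any, Callable, NamedTuple


class ToySignatureAnalysis(NamedTuple):
    needs_context: bool
    needs_resources: bool
    input_type: Any
    output_type: Any


_KEYWORD_KINDS = (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)
_POSITIONAL_KINDS = (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
_INJECTED = ("context", "resources")


def toy_analyze_signature(func: Callable[..., Any]) -> ToySignatureAnalysis:
    """Report which injectable keywords func accepts and its input/output types."""
    sig = inspect.signature(func)  # for callable objects, describes __call__ without self
    params = list(sig.parameters.values())
    keyword_names = {p.name for p in params if p.kind in _KEYWORD_KINDS}
    data_params = [p for p in params if p.kind in _POSITIONAL_KINDS and p.name not in _INJECTED]
    input_type = data_params[0].annotation if data_params else Any
    if input_type is inspect.Parameter.empty:
        input_type = Any
    output_type = sig.return_annotation
    if output_type is inspect.Signature.empty:
        output_type = Any
    return ToySignatureAnalysis(
        needs_context="context" in keyword_names,
        needs_resources="resources" in keyword_names,
        input_type=input_type,
        output_type=output_type,
    )


def my_step(data: str, *, context: dict) -> int:
    return len(data)


print(toy_analyze_signature(my_step))
```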