
API Reference

Welcome to the Flujo API reference. This documentation is automatically generated from the source code using mkdocstrings.

Core Modules

flujo.application

Application-level components for flujo.

Flujo

Bases: Generic[RunnerInT, RunnerOutT, ContextT]

Execute a pipeline sequentially.

Parameters

pipeline : Pipeline | Step | None, optional
    Pipeline object to run directly. Deprecated when using registry.
registry : PipelineRegistry, optional
    Registry holding named pipelines.
pipeline_name : str, optional
    Name of the pipeline registered in registry.
pipeline_version : str, default "latest"
    Version to load from the registry when the run starts.
state_backend : StateBackend, optional
    Backend used to persist :class:WorkflowState for durable execution.
delete_on_completion : bool, default False
    If True, remove persisted state once the run finishes.
persist_state : bool, default True
    Disable state persistence entirely for ephemeral runs when False.
state_providers : Dict[str, StateProvider], optional
    External state providers for :class:ContextReference hydration. Ignored when a custom executor_factory is supplied; pass providers directly to the factory instead.

disable_tracing

disable_tracing() -> None

Disable tracing by removing trace hooks and clearing active manager.

aclose async

aclose() -> None

Asynchronously release runner-owned resources.

close

close() -> None

Synchronously release runner-owned resources (best-effort in async contexts).

run_async

run_async(initial_input: RunnerInT, *, run_id: str | None = None, initial_context_data: Optional[JSONObject] = None) -> _RunAsyncHandle[ContextT]

Run the pipeline asynchronously.

Returns an object that is both async-iterable (streaming) and awaitable (returns the final PipelineResult), preserving legacy convenience.
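Example (a minimal sketch; the import paths follow the module layout in this reference and the echo step is a stand-in for a real pipeline):

import asyncio

from flujo.application import Flujo
from flujo.domain import Step

async def echo(text: str) -> str:
    return text

# A Pipeline or a single Step may be passed to the runner.
runner = Flujo(Step.from_callable(echo, name="echo"))

async def main() -> None:
    # Awaiting the handle returns the final PipelineResult.
    result = await runner.run_async("hello")
    print(result.success, result.output)

    # Iterating the same call streams StepOutcome events instead.
    async for outcome in runner.run_async("hello"):
        print(outcome)

asyncio.run(main())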

run_result_async async

run_result_async(initial_input: RunnerInT, *, run_id: str | None = None, initial_context_data: Optional[JSONObject] = None) -> PipelineResult[ContextT]

Run the pipeline asynchronously and return the final PipelineResult.

run_stream async

run_stream(initial_input: RunnerInT, *, run_id: str | None = None, initial_context_data: Optional[JSONObject] = None) -> AsyncIterator[StepOutcome[StepResult]]

Run the pipeline and stream StepOutcome events as they complete.

run_outcomes async

run_outcomes(initial_input: RunnerInT, *, run_id: str | None = None, initial_context_data: Optional[JSONObject] = None) -> AsyncIterator[StepOutcome[StepResult]]

Alias for run_stream() with clearer naming.

replay_from_trace async

replay_from_trace(run_id: str) -> PipelineResult[ContextT]

Replay a prior run deterministically using recorded trace and responses (FSD-013).

get_failed_background_tasks async

get_failed_background_tasks(parent_run_id: Optional[str] = None, hours_back: int = 24) -> list[BackgroundTaskInfo]

List failed background tasks for optional parent run.

resume_background_task async

resume_background_task(task_id: str, new_data: object | None = None) -> PipelineResult[ContextT]

Resume a failed background task synchronously.

retry_failed_background_tasks async

retry_failed_background_tasks(parent_run_id: str, max_retries: int = 3) -> list[PipelineResult[ContextT]]

Retry failed background tasks for a given parent run.

cleanup_stale_background_tasks async

cleanup_stale_background_tasks(stale_hours: int = 24) -> int

Mark stale running background tasks as failed.

run_with_events async

run_with_events(initial_input: RunnerInT, *, run_id: str | None = None, initial_context_data: Optional[JSONObject] = None) -> AsyncIterator[object]

Run pipeline yielding lifecycle events (StepOutcome/Chunk) and final PipelineResult.

SelfImprovementAgent

Agent that analyzes failures and suggests improvements.

run_pipeline_async async

run_pipeline_async(inputs: Any, *, runner: Flujo[Any, Any, Any]) -> PipelineResult[Any]

Adapter to run a :class:Flujo engine as a pydantic-evals task.

evaluate_and_improve async

evaluate_and_improve(task_function: Callable[[Any], Awaitable[PipelineResult[Any]]], dataset: Any, improvement_agent: SelfImprovementAgent, pipeline_definition: Optional[Pipeline[Any, Any] | Step[Any, Any]] = None) -> ImprovementReport

Run dataset evaluation and return improvement suggestions.

flujo.domain

Domain components for flujo.

Step

Bases: BaseModel, Generic[StepInT, StepOutT]

Declarative node in a pipeline.

A Step holds a reference to the agent that will execute, configuration such as retries and timeout, and optional plugins. It does not execute anything by itself. Steps are composed into :class:Pipeline objects and run by the :class:~flujo.application.runner.Flujo engine.

Use :meth:arun to invoke the underlying agent directly during unit tests.

input property

input: object | None

Return the declarative input value (legacy alias).

__call__

__call__(*args: Any, **kwargs: Any) -> NoReturn

Disallow direct invocation of a Step.

__getattr__

__getattr__(item: str) -> Any

Raise a helpful error when trying to access non-existent attributes.

arun

arun(data: StepInT, **kwargs: Any) -> Coroutine[object, object, StepOutT]

Return the agent coroutine to run this step directly in tests.

fallback

fallback(fallback_step: 'Step[Any, Any]') -> 'Step[StepInT, StepOutT]'

Set a fallback step to execute if this step fails.

Parameters:

fallback_step : 'Step[Any, Any]', required
    The step to execute if this step fails.

Returns:

'Step[StepInT, StepOutT]'
    self for method chaining.
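Example (an illustrative sketch; the model names are placeholders and the import paths follow this reference):

from flujo.agents import make_solution_agent
from flujo.domain import Step

primary = Step.solution(make_solution_agent("openai:gpt-4o"))
backup = Step.solution(make_solution_agent("openai:gpt-4o-mini"))

# fallback() returns self, so it can be used inline while composing a pipeline.
robust_step = primary.fallback(backup)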

add_plugin

add_plugin(plugin: 'ValidationPlugin') -> 'Step[StepInT, StepOutT]'

Add a validation plugin to this step.

Parameters:

plugin : 'ValidationPlugin', required
    The validation plugin to add.

Returns:

'Step[StepInT, StepOutT]'
    self for method chaining.

review classmethod

review(agent: AsyncAgentProtocol[Any, Any], *, plugins: Optional[list[tuple[ValidationPlugin, int]]] = None, validators: Optional[List[Validator]] = None, processors: Optional[AgentProcessors] = None, persist_feedback_to_context: Optional[str] = None, persist_validation_results_to: Optional[str] = None, **config: Any) -> 'Step[Any, Any]'

Construct a review step using the provided agent.

Also infers input/output types from the agent's signature for pipeline validation.

solution classmethod

solution(agent: AsyncAgentProtocol[Any, Any], *, plugins: Optional[list[tuple[ValidationPlugin, int]]] = None, validators: Optional[List[Validator]] = None, processors: Optional[AgentProcessors] = None, persist_feedback_to_context: Optional[str] = None, persist_validation_results_to: Optional[str] = None, **config: Any) -> 'Step[Any, Any]'

Construct a solution step using the provided agent.

Also infers input/output types from the agent's signature for pipeline validation.

validate_step classmethod

validate_step(agent: AsyncAgentProtocol[Any, Any], *, plugins: Optional[list[tuple[ValidationPlugin, int]]] = None, validators: Optional[List[Validator]] = None, processors: Optional[AgentProcessors] = None, persist_feedback_to_context: Optional[str] = None, persist_validation_results_to: Optional[str] = None, strict: bool = True, **config: Any) -> 'Step[Any, Any]'

Construct a validation step using the provided agent.

Also infers input/output types from the agent's signature for pipeline validation.

from_callable classmethod

from_callable(callable_: Callable[Concatenate[StepInT, P], Coroutine[object, object, StepOutT]], name: str | None = None, updates_context: bool = False, validate_fields: bool = False, sink_to: str | None = None, processors: Optional[AgentProcessors] = None, persist_feedback_to_context: Optional[str] = None, persist_validation_results_to: Optional[str] = None, is_adapter: bool = False, adapter_id: str | None = None, adapter_allow: str | None = None, config: StepConfig | None = None, **config_kwargs: Any) -> 'Step[StepInT, StepOutT]'

Create a Step from an async callable.
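Example (a minimal sketch; the async function is a stand-in and the import path follows this reference):

from flujo.domain import Step

async def shout(text: str) -> str:
    # The parameter and return annotations drive input/output type inference.
    return text.upper()

shout_step = Step.from_callable(shout, name="shout")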

from_mapper classmethod

from_mapper(mapper: Callable[Concatenate[StepInT, P], Coroutine[object, object, StepOutT]], name: str | None = None, updates_context: bool = False, sink_to: str | None = None, processors: Optional[AgentProcessors] = None, persist_feedback_to_context: Optional[str] = None, persist_validation_results_to: Optional[str] = None, **config: Any) -> 'Step[StepInT, StepOutT]'

Alias for :meth:from_callable to improve readability.

human_in_the_loop classmethod

human_in_the_loop(name: str, message_for_user: str | None = None, input_schema: Type[BaseModel] | None = None) -> 'HumanInTheLoopStep'

Construct a HumanInTheLoop step.

refine_until classmethod

refine_until(name: str, generator_pipeline: 'Pipeline[Any, Any]', critic_pipeline: 'Pipeline[Any, RefinementCheck]', max_refinements: int = 5, feedback_mapper: Optional[Callable[[Any, RefinementCheck], Any]] = None, **config_kwargs: Any) -> 'Pipeline[Any, Any]'

Build a refinement loop pipeline: generator >> capture >> critic >> post-mapper.

granular classmethod

granular(name: str, agent: Any, input_: Any = None, *, max_turns: int = 20, history_max_tokens: int = 128000, blob_threshold_bytes: int = 20000, enforce_idempotency: bool = False, resume_fingerprint_mode: Literal['strict', 'compat'] | None = None, **config_kwargs: Any) -> 'Pipeline[Any, Any]'

Build a granular execution pipeline for crash-safe, resumable agent runs.

Wraps a GranularStep in a LoopStep with abort-on-failure semantics. Each turn is persisted atomically with CAS guards to prevent double-execution.

Parameters:

name : str, required
    Step name for identification.
agent : Any, required
    The agent to execute.
input_ : Any, default None
    Optional input data or template.
max_turns : int, default 20
    Maximum number of turns before forced completion.
history_max_tokens : int, default 128000
    Token budget for message history.
blob_threshold_bytes : int, default 20000
    Payload size triggering blob offload.
enforce_idempotency : bool, default False
    Require idempotency keys on tool calls.
resume_fingerprint_mode : Literal['strict', 'compat'] | None, default None
    Optional per-step fingerprint mode override.
**config_kwargs : Any
    Additional step configuration.

Returns:

'Pipeline[Any, Any]'
    Pipeline containing LoopStep(GranularStep) with on_failure="abort".

use_input

use_input(key: str) -> 'Pipeline[dict[str, Any], StepOutT]'

Create a small adapter pipeline that selects a key from a dict input.

This is a common pattern when working with :meth:parallel branches where each branch only needs a portion of the upstream output.

gather classmethod

gather(name: str, *, wait_for: List[str], **config_kwargs: Any) -> 'Step[Any, JSONObject]'

Collect outputs from multiple parallel branches.

The step expects a dictionary input (e.g. from :meth:parallel) and returns a dictionary containing only the specified keys.
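Example (an illustrative sketch; the upstream parallel step that produces the branch dictionary is elided, and the branch names are placeholders):

from flujo.domain import Step

# Collect only the branches we care about from a parallel step's dict output.
collect = Step.gather("collect", wait_for=["summary", "keywords"])

async def refine(summary: str) -> str:
    return summary.strip()

# A downstream step that only needs one branch can adapt itself with use_input:
refine_branch = Step.from_callable(refine, name="refine").use_input("summary")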

Pipeline

Bases: BaseModel, Generic[PipeInT, PipeOutT]

Ordered collection of :class:Step objects.

Pipeline instances are immutable containers that define the execution graph. They can be composed with the >> operator and validated before running. Execution is handled by the :class:~flujo.application.runner.Flujo class.
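Example (a minimal composition sketch, assuming Step supports the >> operator described above):

from flujo.domain import Step

async def draft(topic: str) -> str:
    return f"A short draft about {topic}"

async def polish(text: str) -> str:
    return text.strip()

pipeline = Step.from_callable(draft, name="draft") >> Step.from_callable(polish, name="polish")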

input_type property

input_type: Any

Head input type for the pipeline (derived from the first step).

output_type property

output_type: Any

Tail output type for the pipeline (derived from the last step).

model_validate classmethod

model_validate(obj: Any, *args: Any, **kwargs: Any) -> 'Pipeline[Any, Any]'

Preserve concrete Step subclasses (e.g., ParallelStep) when instances are provided.

When callers pass already-constructed Step/ParallelStep objects, bypass Pydantic re-validation/coercion to avoid losing subclass-specific fields (like branches).

model_post_init

model_post_init(__context: Any) -> None

Ensure head/tail types are initialized after construction.

from_yaml classmethod

from_yaml(yaml_source: str, *, is_path: bool = True) -> 'Pipeline[object, object]'

Load a Pipeline from YAML. When is_path=True, yaml_source is treated as a file path.

validate_graph

validate_graph(*, raise_on_error: bool = False, include_imports: bool = False, _visited_pipelines: Optional[set[int]] = None, _visited_paths: Optional[set[str]] = None, _report_cache: Optional[dict[str, 'ValidationReport']] = None) -> ValidationReport

Validate that all steps have agents, compatible types, and static lints.

Adds advanced static checks:

- V-P1: Parallel context merge conflict detection for default CONTEXT_UPDATE without field_mapping
- V-A5: Unbound output warning when a step's output is unused and it does not update context
- V-F1: Incompatible fallback signature between step and fallback_step
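Example (continuing the composition sketch above; only the documented keyword is used):

report = pipeline.validate_graph()            # collect findings without raising
pipeline.validate_graph(raise_on_error=True)  # or fail fast on the first error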

as_step

as_step(name: str, *, inherit_context: bool = True, **kwargs: Any) -> 'Step[PipeInT, PipelineResult[Any]]'

Wrap this pipeline as a composable Step, delegating to Flujo runner's as_step.

StepConfig

Bases: BaseModel

Configuration options applied to every step.

Parameters

max_retries: How many times the step should be retried on failure.
timeout_s: Optional timeout in seconds for the agent execution.
temperature: Optional temperature setting for LLM based agents.
top_k: Optional top-k sampling parameter for LLM based agents.
top_p: Optional nucleus sampling parameter for LLM based agents.
preserve_fallback_diagnostics: Whether to preserve diagnostic feedback from fallback executions. When True, successful fallbacks retain feedback for monitoring/debugging. When False, successful fallbacks clear feedback for backward compatibility.
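Example (a minimal sketch passing an explicit StepConfig to from_callable via its documented config parameter; the fetch function is a placeholder):

from flujo.domain import Step, StepConfig

async def fetch(url: str) -> str:
    ...

fetch_step = Step.from_callable(
    fetch,
    name="fetch",
    config=StepConfig(max_retries=2, timeout_s=30.0),
)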

MapStep

Bases: LoopStep[TContext]

Map a pipeline over an iterable stored on the context.

MapStep wraps LoopStep to iterate over context.<iterable_input> and run pipeline_to_run for each item. The collected outputs are returned as a list.

get_loop_body_pipeline

get_loop_body_pipeline() -> Pipeline[Any, Any]

Return the configured pipeline to run per item (the original).

get_max_loops

get_max_loops() -> int

Get the maximum number of loops based on iterable size.

get_exit_condition_callable

get_exit_condition_callable() -> Callable[[object, Optional[TContext]], bool]

Get the exit condition callable for mapping.

get_initial_input_to_loop_body_mapper

get_initial_input_to_loop_body_mapper() -> Callable[[object, Optional[TContext]], object]

Get the initial input mapper for mapping.

get_iteration_input_mapper

get_iteration_input_mapper() -> Callable[[object, Optional[TContext], int], object]

Get the iteration input mapper for mapping.

get_loop_output_mapper

get_loop_output_mapper() -> Callable[[object, Optional[TContext]], List[object]]

Get the loop output mapper for mapping.

model_post_init

model_post_init(__context: object) -> None

Ensure transient runtime state is cleared and defaults initialized.

from_pipeline classmethod

from_pipeline(*, name: str, pipeline: Pipeline[Any, Any], iterable_input: str, **kwargs: object) -> 'MapStep[TContext]'

Create a MapStep from a pipeline.

Parameters:

name : str, required
    Name of the step.
pipeline : Pipeline[Any, Any], required
    Pipeline to execute for each item.
iterable_input : str, required
    Name of the context field containing the iterable.
**kwargs : object
    Additional configuration options.

Returns:

'MapStep[TContext]'
    Configured MapStep instance.
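Example (an illustrative sketch; ItemsContext and the per-item steps are placeholders, and the >> composition follows the Pipeline docs above):

from flujo.domain import BaseContext, MapStep, Step

class ItemsContext(BaseContext):
    items: list[str] = []

async def normalize(item: str) -> str:
    return item.lower()

async def tag(item: str) -> str:
    return f"item:{item}"

per_item = Step.from_callable(normalize, name="normalize") >> Step.from_callable(tag, name="tag")

map_step = MapStep.from_pipeline(
    name="process_items",
    pipeline=per_item,
    iterable_input="items",  # context field holding the iterable
)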

ParallelStep

Bases: Step[object, object], Generic[TContext]

Execute multiple branch pipelines concurrently.

Each entry in branches is run in parallel and the outputs are returned as a dictionary keyed by branch name. Context fields can be selectively copied to branches via context_include_keys and merged back using merge_strategy.

TreeSearchStep

Bases: Step[object, object], Generic[TContext]

Run a quota-aware tree search with proposer/evaluator agents.

MergeStrategy

Bases: Enum

Strategies for merging branch contexts back into the main context.

The CONTEXT_UPDATE strategy is recommended for most use cases as it provides proper validation and handles complex context structures safely. Use NO_MERGE for performance-critical scenarios where context updates are not needed.

BranchFailureStrategy

Bases: Enum

Policies for handling branch failures in ParallelStep.

BaseContext

Bases: PipelineContext

Base context for typed pipeline contexts.

Task

Bases: BaseModel

Represents a task to be solved by the orchestrator.

Candidate

Bases: BaseModel

Represents a potential solution and its evaluation metadata.

Checklist

Bases: BaseModel

A checklist for evaluating a solution.

ChecklistItem

Bases: BaseModel

A single item in a checklist for evaluating a solution.

SearchNode

Bases: BaseModel

A single node in a tree search frontier.

attach_context

attach_context(context: Optional['PipelineContext']) -> None

Attach runtime context and capture a snapshot for persistence.

rehydrate_context

rehydrate_context(context_type: type['PipelineContext']) -> Optional['PipelineContext']

Rebuild runtime context from the stored snapshot if needed.

SearchState

Bases: BaseModel

Persistent tree search state stored in the pipeline context.

sorted_open_nodes

sorted_open_nodes() -> list[SearchNode]

Return open nodes sorted by A* priority.

pop_best_open

pop_best_open() -> SearchNode | None

Pop the best node from the open set.

PipelineResult

Bases: BaseModel, Generic[ContextT]

Aggregated result of running a pipeline.

For backward compatibility, this object exposes a top-level success flag that reflects overall pipeline status (computed by callers/runners). Some older tests and integrations expect result.success to exist.

status property

status: str

Best-effort status indicator for backward compatibility.

output property

output: Any | None

Return the output of the last step in the pipeline.

This is a convenience property for backward compatibility with tests and code that expects result.output.

StepResult

Bases: BaseModel

Result of executing a single pipeline step.

metadata property

metadata: dict[str, Any]

Alias for metadata_ for backward compatibility and test expectations.

UsageLimits

Bases: BaseModel

Defines resource consumption limits for a pipeline run.

ExecutedCommandLog

Bases: BaseModel

Structured log entry for a command executed in the loop.

FlujoAgentResult

Flujo-controlled agent result that abstracts vendor specifics.

This class provides a vendor-agnostic representation of agent execution results. It encapsulates the output, usage metrics, and cost information in a way that Flujo's orchestration layer can consume without knowing about the underlying agent backend implementation.

Attributes:

output
    The actual output from the agent (any type).
cost_usd
    Optional explicit cost in USD (for cases where cost is set directly on the result object rather than in usage).
token_counts
    Optional total token count (for explicit cost reporting when detailed usage breakdown is not available).

__init__

__init__(output: Any, usage: Optional[FlujoAgentUsage] = None, cost_usd: Optional[float] = None, token_counts: Optional[int] = None) -> None

Initialize FlujoAgentResult.

Parameters:

output : Any, required
    The actual output from the agent.
usage : Optional[FlujoAgentUsage], default None
    Optional usage metrics conforming to the FlujoAgentUsage protocol.
cost_usd : Optional[float], default None
    Optional explicit cost in USD.
token_counts : Optional[int], default None
    Optional total token count.

usage

usage() -> Optional[FlujoAgentUsage]

Get usage metrics.

This method provides backward compatibility with code that expects usage to be accessed as a method call (e.g., result.usage()).

Returns:

Optional[FlujoAgentUsage]
    Usage metrics if available, None otherwise.

FlujoAgentUsage

Bases: Protocol

Vendor-agnostic usage metrics interface.

This protocol defines the standard interface for usage metrics that Flujo expects from any agent backend. Implementations should adapt their vendor-specific usage objects to match this interface.

input_tokens instance-attribute

input_tokens: int

Number of input/prompt tokens used.

output_tokens instance-attribute

output_tokens: int

Number of output/completion tokens used.

cost_usd instance-attribute

cost_usd: Optional[float]

Cost in USD, if available. None if not provided by the agent backend.
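Example (a sketch of a custom agent backend adapting to the protocol; the import path and the MyAgent/MyUsage names are assumptions, and the usage object matches FlujoAgentUsage structurally):

from dataclasses import dataclass
from typing import Optional

from flujo.domain import FlujoAgentResult  # import path assumed from this reference

@dataclass
class MyUsage:
    # Structural match for the FlujoAgentUsage protocol.
    input_tokens: int
    output_tokens: int
    cost_usd: Optional[float] = None

class MyAgent:
    async def run(self, prompt: str) -> FlujoAgentResult:
        text = "...call your backend here..."
        return FlujoAgentResult(output=text, usage=MyUsage(input_tokens=12, output_tokens=40))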

ExecutionBackend

Bases: Protocol

Protocol for executing pipeline steps.

execute_step async

execute_step(request: StepExecutionRequest) -> StepOutcome[StepResult]

Execute a single step and return a typed outcome.

StepExecutionRequest dataclass

Serializable request for executing a single step.

Contains the step to execute, input data, context object, resources, and execution configuration.

AgentProcessors

Bases: BaseModel

Collections of prompt and output processors.

PluginOutcome

Bases: BaseModel

Result returned by a validation plugin.

ValidationPlugin

Bases: Protocol

Protocol that all validation plugins must implement.

Validator

Bases: Protocol

A generic, stateful protocol for any component that can validate a step's output.

validate async

validate(output_to_check: Any, *, context: Optional[BaseModel] = None) -> ValidationResult

Validates the given output.

ValidationResult

Bases: BaseModel

The standard output from any validator, providing a pass/fail signal and scoring.

EvaluationReport

Bases: EvaluationScore

Detailed evaluation report for multi-signal scoring.

EvaluationScore

Bases: BaseModel

Structured evaluation returned by the shadow judge.

MultiSignalEvaluator

Evaluate solutions with rubric and objective signals.

ValidationFinding

Bases: BaseModel

Represents a single validation finding.

ValidationReport

Bases: BaseModel

Aggregated validation results for a pipeline.

AppResources

Bases: BaseModel

Base class for user-defined resource containers.

adapter_step

adapter_step(func: Callable[Concatenate[StepInT, P], Coroutine[object, object, StepOutT]], *, name: str | None = None, updates_context: bool = False, adapter_id: str, adapter_allow: str, processors: Optional['AgentProcessors'] = None, persist_feedback_to_context: Optional[str] = None, persist_validation_results_to: Optional[str] = None, **config_kwargs: object) -> 'Step[StepInT, StepOutT]'
adapter_step(func: None = None, *, name: str | None = None, updates_context: bool = False, adapter_id: str, adapter_allow: str, processors: Optional['AgentProcessors'] = None, persist_feedback_to_context: Optional[str] = None, persist_validation_results_to: Optional[str] = None, **config_kwargs: object) -> Callable[[Callable[Concatenate[StepInT, P], Coroutine[object, object, StepOutT]]], 'Step[StepInT, StepOutT]']
adapter_step(func: Callable[..., Coroutine[object, object, object]] | None = None, *, name: str | None = None, updates_context: bool = False, adapter_id: str, adapter_allow: str, processors: Optional['AgentProcessors'] = None, persist_feedback_to_context: Optional[str] = None, persist_validation_results_to: Optional[str] = None, **config_kwargs: object) -> object

Alias for :func:step that marks the created step as an adapter.

typed_context

typed_context(context_cls: Type[CtxT]) -> Type[CtxT]

Declare a typed context class for pipelines.

Usage

class MyContext(BaseContext):
    counter: int = 0
    result: str | None = None

TypedCtx = typed_context(MyContext)

flujo.exceptions

Custom exceptions for the orchestrator.

FlujoError

Bases: Exception

Unified base exception for all Flujo errors.

This is the new base class for all Flujo exceptions. It provides enhanced error messages with suggestions and error codes.

Examples:

>>> raise FlujoError("Something went wrong", suggestion="Try X", code="E001")
>>> raise ConfigurationError("Missing config", suggestion="Set FLUJO_API_KEY")

ConfigurationError

Bases: FlujoError

Errors related to configuration and setup.

ExecutionError

Bases: FlujoError

Errors occurring during pipeline execution.

ControlFlowError

Bases: FlujoError

Non-retryable control flow signals (NEVER catch and swallow).

These exceptions are used for orchestration and should always be re-raised to let the runner handle workflow control.

ContextError

Bases: FlujoError

Errors related to context management.

ValidationError

Bases: FlujoError

Errors related to validation (types, templates, schemas).

SettingsError

Bases: ConfigurationError

Raised for configuration-related errors.

OrchestratorRetryError

Bases: ExecutionError

Raised when an agent operation fails after all retries.

RewardModelUnavailable

Bases: ConfigurationError

Raised when the reward model is required but unavailable.

FeatureDisabled

Bases: ConfigurationError

Raised when a disabled feature is invoked.

PricingNotConfiguredError

Bases: ConfigurationError

Raised when strict pricing mode is enabled but pricing is not configured for a model.

InfiniteRedirectError

Bases: ControlFlowError

Raised when a redirect loop is detected in pipeline execution.

This is a control flow exception and should NEVER be caught and converted to a StepResult. Always re-raise it.

InfiniteFallbackError

Bases: ControlFlowError

Raised when a fallback loop is detected during execution.

This is a control flow exception and should NEVER be caught and converted to a StepResult. Always re-raise it.

PipelineContextInitializationError

Bases: ContextError

Raised when a typed pipeline context fails to initialize.

ContextInheritanceError

Bases: ContextError

Raised when inheriting context for a nested pipeline fails.

UsageLimitExceededError

Bases: ExecutionError

Raised when a pipeline run exceeds its defined usage limits.

PipelineAbortSignal

Bases: ControlFlowError

Special exception hooks can raise to stop a pipeline gracefully.

This is a control flow exception and should NEVER be caught and converted to a StepResult. Always re-raise it.

PausedException

Bases: ControlFlowError

Internal exception used to pause a pipeline.

This is a control flow exception and should NEVER be caught and converted to a StepResult. Always re-raise it.

ImproperStepInvocationError

Bases: ExecutionError

DEPRECATED: Raised when a Step object is invoked directly.

.. deprecated:: 0.4.0
    Use :class:StepInvocationError instead. Will be removed in v1.0.0.

MissingAgentError

Bases: ConfigurationError

Raised when a pipeline step is missing its agent.

TypeMismatchError

Bases: ValidationError

Raised when consecutive steps have incompatible types.

AgentIOValidationError

Bases: ExecutionError

Raised when an agent's input or output validation fails.

NonRetryableError

Bases: ExecutionError

Base class for errors that should not be retried in the pipeline.

HitlPolicyError

Bases: ConfigurationError

Raised when HITL steps are disabled by policy or configuration.

ResumeError

Bases: ExecutionError

Raised when a pipeline cannot be resumed due to invalid state.

ReplayError

Bases: ExecutionError

Raised when a replay attempt cannot proceed.

PluginError

Bases: ExecutionError

Raised when a plugin execution fails.

ContextFieldError

Bases: ContextError

Raised when trying to set a field that doesn't exist in the context.

StepInvocationError

Bases: ExecutionError

Unified exception for step invocation errors.

This exception replaces ImproperStepInvocationError for consistency and provides enhanced error messages for better debugging.

Note: ImproperStepInvocationError is deprecated and will be removed in a future version. Use StepInvocationError for new code.

ParallelStepError

Bases: ExecutionError

Raised when there's an issue with parallel step execution.

TemplateResolutionError

Bases: ValidationError

Raised when a template variable cannot be resolved (strict mode).

ContextIsolationError

Bases: ContextError

Raised when context isolation fails or is violated.

ContextMergeError

Bases: ContextError

Raised when context merging fails.

ContextMutationError

Bases: ContextError

Raised when a context mutation violation is detected (strict mode).

Infrastructure

flujo.infra

Infrastructure components for flujo.

LocalBackend

Bases: Generic[TContext]

Backend that executes steps in the current process.

InMemoryLRUCache dataclass

LRU cache implementation with TTL support.

init_telemetry

init_telemetry(settings_obj: Optional[Settings] = None) -> None

Configure global logging and tracing for the process.

Call once at application startup. If settings_obj is not provided the default :class:~flujo.infra.settings.Settings object is used. When telemetry is enabled the real logfire library is configured, otherwise a fallback logger that proxies to logging is provided.

load_settings

load_settings(force_reload: bool = False) -> Any

Load settings with configuration file overrides. If force_reload is True, reload config/settings.

get_cli_defaults

get_cli_defaults(command: str, force_reload: bool = False) -> JSONObject

Get CLI defaults for a specific command. If force_reload is True, reload config/settings.

get_state_uri

get_state_uri(force_reload: bool = False) -> Optional[str]

Get the state URI from configuration. If force_reload is True, reload config/settings.

flujo.state

WorkflowState

Bases: BaseModel

Serialized snapshot of a running workflow.

StateBackend

Bases: ABC

Abstract base class for state backends.

State backends are responsible for persisting and retrieving workflow state. They handle serialization of complex objects automatically using the enhanced serialization utilities.

save_state abstractmethod async

save_state(run_id: str, state: JSONObject) -> None

Save workflow state.

Parameters:

run_id : str, required
    Unique identifier for the workflow run.
state : JSONObject, required
    Dictionary containing workflow state data.

load_state abstractmethod async

load_state(run_id: str) -> Optional[JSONObject]

Load workflow state.

Parameters:

run_id : str, required
    Unique identifier for the workflow run.

Returns:

Optional[JSONObject]
    Dictionary containing workflow state data, or None if not found.

delete_state abstractmethod async

delete_state(run_id: str) -> None

Delete workflow state.

Parameters:

run_id : str, required
    Unique identifier for the workflow run.

list_workflows async

list_workflows(status: Optional[str] = None, pipeline_id: Optional[str] = None, limit: Optional[int] = None, offset: int = 0) -> List[JSONObject]

List workflows with optional filtering and pagination.

get_workflow_stats async

get_workflow_stats() -> JSONObject

Get statistics about stored workflows.

cleanup_old_workflows async

cleanup_old_workflows(days_old: int = 30) -> int

Delete workflows older than specified days. Returns number of deleted workflows.

get_failed_workflows async

get_failed_workflows(hours_back: int = 24) -> List[JSONObject]

Get failed workflows from the last N hours with error details.

list_background_tasks async

list_background_tasks(parent_run_id: Optional[str] = None, status: Optional[str] = None, limit: Optional[int] = None, offset: int = 0) -> List[JSONObject]

List background tasks with optional filtering and pagination.

get_failed_background_tasks async

get_failed_background_tasks(parent_run_id: Optional[str] = None, hours_back: int = 24) -> List[JSONObject]

Get failed background tasks within a time window.

get_trace abstractmethod async

get_trace(run_id: str) -> Any

Retrieve and deserialize the trace tree for a given run_id.

save_trace abstractmethod async

save_trace(run_id: str, trace: JSONObject) -> None

Save trace data for a given run_id.

Parameters:

run_id : str, required
    Unique identifier for the workflow run.
trace : JSONObject, required
    Dictionary containing trace tree data.

save_spans async

save_spans(run_id: str, spans: list[JSONObject]) -> None

Persist normalized spans for a run.

Parameters:

run_id : str, required
    Unique identifier for the workflow run.
spans : list[JSONObject], required
    List of span dictionaries (span_id, parent_span_id, name, etc.).

get_spans async

get_spans(run_id: str, status: Optional[str] = None, name: Optional[str] = None) -> List[JSONObject]

Get individual spans with optional filtering.

get_span_statistics async

get_span_statistics(pipeline_name: Optional[str] = None, time_range: Optional[Tuple[float, float]] = None) -> JSONObject

Get aggregated span statistics.

cleanup_stale_background_tasks async

cleanup_stale_background_tasks(stale_hours: int = 24) -> int

Mark stale background tasks as failed (timeout).

persist_evaluation async

persist_evaluation(run_id: str, score: float, feedback: str | None = None, step_name: str | None = None, metadata: JSONObject | None = None) -> None

Persist shadow evaluation result (default: no-op).

list_evaluations async

list_evaluations(limit: int = 20, run_id: str | None = None) -> list[JSONObject]

List persisted shadow evaluation results (default: not implemented).

save_run_start async

save_run_start(run_data: JSONObject) -> None

Persist initial run metadata.

save_step_result async

save_step_result(step_data: JSONObject) -> None

Persist a single step execution record.

save_run_end async

save_run_end(run_id: str, end_data: JSONObject) -> None

Update run metadata when execution finishes.

get_run_details async

get_run_details(run_id: str) -> Optional[JSONObject]

Retrieve stored metadata for a run.

list_runs async

list_runs(status: Optional[str] = None, pipeline_name: Optional[str] = None, limit: Optional[int] = None, offset: int = 0, metadata_filter: Optional[JSONObject] = None) -> List[JSONObject]

Return stored runs with optional filtering.

list_run_steps async

list_run_steps(run_id: str) -> List[JSONObject]

Return all step records for a run ordered by step index.

set_system_state async

set_system_state(key: str, value: JSONObject) -> None

Upsert a global key/value pair used for connector watermarks.

get_system_state async

get_system_state(key: str) -> Optional[JSONObject]

Fetch a previously stored global key/value pair.

shutdown async

shutdown() -> None

Gracefully release any resources held by the backend.

Default is a no-op. Concrete backends should override when they hold threads, file handles, or async connections that need closing.

InMemoryBackend

Bases: StateBackend

Simple in-memory backend for testing and defaults.

This backend mirrors the serialization logic of the persistent backends by storing a serialized copy of the workflow state. Values are serialized with _serialize_for_json on save and reconstructed with safe_deserialize when loaded.
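Example (a minimal sketch wiring the backend into the runner via the documented state_backend and delete_on_completion parameters; the work step is a placeholder):

from flujo.application import Flujo
from flujo.domain import Step
from flujo.state import InMemoryBackend

async def work(x: str) -> str:
    return x

runner = Flujo(
    Step.from_callable(work, name="work"),
    state_backend=InMemoryBackend(),
    delete_on_completion=True,  # remove persisted state once the run finishes
)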

get_trace async

get_trace(run_id: str) -> Any

Retrieve trace data for a given run_id.

save_trace async

save_trace(run_id: str, trace: JSONObject) -> None

Save trace data for a given run_id.

get_spans async

get_spans(run_id: str, status: Optional[str] = None, name: Optional[str] = None) -> List[JSONObject]

Get individual spans with optional filtering.

get_span_statistics async

get_span_statistics(pipeline_name: Optional[str] = None, time_range: Optional[Tuple[float, float]] = None) -> JSONObject

Get aggregated span statistics.

FileBackend

Bases: StateBackend

Persist workflow state to JSON files.

get_trace async

get_trace(run_id: str) -> Optional[JSONObject]

Retrieve trace data for a given run_id.

save_trace async

save_trace(run_id: str, trace: JSONObject) -> None

Save trace data for a given run_id.

get_spans async

get_spans(run_id: str, status: Optional[str] = None, name: Optional[str] = None) -> List[JSONObject]

Get individual spans with optional filtering.

get_span_statistics async

get_span_statistics(pipeline_name: Optional[str] = None, time_range: Optional[Tuple[float, float]] = None) -> JSONObject

Get aggregated span statistics.

SQLiteBackend

Bases: SQLiteTraceMixin, SQLiteBackendBase

save_state async

save_state(run_id: str, state: JSONObject) -> None

Save workflow state to the database.

Parameters:

run_id : str, required
    Unique identifier for the workflow run.
state : JSONObject, required
    Dictionary containing workflow state data.

delete_state async

delete_state(run_id: str) -> None

Delete workflow state.

list_states async

list_states(status: Optional[str] = None) -> List[JSONObject]

List workflow states with optional status filter.

Uses the pooled connection when available to minimize connection overhead.

list_background_tasks async

list_background_tasks(parent_run_id: Optional[str] = None, status: Optional[str] = None, limit: Optional[int] = None, offset: int = 0) -> List[JSONObject]

List background tasks with optional filtering.

get_failed_background_tasks async

get_failed_background_tasks(parent_run_id: Optional[str] = None, hours_back: int = 24) -> List[JSONObject]

Return failed background tasks within a time window.

cleanup_stale_background_tasks async

cleanup_stale_background_tasks(stale_hours: int = 24) -> int

Mark running background tasks older than the cutoff as failed.

list_workflows async

list_workflows(status: Optional[str] = None, pipeline_id: Optional[str] = None, limit: Optional[int] = None, offset: int = 0) -> List[JSONObject]

Enhanced workflow listing with additional filters and metadata.

list_runs async

list_runs(status: Optional[str] = None, pipeline_name: Optional[str] = None, limit: Optional[int] = None, offset: int = 0, metadata_filter: Optional[JSONObject] = None) -> List[JSONObject]

List runs with optional filtering and include workflow metadata.

get_workflow_stats async

get_workflow_stats() -> JSONObject

Get comprehensive workflow statistics.

get_workflow_stats_sync

get_workflow_stats_sync() -> JSONObject

Synchronous stats snapshot for observability collectors.

Prometheus collectors are synchronous and can be invoked from async runtimes (running event loop threads). This method avoids run_sync() by querying SQLite via the standard library driver.

get_failed_workflows async

get_failed_workflows(hours_back: int = 24) -> List[JSONObject]

Get failed workflows from the last N hours.

cleanup_old_workflows async

cleanup_old_workflows(days_old: float = 30) -> int

Delete workflows older than specified days. Returns number of deleted workflows.

delete_run async

delete_run(run_id: str) -> None

Delete a run from the runs table (cascades to traces). The deletion is recorded in the audit log.

Agents and Processing

flujo.agents

Agent factory utilities and wrapper classes.

This module provides factory functions for creating agents and wrapper classes for async execution. It focuses on agent creation and resilience wrapping, while system prompts are now in the flujo.prompts module.

This is the public API for the flujo agents package.

AsyncAgentWrapper

Bases: Generic[AgentInT, AgentOutT], AsyncAgentProtocol[AgentInT, AgentOutT]

Wraps a pydantic_ai.Agent to provide an asynchronous interface with retry and timeout capabilities.

run_async async

run_async(*args: Any, **kwargs: Any) -> Any

Run the agent asynchronously with retry and timeout support.

Returns:

FlujoAgentResult
    Vendor-agnostic result containing output and usage metrics.

Note: the declared return type is Any for AsyncAgentProtocol compatibility, but this method always returns FlujoAgentResult at runtime.

run async

run(*args: Any, **kwargs: Any) -> Any

Run the agent (alias for run_async).

Returns:

FlujoAgentResult
    Vendor-agnostic result containing output and usage metrics.

Note: the declared return type is Any for AsyncAgentProtocol compatibility, but this method always returns FlujoAgentResult at runtime.

enable_structured_output

enable_structured_output(*, json_schema: dict[str, Any] | None = None, name: str | None = None) -> None

Enable structured output with JSON schema when supported.

This sets a best-effort response_format hint; pydantic-ai/provider may ignore it when unsupported. Kept non-fatal by design.

TemplatedAsyncAgentWrapper

Bases: AsyncAgentWrapper[AgentInT, AgentOutT]

Async wrapper that supports just-in-time system prompt rendering from a template using runtime context and previous step output.

The wrapper temporarily overrides the underlying agent's system_prompt for a single run and restores it afterwards to keep agent instances stateless.

DeterministicRepairProcessor

Tier-1 deterministic fixer for malformed JSON emitted by LLMs.

LoggingReviewAgent

Bases: AsyncAgentProtocol[Any, Any]

Wrapper for review agent that adds logging.

NoOpReflectionAgent

Bases: AsyncAgentProtocol[Any, str]

A stub agent that does nothing, used when reflection is disabled.

NoOpChecklistAgent

Bases: AsyncAgentProtocol[Any, Checklist]

A stub agent that returns an empty Checklist, used as a fallback for checklist agents.

AsyncAgentProtocol

Bases: Protocol[AgentInT, AgentOutT]

Generic asynchronous agent interface.

Note: For agents requiring a typed pipeline context, implementing the :class:flujo.domain.ContextAwareAgentProtocol is now the recommended and type-safe approach.

monitored_agent

monitored_agent(agent_name: str) -> Callable[[Type[_Any]], Type[_Any]]

Class decorator that records telemetry for the agent's run method.

make_agent

make_agent(model: str, system_prompt: str, output_type: Type[Any], tools: list[Any] | None = None, processors: Optional[AgentProcessors] = None, **kwargs: Any) -> tuple[AgentLike, AgentProcessors]

Creates a pydantic_ai.Agent, injecting the correct API key and returns it with processors.

make_agent_async

make_agent_async(model: str, system_prompt: str, output_type: Type[Any], max_retries: int = 3, timeout: int | None = None, processors: Optional[AgentProcessors] = None, auto_repair: bool = True, **kwargs: Any) -> AsyncAgentWrapper[Any, Any]

Creates a pydantic_ai.Agent and returns an AsyncAgentWrapper exposing .run_async.

The wrapper uses an internal adapter pattern to convert pydantic-ai responses to FlujoAgentResult, providing vendor-agnostic results to Flujo's orchestration layer.

Parameters

model : str
    The model identifier (e.g., "openai:gpt-4o").
system_prompt : str
    The system prompt for the agent.
output_type : Type[Any]
    The expected output type.
max_retries : int, optional
    Maximum number of retries for failed calls.
timeout : int, optional
    Timeout in seconds for agent calls.
processors : Optional[AgentProcessors], optional
    Custom processors for the agent.
auto_repair : bool, optional
    Whether to enable automatic repair of failed outputs.
**kwargs : Any
    Additional arguments to pass to the underlying pydantic_ai.Agent (e.g., temperature, model_settings, max_tokens, etc.).

Returns

AsyncAgentWrapper
    Wrapper that returns FlujoAgentResult (vendor-agnostic interface). The adapter pattern is used internally to isolate pydantic-ai specifics.
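Example (a minimal sketch using only the documented signature; the model string and prompt are placeholders):

from flujo.agents import make_agent_async
from flujo.domain import Step

solution_agent = make_agent_async(
    model="openai:gpt-4o",
    system_prompt="You are a concise technical writer.",
    output_type=str,
    max_retries=3,
    timeout=60,
)

solution_step = Step.solution(solution_agent)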

make_templated_agent_async

make_templated_agent_async(model: str, template_string: str, variables_spec: Optional[dict[str, Any]], output_type: Type[Any], max_retries: int = 3, timeout: int | None = None, processors: Optional[AgentProcessors] = None, auto_repair: bool = True, **kwargs: Any) -> TemplatedAsyncAgentWrapper[Any, Any]

Create an agent and wrap it with TemplatedAsyncAgentWrapper to enable just-in-time system prompt rendering.

The wrapper uses an internal adapter pattern to convert pydantic-ai responses to FlujoAgentResult, providing vendor-agnostic results to Flujo's orchestration layer.

Parameters

model : str
    The model identifier (e.g., "openai:gpt-4o").
template_string : str
    Template string for system prompt with variable placeholders.
variables_spec : Optional[dict[str, Any]]
    Variable specifications for template rendering.
output_type : Type[Any]
    The expected output type.
max_retries : int, optional
    Maximum number of retries for failed calls.
timeout : int, optional
    Timeout in seconds for agent calls.
processors : Optional[AgentProcessors], optional
    Custom processors for the agent.
auto_repair : bool, optional
    Whether to enable automatic repair of failed outputs.
**kwargs : Any
    Additional arguments to pass to the underlying pydantic_ai.Agent.

Returns

TemplatedAsyncAgentWrapper
    Wrapper that returns FlujoAgentResult (vendor-agnostic interface). The adapter pattern is used internally to isolate pydantic-ai specifics.

make_repair_agent

make_repair_agent(model: str | None = None) -> 'AsyncAgentWrapper[Any, str]'

Create the internal JSON repair agent.

get_repair_agent

get_repair_agent() -> 'AsyncAgentWrapper[Any, str]'

Lazily create the internal repair agent.

get_raw_output_from_exception

get_raw_output_from_exception(exc: Exception) -> str

Best-effort extraction of raw output from validation-related exceptions.

make_review_agent

make_review_agent(model: str | None = None) -> 'AsyncAgentWrapper[Any, Checklist]'

Create a review agent with default settings.

make_solution_agent

make_solution_agent(model: str | None = None) -> 'AsyncAgentWrapper[Any, str]'

Create a solution agent with default settings.

make_validator_agent

make_validator_agent(model: str | None = None) -> 'AsyncAgentWrapper[Any, Checklist]'

Create a validator agent with default settings.

get_reflection_agent

get_reflection_agent(model: str | None = None) -> AsyncAgentProtocol[Any, Any] | NoOpReflectionAgent

Returns a new instance of the reflection agent, or a no-op if disabled.

make_self_improvement_agent

make_self_improvement_agent(model: str | None = None) -> 'AsyncAgentWrapper[Any, ImprovementReport]'

Create the SelfImprovementAgent.

__getattr__

__getattr__(name: str) -> Any

Handle access to removed global agent instances.

flujo.processors

Processor

Bases: Protocol

Generic processor interface.

process async

process(data: Any, context: Optional[BaseModel] = None) -> Any

Process data with optional pipeline context.

AddContextVariables

Prepend context variables to a prompt.

StripMarkdownFences

Extract content from a fenced code block.

EnforceJsonResponse

Ensure the output is valid JSON.

SerializePydantic

Serialize any object with a model_dump method to a plain dict.

DiffProcessor

Compute JSON patch operations between two JSON-like payloads.

DeterministicRepairProcessor

Tier-1 deterministic fixer for malformed JSON emitted by LLMs.

JsonRegionExtractorProcessor

Bases: Processor

Stage 0: Extract the largest balanced JSON object/array and bounded-unescape.

  • Scans input text to find the maximal balanced {...} or [...] region, ignoring braces/brackets inside quoted strings.
  • If both object and array candidates exist, prefer the one matching the schema root when provided via expected_root.
  • Applies bounded unescape for double-encoded JSON (up to max_unescape_depth).

This is a safe, deterministic pre-stage that often enables plain JSON decoding to succeed without LLM-based repair.

SmartTypeCoercionProcessor

Bases: Processor

Stage 3: Schema-aware, safe type coercion (Ajv-style, minimal version).

This minimal implementation applies whitelisted, unambiguous conversions without requiring a full JSON Schema. It supports per-type allowlists:

- integer: ["str->int"]
- number: ["str->float"]
- boolean: ["str->bool"]
- array: ["str->array"] (string that looks like JSON array)

A future iteration can accept and use a JSON Schema to narrow conversions by field paths and support anyOf/oneOf branch selection.

TolerantJsonDecoderProcessor

Bases: Processor

Stage 1: Tiered tolerant JSON decode.

Order:

- orjson.loads (fast path) → fallback to json.loads
- if tolerant_level >= 1: try json5/pyjson5
- if tolerant_level >= 2: try json-repair (log patch preview)

Returns decoded Python object on success; otherwise returns the original input.

flujo.steps

Utilities and Tools

flujo.utils

Flujo utilities.

run_sync

run_sync(coro: Coroutine[object, object, T], *, running_loop_error: str | None = None) -> T

Run an async coroutine from synchronous code using anyio.

Raises:

TypeError
    If called from a running event loop thread.
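Example (a minimal sketch driving the async runner from synchronous code; the greet step is a placeholder):

from flujo.application import Flujo
from flujo.domain import Step
from flujo.utils import run_sync

async def greet(name: str) -> str:
    return f"hello {name}"

runner = Flujo(Step.from_callable(greet, name="greet"))
result = run_sync(runner.run_result_async("flujo"))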

format_prompt

format_prompt(template: str, **kwargs: Any) -> str

Convenience wrapper around :class:AdvancedPromptFormatter.

This helper respects the global template configuration from flujo.toml, including strict mode and resolution logging.

Parameters

template: Template string to render.
**kwargs: Values referenced inside the template.

Returns

str
    The rendered template.

summarize_and_redact_prompt

summarize_and_redact_prompt(prompt_text: str, max_length: int = 200, settings: Optional[Settings] = None) -> str

Return a truncated and redacted version of a prompt.

create_field_serializer

create_field_serializer(field_name: str, serializer_func: Callable[[Any], Any]) -> Callable[[Any], Any]

Helper for Pydantic @field_serializer style usage.

create_serializer_for_type

create_serializer_for_type(obj_type: type[Any], serializer_func: Callable[[Any], Any]) -> Callable[[Any], Any]

Return a serializer that dispatches to serializer_func when type matches.

lookup_custom_serializer

lookup_custom_serializer(value: Any) -> Optional[Callable[[Any], Any]]

Return the first matching serializer for value's type (exact or subclass).

lookup_custom_deserializer

lookup_custom_deserializer(obj_type: type[Any]) -> Optional[Callable[[Any], Any]]

Return the first matching deserializer for obj_type (exact or subclass).

register_custom_serializer

register_custom_serializer(obj_type: type[Any], serializer_func: Callable[[Any], Any]) -> None

Register a serializer for a specific type.

register_custom_deserializer

register_custom_deserializer(obj_type: type[Any], deserializer_func: Callable[[Any], Any]) -> None

Register a deserializer for a specific type.

reset_custom_serializer_registry

reset_custom_serializer_registry() -> None

Reset serializer/deserializer registries (testing helper).

safe_deserialize

safe_deserialize(serialized_data: Any, target_type: Optional[type[Any]] = None, default_deserializer: Optional[Callable[[Any], Any]] = None) -> Any

Best-effort deserialization counterpart for _serialize_for_json.

flujo.tracing

flujo.telemetry

Telemetry integrations for production monitoring.

OpenTelemetryHook

Fallback when OpenTelemetry is not available.

StateBackendSpanExporter

Fallback when OpenTelemetry is not available.

CLI and Testing

flujo.cli

CLI package.

flujo.testing

Testing utilities for flujo.

StubAgent

Simple agent for testing that returns preset outputs.

DummyPlugin

Bases: ValidationPlugin

A validation plugin used for testing.

override_agent

override_agent(step: Any, new_agent: Any) -> Iterator[None]

Temporarily override the agent of a Step within a context.
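Example (an illustrative test sketch; the StubAgent constructor argument is an assumption, since it is documented only as returning preset outputs):

from flujo.agents import make_solution_agent
from flujo.domain import Step
from flujo.testing import StubAgent, override_agent

solution_step = Step.solution(make_solution_agent())

stub = StubAgent(["canned solution"])  # constructor argument is assumed

with override_agent(solution_step, stub):
    ...  # run the pipeline as usual; the step now returns the preset output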

assert_validator_failed

assert_validator_failed(result: PipelineResult[Any], validator_name: str, expected_feedback_part: Optional[str] = None) -> None

Assert that a specific validator failed during the run.

assert_context_updated

assert_context_updated(result: PipelineResult[Any], **expected_updates: Any) -> None

Assert that the final context contains the expected updates.

Recipes and Plugins

flujo.recipes

Recipe modules for common workflow patterns.

This module provides factory functions for common workflow patterns.

RECOMMENDED: Use the factory functions for better transparency, composability, and future YAML/AI support:

- make_default_pipeline() - Creates a Review → Solution → Validate pipeline
- make_agentic_loop_pipeline() - Creates an explorative agent workflow
- run_default_pipeline() - Executes a default pipeline
- run_agentic_loop_pipeline() - Executes an agentic loop pipeline

make_default_pipeline

make_default_pipeline(review_agent: 'AsyncAgentProtocol[Any, Any]', solution_agent: 'AsyncAgentProtocol[Any, Any]', validator_agent: 'AsyncAgentProtocol[Any, Any]', reflection_agent: Optional['AsyncAgentProtocol[Any, Any]'] = None, max_retries: int = 3, k_variants: int = 1, max_iters: int = 3, reflection_limit: Optional[int] = None) -> Pipeline[str, Checklist]

Create a default Review → Solution → Validate pipeline.

Parameters:

review_agent : 'AsyncAgentProtocol[Any, Any]', required
    Agent that creates a checklist of requirements.
solution_agent : 'AsyncAgentProtocol[Any, Any]', required
    Agent that generates a solution.
validator_agent : 'AsyncAgentProtocol[Any, Any]', required
    Agent that validates the solution against requirements.
reflection_agent : Optional['AsyncAgentProtocol[Any, Any]'], default None
    Optional agent for reflection/improvement.
max_retries : int, default 3
    Maximum retries for each step.
k_variants : int, default 1
    Number of solution variants to generate per iteration.
max_iters : int, default 3
    Maximum number of iterations for improvement.
reflection_limit : Optional[int], default None
    Optional limit on reflection iterations.

Returns:

Pipeline[str, Checklist]
    A Pipeline object that can be inspected, composed, and executed.
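Example (a minimal sketch using the built-in agent factories and the Flujo runner; the task string is a placeholder):

import asyncio

from flujo.agents import make_review_agent, make_solution_agent, make_validator_agent
from flujo.application import Flujo
from flujo.recipes import make_default_pipeline

pipeline = make_default_pipeline(
    review_agent=make_review_agent(),
    solution_agent=make_solution_agent(),
    validator_agent=make_validator_agent(),
    max_iters=2,
)

async def main() -> None:
    result = await Flujo(pipeline).run_result_async("Write a limerick about type checkers.")
    print(result.status, result.output)

asyncio.run(main())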

make_state_machine_pipeline

make_state_machine_pipeline(*, nodes: dict[str, Step[Any, Any] | Pipeline[Any, Any]], context_model: type[PipelineContext], router_field: str = 'next_state', end_state_field: str = 'is_complete', max_loops: int = 50) -> Pipeline[Any, Any]

Create a simple state machine pipeline.

Each iteration runs the state pipeline specified in context.<router_field>. The loop exits when context.<end_state_field> evaluates to True.

make_agentic_loop_pipeline

make_agentic_loop_pipeline(planner_agent: 'AsyncAgentProtocol[Any, Any]', agent_registry: dict[str, 'AsyncAgentProtocol[Any, Any]'], max_loops: int = 10, max_retries: int = 3) -> Pipeline[str, Any]

Create an agentic loop pipeline for explorative workflows.

Parameters:

planner_agent : 'AsyncAgentProtocol[Any, Any]', required
    Agent that decides what command to run next.
agent_registry : dict[str, 'AsyncAgentProtocol[Any, Any]'], required
    Dictionary of available agents to execute.
max_loops : int, default 10
    Maximum number of loop iterations.
max_retries : int, default 3
    Maximum retries for each step.

Returns:

Pipeline[str, Any]
    A Pipeline object with a LoopStep containing the agentic logic.

run_default_pipeline async

run_default_pipeline(pipeline: Pipeline[str, Checklist], task: Task) -> Optional[Candidate]

Run a default pipeline and return the result.

Parameters:

pipeline : Pipeline[str, Checklist], required
    Pipeline created by make_default_pipeline.
task : Task, required
    Task to execute.

Returns:

Optional[Candidate]
    Candidate with solution and checklist, or None if failed.

run_agentic_loop_pipeline async

run_agentic_loop_pipeline(pipeline: Pipeline[str, Any], initial_goal: str, resume_from: Optional[PipelineResult[PipelineContext]] = None, human_input: Optional[str] = 'human') -> PipelineResult[PipelineContext]

Run an agentic loop pipeline and return the result.

Parameters:

pipeline : Pipeline[str, Any], required
    Pipeline created by make_agentic_loop_pipeline.
initial_goal : str, required
    Initial goal for the agentic loop.
resume_from : Optional[PipelineResult[PipelineContext]], default None
    Optional paused result to resume from.
human_input : Optional[str], default 'human'
    Human input to provide when resuming.

Returns:

PipelineResult[PipelineContext]
    PipelineResult with the execution context and results.

flujo.plugins

Built-in plugins for flujo.

Models and Prompts

flujo.models

Public facade for Flujo domain models.

This module is a stable import path for user-facing types.

Task

Bases: BaseModel

Represents a task to be solved by the orchestrator.

Candidate

Bases: BaseModel

Represents a potential solution and its evaluation metadata.

Checklist

Bases: BaseModel

A checklist for evaluating a solution.

ChecklistItem

Bases: BaseModel

A single item in a checklist for evaluating a solution.

SearchNode

Bases: BaseModel

A single node in a tree search frontier.

attach_context

attach_context(context: Optional['PipelineContext']) -> None

Attach runtime context and capture a snapshot for persistence.

rehydrate_context

rehydrate_context(context_type: type['PipelineContext']) -> Optional['PipelineContext']

Rebuild runtime context from the stored snapshot if needed.

SearchState

Bases: BaseModel

Persistent tree search state stored in the pipeline context.

sorted_open_nodes

sorted_open_nodes() -> list[SearchNode]

Return open nodes sorted by A* priority.

pop_best_open

pop_best_open() -> SearchNode | None

Pop the best node from the open set.

PipelineResult

Bases: BaseModel, Generic[ContextT]

Aggregated result of running a pipeline.

For backward compatibility, this object exposes a top-level success flag that reflects overall pipeline status (computed by callers/runners). Some older tests and integrations expect result.success to exist.

status property

status: str

Best-effort status indicator for backward compatibility.

output property

output: Any | None

Return the output of the last step in the pipeline.

This is a convenience property for backward compatibility with tests and code that expects result.output.

StepResult

Bases: BaseModel

Result of executing a single pipeline step.

metadata property

metadata: dict[str, Any]

Alias for metadata_ for backward compatibility and test expectations.

UsageLimits

Bases: BaseModel

Defines resource consumption limits for a pipeline run.

RefinementCheck

Bases: BaseModel

Standardized output from a critic pipeline in a refinement loop.

ImprovementSuggestion

Bases: BaseModel

A single suggestion from the SelfImprovementAgent.

ImprovementReport

Bases: BaseModel

Aggregated improvement suggestions returned by the agent.

HumanInteraction

Bases: BaseModel

Records a single human interaction in a HITL conversation.

PipelineContext

Bases: BaseModel

Runtime context shared by all steps in a pipeline run.

The base PipelineContext tracks essential execution metadata and is automatically created for every call to :meth:Flujo.run. Custom context models should inherit from this class to add application specific fields while retaining the built in ones.

steps property

steps: JSONObject

Expose recorded step outputs.

sandbox property

sandbox: SandboxProtocol | None

Return the sandbox handle attached to this context when available.

retrieve async

retrieve(query_text: str | None = None, *, query_vector: list[float] | None = None, limit: int = 5) -> list[ScoredMemory]

Retrieve memories using the configured memory store.

If query_vector is not provided, an embedding model will be used when available. Returns an empty list when no memory store is configured.

EvaluationReport

Bases: EvaluationScore

Detailed evaluation report for multi-signal scoring.

EvaluationScore

Bases: BaseModel

Structured evaluation returned by the shadow judge.

flujo.prompts

System prompts for Flujo agents.

This module contains all the default system prompts used by Flujo agents. Users can import and override these prompts for their own use cases.

flujo.signature_tools

analyze_signature

analyze_signature(func: Callable[..., Any]) -> SignatureAnalysis

Inspect func and determine its pipeline injection requirements.

Parameters

func: Callable to inspect. It may be a standard function or a callable object.

Returns

SignatureAnalysis
    Named tuple describing whether context or resources keyword parameters are required and the inferred input/output types.

Raises

ConfigurationError
    If context or resources parameters are annotated with invalid types.