
API Reference

Welcome to the Flujo API reference. This documentation is automatically generated from the source code using mkdocstrings.

Core Modules

flujo.application

Application-level components for flujo.

Flujo

Bases: Generic[RunnerInT, RunnerOutT, ContextT]

Execute a pipeline sequentially.

Parameters

pipeline : Pipeline | Step | None, optional
    Pipeline object to run directly. Deprecated when using registry.
registry : PipelineRegistry, optional
    Registry holding named pipelines.
pipeline_name : str, optional
    Name of the pipeline registered in registry.
pipeline_version : str, default "latest"
    Version to load from the registry when the run starts.
state_backend : StateBackend, optional
    Backend used to persist :class:WorkflowState for durable execution.
delete_on_completion : bool, default False
    If True, remove persisted state once the run finishes.

disable_tracing

disable_tracing() -> None

Disable tracing by removing the TraceManager hook.

run_async async

run_async(
    initial_input: RunnerInT,
    *,
    run_id: str | None = None,
    initial_context_data: Optional[Dict[str, Any]] = None,
) -> AsyncIterator[PipelineResult[ContextT]]

Run the pipeline asynchronously.

Parameters

run_id: Optional identifier for this run. When provided the runner will load and persist state under this ID, enabling durable execution without embedding the ID in the context model.

This method should be used when an asyncio event loop is already running, such as within Jupyter notebooks or async web frameworks.

It yields any streaming output from the final step and then the final PipelineResult object.
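
A minimal sketch of consuming run_async from code that is already inside an event loop. The echo step is a hypothetical placeholder, and the import paths follow the modules documented in this reference.

# Sketch only: the step and pipeline here are illustrative.
from flujo.application import Flujo
from flujo.domain import Step

async def echo(text: str) -> str:
    # A trivial async callable used as the step's agent.
    return text.upper()

runner = Flujo(Step.from_callable(echo, name="echo"))

async def consume() -> None:
    final_result = None
    # run_async yields any streaming chunks first and the PipelineResult last.
    async for item in runner.run_async("hello"):
        final_result = item
    print(final_result)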

run

run(
    initial_input: RunnerInT,
    *,
    run_id: str | None = None,
    initial_context_data: Optional[Dict[str, Any]] = None,
) -> PipelineResult[ContextT]

Run the pipeline synchronously.

This helper should only be called from code that is not already running inside an asyncio event loop. If a running loop is detected, a TypeError is raised instructing the user to use run_async instead.
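
For comparison, a sketch of the synchronous entry point from a plain script (no event loop running); the step is a hypothetical placeholder.

# Sketch: synchronous execution, with an explicit run_id for durable state.
from flujo.application import Flujo
from flujo.domain import Step

async def echo(text: str) -> str:
    return text.upper()

runner = Flujo(Step.from_callable(echo, name="echo"))
result = runner.run("hello", run_id="demo-run-1")
print(result)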

resume_async async

resume_async(
    paused_result: PipelineResult[ContextT],
    human_input: Any,
) -> PipelineResult[ContextT]

Resume a paused pipeline with human input.

as_step

as_step(
    name: str,
    *,
    inherit_context: bool = True,
    **kwargs: Any,
) -> Step[RunnerInT, PipelineResult[ContextT]]

Return this Flujo runner as a composable :class:Step.

Parameters

name: Name of the resulting step.
**kwargs: Additional Step configuration passed to :class:Step.

Returns

Step
    Step that executes this runner when invoked inside another pipeline.
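
A sketch of reusing a configured runner inside another pipeline via as_step; names and step logic are illustrative, and only calls documented in this reference are used.

# Sketch: wrap an inner runner as a Step and execute it from an outer runner.
from flujo.application import Flujo
from flujo.domain import Step

async def summarize(text: str) -> str:
    return text[:80]  # placeholder logic

inner = Flujo(Step.from_callable(summarize, name="summarize"))
outer = Flujo(inner.as_step("summarize_sub"))
result = outer.run("a long document ...")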

SelfImprovementAgent

Agent that analyzes failures and suggests improvements.

run_pipeline_async async

run_pipeline_async(
    inputs: Any, *, runner: Flujo[Any, Any, Any]
) -> PipelineResult[Any]

Adapter to run a :class:Flujo engine as a pydantic-evals task.

evaluate_and_improve async

evaluate_and_improve(
    task_function: Callable[
        [Any], Awaitable[PipelineResult[Any]]
    ],
    dataset: Any,
    improvement_agent: SelfImprovementAgent,
    pipeline_definition: Optional[
        Pipeline[Any, Any] | Step[Any, Any]
    ] = None,
) -> ImprovementReport

Run dataset evaluation and return improvement suggestions.

flujo.domain

Domain components for flujo.

Step

Bases: BaseModel, Generic[StepInT, StepOutT]

Declarative node in a pipeline.

A Step holds a reference to the agent that will execute, configuration such as retries and timeout, and optional plugins. It does not execute anything by itself. Steps are composed into :class:Pipeline objects and run by the :class:~flujo.application.runner.Flujo engine.

Use :meth:arun to invoke the underlying agent directly during unit tests.

__call__

__call__(*args: Any, **kwargs: Any) -> Any

Disallow direct invocation of a Step.

__getattr__

__getattr__(item: str) -> Any

Raise a helpful error when trying to access non-existent attributes.

arun async

arun(data: StepInT, **kwargs: Any) -> StepOutT

Run this step's agent directly for testing purposes.
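
A sketch of exercising a step in a unit test via arun, assuming pytest with the pytest-asyncio plugin is available; the step itself is a hypothetical example.

# Sketch: unit-testing a step's agent directly, bypassing the Flujo engine.
import pytest
from flujo.domain import Step

async def double(x: int) -> int:
    return x * 2

@pytest.mark.asyncio
async def test_double_step() -> None:
    step = Step.from_callable(double, name="double")
    assert await step.arun(3) == 6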

fallback

fallback(
    fallback_step: "Step[Any, Any]",
) -> "Step[StepInT, StepOutT]"

Set a fallback step to execute if this step fails.

Parameters:

fallback_step : Step[Any, Any], required
    The step to execute if this step fails.

Returns:

Step[StepInT, StepOutT]
    self for method chaining

add_plugin

add_plugin(
    plugin: "ValidationPlugin",
) -> "Step[StepInT, StepOutT]"

Add a validation plugin to this step.

Parameters:

plugin : ValidationPlugin, required
    The validation plugin to add.

Returns:

Step[StepInT, StepOutT]
    self for method chaining
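
Because fallback and add_plugin both return self, step configuration can be chained when the step is declared. A minimal sketch with hypothetical callables follows; add_plugin chains the same way once a ValidationPlugin instance is available.

# Sketch: declarative step configuration via method chaining.
from flujo.domain import Step

async def call_primary(text: str) -> str:
    return "primary: " + text

async def call_backup(text: str) -> str:
    return "backup: " + text

primary = Step.from_callable(call_primary, name="primary")
backup = Step.from_callable(call_backup, name="backup")
# fallback() returns self, so further configuration can be chained onto it.
configured = primary.fallback(backup)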

review classmethod

review(
    agent: AsyncAgentProtocol[Any, Any],
    *,
    plugins: Optional[
        list[tuple[ValidationPlugin, int]]
    ] = None,
    validators: Optional[List[Validator]] = None,
    processors: Optional[AgentProcessors] = None,
    persist_feedback_to_context: Optional[str] = None,
    persist_validation_results_to: Optional[str] = None,
    **config: Any,
) -> "Step[Any, Any]"

Construct a review step using the provided agent.

solution classmethod

solution(
    agent: AsyncAgentProtocol[Any, Any],
    *,
    plugins: Optional[
        list[tuple[ValidationPlugin, int]]
    ] = None,
    validators: Optional[List[Validator]] = None,
    processors: Optional[AgentProcessors] = None,
    persist_feedback_to_context: Optional[str] = None,
    persist_validation_results_to: Optional[str] = None,
    **config: Any,
) -> "Step[Any, Any]"

Construct a solution step using the provided agent.

validate_step classmethod

validate_step(
    agent: AsyncAgentProtocol[Any, Any],
    *,
    plugins: Optional[
        list[tuple[ValidationPlugin, int]]
    ] = None,
    validators: Optional[List[Validator]] = None,
    processors: Optional[AgentProcessors] = None,
    persist_feedback_to_context: Optional[str] = None,
    persist_validation_results_to: Optional[str] = None,
    strict: bool = True,
    **config: Any,
) -> "Step[Any, Any]"

Construct a validation step using the provided agent.

from_callable classmethod

from_callable(
    callable_: Callable[
        Concatenate[StepInT, P],
        Coroutine[Any, Any, StepOutT],
    ],
    name: str | None = None,
    updates_context: bool = False,
    validate_fields: bool = False,
    processors: Optional[AgentProcessors] = None,
    persist_feedback_to_context: Optional[str] = None,
    persist_validation_results_to: Optional[str] = None,
    is_adapter: bool = False,
    **config: Any,
) -> "Step[StepInT, StepOutT]"

Create a Step from an async callable.

from_mapper classmethod

from_mapper(
    mapper: Callable[
        Concatenate[StepInT, P],
        Coroutine[Any, Any, StepOutT],
    ],
    name: str | None = None,
    updates_context: bool = False,
    processors: Optional[AgentProcessors] = None,
    persist_feedback_to_context: Optional[str] = None,
    persist_validation_results_to: Optional[str] = None,
    **config: Any,
) -> "Step[StepInT, StepOutT]"

Alias for :meth:from_callable to improve readability.

human_in_the_loop classmethod

human_in_the_loop(
    name: str,
    message_for_user: str | None = None,
    input_schema: Type[BaseModel] | None = None,
) -> "HumanInTheLoopStep"

Construct a HumanInTheLoop step.

refine_until classmethod

refine_until(
    name: str,
    generator_pipeline: "Pipeline[Any, Any]",
    critic_pipeline: "Pipeline[Any, RefinementCheck]",
    max_refinements: int = 5,
    feedback_mapper: Optional[
        Callable[[Any, RefinementCheck], Any]
    ] = None,
    **config_kwargs: Any,
) -> "LoopStep[ContextModelT]"

Convenience constructor for the generator → critic refinement loop pattern.

use_input

use_input(key: str) -> 'Pipeline[Any, StepOutT]'

Create a small adapter pipeline that selects a key from a dict input.

This is a common pattern when working with :meth:parallel branches where each branch only needs a portion of the upstream output.

gather classmethod

gather(
    name: str, *, wait_for: List[str], **config_kwargs: Any
) -> "Step[Any, Dict[str, Any]]"

Collect outputs from multiple parallel branches.

The step expects a dictionary input (e.g. from :meth:parallel) and returns a dictionary containing only the specified keys.
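
A sketch of the two dict-shaping helpers together. It assumes an upstream step (for example a parallel block) produced a dict containing "text", "summary", and "sentiment" keys; those key names are illustrative.

# Sketch: select one key for a branch, and collect selected branch outputs.
from flujo.domain import Step

async def analyze(text: str) -> str:
    return "positive"  # placeholder logic

# Feed only the "text" key of a dict-shaped input into this step.
branch = Step.from_callable(analyze, name="analyze").use_input("text")

# Keep only the listed keys from a dict of parallel-branch outputs.
merge = Step.gather("merge", wait_for=["summary", "sentiment"])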

StepConfig

Bases: BaseModel

Configuration options applied to every step.

Parameters

max_retries: How many times the step should be retried on failure.
timeout_s: Optional timeout in seconds for the agent execution.
temperature: Optional temperature setting for LLM based agents.

Task

Bases: BaseModel

Represents a task to be solved by the orchestrator.

Candidate

Bases: BaseModel

Represents a potential solution and its evaluation metadata.

Checklist

Bases: BaseModel

A checklist for evaluating a solution.

ChecklistItem

Bases: BaseModel

A single item in a checklist for evaluating a solution.

PipelineResult

Bases: BaseModel, Generic[ContextT]

Aggregated result of running a pipeline.

StepResult

Bases: BaseModel

Result of executing a single pipeline step.

UsageLimits

Bases: BaseModel

Defines resource consumption limits for a pipeline run.

ExecutedCommandLog

Bases: BaseModel

Structured log entry for a command executed in the loop.

ExecutionBackend

Bases: Protocol

Protocol for executing pipeline steps.

execute_step async

execute_step(request: StepExecutionRequest) -> StepResult

Execute a single step and return the result.

StepExecutionRequest dataclass

Serializable request for executing a single step.

Contains the step to execute, input data, context object, resources, and execution configuration.

AgentProcessors

Bases: BaseModel

Collections of prompt and output processors.

PluginOutcome

Bases: BaseModel

Result returned by a validation plugin.

ValidationPlugin

Bases: Protocol

Protocol that all validation plugins must implement.

Validator

Bases: Protocol

A generic, stateful protocol for any component that can validate a step's output.

validate async

validate(
    output_to_check: Any,
    *,
    context: Optional[BaseModel] = None,
) -> ValidationResult

Validates the given output.

ValidationResult

Bases: BaseModel

The standard output from any validator, providing a clear pass/fail signal and feedback.

ValidationFinding

Bases: BaseModel

Represents a single validation finding.

ValidationReport

Bases: BaseModel

Aggregated validation results for a pipeline.

AppResources

Bases: BaseModel

Base class for user-defined resource containers.

adapter_step

adapter_step(
    func: Callable[
        Concatenate[StepInT, P],
        Coroutine[Any, Any, StepOutT],
    ],
    *,
    name: str | None = None,
    updates_context: bool = False,
    processors: Optional[AgentProcessors] = None,
    persist_feedback_to_context: Optional[str] = None,
    persist_validation_results_to: Optional[str] = None,
    **config_kwargs: Any,
) -> "Step[StepInT, StepOutT]"
adapter_step(
    *,
    name: str | None = None,
    updates_context: bool = False,
    processors: Optional[AgentProcessors] = None,
    persist_feedback_to_context: Optional[str] = None,
    persist_validation_results_to: Optional[str] = None,
    **config_kwargs: Any,
) -> Callable[
    [
        Callable[
            Concatenate[StepInT, P],
            Coroutine[Any, Any, StepOutT],
        ]
    ],
    "Step[StepInT, StepOutT]",
]
adapter_step(
    func: Callable[
        Concatenate[StepInT, P],
        Coroutine[Any, Any, StepOutT],
    ]
    | None = None,
    **kwargs: Any,
) -> Any

Alias for :func:step that marks the created step as an adapter.
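
A sketch of the decorator form (the keyword-only overload above); the payload shape and field name are assumptions for illustration.

# Sketch: marking a small glue function as an adapter step.
from flujo.domain import adapter_step

@adapter_step(name="extract_sql")
async def extract_sql(payload: dict) -> str:
    # Pull out the field the next step cares about from a dict payload.
    return payload["sql"]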

flujo.exceptions

Custom exceptions for the orchestrator.

OrchestratorError

Bases: Exception

Base exception for the application.

SettingsError

Bases: OrchestratorError

Raised for configuration-related errors.

OrchestratorRetryError

Bases: OrchestratorError

Raised when an agent operation fails after all retries.

RewardModelUnavailable

Bases: OrchestratorError

Raised when the reward model is required but unavailable.

FeatureDisabled

Bases: OrchestratorError

Raised when a disabled feature is invoked.

ConfigurationError

Bases: SettingsError

Raised when a required configuration for a provider is missing.

PricingNotConfiguredError

Bases: ConfigurationError

Raised when strict pricing mode is enabled but pricing is not configured for a model.

InfiniteRedirectError

Bases: OrchestratorError

Raised when a redirect loop is detected in pipeline execution.

InfiniteFallbackError

Bases: OrchestratorError

Raised when a fallback loop is detected during execution.

PipelineContextInitializationError

Bases: OrchestratorError

Raised when a typed pipeline context fails to initialize.

ContextInheritanceError

Bases: OrchestratorError

Raised when inheriting context for a nested pipeline fails.

UsageLimitExceededError

Bases: OrchestratorError

Raised when a pipeline run exceeds its defined usage limits.

PipelineAbortSignal

Bases: Exception

Special exception hooks can raise to stop a pipeline gracefully.

PausedException

Bases: OrchestratorError

Internal exception used to pause a pipeline.

ImproperStepInvocationError

Bases: OrchestratorError

Raised when a Step object is invoked directly.

MissingAgentError

Bases: ConfigurationError

Raised when a pipeline step is missing its agent.

TypeMismatchError

Bases: ConfigurationError

Raised when consecutive steps have incompatible types.

AgentIOValidationError

Bases: OrchestratorError

Raised when an agent's input or output validation fails.

FlujoFrameworkError

Bases: Exception

Base exception for Flujo framework with enhanced error messages.

ContextFieldError

Bases: FlujoFrameworkError

Raised when trying to set a field that doesn't exist in the context.

StepInvocationError

Bases: FlujoFrameworkError

Unified exception for step invocation errors.

This exception replaces ImproperStepInvocationError for consistency and provides enhanced error messages for better debugging.

Note: ImproperStepInvocationError is deprecated and will be removed in a future version. Use StepInvocationError for new code.

ParallelStepError

Bases: FlujoFrameworkError

Raised when there's an issue with parallel step execution.

Infrastructure

flujo.infra

Infrastructure components for flujo.

LocalBackend

Bases: ExecutionBackend

Backend that executes steps in the current process.

init_telemetry

init_telemetry(
    settings_obj: Optional[Settings] = None,
) -> None

Configure global logging and tracing for the process.

Call once at application startup. If settings_obj is not provided, the default :class:~flujo.infra.settings.Settings object is used. When telemetry is enabled, the real logfire library is configured; otherwise a fallback logger that proxies to logging is provided.
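
A minimal sketch of wiring telemetry at startup with the default settings.

# Sketch: call once at process startup, before running any pipelines.
from flujo.infra import init_telemetry

init_telemetry()  # uses the default Settings object when none is passed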

make_review_agent

make_review_agent(
    model: str | None = None,
) -> AsyncAgentWrapper[Any, Checklist]

Create a review agent with default settings.

make_solution_agent

make_solution_agent(
    model: str | None = None,
) -> AsyncAgentWrapper[Any, str]

Create a solution agent with default settings.

make_validator_agent

make_validator_agent(
    model: str | None = None,
) -> AsyncAgentWrapper[Any, Checklist]

Create a validator agent with default settings.

get_reflection_agent

get_reflection_agent(
    model: str | None = None,
) -> AsyncAgentProtocol[Any, Any] | NoOpReflectionAgent

Returns a new instance of the reflection agent, or a no-op if disabled.

make_agent_async

make_agent_async(
    model: str,
    system_prompt: str,
    output_type: Type[Any],
    max_retries: int = 3,
    timeout: int | None = None,
    processors: Optional[AgentProcessors] = None,
    auto_repair: bool = True,
    **kwargs: Any,
) -> AsyncAgentWrapper[Any, Any]

Creates a pydantic_ai.Agent and returns an AsyncAgentWrapper exposing .run_async.

Parameters

model : str
    The model identifier (e.g., "openai:gpt-4o").
system_prompt : str
    The system prompt for the agent.
output_type : Type[Any]
    The expected output type.
max_retries : int, optional
    Maximum number of retries for failed calls.
timeout : int, optional
    Timeout in seconds for agent calls.
processors : Optional[AgentProcessors], optional
    Custom processors for the agent.
auto_repair : bool, optional
    Whether to enable automatic repair of failed outputs.
**kwargs : Any
    Additional arguments to pass to the underlying pydantic_ai.Agent (e.g., temperature, model_settings, max_tokens, etc.).
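
A sketch of building a typed agent and attaching it to a step. The model string mirrors the example above; the Summary model and prompt are illustrative assumptions.

# Sketch: a pydantic-typed agent wrapped for async use.
from pydantic import BaseModel
from flujo.domain import Step
from flujo.infra import make_agent_async

class Summary(BaseModel):
    text: str

summary_agent = make_agent_async(
    "openai:gpt-4o",
    "Summarize the user's input in one sentence.",
    Summary,
    timeout=30,
)
summarize_step = Step.solution(summary_agent)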

load_settings

load_settings(force_reload: bool = False) -> Any

Load settings with configuration file overrides. If force_reload is True, reload config/settings.

get_cli_defaults

get_cli_defaults(
    command: str, force_reload: bool = False
) -> Dict[str, Any]

Get CLI defaults for a specific command. If force_reload is True, reload config/settings.

get_state_uri

get_state_uri(force_reload: bool = False) -> Optional[str]

Get the state URI from configuration. If force_reload is True, reload config/settings.

flujo.state

WorkflowState

Bases: BaseModel

Serialized snapshot of a running workflow.

StateBackend

Bases: ABC

Abstract base class for state backends.

State backends are responsible for persisting and retrieving workflow state. They handle serialization of complex objects automatically using the enhanced serialization utilities.

save_state abstractmethod async

save_state(run_id: str, state: Dict[str, Any]) -> None

Save workflow state.

Parameters:

run_id : str, required
    Unique identifier for the workflow run.
state : Dict[str, Any], required
    Dictionary containing workflow state data.

load_state abstractmethod async

load_state(run_id: str) -> Optional[Dict[str, Any]]

Load workflow state.

Parameters:

run_id : str, required
    Unique identifier for the workflow run.

Returns:

Optional[Dict[str, Any]]
    Dictionary containing workflow state data, or None if not found.

delete_state abstractmethod async

delete_state(run_id: str) -> None

Delete workflow state.

Parameters:

run_id : str, required
    Unique identifier for the workflow run.

list_workflows async

list_workflows(
    status: Optional[str] = None,
    pipeline_id: Optional[str] = None,
    limit: Optional[int] = None,
    offset: int = 0,
) -> List[Dict[str, Any]]

List workflows with optional filtering and pagination.

get_workflow_stats async

get_workflow_stats() -> Dict[str, Any]

Get statistics about stored workflows.

cleanup_old_workflows async

cleanup_old_workflows(days_old: int = 30) -> int

Delete workflows older than specified days. Returns number of deleted workflows.

get_failed_workflows async

get_failed_workflows(
    hours_back: int = 24,
) -> List[Dict[str, Any]]

Get failed workflows from the last N hours with error details.

get_trace abstractmethod async

get_trace(run_id: str) -> Any

Retrieve and deserialize the trace tree for a given run_id.

save_trace abstractmethod async

save_trace(run_id: str, trace: Dict[str, Any]) -> None

Save trace data for a given run_id.

Parameters:

run_id : str, required
    Unique identifier for the workflow run.
trace : Dict[str, Any], required
    Dictionary containing trace tree data.

get_spans async

get_spans(
    run_id: str,
    status: Optional[str] = None,
    name: Optional[str] = None,
) -> List[Dict[str, Any]]

Get individual spans with optional filtering.

get_span_statistics async

get_span_statistics(
    pipeline_name: Optional[str] = None,
    time_range: Optional[Tuple[float, float]] = None,
) -> Dict[str, Any]

Get aggregated span statistics.

save_run_start async

save_run_start(run_data: Dict[str, Any]) -> None

Persist initial run metadata.

save_step_result async

save_step_result(step_data: Dict[str, Any]) -> None

Persist a single step execution record.

save_run_end async

save_run_end(run_id: str, end_data: Dict[str, Any]) -> None

Update run metadata when execution finishes.

get_run_details async

get_run_details(run_id: str) -> Optional[Dict[str, Any]]

Retrieve stored metadata for a run.

list_run_steps async

list_run_steps(run_id: str) -> List[Dict[str, Any]]

Return all step records for a run ordered by step index.

InMemoryBackend

Bases: StateBackend

Simple in-memory backend for testing and defaults.

This backend mirrors the serialization logic of the persistent backends by storing a serialized copy of the workflow state. Values are serialized with safe_serialize on save and reconstructed with safe_deserialize when loaded.

get_trace async

get_trace(run_id: str) -> Any

Retrieve trace data for a given run_id.

save_trace async

save_trace(run_id: str, trace: Dict[str, Any]) -> None

Save trace data for a given run_id.

get_spans async

get_spans(
    run_id: str,
    status: Optional[str] = None,
    name: Optional[str] = None,
) -> List[Dict[str, Any]]

Get individual spans with optional filtering.

get_span_statistics async

get_span_statistics(
    pipeline_name: Optional[str] = None,
    time_range: Optional[Tuple[float, float]] = None,
) -> Dict[str, Any]

Get aggregated span statistics.
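
A sketch of durable execution against the in-memory backend (suitable for tests); a FileBackend or SQLiteBackend can be swapped in for real persistence. The default InMemoryBackend constructor shown here is an assumption.

# Sketch: persisting run state under an explicit run_id.
from flujo.application import Flujo
from flujo.domain import Step
from flujo.state import InMemoryBackend

async def echo(text: str) -> str:
    return text

runner = Flujo(
    Step.from_callable(echo, name="echo"),
    state_backend=InMemoryBackend(),   # assumed default constructor
    delete_on_completion=False,
)
result = runner.run("hello", run_id="order-42")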

FileBackend

Bases: StateBackend

Persist workflow state to JSON files.

get_trace async

get_trace(run_id: str) -> Optional[Dict[str, Any]]

Retrieve trace data for a given run_id.

save_trace async

save_trace(run_id: str, trace: Dict[str, Any]) -> None

Save trace data for a given run_id.

get_spans async

get_spans(
    run_id: str,
    status: Optional[str] = None,
    name: Optional[str] = None,
) -> List[Dict[str, Any]]

Get individual spans with optional filtering.

get_span_statistics async

get_span_statistics(
    pipeline_name: Optional[str] = None,
    time_range: Optional[Tuple[float, float]] = None,
) -> Dict[str, Any]

Get aggregated span statistics.

SQLiteBackend

Bases: StateBackend

SQLite-backed persistent storage for workflow state with optimized schema.

close async

close() -> None

Close database connections and cleanup resources.

__aenter__ async

__aenter__() -> 'SQLiteBackend'

Async context manager entry.

__aexit__ async

__aexit__(exc_type: Any, exc_val: Any, exc_tb: Any) -> None

Async context manager exit with cleanup.

save_state async

save_state(run_id: str, state: Dict[str, Any]) -> None

Save workflow state to the database.

Parameters:

run_id : str, required
    Unique identifier for the workflow run.
state : Dict[str, Any], required
    Dictionary containing workflow state data.

delete_state async

delete_state(run_id: str) -> None

Delete workflow state.

list_states async

list_states(
    status: Optional[str] = None,
) -> List[Dict[str, Any]]

List workflow states with optional status filter.

list_workflows async

list_workflows(
    status: Optional[str] = None,
    pipeline_id: Optional[str] = None,
    limit: Optional[int] = None,
    offset: int = 0,
) -> List[Dict[str, Any]]

Enhanced workflow listing with additional filters and metadata.

list_runs async

list_runs(
    status: Optional[str] = None,
    pipeline_name: Optional[str] = None,
    limit: Optional[int] = None,
    offset: int = 0,
) -> List[Dict[str, Any]]

List runs from the new structured schema for lens CLI.

get_workflow_stats async

get_workflow_stats() -> Dict[str, Any]

Get comprehensive workflow statistics.

get_failed_workflows async

get_failed_workflows(
    hours_back: int = 24,
) -> List[Dict[str, Any]]

Get failed workflows from the last N hours.

cleanup_old_workflows async

cleanup_old_workflows(days_old: float = 30) -> int

Delete workflows older than specified days. Returns number of deleted workflows.

save_trace async

save_trace(run_id: str, trace: Dict[str, Any]) -> None

Persist a trace tree as normalized spans for a given run_id.

get_trace async

get_trace(run_id: str) -> Optional[Dict[str, Any]]

Retrieve and reconstruct the trace tree for a given run_id. Audit log access.

get_spans async

get_spans(
    run_id: str,
    status: Optional[str] = None,
    name: Optional[str] = None,
) -> List[Dict[str, Any]]

Get individual spans with optional filtering. Audit log export.

get_span_statistics async

get_span_statistics(
    pipeline_name: Optional[str] = None,
    time_range: Optional[Tuple[float, float]] = None,
) -> Dict[str, Any]

Get aggregated span statistics.

delete_run async

delete_run(run_id: str) -> None

Delete a run from the runs table (cascades to traces). Audit log deletion.

Agents and Processing

flujo.agents

Agent utilities including validation and monitoring decorators.

validated_agent

validated_agent(
    input_model: Type[BaseModel],
    output_model: Type[BaseModel],
) -> Callable[[Type[_Any]], Type[_Any]]

Class decorator that validates the run method's input and output.

monitored_agent

monitored_agent(
    agent_name: str,
) -> Callable[[Type[_Any]], Type[_Any]]

Class decorator that records telemetry for the agent's run method.
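
A heavily hedged sketch of stacking the two decorators on a minimal agent class; the exact run signature expected by the decorators and the class shape are assumptions for illustration only.

# Sketch: a validated, monitored agent class. Shapes are assumptions.
from pydantic import BaseModel
from flujo.agents import monitored_agent, validated_agent

class ReviewInput(BaseModel):
    text: str

class ReviewOutput(BaseModel):
    ok: bool

@monitored_agent("toy_reviewer")
@validated_agent(ReviewInput, ReviewOutput)
class ToyReviewer:
    async def run(self, data: ReviewInput) -> ReviewOutput:
        return ReviewOutput(ok=bool(data.text))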

flujo.processors

Processor

Bases: Protocol

Generic processor interface.

process async

process(
    data: Any, context: Optional[BaseModel] = None
) -> Any

Process data with optional pipeline context.

AddContextVariables

Prepend context variables to a prompt.

StripMarkdownFences

Extract content from a fenced code block.

EnforceJsonResponse

Ensure the output is valid JSON.

SerializePydantic

Serialize any object with a model_dump method to a plain dict.

DeterministicRepairProcessor

Tier-1 deterministic fixer for malformed JSON emitted by LLMs.

flujo.steps

Utilities and Tools

flujo.utils

Flujo utilities.

format_prompt

format_prompt(template: str, **kwargs: Any) -> str

Convenience wrapper around :class:AdvancedPromptFormatter.

Parameters

template: Template string to render.
**kwargs: Values referenced inside the template.

Returns

str
    The rendered template.

summarize_and_redact_prompt

summarize_and_redact_prompt(
    prompt_text: str,
    max_length: int = 200,
    settings: Optional[Settings] = None,
) -> str

Return a truncated and redacted version of a prompt.

create_field_serializer

create_field_serializer(
    field_name: str, serializer_func: Callable[[Any], Any]
) -> Callable[[Any], Any]

Create a field_serializer method for a specific field.

Parameters:

field_name : str, required
    Name of the field to serialize.
serializer_func : Callable[[Any], Any], required
    Function that serializes the field value.

Returns:

Callable[[Any], Any]
    A serializer function that can be used within field_serializer methods.

create_serializer_for_type

create_serializer_for_type(
    obj_type: Type[Any],
    serializer_func: Callable[[Any], Any],
) -> Callable[[Any], Any]

Create a serializer function that handles a specific type.

Parameters:

obj_type : Type[Any], required
    The type to create a serializer for.
serializer_func : Callable[[Any], Any], required
    Function that serializes the type.

Returns:

Callable[[Any], Any]
    A serializer function that handles the specific type.

lookup_custom_serializer

lookup_custom_serializer(
    value: Any,
) -> Optional[Callable[[Any], Any]]

Look up a registered serializer for a value's type.

Parameters:

value : Any, required
    The value to find a serializer for.

Returns:

Optional[Callable[[Any], Any]]
    The registered serializer function, or None if not found.

Example

serializer = lookup_custom_serializer(some_value)
if serializer:
    result = serializer(some_value)

lookup_custom_deserializer

lookup_custom_deserializer(
    obj_type: Type[Any],
) -> Optional[Callable[[Any], Any]]

Look up a registered deserializer for a type.

Parameters:

obj_type : Type[Any], required
    The type to find a deserializer for.

Returns:

Optional[Callable[[Any], Any]]
    The registered deserializer function, or None if not found.

Example

deserializer = lookup_custom_deserializer(MyCustomType)
if deserializer:
    result = deserializer(serialized_data)

register_custom_serializer

register_custom_serializer(
    obj_type: Type[Any],
    serializer_func: Callable[[Any], Any],
) -> None

Register a custom serializer for a specific type globally.

This function registers a serializer that will be used by safe_serialize and other serialization functions when encountering objects of the specified type.

Parameters:

obj_type : Type[Any], required
    The type to register a serializer for.
serializer_func : Callable[[Any], Any], required
    Function that converts the type to a serializable format.

Example

from datetime import datetime

def serialize_datetime(dt: datetime) -> str:
    return dt.strftime("%Y-%m-%d %H:%M:%S")

register_custom_serializer(datetime, serialize_datetime)

register_custom_deserializer

register_custom_deserializer(
    obj_type: Type[Any],
    deserializer_func: Callable[[Any], Any],
) -> None

Register a custom deserializer for a specific type globally.

This function registers a deserializer that will be used by reconstruction functions when encountering serialized data that should be converted back to the original type.

Parameters:

obj_type : Type[Any], required
    The type to register a deserializer for.
deserializer_func : Callable[[Any], Any], required
    Function that converts serialized data back to the original type.

Example

from datetime import datetime

def deserialize_datetime(data: str) -> datetime:
    return datetime.fromisoformat(data)

register_custom_deserializer(datetime, deserialize_datetime)

safe_deserialize

safe_deserialize(
    serialized_data: Any,
    target_type: Optional[Type[Any]] = None,
    default_deserializer: Optional[
        Callable[[Any], Any]
    ] = None,
) -> Any

Safely deserialize an object with intelligent fallback handling.

This function provides robust deserialization for:

  • Pydantic models (v1 and v2)
  • Dataclasses
  • Lists, tuples, sets, frozensets, dicts
  • Enums
  • Special float values (inf, -inf, nan)
  • Primitives (str, int, bool, None)
  • Datetime objects (datetime, date, time)
  • Bytes and memoryview objects
  • Complex numbers
  • Custom types registered via register_custom_deserializer

Parameters:

serialized_data : Any, required
    The serialized data to deserialize.
target_type : Optional[Type[Any]], default None
    Optional type hint for the expected result type.
default_deserializer : Optional[Callable[[Any], Any]], default None
    Optional custom deserializer for unknown types.

Returns:

Any
    The deserialized object.

Raises:

TypeError
    If the object cannot be deserialized and no default_deserializer is provided.

Note
  • Special float values (inf, -inf, nan) are converted from strings
  • Datetime objects are converted from ISO format strings
  • Bytes are converted from base64 strings
  • Complex numbers are converted from dict with 'real' and 'imag' keys
  • Custom types registered via register_custom_deserializer are automatically handled

safe_serialize

safe_serialize(
    obj: Any,
    default_serializer: Optional[
        Callable[[Any], Any]
    ] = None,
    _seen: Optional[Set[int]] = None,
    _recursion_depth: int = 0,
    circular_ref_placeholder: Any = "<circular-ref>",
) -> Any

Safely serialize an object with intelligent fallback handling. Circular references are handled robustly by clearing the _seen set only at the top-level call. The circular_ref_placeholder argument controls what is returned for circular references (default "<circular-ref>").
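
A sketch of a serialize/deserialize round trip over a dataclass, one of the supported types listed above; the Point type is illustrative.

# Sketch: round-tripping a dataclass through the serialization helpers.
from dataclasses import dataclass
from flujo.utils import safe_deserialize, safe_serialize

@dataclass
class Point:
    x: int
    y: int

data = safe_serialize(Point(1, 2))
restored = safe_deserialize(data, target_type=Point)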

serialize_to_json

serialize_to_json(obj: Any, **kwargs: Any) -> str

Serialize an object to a JSON string.

Parameters:

obj : Any, required
    The object to serialize.
**kwargs : Any
    Additional arguments to pass to json.dumps.

Returns:

str
    JSON string representation of the object.

Raises:

TypeError
    If the object cannot be serialized to JSON.

serialize_to_json_robust

serialize_to_json_robust(obj: Any, **kwargs: Any) -> str

Serialize an object to a JSON string using robust_serialize. Ensures the output is always valid JSON so it can be round-tripped.

flujo.tracing

flujo.telemetry

Telemetry integrations for production monitoring.

OpenTelemetryHook

Fallback when OpenTelemetry is not available.

CLI and Testing

flujo.cli

CLI package.

flujo.testing

Testing utilities for flujo.

StubAgent

Simple agent for testing that returns preset outputs.

DummyPlugin

A validation plugin used for testing.

override_agent

override_agent(step: Any, new_agent: Any) -> Iterator[None]

Temporarily override the agent of a Step within a context.
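
A sketch of swapping a step's agent for a stub inside a test; StubAgent's constructor argument is an assumption, and my_step/runner stand in for an existing Step and its Flujo runner.

# Sketch: replace the real agent with preset outputs for the duration of a test.
from flujo.testing import StubAgent, override_agent

with override_agent(my_step, StubAgent(["canned output"])):  # my_step: an existing Step
    result = runner.run("input")                             # runner: a Flujo built around my_step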

assert_validator_failed

assert_validator_failed(
    result: PipelineResult[Any],
    validator_name: str,
    expected_feedback_part: Optional[str] = None,
) -> None

Assert that a specific validator failed during the run.

assert_context_updated

assert_context_updated(
    result: PipelineResult[Any], **expected_updates: Any
) -> None

Assert that the final context contains the expected updates.

Recipes and Plugins

flujo.recipes

Recipe modules for common workflow patterns.

This module provides factory functions for common workflow patterns.

RECOMMENDED: Use the factory functions for better transparency, composability, and future YAML/AI support:

  • make_default_pipeline() - Creates a Review → Solution → Validate pipeline
  • make_agentic_loop_pipeline() - Creates an explorative agent workflow
  • run_default_pipeline() - Executes a default pipeline
  • run_agentic_loop_pipeline() - Executes an agentic loop pipeline

Default

Default recipe for Review → Solution → Validate workflows.

DEPRECATED: This class-based approach is deprecated. Use the new factory functions for better transparency, composability, and future YAML/AI support:

  • Use make_default_pipeline() to create a Pipeline object
  • Use run_default_pipeline() to execute the pipeline

See flujo.recipes.factories for the new approach.

make_default_pipeline

make_default_pipeline(
    review_agent: "AsyncAgentProtocol[Any, Any]",
    solution_agent: "AsyncAgentProtocol[Any, Any]",
    validator_agent: "AsyncAgentProtocol[Any, Any]",
    reflection_agent: Optional[
        "AsyncAgentProtocol[Any, Any]"
    ] = None,
    max_retries: int = 3,
    k_variants: int = 1,
    max_iters: int = 3,
    reflection_limit: Optional[int] = None,
) -> Pipeline[str, Checklist]

Create a default Review → Solution → Validate pipeline.

Parameters:

review_agent : AsyncAgentProtocol[Any, Any], required
    Agent that creates a checklist of requirements.
solution_agent : AsyncAgentProtocol[Any, Any], required
    Agent that generates a solution.
validator_agent : AsyncAgentProtocol[Any, Any], required
    Agent that validates the solution against requirements.
reflection_agent : Optional[AsyncAgentProtocol[Any, Any]], default None
    Optional agent for reflection/improvement.
max_retries : int, default 3
    Maximum retries for each step.
k_variants : int, default 1
    Number of solution variants to generate per iteration.
max_iters : int, default 3
    Maximum number of iterations for improvement.
reflection_limit : Optional[int], default None
    Optional limit on reflection iterations.

Returns:

Pipeline[str, Checklist]
    A Pipeline object that can be inspected, composed, and executed.

make_state_machine_pipeline

make_state_machine_pipeline(
    *,
    nodes: Dict[str, Step[Any, Any] | Pipeline[Any, Any]],
    context_model: type[PipelineContext],
    router_field: str = "next_state",
    end_state_field: str = "is_complete",
    max_loops: int = 50,
) -> Pipeline[Any, Any]

Create a simple state machine pipeline.

Each iteration runs the state pipeline specified in context.<router_field>. The loop exits when context.<end_state_field> evaluates to True.

make_agentic_loop_pipeline

make_agentic_loop_pipeline(
    planner_agent: "AsyncAgentProtocol[Any, Any]",
    agent_registry: Dict[
        str, "AsyncAgentProtocol[Any, Any]"
    ],
    max_loops: int = 10,
    max_retries: int = 3,
) -> Pipeline[str, Any]

Create an agentic loop pipeline for explorative workflows.

Parameters:

planner_agent : AsyncAgentProtocol[Any, Any], required
    Agent that decides what command to run next.
agent_registry : Dict[str, AsyncAgentProtocol[Any, Any]], required
    Dictionary of available agents to execute.
max_loops : int, default 10
    Maximum number of loop iterations.
max_retries : int, default 3
    Maximum retries for each step.

Returns:

Pipeline[str, Any]
    A Pipeline object with a LoopStep containing the agentic logic.

run_default_pipeline async

run_default_pipeline(
    pipeline: Pipeline[str, Checklist], task: Task
) -> Optional[Candidate]

Run a default pipeline and return the result.

Parameters:

pipeline : Pipeline[str, Checklist], required
    Pipeline created by make_default_pipeline.
task : Task, required
    Task to execute.

Returns:

Optional[Candidate]
    Candidate with solution and checklist, or None if failed.
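
A sketch tying the factory and runner together. The agent factories are the ones documented under flujo.infra; the Task field name used below is an assumption for illustration.

# Sketch: build the Review → Solution → Validate recipe and run one task.
import asyncio
from flujo.infra import make_review_agent, make_solution_agent, make_validator_agent
from flujo.models import Task
from flujo.recipes import make_default_pipeline, run_default_pipeline

pipeline = make_default_pipeline(
    review_agent=make_review_agent(),
    solution_agent=make_solution_agent(),
    validator_agent=make_validator_agent(),
)
# Task's field name ("prompt") is an assumption for illustration.
candidate = asyncio.run(run_default_pipeline(pipeline, Task(prompt="Write a haiku about autumn.")))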

run_agentic_loop_pipeline async

run_agentic_loop_pipeline(
    pipeline: Pipeline[str, Any],
    initial_goal: str,
    resume_from: Optional[
        PipelineResult[PipelineContext]
    ] = None,
    human_input: Optional[str] = "human",
) -> PipelineResult[PipelineContext]

Run an agentic loop pipeline and return the result.

Parameters:

pipeline : Pipeline[str, Any], required
    Pipeline created by make_agentic_loop_pipeline.
initial_goal : str, required
    Initial goal for the agentic loop.
resume_from : Optional[PipelineResult[PipelineContext]], default None
    Optional paused result to resume from.
human_input : Optional[str], default "human"
    Human input to provide when resuming.

Returns:

PipelineResult[PipelineContext]
    PipelineResult with the execution context and results.

flujo.plugins

Built-in plugins for flujo.

SQLSyntaxValidator

Validation plugin that checks SQL syntax.

Models and Prompts

flujo.models

Domain models for flujo.

Task

Bases: BaseModel

Represents a task to be solved by the orchestrator.

Candidate

Bases: BaseModel

Represents a potential solution and its evaluation metadata.

Checklist

Bases: BaseModel

A checklist for evaluating a solution.

ChecklistItem

Bases: BaseModel

A single item in a checklist for evaluating a solution.

PipelineResult

Bases: BaseModel, Generic[ContextT]

Aggregated result of running a pipeline.

StepResult

Bases: BaseModel

Result of executing a single pipeline step.

UsageLimits

Bases: BaseModel

Defines resource consumption limits for a pipeline run.

RefinementCheck

Bases: BaseModel

Standardized output from a critic pipeline in a refinement loop.

ImprovementSuggestion

Bases: BaseModel

A single suggestion from the SelfImprovementAgent.

ImprovementReport

Bases: BaseModel

Aggregated improvement suggestions returned by the agent.

HumanInteraction

Bases: BaseModel

Records a single human interaction in a HITL conversation.

PipelineContext

Bases: BaseModel

Runtime context shared by all steps in a pipeline run.

The base PipelineContext tracks essential execution metadata and is automatically created for every call to :meth:Flujo.run. Custom context models should inherit from this class to add application specific fields while retaining the built in ones.

Attributes

run_id: Unique identifier for the pipeline run.
initial_prompt: First input provided to the run. Useful for logging and telemetry.
scratchpad: Free form dictionary for transient state between steps.
hitl_history: Records each human interaction when using HITL steps.
command_log: Stores commands executed by an :class:~flujo.recipes.AgenticLoop.
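
A sketch of a custom context model that inherits the built-in fields above; the extra fields are illustrative.

# Sketch: extend the built-in context with application-specific state.
from typing import Optional
from flujo.models import PipelineContext

class TicketContext(PipelineContext):
    ticket_id: Optional[str] = None
    resolved: bool = False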

flujo.prompts

System prompts for Flujo agents.

This module contains all the default system prompts used by Flujo agents. Users can import and override these prompts for their own use cases.

Core Utilities

flujo.caching

InMemoryCache

Bases: CacheBackend

Simple in-memory cache for step results.

flujo.console_tracer

ConsoleTracer

Configurable tracer that prints rich output to the console.

__init__

__init__(
    *,
    level: Literal["info", "debug"] = "debug",
    log_inputs: bool = True,
    log_outputs: bool = True,
    colorized: bool = True,
) -> None

Create the tracer.

Parameters

level: Output verbosity; either "info" or "debug".
log_inputs: Whether to print step inputs when level is "debug".
log_outputs: Whether to print step outputs when level is "debug".
colorized: If True use colored output via Rich.

hook async

hook(payload: HookPayload) -> None

Dispatch hook payloads to the appropriate handler.

flujo.monitor

FlujoMonitor

Simple in-memory monitor for agent calls.

flujo.registry

PipelineRegistry

Simple in-memory registry for pipeline objects.

register

register(
    pipeline: Pipeline[Any, Any], name: str, version: str
) -> None

Register pipeline under name and version.

get

get(
    name: str, version: str
) -> Optional[Pipeline[Any, Any]]

Return the pipeline registered for name and version if present.

get_latest_version

get_latest_version(name: str) -> Optional[str]

Return the latest registered version for name.

get_latest

get_latest(name: str) -> Optional[Pipeline[Any, Any]]

Return the latest registered pipeline for name if any.
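
A sketch of registering a pipeline and running it by name; my_pipeline stands in for any existing Pipeline object.

# Sketch: run a named, versioned pipeline through the registry.
from flujo.application import Flujo
from flujo.registry import PipelineRegistry

registry = PipelineRegistry()
registry.register(my_pipeline, name="qa", version="1.0.0")  # my_pipeline: an existing Pipeline

runner = Flujo(registry=registry, pipeline_name="qa")  # pipeline_version defaults to "latest"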

flujo.signature_tools

analyze_signature

analyze_signature(
    func: Callable[..., Any],
) -> SignatureAnalysis

Inspect func and determine its pipeline injection requirements.

Parameters

func: Callable to inspect. It may be a standard function or a callable object.

Returns

SignatureAnalysis
    Named tuple describing whether context or resources keyword parameters are required and the inferred input/output types.

Raises

ConfigurationError
    If context or resources parameters are annotated with invalid types.

flujo.validation

BaseValidator

Bases: Validator

A helpful base class for creating validators.

validator

validator(
    func: Callable[[Any], Tuple[bool, Optional[str]]],
) -> Validator

Decorator to create a stateless Validator from a function.
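
A sketch of a stateless validator built from a function returning a (passed, feedback) tuple, matching the signature above; the check itself is illustrative.

# Sketch: reject empty outputs.
from typing import Optional, Tuple
from flujo.validation import validator

@validator
def non_empty(output: str) -> Tuple[bool, Optional[str]]:
    if output and output.strip():
        return True, None
    return False, "Output was empty"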