Skip to content

State Backends API

Flujo provides persistent state management for workflows, allowing them to be paused, resumed, and monitored across process restarts.

Overview

The state management system has been optimized for production use cases with the following key features:

  • Durable Workflows: Workflows can be paused and resumed across process restarts
  • Optimized SQLite Backend: High-performance, indexed database for large-scale deployments
  • Admin Queries: Built-in observability and monitoring capabilities
  • Automatic Migration: Seamless upgrades for existing deployments

For detailed information about the optimized SQLite backend, see State Backend Optimization and the comprehensive SQLite Backend Guide.

API Reference

flujo.state.models.WorkflowState

Bases: BaseModel

Serialized snapshot of a running workflow.

flujo.state.backends.base.StateBackend

Bases: ABC

Abstract base class for state backends.

State backends are responsible for persisting and retrieving workflow state. They handle serialization of complex objects automatically using the enhanced serialization utilities.

save_state abstractmethod async

save_state(run_id: str, state: Dict[str, Any]) -> None

Save workflow state.

Parameters:

Name Type Description Default
run_id str

Unique identifier for the workflow run

required
state Dict[str, Any]

Dictionary containing workflow state data

required

load_state abstractmethod async

load_state(run_id: str) -> Optional[Dict[str, Any]]

Load workflow state.

Parameters:

Name Type Description Default
run_id str

Unique identifier for the workflow run

required

Returns:

Type Description
Optional[Dict[str, Any]]

Dictionary containing workflow state data, or None if not found

delete_state abstractmethod async

delete_state(run_id: str) -> None

Delete workflow state.

Parameters:

Name Type Description Default
run_id str

Unique identifier for the workflow run

required

list_workflows async

list_workflows(
    status: Optional[str] = None,
    pipeline_id: Optional[str] = None,
    limit: Optional[int] = None,
    offset: int = 0,
) -> List[Dict[str, Any]]

List workflows with optional filtering and pagination.

get_workflow_stats async

get_workflow_stats() -> Dict[str, Any]

Get statistics about stored workflows.

cleanup_old_workflows async

cleanup_old_workflows(days_old: int = 30) -> int

Delete workflows older than specified days. Returns number of deleted workflows.

get_failed_workflows async

get_failed_workflows(
    hours_back: int = 24,
) -> List[Dict[str, Any]]

Get failed workflows from the last N hours with error details.

get_trace abstractmethod async

get_trace(run_id: str) -> Any

Retrieve and deserialize the trace tree for a given run_id.

save_trace abstractmethod async

save_trace(run_id: str, trace: Dict[str, Any]) -> None

Save trace data for a given run_id.

Parameters:

Name Type Description Default
run_id str

Unique identifier for the workflow run

required
trace Dict[str, Any]

Dictionary containing trace tree data

required

get_spans async

get_spans(
    run_id: str,
    status: Optional[str] = None,
    name: Optional[str] = None,
) -> List[Dict[str, Any]]

Get individual spans with optional filtering.

get_span_statistics async

get_span_statistics(
    pipeline_name: Optional[str] = None,
    time_range: Optional[Tuple[float, float]] = None,
) -> Dict[str, Any]

Get aggregated span statistics.

save_run_start async

save_run_start(run_data: Dict[str, Any]) -> None

Persist initial run metadata.

save_step_result async

save_step_result(step_data: Dict[str, Any]) -> None

Persist a single step execution record.

save_run_end async

save_run_end(run_id: str, end_data: Dict[str, Any]) -> None

Update run metadata when execution finishes.

get_run_details async

get_run_details(run_id: str) -> Optional[Dict[str, Any]]

Retrieve stored metadata for a run.

list_run_steps async

list_run_steps(run_id: str) -> List[Dict[str, Any]]

Return all step records for a run ordered by step index.

flujo.state.backends.memory.InMemoryBackend

Bases: StateBackend

Simple in-memory backend for testing and defaults.

This backend mirrors the serialization logic of the persistent backends by storing a serialized copy of the workflow state. Values are serialized with safe_serialize on save and reconstructed with safe_deserialize when loaded.

get_trace async

get_trace(run_id: str) -> Any

Retrieve trace data for a given run_id.

save_trace async

save_trace(run_id: str, trace: Dict[str, Any]) -> None

Save trace data for a given run_id.

get_spans async

get_spans(
    run_id: str,
    status: Optional[str] = None,
    name: Optional[str] = None,
) -> List[Dict[str, Any]]

Get individual spans with optional filtering.

get_span_statistics async

get_span_statistics(
    pipeline_name: Optional[str] = None,
    time_range: Optional[Tuple[float, float]] = None,
) -> Dict[str, Any]

Get aggregated span statistics.

flujo.state.backends.file.FileBackend

Bases: StateBackend

Persist workflow state to JSON files.

get_trace async

get_trace(run_id: str) -> Optional[Dict[str, Any]]

Retrieve trace data for a given run_id.

save_trace async

save_trace(run_id: str, trace: Dict[str, Any]) -> None

Save trace data for a given run_id.

get_spans async

get_spans(
    run_id: str,
    status: Optional[str] = None,
    name: Optional[str] = None,
) -> List[Dict[str, Any]]

Get individual spans with optional filtering.

get_span_statistics async

get_span_statistics(
    pipeline_name: Optional[str] = None,
    time_range: Optional[Tuple[float, float]] = None,
) -> Dict[str, Any]

Get aggregated span statistics.

flujo.state.backends.sqlite.SQLiteBackend

Bases: StateBackend

SQLite-backed persistent storage for workflow state with optimized schema.

close async

close() -> None

Close database connections and cleanup resources.

__aenter__ async

__aenter__() -> 'SQLiteBackend'

Async context manager entry.

__aexit__ async

__aexit__(exc_type: Any, exc_val: Any, exc_tb: Any) -> None

Async context manager exit with cleanup.

save_state async

save_state(run_id: str, state: Dict[str, Any]) -> None

Save workflow state to the database.

Parameters:

Name Type Description Default
run_id str

Unique identifier for the workflow run

required
state Dict[str, Any]

Dictionary containing workflow state data

required

delete_state async

delete_state(run_id: str) -> None

Delete workflow state.

list_states async

list_states(
    status: Optional[str] = None,
) -> List[Dict[str, Any]]

List workflow states with optional status filter.

list_workflows async

list_workflows(
    status: Optional[str] = None,
    pipeline_id: Optional[str] = None,
    limit: Optional[int] = None,
    offset: int = 0,
) -> List[Dict[str, Any]]

Enhanced workflow listing with additional filters and metadata.

list_runs async

list_runs(
    status: Optional[str] = None,
    pipeline_name: Optional[str] = None,
    limit: Optional[int] = None,
    offset: int = 0,
) -> List[Dict[str, Any]]

List runs from the new structured schema for lens CLI.

get_workflow_stats async

get_workflow_stats() -> Dict[str, Any]

Get comprehensive workflow statistics.

get_failed_workflows async

get_failed_workflows(
    hours_back: int = 24,
) -> List[Dict[str, Any]]

Get failed workflows from the last N hours.

cleanup_old_workflows async

cleanup_old_workflows(days_old: float = 30) -> int

Delete workflows older than specified days. Returns number of deleted workflows.

save_trace async

save_trace(run_id: str, trace: Dict[str, Any]) -> None

Persist a trace tree as normalized spans for a given run_id.

get_trace async

get_trace(run_id: str) -> Optional[Dict[str, Any]]

Retrieve and reconstruct the trace tree for a given run_id. Audit log access.

get_spans async

get_spans(
    run_id: str,
    status: Optional[str] = None,
    name: Optional[str] = None,
) -> List[Dict[str, Any]]

Get individual spans with optional filtering. Audit log export.

get_span_statistics async

get_span_statistics(
    pipeline_name: Optional[str] = None,
    time_range: Optional[Tuple[float, float]] = None,
) -> Dict[str, Any]

Get aggregated span statistics.

delete_run async

delete_run(run_id: str) -> None

Delete a run from the runs table (cascades to traces). Audit log deletion.