State Backends API
Flujo provides persistent state management for workflows, allowing them to be paused, resumed, and monitored across process restarts.
Overview
The state management system has been optimized for production use cases with the following key features:
- Durable Workflows: Workflows can be paused and resumed across process restarts
- Optimized SQLite Backend: High-performance, indexed database for large-scale deployments
- Admin Queries: Built-in observability and monitoring capabilities
- Automatic Migration: Seamless upgrades for existing deployments
For detailed information about the optimized SQLite backend, see State Backend Optimization and the comprehensive SQLite Backend Guide.
API Reference
flujo.state.models.WorkflowState
Bases: BaseModel
Serialized snapshot of a running workflow.
flujo.state.backends.base.StateBackend
Bases: ABC
Abstract base class for state backends.
State backends are responsible for persisting and retrieving workflow state. They handle serialization of complex objects automatically using the enhanced serialization utilities.
save_state
abstractmethod
async
save_state(run_id: str, state: Dict[str, Any]) -> None
Save workflow state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id
|
str
|
Unique identifier for the workflow run |
required |
state
|
Dict[str, Any]
|
Dictionary containing workflow state data |
required |
load_state
abstractmethod
async
load_state(run_id: str) -> Optional[Dict[str, Any]]
Load workflow state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id
|
str
|
Unique identifier for the workflow run |
required |
Returns:
Type | Description |
---|---|
Optional[Dict[str, Any]]
|
Dictionary containing workflow state data, or None if not found |
delete_state
abstractmethod
async
delete_state(run_id: str) -> None
Delete workflow state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id
|
str
|
Unique identifier for the workflow run |
required |
list_workflows
async
list_workflows(
status: Optional[str] = None,
pipeline_id: Optional[str] = None,
limit: Optional[int] = None,
offset: int = 0,
) -> List[Dict[str, Any]]
List workflows with optional filtering and pagination.
get_workflow_stats
async
get_workflow_stats() -> Dict[str, Any]
Get statistics about stored workflows.
cleanup_old_workflows
async
cleanup_old_workflows(days_old: int = 30) -> int
Delete workflows older than specified days. Returns number of deleted workflows.
get_failed_workflows
async
get_failed_workflows(
hours_back: int = 24,
) -> List[Dict[str, Any]]
Get failed workflows from the last N hours with error details.
get_trace
abstractmethod
async
get_trace(run_id: str) -> Any
Retrieve and deserialize the trace tree for a given run_id.
save_trace
abstractmethod
async
save_trace(run_id: str, trace: Dict[str, Any]) -> None
Save trace data for a given run_id.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id
|
str
|
Unique identifier for the workflow run |
required |
trace
|
Dict[str, Any]
|
Dictionary containing trace tree data |
required |
get_spans
async
get_spans(
run_id: str,
status: Optional[str] = None,
name: Optional[str] = None,
) -> List[Dict[str, Any]]
Get individual spans with optional filtering.
get_span_statistics
async
get_span_statistics(
pipeline_name: Optional[str] = None,
time_range: Optional[Tuple[float, float]] = None,
) -> Dict[str, Any]
Get aggregated span statistics.
save_run_start
async
save_run_start(run_data: Dict[str, Any]) -> None
Persist initial run metadata.
save_step_result
async
save_step_result(step_data: Dict[str, Any]) -> None
Persist a single step execution record.
save_run_end
async
save_run_end(run_id: str, end_data: Dict[str, Any]) -> None
Update run metadata when execution finishes.
get_run_details
async
get_run_details(run_id: str) -> Optional[Dict[str, Any]]
Retrieve stored metadata for a run.
list_run_steps
async
list_run_steps(run_id: str) -> List[Dict[str, Any]]
Return all step records for a run ordered by step index.
flujo.state.backends.memory.InMemoryBackend
Bases: StateBackend
Simple in-memory backend for testing and defaults.
This backend mirrors the serialization logic of the persistent backends by
storing a serialized copy of the workflow state. Values are serialized with
safe_serialize
on save and reconstructed with safe_deserialize
when
loaded.
get_trace
async
get_trace(run_id: str) -> Any
Retrieve trace data for a given run_id.
save_trace
async
save_trace(run_id: str, trace: Dict[str, Any]) -> None
Save trace data for a given run_id.
get_spans
async
get_spans(
run_id: str,
status: Optional[str] = None,
name: Optional[str] = None,
) -> List[Dict[str, Any]]
Get individual spans with optional filtering.
get_span_statistics
async
get_span_statistics(
pipeline_name: Optional[str] = None,
time_range: Optional[Tuple[float, float]] = None,
) -> Dict[str, Any]
Get aggregated span statistics.
flujo.state.backends.file.FileBackend
Bases: StateBackend
Persist workflow state to JSON files.
get_trace
async
get_trace(run_id: str) -> Optional[Dict[str, Any]]
Retrieve trace data for a given run_id.
save_trace
async
save_trace(run_id: str, trace: Dict[str, Any]) -> None
Save trace data for a given run_id.
get_spans
async
get_spans(
run_id: str,
status: Optional[str] = None,
name: Optional[str] = None,
) -> List[Dict[str, Any]]
Get individual spans with optional filtering.
get_span_statistics
async
get_span_statistics(
pipeline_name: Optional[str] = None,
time_range: Optional[Tuple[float, float]] = None,
) -> Dict[str, Any]
Get aggregated span statistics.
flujo.state.backends.sqlite.SQLiteBackend
Bases: StateBackend
SQLite-backed persistent storage for workflow state with optimized schema.
close
async
close() -> None
Close database connections and cleanup resources.
__aenter__
async
__aenter__() -> 'SQLiteBackend'
Async context manager entry.
__aexit__
async
__aexit__(exc_type: Any, exc_val: Any, exc_tb: Any) -> None
Async context manager exit with cleanup.
save_state
async
save_state(run_id: str, state: Dict[str, Any]) -> None
Save workflow state to the database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id
|
str
|
Unique identifier for the workflow run |
required |
state
|
Dict[str, Any]
|
Dictionary containing workflow state data |
required |
delete_state
async
delete_state(run_id: str) -> None
Delete workflow state.
list_states
async
list_states(
status: Optional[str] = None,
) -> List[Dict[str, Any]]
List workflow states with optional status filter.
list_workflows
async
list_workflows(
status: Optional[str] = None,
pipeline_id: Optional[str] = None,
limit: Optional[int] = None,
offset: int = 0,
) -> List[Dict[str, Any]]
Enhanced workflow listing with additional filters and metadata.
list_runs
async
list_runs(
status: Optional[str] = None,
pipeline_name: Optional[str] = None,
limit: Optional[int] = None,
offset: int = 0,
) -> List[Dict[str, Any]]
List runs from the new structured schema for lens CLI.
get_workflow_stats
async
get_workflow_stats() -> Dict[str, Any]
Get comprehensive workflow statistics.
get_failed_workflows
async
get_failed_workflows(
hours_back: int = 24,
) -> List[Dict[str, Any]]
Get failed workflows from the last N hours.
cleanup_old_workflows
async
cleanup_old_workflows(days_old: float = 30) -> int
Delete workflows older than specified days. Returns number of deleted workflows.
save_trace
async
save_trace(run_id: str, trace: Dict[str, Any]) -> None
Persist a trace tree as normalized spans for a given run_id.
get_trace
async
get_trace(run_id: str) -> Optional[Dict[str, Any]]
Retrieve and reconstruct the trace tree for a given run_id. Audit log access.
get_spans
async
get_spans(
run_id: str,
status: Optional[str] = None,
name: Optional[str] = None,
) -> List[Dict[str, Any]]
Get individual spans with optional filtering. Audit log export.
get_span_statistics
async
get_span_statistics(
pipeline_name: Optional[str] = None,
time_range: Optional[Tuple[float, float]] = None,
) -> Dict[str, Any]
Get aggregated span statistics.
delete_run
async
delete_run(run_id: str) -> None
Delete a run from the runs table (cascades to traces). Audit log deletion.