State Backends API
Flujo provides persistent state management for workflows, allowing them to be paused, resumed, and monitored across process restarts.
Overview
The state management system has been optimized for production use cases with the following key features:
- Durable Workflows: Workflows can be paused and resumed across process restarts
- Optimized SQLite Backend: High-performance, indexed database for large-scale deployments
- Admin Queries: Built-in observability and monitoring capabilities
- Automatic Migration: Seamless upgrades for existing deployments
For detailed information about the optimized SQLite backend, see State Backend Optimization and the comprehensive SQLite Backend Guide.
API Reference
flujo.state.models.WorkflowState
Bases: BaseModel
Serialized snapshot of a running workflow.
flujo.state.backends.base.StateBackend
Bases: ABC
Abstract base class for state backends.
State backends are responsible for persisting and retrieving workflow state. They handle serialization of complex objects automatically using the enhanced serialization utilities.
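A minimal sketch of a custom backend, assuming JSONObject is a plain `Dict[str, Any]`. To stay self-contained it does not import Flujo. `StateBackendSketch` is a hypothetical stand-in that mirrors only the five abstract methods listed below (save_state, load_state, delete_state, save_trace, get_trace). A real backend would subclass `flujo.state.backends.base.StateBackend` instead. `DictBackend` keeps deep copies in process memory, so its data does not survive a restart.

```python
import asyncio
import copy
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional

JSONObject = Dict[str, Any]  # assumption: a JSON-compatible dict


class StateBackendSketch(ABC):
    """Hypothetical stand-in mirroring StateBackend's abstract methods."""

    @abstractmethod
    async def save_state(self, run_id: str, state: JSONObject) -> None: ...

    @abstractmethod
    async def load_state(self, run_id: str) -> Optional[JSONObject]: ...

    @abstractmethod
    async def delete_state(self, run_id: str) -> None: ...

    @abstractmethod
    async def save_trace(self, run_id: str, trace: JSONObject) -> None: ...

    @abstractmethod
    async def get_trace(self, run_id: str) -> Any: ...


class DictBackend(StateBackendSketch):
    """Concrete backend keeping deep copies in process memory (lost on restart)."""

    def __init__(self) -> None:
        self._states: Dict[str, JSONObject] = {}
        self._traces: Dict[str, JSONObject] = {}

    async def save_state(self, run_id: str, state: JSONObject) -> None:
        self._states[run_id] = copy.deepcopy(state)

    async def load_state(self, run_id: str) -> Optional[JSONObject]:
        state = self._states.get(run_id)
        return None if state is None else copy.deepcopy(state)

    async def delete_state(self, run_id: str) -> None:
        self._states.pop(run_id, None)  # deleting an unknown run is a no-op here

    async def save_trace(self, run_id: str, trace: JSONObject) -> None:
        self._traces[run_id] = copy.deepcopy(trace)

    async def get_trace(self, run_id: str) -> Any:
        return copy.deepcopy(self._traces.get(run_id))  # None if no trace saved


async def demo() -> None:
    backend = DictBackend()
    await backend.save_state("run-1", {"step": 2, "status": "paused"})
    print(await backend.load_state("run-1"))
    await backend.delete_state("run-1")
    print(await backend.load_state("run-1"))


asyncio.run(demo())
```

Because the base class is abstract, forgetting any of these methods makes instantiation fail with a TypeError, which surfaces incomplete backends early.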
save_state
abstractmethod
async
save_state(run_id: str, state: JSONObject) -> None
Save workflow state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| run_id | str | Unique identifier for the workflow run | required |
| state | JSONObject | Dictionary containing workflow state data | required |
load_state
abstractmethod
async
load_state(run_id: str) -> Optional[JSONObject]
Load workflow state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| run_id | str | Unique identifier for the workflow run | required |
Returns:
| Type | Description |
|---|---|
| Optional[JSONObject] | Dictionary containing workflow state data, or None if not found |
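Because load_state returns None for unknown runs, a caller can branch between resuming a paused run and starting a new one. A minimal sketch: the `step_index` key and the `resume_or_start` helper are hypothetical, and `StubBackend` stands in for any object exposing the documented load_state coroutine.

```python
import asyncio
from typing import Any, Dict, Optional

JSONObject = Dict[str, Any]


class StubBackend:
    """Hypothetical stand-in exposing only load_state."""

    def __init__(self, states: Dict[str, JSONObject]) -> None:
        self._states = states

    async def load_state(self, run_id: str) -> Optional[JSONObject]:
        return self._states.get(run_id)


async def resume_or_start(backend: StubBackend, run_id: str) -> int:
    """Return the step index to continue from (0 for a new run)."""
    state = await backend.load_state(run_id)
    if state is None:
        return 0  # nothing persisted: start from the first step
    return int(state.get("step_index", 0))  # hypothetical key


backend = StubBackend({"run-42": {"step_index": 3}})
print(asyncio.run(resume_or_start(backend, "run-42")))
print(asyncio.run(resume_or_start(backend, "run-new")))
```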
delete_state
abstractmethod
async
delete_state(run_id: str) -> None
Delete workflow state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| run_id | str | Unique identifier for the workflow run | required |
list_workflows
async
list_workflows(status: Optional[str] = None, pipeline_id: Optional[str] = None, limit: Optional[int] = None, offset: int = 0) -> List[JSONObject]
List workflows with optional filtering and pagination.
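The limit and offset parameters allow paging through large result sets. The sketch below assumes only the documented list_workflows signature; `FakeBackend` and `iter_all_workflows` are hypothetical. The loop stops as soon as a page comes back shorter than `page_size`.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional

JSONObject = Dict[str, Any]


class FakeBackend:
    """Hypothetical backend serving workflows from an in-memory list."""

    def __init__(self, rows: List[JSONObject]) -> None:
        self._rows = rows

    async def list_workflows(
        self,
        status: Optional[str] = None,
        pipeline_id: Optional[str] = None,
        limit: Optional[int] = None,
        offset: int = 0,
    ) -> List[JSONObject]:
        rows = [
            r for r in self._rows
            if (status is None or r["status"] == status)
            and (pipeline_id is None or r["pipeline_id"] == pipeline_id)
        ]
        end = None if limit is None else offset + limit
        return rows[offset:end]


async def iter_all_workflows(
    backend: FakeBackend, status: Optional[str] = None, page_size: int = 100
) -> AsyncIterator[JSONObject]:
    """Yield every matching workflow, fetching one page at a time."""
    offset = 0
    while True:
        page = await backend.list_workflows(status=status, limit=page_size, offset=offset)
        for row in page:
            yield row
        if len(page) < page_size:
            break  # short (or empty) page: no more results
        offset += page_size


async def main() -> int:
    rows = [{"run_id": f"r{i}", "status": "completed", "pipeline_id": "p"} for i in range(250)]
    backend = FakeBackend(rows)
    return len([w async for w in iter_all_workflows(backend, page_size=100)])


print(asyncio.run(main()))
```

Paging keeps memory bounded on large deployments; note that offset-based paging can skip or repeat rows if workflows are inserted or deleted between page fetches.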
get_workflow_stats
async
get_workflow_stats() -> JSONObject
Get statistics about stored workflows.
cleanup_old_workflows
async
cleanup_old_workflows(days_old: int = 30) -> int
Delete workflows older than the specified number of days (days_old). Returns the number of deleted workflows.
get_failed_workflows
async
get_failed_workflows(hours_back: int = 24) -> List[JSONObject]
Get failed workflows from the last hours_back hours, including error details.
list_background_tasks
async
list_background_tasks(parent_run_id: Optional[str] = None, status: Optional[str] = None, limit: Optional[int] = None, offset: int = 0) -> List[JSONObject]
List background tasks with optional filtering and pagination.
get_failed_background_tasks
async
get_failed_background_tasks(parent_run_id: Optional[str] = None, hours_back: int = 24) -> List[JSONObject]
Get failed background tasks within a time window.
get_trace
abstractmethod
async
get_trace(run_id: str) -> Any
Retrieve and deserialize the trace tree for a given run_id.
save_trace
abstractmethod
async
save_trace(run_id: str, trace: JSONObject) -> None
Save trace data for a given run_id.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| run_id | str | Unique identifier for the workflow run | required |
| trace | JSONObject | Dictionary containing trace tree data | required |
save_spans
async
save_spans(run_id: str, spans: list[JSONObject]) -> None
Persist normalized spans for a run.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| run_id | str | Unique identifier for the workflow run | required |
| spans | list[JSONObject] | List of span dictionaries (span_id, parent_span_id, name, etc.) | required |
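save_spans takes a flat list of span dictionaries linked by parent_span_id. A minimal sketch of flattening a nested trace tree into that shape. The nested input format (span_id, name, children) and the `flatten_trace` helper are assumptions for illustration, not Flujo's documented trace schema.

```python
from typing import Any, Dict, List, Optional

JSONObject = Dict[str, Any]


def flatten_trace(node: JSONObject, parent_span_id: Optional[str] = None) -> List[JSONObject]:
    """Depth-first walk that emits one row per span, linked to its parent."""
    rows = [{
        "span_id": node["span_id"],
        "parent_span_id": parent_span_id,  # None marks the root span
        "name": node["name"],
    }]
    for child in node.get("children", []):
        rows.extend(flatten_trace(child, parent_span_id=node["span_id"]))
    return rows


trace = {
    "span_id": "root",
    "name": "pipeline",
    "children": [
        {"span_id": "s1", "name": "step_a", "children": []},
        {"span_id": "s2", "name": "step_b"},
    ],
}
spans = flatten_trace(trace)
for span in spans:
    print(span)
# The list could then be persisted with: await backend.save_spans(run_id, spans)
```

Storing spans flat, with a parent pointer, is what makes per-span filtering (as in get_spans) possible without walking the whole tree.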
get_spans
async
get_spans(run_id: str, status: Optional[str] = None, name: Optional[str] = None) -> List[JSONObject]
Get individual spans with optional filtering.
get_span_statistics
async
get_span_statistics(pipeline_name: Optional[str] = None, time_range: Optional[Tuple[float, float]] = None) -> JSONObject
Get aggregated span statistics.
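The exact shape of the returned statistics is not documented here. As a hedged illustration of what aggregation over an optional time_range can mean, the sketch below counts spans and averages durations per span name. The start_time/end_time fields, the output keys, and the `span_statistics` helper are assumptions.

```python
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple

JSONObject = Dict[str, Any]


def span_statistics(
    spans: List[JSONObject], time_range: Optional[Tuple[float, float]] = None
) -> JSONObject:
    """Count spans and average their duration per name, optionally within [start, end]."""
    durations: Dict[str, List[float]] = defaultdict(list)
    for span in spans:
        start, end = span["start_time"], span["end_time"]
        if time_range is not None and not (time_range[0] <= start <= time_range[1]):
            continue  # span started outside the requested window
        durations[span["name"]].append(end - start)
    return {
        name: {"count": len(vals), "avg_duration": sum(vals) / len(vals)}
        for name, vals in durations.items()
    }


spans = [
    {"name": "step_a", "start_time": 0.0, "end_time": 2.0},
    {"name": "step_a", "start_time": 10.0, "end_time": 14.0},
    {"name": "step_b", "start_time": 5.0, "end_time": 6.0},
]
print(span_statistics(spans))
print(span_statistics(spans, time_range=(0.0, 6.0)))
```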
cleanup_stale_background_tasks
async
cleanup_stale_background_tasks(stale_hours: int = 24) -> int
Mark stale background tasks as failed (timeout).
persist_evaluation
async
persist_evaluation(run_id: str, score: float, feedback: str | None = None, step_name: str | None = None, metadata: JSONObject | None = None) -> None
Persist shadow evaluation result (default: no-op).
list_evaluations
async
list_evaluations(limit: int = 20, run_id: str | None = None) -> list[JSONObject]
List persisted shadow evaluation results (default: not implemented).
save_run_start
async
save_run_start(run_data: JSONObject) -> None
Persist initial run metadata.
save_step_result
async
save_step_result(step_data: JSONObject) -> None
Persist a single step execution record.
save_run_end
async
save_run_end(run_id: str, end_data: JSONObject) -> None
Update run metadata when execution finishes.
get_run_details
async
get_run_details(run_id: str) -> Optional[JSONObject]
Retrieve stored metadata for a run.
list_runs
async
list_runs(status: Optional[str] = None, pipeline_name: Optional[str] = None, limit: Optional[int] = None, offset: int = 0, metadata_filter: Optional[JSONObject] = None) -> List[JSONObject]
Return stored runs with optional filtering.
list_run_steps
async
list_run_steps(run_id: str) -> List[JSONObject]
Return all step records for a run ordered by step index.
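The run-tracking methods above suggest a natural call order: save_run_start once, save_step_result per executed step, save_run_end at completion, then get_run_details and list_run_steps to read the record back. A minimal in-memory sketch of that sequence; `RunRecorder` is hypothetical, and the run_id, pipeline_name, step_index, and step_name keys are assumptions about the payloads.

```python
import asyncio
from typing import Any, Dict, List, Optional

JSONObject = Dict[str, Any]


class RunRecorder:
    """Hypothetical backend implementing the run-tracking coroutines in memory."""

    def __init__(self) -> None:
        self.runs: Dict[str, JSONObject] = {}
        self.steps: Dict[str, List[JSONObject]] = {}

    async def save_run_start(self, run_data: JSONObject) -> None:
        self.runs[run_data["run_id"]] = dict(run_data)
        self.steps.setdefault(run_data["run_id"], [])

    async def save_step_result(self, step_data: JSONObject) -> None:
        self.steps[step_data["run_id"]].append(dict(step_data))

    async def save_run_end(self, run_id: str, end_data: JSONObject) -> None:
        self.runs[run_id].update(end_data)  # merge final status, timings, etc.

    async def get_run_details(self, run_id: str) -> Optional[JSONObject]:
        return self.runs.get(run_id)

    async def list_run_steps(self, run_id: str) -> List[JSONObject]:
        return sorted(self.steps.get(run_id, []), key=lambda s: s["step_index"])


async def main() -> None:
    backend = RunRecorder()
    await backend.save_run_start({"run_id": "r1", "pipeline_name": "demo", "status": "running"})
    # Steps may be reported out of order; list_run_steps sorts by step_index.
    await backend.save_step_result({"run_id": "r1", "step_index": 1, "step_name": "b"})
    await backend.save_step_result({"run_id": "r1", "step_index": 0, "step_name": "a"})
    await backend.save_run_end("r1", {"status": "completed"})
    print(await backend.get_run_details("r1"))
    print([s["step_name"] for s in await backend.list_run_steps("r1")])


asyncio.run(main())
```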
set_system_state
async
set_system_state(key: str, value: JSONObject) -> None
Upsert a global key/value pair used for connector watermarks.
get_system_state
async
get_system_state(key: str) -> Optional[JSONObject]
Fetch a previously stored global key/value pair.
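set_system_state and get_system_state fit the connector-watermark use case: read the last processed position, handle only newer records, then upsert the new position. A minimal sketch; the key name, the {"last_seen": ...} payload, the ts field, `MemorySystemState`, and `sync_connector` are all hypothetical.

```python
import asyncio
from typing import Any, Dict, List, Optional

JSONObject = Dict[str, Any]


class MemorySystemState:
    """Hypothetical stand-in exposing the two documented system-state coroutines."""

    def __init__(self) -> None:
        self._kv: Dict[str, JSONObject] = {}

    async def set_system_state(self, key: str, value: JSONObject) -> None:
        self._kv[key] = dict(value)  # upsert: insert or overwrite

    async def get_system_state(self, key: str) -> Optional[JSONObject]:
        return self._kv.get(key)


async def sync_connector(backend: MemorySystemState, records: List[JSONObject]) -> List[JSONObject]:
    """Process only records newer than the stored watermark, then advance it."""
    key = "connector:orders:watermark"  # hypothetical key
    mark = await backend.get_system_state(key)
    last_seen = mark["last_seen"] if mark else 0
    fresh = [r for r in records if r["ts"] > last_seen]
    if fresh:
        await backend.set_system_state(key, {"last_seen": max(r["ts"] for r in fresh)})
    return fresh


async def main() -> None:
    backend = MemorySystemState()
    batch = [{"id": 1, "ts": 100}, {"id": 2, "ts": 200}]
    print(len(await sync_connector(backend, batch)))
    batch.append({"id": 3, "ts": 300})
    print(len(await sync_connector(backend, batch)))


asyncio.run(main())
```

Advancing the watermark only after the batch is handled means a crash mid-batch re-processes records rather than skipping them, so the processing step should be idempotent.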
shutdown
async
shutdown() -> None
Gracefully release any resources held by the backend.
Default is a no-op. Concrete backends should override when they hold threads, file handles, or async connections that need closing.
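Because shutdown releases threads, file handles, or connections, it is worth calling even when the workflow raises. A minimal sketch using try/finally; `RecordingBackend` and `run_with_backend` are hypothetical.

```python
import asyncio


class RecordingBackend:
    """Hypothetical backend that records whether shutdown ran."""

    def __init__(self) -> None:
        self.closed = False

    async def shutdown(self) -> None:
        self.closed = True  # a real backend would close pools, files, etc.


async def run_with_backend(backend: RecordingBackend, fail: bool) -> None:
    try:
        if fail:
            raise RuntimeError("workflow failed")
        # ... run the workflow here ...
    finally:
        await backend.shutdown()  # runs on success and on error


backend = RecordingBackend()
try:
    asyncio.run(run_with_backend(backend, fail=True))
except RuntimeError:
    pass
print(backend.closed)
```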
flujo.state.backends.memory.InMemoryBackend
Bases: StateBackend
Simple in-memory backend for testing and defaults.
This backend mirrors the serialization logic of the persistent backends by storing a serialized copy of the workflow state. Values are serialized with _serialize_for_json on save and reconstructed with safe_deserialize when loaded.
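Storing a serialized copy means later mutations of the caller's objects do not leak into the stored state. The sketch below demonstrates that effect with json.dumps and json.loads as stand-ins for _serialize_for_json and safe_deserialize (which handle more types than plain JSON); `CopyingStore` is hypothetical.

```python
import json
from typing import Any, Dict, Optional

JSONObject = Dict[str, Any]


class CopyingStore:
    """Hypothetical store that keeps a serialized snapshot per run."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def save(self, run_id: str, state: JSONObject) -> None:
        self._data[run_id] = json.dumps(state)  # snapshot taken at save time

    def load(self, run_id: str) -> Optional[JSONObject]:
        raw = self._data.get(run_id)
        return None if raw is None else json.loads(raw)  # fresh object on each load


store = CopyingStore()
state = {"context": {"items": [1, 2]}}
store.save("run-1", state)
state["context"]["items"].append(3)  # mutate after saving
print(store.load("run-1"))
```

A side benefit is that tests using the in-memory backend fail on unserializable state the same way a persistent backend would.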
get_trace
async
get_trace(run_id: str) -> Any
Retrieve trace data for a given run_id.
save_trace
async
save_trace(run_id: str, trace: JSONObject) -> None
Save trace data for a given run_id.
get_spans
async
get_spans(run_id: str, status: Optional[str] = None, name: Optional[str] = None) -> List[JSONObject]
Get individual spans with optional filtering.
get_span_statistics
async
get_span_statistics(pipeline_name: Optional[str] = None, time_range: Optional[Tuple[float, float]] = None) -> JSONObject
Get aggregated span statistics.
flujo.state.backends.file.FileBackend
Bases: StateBackend
Persist workflow state to JSON files.
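A minimal sketch of one-file-per-run JSON persistence. It is not Flujo's FileBackend implementation. The `<run_id>.json` naming, the helper names, and the write-to-temp-then-os.replace step (which keeps a crash from leaving a half-written file) are assumptions chosen for illustration.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional

JSONObject = Dict[str, Any]


def save_state_file(directory: Path, run_id: str, state: JSONObject) -> Path:
    """Write state atomically: dump to a temp file, then rename over the target."""
    directory.mkdir(parents=True, exist_ok=True)
    target = directory / f"{run_id}.json"
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(state, fh)
        os.replace(tmp_path, target)  # atomic when both paths are on one filesystem
    except Exception:
        os.unlink(tmp_path)  # do not leave partial temp files behind
        raise
    return target


def load_state_file(directory: Path, run_id: str) -> Optional[JSONObject]:
    target = directory / f"{run_id}.json"
    if not target.exists():
        return None
    return json.loads(target.read_text(encoding="utf-8"))


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    save_state_file(root, "run-1", {"status": "running"})
    print(load_state_file(root, "run-1"))
    print(load_state_file(root, "missing"))
```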
get_trace
async
get_trace(run_id: str) -> Optional[JSONObject]
Retrieve trace data for a given run_id.
save_trace
async
save_trace(run_id: str, trace: JSONObject) -> None
Save trace data for a given run_id.
get_spans
async
get_spans(run_id: str, status: Optional[str] = None, name: Optional[str] = None) -> List[JSONObject]
Get individual spans with optional filtering.
get_span_statistics
async
get_span_statistics(pipeline_name: Optional[str] = None, time_range: Optional[Tuple[float, float]] = None) -> JSONObject
Get aggregated span statistics.
flujo.state.backends.sqlite.SQLiteBackend
Bases: SQLiteTraceMixin, SQLiteBackendBase
save_state
async
save_state(run_id: str, state: JSONObject) -> None
Save workflow state to the database.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| run_id | str | Unique identifier for the workflow run | required |
| state | JSONObject | Dictionary containing workflow state data | required |
delete_state
async
delete_state(run_id: str) -> None
Delete workflow state.
list_states
async
list_states(status: Optional[str] = None) -> List[JSONObject]
List workflow states with optional status filter.
Uses the pooled connection when available to minimize connection overhead.
list_background_tasks
async
list_background_tasks(parent_run_id: Optional[str] = None, status: Optional[str] = None, limit: Optional[int] = None, offset: int = 0) -> List[JSONObject]
List background tasks with optional filtering.
get_failed_background_tasks
async
get_failed_background_tasks(parent_run_id: Optional[str] = None, hours_back: int = 24) -> List[JSONObject]
Return failed background tasks within a time window.
cleanup_stale_background_tasks
async
cleanup_stale_background_tasks(stale_hours: int = 24) -> int
Mark running background tasks older than the cutoff as failed.
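The cutoff logic can be expressed as a single UPDATE. A minimal sketch with the standard library sqlite3 module against an in-memory database. The background_tasks table, its columns, the epoch-seconds created_at, and the `mark_stale_tasks_failed` helper are hypothetical, not Flujo's actual schema.

```python
import sqlite3
import time
from typing import Optional


def mark_stale_tasks_failed(
    conn: sqlite3.Connection, stale_hours: int = 24, now: Optional[float] = None
) -> int:
    """Flip 'running' tasks created before the cutoff to 'failed'; return the count."""
    cutoff = (time.time() if now is None else now) - stale_hours * 3600
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "UPDATE background_tasks SET status = 'failed', error = 'timeout' "
            "WHERE status = 'running' AND created_at < ?",
            (cutoff,),
        )
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE background_tasks "
    "(task_id TEXT PRIMARY KEY, status TEXT, error TEXT, created_at REAL)"
)
now = 1_000_000.0
conn.executemany(
    "INSERT INTO background_tasks (task_id, status, created_at) VALUES (?, ?, ?)",
    [
        ("old-running", "running", now - 30 * 3600),
        ("new-running", "running", now - 1 * 3600),
        ("old-done", "completed", now - 30 * 3600),
    ],
)
print(mark_stale_tasks_failed(conn, stale_hours=24, now=now))
```

Only tasks that are still running and older than the cutoff change; completed tasks are never touched.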
list_workflows
async
list_workflows(status: Optional[str] = None, pipeline_id: Optional[str] = None, limit: Optional[int] = None, offset: int = 0) -> List[JSONObject]
Enhanced workflow listing with additional filters and metadata.
list_runs
async
list_runs(status: Optional[str] = None, pipeline_name: Optional[str] = None, limit: Optional[int] = None, offset: int = 0, metadata_filter: Optional[JSONObject] = None) -> List[JSONObject]
List runs with optional filtering and include workflow metadata.
get_workflow_stats
async
get_workflow_stats() -> JSONObject
Get comprehensive workflow statistics.
get_workflow_stats_sync
get_workflow_stats_sync() -> JSONObject
Synchronous stats snapshot for observability collectors.
Prometheus collectors are synchronous and can be invoked from async runtimes (threads with a running event loop). This method avoids run_sync() by querying SQLite directly through the standard library sqlite3 driver.
get_failed_workflows
async
get_failed_workflows(hours_back: int = 24) -> List[JSONObject]
Get failed workflows from the last hours_back hours.
cleanup_old_workflows
async
cleanup_old_workflows(days_old: float = 30) -> int
Delete workflows older than the specified number of days (days_old). Returns the number of deleted workflows.
delete_run
async
delete_run(run_id: str) -> None
Delete a run from the runs table. The deletion cascades to the run's traces and is recorded in the audit log.