Durable and Resumable Workflows
Running a long or important workflow can feel a bit like playing an old video game – if the power goes out you lose all your progress. Flujo
avoids this problem by saving the state of a pipeline after each successful step. When your process restarts, the run can pick up exactly where it left off.
Why Durability Matters
Servers restart, deployments happen and sometimes a workflow simply has to wait for human input. By persisting the current step index and context, a pipeline can safely resume instead of starting over.
The StateBackend Interface
Flujo uses a pluggable backend interface for persistence. Every backend implements the same three methods:
from flujo.state.backends.base import StateBackend
class StateBackend(ABC):
"""Abstract interface for workflow state persistence."""
async def save_state(self, run_id: str, state: Dict[str, Any]) -> None:
...
async def load_state(self, run_id: str) -> Optional[Dict[str, Any]]:
...
async def delete_state(self, run_id: str) -> None:
...
WorkflowState Model
The runner serializes a WorkflowState
object which contains the pipeline version, step index and JSON‑serializable context.
from flujo.state.models import WorkflowState
WorkflowState(
run_id: str,
pipeline_id: str,
pipeline_name: str,
pipeline_version: str,
current_step_index: int,
pipeline_context: Dict[str, Any],
last_step_output: Any | None,
status: Literal["running", "paused", "completed", "failed", "cancelled"],
created_at: datetime,
updated_at: datetime,
)
Execution Lifecycle
- A
WorkflowState
is created after the first step completes. - The state is updated after every successful step.
- On completion or failure the final status is recorded.
Getting Started
Here is a minimal example using the SQLiteBackend
:
from pathlib import Path
from flujo import Flujo, Step, PipelineRegistry, step
from flujo.state import SQLiteBackend
@step
async def greet(name: str) -> str:
return f"Hello, {name}!"
pipeline = greet
registry = PipelineRegistry()
registry.register(pipeline, "hello", "1.0.0")
backend = SQLiteBackend(Path("workflow.db"))
runner = Flujo(
registry=registry,
pipeline_name="hello",
pipeline_version="1.0.0",
state_backend=backend,
)
result = runner.run("world", run_id="run1")
# To resume later just create a new runner with the same ``run_id``
runner2 = Flujo(
registry=registry,
pipeline_name="hello",
pipeline_version="1.0.0",
state_backend=backend,
)
resumed = runner2.run("world", run_id="run1")
Built-in Backends
InMemoryBackend
For tests and demos. Data lives only in memory so everything is lost on restart. The backend uses the same serialization helpers as the file and SQLite backends, ensuring custom types round-trip consistently.
FileBackend
Stores each run as a JSON file in a directory. Great for simple serverless setups but not safe for concurrent writes.
SQLiteBackend
Persists state in a single SQLite database file. Robust enough for many production scenarios and perfect for local development. For comprehensive documentation including admin queries, performance optimization, and operational best practices, see the SQLite Backend Guide.
See also: Managing Pipeline Versions.