Database Backends: SQLite and PostgreSQL
Flujo supports two production-ready database backends for persistent workflow state: SQLite and PostgreSQL. This guide explains when to use each, how to configure them, manage migrations, and optimize performance.
Quick Decision Guide
| Use Case | Recommended Backend | Reason |
|---|---|---|
| Development & Testing | SQLite | Zero configuration, fast setup |
| Single-server deployments | SQLite | Simple, reliable, sufficient for most workloads |
| High-volume production (>1000 runs/day) | PostgreSQL | Better concurrency, advanced indexing |
| Multi-server/microservices | PostgreSQL | Shared state, connection pooling |
| Complex metadata queries | PostgreSQL | JSONB with GIN indexes for fast filtering |
| Embedded/IoT applications | SQLite | Lightweight, no external dependencies |
SQLite Backend
Overview
SQLite is a self-contained, file-based database that requires no server process. It's perfect for development, single-server deployments, and applications with moderate concurrency requirements.
Key Features
- Zero Configuration: Just specify a file path
- Durable Persistence: Workflows survive restarts
- WAL Mode: Write-Ahead Logging for better concurrency
- Automatic Migrations: Schema upgrades handled automatically
- Thread-Safe: Concurrent access with proper locking
- Portable: Single file, easy to backup and move
Installation
SQLite support is built-in—no additional dependencies required.
from flujo.state.backends.sqlite import SQLiteBackend
from pathlib import Path
backend = SQLiteBackend(Path("workflow_state.db"))
Configuration
Via Code
from pathlib import Path
from flujo.state.backends.sqlite import SQLiteBackend
# Absolute path
backend = SQLiteBackend(Path("/var/lib/flujo/workflow_state.db"))
# Relative path (resolved from current working directory)
backend = SQLiteBackend(Path("workflow_state.db"))
Via Configuration File (flujo.toml)
[state]
uri = "sqlite:///./workflow_state.db" # Relative path
# or
uri = "sqlite:////var/lib/flujo/workflow_state.db" # Absolute path
Via Environment Variable
export FLUJO_STATE_URI="sqlite:///./workflow_state.db"
URI Format
SQLite URIs follow this pattern:
sqlite:///<path>
Examples:
- sqlite:///./workflow_state.db → Relative to current working directory
- sqlite:///workflow_state.db → Relative to current working directory
- sqlite:////var/lib/flujo/workflow_state.db → Absolute path (note the four slashes)
- sqlite:////tmp/flujo_ops.db → Absolute path on Unix systems
Path Resolution Rules:
- Absolute paths: sqlite:////abs/path/to/db (four slashes) are used as-is
- Relative paths: sqlite:///./db or sqlite:///db are resolved relative to the current working directory
- The path normalization follows RFC 3986
Basic Usage
from flujo.state.backends.sqlite import SQLiteBackend
from pathlib import Path
from datetime import datetime, timezone
# Initialize
backend = SQLiteBackend(Path("workflow_state.db"))
# Save state
state = {
"run_id": "run_123",
"pipeline_id": "data_processing",
"pipeline_name": "Data Processing Pipeline",
"pipeline_version": "1.0.0",
"current_step_index": 2,
"pipeline_context": {"input_data": "sample.csv"},
"status": "running",
"created_at": datetime.now(timezone.utc),
"updated_at": datetime.now(timezone.utc),
"total_steps": 5,
}
await backend.save_state("run_123", state)
# Load state
loaded = await backend.load_state("run_123")
# List runs with filtering
runs = await backend.list_runs(
status="running",
pipeline_name="Data Processing Pipeline",
limit=10,
offset=0,
metadata_filter={"batch_id": "batch-001"}
)
Migrations
SQLite migrations are handled automatically. When you upgrade Flujo, the backend detects schema changes and applies migrations on first access.
# Migration happens automatically
backend = SQLiteBackend(Path("existing_workflow_state.db"))
# Schema is upgraded transparently on first use
Manual Migration (if needed):
# Run migrations manually
flujo migrate --dry-run # Preview changes
flujo migrate # Apply migrations
Performance Optimization
WAL Mode
SQLite uses Write-Ahead Logging (WAL) mode by default, which provides: - Better concurrency (readers don't block writers) - Improved performance - Automatic crash recovery
No configuration needed—it's enabled automatically.
Indexes
The backend creates strategic indexes automatically:
-- These are created automatically
CREATE INDEX idx_workflow_state_status ON workflow_state(status);
CREATE INDEX idx_workflow_state_pipeline_id ON workflow_state(pipeline_id);
CREATE INDEX idx_workflow_state_created_at ON workflow_state(created_at);
CREATE INDEX idx_workflow_state_updated_at ON workflow_state(updated_at);
Large Dataset Tips
# Use pagination for large result sets
runs = await backend.list_runs(limit=100, offset=0)
# Filter early to reduce result size
recent_failures = await backend.list_runs(
status="failed",
limit=50
)
# Regular cleanup to prevent bloat
deleted = await backend.cleanup_old_workflows(days_old=90)
Limitations
- Concurrent Writers: SQLite handles concurrent reads well, but many concurrent writers can cause lock contention
- File Size: While SQLite can handle large databases (100GB+), performance degrades with very large files
- Network Access: SQLite files must be accessible from the application server (no remote access)
Best Practices
-
Regular Backups: Copy the database file regularly
cp workflow_state.db workflow_state_backup_$(date +%Y%m%d).db -
Cleanup Old Data: Prevent database bloat
await backend.cleanup_old_workflows(days_old=30) -
Monitor File Size: Watch for database growth
import os size_mb = os.path.getsize("workflow_state.db") / (1024 * 1024) if size_mb > 1000: # 1GB threshold print("Database is getting large, consider cleanup") -
Use Separate Databases: For different environments or pipeline types
prod_backend = SQLiteBackend(Path("prod_workflow_state.db")) dev_backend = SQLiteBackend(Path("dev_workflow_state.db"))
PostgreSQL Backend
Overview
PostgreSQL is a powerful, open-source relational database ideal for high-volume production deployments, multi-server architectures, and applications requiring advanced querying capabilities.
Key Features
- High Concurrency: Excellent performance with many concurrent connections
- JSONB Support: Native JSON storage with GIN indexes for fast queries
- Connection Pooling: Built-in async connection pool management
- Advanced Indexing: GIN indexes for efficient metadata filtering
- Scalability: Handles millions of workflow runs
- Multi-Server: Shared state across multiple application instances
Installation
PostgreSQL support requires the asyncpg package:
# Install with postgres extra
pip install flujo[postgres]
# Or install asyncpg directly
pip install asyncpg
Note: If you configure a Postgres URI but asyncpg is not installed, Flujo will raise a clear error with installation instructions.
Configuration
Via Code
from flujo.state.backends.postgres import PostgresBackend
# Connection string (DSN format)
backend = PostgresBackend(
"postgres://user:password@localhost:5432/flujo_db",
auto_migrate=True,
pool_min_size=1,
pool_max_size=10
)
Via Configuration File (flujo.toml)
[state]
uri = "postgres://user:password@localhost:5432/flujo_db"
[state.postgres]
pool_min_size = 1
pool_max_size = 10
auto_migrate = true
Via Environment Variable
export FLUJO_STATE_URI="postgres://user:password@localhost:5432/flujo_db"
export FLUJO_AUTO_MIGRATE="true"
Connection String Format
PostgreSQL connection strings follow the standard DSN format:
postgres://[user[:password]@][host][:port][/database][?param1=value1&...]
postgresql://[user[:password]@][host][:port][/database][?param1=value1&...]
Examples:
- postgres://user:pass@localhost:5432/flujo_db
- postgres://user@localhost/flujo_db (default port 5432, no password)
- postgresql://user:pass@db.example.com:5432/flujo_db?sslmode=require
Common Parameters:
- sslmode=require - Require SSL connection
- connect_timeout=10 - Connection timeout in seconds
- application_name=flujo - Application identifier
Connection Pooling
PostgresBackend uses asyncpg's connection pooling for efficient resource management:
backend = PostgresBackend(
dsn,
pool_min_size=1, # Minimum connections in pool
pool_max_size=10, # Maximum connections in pool
auto_migrate=True
)
Pool Sizing Guidelines:
- Small deployments (< 100 concurrent requests): pool_min_size=1, pool_max_size=5
- Medium deployments (100-1000 concurrent): pool_min_size=2, pool_max_size=10
- Large deployments (> 1000 concurrent): pool_min_size=5, pool_max_size=20
Basic Usage
from flujo.state.backends.postgres import PostgresBackend
from datetime import datetime, timezone
# Initialize
backend = PostgresBackend(
"postgres://user:pass@localhost:5432/flujo_db",
auto_migrate=True
)
# Save state
state = {
"run_id": "run_123",
"pipeline_id": "data_processing",
"pipeline_name": "Data Processing Pipeline",
"pipeline_version": "1.0.0",
"current_step_index": 2,
"pipeline_context": {"input_data": "sample.csv"},
"status": "running",
"created_at": datetime.now(timezone.utc),
"updated_at": datetime.now(timezone.utc),
"total_steps": 5,
"metadata": {"batch_id": "batch-001", "priority": "high"},
}
await backend.save_state("run_123", state)
# Load state
loaded = await backend.load_state("run_123")
# List runs with metadata filtering (uses GIN index)
runs = await backend.list_runs(
status="running",
pipeline_name="Data Processing Pipeline",
limit=10,
offset=0,
metadata_filter={"batch_id": "batch-001"} # Fast with GIN index
)
Migrations
PostgreSQL migrations are managed through SQL migration files in flujo/state/migrations/.
Automatic Migrations
By default, migrations are applied automatically on backend initialization:
backend = PostgresBackend(dsn, auto_migrate=True) # Default
Manual Migrations
For production deployments, you may want to run migrations manually:
# Preview migrations
flujo migrate --dry-run
# Apply all pending migrations
flujo migrate
# Apply up to specific version
flujo migrate --target-version 2
Migration Files
Migrations are numbered sequentially:
001_init.sql- Initial schema creation002_metadata_index.sql- GIN index for metadata JSONB column
Example Migration (002_metadata_index.sql):
BEGIN;
-- Create GIN index for efficient JSONB querying on metadata
CREATE INDEX IF NOT EXISTS idx_workflow_state_metadata_gin
ON workflow_state USING GIN (metadata);
-- Update schema version
INSERT INTO flujo_schema_versions (version, applied_at)
VALUES (2, NOW())
ON CONFLICT (version) DO NOTHING;
COMMIT;
Migration Tracking
Migrations are tracked in the flujo_schema_versions table:
SELECT version, applied_at
FROM flujo_schema_versions
ORDER BY version;
Disabling Auto-Migration
For production environments where you want explicit control:
backend = PostgresBackend(dsn, auto_migrate=False)
# Then run migrations manually
# flujo migrate
If auto_migrate=False and schema is missing, the backend will raise an error with instructions.
Performance Optimization
GIN Indexes for Metadata
PostgreSQL uses GIN (Generalized Inverted Index) indexes for efficient JSONB queries:
-- This index is created by migration 002
CREATE INDEX idx_workflow_state_metadata_gin
ON workflow_state USING GIN (metadata);
This enables fast queries like:
# This query uses the GIN index for fast filtering
runs = await backend.list_runs(
metadata_filter={"batch_id": "batch-001", "priority": "high"}
)
GIN Index Benefits:
- Fast containment checks (@> operator)
- Efficient filtering on nested JSON structures
- Scales well with large metadata objects
Query Optimization
# Good: Uses indexes
runs = await backend.list_runs(
status="running",
pipeline_name="Data Processing",
limit=100
)
# Good: Metadata filter uses GIN index
runs = await backend.list_runs(
metadata_filter={"batch_id": "batch-001"}
)
# Avoid: Full table scan (no filters)
all_runs = await backend.list_runs() # Use pagination!
Connection Pool Tuning
Monitor pool usage and adjust:
# For high-throughput applications
backend = PostgresBackend(
dsn,
pool_min_size=5,
pool_max_size=20
)
Advanced Features
JSONB Metadata Queries
PostgreSQL's JSONB support enables powerful metadata queries:
# Filter by nested metadata
runs = await backend.list_runs(
metadata_filter={
"user": {"id": 123, "role": "admin"},
"tags": ["urgent", "production"]
}
)
Direct SQL Queries
For advanced analytics, you can query the database directly:
-- Get workflow statistics
SELECT
pipeline_name,
status,
COUNT(*) as count,
AVG(EXTRACT(EPOCH FROM (updated_at - created_at))) as avg_duration_seconds
FROM workflow_state
WHERE created_at > NOW() - INTERVAL '7 days'
GROUP BY pipeline_name, status
ORDER BY count DESC;
-- Find workflows with specific metadata
SELECT run_id, pipeline_name, metadata
FROM workflow_state
WHERE metadata @> '{"batch_id": "batch-001"}'::jsonb;
Best Practices
-
Use Connection Pooling: Let the backend manage connections
backend = PostgresBackend(dsn, pool_max_size=10) -
Enable SSL in Production: Use SSL for secure connections
postgres://user:pass@host:5432/db?sslmode=require -
Monitor Pool Usage: Watch for connection pool exhaustion
# Check pool stats (if available in your monitoring) # pool.size - current connections # pool.max_size - maximum connections -
Regular Vacuum: PostgreSQL handles this automatically, but monitor
-- Check table bloat SELECT schemaname, tablename, pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size FROM pg_tables WHERE schemaname = 'public'; -
Backup Strategy: Use PostgreSQL's native backup tools
# pg_dump for backups pg_dump -h localhost -U user -d flujo_db > backup.sql
Migration System
How Migrations Work
Flujo uses a versioned migration system:
- Migration Files: SQL files in
flujo/state/migrations/numbered sequentially - Version Tracking:
flujo_schema_versionstable tracks applied migrations - Automatic Application: Migrations run automatically on backend initialization (if
auto_migrate=True) - Idempotent: Migrations can be run multiple times safely
Migration File Format
BEGIN;
-- Your migration SQL here
CREATE INDEX IF NOT EXISTS idx_example ON table_name(column_name);
-- Always update schema version
INSERT INTO flujo_schema_versions (version, applied_at)
VALUES (3, NOW())
ON CONFLICT (version) DO NOTHING;
COMMIT;
Creating New Migrations
- Create Migration File: Add
00X_description.sqltoflujo/state/migrations/ - Use Transactions: Wrap in
BEGIN;andCOMMIT; - Update Version: Always increment and record the version number
- Test: Run migrations in a test environment first
Example (003_add_custom_column.sql):
BEGIN;
ALTER TABLE workflow_state
ADD COLUMN IF NOT EXISTS custom_field TEXT;
INSERT INTO flujo_schema_versions (version, applied_at)
VALUES (3, NOW())
ON CONFLICT (version) DO NOTHING;
COMMIT;
Migration Best Practices
-
Always Use IF NOT EXISTS: Prevents errors on re-runs
CREATE INDEX IF NOT EXISTS ... ALTER TABLE ... ADD COLUMN IF NOT EXISTS ... -
Test Backwards Compatibility: Ensure old code works with new schema
- Document Breaking Changes: Note any API changes in migration comments
- Run in Production Carefully: Test migrations in staging first
Choosing Between SQLite and PostgreSQL
Use SQLite When:
- ✅ Development and testing environments
- ✅ Single-server deployments
- ✅ Low to moderate concurrency (< 50 concurrent writes)
- ✅ Simple deployment requirements
- ✅ Embedded or IoT applications
- ✅ Prototyping and demos
Use PostgreSQL When:
- ✅ Production deployments with high volume (> 1000 runs/day)
- ✅ Multi-server or microservices architecture
- ✅ High concurrency requirements (> 50 concurrent writes)
- ✅ Complex metadata queries with JSONB
- ✅ Need for advanced analytics and reporting
- ✅ Shared state across multiple application instances
- ✅ Enterprise-grade reliability and monitoring
Migration Path
You can migrate from SQLite to PostgreSQL:
-
Export from SQLite:
# Export all runs runs = await sqlite_backend.list_runs() -
Import to PostgreSQL:
# Import runs for run in runs: state = await sqlite_backend.load_state(run['run_id']) await postgres_backend.save_state(run['run_id'], state) -
Update Configuration: Change
FLUJO_STATE_URIto Postgres connection string
Troubleshooting
SQLite Issues
Database is locked - Cause: Too many concurrent writers - Solution: Reduce concurrency or use PostgreSQL for high-write scenarios
Database file not found - Cause: Path resolution issue - Solution: Use absolute paths or verify current working directory
Slow queries - Cause: Missing indexes or large database - Solution: Ensure indexes exist, use pagination, cleanup old data
PostgreSQL Issues
Connection refused - Cause: Database server not running or wrong host/port - Solution: Verify connection string and database server status
Pool exhaustion
- Cause: Too many concurrent connections
- Solution: Increase pool_max_size or reduce application concurrency
Migration errors
- Cause: Manual schema changes or migration conflicts
- Solution: Check flujo_schema_versions table, run migrations manually
Slow metadata queries - Cause: Missing GIN index - Solution: Ensure migration 002 is applied:
SELECT version FROM flujo_schema_versions WHERE version = 2;
Configuration Examples
Development (SQLite)
# flujo.toml
[state]
uri = "sqlite:///./dev_workflow_state.db"
Production (PostgreSQL)
# flujo.toml
[state]
uri = "postgres://user:password@db.example.com:5432/flujo_prod"
[state.postgres]
pool_min_size = 2
pool_max_size = 10
auto_migrate = false # Run migrations manually in production
Environment-Specific
# Development
export FLUJO_STATE_URI="sqlite:///./dev_workflow_state.db"
# Staging
export FLUJO_STATE_URI="postgres://user:pass@staging-db:5432/flujo_staging"
# Production
export FLUJO_STATE_URI="postgres://user:pass@prod-db:5432/flujo_prod?sslmode=require"
Summary
Both SQLite and PostgreSQL are production-ready backends for Flujo:
- SQLite: Perfect for development, single-server deployments, and moderate workloads
- PostgreSQL: Ideal for high-volume production, multi-server architectures, and advanced querying
Choose based on your deployment requirements, concurrency needs, and scalability goals. Both backends support automatic migrations, provide excellent performance, and offer comprehensive observability features.