Optimized SQLite State Backend
Overview
The SQLite state backend has been optimized for production use cases involving large numbers of durable workflows. This refactor significantly improves performance and utility through:
- Performance at Scale: Structured SQL schema with indexed columns for fast queries
- Observability & Debugging: Direct database queryability for operators
- Advanced Features: Foundation for UI dashboards and admin tools
Key Improvements
1. Structured Schema
The new schema moves status, timestamps, and step indices into native SQL columns:
CREATE TABLE workflow_state (
run_id TEXT PRIMARY KEY,
pipeline_id TEXT NOT NULL,
pipeline_name TEXT NOT NULL,
pipeline_version TEXT NOT NULL,
current_step_index INTEGER NOT NULL DEFAULT 0,
pipeline_context TEXT NOT NULL, -- JSON blob
last_step_output TEXT, -- JSON blob, nullable
status TEXT NOT NULL CHECK (status IN ('running', 'paused', 'completed', 'failed', 'cancelled')),
created_at TEXT NOT NULL, -- ISO format datetime
updated_at TEXT NOT NULL, -- ISO format datetime
-- Additional metadata for better observability
total_steps INTEGER DEFAULT 0,
error_message TEXT,
execution_time_ms INTEGER,
memory_usage_mb REAL
);
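Because pipeline_context and last_step_output remain JSON text, they can still be inspected ad hoc when the SQLite build includes the JSON1 functions (built in by default in modern SQLite). A minimal sketch, assuming a context shaped like the usage example later in this document:

-- Requires the JSON1 functions; '$.input_data' is an assumed key from the usage example
SELECT run_id,
       json_extract(pipeline_context, '$.input_data') AS input_data
FROM workflow_state
WHERE status = 'running';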
2. Performance Indexes
Strategic indexes for common query patterns:
CREATE INDEX idx_workflow_state_status ON workflow_state(status);
CREATE INDEX idx_workflow_state_pipeline_id ON workflow_state(pipeline_id);
CREATE INDEX idx_workflow_state_created_at ON workflow_state(created_at);
CREATE INDEX idx_workflow_state_updated_at ON workflow_state(updated_at);
CREATE INDEX idx_workflow_state_status_updated ON workflow_state(status, updated_at);
CREATE INDEX idx_workflow_state_pipeline_status ON workflow_state(pipeline_id, status);
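To confirm that a given filter actually uses one of these indexes, run the query under EXPLAIN QUERY PLAN; a healthy plan mentions the relevant index rather than a full table scan. A sketch:

EXPLAIN QUERY PLAN
SELECT run_id FROM workflow_state WHERE status = 'failed';
-- The plan should report something like: SEARCH workflow_state USING INDEX idx_workflow_state_status (status=?)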
3. Automatic Migration
Existing databases are automatically migrated to the new schema without data loss.
Basic Usage
Standard Operations
from datetime import datetime
from pathlib import Path

from flujo.state import SQLiteBackend
# Initialize backend
backend = SQLiteBackend(Path("workflow_state.db"))
# Save workflow state
state = {
"run_id": "run_123",
"pipeline_id": "data_processing",
"pipeline_name": "Data Processing Pipeline",
"pipeline_version": "1.0.0",
"current_step_index": 2,
"pipeline_context": {"input_data": "sample.csv"},
"last_step_output": {"processed_rows": 1000},
"status": "running",
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow(),
"total_steps": 5,
"error_message": None,
"execution_time_ms": 1500,
"memory_usage_mb": 45.2
}
await backend.save_state("run_123", state)
# Load workflow state
loaded_state = await backend.load_state("run_123")
# Delete workflow state
await backend.delete_state("run_123")
Admin Queries and Observability
1. List Workflows with Filtering
# List all workflows
all_workflows = await backend.list_workflows()
# List only running workflows
running_workflows = await backend.list_workflows(status="running")
# List workflows for a specific pipeline
pipeline_workflows = await backend.list_workflows(pipeline_id="data_processing")
# List with pagination
recent_workflows = await backend.list_workflows(
limit=10,
offset=0
)
# Combined filtering
filtered_workflows = await backend.list_workflows(
status="failed",
pipeline_id="data_processing",
limit=5
)
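The snippets in this document treat each returned entry as a dict of row values (as the get_failed_workflows examples below do); if list_workflows returns a different shape in your version, adapt accordingly. A quick way to print the results under that assumption:

# Assumes list_workflows returns dict-like rows, as get_failed_workflows does below
for wf in running_workflows:
    print(f"{wf['run_id']}: {wf['pipeline_name']} (step {wf['current_step_index']})")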
2. Get Workflow Statistics
# Get comprehensive statistics
stats = await backend.get_workflow_stats()
print(f"Total workflows: {stats['total_workflows']}")
print(f"Recent workflows (24h): {stats['recent_workflows_24h']}")
print(f"Status breakdown: {stats['status_counts']}")
print(f"Average execution time: {stats['average_execution_time_ms']}ms")
# Example output:
# Total workflows: 1250
# Recent workflows (24h): 45
# Status breakdown: {'completed': 1000, 'running': 15, 'failed': 35, 'cancelled': 5}
# Average execution time: 2340.5ms
3. Find Failed Workflows
# Get failed workflows from last 24 hours
failed_workflows = await backend.get_failed_workflows(hours_back=24)
for workflow in failed_workflows:
print(f"Run ID: {workflow['run_id']}")
print(f"Pipeline: {workflow['pipeline_name']}")
print(f"Error: {workflow['error_message']}")
print(f"Failed at: {workflow['updated_at']}")
print("---")
# Get failed workflows from last week
recent_failures = await backend.get_failed_workflows(hours_back=168)
4. Cleanup Old Workflows
# Delete workflows older than 30 days
deleted_count = await backend.cleanup_old_workflows(days_old=30)
print(f"Deleted {deleted_count} old workflows")
# Delete workflows older than 7 days
weekly_cleanup = await backend.cleanup_old_workflows(days_old=7)
Direct SQL Queries
For advanced operators, the database is directly queryable:
Performance Monitoring
-- Find slow workflows (execution time > 5 minutes)
SELECT run_id, pipeline_name, execution_time_ms, created_at
FROM workflow_state
WHERE execution_time_ms > 300000
ORDER BY execution_time_ms DESC;
-- Find workflows with high memory usage
SELECT run_id, pipeline_name, memory_usage_mb, created_at
FROM workflow_state
WHERE memory_usage_mb > 100
ORDER BY memory_usage_mb DESC;
Error Analysis
-- Most common error messages
SELECT error_message, COUNT(*) as error_count
FROM workflow_state
WHERE status = 'failed' AND error_message IS NOT NULL
GROUP BY error_message
ORDER BY error_count DESC
LIMIT 10;
-- Failed workflows by pipeline
SELECT pipeline_name, COUNT(*) as failure_count
FROM workflow_state
WHERE status = 'failed'
GROUP BY pipeline_name
ORDER BY failure_count DESC;
Pipeline Performance
-- Average execution time by pipeline
SELECT
pipeline_name,
AVG(execution_time_ms) as avg_execution_time,
COUNT(*) as total_runs,
SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as successful_runs
FROM workflow_state
WHERE execution_time_ms IS NOT NULL
GROUP BY pipeline_name
ORDER BY avg_execution_time DESC;
Recent Activity
-- Workflows created in the last hour
SELECT run_id, pipeline_name, status, created_at
FROM workflow_state
WHERE created_at > datetime('now', '-1 hour')
ORDER BY created_at DESC;
-- Status changes in the last 24 hours
SELECT run_id, pipeline_name, status, updated_at
FROM workflow_state
WHERE updated_at > datetime('now', '-1 day')
ORDER BY updated_at DESC;
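These queries do not have to go through the backend at all. Opening the database read-only from a separate process keeps ad-hoc inspection from modifying live workflow state; a sketch using the standard library sqlite3 module:

import sqlite3

# Read-only URI connection: ad-hoc queries cannot modify the database
conn = sqlite3.connect("file:workflow_state.db?mode=ro", uri=True)
for run_id, name, status in conn.execute(
    "SELECT run_id, pipeline_name, status FROM workflow_state WHERE status = 'failed'"
):
    print(run_id, name, status)
conn.close()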
Migration Strategy
Automatic Migration
The backend automatically migrates existing databases; a sketch of the equivalent SQL follows the list:
- Schema Detection: Checks for existing columns
- Column Addition: Adds new columns with appropriate defaults
- Data Migration: Updates existing data to meet new constraints
- Index Creation: Creates performance indexes
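Conceptually, the migration amounts to a handful of additive statements. The following is an illustrative sketch of those steps, not the backend's exact code:

-- Illustrative sketch only; the backend performs the equivalent steps internally
ALTER TABLE workflow_state ADD COLUMN total_steps INTEGER DEFAULT 0;
ALTER TABLE workflow_state ADD COLUMN error_message TEXT;
ALTER TABLE workflow_state ADD COLUMN execution_time_ms INTEGER;
ALTER TABLE workflow_state ADD COLUMN memory_usage_mb REAL;
CREATE INDEX IF NOT EXISTS idx_workflow_state_status ON workflow_state(status);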
Manual Migration Verification
import aiosqlite
async def verify_migration(db_path):
async with aiosqlite.connect(db_path) as db:
cursor = await db.execute("PRAGMA table_info(workflow_state)")
columns = [row[1] for row in await cursor.fetchall()]
required_columns = [
'total_steps', 'error_message',
'execution_time_ms', 'memory_usage_mb'
]
missing_columns = [col for col in required_columns if col not in columns]
if missing_columns:
print(f"Missing columns: {missing_columns}")
else:
print("Migration completed successfully")
Performance Considerations
Index Usage
- Queries on `status`, `pipeline_id`, and timestamps are optimized
- Composite indexes support common filtering patterns
- Consider your query patterns when adding custom indexes
WAL Mode
The backend enables WAL (Write-Ahead Logging) mode (verifiable as shown below) for:
- Better concurrent read/write performance
- Reduced blocking during writes
- Improved durability
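Operators can confirm the journal mode on an existing database directly; PRAGMA journal_mode reports the current setting:

-- Returns 'wal' once WAL mode is active; expect -wal and -shm sidecar files next to the database
PRAGMA journal_mode;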
Memory Management
- Large JSON blobs in `pipeline_context` and `last_step_output` are stored as text (see the sizing query below)
- Consider archiving old workflows to manage database size
- Monitor `memory_usage_mb` for resource-intensive workflows
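Because the blobs are ordinary TEXT columns, their size can be checked directly with length(), which counts characters for text values:

-- Largest context and output blobs, measured in characters
SELECT run_id, pipeline_name,
       length(pipeline_context) AS context_chars,
       length(last_step_output) AS output_chars
FROM workflow_state
ORDER BY context_chars DESC
LIMIT 10;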
Best Practices
1. Regular Cleanup
# Set up automated cleanup
async def maintenance_cleanup():
# Clean up workflows older than 30 days
deleted = await backend.cleanup_old_workflows(days_old=30)
print(f"Maintenance: deleted {deleted} old workflows")
2. Monitoring Setup
# Regular health checks
async def health_check():
stats = await backend.get_workflow_stats()
# Alert on high failure rate
total = stats['total_workflows']
failed = stats['status_counts'].get('failed', 0)
failure_rate = failed / total if total > 0 else 0
if failure_rate > 0.1: # 10% failure rate
print(f"WARNING: High failure rate: {failure_rate:.2%}")
# Alert on stuck workflows
running = stats['status_counts'].get('running', 0)
if running > 50:
print(f"WARNING: Many running workflows: {running}")
3. Error Tracking
# Monitor specific error patterns
async def error_analysis():
failed_workflows = await backend.get_failed_workflows(hours_back=24)
error_patterns = {}
for workflow in failed_workflows:
error = workflow['error_message']
if error:
error_patterns[error] = error_patterns.get(error, 0) + 1
# Report common errors
for error, count in sorted(error_patterns.items(), key=lambda x: x[1], reverse=True):
print(f"Error: {error} (occurred {count} times)")
Future Enhancements
The optimized schema enables future features:
- Web Dashboard: Real-time workflow monitoring
- Admin CLI: Command-line workflow management
- Alerting: Automated notifications for failures
- Analytics: Performance trend analysis
- Workflow Templates: Reusable pipeline configurations
Troubleshooting
Common Issues
- Migration Failures: Check database permissions and disk space
- Performance Issues: Verify indexes are created and being used
- Memory Usage: Monitor JSON blob sizes in context and output fields
Debug Queries
-- List the indexes defined on the table
SELECT * FROM sqlite_master WHERE type = 'index' AND tbl_name = 'workflow_state';
-- Check table size
SELECT COUNT(*) as total_rows FROM workflow_state;
-- Check for data integrity issues
SELECT run_id, status FROM workflow_state WHERE status NOT IN ('running', 'paused', 'completed', 'failed', 'cancelled');
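For suspected file-level corruption rather than schema or data drift, SQLite's built-in checks can be run as well:

-- Full integrity check; returns 'ok' when the database is healthy (can be slow on large files)
PRAGMA integrity_check;
-- Faster, less thorough variant
PRAGMA quick_check;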