Skip to content

SQLite Backend Guide

Overview

The SQLiteBackend is a production-ready state backend for Flujo that provides durable workflow persistence with advanced observability features. It's designed for high-performance, concurrent access patterns and includes comprehensive admin query capabilities.

Key Features

  • Durable Persistence: Workflows survive process restarts and system failures
  • Concurrent Access: Thread-safe operations with proper locking
  • Admin Queries: Built-in observability and monitoring capabilities
  • Automatic Migration: Seamless schema upgrades for existing deployments
  • Performance Optimized: Indexed queries and WAL mode for better concurrency
  • Enhanced Serialization: Support for custom Python types and complex data structures

Basic Usage

Initialization

from flujo.state.backends.sqlite import SQLiteBackend
from pathlib import Path

# Initialize with a database file path
backend = SQLiteBackend(Path("workflow_state.db"))

# The database will be automatically created and initialized on first use

Core Operations

from datetime import datetime

# Save workflow state
now = datetime.utcnow().replace(microsecond=0)
state = {
    "run_id": "run_123",
    "pipeline_id": "data_processing",
    "pipeline_name": "Data Processing Pipeline",
    "pipeline_version": "1.0.0",
    "current_step_index": 2,
    "pipeline_context": {"input_data": "sample.csv", "processed_rows": 1000},
    "last_step_output": {"status": "success", "rows_processed": 1000},
    "status": "running",
    "created_at": now,
    "updated_at": now,
    "total_steps": 5,
    "error_message": None,
    "execution_time_ms": 1500,
    "memory_usage_mb": 45.2
}

await backend.save_state("run_123", state)

# Load workflow state
loaded_state = await backend.load_state("run_123")
print(f"Pipeline: {loaded_state['pipeline_name']}")
print(f"Status: {loaded_state['status']}")
print(f"Current step: {loaded_state['current_step_index']}")

# Delete workflow state
await backend.delete_state("run_123")

Admin Queries and Observability

The SQLiteBackend provides powerful built-in observability features that allow you to monitor and analyze your workflows.

List Workflows

# List all workflows
all_workflows = await backend.list_workflows()
print(f"Total workflows: {len(all_workflows)}")

# Filter by status
running_workflows = await backend.list_workflows(status="running")
failed_workflows = await backend.list_workflows(status="failed")

# Filter by pipeline
pipeline_workflows = await backend.list_workflows(pipeline_id="data_processing")

# Pagination
recent_workflows = await backend.list_workflows(limit=10, offset=0)

Workflow Statistics

# Get comprehensive workflow statistics
stats = await backend.get_workflow_stats()

print(f"Total workflows: {stats['total_workflows']}")
print(f"Status breakdown: {stats['status_counts']}")
print(f"Recent workflows (24h): {stats['recent_workflows_24h']}")
print(f"Average execution time: {stats['average_execution_time_ms']}ms")

# Example output:
# Total workflows: 150
# Status breakdown: {'running': 5, 'completed': 120, 'failed': 15, 'paused': 10}
# Recent workflows (24h): 25
# Average execution time: 1250ms

Failed Workflows Analysis

# Get failed workflows from the last 24 hours
failed_24h = await backend.get_failed_workflows(hours_back=24)

for workflow in failed_24h:
    print(f"Failed workflow: {workflow['run_id']}")
    print(f"  Pipeline: {workflow['pipeline_name']}")
    print(f"  Error: {workflow['error_message']}")
    print(f"  Failed at: {workflow['updated_at']}")

# Get failed workflows from the last week
failed_week = await backend.get_failed_workflows(hours_back=168)

Cleanup Operations

# Clean up workflows older than 30 days
deleted_count = await backend.cleanup_old_workflows(days_old=30)
print(f"Deleted {deleted_count} old workflows")

# Clean up workflows older than 7 days
deleted_count = await backend.cleanup_old_workflows(days_old=7)

Direct SQL Queries

For advanced operational tasks, you can directly query the SQLite database to gain deep insights into your workflow performance and behavior.

Performance Monitoring Queries

-- Get average execution time by pipeline
SELECT
    pipeline_name,
    COUNT(*) as total_runs,
    AVG(execution_time_ms) as avg_execution_time,
    MAX(execution_time_ms) as max_execution_time,
    MIN(execution_time_ms) as min_execution_time
FROM workflow_state
WHERE execution_time_ms IS NOT NULL
GROUP BY pipeline_name
ORDER BY avg_execution_time DESC;

-- Get memory usage statistics
SELECT
    pipeline_name,
    AVG(memory_usage_mb) as avg_memory_usage,
    MAX(memory_usage_mb) as max_memory_usage,
    COUNT(*) as total_runs
FROM workflow_state
WHERE memory_usage_mb IS NOT NULL
GROUP BY pipeline_name;

-- Find slow workflows (execution time > 5 seconds)
SELECT
    run_id,
    pipeline_name,
    execution_time_ms,
    created_at
FROM workflow_state
WHERE execution_time_ms > 5000
ORDER BY execution_time_ms DESC;

Error Analysis Queries

-- Get error patterns
SELECT
    error_message,
    COUNT(*) as error_count,
    pipeline_name
FROM workflow_state
WHERE error_message IS NOT NULL
GROUP BY error_message, pipeline_name
ORDER BY error_count DESC;

-- Find workflows that failed at specific steps
SELECT
    run_id,
    pipeline_name,
    current_step_index,
    error_message,
    updated_at
FROM workflow_state
WHERE status = 'failed'
ORDER BY updated_at DESC;

-- Get failure rate by pipeline
SELECT
    pipeline_name,
    COUNT(*) as total_runs,
    SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_runs,
    ROUND(
        (SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) * 100.0 / COUNT(*)),
        2
    ) as failure_rate_percent
FROM workflow_state
GROUP BY pipeline_name
ORDER BY failure_rate_percent DESC;

Activity Analysis Queries

-- Get workflow activity by hour
SELECT
    strftime('%Y-%m-%d %H:00:00', created_at) as hour,
    COUNT(*) as workflows_started,
    SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
    SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
FROM workflow_state
WHERE created_at >= datetime('now', '-7 days')
GROUP BY hour
ORDER BY hour DESC;

-- Find most active pipelines
SELECT
    pipeline_name,
    COUNT(*) as total_runs,
    COUNT(DISTINCT DATE(created_at)) as active_days
FROM workflow_state
WHERE created_at >= datetime('now', '-30 days')
GROUP BY pipeline_name
ORDER BY total_runs DESC;

-- Get step completion statistics
SELECT
    pipeline_name,
    AVG(current_step_index) as avg_step_progress,
    MAX(current_step_index) as max_steps,
    COUNT(*) as total_workflows
FROM workflow_state
WHERE status = 'running'
GROUP BY pipeline_name;

Performance Considerations

WAL Mode and Concurrency

The SQLiteBackend uses Write-Ahead Logging (WAL) mode by default, which provides:

  • Better Concurrency: Multiple readers can access the database simultaneously
  • Improved Performance: Writes don't block reads
  • Crash Recovery: Automatic recovery from unexpected shutdowns
# The backend automatically enables WAL mode during initialization
backend = SQLiteBackend(Path("workflow_state.db"))
# WAL mode is enabled automatically - no additional configuration needed

Indexed Queries

The backend creates strategic indexes for optimal query performance:

-- These indexes are created automatically
CREATE INDEX idx_workflow_state_status ON workflow_state(status);
CREATE INDEX idx_workflow_state_created_at ON workflow_state(created_at);
CREATE INDEX idx_workflow_state_pipeline_id ON workflow_state(pipeline_id);

Large Dataset Optimization

For databases with thousands of workflows:

# Use pagination for large result sets
workflows = await backend.list_workflows(limit=100, offset=0)

# Filter by specific criteria to reduce result size
recent_failures = await backend.list_workflows(
    status="failed",
    limit=50
)

# Use cleanup to prevent database bloat
await backend.cleanup_old_workflows(days_old=90)

Fault Tolerance and Recovery

Automatic Schema Migration

The backend automatically handles schema upgrades:

# If you upgrade Flujo and the schema changes, the backend will:
# 1. Detect the schema mismatch
# 2. Automatically migrate existing data
# 3. Preserve all workflow state
# 4. Continue normal operation

backend = SQLiteBackend(Path("existing_workflow_state.db"))
# Migration happens automatically on first access

Database Corruption Recovery

The backend includes robust error handling and recovery mechanisms:

# The backend automatically retries operations on database locks
# and handles corruption scenarios gracefully

try:
    state = await backend.load_state("run_123")
except Exception as e:
    # The backend will attempt automatic recovery
    print(f"Recovery attempted: {e}")

Connection Pooling and Retry Logic

# The backend implements intelligent retry logic with exponential backoff
# for database locked errors and schema migration scenarios

# No additional configuration needed - handled automatically
backend = SQLiteBackend(Path("workflow_state.db"))

Security Considerations

SQL Injection Prevention

The SQLiteBackend uses parameterized queries throughout to prevent SQL injection:

# All user inputs are properly parameterized
await backend.list_workflows(status="running")  # Safe
await backend.list_workflows(pipeline_id="user_input")  # Safe

# The backend never constructs SQL queries by string concatenation
# All queries use parameterized statements with proper escaping

Input Validation

# The backend validates all inputs before database operations
# SQL identifiers are validated for problematic characters
# Column definitions are validated against a whitelist

# Example of safe usage:
state = {
    "run_id": "safe_run_id_123",  # Validated
    "pipeline_id": "safe_pipeline_name",  # Validated
    # ... other fields
}
await backend.save_state("run_123", state)

Best Practices

Maintenance

# Regular cleanup to prevent database bloat
import asyncio
from datetime import datetime

async def maintenance_routine():
    backend = SQLiteBackend(Path("workflow_state.db"))

    # Clean up workflows older than 30 days
    deleted = await backend.cleanup_old_workflows(days_old=30)
    print(f"Cleaned up {deleted} old workflows")

    # Get statistics for monitoring
    stats = await backend.get_workflow_stats()
    print(f"Database contains {stats['total_workflows']} workflows")

# Run maintenance weekly
# asyncio.run(maintenance_routine())

Monitoring

# Set up monitoring for your workflows
async def monitor_workflows():
    backend = SQLiteBackend(Path("workflow_state.db"))

    # Check for failed workflows
    failed = await backend.get_failed_workflows(hours_back=1)
    if failed:
        print(f"Alert: {len(failed)} workflows failed in the last hour")
        for wf in failed:
            print(f"  - {wf['run_id']}: {wf['error_message']}")

    # Monitor performance
    stats = await backend.get_workflow_stats()
    if stats['average_execution_time_ms'] > 10000:  # 10 seconds
        print("Alert: Average execution time is high")

    # Check for stuck workflows
    running = await backend.list_workflows(status="running")
    for wf in running:
        # Check if workflow has been running too long
        created = datetime.fromisoformat(wf['created_at'])
        if (datetime.utcnow() - created).total_seconds() > 3600:  # 1 hour
            print(f"Alert: Workflow {wf['run_id']} has been running for over 1 hour")

Backups

import shutil
from pathlib import Path

def backup_database():
    """Create a backup of the workflow database."""
    db_path = Path("workflow_state.db")
    backup_path = Path(f"workflow_state_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.db")

    if db_path.exists():
        shutil.copy2(db_path, backup_path)
        print(f"Backup created: {backup_path}")

    # Also backup the WAL file if it exists
    wal_path = db_path.with_suffix('.db-wal')
    if wal_path.exists():
        wal_backup = backup_path.with_suffix('.db-wal')
        shutil.copy2(wal_path, wal_backup)
        print(f"WAL backup created: {wal_backup}")

# Run backups daily
# backup_database()

Configuration

# Use environment variables for configuration
import os
from pathlib import Path

# Set database path via environment variable
db_path = Path(os.getenv("FLUJO_DB_PATH", "flujo_ops.db"))
backend = SQLiteBackend(db_path)

# Or use the configuration system
from flujo.cli.config import load_backend_from_config
backend = load_backend_from_config()

API Reference

SQLiteBackend

class SQLiteBackend(StateBackend):
    """SQLite-backed persistent storage for workflow state with optimized schema."""

    def __init__(self, db_path: Path) -> None:
        """Initialize the backend with a database file path."""

    async def save_state(self, run_id: str, state: Dict[str, Any]) -> None:
        """Save workflow state with enhanced serialization."""

    async def load_state(self, run_id: str) -> Optional[Dict[str, Any]]:
        """Load workflow state with enhanced deserialization."""

    async def delete_state(self, run_id: str) -> None:
        """Delete workflow state."""

    async def list_workflows(
        self,
        status: Optional[str] = None,
        pipeline_id: Optional[str] = None,
        limit: Optional[int] = None,
        offset: int = 0,
    ) -> List[Dict[str, Any]]:
        """List workflows with optional filtering and pagination."""

    async def get_workflow_stats(self) -> Dict[str, Any]:
        """Get comprehensive workflow statistics."""

    async def get_failed_workflows(self, hours_back: int = 24) -> List[Dict[str, Any]]:
        """Get failed workflows from the last N hours."""

    async def cleanup_old_workflows(self, days_old: float = 30) -> int:
        """Clean up workflows older than specified days. Returns number of deleted workflows."""

Troubleshooting

Common Issues

Database is locked errors

# The backend automatically handles database locks with retry logic
# If you see these errors frequently, consider:
# 1. Reducing concurrent access
# 2. Using separate database files for different applications
# 3. Implementing proper connection pooling in your application

High memory usage

# Monitor memory usage with the built-in tracking
stats = await backend.get_workflow_stats()
workflows = await backend.list_workflows()

for wf in workflows:
    if wf.get('memory_usage_mb', 0) > 100:  # 100MB threshold
        print(f"High memory usage: {wf['run_id']} - {wf['memory_usage_mb']}MB")

Slow queries

# Use the provided indexes effectively
# Filter by status or pipeline_id when possible
# Use pagination for large result sets

# Good: Uses index
fast_query = await backend.list_workflows(status="running", limit=10)

# Avoid: No filtering, could be slow with large datasets
slow_query = await backend.list_workflows()  # No limit or filters

Debug Queries

-- Check database size
SELECT
    name,
    page_count * page_size as size_bytes,
    page_count * page_size / 1024.0 / 1024.0 as size_mb
FROM pragma_page_count(), pragma_page_size()
WHERE name = 'workflow_state';

-- Check index usage
SELECT
    name,
    sql
FROM sqlite_master
WHERE type = 'index' AND tbl_name = 'workflow_state';

-- Check for database corruption
PRAGMA integrity_check;

-- Analyze query performance
EXPLAIN QUERY PLAN
SELECT * FROM workflow_state WHERE status = 'running';

Performance Tuning

# For high-throughput applications, consider:
# 1. Using separate database files for different pipeline types
# 2. Implementing application-level caching
# 3. Regular cleanup to maintain optimal performance

# Example: Separate databases by pipeline type
data_processing_backend = SQLiteBackend(Path("data_processing.db"))
ml_training_backend = SQLiteBackend(Path("ml_training.db"))

This comprehensive guide covers all aspects of the SQLiteBackend, from basic usage to advanced operational tasks. The backend is production-ready and provides the observability and reliability needed for serious workflow management.