# Redis Usage Guide
This guide explains how to use Redis both as a cache and as the message queue for Celery in this FastAPI boilerplate.
## Table of Contents

- [Redis Configuration](#redis-configuration)
- [Caching](#caching)
- [Message Queue (Celery)](#message-queue-celery)
- [Examples](#examples)
- [Best Practices](#best-practices)
- [Troubleshooting](#troubleshooting)
## Redis Configuration

### Environment Variables
Configure Redis in your `.env` file:
```env
# Redis Configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=  # Optional, leave empty if no password

# Cache Configuration
CACHE_PREFIX=cache:
CACHE_DEFAULT_TTL=3600  # Default TTL in seconds (1 hour)
CACHE_SERIALIZER=json   # Options: json, pickle
```
### Connection

Redis is used for:

- **Caching**: store frequently accessed data
- **Message Queue**: background task processing with Celery
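These settings are typically combined into a standard `redis://` connection URL. A minimal stdlib sketch of that mapping (the helper name, and the exact way this boilerplate assembles its URL, are assumptions for illustration):

```python
import os

def redis_url(db: int = 0) -> str:
    """Build a redis:// URL from the environment variables above.

    Hypothetical helper -- the boilerplate's own settings object may
    assemble the URL differently.
    """
    host = os.getenv("REDIS_HOST", "localhost")
    port = os.getenv("REDIS_PORT", "6379")
    password = os.getenv("REDIS_PASSWORD", "")
    # An empty REDIS_PASSWORD produces a URL with no auth section
    auth = f":{password}@" if password else ""
    return f"redis://{auth}{host}:{port}/{db}"
```

With the defaults above, `redis_url()` yields `redis://localhost:6379/0`, which is the same shape as the `CELERY_BROKER_URL` shown later in this guide.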
## Caching

The boilerplate provides a Laravel-like caching interface through `app.core.cache`.

### Basic Usage
```python
from app.core.cache import cache

# Store a value
cache().put("key", "value", ttl=3600)  # TTL in seconds

# Get a value
value = cache().get("key")
value = cache().get("key", default="default_value")  # With default

# Check if a key exists
if cache().has("key"):
    print("Key exists")

# Delete a value
cache().forget("key")

# Clear all cache
cache().flush()
```
### Remember Pattern (Cache-Aside)

The `remember` method implements the cache-aside pattern: it returns the cached value if present; otherwise it executes the callback, caches the result, and returns it:
```python
from app.core.cache import cache
from app.models.user import User

# Get the user from cache, or fetch from the DB if not cached
user = cache().remember(
    f"user:{user_id}",
    ttl=300,  # 5 minutes
    callback=lambda: User.get(db, id=user_id),
)
```
### Advanced Operations

```python
# Store forever (no expiration)
cache().forever("key", "value")

# Add only if the key doesn't exist
cache().add("key", "value", ttl=3600)

# Get and delete (pull)
value = cache().pull("key", default=None)

# Increment/decrement numeric values
cache().increment("counter", amount=1)
cache().decrement("counter", amount=1)
```
### Tagged Cache

Group related cache keys so they can be invalidated together:

```python
# Create a tagged cache
tagged = cache().tags("users", "profile")

# Operations use the tags automatically
tagged.put("user:1", user_data, ttl=3600)
tagged.get("user:1")
tagged.flush()  # Flush all keys with these tags
```
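A common way to implement tag-based invalidation is to keep a set of key names per tag and delete every member on flush. The pure-Python sketch below models that bookkeeping in memory (an illustration of the idea, not this boilerplate's actual implementation; real backends keep the same structures as Redis SETs):

```python
class TaggedStore:
    """In-memory model of tag-based cache invalidation."""

    def __init__(self):
        self.data = {}       # key -> cached value
        self.tag_index = {}  # tag -> set of keys recorded under it

    def put(self, key, value, tags=()):
        self.data[key] = value
        for tag in tags:
            self.tag_index.setdefault(tag, set()).add(key)

    def get(self, key, default=None):
        return self.data.get(key, default)

    def flush_tag(self, tag):
        # Delete every key recorded under this tag
        for key in self.tag_index.pop(tag, set()):
            self.data.pop(key, None)

store = TaggedStore()
store.put("profile:1", {"name": "Ada"}, tags=["users"])
store.put("settings:1", {"theme": "dark"}, tags=["users"])
store.flush_tag("users")  # both keys are gone
```

The trade-off of this design is one extra set write per tag on every `put`, in exchange for O(tag size) invalidation instead of scanning the whole keyspace.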
### Real-World Example

```python
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session

from app.core.cache import cache
from app.core.database import get_db
from app.models.user import User

router = APIRouter()

@router.get("/users/{user_id}")
def get_user(user_id: int, db: Session = Depends(get_db)):
    # Try the cache first
    cache_key = f"user:{user_id}"
    user = cache().get(cache_key)
    if user is None:
        # Cache miss - fetch from the database
        user = User.get(db, id=user_id)
        if user:
            # Store in cache for 5 minutes
            cache().put(cache_key, user, ttl=300)
    return user

@router.put("/users/{user_id}")
def update_user(user_id: int, db: Session = Depends(get_db)):
    user = User.update(db, ...)
    # Invalidate the cache after an update
    cache().forget(f"user:{user_id}")
    return user
```
## Message Queue (Celery)

Celery uses Redis as both the message broker and the result backend.

### Configuration

Celery automatically uses the Redis configuration from your `.env`:
```env
# Celery will use Redis settings automatically
# Or you can override:
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/0
```
### Creating Tasks

Tasks are defined in `app/workers/tasks.py`:
```python
from app.core.celery_app import celery_app

@celery_app.task(name="send_email")
def send_email_task(email: str, subject: str, body: str):
    """Send email asynchronously."""
    # Your email-sending logic here
    print(f"Sending email to {email}")
    return {"status": "sent"}

@celery_app.task(name="process_data")
def process_data_task(data_id: int):
    """Process data asynchronously."""
    # Your processing logic here
    return {"processed": data_id}
```
### Calling Tasks

```python
from datetime import datetime, timedelta, timezone

from app.workers.tasks import send_email_task, process_data_task

# Call a task asynchronously
result = send_email_task.delay("user@example.com", "Subject", "Body")

# Get the task result (blocks until ready or timeout)
task_result = result.get(timeout=10)

# Call a task with an ETA (execute at a specific time)
send_email_task.apply_async(
    args=["user@example.com", "Subject", "Body"],
    eta=datetime.now(timezone.utc) + timedelta(hours=1),
)

# Call a task with a countdown (execute after a delay)
send_email_task.apply_async(
    args=["user@example.com", "Subject", "Body"],
    countdown=300,  # Execute after 5 minutes
)
```
### Running Celery Worker

The worker runs as a separate process alongside the API. For development, run it under a file watcher so it restarts on code changes.
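Assuming the `app.core.celery_app` module shown above exposes the `celery_app` instance, a typical invocation looks like this (Celery itself has no built-in auto-reload, so the development variant sketched here uses the third-party `watchfiles` package):

```shell
# Start a worker (module path assumed from app/core/celery_app.py above)
celery -A app.core.celery_app worker --loglevel=info

# Development with auto-reload: restart the worker on .py changes
# (requires: pip install watchfiles)
watchfiles --filter python 'celery -A app.core.celery_app worker --loglevel=info'
```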
### Monitoring with Flower
Flower is included for monitoring Celery tasks:
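Flower runs as its own process. Assuming the same app module path as the worker, it is typically started with:

```shell
# Start the Flower dashboard on port 5555
celery -A app.core.celery_app flower --port=5555
```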
Access at: http://localhost:5555
### Example: Background Email Sending

```python
# In your endpoint
from app.workers.tasks import send_welcome_email

@router.post("/register")
def register(user_data: UserCreate, db: Session = Depends(get_db)):
    user = User.create(db, obj_in=user_data)
    # Send the welcome email in the background
    send_welcome_email.delay(user.id)
    return user
```
## Examples

### Example 1: Caching Database Queries
```python
from sqlalchemy.orm import Session

from app.core.cache import cache
from app.models.user import User

def get_active_users(db: Session):
    """Get active users, with caching."""
    return cache().remember(
        "users:active",
        ttl=600,  # 10 minutes
        callback=lambda: User.get_multi(db, skip=0, limit=100),
    )
```
### Example 2: Cache Invalidation on Update

```python
@router.put("/users/{user_id}")
def update_user(user_id: int, db: Session = Depends(get_db)):
    user = User.update(db, ...)
    # Invalidate related cache entries
    cache().forget(f"user:{user_id}")
    cache().forget("users:active")  # Invalidate the list cache
    return user
```
### Example 3: Using Cache Tags

```python
# Cache with tags
tagged = cache().tags("users")
tagged.put("profile:1", profile_data, ttl=3600)
tagged.put("settings:1", settings_data, ttl=3600)

# Flush all user-related cache entries
tagged.flush()  # Removes both profile:1 and settings:1
```
### Example 4: Background Task with Retry

```python
from celery import Task

from app.core.celery_app import celery_app

class RetryTask(Task):
    """Task with automatic retry on failure."""
    autoretry_for = (Exception,)
    retry_kwargs = {"max_retries": 3, "countdown": 60}

@celery_app.task(base=RetryTask, name="process_payment")
def process_payment_task(payment_id: int):
    """Process a payment, retrying automatically on failure."""
    # Your payment-processing logic here
    pass
```
## Best Practices

- **Cache Keys**: use descriptive, namespaced keys (e.g., `user:123`, `users:active`)
- **TTL**: set a TTL appropriate to how fresh the data must be
- **Cache Invalidation**: always invalidate the cache when the underlying data changes
- **Error Handling**: cache failures shouldn't break your application
- **Monitoring**: use Flower to monitor Celery tasks
- **Connection Pooling**: Redis connection pooling is handled automatically
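For the error-handling point above, one approach is a read-through helper that falls back to the loader whenever Redis is unreachable. A hedged sketch (`store` stands in for any object with the `get`/`put` interface described earlier, such as the `cache()` instance; the helper name is made up for illustration):

```python
import logging

logger = logging.getLogger(__name__)

def cached_or_fresh(store, key, loader, ttl=300):
    """Read-through lookup that never lets a cache outage break the request.

    `store` needs .get(key) and .put(key, value, ttl=...); `loader` is a
    zero-argument callable that fetches the real value.
    """
    try:
        value = store.get(key)
        if value is not None:
            return value
    except Exception:
        logger.warning("cache read failed for %s; falling back to loader", key)
    value = loader()
    try:
        store.put(key, value, ttl=ttl)
    except Exception:
        # A failed write only costs a future cache hit, never the response
        logger.warning("cache write failed for %s; continuing", key)
    return value
```

The request still succeeds when both cache calls raise; the only cost of an outage is extra load on the underlying data source.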
## Troubleshooting

### Redis Connection Error
```python
# Check the Redis connection
from app.core.cache import cache

try:
    cache().put("test", "value")
    print("Redis connected!")
except Exception as e:
    print(f"Redis error: {e}")
```
### Celery Not Processing Tasks

- Check that the Celery worker is running
- Verify the Redis connection
- Check the Celery logs for errors
- Ensure tasks are imported in `celery_app.py`