Broadcasting Guide
This guide explains how to use the Laravel-like broadcasting system for real-time events in this FastAPI boilerplate.
Table of Contents
- Introduction
- Server Side Installation
- Configuration
- Defining Broadcast Events
- Authorizing Channels
- Broadcasting Events
- Client Side Installation
- Receiving Broadcasts
- Examples
Introduction
The broadcasting system lets you push server-side events to connected clients in real time over WebSockets. It is modeled on Laravel's broadcasting system and supports multiple drivers.
Key Features
- Real-time Broadcasting: WebSocket-based real-time communication
- Multiple Drivers: Redis, Pusher, Ably, Log, Null
- Channel Types: Public, Private, Presence channels
- Channel Authorization: Secure access control for private/presence channels
- Event Broadcasting: Broadcast model events and custom events
- Laravel-like API: Familiar broadcasting syntax
Server Side Installation
Dependencies
The required packages are already included in requirements.txt:
- redis[hiredis]: For Redis pub/sub broadcasting
- python-socketio: WebSocket support
- websockets: WebSocket protocol support
Configuration
Configure broadcasting in your .env file:
# Broadcasting Configuration
BROADCAST_DRIVER=redis # Options: redis, pusher, ably, log, null
BROADCAST_CONNECTION=default
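As a rough illustration of how these two settings might be consumed, the sketch below reads them from the environment and rejects unknown driver names. The helper name `load_broadcast_settings` and the fallback to `redis` are assumptions made for this example, not the boilerplate's actual loader.

```python
import os
from typing import Dict, Mapping, Optional

# Driver names taken from the options listed in the .env comment above.
SUPPORTED_DRIVERS = ("redis", "pusher", "ably", "log", "null")


def load_broadcast_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Hypothetical helper: read and validate the broadcasting settings."""
    env = os.environ if env is None else env
    # Assumption: fall back to the redis driver when the variable is unset.
    driver = env.get("BROADCAST_DRIVER", "redis").strip().lower()
    if driver not in SUPPORTED_DRIVERS:
        raise ValueError(f"Unsupported BROADCAST_DRIVER: {driver!r}")
    return {
        "driver": driver,
        "connection": env.get("BROADCAST_CONNECTION", "default"),
    }
```

Validating the driver name at startup surfaces a typo in .env immediately instead of at the first broadcast.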
Configuration
Broadcasting Drivers
Redis (Default):
Pusher Channels:
BROADCAST_DRIVER=pusher
PUSHER_APP_ID=your-app-id
PUSHER_APP_KEY=your-app-key
PUSHER_APP_SECRET=your-app-secret
PUSHER_APP_CLUSTER=mt1
Ably:
Log (Development):
BROADCAST_DRIVER=log
Null (Testing):
BROADCAST_DRIVER=null
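To make the difference between the two non-network drivers concrete, here is a minimal sketch of what a log driver and a null driver could look like. The class names, the `broadcast` signature, and the log message format are all assumptions made by analogy with Laravel's drivers of the same name; the boilerplate's real classes may differ.

```python
import logging
from typing import Any, Dict, List

logger = logging.getLogger("broadcasting")


class LogBroadcaster:
    """Illustrative stand-in: writes each broadcast to the log instead of sending it."""

    def broadcast(self, channels: List[str], event: str, payload: Dict[str, Any]) -> None:
        logger.info(
            "Broadcasting [%s] on %s with payload %s",
            event,
            ", ".join(channels),
            payload,
        )


class NullBroadcaster:
    """Illustrative stand-in: silently discards every broadcast."""

    def broadcast(self, channels: List[str], event: str, payload: Dict[str, Any]) -> None:
        return None
```

A log-style driver is handy during development because you can see what would have been sent; a null-style driver keeps test runs free of side effects.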
Defining Broadcast Events
Creating Broadcast Events
Create event classes that implement ShouldBroadcast:
from typing import Any, Dict

from app.events.base import ShouldBroadcast


class OrderShipped(ShouldBroadcast):
    """Event broadcast when an order is shipped."""

    def __init__(self, order):
        self.order = order

    def broadcast_on(self) -> str:
        """Channel to broadcast on."""
        return f"private-order.{self.order.id}"

    def broadcast_as(self) -> str:
        """Event name."""
        return "OrderShipped"

    def broadcast_with(self) -> Dict[str, Any]:
        """Data to broadcast."""
        return {
            "order_id": self.order.id,
            "status": "shipped",
            "tracking_number": self.order.tracking_number,
        }
Broadcast Name
By default, the event name is the class name. Override broadcast_as() to customize:
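The default-versus-override behaviour described here can be sketched as follows. The `ShouldBroadcast` class below is a stand-in written for this example (the real base class lives in app.events.base and may be implemented differently), and `OrderDelivered` and its event name are hypothetical.

```python
class ShouldBroadcast:
    """Stand-in for app.events.base.ShouldBroadcast (assumed behaviour)."""

    def broadcast_as(self) -> str:
        # Default: use the class name as the event name.
        return type(self).__name__


class OrderShipped(ShouldBroadcast):
    """Keeps the default event name."""


class OrderDelivered(ShouldBroadcast):
    """Overrides the event name (hypothetical example)."""

    def broadcast_as(self) -> str:
        return "order.delivered"
```

Clients must listen for exactly the name returned by broadcast_as(), so renaming a class silently changes the event name unless you override it.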
Broadcast Data
Override broadcast_with() to customize the data:
def broadcast_with(self) -> Dict[str, Any]:
    return {
        "order": {
            "id": self.order.id,
            "total": str(self.order.total),
        }
    }
Broadcast Queue
Queue broadcasts for async processing:
Broadcast Conditions
Broadcast only when conditions are met:
class OrderShipped(ShouldBroadcast):
    def should_broadcast(self) -> bool:
        """Only broadcast if the order has actually shipped."""
        return self.order.status == "shipped"
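To show where such a condition would take effect, the sketch below gates a send on should_broadcast(). The `dispatch` function, the `send` callback, and the stand-in base class are assumptions for illustration; the boilerplate's own dispatcher is not shown in this guide.

```python
from types import SimpleNamespace
from typing import Any, Callable, Dict


class ShouldBroadcast:
    """Stand-in for app.events.base.ShouldBroadcast (assumed interface)."""

    def should_broadcast(self) -> bool:
        return True  # assumed default: always broadcast


class OrderShipped(ShouldBroadcast):
    def __init__(self, order: Any) -> None:
        self.order = order

    def broadcast_on(self) -> str:
        return f"private-order.{self.order.id}"

    def broadcast_as(self) -> str:
        return "OrderShipped"

    def broadcast_with(self) -> Dict[str, Any]:
        return {"order_id": self.order.id, "status": self.order.status}

    def should_broadcast(self) -> bool:
        return self.order.status == "shipped"


def dispatch(event: Any, send: Callable[[str, str, Dict[str, Any]], None]) -> bool:
    """Hypothetical dispatcher: skip events whose condition is not met."""
    if not event.should_broadcast():
        return False
    send(event.broadcast_on(), event.broadcast_as(), event.broadcast_with())
    return True


# Usage: only the shipped order reaches the transport.
sent = []
dispatch(OrderShipped(SimpleNamespace(id=1, status="shipped")), lambda *args: sent.append(args))
dispatch(OrderShipped(SimpleNamespace(id=2, status="pending")), lambda *args: sent.append(args))
```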
Authorizing Channels
Defining Authorization Callbacks
Define channel authorization in app/routes/channels.py:
from app.core.channels import channel
from app.models.user import User


def register_channels():
    """Register channel authorization callbacks."""
    # Private user channel: a user may only access their own channel.
    # int() guards against {id} arriving as a string taken from the channel name.
    channel().channel('private-user.{id}', lambda user, id: user.id == int(id))
    # Private order channel: a user may access it only if they own the order.
    # user_owns_order is a helper you need to define yourself (not shown here).
    channel().channel('private-order.{id}', lambda user, id: user_owns_order(user, id))
    # Presence channel: any authenticated user may join.
    channel().channel('presence-users', lambda user: True)
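One way to picture how a pattern such as 'private-order.{id}' is matched against a concrete channel name is the sketch below. `ChannelRegistry` and its `authorize` method are hypothetical names invented for this example, and the matching rules (placeholders never span a dot, unknown channels are denied) are assumptions rather than the documented behaviour of app.core.channels.

```python
import re
from types import SimpleNamespace
from typing import Any, Callable, List, Pattern, Tuple


class ChannelRegistry:
    """Hypothetical registry mapping channel patterns to authorization callbacks."""

    def __init__(self) -> None:
        self._routes: List[Tuple[Pattern[str], Callable[..., Any]]] = []

    def channel(self, pattern: str, callback: Callable[..., Any]) -> None:
        # Split 'private-order.{id}' into literal text and placeholder names:
        # ['private-order.', 'id', ''] (odd indexes are placeholder names).
        parts = re.split(r"\{(\w+)\}", pattern)
        regex = "".join(
            f"(?P<{part}>[^.]+)" if index % 2 else re.escape(part)
            for index, part in enumerate(parts)
        )
        self._routes.append((re.compile(f"^{regex}$"), callback))

    def authorize(self, user: Any, channel_name: str) -> bool:
        for regex, callback in self._routes:
            match = regex.match(channel_name)
            if match:
                # Captured values are strings, so callbacks should convert them.
                return bool(callback(user, **match.groupdict()))
        return False  # deny channels that have no registered callback


registry = ChannelRegistry()
registry.channel("private-user.{id}", lambda user, id: user.id == int(id))
registry.channel("presence-users", lambda user: True)
alice = SimpleNamespace(id=7)
```

Because the captured `id` is a string here, comparing it to an integer `user.id` without conversion would always be false, which is an easy mistake to make in authorization callbacks.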
Channel Classes
For complex authorization logic, use channel classes:
from app.core.channels import channel
from app.models.user import User


class OrderChannel:
    """Channel authorization for orders."""

    @staticmethod
    def join(user: User, order_id: int) -> bool:
        """User can join if they own the order or are an admin."""
        from app.models.order import Order

        # `db` is assumed to be a database session available in this scope.
        order = Order.get(db, id=order_id)
        # Return a real bool rather than None when the order does not exist.
        return order is not None and (order.user_id == user.id or user.is_superuser)


def register_channels():
    channel().channel('private-order.{id}', OrderChannel.join)
Broadcasting Events
Broadcasting from Code
from app.core.broadcasting import broadcast
from app.events.user_events import UserCreated
from app.models.user import User

# Broadcast an event (`db` and `user_data` are assumed to come from the calling context)
user = User.create(db, obj_in=user_data)
event = UserCreated(user)
broadcast().event(event)
Broadcasting to Specific Channels
# Broadcast to public channel
broadcast().channel('orders').broadcast('OrderCreated', {'order_id': 123})
# Broadcast to private channel
broadcast().private('user.123').broadcast('UserUpdated', {'user_id': 123})
# Broadcast to presence channel
broadcast().presence('chat.room1').broadcast('MessageSent', {'message': 'Hello'})
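The three helpers above appear to differ only in the channel-name prefix they apply, which matches the 'private-' and 'presence-' names used elsewhere in this guide. The sketch below spells that naming rule out; `qualified_channel_name` is a hypothetical function, and the assumption that private('user.123') resolves to 'private-user.123' is inferred from the examples rather than confirmed by the implementation.

```python
# Assumed prefixes, inferred from channel names such as 'private-order.{id}'.
PREFIXES = {"public": "", "private": "private-", "presence": "presence-"}


def qualified_channel_name(kind: str, name: str) -> str:
    """Hypothetical helper: return the full channel name for a channel type."""
    prefix = PREFIXES[kind]
    # Avoid double-prefixing when the caller already passed the full name.
    return name if name.startswith(prefix) else prefix + name
```

Keeping the prefix logic in one place makes it harder for the server and client to disagree about whether a name already includes its prefix.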
Broadcasting to Others Only
Exclude the current user from receiving the broadcast:
Customizing the Connection
Use a different broadcasting driver:
Client Side Installation
JavaScript Client
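Install a WebSocket client library. For the Laravel Echo setup shown later in this section, that means the laravel-echo and pusher-js packages:
npm install laravel-echo pusher-js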
Connecting to WebSocket
// Using native WebSocket
const socket = new WebSocket('ws://localhost:8000/api/v1/broadcasting/ws?token=YOUR_TOKEN');

socket.onopen = () => {
  console.log('Connected');
  // Subscribe to a channel
  socket.send(JSON.stringify({
    event: 'subscribe',
    channel: 'users'
  }));
};

socket.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Received:', data);
  if (data.event === 'UserCreated') {
    console.log('New user:', data.data);
  }
};
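The JavaScript above implies a small JSON message format: a subscribe frame with `event` and `channel` keys, and incoming frames with `event` and `data` keys. The Python sketch below builds and parses frames of that shape, which can be useful when writing a test client. The function names are made up for this example, and any fields beyond the ones visible above are unknown.

```python
import json
from typing import Any, Optional, Tuple


def subscribe_message(channel: str) -> str:
    """Build the subscribe frame sent after the socket opens."""
    return json.dumps({"event": "subscribe", "channel": channel})


def parse_message(raw: str) -> Tuple[Optional[str], Any]:
    """Return (event name, payload), or (None, None) for malformed frames."""
    try:
        message = json.loads(raw)
    except json.JSONDecodeError:
        return None, None
    if not isinstance(message, dict):
        return None, None
    return message.get("event"), message.get("data")
```

Returning a sentinel for malformed frames keeps one bad message from crashing the receive loop.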
Using Laravel Echo (Compatible)
import Echo from 'laravel-echo';
import Pusher from 'pusher-js';

window.Pusher = Pusher;

// `token` is assumed to hold the current user's access token.
window.Echo = new Echo({
  broadcaster: 'pusher',
  key: 'your-app-key',
  cluster: 'mt1',
  encrypted: true,
  wsHost: 'localhost',
  wsPort: 8000,
  wssPort: 8000,
  authEndpoint: 'http://localhost:8000/api/v1/broadcasting/auth',
  auth: {
    headers: {
      Authorization: 'Bearer ' + token
    }
  }
});
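When Echo subscribes to a private or presence channel, pusher-js posts the socket ID and channel name to the authEndpoint and expects a signed response. The sketch below shows the signature scheme defined by the Pusher Channels protocol (HMAC-SHA256 over `socket_id:channel_name`, hex encoded, prefixed with the app key). Whether this boilerplate's /broadcasting/auth endpoint produces exactly this response is an assumption, and the function name is hypothetical.

```python
import hashlib
import hmac
import json
from typing import Any, Dict, Optional


def pusher_auth_response(
    app_key: str,
    app_secret: str,
    socket_id: str,
    channel_name: str,
    channel_data: Optional[Dict[str, Any]] = None,
) -> Dict[str, str]:
    """Build the JSON body a Pusher-protocol client expects from the auth endpoint."""
    string_to_sign = f"{socket_id}:{channel_name}"
    response: Dict[str, str] = {}
    if channel_data is not None:
        # Presence channels also sign the JSON-encoded member data.
        encoded = json.dumps(channel_data, separators=(",", ":"))
        string_to_sign += f":{encoded}"
        response["channel_data"] = encoded
    signature = hmac.new(
        app_secret.encode("utf-8"),
        string_to_sign.encode("utf-8"),
        hashlib.sha256,
    ).hexdigest()
    response["auth"] = f"{app_key}:{signature}"
    return response
```

The endpoint should only return this response after the channel's authorization callback has approved the user; otherwise it should respond with an HTTP 403.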
Receiving Broadcasts
Listening for Events
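// Echo prepends its default namespace (App.Events) to event names.
// A leading dot disables that, so '.UserCreated' matches an event broadcast as "UserCreated".

// Listen to public channel
Echo.channel('users')
  .listen('.UserCreated', (e) => {
    console.log('User created:', e);
  });

// Listen to private channel (Echo adds the 'private-' prefix)
Echo.private('user.123')
  .listen('.UserUpdated', (e) => {
    console.log('User updated:', e);
  });

// Listen to presence channel (Echo adds the 'presence-' prefix,
// so this joins the server-side channel 'presence-users')
Echo.join('users')
  .here((users) => {
    console.log('Users here:', users);
  })
  .joining((user) => {
    console.log('User joined:', user);
  })
  .leaving((user) => {
    console.log('User left:', user);
  });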
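The here, joining, and leaving callbacks correspond to membership bookkeeping on the server. As a sketch of that bookkeeping, assuming one user may hold several connections (for example, multiple browser tabs), the hypothetical `PresenceRoster` below reports when a join or leave should be announced to other members. It is not the boilerplate's implementation.

```python
from typing import Any, Dict, List


class PresenceRoster:
    """Hypothetical in-memory member list for a single presence channel."""

    def __init__(self) -> None:
        self._members: Dict[str, Dict[str, Any]] = {}
        self._connections: Dict[str, int] = {}

    def join(self, user_id: str, info: Dict[str, Any]) -> bool:
        """Return True on the user's first connection (announce 'joining')."""
        first = user_id not in self._connections
        self._connections[user_id] = self._connections.get(user_id, 0) + 1
        self._members[user_id] = info
        return first

    def leave(self, user_id: str) -> bool:
        """Return True when the user's last connection closes (announce 'leaving')."""
        count = self._connections.get(user_id, 0)
        if count <= 1:
            self._connections.pop(user_id, None)
            return self._members.pop(user_id, None) is not None
        self._connections[user_id] = count - 1
        return False

    def here(self) -> List[Dict[str, Any]]:
        """Current members, as sent to a newly subscribed client."""
        return list(self._members.values())
```

Counting connections per user avoids announcing that someone left when they merely closed one of several tabs.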
Leaving a Channel
// Leave a channel, including its private and presence variants
Echo.leave('users');
// Leave one specific channel, identified by its full name
Echo.leaveChannel('private-user.123');
Namespaces
Use namespaces to organize events:
class OrderShipped(ShouldBroadcast):
    def broadcast_as(self) -> str:
        return "App\\Events\\OrderShipped"  # Namespaced event name
Examples
Example 1: Broadcasting User Events
from app.core.broadcasting import broadcast
from app.events.user_events import UserCreated, UserUpdated
from app.models.user import User

# When user is created
user = User.create(db, obj_in=user_data)
broadcast().event(UserCreated(user))

# When user is updated
user = User.update(db, db_obj=user, obj_in=user_data)
broadcast().event(UserUpdated(user))
Example 2: Broadcasting Order Events
from typing import Any, Dict

from app.core.broadcasting import broadcast
from app.events.base import ShouldBroadcast


class OrderShipped(ShouldBroadcast):
    def __init__(self, order):
        self.order = order

    def broadcast_on(self) -> str:
        return f"private-order.{self.order.id}"

    def broadcast_as(self) -> str:
        return "OrderShipped"

    def broadcast_with(self) -> Dict[str, Any]:
        return {
            "order_id": self.order.id,
            "status": "shipped",
        }


# Broadcast once the shipped status has been committed
order.status = "shipped"
db.commit()
broadcast().event(OrderShipped(order))
Example 3: Presence Channel
from typing import Any, Dict

from app.events.base import ShouldBroadcast


class UserJoined(ShouldBroadcast):
    def __init__(self, user):
        self.user = user

    def broadcast_on(self) -> str:
        return "presence-chat.room1"

    def broadcast_as(self) -> str:
        return "UserJoined"

    def broadcast_with(self) -> Dict[str, Any]:
        return {
            "user": {
                "id": self.user.id,
                "name": self.user.full_name,
            }
        }
Example 4: Model Broadcasting
Automatically broadcast when models are created/updated/deleted:
from typing import Any, Dict

from app.events.base import ShouldBroadcast


# This is the model definition itself, so User is not imported here.
class User(ShouldBroadcast):
    # Model implementation...

    def broadcast_on(self) -> str:
        return "users"

    def broadcast_as(self) -> str:
        return "UserCreated"  # or UserUpdated, UserDeleted

    def broadcast_with(self) -> Dict[str, Any]:
        return {
            "id": self.id,
            "email": self.email,
        }
Best Practices
- Use Private Channels: For user-specific data, use private channels
- Authorize Properly: Always implement proper channel authorization
- Queue Heavy Broadcasts: Use queues for broadcasts that might be slow
- Handle Errors: Always handle WebSocket connection errors
- Reconnect Logic: Implement reconnection logic on client side
- Rate Limiting: Consider rate limiting for client events
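For the reconnection advice above, a common approach is exponential backoff with jitter, so that many clients do not all retry at the same instant after an outage. The sketch below computes the wait before each attempt. The function name and the default base and cap values are illustrative choices, not settings defined by this boilerplate.

```python
import random
from typing import Optional


def reconnect_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Seconds to wait before reconnect attempt `attempt` (0-based).

    Uses exponential backoff with full jitter: a random value between
    0 and min(cap, base * 2**attempt).
    """
    source = rng if rng is not None else random
    ceiling = min(cap, base * (2 ** attempt))
    return source.uniform(0, ceiling)
```

A client would sleep for `reconnect_delay(n)` seconds after the n-th consecutive failure and reset the counter to zero once a connection succeeds.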
Troubleshooting
WebSocket Connection Failed
- Check if WebSocket endpoint is accessible
- Verify authentication token
- Check CORS settings
- Verify Redis is running (for Redis driver)
Channel Authorization Failed
- Check channel authorization callback
- Verify user authentication
- Check channel name format
Events Not Received
- Verify event is being broadcast
- Check if client is subscribed to correct channel
- Verify broadcasting driver is configured correctly
- Check Redis pub/sub connection