WebSockets
Daebus includes a WebSocket server that enables real-time, bidirectional communication between your services and clients. WebSockets are ideal for applications requiring live updates, chat functionality, notifications, or any feature needing continuous data exchange.
Getting Started
Section titled âGetting StartedâSetting Up WebSockets
Section titled âSetting Up WebSocketsâTo add WebSocket support to your Daebus application:
from daebus import Daebus, DaebusHttp, DaebusWebSocket
app = Daebus(__name__)
# First, set up HTTP serverhttp = DaebusHttp(port=8080)app.attach(http)
# Then, add WebSocket supportwebsocket = DaebusWebSocket() # Will automatically use port 8081 (HTTP port + 1)app.attach(websocket)
# Define message handlers and start the applicationapp.run(service="realtime_service")đ Port Configuration
Section titled âđ Port ConfigurationâImportant: When both HTTP and WebSocket servers are used together, they run on separate ports to prevent conflicts:
- HTTP Server: Runs on the port you specify (e.g.,
8080) - WebSocket Server: Automatically runs on HTTP port + 1 (e.g.,
8081)
The system will log the actual ports being used when you start your application:
INFO [daebus.websocket] WebSocket will use port 8081 (HTTP port + 1)INFO [daebus.websocket] For same-port WebSocket support, connect to:INFO [daebus.websocket] HTTP: http://your-server:8080INFO [daebus.websocket] WebSocket: ws://your-server:8081Port Options
Section titled âPort OptionsâYou have several configuration options:
# Option 1: Automatic port assignment (recommended)http = DaebusHttp(port=8080)websocket = DaebusWebSocket() # Will use port 8081
# Option 2: Explicit WebSocket porthttp = DaebusHttp(port=8080)websocket = DaebusWebSocket(port=9000) # Use port 9000 for WebSocket
# Option 3: WebSocket only (no HTTP)websocket = DaebusWebSocket(port=8080) # Must specify port when no HTTP serverDiscovering the WebSocket Port
Section titled âDiscovering the WebSocket PortâYou can programmatically get the WebSocket port:
# After attaching the WebSocket componentapp.attach(websocket)
# Get the actual port being usedws_port = app.websocket.get_websocket_port()print(f"WebSocket server running on port: {ws_port}")
# Use this in your client connection URLswebsocket_url = f"ws://your-server:{ws_port}"Complete Example: HTTP + WebSocket Setup
Section titled âComplete Example: HTTP + WebSocket SetupâHereâs a complete example showing both servers with clear port usage:
from daebus import Daebus, DaebusHttp, DaebusWebSocket
app = Daebus(__name__)
# HTTP server on port 5000http = DaebusHttp(port=5000)app.attach(http)
# WebSocket server will automatically use port 5001websocket = DaebusWebSocket()app.attach(websocket)
# HTTP route@app.route("/api/status")def get_status(req): return {"status": "healthy", "websocket_port": app.websocket.get_websocket_port()}
# WebSocket handler@app.socket("ping")def handle_ping(data, client_id): return {"pong": True}
if __name__ == "__main__": print("Starting servers:") print(" HTTP: http://localhost:5000") print(" WebSocket: ws://localhost:5001") app.run(service="my_service")Client connections:
// HTTP API callsfetch('http://localhost:5000/api/status')
// WebSocket connectionconst socket = new WebSocket('ws://localhost:5001');Note: If you try to use the same port for both HTTP and WebSocket, the system will automatically resolve the conflict by moving WebSocket to the next available port.
Message Handlers
Section titled âMessage HandlersâUnderstanding Handler Signatures
Section titled âUnderstanding Handler SignaturesâImportant: WebSocket message handlers in Daebus use a specific signature that differs from some other WebSocket libraries:
@app.socket("message_type")def handler(data, client_id): # data: Contents of the 'data' field from the client message # client_id: Unique identifier for the WebSocket connection passWhat the client sends vs. what your handler receives:
// Client sends this complete message:{ "type": "chat_message", // Used to route to the correct handler "data": { // This object becomes the 'data' parameter "message": "Hello!", "room": "general" }}# Your handler receives:@app.socket("chat_message") # â Matches the 'type' fielddef handle_chat(data, client_id): # data = {"message": "Hello!", "room": "general"} # client_id = "user_abc123..." (unique session ID)
message = data.get("message") # â Direct access to message data room = data.get("room")Handling Message Types
Section titled âHandling Message TypesâUse the @app.socket() decorator to handle specific message types:
@app.socket("chat_message")def handle_chat(data, client_id): """Handle incoming chat messages""" message = data.get("message", "") sender = data.get("sender", "Anonymous")
# Log the message logger.info(f"Received chat message from {sender} (client {client_id}): {message}")
# Broadcast to all clients app.websocket.broadcast_to_all({ "sender": sender, "message": message, "timestamp": time.time() }, message_type="chat_update")
# Return acknowledgment to the sender return { "status": "delivered", "timestamp": time.time() }The handler function receives two parameters:
data: The contents of thedatafield from the clientâs messageclient_id: The clientâs session ID (a unique identifier for the connection)
Connection Events
Section titled âConnection EventsâHandle client connections and disconnections:
@app.socket_connect()def on_connect(data, client_id): """Handle new client connection""" logger.info(f"Client {client_id} connected")
# You can return data that will be sent to the client return { "status": "connected", "client_id": client_id, "server_time": time.time() }
@app.socket_disconnect()def on_disconnect(data, client_id): """Handle client disconnection""" logger.info(f"Client {client_id} disconnected")
# Clean up any client-specific resources if client_id in user_sessions: del user_sessions[client_id]Client Registration
Section titled âClient RegistrationâHandle client registration with custom data:
@app.socket_register()def on_register(data, client_id): """Handle client registration""" user_data = data.get("user", {}) username = user_data.get("username", f"Guest-{client_id[:8]}")
# Store the user information user_sessions[client_id] = { "username": username, "registered_at": time.time(), "is_active": True }
logger.info(f"Client {client_id} registered as {username}")
# Notify others about the new user app.websocket.broadcast_to_all({ "user": username, "action": "joined" }, message_type="user_update")
return { "status": "registered", "username": username }Sending Messages
Section titled âSending MessagesâResponse to Current Client
Section titled âResponse to Current ClientâSend a response to the client who sent the message:
@app.socket("get_data")def handle_data_request(data, client_id): data_id = data.get("id")
try: # Fetch the requested data result = fetch_data(data_id)
# Return the data directly - this will be sent to the client return { "data": result, "timestamp": time.time() } except Exception as e: # Return an error response return { "error": str(e), "status": "error" }Send to a Specific Client
Section titled âSend to a Specific ClientâSend a message to any connected client:
@app.action("notify_user")def send_notification(): user_id = request.payload.get("user_id") message = request.payload.get("message")
# Find the client ID for this user client_id = find_client_for_user(user_id)
if not client_id: return response.error(f"User {user_id} not connected")
# Send a message to the specific client success = app.websocket.send_to_client( client_id, { "message": message, "timestamp": time.time() }, message_type="notification" )
return response.send({ "delivered": success, "client_id": client_id })Broadcasting to All Clients
Section titled âBroadcasting to All ClientsâSend a message to all connected clients:
@app.background("system_status", 60) # Every minutedef broadcast_status(): try: # Collect system metrics metrics = { "cpu": get_cpu_usage(), "memory": get_memory_usage(), "active_users": len(user_sessions), "timestamp": time.time() }
# Broadcast to all connected clients recipients = app.websocket.broadcast_to_all( metrics, message_type="system_status" )
direct_logger.info(f"Status broadcast sent to {recipients} clients") except Exception as e: direct_logger.error(f"Error broadcasting status: {e}")Client Management
Section titled âClient ManagementâGetting Connected Clients
Section titled âGetting Connected ClientsâAccess information about connected clients:
@app.action("get_connected_clients")def get_clients(): # Get a list of all connected client IDs client_ids = list(app.websocket.clients.keys())
# Get more detailed information about each client client_info = {} for cid in client_ids: metadata = app.websocket.get_client_metadata(cid) client_info[cid] = { "connected_at": metadata.get("connected_at"), "remote_address": metadata.get("remote_address"), "messages_received": metadata.get("messages_received", 0), "is_authenticated": cid in user_sessions }
return response.send({ "count": len(client_ids), "clients": client_info })Filtering Clients
Section titled âFiltering ClientsâFind clients matching specific criteria:
@app.action("find_inactive_clients")def find_inactive_clients(): # Get clients inactive for more than 30 minutes threshold = time.time() - (30 * 60)
inactive_clients = app.websocket.get_clients_by_filter( lambda _, meta: meta.get("last_activity", 0) < threshold )
return response.send({ "count": len(inactive_clients), "clients": inactive_clients })Disconnecting Clients
Section titled âDisconnecting ClientsâForce disconnect a client:
@app.action("kick_client")def disconnect_client(): client_id = request.payload.get("client_id") reason = request.payload.get("reason", "Disconnected by administrator")
if not client_id: return response.error("No client_id provided")
# Send a message to the client before disconnecting app.websocket.send_to_client( client_id, { "reason": reason, "timestamp": time.time() }, message_type="disconnect_notice" )
# Disconnect the client success = app.websocket.disconnect_client(client_id)
return response.send({ "success": success, "client_id": client_id })Working with Message Data
Section titled âWorking with Message DataâWebSocket handlers receive the message data directly from the clientâs data field:
@app.socket("example_message")def handle_example(data, client_id): # Direct access to the message data (from the 'data' field in the client message) username = data.get("username", "Anonymous") action = data.get("action", "view")
# The client_id parameter provides the unique identifier for this connection logger.info(f"Processing {action} request from {username} (client: {client_id})")
# If you need access to the full request context, use the request proxy from daebus.modules.context import request message_type = request.message_type # The 'type' field from the client message websocket_connection = request.websocket # The underlying WebSocket connection
# Process the message... return {"status": "processed"}Client Message Structure:
// Client sends this structure{ "type": "example_message", // Determines which handler is called "data": { // This object is passed as 'data' parameter "username": "JohnDoe", "action": "view" }}When you need the full request context:
If you need access to the complete message structure, WebSocket connection, or other request details, use the request proxy:
@app.socket("advanced_handler")def handle_advanced(data, client_id): # Access message data directly (recommended for most cases) username = data.get("username")
# Access full request context when needed from daebus.modules.context import request
message_type = request.message_type # The 'type' field from client full_payload = request.payload # Complete client message websocket_conn = request.websocket # Raw WebSocket connection
# Access client metadata metadata = app.websocket.get_client_metadata(client_id) connected_at = metadata.get("connected_at")
return {"processed": True}Important Notes:
- Return values: Anything you return from a handler is automatically sent to the client as a response message
- Async handlers: You can make handlers async if you need to perform async operations:
@app.socket("async_operation")async def handle_async(data, client_id):result = await some_async_operation()return {"result": result}
- Error handling: Exceptions in handlers are caught and sent as error messages to the client
- No return value: If your handler doesnât return anything, no response is sent (useful for fire-and-forget messages)
Client-Side Implementation
Section titled âClient-Side ImplementationâHereâs a basic JavaScript client implementation:
// Connect to the WebSocket server// Note: WebSocket uses port 8081 (HTTP port 8080 + 1)const socket = new WebSocket('ws://localhost:8081');
// Handle connection opensocket.onopen = (event) => { console.log('Connected to server');
// Register with the server socket.send(JSON.stringify({ type: 'register', data: { user: { username: 'JohnDoe' } } }));};
// Handle incoming messagessocket.onmessage = (event) => { const message = JSON.parse(event.data); console.log('Received:', message);
// Handle different message types switch(message.type) { case 'chat_update': displayChatMessage(message.data); break; case 'notification': showNotification(message.data); break; case 'system_status': updateDashboard(message.data); break; }};
// Send a chat messagefunction sendChatMessage(text) { socket.send(JSON.stringify({ type: 'chat_message', data: { message: text, sender: 'JohnDoe' } }));}
// Handle connection closesocket.onclose = (event) => { console.log('Disconnected from server:', event.reason);};
// Handle errorssocket.onerror = (error) => { console.error('WebSocket error:', error);};Advanced Features
Section titled âAdvanced FeaturesâRate Limiting
Section titled âRate LimitingâEnable rate limiting to prevent abuse:
# Set up rate limiting when creating the WebSocket componentwebsocket = DaebusWebSocket()websocket.enable_rate_limiting( max_messages=60, # Maximum messages per minute window_seconds=60 # Time window for counting messages)app.attach(websocket)Custom Client ID Generation
Section titled âCustom Client ID GenerationâCustomize how client IDs are generated:
def custom_id_generator(websocket, path): """Generate custom client IDs based on IP and timestamp""" client_ip = websocket.remote_address[0] if hasattr(websocket, 'remote_address') else 'unknown' timestamp = int(time.time()) return f"client_{client_ip}_{timestamp}"
# Set the custom ID generatorwebsocket.set_client_id_generator(custom_id_generator)Graceful Shutdown
Section titled âGraceful ShutdownâImplement a graceful shutdown for the WebSocket server:
@app.action("shutdown")def graceful_shutdown(): # Prepare a shutdown message shutdown_message = { "message": "Server is shutting down for maintenance", "expected_downtime": "10 minutes", "reconnect": False }
# Perform graceful shutdown app.websocket.graceful_shutdown( timeout=5.0, # Wait 5 seconds after sending the message message=shutdown_message )
# Continue with other shutdown operations return response.send({"shutdown_initiated": True})Organizing with Blueprints
Section titled âOrganizing with BlueprintsâUse blueprints to organize WebSocket handlers:
from daebus import Daebus, DaebusHttp, DaebusWebSocket, Blueprint
# Global storage for the blueprint examplechat_rooms = {}authenticated_users = {}
# Create a blueprint for chat functionalitychat_bp = Blueprint("chat")
@chat_bp.socket("send_message")def handle_chat_message(data, client_id): # Chat message handling logic return {"received": True}
@chat_bp.socket("join_room")def handle_join_room(data, client_id): # Room joining logic return {"joined": True}
# Create another blueprint for user managementuser_bp = Blueprint("users")
@user_bp.socket_connect()def handle_connect(data, client_id): # Connection handling return {"welcome": True}
# Create the application and attach componentsapp = Daebus(__name__)# HTTP server on port 8080, WebSocket will use port 8081http = DaebusHttp(port=8080)websocket = DaebusWebSocket()
app.attach(http)app.attach(websocket)
# Register the blueprintsapp.register_blueprint(chat_bp)app.register_blueprint(user_bp)
# Run the applicationapp.run(service="chat_service")Security Considerations
Section titled âSecurity ConsiderationsâAuthentication
Section titled âAuthenticationâImplement authentication for WebSocket connections:
@app.socket_connect()def on_connect(data, client_id): # Extract authentication token from the connection request # You can access the full request context using the request proxy from daebus.modules.context import request token = extract_token_from_request(request)
if not token or not validate_token(token): # Return False to reject the connection return False
# Store authenticated user information user_id = get_user_id_from_token(token)
# Store user info in your own session storage authenticated_users[client_id] = { "user_id": user_id, "authenticated_at": time.time() }
logger.info(f"Authenticated connection from user {user_id}") return {"authenticated": True, "user_id": user_id}Input Validation
Section titled âInput ValidationâAlways validate incoming messages:
@app.socket("update_profile")def handle_profile_update(data, client_id): # Get user data profile_data = data.get("profile", {})
# Validate required fields if not profile_data.get("name"): return {"error": "Name is required", "status": "error"}
# Validate data types if "age" in profile_data and not isinstance(profile_data["age"], int): return {"error": "Age must be a number", "status": "error"}
# Sanitize input (example) if "bio" in profile_data: profile_data["bio"] = sanitize_html(profile_data["bio"])
# Process the valid data # ...
return {"status": "updated"}Complete Example
Section titled âComplete ExampleâHereâs a complete example of a chat application using WebSockets:
from daebus import Daebus, DaebusHttp, DaebusWebSocket, direct_loggerimport timeimport threading
app = Daebus(__name__)
# Set up HTTP and WebSocket servers# HTTP on port 8080, WebSocket will automatically use port 8081http = DaebusHttp(port=8080)websocket = DaebusWebSocket()
app.attach(http)app.attach(websocket)
# Thread-safe storage for chat rooms and usersrooms = {}rooms_lock = threading.Lock()users = {}users_lock = threading.Lock()
# Connection handler@app.socket_connect()def on_connect(data, client_id): direct_logger.info(f"Client connected: {client_id}") return {"status": "connected", "client_id": client_id}
# Disconnection handler@app.socket_disconnect()def on_disconnect(data, client_id): # Remove user from rooms with rooms_lock: for room_name, room in list(rooms.items()): if client_id in room["members"]: room["members"].remove(client_id)
# Notify others in the room if room["members"]: app.websocket.broadcast_to_clients( room["members"], { "user": users.get(client_id, {}).get("username", "Anonymous"), "action": "left", "room": room_name }, message_type="room_update" )
# Remove user with users_lock: if client_id in users: del users[client_id]
direct_logger.info(f"Client disconnected: {client_id}")
# User registration@app.socket("register")def register_user(data, client_id): username = data.get("username")
if not username: return {"error": "Username is required", "status": "error"}
# Store user information with users_lock: users[client_id] = { "username": username, "registered_at": time.time(), "rooms": [] }
direct_logger.info(f"User registered: {username} ({client_id})")
return { "status": "registered", "username": username, "available_rooms": list(rooms.keys()) }
# Create or join room@app.socket("join_room")def join_room(data, client_id): room_name = data.get("room")
if not room_name: return {"error": "Room name is required", "status": "error"}
# Get username username = users.get(client_id, {}).get("username", "Anonymous")
with rooms_lock: # Create room if it doesn't exist if room_name not in rooms: rooms[room_name] = { "created_at": time.time(), "created_by": client_id, "members": set(), "messages": [] } direct_logger.info(f"Room created: {room_name} by {username}")
# Add user to room rooms[room_name]["members"].add(client_id)
# Add room to user's list with users_lock: if client_id in users and "rooms" in users[client_id]: if room_name not in users[client_id]["rooms"]: users[client_id]["rooms"].append(room_name)
# Notify others in the room with rooms_lock: room = rooms[room_name] others = room["members"] - {client_id}
if others: app.websocket.broadcast_to_clients( list(others), { "user": username, "action": "joined", "room": room_name }, message_type="room_update" )
direct_logger.info(f"User {username} joined room: {room_name}")
# Return room information return { "status": "joined", "room": room_name, "members": len(rooms[room_name]["members"]), "history": rooms[room_name]["messages"][-20:] # Last 20 messages }
# Send message to room@app.socket("chat_message")def send_message(data, client_id): room_name = data.get("room") message = data.get("message", "").strip()
if not room_name or not message: return {"error": "Room and message are required", "status": "error"}
# Check if user is in the room with rooms_lock: if room_name not in rooms: return {"error": "Room does not exist", "status": "error"}
if client_id not in rooms[room_name]["members"]: return {"error": "Not a member of this room", "status": "error"}
# Get username username = users.get(client_id, {}).get("username", "Anonymous")
# Create message object msg = { "id": f"msg_{time.time()}_{client_id[:8]}", "room": room_name, "sender": username, "text": message, "timestamp": time.time() }
# Add to room history with rooms_lock: rooms[room_name]["messages"].append(msg) # Keep only last 100 messages if len(rooms[room_name]["messages"]) > 100: rooms[room_name]["messages"] = rooms[room_name]["messages"][-100:]
# Get all members except sender recipients = list(rooms[room_name]["members"] - {client_id})
# Broadcast to other room members if recipients: app.websocket.broadcast_to_clients( recipients, msg, message_type="new_message" )
direct_logger.info(f"Message from {username} in {room_name}: {message[:30]}...")
return { "status": "sent", "message_id": msg["id"], "timestamp": msg["timestamp"] }
# HTTP route to get active rooms@app.route("/api/rooms")def get_rooms(req): with rooms_lock: room_info = [] for name, room in rooms.items(): room_info.append({ "name": name, "members": len(room["members"]), "message_count": len(room["messages"]), "created_at": room["created_at"] })
return response.send({ "count": len(room_info), "rooms": room_info }, 200)
if __name__ == "__main__": print("Starting chat application:") print(" HTTP API: http://localhost:8080") print(" WebSocket: ws://localhost:8081") app.run(service="chat_service")Best Practices
Section titled âBest Practicesâ- Message Structure: Use a consistent message structure across your application
- Error Handling: Always handle connection errors and retries on the client
- Authentication: Implement proper authentication for WebSocket connections
- Validation: Validate all incoming message data
- Performance: Be mindful of broadcasting to large numbers of clients
- Reconnection: Implement reconnection logic on the client side
- Testing: Test with multiple simultaneous connections to ensure scalability
Deprecation Warnings
Section titled âDeprecation WarningsâYou may see deprecation warnings related to the websockets library:
DeprecationWarning: websockets.server.WebSocketServerProtocol is deprecatedDeprecationWarning: websockets.legacy is deprecatedThese warnings are related to the underlying websockets library and do not affect functionality. They will be addressed in future versions of Daebus. You can safely ignore them for now.
Troubleshooting
Section titled âTroubleshootingâConnection Issues
Section titled âConnection IssuesâIf clients canât connect:
- Check the correct port: WebSocket uses HTTP port + 1 (e.g., if HTTP is on 8080, WebSocket is on 8081)
- Verify the WebSocket server is running (check logs for âWebSocket server listening on port Xâ)
- Use the correct WebSocket URL:
ws://your-server:WEBSOCKET_PORT(not the HTTP port) - Check for firewall or proxy issues blocking WebSocket traffic
- Verify HTTP is properly set up if using both services together
Message Handling
Section titled âMessage HandlingâIf messages arenât being processed:
- Verify the message type matches your handler registration
- Check the message format on the client side
- Look for errors in your handler functions
- Ensure the
datafield contains the expected structure
Performance Issues
Section titled âPerformance IssuesâIf you experience performance problems:
- Limit the number of messages sent per second
- Reduce the size of messages
- Use more targeted broadcasting instead of broadcasting to all clients
- Consider splitting clients across multiple server instances