Background Threads
Daebus provides a built-in mechanism for running long-lived background threads throughout the life of your service. Unlike scheduled tasks, which fire periodically, background threads run continuously, making them ideal for ongoing monitoring, processing work queues, or maintaining persistent connections.
Basic Usage
Creating a Background Thread
Use the @app.thread decorator to define a background thread:
```python
from daebus import Daebus, direct_logger
import time

app = Daebus(__name__)

@app.thread("worker_thread")
def continuous_worker(running):
    # The 'running' parameter is a callable that returns False when the app is stopping
    direct_logger.info("Worker thread started")

    while running():  # This loops until the application exits
        try:
            # Do some work
            process_work_items()

            # Sleep briefly to prevent CPU hogging
            time.sleep(0.1)
        except Exception as e:
            direct_logger.error(f"Error in worker thread: {e}")

    direct_logger.info("Worker thread stopping")

if __name__ == "__main__":
    app.run(service="my_service")
```

The background thread will:
- Start automatically when your service starts
- Run continuously until your service stops
- Be properly shut down when your service exits
Thread Control
The Running Function
Each thread function receives a running parameter, which is a callable that returns:
- True while the application is running normally
- False when the application is shutting down
This allows your thread to gracefully exit when the service is stopping:
@app.thread("processor")def data_processor(running): setup_resources()
try: while running(): # Process items as long as the service is running process_next_item() finally: # Clean up resources when the thread is stopping cleanup_resources()Using Thread State
You can maintain state within your thread function:
@app.thread("connection_manager")def manage_connections(running): # Initialize thread-local state connections = {} retry_count = 0
while running(): try: # Monitor and manage connections for host, conn in list(connections.items()): if not conn.is_active(): # Reconnect if needed connections[host] = establish_connection(host) direct_logger.info(f"Reconnected to {host}")
# Check for new hosts to connect new_hosts = discover_hosts() for host in new_hosts: if host not in connections: connections[host] = establish_connection(host) direct_logger.info(f"Connected to new host: {host}")
# Reset retry counter on successful iteration retry_count = 0
time.sleep(5) # Check every 5 seconds
except Exception as e: retry_count += 1 direct_logger.error(f"Connection error (retry {retry_count}): {e}") time.sleep(min(30, retry_count * 5)) # Exponential backoffThread Safety
Accessing Shared Resources
When accessing resources shared with other threads, always use proper synchronization:
```python
import threading

app = Daebus(__name__)

# Shared resources
shared_data = {}
data_lock = threading.Lock()

@app.thread("data_collector")
def collect_data(running):
    while running():
        try:
            # Collect new data
            new_data = fetch_external_data()

            # Update shared data with thread safety
            with data_lock:
                for key, value in new_data.items():
                    shared_data[key] = value

            time.sleep(10)  # Collect every 10 seconds
        except Exception as e:
            direct_logger.error(f"Data collection error: {e}")
            time.sleep(30)  # Longer sleep on error

@app.action("get_current_data")
def get_data():
    # Thread-safe access to the shared data
    with data_lock:
        current_data = shared_data.copy()

    return response.send(current_data)
```

Logging Considerations
Always use direct_logger instead of the context-based logger in background threads:
@app.thread("monitoring")def monitor_system(running): while running(): try: # This is correct direct_logger.info("System monitoring active")
# This might not work correctly in a background thread # logger.info("System monitoring active")
time.sleep(60) except Exception as e: direct_logger.error(f"Monitoring error: {e}")Practical Examples
Work Queue Processor
Process items from a queue continuously:
```python
import queue
import threading
import time

app = Daebus(__name__)

# Create a thread-safe work queue
work_queue = queue.Queue()
results = {}
results_lock = threading.Lock()

@app.thread("queue_processor")
def process_queue(running):
    direct_logger.info("Queue processor started")

    while running():
        try:
            # Try to get an item from the queue with a timeout
            # This allows the thread to check the running() condition regularly
            try:
                item = work_queue.get(timeout=1.0)
            except queue.Empty:
                continue

            # Process the item
            item_id = item.get("id")
            result = process_item(item)

            # Store the result
            with results_lock:
                results[item_id] = result

            # Mark the task as done
            work_queue.task_done()
            direct_logger.info(f"Processed item {item_id}")

        except Exception as e:
            direct_logger.error(f"Error processing queue item: {e}")

# Action to add items to the queue
@app.action("submit_job")
def submit_job():
    job_data = request.payload
    job_id = generate_unique_id()

    # Add metadata to the job
    job_data["id"] = job_id
    job_data["submitted_at"] = time.time()

    # Add to the queue
    work_queue.put(job_data)

    return response.send({
        "job_id": job_id,
        "status": "queued"
    })

# Action to check job results
@app.action("check_job")
def check_job():
    job_id = request.payload.get("job_id")

    if not job_id:
        return response.error("Missing job_id parameter")

    with results_lock:
        result = results.get(job_id)

    if result:
        return response.send({
            "job_id": job_id,
            "status": "completed",
            "result": result
        })
    else:
        # Check if job is in queue
        queue_position = check_queue_position(job_id)
        if queue_position >= 0:
            return response.send({
                "job_id": job_id,
                "status": "queued",
                "position": queue_position
            })
        else:
            return response.send({
                "job_id": job_id,
                "status": "not_found"
            })
```

External API Monitor
Monitor external APIs and report status:
```python
import requests
import threading
import time

app = Daebus(__name__)

# API endpoints to monitor
endpoints = {
    "users_api": "https://api.example.com/users/health",
    "orders_api": "https://api.example.com/orders/health",
    "inventory_api": "https://api.example.com/inventory/health"
}

# Status storage
api_status = {}
status_lock = threading.Lock()

@app.thread("api_monitor")
def monitor_apis(running):
    while running():
        for api_name, url in endpoints.items():
            try:
                start_time = time.time()
                response = requests.get(url, timeout=5)
                latency = time.time() - start_time

                status = {
                    "status": "up" if response.status_code == 200 else "degraded",
                    "status_code": response.status_code,
                    "latency_ms": round(latency * 1000, 2),
                    "checked_at": time.time()
                }

                with status_lock:
                    api_status[api_name] = status

                if response.status_code != 200:
                    direct_logger.warning(
                        f"API {api_name} returned {response.status_code}"
                    )
            except Exception as e:
                with status_lock:
                    api_status[api_name] = {
                        "status": "down",
                        "error": str(e),
                        "checked_at": time.time()
                    }
                direct_logger.error(f"Error monitoring {api_name}: {e}")

        # Wait before next check cycle
        time.sleep(30)

@app.action("get_api_status")
def get_api_status():
    with status_lock:
        current_status = api_status.copy()

    return response.send(current_status)
```

WebSocket Client
Maintain a persistent WebSocket connection:
```python
import websocket
import json
import threading
import time

app = Daebus(__name__)

# Shared message storage
received_messages = []
messages_lock = threading.Lock()
websocket_client = None

@app.thread("websocket_client")
def run_websocket_client(running):
    global websocket_client

    # WebSocket event callbacks
    def on_message(ws, message):
        try:
            data = json.loads(message)
            with messages_lock:
                received_messages.append(data)
                # Keep only the last 100 messages
                if len(received_messages) > 100:
                    received_messages.pop(0)

            # Process specific message types
            if data.get("type") == "alert":
                direct_logger.warning(f"Alert received: {data.get('message')}")
        except Exception as e:
            direct_logger.error(f"Error processing message: {e}")

    def on_error(ws, error):
        direct_logger.error(f"WebSocket error: {error}")

    def on_close(ws, close_status_code, close_msg):
        direct_logger.info("WebSocket connection closed")

    def on_open(ws):
        direct_logger.info("WebSocket connection established")
        # Send authentication message
        ws.send(json.dumps({
            "type": "auth",
            "api_key": get_api_key()
        }))

    # Connection loop - keeps trying to connect while the service is running
    while running():
        try:
            # Create a new WebSocket client
            url = "wss://api.example.com/stream"
            ws = websocket.WebSocketApp(url,
                                        on_open=on_open,
                                        on_message=on_message,
                                        on_error=on_error,
                                        on_close=on_close)

            # Store reference to client
            websocket_client = ws

            # Start the WebSocket connection (this will block until the connection closes)
            ws.run_forever()

            # If we get here, the connection was closed - wait before reconnecting
            if running():
                direct_logger.info("WebSocket disconnected, reconnecting in 5 seconds...")
                time.sleep(5)
        except Exception as e:
            direct_logger.error(f"WebSocket connection error: {e}")
            time.sleep(10)  # Wait before retrying

    # Clean shutdown
    if websocket_client:
        try:
            websocket_client.close()
        except Exception:
            pass
        websocket_client = None

@app.action("get_recent_messages")
def get_recent_messages():
    with messages_lock:
        messages = received_messages.copy()

    return response.send({
        "count": len(messages),
        "messages": messages
    })

@app.action("send_websocket_message")
def send_websocket_message():
    global websocket_client

    if not websocket_client:
        return response.error("WebSocket not connected")

    message = request.payload.get("message")
    if not message:
        return response.error("No message provided")

    try:
        websocket_client.send(json.dumps(message))
        return response.send({"sent": True})
    except Exception as e:
        return response.error(f"Failed to send message: {e}")
```

Best Practices
- Graceful Shutdown: Always check the running() condition and exit cleanly
- Error Handling: Catch and handle exceptions within the thread
- Resource Management: Clean up resources when the thread exits
- Thread Safety: Use locks or thread-safe data structures for shared resources
- Avoid CPU Hogging: Include sleep intervals in continuous processing loops
- Use Direct Logger: Always use direct_logger instead of logger
- Implement Backoff: Use exponential backoff for retries on failure (see the sketch after this list)
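For instance, combining graceful shutdown with exponential backoff might look like the following minimal sketch, where do_work() is a hypothetical stand-in for your own processing:

```python
@app.thread("resilient_worker")
def resilient_worker(running):
    failures = 0

    while running():
        try:
            do_work()        # hypothetical stand-in for your own processing
            failures = 0     # reset the backoff after a successful iteration
            time.sleep(0.5)  # brief pause so the loop doesn't hog the CPU
        except Exception as e:
            failures += 1
            delay = min(60, 2 ** failures)  # 2s, 4s, 8s, ... capped at 60s
            direct_logger.error(f"Worker failed (attempt {failures}), retrying in {delay}s: {e}")

            # Sleep in short slices so a long backoff doesn't delay shutdown
            deadline = time.time() + delay
            while running() and time.time() < deadline:
                time.sleep(0.5)
```

Sleeping in short slices rather than one long time.sleep(delay) keeps the thread responsive to the running() signal during the backoff window.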
Troubleshooting
Thread Not Running
If your background thread doesn’t seem to be running:
- Check the logs for exceptions during thread startup
- Ensure your thread function accepts the running parameter (see the sketch below)
- Verify your thread function doesn’t exit prematurely
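As a quick sanity check, compare your function against a minimal working thread (a sketch; the heartbeat body is purely illustrative):

```python
# Correct: accepts the running parameter and loops on it
@app.thread("heartbeat")
def heartbeat(running):
    while running():
        direct_logger.info("Heartbeat")
        time.sleep(30)

# Broken: no running parameter, and the function returns immediately
# instead of looping - the thread starts and exits right away
# @app.thread("heartbeat")
# def broken_heartbeat():
#     direct_logger.info("Heartbeat")
```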
High CPU Usage
If your thread is causing high CPU usage:
- Make sure you have sleep intervals in your processing loops
- Check for tight loops without proper delay (see the sketch below)
- Consider using more efficient algorithms or batched processing
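For example, a blocking wait with a timeout keeps the thread idle while there is no work, yet still lets it re-check running() regularly. A sketch for use inside a thread function, assuming a work_queue like the one in the work-queue example and a hypothetical handle() function:

```python
import queue

work_queue = queue.Queue()  # same idea as the work-queue example above

# Tight loop - spins at 100% CPU whenever the queue is empty:
# while running():
#     if not work_queue.empty():
#         handle(work_queue.get())

# Better - block for up to a second so the thread sleeps while idle,
# yet still re-checks running() regularly:
while running():
    try:
        item = work_queue.get(timeout=1.0)
    except queue.Empty:
        continue
    handle(item)  # hypothetical per-item handler
```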
Thread Deadlocks
If your application seems to freeze or deadlock:
- Review your lock usage to ensure you’re not causing deadlocks
- Set timeouts on resource acquisition when possible (see the sketch below)
- Avoid complex lock hierarchies
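For instance, threading locks can be acquired with a timeout so that a suspected deadlock shows up in the logs instead of silently freezing the thread (a sketch reusing data_lock and shared_data from the shared-resources example above):

```python
def update_shared_state():
    # acquire() returns False if the lock can't be taken within the
    # timeout, instead of blocking forever
    if not data_lock.acquire(timeout=5):
        direct_logger.error("Could not acquire data_lock within 5s - possible deadlock")
        return
    try:
        shared_data["updated_at"] = time.time()
    finally:
        data_lock.release()
```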
Memory Leaks
If you see memory growth over time:
- Look for data accumulation in global variables or thread-local storage
- Ensure resources like file handles or network connections are being closed
- Check that you’re limiting cached data to reasonable sizes (see the sketch below)
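A fixed-size container is a simple guard against unbounded growth. For example, collections.deque with maxlen drops the oldest entries automatically, which could replace the manual pop(0) bookkeeping in the WebSocket example (a sketch):

```python
import threading
from collections import deque

# A deque with maxlen discards the oldest entry automatically,
# so the buffer can never grow without bound
received_messages = deque(maxlen=100)
messages_lock = threading.Lock()

def record_message(data):
    with messages_lock:
        received_messages.append(data)  # no manual size check needed
```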
Further reading
- Read about how-to guides in the Diátaxis framework