# Task Scheduler
Daebus includes a built-in task scheduler that allows you to run recurring background tasks at specified intervals. This feature is useful for periodic operations like health checks, cleanup jobs, data synchronization, or any task that needs to run on a regular schedule.
## Basic Usage

### Scheduling a Task

Use the `@app.background` decorator to schedule a recurring task:
```python
from daebus import Daebus, direct_logger

app = Daebus(__name__)

@app.background("health_check", 60)  # Name "health_check", run every 60 seconds
def health_check():
    try:
        # Perform health check operations
        status = check_system_health()
        direct_logger.info(f"Health check completed: {status}")
    except Exception as e:
        direct_logger.error(f"Health check failed: {e}")

if __name__ == "__main__":
    app.run(service="monitor_service")
```

The `@app.background` decorator takes two required parameters:

- `name`: a unique identifier for the task
- `interval`: how often to run the task, in seconds
## Important Considerations

### Thread Safety

Background tasks run in separate threads from the main service and other tasks. This means you should:

- Use `direct_logger` instead of `logger` for logging
- Be careful with shared resources
- Implement proper synchronization for shared state
```python
import threading

from daebus import Daebus, direct_logger

app = Daebus(__name__)

# Shared counter with lock for thread safety
request_counter = 0
counter_lock = threading.Lock()

@app.background("metrics_reporter", 300)  # Run every 5 minutes
def report_metrics():
    global request_counter

    # Thread-safe access to shared state
    with counter_lock:
        current_count = request_counter
        # Optionally reset counter
        request_counter = 0

    # Report metrics
    direct_logger.info(f"Processed {current_count} requests in the last 5 minutes")
```
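The reporter above only reads and resets the counter; whatever code records incoming requests must hold the same lock when incrementing. A minimal sketch of the other side, assuming a hypothetical `handle_request` action:

```python
@app.action("handle_request")
def handle_request():
    global request_counter

    # Increment under the same lock the reporter uses
    with counter_lock:
        request_counter += 1

    return response.send({"ok": True})
```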
### Error Handling

Always include error handling in background tasks:

```python
@app.background("cleanup", 3600)  # Run hourly
def cleanup_old_data():
    try:
        # Perform cleanup operations
        deleted = delete_old_records()
        direct_logger.info(f"Cleanup completed: {deleted} records removed")
    except Exception as e:
        # Prevent the error from stopping the task scheduler
        direct_logger.error(f"Cleanup failed: {e}")
```

If exceptions are not caught, they will be logged by Daebus, but it's better to handle them explicitly within your task function.
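If several tasks need the same try/except boilerplate, you can factor it into a small wrapper. A sketch using only the standard library; `safe_task` is an illustrative helper, not part of Daebus:

```python
import functools

def safe_task(func):
    """Log exceptions from a background task instead of letting them escape."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            direct_logger.error(f"{func.__name__} failed: {e}")
    return wrapper

@app.background("cleanup", 3600)  # Run hourly
@safe_task
def cleanup_old_data():
    deleted = delete_old_records()
    direct_logger.info(f"Cleanup completed: {deleted} records removed")
```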
## Advanced Usage

### Dynamic Intervals

If you need to change the task interval dynamically, you can use the scheduler's API:
```python
app = Daebus(__name__)

# Store reference to the task for later modification
cleanup_task = None

@app.background("adaptive_cleanup", 3600)  # Start with hourly
def cleanup_job():
    # Task implementation...
    pass

# Store the task reference
cleanup_task = app.background_tasks["adaptive_cleanup"]

@app.action("adjust_schedule")
def adjust_cleanup_interval():
    global cleanup_task

    # Get requested interval from the payload
    new_interval = request.payload.get("interval", 3600)

    # Update the task interval
    if cleanup_task:
        cleanup_task.interval = new_interval
        return response.send({"updated": True, "new_interval": new_interval})
    else:
        return response.error("Task not found")
```
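Depending on where the payload comes from, it may be worth validating the requested interval before applying it. A variant of the action above, assuming `app.background_tasks` behaves like a dict:

```python
@app.action("adjust_schedule")
def adjust_cleanup_interval():
    new_interval = request.payload.get("interval", 3600)

    # Reject values that would make the task spin or never fire
    if not isinstance(new_interval, (int, float)) or new_interval <= 0:
        return response.error(f"Invalid interval: {new_interval!r}")

    task = app.background_tasks.get("adaptive_cleanup")
    if task:
        task.interval = new_interval
        return response.send({"updated": True, "new_interval": new_interval})
    return response.error("Task not found")
```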
### One-time Tasks

For one-time delayed execution, you can use Python's `threading.Timer`:
```python
import threading

@app.action("schedule_one_time")
def schedule_one_time_task():
    delay = request.payload.get("delay", 60)  # Default 60 seconds

    # Create a one-time delayed task
    def delayed_task():
        try:
            # Task implementation
            direct_logger.info("One-time task executed")
        except Exception as e:
            direct_logger.error(f"One-time task failed: {e}")

    # Schedule the task
    timer = threading.Timer(delay, delayed_task)
    timer.daemon = True  # Allow app to exit even if task is pending
    timer.start()

    return response.send({"scheduled": True, "delay": delay})
```
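Timers created this way are plain `threading.Timer` objects, so Daebus does not track them; if you might need to cancel one before it fires, keep your own reference. A minimal sketch (the `pending_timers` dict is illustrative):

```python
pending_timers = {}

@app.action("cancel_one_time")
def cancel_one_time_task():
    task_id = request.payload.get("task_id")

    timer = pending_timers.pop(task_id, None)
    if timer:
        timer.cancel()  # Safe to call even if the timer already fired
        return response.send({"cancelled": True})
    return response.error("No pending timer with that id")
```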
## Practical Examples

### Periodic Data Refresh
```python
import time

from daebus import Daebus, broadcast, direct_logger

app = Daebus(__name__)

# Cache for data
cached_data = {}
last_update = 0

@app.background("data_refresh", 300)  # Refresh every 5 minutes
def refresh_data():
    global cached_data, last_update

    try:
        # Fetch fresh data from source
        fresh_data = fetch_data_from_source()

        # Update cache
        cached_data = fresh_data
        last_update = time.time()

        # Broadcast data update event
        broadcast.send("data_events", {
            "type": "data_refreshed",
            "timestamp": last_update
        })

        direct_logger.info("Data cache refreshed successfully")
    except Exception as e:
        direct_logger.error(f"Failed to refresh data: {e}")

# Action to access cached data
@app.action("get_data")
def get_data():
    return response.send({
        "data": cached_data,
        "last_updated": last_update
    })
```
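If the refresh task starts failing, `cached_data` will silently go stale. One option is to report staleness alongside the data; a variant of `get_data` with an illustrative threshold of two refresh cycles:

```python
@app.action("get_data")
def get_data():
    age = time.time() - last_update

    return response.send({
        "data": cached_data,
        "last_updated": last_update,
        "stale": age > 600,  # Older than two 5-minute refresh cycles
    })
```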
### Resource Monitoring

```python
import time

import psutil

from daebus import Daebus, broadcast, direct_logger

app = Daebus(__name__)

@app.background("system_monitor", 30)  # Check every 30 seconds
def monitor_system():
    try:
        # Collect system metrics
        metrics = {
            "cpu_percent": psutil.cpu_percent(),
            "memory_percent": psutil.virtual_memory().percent,
            "disk_percent": psutil.disk_usage('/').percent,
            "timestamp": time.time()
        }

        # Log high usage alerts
        if metrics["cpu_percent"] > 90:
            direct_logger.warning(f"High CPU usage: {metrics['cpu_percent']}%")

        if metrics["memory_percent"] > 85:
            direct_logger.warning(f"High memory usage: {metrics['memory_percent']}%")

        # Store metrics or broadcast to monitoring channel
        broadcast.send("monitoring", metrics)

    except Exception as e:
        direct_logger.error(f"Monitoring error: {e}")
```
### Cleanup Jobs

```python
import datetime

from daebus import Daebus, direct_logger

app = Daebus(__name__)

@app.background("db_cleanup", 86400)  # Run daily (24 hours)
def database_cleanup():
    try:
        # Calculate cutoff date (e.g., 30 days ago)
        cutoff_date = datetime.datetime.now() - datetime.timedelta(days=30)

        # Delete old logs
        deleted_logs = delete_old_logs(cutoff_date)
        direct_logger.info(f"Deleted {deleted_logs} old log entries")

        # Delete old temporary files
        deleted_files = cleanup_temp_files(cutoff_date)
        direct_logger.info(f"Deleted {deleted_files} temporary files")

        # Optimize database
        optimize_database()
        direct_logger.info("Database optimization completed")

    except Exception as e:
        direct_logger.error(f"Database cleanup failed: {e}")
```
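`delete_old_logs`, `cleanup_temp_files`, and `optimize_database` are placeholders for your own logic. As one illustration, `cleanup_temp_files` might look roughly like this (the `/tmp/myapp` directory is assumed):

```python
from pathlib import Path

def cleanup_temp_files(cutoff_date):
    """Delete temp files older than cutoff_date and return how many were removed."""
    cutoff_ts = cutoff_date.timestamp()
    deleted = 0

    for path in Path("/tmp/myapp").glob("**/*"):
        if path.is_file() and path.stat().st_mtime < cutoff_ts:
            path.unlink()
            deleted += 1

    return deleted
```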
## Best Practices

- Keep Tasks Light: Avoid long-running operations that could block other tasks
- Use Thread-Safe Logging: Always use `direct_logger` instead of `logger`
- Handle Exceptions: Catch and handle exceptions to prevent task termination
- Consider Resource Usage: Be mindful of CPU and memory usage, especially for frequent tasks
- Add Monitoring: Log task execution times and results to monitor performance (see the timing sketch after this list)
- Use Appropriate Intervals: Balance freshness requirements with system load
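To follow the monitoring advice above, you can time each run and log the duration. A minimal sketch; `timed_task` is an illustrative helper, not part of Daebus:

```python
import functools
import time

def timed_task(func):
    """Log how long each run of a background task takes."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.monotonic()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed = time.monotonic() - start
            direct_logger.info(f"{func.__name__} ran in {elapsed:.2f}s")
    return wrapper

@app.background("health_check", 60)
@timed_task
def health_check():
    ...
```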
## Troubleshooting

### Task Not Running

If a scheduled task doesn't seem to be running:

- Check logs for any errors in the task function
- Verify the interval is what you expect (in seconds)
- Ensure the task doesn't exit prematurely due to an uncaught exception (the heartbeat sketch after this list can help confirm the task fires)
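A quick way to confirm the scheduler is invoking the task at all is to log a heartbeat as the very first statement, for example:

```python
@app.background("health_check", 60)
def health_check():
    # If this line never shows up in the logs, the task is not being scheduled
    direct_logger.info("health_check fired")
    ...
```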
### High CPU Usage

If scheduled tasks are causing high CPU usage:

- Increase the interval between task runs
- Optimize the task implementation
- Consider using fewer background tasks
### Memory Leaks

If you notice memory growth over time:

- Check for resources not being properly cleaned up in your tasks
- Ensure you're not accumulating data in global variables
- Profile your application to identify memory leaks (see the sketch after this list)
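For in-process profiling, the standard library's `tracemalloc` can be run from a background task itself; a sketch:

```python
import tracemalloc

tracemalloc.start()

@app.background("memory_profile", 600)  # Snapshot every 10 minutes
def report_memory():
    snapshot = tracemalloc.take_snapshot()

    # Log the three call sites currently holding the most memory
    for stat in snapshot.statistics("lineno")[:3]:
        direct_logger.info(f"Top allocation: {stat}")
```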
## Further reading

- Read about how-to guides in the Diátaxis framework