Python schedule Library Guide: Lightweight Cron for Python
The schedule library is the simplest way to run recurring tasks in Python — no broker, no worker process, no cron daemon. Just pip install schedule and write schedule.every().hour.do(job). This guide covers everything from basic usage to production patterns.
Table of Contents
1. Quick Start
pip install schedule
import schedule
import time
def send_report():
print("Sending daily report...")
# your logic here
def health_check():
print("Health check OK")
# Schedule jobs
schedule.every().day.at("09:00").do(send_report)
schedule.every(10).minutes.do(health_check)
# Run loop
while True:
schedule.run_pending()
time.sleep(1)run_pending() checks whether any scheduled job should run now and executes it synchronously. The time.sleep(1) loop keeps CPU usage near zero between checks.
2. Scheduling Syntax Reference
# Interval-based
schedule.every(10).seconds.do(job) # every 10 seconds
schedule.every(5).minutes.do(job) # every 5 minutes
schedule.every(2).hours.do(job) # every 2 hours
schedule.every().minute.do(job) # every 1 minute (shorthand)
schedule.every().hour.do(job) # every 1 hour
schedule.every().day.do(job) # every 1 day (at midnight)
schedule.every().week.do(job) # every 7 days
# Time-of-day specific
schedule.every().day.at("10:30").do(job) # daily at 10:30 AM
schedule.every().day.at("22:00").do(job) # daily at 10 PM
schedule.every().hour.at(":30").do(job) # at the 30th minute of every hour
# Day-of-week specific
schedule.every().monday.do(job) # every Monday at midnight
schedule.every().wednesday.at("13:15").do(job) # every Wednesday 1:15 PM
schedule.every().friday.at("18:00").do(job) # every Friday 6 PM
# All day-of-week options:
# .monday .tuesday .wednesday .thursday .friday .saturday .sunday
# Pass arguments to the job function
schedule.every().hour.do(send_email, to="admin@example.com", subject="Report")
# Store the job reference to cancel later
job = schedule.every(5).minutes.do(health_check)
schedule.cancel_job(job)3. Cron Expression Mapping
The schedulelibrary does not accept cron syntax. Here's how common cron patterns translate:
| Cron | schedule equivalent | Description |
|---|---|---|
| * * * * * | every().minute.do(job) | Every minute |
| */5 * * * * | every(5).minutes.do(job) | Every 5 minutes |
| 0 * * * * | every().hour.at(':00').do(job) | Every hour at :00 |
| 30 * * * * | every().hour.at(':30').do(job) | Every hour at :30 |
| 0 9 * * * | every().day.at('09:00').do(job) | Daily at 9 AM |
| 0 9 * * 1-5 | every().monday…friday.at('09:00').do(job)* | Weekdays at 9 AM |
| 0 0 * * 0 | every().sunday.at('00:00').do(job) | Every Sunday midnight |
| 0 1 1 * * | — (not supported directly) | 1st of month at 1 AM |
import datetime
def weekday_only(job_func):
"""Wrapper: only run the job on weekdays."""
def wrapper():
if datetime.datetime.now().weekday() < 5: # 0=Mon, 4=Fri
job_func()
return wrapper
schedule.every().day.at("09:00").do(weekday_only(send_report))
# For monthly jobs (1st of month):
def monthly_job():
if datetime.datetime.now().day == 1:
do_monthly_task()
schedule.every().day.at("01:00").do(monthly_job)4. Running in a Background Thread
The blocking while True loop is fine for dedicated scripts, but when embedding scheduler in a web app or service, run it in a background thread.
import schedule
import threading
import time
def run_scheduler():
"""Background thread target for the scheduler loop."""
while True:
schedule.run_pending()
time.sleep(1)
def start_background_scheduler():
t = threading.Thread(target=run_scheduler, daemon=True)
t.start()
return t
# Register jobs
schedule.every().day.at("09:00").do(send_report)
schedule.every(10).minutes.do(health_check)
# Start background thread
scheduler_thread = start_background_scheduler()
# Your main application continues...
# e.g., Flask app.run() or asyncio event loopthreading.Thread inside the job function.5. Error Handling & Retries
By default, if a job raises an exception, it bubbles up through run_pending() and can crash your scheduler loop. Always wrap jobs defensively.
import schedule
import time
import logging
import functools
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def catch_exceptions(cancel_on_failure=False):
"""Decorator: log exceptions, optionally cancel job on failure."""
def decorator(job_func):
@functools.wraps(job_func)
def wrapper(*args, **kwargs):
try:
return job_func(*args, **kwargs)
except Exception as e:
logger.exception(f"Job {job_func.__name__} failed: {e}")
if cancel_on_failure:
return schedule.CancelJob
return wrapper
return decorator
@catch_exceptions(cancel_on_failure=False)
def flaky_job():
# This job may fail — exception is logged, loop continues
result = fetch_data_from_api()
process(result)
@catch_exceptions(cancel_on_failure=True)
def one_shot_job():
# Returns CancelJob on failure — job is removed from schedule
run_migration()
schedule.every(5).minutes.do(flaky_job)
schedule.every().day.at("02:00").do(one_shot_job)
while True:
schedule.run_pending()
time.sleep(1)7. Job Decorators
Wrap jobs with decorators to add timing, logging, or retry logic without cluttering your job functions.
import time
import logging
import functools
logger = logging.getLogger(__name__)
def timed(job_func):
"""Log job duration."""
@functools.wraps(job_func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = job_func(*args, **kwargs)
elapsed = time.perf_counter() - start
logger.info(f"{job_func.__name__} completed in {elapsed:.2f}s")
return result
return wrapper
def retry(max_retries=3, delay_seconds=5):
"""Retry a job up to max_retries times."""
def decorator(job_func):
@functools.wraps(job_func)
def wrapper(*args, **kwargs):
for attempt in range(1, max_retries + 1):
try:
return job_func(*args, **kwargs)
except Exception as e:
if attempt == max_retries:
logger.error(f"{job_func.__name__} failed after {max_retries} retries: {e}")
raise
logger.warning(f"{job_func.__name__} attempt {attempt} failed, retrying in {delay_seconds}s")
time.sleep(delay_seconds)
return wrapper
return decorator
@timed
@retry(max_retries=3, delay_seconds=10)
def sync_data():
response = requests.get("https://api.example.com/data", timeout=30)
response.raise_for_status()
save_to_db(response.json())
schedule.every(15).minutes.do(sync_data)8. schedule vs. Celery vs. cron
| Feature | schedule | Celery Beat | cron |
|---|---|---|---|
| Cron expression syntax | ✗ (fluent API only) | ✓ crontab() | ✓ native |
| Sub-minute intervals | ✓ every().seconds | ✓ timedelta | ✗ (1 min min) |
| In-process scheduling | ✓ (same process) | ✗ (separate worker) | ✗ (OS process) |
| Distributed tasks | ✗ | ✓ | ✗ |
| Missed run handling | ✗ (skipped) | ✓ redis/db tracking | ✗ (skipped) |
| Dependencies (pip) | 0 deps | redis/rabbitmq needed | 0 deps |
| Best for | Scripts, bots, simple services | Distributed task queues | OS-level daemon jobs |
Use schedule when…
- •Building scripts, bots, or single-process services
- •You want zero infrastructure (no Redis/RabbitMQ)
- •Tasks are lightweight and run in-process
- •Celery is overkill for your use case
Use Celery when…
- •You need distributed task execution
- •Tasks must survive worker restarts (persistence)
- •You need task result tracking and monitoring
- •You already use Redis or RabbitMQ
Use cron when…
- •Tasks are OS-level or shell commands
- •You want task scheduling outside the app
- •Process isolation per run is desirable
- •The app is stateless/serverless (Lambda)
9. Production Best Practices
Run the scheduler loop in a dedicated thread or process
Don't mix the scheduler loop with your web framework's main thread. Use a daemon thread (daemon=True so it exits with the main process), a separate subprocess, or a supervisor-managed process.
Use UTC times and convert explicitly
schedule uses the system's local clock. If your server runs UTC (common in cloud), your at('09:00') calls are already UTC. If not, convert: at(f'{(datetime.time(9, 0) - utc_offset):%H:%M}'). Better yet, keep servers on UTC.
Keep job functions fast — offload heavy work
run_pending() is synchronous. If a job takes 30 seconds, no other jobs run during that time. For slow jobs, use a thread inside the job function: threading.Thread(target=heavy_work, daemon=True).start()
Log every job start and end
Add structured logging at job boundaries: job name, start time, end time, duration, success/failure, and key metrics. This is your observability layer — without it, debugging missed or slow jobs is very difficult.
Handle job idempotency for retries
If your retry decorator re-runs a partially-completed job, it must be safe. Check-then-skip pattern: if record_exists(): return before doing work. This prevents duplicate side effects on retry.
For high-reliability production: consider Celery or APScheduler
The schedule library has no persistence — if the process restarts, no missed runs are caught. For critical jobs (billing, backups), use APScheduler with a SQLAlchemy job store, or Celery Beat with Redis/database-backed schedule.
Also using Linux cron or platform schedulers?
Use CronExpression.tools to validate cron expressions, explain them in plain English, and see the next 10 run times with timezone support.