
Python schedule Library Guide: Lightweight Cron for Python

The schedule library is the simplest way to run recurring tasks in Python — no broker, no worker process, no cron daemon. Just pip install schedule and write schedule.every().hour.do(job). This guide covers everything from basic usage to production patterns.

9 min read·Updated April 2026

1. Quick Start

Install the package from PyPI:

pip install schedule

Then register jobs and start the run loop:

import schedule
import time

def send_report():
    print("Sending daily report...")
    # your logic here

def health_check():
    print("Health check OK")

# Schedule jobs
schedule.every().day.at("09:00").do(send_report)
schedule.every(10).minutes.do(health_check)

# Run loop
while True:
    schedule.run_pending()
    time.sleep(1)

run_pending() checks which scheduled jobs are due and runs them one after another in the calling thread. The time.sleep(1) between checks keeps CPU usage near zero, and it also means a job can start up to about one second after its scheduled time.

2. Scheduling Syntax Reference

# Interval-based
schedule.every(10).seconds.do(job)          # every 10 seconds
schedule.every(5).minutes.do(job)           # every 5 minutes
schedule.every(2).hours.do(job)             # every 2 hours
schedule.every().minute.do(job)             # every 1 minute (shorthand)
schedule.every().hour.do(job)               # every 1 hour
schedule.every().day.do(job)                # every 24 hours, counted from when the job is registered
schedule.every().week.do(job)               # every 7 days

# Time-of-day specific
schedule.every().day.at("10:30").do(job)    # daily at 10:30 AM
schedule.every().day.at("22:00").do(job)    # daily at 10 PM
schedule.every().hour.at(":30").do(job)     # at the 30th minute of every hour

# Day-of-week specific
schedule.every().monday.do(job)             # every Monday, at the time of day it was registered (add .at() to pin it)
schedule.every().wednesday.at("13:15").do(job)  # every Wednesday 1:15 PM
schedule.every().friday.at("18:00").do(job) # every Friday 6 PM

# All day-of-week options:
# .monday .tuesday .wednesday .thursday .friday .saturday .sunday

# Pass arguments to the job function
schedule.every().hour.do(send_email, to="admin@example.com", subject="Report")

# Store the job reference to cancel later
job = schedule.every(5).minutes.do(health_check)
schedule.cancel_job(job)
Time format: Times use 24-hour notation (HH:MM or HH:MM:SS) and are interpreted in the system's local timezone by default. See Section 9 for scheduling in UTC.
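
If your times come from configuration or user input in 12-hour form, they need converting before they reach at(). The following is a minimal sketch using only the standard library; the helper name to_24h is hypothetical and not part of schedule.

```python
from datetime import datetime

def to_24h(time_12h: str) -> str:
    """Convert a 12-hour time such as '6:00 PM' to 24-hour 'HH:MM'."""
    parsed = datetime.strptime(time_12h.strip().upper(), "%I:%M %p")
    return parsed.strftime("%H:%M")

print(to_24h("6:00 PM"))   # 18:00
print(to_24h("12:00 AM"))  # 00:00

# Intended use (kept as a comment so the snippet runs without schedule installed):
#   schedule.every().friday.at(to_24h("6:00 PM")).do(job)
```

strptime raises ValueError on malformed input, so a bad time fails when the job is registered rather than silently never running.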

3. Cron Expression Mapping

The schedule library does not accept cron syntax. Here is how common cron patterns translate:

| Cron | schedule equivalent | Description |
| --- | --- | --- |
| * * * * * | every().minute.do(job) | Every minute |
| */5 * * * * | every(5).minutes.do(job) | Every 5 minutes |
| 0 * * * * | every().hour.at(':00').do(job) | Every hour at :00 |
| 30 * * * * | every().hour.at(':30').do(job) | Every hour at :30 |
| 0 9 * * * | every().day.at('09:00').do(job) | Daily at 9 AM |
| 0 9 * * 1-5 | every().monday…friday.at('09:00').do(job) * | Weekdays at 9 AM |
| 0 0 * * 0 | every().sunday.at('00:00').do(job) | Every Sunday at midnight |
| 0 1 1 * * | not supported directly | 1st of month at 1 AM |

* Weekdays workaround: schedule doesn't support day ranges. Register five separate jobs (one per weekday) or use a wrapper:
import datetime

def weekday_only(job_func):
    """Wrapper: only run the job on weekdays."""
    def wrapper():
        if datetime.datetime.now().weekday() < 5:  # 0=Mon, 4=Fri
            job_func()
    return wrapper

schedule.every().day.at("09:00").do(weekday_only(send_report))

# For monthly jobs (1st of month):
def monthly_job():
    if datetime.datetime.now().day == 1:
        do_monthly_task()

schedule.every().day.at("01:00").do(monthly_job)
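
Both workarounds above follow the same pattern: schedule the job daily and let a date check decide whether it actually runs. A possible generalization is sketched below. The names only_when, is_weekday, and is_first_of_month are hypothetical helpers, not part of schedule, and the injectable now parameter exists only so the guard can be exercised with a fixed clock.

```python
import datetime
import functools

def only_when(predicate, now=datetime.datetime.now):
    """Run the wrapped job only if predicate(current datetime) is true."""
    def decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            if predicate(now()):
                return job_func(*args, **kwargs)
            return None  # skipped: the date condition did not match
        return wrapper
    return decorator

def is_weekday(dt):
    return dt.weekday() < 5  # Monday=0 ... Friday=4

def is_first_of_month(dt):
    return dt.day == 1

# Intended use (comments only, so the snippet runs without schedule installed):
#   schedule.every().day.at("09:00").do(only_when(is_weekday)(send_report))
#   schedule.every().day.at("01:00").do(only_when(is_first_of_month)(do_monthly_task))
```

Unlike the plain wrapper, this version forwards positional and keyword arguments, so it also works with jobs registered as do(job, arg1, key=value).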

4. Running in a Background Thread

The blocking while True loop is fine for dedicated scripts, but when embedding the scheduler in a web app or service, run it in a background thread.

import schedule
import threading
import time

def run_scheduler():
    """Background thread target for the scheduler loop."""
    while True:
        schedule.run_pending()
        time.sleep(1)

def start_background_scheduler():
    t = threading.Thread(target=run_scheduler, daemon=True)
    t.start()
    return t

# Register jobs
schedule.every().day.at("09:00").do(send_report)
schedule.every(10).minutes.do(health_check)

# Start background thread
scheduler_thread = start_background_scheduler()

# Your main application continues...
# e.g., Flask app.run() or asyncio event loop
Thread safety: The schedule library's default scheduler is NOT thread-safe. If you add or remove jobs from multiple threads, guard those calls with a lock or create a separate schedule.Scheduler instance per thread. To run jobs concurrently, start a threading.Thread inside the job function, as described in the library's documentation.
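
One way to combine the background loop with the locking advice is sketched below. It is an illustration under stated assumptions, not the library's own API: run_in_background is a hypothetical helper, the run_pending callable is passed in (in real use it would be schedule.run_pending), and the same lock is expected to guard every place where jobs are added or removed.

```python
import threading

def run_in_background(run_pending, lock, interval=1.0):
    """Call run_pending() every `interval` seconds in a daemon thread.

    Returns (thread, stop_event); setting stop_event ends the loop.
    """
    stop_event = threading.Event()

    def loop():
        while not stop_event.is_set():
            with lock:                 # same lock that guards job registration
                run_pending()
            stop_event.wait(interval)  # sleeps, but wakes early once stopped

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread, stop_event

# Intended use (comments only, so the snippet runs without schedule installed):
#   lock = threading.RLock()
#   thread, stop = run_in_background(schedule.run_pending, lock)
#   with lock:
#       schedule.every(10).minutes.do(health_check)
#   ...
#   stop.set(); thread.join()
```

An RLock is suggested so that a job which itself registers or cancels jobs does not deadlock on the lock its own thread already holds. Note that the lock is held while jobs run, so registration from other threads waits for slow jobs to finish.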

5. Error Handling & Retries

By default, if a job raises an exception, it bubbles up through run_pending() and can crash your scheduler loop. Always wrap jobs defensively.

import schedule
import time
import logging
import functools

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def catch_exceptions(cancel_on_failure=False):
    """Decorator: log exceptions, optionally cancel job on failure."""
    def decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except Exception as e:
                logger.exception(f"Job {job_func.__name__} failed: {e}")
                if cancel_on_failure:
                    return schedule.CancelJob
        return wrapper
    return decorator

@catch_exceptions(cancel_on_failure=False)
def flaky_job():
    # This job may fail — exception is logged, loop continues
    result = fetch_data_from_api()
    process(result)

@catch_exceptions(cancel_on_failure=True)
def one_shot_job():
    # Returns CancelJob on failure — job is removed from schedule
    run_migration()

schedule.every(5).minutes.do(flaky_job)
schedule.every().day.at("02:00").do(one_shot_job)

while True:
    schedule.run_pending()
    time.sleep(1)

6. Tags & Selective Execution

Tag jobs so you can query, run, or cancel them as a group.

import schedule

# Tag jobs at creation
schedule.every().hour.do(fetch_prices).tag("market-data", "hourly")
schedule.every().hour.do(fetch_news).tag("market-data", "hourly")
schedule.every().day.at("18:00").do(generate_report).tag("reporting")

# Run only jobs with a specific tag, right now
# (run_all() accepts only delay_seconds, so iterate over get_jobs() instead)
for job in schedule.get_jobs("market-data"):
    job.run()

# Cancel all jobs with a tag
schedule.clear("reporting")

# Get all jobs with a tag
market_jobs = schedule.get_jobs("market-data")
print(f"Market data jobs: {len(market_jobs)}")

# CancelJob: return from within the job to unschedule it
def one_time_setup():
    do_setup()
    return schedule.CancelJob  # Remove this job after first run

schedule.every(1).seconds.do(one_time_setup)

7. Job Decorators

Wrap jobs with decorators to add timing, logging, or retry logic without cluttering your job functions.

import time
import logging
import functools

import requests  # third-party: pip install requests

logger = logging.getLogger(__name__)

def timed(job_func):
    """Log job duration."""
    @functools.wraps(job_func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = job_func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        logger.info(f"{job_func.__name__} completed in {elapsed:.2f}s")
        return result
    return wrapper

def retry(max_retries=3, delay_seconds=5):
    """Retry a job up to max_retries times."""
    def decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return job_func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries:
                        logger.error(f"{job_func.__name__} failed after {max_retries} attempts: {e}")
                        raise
                    logger.warning(f"{job_func.__name__} attempt {attempt} failed, retrying in {delay_seconds}s")
                    time.sleep(delay_seconds)
        return wrapper
    return decorator

@timed
@retry(max_retries=3, delay_seconds=10)
def sync_data():
    response = requests.get("https://api.example.com/data", timeout=30)
    response.raise_for_status()
    save_to_db(response.json())

schedule.every(15).minutes.do(sync_data)

8. schedule vs. Celery vs. cron

| Feature | schedule | Celery Beat | cron |
| --- | --- | --- | --- |
| Cron expression syntax | ✗ (fluent API only) | ✓ crontab() | ✓ native |
| Sub-minute intervals | ✓ every().seconds | ✓ timedelta | ✗ (1-minute minimum) |
| In-process scheduling | ✓ (same process) | ✗ (separate worker) | ✗ (OS process) |
| Distributed tasks | ✗ | ✓ | ✗ |
| Missed run handling | ✗ (skipped) | ✓ Redis/DB tracking | ✗ (skipped) |
| Dependencies (pip) | 0 deps | Redis/RabbitMQ needed | 0 deps |
| Best for | Scripts, bots, simple services | Distributed task queues | OS-level daemon jobs |

Use schedule when…

  • Building scripts, bots, or single-process services
  • You want zero infrastructure (no Redis/RabbitMQ)
  • Tasks are lightweight and run in-process
  • Celery is overkill for your use case

Use Celery when…

  • You need distributed task execution
  • Tasks must survive worker restarts (persistence)
  • You need task result tracking and monitoring
  • You already use Redis or RabbitMQ

Use cron when…

  • Tasks are OS-level or shell commands
  • You want task scheduling outside the app
  • Process isolation per run is desirable
  • The app is stateless/serverless (Lambda)

9. Production Best Practices

01

Run the scheduler loop in a dedicated thread or process

Don't mix the scheduler loop with your web framework's main thread. Use a daemon thread (daemon=True so it exits with the main process), a separate subprocess, or a supervisor-managed process.

02

Use UTC times and convert explicitly

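schedule uses the system's local clock. If your server runs on UTC (common in cloud environments), your at('09:00') calls are already UTC. If not, convert the target time through a full datetime, because datetime.time objects do not support arithmetic with an offset. In schedule 1.2.0 and later you can instead pass a timezone as the second argument, for example at('09:00', 'UTC'), which requires pytz. Better yet, keep servers on UTC.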
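
For setups where the server is not on UTC and the timezone argument is not available, the conversion can be done with the standard library. This is a sketch under assumptions: utc_to_local_hhmm is a hypothetical helper, and it uses today's UTC offset, so the result is only valid until the next daylight-saving change.

```python
from datetime import datetime, timedelta, timezone

def utc_to_local_hhmm(hhmm_utc, local_tz=None):
    """Return the local 'HH:MM' that corresponds to `hhmm_utc` today.

    local_tz=None means the system's local timezone; pass a tzinfo
    to get a result that does not depend on the machine's settings.
    """
    hour, minute = (int(part) for part in hhmm_utc.split(":"))
    utc_dt = datetime.now(timezone.utc).replace(
        hour=hour, minute=minute, second=0, microsecond=0
    )
    return utc_dt.astimezone(local_tz).strftime("%H:%M")

print(utc_to_local_hhmm("09:00", timezone(timedelta(hours=-5))))  # 04:00

# Intended use (comment only, so the snippet runs without schedule installed):
#   schedule.every().day.at(utc_to_local_hhmm("09:00")).do(send_report)
```

Because the offset is computed once at registration time, a long-running process in a zone with daylight saving would need to re-register the job after each clock change.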

03

Keep job functions fast — offload heavy work

run_pending() is synchronous. If a job takes 30 seconds, no other jobs run during that time. For slow jobs, use a thread inside the job function: threading.Thread(target=heavy_work, daemon=True).start()
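
A bare Thread(...).start() will start a second copy of a slow job if the previous one is still running when the next tick arrives. The sketch below adds a simple overlap guard; in_thread is a hypothetical wrapper name, and the guard assumes the wrapper is only ever called from the single scheduler thread.

```python
import threading

def in_thread(job_func):
    """Wrap job_func so each call starts it in a daemon thread and returns at once.

    If the previous run is still going, the new run is skipped
    instead of being stacked on top of it.
    """
    state = {"thread": None}

    def wrapper(*args, **kwargs):
        previous = state["thread"]
        if previous is not None and previous.is_alive():
            return None  # still busy: skip this tick
        thread = threading.Thread(
            target=job_func, args=args, kwargs=kwargs, daemon=True
        )
        state["thread"] = thread
        thread.start()
        return thread

    return wrapper

# Intended use (comment only, so the snippet runs without schedule installed):
#   schedule.every(5).minutes.do(in_thread(heavy_work))
```

Exceptions raised inside the thread do not reach run_pending(), so the job function should still do its own logging and error handling.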

04

Log every job start and end

Add structured logging at job boundaries: job name, start time, end time, duration, success/failure, and key metrics. This is your observability layer — without it, debugging missed or slow jobs is very difficult.
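
As an illustration of logging at job boundaries, the decorator below emits one JSON line when a job starts and one when it ends. The name logged and the field names are assumptions chosen for this sketch; timestamps are left to the logging formatter (for example %(asctime)s).

```python
import functools
import json
import logging
import time

logger = logging.getLogger("jobs")

def logged(job_func):
    """Emit one JSON log line at job start and one at job end."""
    @functools.wraps(job_func)
    def wrapper(*args, **kwargs):
        name = job_func.__name__
        logger.info(json.dumps({"event": "job_start", "job": name}))
        started = time.perf_counter()
        status = "failure"  # overwritten only if the job returns normally
        try:
            result = job_func(*args, **kwargs)
            status = "success"
            return result
        finally:
            logger.info(json.dumps({
                "event": "job_end",
                "job": name,
                "status": status,
                "duration_s": round(time.perf_counter() - started, 3),
            }))
    return wrapper
```

The decorator does not swallow exceptions: a failing job is logged with status "failure" and the exception then propagates, so it can be combined with an exception-catching decorator placed outside it.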

05

Handle job idempotency for retries

If your retry decorator re-runs a partially-completed job, it must be safe. Check-then-skip pattern: if record_exists(): return before doing work. This prevents duplicate side effects on retry.
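
The check-then-skip pattern can be sketched with a small completion table. This example assumes SQLite as the store and a caller-supplied key that identifies one unit of work (for example a report date); make_idempotent and the done table are hypothetical names for illustration.

```python
import sqlite3

def make_idempotent(conn, job_func):
    """Return a function that runs job_func(key) at most once per key."""
    conn.execute("CREATE TABLE IF NOT EXISTS done (key TEXT PRIMARY KEY)")

    def run_once(key):
        already = conn.execute(
            "SELECT 1 FROM done WHERE key = ?", (key,)
        ).fetchone()
        if already:
            return False  # check-then-skip: this work was completed earlier
        job_func(key)
        conn.execute("INSERT INTO done (key) VALUES (?)", (key,))
        conn.commit()     # completion is recorded only after the job succeeded
        return True

    return run_once

conn = sqlite3.connect(":memory:")
sent = []
send_once = make_idempotent(conn, sent.append)
print(send_once("report-2024-06-01"))  # True
print(send_once("report-2024-06-01"))  # False
```

This guards against retries after a clean failure. If the process dies between the job finishing and the row being written, the work will repeat, so side effects that must never be duplicated need a stronger guarantee, such as a transaction shared with the work itself.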

06

For high-reliability production: consider Celery or APScheduler

The schedule library has no persistence: if the process restarts, runs missed during the downtime are not caught up. For critical jobs (billing, backups), use APScheduler with a SQLAlchemy job store, or Celery Beat with a Redis- or database-backed schedule.
