Python scheduleライブラリ徹底解説!🐍 ジョブスケジューリングを簡単実装

Python

人間にとって読みやすい、シンプルな定期実行を実現しよう

はじめに:scheduleライブラリとは? 🤔

Pythonで「毎週月曜日の朝9時に特定の処理を実行したい」「10分ごとにAPIからデータを取得したい」といった定期的なタスク(ジョブ)を実行したい場面はよくあります。そんな時に役立つのが schedule ライブラリです。

schedule は、人間にとって非常に直感的で読みやすい構文でジョブスケジューリングを定義できることを目指して設計されています。複雑な設定ファイルやデーモンの知識がなくても、Pythonスクリプト内で簡単に定期実行処理を組み込めるのが大きな魅力です 😊。

このライブラリは、特にシンプルなスケジューリング要件を持つ小〜中規模のアプリケーションや、バッチ処理、定期的なデータ収集・通知などのタスクに適しています。Linuxの cron やWindowsのタスクスケジューラのようなOSレベルのスケジューラとは異なり、Pythonプロセス内で動作するため、スクリプトが実行されている間だけジョブがスケジュールされます。

この記事では、schedule ライブラリのインストール方法から基本的な使い方、少し応用的なテクニック、そして利用する上での注意点や代替手段まで、幅広く詳しく解説していきます。さあ、schedule を使って、あなたのPythonスクリプトに自動化の魔法をかけましょう!✨

インストール方法 🛠️

schedule ライブラリのインストールは非常に簡単です。Pythonのパッケージ管理ツールである pip を使用して、以下のコマンドを実行するだけです。

pip install schedule

これで、あなたのPython環境で schedule ライブラリが利用可能になります。依存関係も少なく、ほとんどの環境ですんなりとインストールできるはずです。

もし特定のバージョンを指定したい場合は、以下のようにバージョン番号を追加します(例:バージョン1.1.0をインストール)。

pip install schedule==1.1.0

通常は最新版をインストールすれば問題ありません。インストールが完了したら、早速基本的な使い方を見ていきましょう。

基本的な使い方 🕒

schedule の基本的な使い方は、以下のステップで行います。

  1. ライブラリをインポートする (scheduletime)。
  2. 定期実行したい処理を関数として定義する。
  3. schedule.every() メソッドチェーンを使って、いつ、どの関数を実行するかを定義する。
  4. 無限ループ内で schedule.run_pending() を呼び出し、スケジュールされたジョブを確認・実行する。
  5. ループ内で time.sleep() を使って、CPUを過剰に消費しないように待機する。

1. インポート

まず、必要なライブラリをインポートします。

import schedule
import time
import datetime

print(f"スクリプト開始: {datetime.datetime.now()}")

2. ジョブ関数の定義

定期的に実行したい処理を関数として定義します。引数を取らないシンプルな関数を例にします。

def job():
    """定期実行したい処理を記述する関数"""
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"ジョブ実行中! 現在時刻: {current_time}")

3. スケジューリング

schedule.every() を起点として、メソッドチェーンで実行タイミングを指定します。非常に直感的です。

# --- 様々なスケジューリング例 ---

# 10秒ごとに job 関数を実行
schedule.every(10).seconds.do(job)

# 10分ごとに job 関数を実行
# schedule.every(10).minutes.do(job)

# 毎時間ごとに job 関数を実行 (例: 1:00, 2:00, ...)
# schedule.every().hour.do(job)

# 毎日 10:30 に job 関数を実行
# schedule.every().day.at("10:30").do(job)

# 毎週月曜日に job 関数を実行
# schedule.every().monday.do(job)

# 毎週水曜日の 13:15 に job 関数を実行
# schedule.every().wednesday.at("13:15").do(job)

# 毎分17秒のタイミングで job 関数を実行 (例: 10:00:17, 10:01:17, ...)
# schedule.every().minute.at(":17").do(job)

# 毎時30分に job 関数を実行 (例: 09:30:00, 10:30:00, ...)
# schedule.every().hour.at(":30").do(job)

# 5〜10分(ランダム)ごとに job 関数を実行
# schedule.every(5).to(10).minutes.do(job)

print("スケジュールの設定が完了しました。")
print(f"現在スケジュールされているジョブ: {schedule.get_jobs()}")

このように、every() の後に単位 (.seconds, .minutes, .hours, .days, .weeks) や曜日 (.monday, .tuesday, etc.) を指定し、さらに .at("HH:MM").at(":SS") で特定の時刻や秒を指定できます。.do(関数名) で実行する関数を結びつけます。

4 & 5. メインループと実行

最後に、スケジュールされたジョブを実行するためのループを作成します。

print("スケジューリングループを開始します...")
try:
    while True:
        # 現在時刻で実行されるべきジョブがあるか確認し、あれば実行
        schedule.run_pending()
        # 1秒待機して、CPUの負荷を下げる
        time.sleep(1)
except KeyboardInterrupt:
    print("\nプログラムがユーザーによって中断されました。")
    # スケジュールをクリア
    schedule.clear()
    print("スケジュールをクリアしました。")
finally:
    print(f"スクリプト終了: {datetime.datetime.now()}")

この while True: ループがスクリプトの心臓部です。schedule.run_pending() は、現在時刻を見て、実行すべきスケジュールされたジョブがあれば実行します。なければ何もせず、すぐに処理が戻ります。time.sleep(1) は、1秒ごとに run_pending() をチェックするようにしています。この値を小さくすればより高頻度にチェックしますが、CPU使用率は上がります。逆に大きくすると、ジョブの実行タイミングのずれが大きくなる可能性がありますが、CPU負荷は下がります。通常は1秒程度で十分な場合が多いでしょう。

上記のコードを結合して実行すると、10秒ごとにコンソールに「ジョブ実行中! 現在時刻: …」と表示されるはずです。Ctrl+C などでスクリプトを中断すると、KeyboardInterrupt 例外が発生し、ループが終了します。

応用的な使い方と機能 ✨

基本的な使い方をマスターしたら、さらに便利な機能を見ていきましょう。

ジョブ関数に引数を渡す

定期実行したい関数に引数を渡したい場合があります。これは .do() メソッドに追加の引数を指定することで実現できます。

import schedule
import time
import datetime

def greet(name, message):
    """引数を受け取るジョブ関数"""
    current_time = datetime.datetime.now().strftime("%H:%M:%S")
    print(f"[{current_time}] {name}さん、{message}")

# 5秒ごとに関数 greet を '山田', 'こんにちは!' という引数で実行
schedule.every(5).seconds.do(greet, name="山田", message="こんにちは!")
# 位置引数でも渡せる
schedule.every(7).seconds.do(greet, "田中", "お疲れ様です!")

print("引数付きジョブをスケジュールしました。")

try:
    while True:
        schedule.run_pending()
        time.sleep(1)
except KeyboardInterrupt:
    print("\n終了します。")
    schedule.clear()

.do() の第一引数に実行したい関数を、それ以降にその関数に渡したい引数を指定します。キーワード引数 (name="山田") も位置引数 ("田中") も利用可能です。

注意点: functools.partial やラムダ式を使う方法も考えられますが、schedule ライブラリ自体が .do() での引数渡しをサポートしているため、この方法が最もシンプルで推奨されます。

ジョブにタグ付けして管理する

複数のジョブをスケジュールしている場合、特定のグループのジョブだけを後でキャンセルしたり、確認したりしたいことがあります。そのためにジョブに「タグ」を付けることができます。

import schedule
import time
import datetime

def report_job():
    print(f"レポートジョブ実行: {datetime.datetime.now()}")

def cleanup_job():
    print(f"クリーンアップジョブ実行: {datetime.datetime.now()}")

# ジョブにタグを付けてスケジュール
schedule.every().day.at("09:00").do(report_job).tag('daily-tasks', 'reporting')
schedule.every().hour.do(cleanup_job).tag('hourly-tasks', 'maintenance')
schedule.every(30).minutes.do(report_job).tag('frequent-tasks', 'reporting')

print("タグ付きジョブをスケジュールしました。")
print("--- 全てのジョブ ---")
print(schedule.get_jobs())
print("--- 'reporting' タグを持つジョブ ---")
print(schedule.get_jobs('reporting'))
print("--- 'hourly-tasks' タグを持つジョブ ---")
print(schedule.get_jobs('hourly-tasks'))

# 例: 'reporting' タグを持つジョブだけキャンセルする場合
# schedule.clear('reporting')
# print("--- 'reporting' タグのジョブをキャンセル後 ---")
# print(schedule.get_jobs())

try:
    while True:
        schedule.run_pending()
        time.sleep(1)
except KeyboardInterrupt:
    print("\n終了します。")
    schedule.clear()

.do() の後に .tag('タグ1', 'タグ2', ...) を追加することで、ジョブに任意の数のタグを付けられます。
schedule.get_jobs(tag=None) を使うと、指定したタグを持つジョブのリストを取得できます(引数を省略すると全てのジョブ)。
schedule.clear(tag=None) を使うと、指定したタグを持つジョブだけを一括でキャンセルできます(引数を省略すると全てのジョブをキャンセル)。

ジョブのキャンセル

個別のジョブをキャンセルしたい場合は、スケジュール時に返されるジョブオブジェクトを保持しておき、schedule.cancel_job(job) を使います。

import schedule
import time
import datetime

def my_job():
    print(f"マイジョブ実行: {datetime.datetime.now()}")

# スケジュールし、ジョブオブジェクトを取得
job_to_cancel = schedule.every(5).seconds.do(my_job)
print(f"ジョブをスケジュールしました: {job_to_cancel}")

# 12秒後にジョブをキャンセルする別のジョブをスケジュール
schedule.every(12).seconds.do(lambda: schedule.cancel_job(job_to_cancel)).tag('internal')
# 上記のキャンセル用ジョブは、最初の実行後に自身もキャンセルされるべき
# (そうしないと12秒ごとにキャンセルしようとし続ける)
# または、以下のように schedule.cancel_job を直接実行する方がシンプル
# time.sleep(12)
# schedule.cancel_job(job_to_cancel)
# print("5秒ごとのジョブをキャンセルしました。")

try:
    start_time = time.time()
    while True:
        schedule.run_pending()
        time.sleep(1)
        # 例として20秒経ったらループを抜ける
        if time.time() - start_time > 20:
             # キャンセル用のジョブがまだ残っている場合があるのでクリア
             print("20秒経過したので、キャンセル用のジョブ(もしあれば)もクリアして終了します。")
             schedule.clear('internal') # キャンセル用ジョブにタグがあればタグ指定でクリア
             # もしくは事前にキャンセルジョブオブジェクトを保持しておき、個別にキャンセル
             break
        # 12秒後に手動でキャンセルする場合
        if time.time() - start_time > 12 and job_to_cancel in schedule.get_jobs():
             print("\n12秒経過したので、5秒ごとのジョブを手動でキャンセルします。")
             schedule.cancel_job(job_to_cancel)
             print(f"キャンセル後のジョブリスト: {schedule.get_jobs()}")

except KeyboardInterrupt:
    print("\n終了します。")
finally:
    schedule.clear() # 念のため最後に全クリア
    print("最終的なジョブリスト:", schedule.get_jobs())

schedule.every(...).do(...) は、スケジュールされたジョブを表すオブジェクトを返します。これを変数に保存しておけば、後で schedule.cancel_job() に渡してそのジョブだけをキャンセルできます。

上記の例では、5秒ごとに実行されるジョブ `my_job` をスケジュールし、そのジョブオブジェクトを `job_to_cancel` に保持します。そして、12秒経過した時点で `schedule.cancel_job(job_to_cancel)` を呼び出して、`my_job` の定期実行を停止しています。

ジョブを即座に1回だけ実行

スケジュールされたジョブを、次のスケジュール時刻を待たずに、すぐに1回だけ実行したい場合があります。これは schedule.run_all(delay_seconds=0) で実現できます。

import schedule
import time
import datetime

def initial_setup():
    print(f"初期設定ジョブを実行: {datetime.datetime.now()}")

# 毎時実行するようにスケジュールするが、まずは即座に1回実行したい
schedule.every().hour.do(initial_setup).tag('setup')

print("ジョブをスケジュールしました。")
print("スケジュールされたジョブ:", schedule.get_jobs())

# スケジュールされた全てのジョブを即座に(遅延なしで)実行
print("--- run_all() を呼び出します ---")
schedule.run_all() # delay_seconds のデフォルトは 0
print("--- run_all() の呼び出し完了 ---")

# その後は通常のスケジュールループへ
print("通常のスケジューリングループを開始します...")
try:
    while True:
        schedule.run_pending()
        time.sleep(1)
        # 例として10秒で終了
        if len(schedule.get_jobs('setup')) > 0 and schedule.get_jobs('setup')[0].next_run > datetime.datetime.now() + datetime.timedelta(seconds=10):
            print("次の実行まで10秒以上あるのでループを終了します。")
            break
except KeyboardInterrupt:
    print("\n終了します。")
finally:
    schedule.clear()

run_all() は、スケジュールされている全てのジョブを、それぞれの次の実行時刻に関わらず、delay_seconds で指定された秒数だけ待機した後に実行します。デフォルトは0秒なので即時実行です。これは、スクリプト起動時に初期化処理などをまず実行しておきたい場合に便利です。

注意: run_all() はあくまでスケジュールされているジョブを実行するだけなので、実行後にそのジョブがスケジュールから削除されるわけではありません。次の定刻になれば、通常通り再度実行されます。

ジョブの実行期限を設定する (.until())

特定の時刻や日付まで、または一定期間だけジョブを実行したい場合があります。これは .until() メソッドを使って実現できます。

import schedule
import time
import datetime

def temporary_job():
    print(f"期間限定ジョブ実行: {datetime.datetime.now()}")

# 今から30秒後まで、5秒ごとにジョブを実行する
stop_time = datetime.datetime.now() + datetime.timedelta(seconds=30)
schedule.every(5).seconds.until(stop_time).do(temporary_job).tag("limited_time")

# または特定の日時まで
# stop_datetime = datetime.datetime(2025, 4, 2, 7, 0, 0) # 例: 2025年4月2日 07:00:00 UTC
# schedule.every().minute.until(stop_datetime).do(temporary_job).tag("until_specific_time")

# または timedelta を使って期間を指定 (例: 1時間後まで)
# schedule.every(10).seconds.until(datetime.timedelta(hours=1)).do(temporary_job).tag("duration_limit")

print(f"ジョブをスケジュールしました。停止予定時刻: {stop_time}")
print(schedule.get_jobs())

try:
    while True:
        schedule.run_pending()
        # ジョブがなくなったらループを抜ける
        if not schedule.get_jobs("limited_time"):
             print("実行期限が来たか、ジョブがキャンセルされたため、スケジュールが空になりました。ループを終了します。")
             break
        time.sleep(1)
except KeyboardInterrupt:
    print("\n終了します。")
finally:
    schedule.clear()

.do() の前に .until(end_time) を追加します。end_time には以下のいずれかを指定できます。

  • datetime.datetime オブジェクト: 特定の日時まで実行。
  • datetime.time オブジェクト: その日の指定時刻まで実行(毎日リセットされる)。
  • datetime.timedelta オブジェクト: スケジュールされた時点から指定期間だけ実行。
  • 文字列 ("HH:MM:SS" or "HH:MM"): その日の指定時刻まで実行。
指定された期限時刻を過ぎると、そのジョブは自動的にスケジュールから削除され、以降は実行されなくなります。

次の実行時刻の取得

スケジュールされたジョブが次にいつ実行されるかを知りたい場合は、ジョブオブジェクトの next_run 属性を参照します。

import schedule
import time
import datetime

def my_task():
    print("タスク実行")

# 1分ごとに実行
job = schedule.every().minute.do(my_task)

# スケジュール直後はまだ next_run が計算されていないことがあるので、
# 一度 run_pending を呼ぶか、少し待つ必要がある場合がある
schedule.run_pending() # これで next_run が計算されるはず
time.sleep(0.1) # 念のため

if job.next_run:
    print(f"次の実行予定時刻: {job.next_run}")
else:
    # .until() などで既に期限切れの場合などは None になる
    print("次の実行予定はありません。")

# 全てのジョブの次の実行時刻を表示
print("--- 全てのジョブの次の実行時刻 ---")
for j in schedule.get_jobs():
    print(f"Job: {j}, Next Run: {j.next_run}")

# アイドル秒数 (次のジョブ実行まであと何秒か) を取得
idle_seconds = schedule.idle_seconds()
if idle_seconds is not None and idle_seconds > 0:
    print(f"次のジョブ実行まで、あと約 {idle_seconds:.2f} 秒です。")

# schedule.run_pending() の代わりに schedule.run_all() を使うと、
# 即時実行後に次のスケジュールが設定される
# schedule.run_all()
# print(f"run_all 実行後の次の実行予定時刻: {job.next_run}")

# ループは実行しない例
schedule.clear() # 既存のスケジュールをクリア

job.next_rundatetime.datetime オブジェクトを返します。また、schedule.idle_seconds() は、スケジュールされているジョブの中で最も早く実行されるジョブまでの残り秒数を浮動小数点数で返します。これにより、次のジョブ実行までどれくらい待機すればよいかの目安を知ることができます。(ただし、time.sleep() の引数として直接使う場合は注意が必要です。詳細は後述の注意点を参照)。

実践的な例 💡

これまでに学んだ機能を使って、いくつかの実践的な例を見てみましょう。

例1: 毎日特定の時間にメールレポートを送信

ここでは、メール送信自体の実装は省略し、スケジューリング部分に焦点を当てます。

import schedule
import time
import datetime
# import smtplib # 実際のメール送信に必要
# from email.message import EmailMessage # 実際のメール送信に必要

def send_daily_report():
    """日次レポートを生成してメールで送信する関数(ダミー)"""
    print(f"[{datetime.datetime.now()}] 日次レポートを生成・送信しています...")
    # ここでレポート生成ロジック (例: DBから集計)
    report_content = f"本日の売上サマリーです。\n(これは {datetime.datetime.now()} に生成されました)"
    # ここでメール送信ロジック (smtplibなどを使用)
    # msg = EmailMessage()
    # msg.set_content(report_content)
    # msg['Subject'] = f'{datetime.date.today()} の日次レポート'
    # msg['From'] = 'report@example.com'
    # msg['To'] = 'manager@example.com'
    # smtp_server = 'smtp.example.com'
    # smtp_port = 587
    # username = 'user'
    # password = 'password'
    # with smtplib.SMTP(smtp_server, smtp_port) as server:
    #     server.starttls()
    #     server.login(username, password)
    #     server.send_message(msg)
    print("レポート送信完了。")

# 毎日午前8時30分にレポート送信ジョブを実行
schedule.every().day.at("08:30").do(send_daily_report).tag("daily-report")

print("日次レポート送信ジョブをスケジュールしました。")
print(f"次の実行予定: {schedule.next_run}")

try:
    while True:
        schedule.run_pending()
        time.sleep(60) # 1分ごとにチェック (メール送信なので秒単位の精度は不要)
except KeyboardInterrupt:
    print("\n終了します。")
finally:
    schedule.clear()

この例では、every().day.at("08:30") を使って、毎日午前8時30分に send_daily_report 関数を実行するようにスケジュールしています。time.sleep() の間隔は、ジョブの実行頻度に合わせて調整します。1日1回のジョブであれば、60秒やそれ以上の間隔でも問題ないでしょう。

例2: 1時間ごとに一時ファイルをクリーンアップ

import schedule
import time
import datetime
import os
import glob

TEMP_DIR = "./temp_files" # 一時ファイルが保存されるディレクトリ (例)

def cleanup_temp_files():
    """TEMP_DIR 内の古い一時ファイルを削除する関数"""
    print(f"[{datetime.datetime.now()}] 一時ファイルのクリーンアップを開始します...")
    now = time.time()
    # 古いファイルの閾値(例:1日以上前)
    threshold_age_seconds = 24 * 60 * 60

    if not os.path.exists(TEMP_DIR):
        print(f"ディレクトリが存在しません: {TEMP_DIR}")
        return

    try:
        for filename in glob.glob(os.path.join(TEMP_DIR, '*')):
            if os.path.isfile(filename):
                file_mtime = os.path.getmtime(filename)
                if now - file_mtime > threshold_age_seconds:
                    print(f"  古いファイルを削除します: {filename} (最終更新: {datetime.datetime.fromtimestamp(file_mtime)})")
                    # os.remove(filename) # 実際に削除する場合はコメントアウトを外す
                else:
                     print(f"  比較的新しいファイルのため保持: {filename}")
    except Exception as e:
        print(f"クリーンアップ中にエラーが発生しました: {e}")
    finally:
        print("クリーンアップ処理を完了しました。")

# 毎時0分にクリーンアップジョブを実行
schedule.every().hour.at(":00").do(cleanup_temp_files).tag("cleanup")

# スクリプト開始時に一度実行しておく場合
# print("初回クリーンアップを実行します...")
# cleanup_temp_files()
# もしくは schedule.run_all() を使う
# schedule.run_all()

print("一時ファイルクリーンアップジョブをスケジュールしました。")
print(f"次の実行予定: {schedule.next_run}")

# テスト用に一時ファイルディレクトリとファイルを作成
if not os.path.exists(TEMP_DIR):
    os.makedirs(TEMP_DIR)
# 古いファイルと新しいファイルの例を作成
with open(os.path.join(TEMP_DIR, "old_file.tmp"), "w") as f: f.write("old")
# 過去のタイムスタンプを設定 (2日前)
two_days_ago = time.time() - 2 * 24 * 60 * 60
os.utime(os.path.join(TEMP_DIR, "old_file.tmp"), (two_days_ago, two_days_ago))
with open(os.path.join(TEMP_DIR, "new_file.tmp"), "w") as f: f.write("new")


try:
    while True:
        schedule.run_pending()
        time.sleep(1) # 1時間ごとのジョブだが、より正確な時刻実行のため sleep は短めに設定
except KeyboardInterrupt:
    print("\n終了します。")
finally:
    # テスト用ファイルを削除
    # import shutil
    # if os.path.exists(TEMP_DIR): shutil.rmtree(TEMP_DIR)
    schedule.clear()

この例では、every().hour.at(":00") を使用して、毎時ちょうど(例: 9:00:00, 10:00:00)に一時ファイル削除の関数を実行します。at(":00") の代わりに every().hour だけだと、スクリプトを開始した時刻基準の毎時(例: 9:15に開始したら、10:15, 11:15…)になります。目的に応じて使い分けましょう。

例3: 5分ごとにAPIからデータを取得

import schedule
import time
import datetime
import requests # データ取得に requests ライブラリを使用 (pip install requests)
import json

API_ENDPOINT = "https://api.coindesk.com/v1/bpi/currentprice.json" # 例: Bitcoin価格API

def fetch_bitcoin_price():
    """Bitcoinの現在価格を取得して表示する関数"""
    print(f"[{datetime.datetime.now()}] Bitcoin価格を取得します...")
    try:
        response = requests.get(API_ENDPOINT)
        response.raise_for_status() # エラーがあれば例外を発生させる (4xx, 5xx)
        data = response.json()
        usd_price = data['bpi']['USD']['rate']
        timestamp = data['time']['updatedISO']
        print(f"取得成功! 現在のBitcoin価格 (USD): {usd_price} (取得時刻: {timestamp})")
        # ここで取得したデータをDBに保存したり、ファイルに書き出したりする処理を追加できる
        # save_to_database(timestamp, usd_price)
    except requests.exceptions.RequestException as e:
        print(f"APIからのデータ取得中にエラーが発生しました: {e}")
    except KeyError as e:
        print(f"受信データの形式が予期したものと異なります: {e}")
    except Exception as e:
        print(f"予期せぬエラーが発生しました: {e}")

# 5分ごとに価格取得ジョブを実行
schedule.every(5).minutes.do(fetch_bitcoin_price).tag("api-fetch")

print("Bitcoin価格取得ジョブを5分ごとに実行するようにスケジュールしました。")
print(f"次の実行予定: {schedule.next_run}")

# スクリプト開始時にも一度実行する
# fetch_bitcoin_price()
# または
# schedule.run_all()

try:
    while True:
        schedule.run_pending()
        time.sleep(1) # 1秒ごとにチェック
except KeyboardInterrupt:
    print("\n終了します。")
finally:
    schedule.clear()

every(5).minutes を使って、5分間隔でAPIからデータを取得する関数を実行します。APIリクエストには時間がかかる可能性や、ネットワークエラーが発生する可能性もあるため、try...except ブロックで適切にエラーハンドリングを行うことが重要です。

制限事項と注意点 ⚠️

schedule ライブラリはシンプルで使いやすい反面、いくつかの重要な制限事項と注意点があります。これらを理解しておくことは、適切な場面でライブラリを選択し、問題を防ぐために不可欠です。

1. インプロセススケジューラであること

schedule は、実行中のPythonスクリプトのプロセス内で動作します。つまり、スクリプトが終了すれば、スケジューリングも停止します。OSの cron やタスクスケジューラのように、システムが起動している限りバックグラウンドで動作し続けるわけではありません。

したがって、schedule を使ったスクリプトは、デーモン化する、systemdsupervisor などのプロセス管理ツールを使う、あるいは単純にターミナルで実行し続けるなど、何らかの方法で常駐させる必要があります。もしスクリプトが予期せず停止した場合、スケジュールされていたジョブは実行されません。再起動しても、停止中に実行されるはずだったジョブが遡って実行されることもありません(実行されなかったジョブはスキップされます)。

また、スケジュール情報はメモリ上にのみ保持されるため、スクリプトを再起動すると、再度スケジュールを定義し直す必要があります。永続化が必要な場合は、APScheduler などのより高機能なライブラリを検討するか、自分で状態を保存・復元する仕組みを作る必要があります。

2. 実行タイミングの精度

schedule のジョブ実行タイミングは、メインループ内の time.sleep() の間隔に依存します。例えば time.sleep(1) としている場合、ジョブの実行時刻チェックは1秒ごとに行われます。そのため、秒単位での厳密な実行タイミングが保証されるわけではありません。

もし time.sleep(60) としている場合、毎時0分0秒に設定したジョブは、実際にはチェックが行われたタイミング(例えば毎時0分30秒頃)に実行される可能性があります。

また、schedule.idle_seconds() を使って次のジョブまでの待機時間を取得し、time.sleep(schedule.idle_seconds()) のようにすることも考えられます。これによりCPU負荷を最小限にできますが、もし他のジョブが動的に追加されたり、システム時刻が変更されたりした場合に、適切に wakeup できない可能性があります。公式ドキュメントでも、固定の短い sleep (例: 1秒) を推奨しています。

ミリ秒単位のような高精度なスケジューリングが求められるタスクには schedule は不向きです。

3. ジョブの実行時間とブロッキング

デフォルトでは、schedule.run_pending() はスケジュールされたジョブを順番に、同期的に実行します。もしあるジョブの実行に非常に時間がかかる場合(例えば、5分ごとに実行するジョブが、処理に6分かかる場合)、そのジョブの実行が完了するまで、run_pending() はブロックされ、メインループの time.sleep() も実行されません。

これにより、後続のジョブの実行が遅延したり、次の実行時刻を過ぎてしまったりする可能性があります。例えば、1分ごとに実行されるジョブAとジョブBがあり、ジョブAが70秒かかった場合、ジョブBの実行はジョブAの完了後になり、予定時刻より遅れます。さらに、ジョブAの次の実行も遅れる可能性があります。

この問題を避けるためには、時間のかかる可能性のあるジョブは、別のスレッドやプロセスで非同期に実行することを検討する必要があります。threading モジュールや multiprocessing モジュールを使う、あるいは Celery などの非同期タスクキューと連携する方法が考えられます。schedule ライブラリ自体には、ジョブを非同期に実行するための組み込み機能は提供されていません。

import schedule
import time
import threading
import datetime

def long_running_job():
    print(f"[{datetime.datetime.now()}] 時間のかかるジョブを開始します...")
    time.sleep(10) # 例として10秒かかる処理
    print(f"[{datetime.datetime.now()}] 時間のかかるジョブが完了しました。")

def quick_job():
    print(f"[{datetime.datetime.now()}] すぐ終わるジョブを実行します。")

def run_threaded(job_func):
    """ジョブを別スレッドで実行するためのラッパー関数"""
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

# 5秒ごとに時間のかかるジョブを「別スレッドで」実行
schedule.every(5).seconds.do(run_threaded, long_running_job)
# 3秒ごとにすぐ終わるジョブを実行 (これはメインスレッドで実行される)
schedule.every(3).seconds.do(quick_job)

print("スレッドを使ったジョブ実行をスケジュールしました。")

try:
    while True:
        schedule.run_pending()
        time.sleep(1)
except KeyboardInterrupt:
    print("\n終了します。")
finally:
    schedule.clear()

上記の例では、long_running_job を直接 .do() に渡す代わりに、それを別スレッドで実行するラッパー関数 run_threaded を介して実行しています。これにより、long_running_job が実行中でもメインループや他のジョブ (quick_job) がブロックされにくくなります。(ただし、スレッド管理の複雑さが加わる点には注意が必要です。)

4. エラーハンドリング

スケジュールされたジョブ関数内で例外が発生した場合、その例外は schedule.run_pending() まで伝播し、キャッチしない限りメインループ(while True:)が停止してしまう可能性があります。

そのため、各ジョブ関数内で適切に try...except ブロックを使ってエラーハンドリングを行うことが非常に重要です。これにより、一部のジョブでエラーが発生しても、他のジョブの実行やスケジューリングループ自体が停止するのを防ぐことができます。

import schedule
import time
import datetime

def job_that_might_fail():
    """エラーが発生する可能性のあるジョブ"""
    print(f"[{datetime.datetime.now()}] ジョブ実行試行...")
    try:
        # ここでエラーが発生する可能性のある処理
        if datetime.datetime.now().second % 10 == 0: # 例: 10秒に1回エラーを起こす
             raise ValueError("意図的なエラー発生!")
        print("ジョブ成功!")
    except Exception as e:
        print(f"ジョブ実行中にエラーが発生しました: {e}")
        # ここでエラーログを記録したり、通知したりする処理を追加できる
        # log_error(e)
        # notify_admin(e)

schedule.every(3).seconds.do(job_that_might_fail)

print("エラーハンドリングを含むジョブをスケジュールしました。")

try:
    while True:
        schedule.run_pending()
        time.sleep(1)
except KeyboardInterrupt:
    print("\n終了します。")
finally:
    schedule.clear()

この例のように、ジョブ関数内でエラーを捕捉し、ログ出力などの対応を行うことで、schedule のメインループは影響を受けずに動き続けることができます。

schedule はシンプルさが魅力ですが、より複雑な要件や堅牢性が求められる場合には、他のツールやライブラリを検討する価値があります。

OS標準のスケジューラ (cron / タスクスケジューラ)

メリット: OSレベルで動作するため信頼性が高く、Pythonスクリプトが実行されていなくてもジョブが起動される。システム管理の標準的な方法。
デメリット: 設定方法がOSに依存し、schedule ほど直感的ではない場合がある。Python環境外での設定が必要。分単位より細かいスケジューリングは標準では難しいことが多い。
適しているケース: サーバー上で定期的にPythonスクリプトを実行したい場合。OS管理に慣れている場合。

APScheduler

メリット: schedule よりも高機能。多様なトリガー(cron形式、インターバル、日付指定など)、ジョブストア(メモリ、DB、Redisなどでの永続化)、実行バックエンド(スレッドプール、プロセスプール)をサポート。
デメリット: schedule と比較すると学習コストがやや高い。設定項目が多い。
適しているケース: スケジュールの永続化が必要な場合、cron形式での指定が必要な場合、ジョブの並列実行が必要な場合、より複雑なスケジューリングロジックが必要な場合。
APScheduler Documentation

Celery Beat

メリット: 分散タスクキュー Celery の一部であり、スケーラブルな定期タスク実行が可能。タスクのモニタリングや管理機能が豊富。Broker(Redis, RabbitMQなど)との連携が前提。
デメリット: Celery と Broker のセットアップが必要で、構成が複雑になる。scheduleAPScheduler よりも導入のハードルが高い。
適しているケース: 大規模なアプリケーションで、多数の定期タスクや非同期タスクを分散環境で実行・管理したい場合。既にCeleryを使用している場合。
Celery Periodic Tasks

どのツールを選択するかは、プロジェクトの要件(シンプルさ、機能性、堅牢性、スケーラビリティ、永続化の必要性など)に応じて決定することが重要です。シンプルなユースケースであれば schedule が手軽で良い選択肢となります 👍。

まとめ 🎉

Pythonの schedule ライブラリは、「人間にとって読みやすい」ことを重視して設計された、シンプルで直感的なジョブスケジューリングライブラリです。

  • pip install schedule で簡単にインストール可能。
  • every().seconds, minutes, hour, day.at("HH:MM"), monday のような流れるようなインターフェースでスケジュールを定義できる。
  • ジョブ関数への引数渡し、ジョブのタグ付け、キャンセル、実行期限設定など、基本的な機能は揃っている。
  • メインループ内で schedule.run_pending()time.sleep() を呼び出すことで動作する。

ただし、以下の点には注意が必要です。

  • Pythonプロセス内で動作するため、スクリプトが停止するとスケジューリングも停止する。常駐化が必要。
  • スケジュールは永続化されない。
  • 実行タイミングの精度は time.sleep() の間隔に依存する。
  • 時間のかかるジョブは後続のジョブをブロックする可能性があるため、非同期実行(スレッドなど)の検討が必要。
  • ジョブ関数内での適切なエラーハンドリングが重要。

schedule は、そのシンプルさから、小規模なスクリプトや基本的なバッチ処理、簡単な自動化タスクには非常に有効なツールです。しかし、より高度な機能や堅牢性、永続性が求められる場合は、APSchedulerCelery Beat、あるいはOS標準のスケジューラといった代替手段を検討しましょう。

ぜひ schedule ライブラリを活用して、日々の繰り返し作業を自動化し、Pythonプログラミングをより効率的で楽しいものにしてください!🚀

コメント

タイトルとURLをコピーしました