apscheduler cron定时任务触发接口

geteshi
2022-07-26 / 0 评论 / 37 阅读 / 正在检测是否收录...
from apscheduler.schedulers.blocking import BlockingScheduler


class Timing:
    def __init__(self, start_date, end_date, hour=None):
        self.start_date = start_date
        self.end_date = end_date
        self.hour = hour

    def cron(self, job, *value_list):
        """cron格式 在特定时间周期性地触发"""
        # year (int 或 str) – 年,4位数字
        # month (int 或 str) – 月 (范围1-12)
        # day (int 或 str) – 日 (范围1-31)
        # week (int 或 str) – 周 (范围1-53)
        # day_of_week (int 或 str) – 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
        # hour (int 或 str) – 时 (范围0-23)
        # minute (int 或 str) – 分 (范围0-59)
        # second (int 或 str) – 秒 (范围0-59)
        # start_date (datetime 或 str) – 最早开始日期(包含)
        # end_date (datetime 或 str) – 分 最晚结束时间(包含)
        # timezone (datetime.tzinfo 或str) – 指定时区
        scheduler = BlockingScheduler()
        scheduler.add_job(job, 'cron', start_date=self.start_date, end_date=self.end_date, hour=self.hour,
                          args=[*value_list])
        scheduler.start()

    def interval(self, job, *value_list):
        """interval格式 周期触发任务"""
        # weeks (int) - 间隔几周
        # days (int)  - 间隔几天
        # hours (int) - 间隔几小时
        # minutes (int) - 间隔几分钟
        # seconds (int) - 间隔多少秒
        # start_date (datetime 或 str) - 开始日期
        # end_date (datetime 或 str) - 结束日期
        # timezone (datetime.tzinfo 或str) - 时区
        scheduler = BlockingScheduler()
        # 在 2019-08-29 22:15:00至2019-08-29 22:17:00期间,每隔1分30秒 运行一次 job 方法
        scheduler.add_job(job, 'interval', minutes=1, seconds=30, start_date=self.start_date,
                          end_date=self.end_date, args=[*value_list])
        scheduler.start()

    @staticmethod
    def date(job, *value_list):
        """date格式 特定时间点触发"""
        # run_date (datetime 或 str) - 作业的运行日期或时间
        # timezone (datetime.tzinfo 或 str)  - 指定时区
        scheduler = BlockingScheduler()
        # 在 2019-8-30 01:00:01 运行一次 job 方法
        scheduler.add_job(job, 'date', run_date='2019-8-30 01:00:00', args=[*value_list])
        scheduler.start()
0

评论 (0)

取消