查看原文
其他

分布式场景下使用APScheduler

Python猫 2020-09-13

The following article is from 懒编程 Author ayuliao

👆 Python猫” ,一个值得加星标的公众号

剧照 | 《爱尔兰人》

简介

APScheduler是一个定时任务框架,其主要功能就是方便控制不同类型的定时任务,本身并没有考虑分布式多实例情况下的一些问题,本篇文章就来简单谈谈APScheduler在简单分布式场景下的使用。

分布式带来的问题

比如有个服务A,服务A中有使用APScheduler添加任务的逻辑,每次添加的任务会在随后固定的某个时间点被APScheduler调用执行。

在单节点情况下,这并没有什么问题,但随着业务加大,你可能要开启多个服务A来做负载时,此时APScheduler就会出现重复执行任务的问题。

为了方便说明,这里使用MongoDB作为APScheduler的jobstore,使用线程池作为它的执行器。(如果你不明白我在说啥,建议看看此前APScheduler的文章)

  1. scheduler = BlockingScheduler(

  2. jobstores={"default": mongostore},

  3. executors={"default": ThreadPoolExecutor(10)},

  4. job_defaults={"coalesce": True, "max_instances": 3},

  5. timezone='Asia/Shanghai',

  6. )

如果开启了多个服务A,服务A中都使用了相同配置的scheduler,此时就会出现任务重复执行的问题。

为何会有这个问题?一起来阅读一下相关源码,一探究竟。

因为使用了BlockingScheduler作为调度器,所以直接看到该类的代码

  1. # apscheduler/schedulers/blocking.py


  2. class BlockingScheduler(BaseScheduler):

  3. """

  4. A scheduler that runs in the foreground

  5. (:meth:`~apscheduler.schedulers.base.BaseScheduler.start` will block).

  6. """

  7. _event = None


  8. # ... 省略部分代码


  9. def _main_loop(self):

  10. wait_seconds = TIMEOUT_MAX

  11. while self.state != STATE_STOPPED:

  12. # 等待事件通知,wait_seconds为等待事件通知的超时时间

  13. # wait()方法会阻塞线程,直到事件标志状态为true。

  14. self._event.wait(wait_seconds)

  15. # clear()方法将事件标志状态设置为false

  16. self._event.clear()

  17. wait_seconds = self._process_jobs()

_main_loop方法会构成主循环,其具体的执行逻辑在 _process_jobs方法中, _process_jobs方法部分代码如下。

  1. # apscheduler/schedulers/base.py/BaseScheduler


  2. def _process_jobs(self):

  3. """

  4. Iterates through jobs in every jobstore, starts jobs that are due and figures out how long

  5. to wait for the next round.


  6. If the ``get_due_jobs()`` call raises an exception, a new wakeup is scheduled in at least

  7. ``jobstore_retry_interval`` seconds.


  8. """

  9. if self.state == STATE_PAUSED:

  10. self._logger.debug('Scheduler is paused -- not processing jobs')

  11. return None


  12. self._logger.debug('Looking for jobs to run')

  13. now = datetime.now(self.timezone) # 当前时间

  14. next_wakeup_time = None

  15. events = []


  16. with self._jobstores_lock:

  17. # 从_jobstores中获取当前要处理的任务

  18. for jobstore_alias, jobstore in self._jobstores.items():

  19. try:

  20. # 以当前时间为基准,判断是否到了执行时间

  21. due_jobs = jobstore.get_due_jobs(now)

  22. except Exception as e:

  23. # Schedule a wakeup at least in jobstore_retry_interval seconds

  24. # 在 jobstore 重试间隔时间(秒)内唤醒

  25. self._logger.warning('Error getting due jobs from job store %r: %s',

  26. jobstore_alias, e)

  27. # 唤醒时间

  28. retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval)

  29. if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:

  30. next_wakeup_time = retry_wakeup_time


  31. continue

  32. # ... 省略部分代码

_process_jobs方法通过 due_jobs=jobstore.get_due_jobs(now)获取jobstore中的任务对象,通过前面的配置可知,mongodb是这里的jobstore。

看到mongodb对应jobstore的代码。

  1. # apscheduler/jobstores/mongodb.py/MongoDBJobStore


  2. def get_due_jobs(self, now):

  3. timestamp = datetime_to_utc_timestamp(now)

  4. return self._get_jobs({'next_run_time': {'$lte': timestamp}})

getduejobs方法主要调用 _get_jobs方法去获取任务对象,要关注的重点是,它使用了lte以及时间戳作为参数,简单用过mongodb的朋友都知道lte其实就是小于等于的意思,简单而言,只要小于或等于timestamp这个时间戳的任务都会被获取。

都看到这了,顺便看一下 _get_jobs方法的代码吧。

  1. def _reconstitute_job(self, job_state):

  2. # 反序列化,获取任务对象参数

  3. job_state = pickle.loads(job_state)

  4. job = Job.__new__(Job)

  5. job.__setstate__(job_state)

  6. job._scheduler = self._scheduler

  7. job._jobstore_alias = self._alias

  8. return job


  9. def _get_jobs(self, conditions):

  10. jobs = []

  11. failed_job_ids = []

  12. for document in self.collection.find(conditions, ['_id', 'job_state'],

  13. sort=[('next_run_time', ASCENDING)]):

  14. try:

  15. jobs.append(self._reconstitute_job(document['job_state']))

  16. except BaseException:

  17. self._logger.exception('Unable to restore job "%s" -- removing it',

  18. document['_id'])

  19. failed_job_ids.append(document['_id'])


  20. # Remove all the jobs we failed to restore

  21. if failed_job_ids:

  22. self.collection.remove({'_id': {'$in': failed_job_ids}})


  23. return jobs # 返回所有小于等于某一时间戳的任务对象

到这里就很清楚APScheduler会出现重复执行任务问题的原因。

启动多个服务A,相当于运行同一份代码多次,此时APSCheduler的配置都是相同的,即多个APScheduler实例连接同一个mongodb,此时mongodb中存在一个任务就有可能被APScheduler消费多次。

使用分布式锁

要解决APScheduler多实例重复执行任务的问题,最常见的解决方案就是使用分布式锁,而分布式锁中最常见的就是基于Redis构建的字段锁。

Redis字段锁很容易理解,就是通过set命令在redis中设置一个字段,如果字段存在,则是加锁状态,而字段不存在,则是解锁状态。

设计Redis锁时,需要考虑操作原子性,避免同时去获取Redis字段的情况出现,还需要考虑字段超时,避免因逻辑错误出现的长时间死锁,所以设计Redis字段锁还是需要一些tick的,这里分享一种写法,如下。

  1. @contextmanager

  2. def redis_lock(name, timeout=(24 + 2) * 60 * 60):

  3. try:

  4. today_string = datetime.datetime.now().strftime("%Y-%m-%d")

  5. key = f"servername.lock.{name}.{today_string}"

  6. log.info(f"<Redis Lock> {key}")

  7. # 原子性的锁: 不存在,创建锁,返回1,相当于获取锁;存在,创建锁失败,返回0,相当于获取锁失败;过一段时间超时,避免死锁

  8. # nx: 不存在,key值设置为value,返回1,存在,不操作,返回0

  9. # ex: 设置超时

  10. lock = bonus_redis.set(key, value=1, nx=True, ex=timeout)

  11. yield lock

  12. finally:

  13. bonus_redis.delete(key) # 释放锁

通过上面方法设置的锁与常用的锁不同。

如果程序没有获得常用的锁,则会阻塞等待锁,而这里涉及的锁并不会等待,它的作用只是保证被锁方法在特定时间段内只执行一次。

此外还要考虑的是加锁位置,因为APScheduler会获取小于某个时间戳下的所有任务,那为了避免任务被重复执行,最直观的做法就是在任务函数中加上锁,例子如下。

  1. # 要被执行的任务函数

  2. def tick():

  3. with redislock() as lock:

  4. if lock:

  5. print('Tick! The time is: %s' % datetime.now())


  6. if __name__ == '__main__':

  7. scheduler = BackgroundScheduler()

  8. # 添加到相应的jobstore中

  9. scheduler.add_job(tick, 'interval', seconds=3) # 添加一个任务,3秒后运行

  10. scheduler.start()

  11. print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

结尾

至此分布式场景下使用APScheduler的方法就介绍完了,核心思想就是确保多个APScheduler实例对同一任务只会执行一次,感谢你的阅读。

如果文章对你有所帮助,点击「在看」支持二两,叩谢豪恩。

优质文章,推荐阅读:

Python 定时任务框架:APScheduler 源码剖析 (一)

Python 定时任务框架:APScheduler 源码剖析 (二)

Python定时任务框架:APScheduler源码剖析(三)

感谢创作者的好文

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存