0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

Celery Beat 的周期调度机制及实现原理

科技绿洲 来源:Python实用宝典 作者:Python实用宝典 2023-10-31 15:24 次阅读

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,它是一个专注于实时处理的任务队列,同时也支持任务调度。

为了讲解 Celery Beat 的周期调度机制及实现原理,我们会基于Django从制作一个简单的周期任务开始,然后一步一步拆解 Celery Beat 的源代码。

相关前置应用知识,可以阅读以下文章:

  1. 实战教程!Django Celery 异步与定时任务
  2. Python Celery异步快速下载股票数据

1.Celery 简单周期任务示例

在 celery_app.tasks.py 中添加如下任务:

@shared_task
def pythondict_task():
    print("pythondict_task")

在 django.celery.py 文件中添加如下配置:

from celery_django import settings
from datetime import timedelta


app.autodiscover_tasks(lambda : settings.INSTALLED_APPS)

CELERYBEAT_SCHEDULE = {
    'pythondict_task': {
        'task': 'celery_app.tasks.pythondict_task',
        'schedule': timedelta(seconds=3),
    },
}

app.conf.update(CELERYBEAT_SCHEDULE=CELERYBEAT_SCHEDULE)

至此,配置完成,此时,先启动 Celery Beat 定时任务命令:

celery beat -A celery_django -S django

然后打开第二个终端进程启动消费者:

celery -A celery_django worker

此时在worker的终端上就会输出类似如下的信息

[2021-07-11 16:34:11,546: WARNING/PoolWorker-3] pythondict_task
[2021-07-11 16:34:11,550: WARNING/PoolWorker-4] pythondict_task
[2021-07-11 16:34:11,551: WARNING/PoolWorker-2] pythondict_task
[2021-07-11 16:34:11,560: WARNING/PoolWorker-1] pythondict_task

看到结果正常输出,说明任务成功定时执行。

2.源码剖析

为了明白 Celery Beat 是如何实现周期任务调度的,我们需要从 Celery 源码入手。

当你执行 Celery Beat 启动命令的时候,到底发生了什么?

celery beat -A celery_django -S django

当你执行这个命令的时候,Celery/bin/celery.py 中的 CeleryCommand 类接收到命令后,会选择 beat 对应的类执行如下代码:

# Python 实用宝典
# https://pythondict.com

from celery.bin.beat import beat

class CeleryCommand(Command):
    commands = {
        # ...
        'beat': beat,
        # ...
    }
    # ...
    def execute(self, command, argv=None):
        try:
            cls = self.commands[command]
        except KeyError:
            cls, argv = self.commands['help'], ['help']
        cls = self.commands.get(command) or self.commands['help']
        try:
            return cls(
                app=self.app, on_error=self.on_error,
                no_color=self.no_color, quiet=self.quiet,
                on_usage_error=partial(self.on_usage_error, command=command),
            ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
        except self.UsageError as exc:
            self.on_usage_error(exc)
            return exc.status
        except self.Error as exc:
            self.on_error(exc)
            return exc.status

此时cls对应的是beat类,通过查看位于bin/beat.py中的 beat 类可知,该类只重写了run方法和add_arguments方法。

所以此时执行的 run_from_argv 方法是 beat 继承的 Command 的 run_from_argv 方法:

# Python 实用宝典
# https://pythondict.com

def run_from_argv(self, prog_name, argv=None, command=None):
    return self.handle_argv(prog_name, sys.argv if argv is None else argv, command)

该方法中会调用 Command 的 handle_argv 方法,而该方法在经过相关参数处理后会调用 self(*args, **options) 到 call 函数:

# Python 实用宝典
    # https://pythondict.com
    
    def handle_argv(self, prog_name, argv, command=None):
        """Parse command-line arguments from ``argv`` and dispatch
        to :meth:`run`.

        :param prog_name: The program name (``argv[0]``).
        :param argv: Command arguments.

        Exits with an error message if :attr:`supports_args` is disabled
        and ``argv`` contains positional arguments.

        """
        options, args = self.prepare_args(
            *self.parse_options(prog_name, argv, command))
        return self(*args, **options)

Command 类的 __call__函数:

# Python 实用宝典
    # https://pythondict.com
    
    def __call__(self, *args, **kwargs):
        random.seed() # maybe we were forked.
        self.verify_args(args)
        try:
            ret = self.run(*args, **kwargs)
            return ret if ret is not None else EX_OK
        except self.UsageError as exc:
            self.on_usage_error(exc)
            return exc.status
        except self.Error as exc:
            self.on_error(exc)
            return exc.status

可见,在该函数中会调用到run方法,此时调用的run方法就是beat类中重写的run方法,查看该方法:

# Python 实用宝典
# https://pythondict.com
    
class beat(Command):
    """Start the beat periodic task scheduler.

    Examples::

        celery beat -l info
        celery beat -s /var/run/celery/beat-schedule --detach
        celery beat -S djcelery.schedulers.DatabaseScheduler

    """
    doc = __doc__
    enable_config_from_cmdline = True
    supports_args = False

    def run(self, detach=False, logfile=None, pidfile=None, uid=None,
            gid=None, umask=None, working_directory=None, **kwargs):
        # 是否开启后台运行
        if not detach:
            maybe_drop_privileges(uid=uid, gid=gid)
        workdir = working_directory
        kwargs.pop('app', None)
        # 设定偏函数
        beat = partial(self.app.Beat,
                       logfile=logfile, pidfile=pidfile, **kwargs)

        if detach:
            with detached(logfile, pidfile, uid, gid, umask, workdir):
                return beat().run() # 后台运行
        else:
            return beat().run() # 立即运行

这里引用了偏函数的知识,偏函数就是从基函数创建一个新的带默认参数的函数,详细可见廖雪峰老师的介绍:
https://www.liaoxuefeng.com/wiki/1016959663602400/1017454145929440

可见,此时创建了app的Beat方法的偏函数,并通过 .run 函数执行启动 beat 进程,首先看看这个 beat 方法:

# Python 实用宝典
    # https://pythondict.com
    @cached_property
    def Beat(self, **kwargs):
        # 导入celery.apps.beat:Beat类
        return self.subclass_with_self('celery.apps.beat:Beat')

可以看到此时就实例化了 celery.apps.beat 中的 Beat 类,并调用了该实例的 run 方法:

# Python 实用宝典
    # https://pythondict.com
    def run(self):
        print(str(self.colored.cyan(
            'celery beat v{0} is starting.'.format(VERSION_BANNER))))
        # 初始化loader
        self.init_loader()
        # 设置进程
        self.set_process_title()
        # 开启任务调度
        self.start_scheduler()

init_loader 中,会导入默认的modules,此时会引入相关的定时任务,这些不是本文重点。我们重点看 start_scheduler 是如何开启任务调度的:

# Python 实用宝典
    # https://pythondict.com
    def start_scheduler(self):
        c = self.colored
        if self.pidfile: # 是否设定了pid文件
            platforms.create_pidlock(self.pidfile) # 创建pid文件
        # 初始化service
        beat = self.Service(app=self.app,
                            max_interval=self.max_interval,
                            scheduler_cls=self.scheduler_cls,
                            schedule_filename=self.schedule)
        
        # 打印启动信息
        print(str(c.blue('__ ', c.magenta('-'),
                  c.blue(' ... __ '), c.magenta('-'),
                  c.blue(' _n'),
                  c.reset(self.startup_info(beat)))))
        # 开启日志
        self.setup_logging()
        if self.socket_timeout:
            logger.debug('Setting default socket timeout to %r',
                         self.socket_timeout)
            # 设置超时
            socket.setdefaulttimeout(self.socket_timeout)
        try:
            # 注册handler
            self.install_sync_handler(beat)
            # 开启beat
            beat.start()
        except Exception as exc:
            logger.critical('beat raised exception %s: %r',
                            exc.__class__, exc,
                            exc_info=True)

我们看下beat是如何开启的:

# Python 实用宝典
    # https://pythondict.com
    def start(self, embedded_process=False, drift=-0.010):
        info('beat: Starting...')
        # 打印最大间隔时间
        debug('beat: Ticking with max interval- >%s',
              humanize_seconds(self.scheduler.max_interval))
        
        # 通知注册该signal的函数
        signals.beat_init.send(sender=self)
        if embedded_process:
            signals.beat_embedded_init.send(sender=self)
            platforms.set_process_title('celery beat')

        try:
            while not self._is_shutdown.is_set():
                # 调用scheduler.tick()函数检查还剩多余时间
                interval = self.scheduler.tick()
                interval = interval + drift if interval else interval
                # 如果大于0
                if interval and interval > 0:
                    debug('beat: Waking up %s.',
                          humanize_seconds(interval, prefix='in '))
                    # 休眠
                    time.sleep(interval)
                    if self.scheduler.should_sync():
                        self.scheduler._do_sync()
        except (KeyboardInterrupt, SystemExit):
            self._is_shutdown.set()
        finally:
            self.sync()

这里重点看 self.scheduler.tick() 方法:

# Python 实用宝典
    # https://pythondict.com
    def tick(self):
        """Run a tick, that is one iteration of the scheduler.

        Executes all due tasks.

        """
        remaining_times = []
        try:
            # 遍历每个周期任务设定
            for entry in values(self.schedule):
                # 下次运行时间
                next_time_to_run = self.maybe_due(entry, self.publisher)
                if next_time_to_run:
                    remaining_times.append(next_time_to_run)
        except RuntimeError:
            pass

        return min(remaining_times + [self.max_interval])

这里通过 self.schedule 拿到了所有存放在用 shelve 写入的 celerybeat-schedule 文件的定时任务,遍历所有定时任务,调用 self.maybe_due 方法:

# Python 实用宝典
    # https://pythondict.com
    def maybe_due(self, entry, publisher=None):
        # 是否到达运行时间
        is_due, next_time_to_run = entry.is_due()

        if is_due:
            # 打印任务发送日志
            info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
            try:
                # 执行任务
                result = self.apply_async(entry, publisher=publisher)
            except Exception as exc:
                error('Message Error: %sn%s',
                      exc, traceback.format_stack(), exc_info=True)
            else:
                debug('%s sent. id- >%s', entry.task, result.id)
        return next_time_to_run

可以看到,此处会判断任务是否到达定时时间,如果是的话,会调用 apply_async 调用Worker执行任务。如果不是,则返回下次运行时间,让 Beat 进程进行 Sleep,减少进程资源消耗。

到此,我们就讲解完了 Celery Beat 在周期定时任务的检测调度机制,怎么样,小伙伴们有没有什么疑惑?可以在下方留言区留言一起讨论哦。

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 终端
    +关注

    关注

    1

    文章

    998

    浏览量

    29599
  • 源代码
    +关注

    关注

    94

    文章

    2929

    浏览量

    66063
  • python
    +关注

    关注

    51

    文章

    4675

    浏览量

    83467
收藏 人收藏

    评论

    相关推荐

    uc/os任务调度机制

    uc/os任务调度机制uc/OS 任务调度机制 内核的核心任务是任务调度机制,为了对uC/OS进行分析,我们从任务调度开始。在uC/OS中,一个任务通常是一个无限循环,程序具有如下的结
    发表于 07-07 09:46

    UCOS之任务调度机制

    UCOS之任务调度机制
    发表于 05-30 07:56

    VxWorks系统的任务调度机制

    针对多任务系统而言,调度是指根据一定的算法.将CPU 分配给符合条件的任务使用,不同的系统任务调度机制不同。本文介绍VxWorks系统的任务调度策略和算法.分析优先级倒置产
    发表于 12-16 14:11 10次下载

    Linux与VxWorks任务调度机制分析

    Linux与VxWorks任务调度机制分析
    发表于 03-28 09:52 19次下载

    嵌入式实时操作系统VxWorks内核调度机制研究

    嵌入式实时操作系统VxWorks内核调度机制研究
    发表于 03-29 12:26 13次下载

    μC/OS-II 任务调度机制的改进

    介绍μC/OS-II 任务调度机制,并提出一种改进方法,使μC/OS-II变成一个兼备实时与分时任务调度机制的操作系统; 论述改进后系统的特点和要注意的问题,给出部分源代码。
    发表于 04-15 11:21 14次下载

    高可信赖实时操作系统的防危调度机制

    为增强实时操作系统的防危性,在分析现有调度机制的基础上,探讨了最大关键度优先的调度算法,该算法是一种混合型的优先级实时调度算法,由静态优先级、动态子优先级和
    发表于 05-16 11:52 10次下载

    基于轮循机制和RED的语音流调度机制

           本文提出了一种新的VoIP业务流调度机制(RR-RED),通过随机早期检测(RED)和轮循机制(Round Robin)控制主动丢包。该机制很好的继承了R
    发表于 09-03 08:58 7次下载

    Li nux与VxWorks任务调度机制分析

    分析了Linux和VxWorks两种多任务操作系统任务调度机制的异同,从任务控制块、调度的时机、调度的优先级和调度的策略方面进行了详细的分析和对比。分析了VxWorks和Linux在P
    发表于 11-13 17:54 10次下载

    VxWorks系统的任务调度机制

    针对多任务系统而言,调度是指根据一定的算法.将CPU 分配给符合条件的任务使用,不同的系统任务调度机制不同。本文介绍VxWorks系统的任务调度策略和算法.分析优先级倒置产生
    发表于 11-27 16:26 13次下载

    嵌入式实时操作系统VxWorks内核调度机制分析

    本文简要介绍了多任务内核,重点分析了嵌入式实时操作系统VxWorks的内核调度机制——优先级抢占调度和时间片轮转调度算法。
    发表于 12-11 16:15 14次下载

    Windows CE陷阱调度机制

     一.什么是陷阱调度机制?        一般来说,嵌入式操作系统主要由两部分组成:运行在核心态的内核系统和运行在用户态的环境子系统组成。因
    发表于 08-27 14:38 570次阅读

    基于动态概率休眠调度机制的WSNs拓扑控制算法_韩瑞艳

    基于动态概率休眠调度机制的WSNs拓扑控制算法_韩瑞艳
    发表于 03-19 19:19 0次下载

    虚拟计算资源调度机制研究

    针对基于Xen的vCPU调度机制对虚拟机网络性能的影响进行了深入研究和分析。提出一种高效、准确、轻量级的网络排队敏感类型虚拟机(NSVM)识别方法,可根据当前虚拟机I/O传输特征将容易受到影响
    发表于 02-08 17:08 0次下载
    虚拟计算资源<b class='flag-5'>调度机制</b>研究

    NB―IoT物理控制信道NB―PDCCH及资源调度机制

    NB―IoT物理控制信道NB―PDCCH及资源调度机制(现代电源技术试题及答案)-NB―IoT物理控制信道NB―PDCCH及资源调度机制          
    发表于 08-31 19:56 13次下载
    NB―IoT物理控制信道NB―PDCCH及资源<b class='flag-5'>调度机制</b>