您当前的位置: 首页 > 

IT之一小佬

暂无认证

  • 1浏览

    0关注

    1192博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Celery 应用:Application

IT之一小佬 发布时间:2020-12-29 19:51:02 ,浏览量:1

Celery 应用:Application

Celery 在使用前必须要进行实例化,实例化之后被称为应用程序(简称 app)。 应用程序是线程安全的,可以配置多个不同配置、组件、任务的 Celery 程序在同一个进程中运行。 创建一个实例:

from celery import Celery
app = Celery()
app

最后一行显示的内容包含 app 类的 Celery 名称,当前主模块 (main) 以及对象的内存地址(0x250bbc65eb8)。

celery的主名称:Main Name

        其中主模块的名称是比较重要的。 使用 Celery 进行发送任务消息时,消息内容不会含有源代码信息,只有执行任务的名称。这与主机名在互联网上的工作类似:每一个职程(Worker)通过任务名称与实际的功能进行对应,这样的方式被称为 任务注册 。 每当定义任务时,该任务也将添加到本地注册表中:

@app.task
... def add(x, y):
...     return x + y
... 
add

add.name
'__main__.add'
app.tasks['__main__.add']

在上面的实例中 __main__再次出现,这是因为 Celery 无法检测到该函数属于哪个模块时,自动会使用主模块的名称作为任务名称的开头。 这只是一组有限实例中的问题:

  • 如果在其中定义任务的模块作为程序运行。

  • 如果应用程序是在python shell(repl)中创建的。

例如,这里的 tasks 模块还用于用 app.worker_main() 启动一个职程(Worker):

tasks.py

from celery import Celery
app = Celery()

@app.task
def add(x, y): 
    return x + y

if __name__ == '__main__':
    app.worker_main()

当模块执行时,任务名以 __main__ 开头,被另外一个模块导入时,例如调用一个任务时,任务的名称将以 tasks (模块真实的名称)进行开头命名:

>>> from tasks import add

>>> add.name

tasks.add

也可以设置主模块的名称:

>>> app = Celery('tasks')

>>> app.main

'tasks'

>>> @app.task

... def add(x, y):

... return x + y

>>> add.name

tasks.add
celery配置:Configuration

您可以设置几个选项来更改 Celery 的工作方式。这些选项可以直接在app实例上设置,也可以使用专用的配置模块。

你可以设置几个选项来改变 Celery 的工作方式。这些选项可以直接在应用程序实例上设置,也可以使用专用的配置模块。

可以通过配置几个选项来进行配置 Celery,该配置通过直接在程序实例中配置,也可以通过专用的配置模块进行配置。 配置如下 app.conf

>>> app.conf.timezone
'Europe/London'

也可以直接设置对应的值:

>>> app.conf.enable_utc = True

也可以通过 update 进行一次设置多个键值:

>>> app.conf.update(

... enable_utc=True,

... timezone='Europe/London',

...)

配置对象由多个字典组成,按以下顺序进行查询:

  1. 在运行时所做的更改。

  2. 配置模块(如果有的话)

  3. 默认配置(celery.app.defaults) 也通过 app.add_defaults() 进行配置新的默认配置。

config_from_object

config_from_object() 可以从配置对象中进行加载配置。

加载的内容可以为配置模块、也可以为用于属性的对象。

注意:config_from_object() 在进行调用时会恢复默认的配置,如果需要设置其他的配置,建议在调用完毕之后进行操作。

案例1:使用模块名称

app.config_from_object() 可以从一个 python 模块中(包含属性名称)进行加载,例如:celeryconfigmyproj.config.celerymyproj.config:CeleryConfig

from celery import Celery

app = Celery()

app.config_from_object('celeryconfig')

其中 celeryconfig 模块内容如下:

celeryconfig.py

enable_utc = True

timezone = 'Europe/London'

导入 celeryconfig 程序就可以应用。

 

案例2:配置模块对象

可以配置模块对象,但不建议这样用。

import celeryconfig

from celery import Celery

app = Celery()

app.config_from_object(celeryconfig)

案例3:使用配置类/对象

from celery import Celery

app = Celery()

class Config:

    enable_utc = True

    timezone = 'Europe/London'

app.config_from_object(Config)

# or using the fully qualified name of the object:

# app.config_from_object('module:Config')
  config_from_envvar

app.config_from_envvar() 可以从环境变量获取信息进行配置, 例如,从名称为 CELERY_CONFIG_MODULE 的环境变量中加载配置:

import os

from celery import Celery

#: Set default configuration module name

os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')

app = Celery()

app.config_from_envvar('CELERY_CONFIG_MODULE')

然后通过指定的环境变量进行配置使用的配置模块:

$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l info

celery过滤配置:Censored configuration

如果您希望将配置作为调试信息或类似信息打印出来,那么您也可能希望过滤掉敏感信息,如密码和API密钥。 Celery 提供了集中打印配置信息工具,其中一个为 humanize()

>>> app.conf.humanize(with_defaults=False, censored=True)

该方法将配置信息转换为列表字符串返回,默认情况下,仅包含修改的键值,可以通过 with_defaults 参数进行包含默认的配置信息。 可以通过 table() 方法将返回结果转换为字典:

>>> app.conf.table(with_defaults=False, censored=True)

注意:Celery 不会删除所有的敏感配置信息,通过正则表达式来进行检索通常命名的信息,如果包含敏感信息的自定义配置,Celery 会标识为机密的名称来下进行命名秘钥。

如果命名中含有子字符串,将会进行过滤:

APITOKENKEYSECRETPASSSIGNATUREDATABASE

 

懒加载:Laziness

程序是懒加载的,在没有实际使用的情况下,是不会进行加载的。 创建一个 Celery 程序的流程如下:

  • 创建用于事件的逻辑时钟实例

  • 创建任务注册表

  • 将自身设置为当前应用程序(如果禁用 set_as_current 参数则不会)

  • 调用 app.on_init() 回调函数(默认情况下不执行任何操作)

    app.task() 装饰器不会在定义任务时创建任务,创建任务通常在使用该任务或应用程序完成后进行创建。

该实例会显示,在使用任务或访问属性之前,是如何创建任务的:

>>> @app.task

>>> def add(x, y):

... return x + y

​

>>> type(add)



​

>>> add.__evaluated__()

False

​

>>> add # >> add.__evaluated__()

True

应用程序的终结可以通过调用 app.finalize() 显式执行,也可以通过访问 app.tasks 属性隐式执行。 完成创建对象将:

  • 复制必须在应用之间共享的任务

    默认情况下共享任务,如果装饰器的 shared 参数,该任务为私有任务。

  • 评估所有待处理的任务装饰器

  • 确保当前所有的人呢我都已经绑定到当前应用程序

    任务绑带到应用程序,便于从配置中获取配置信息。

打破链式操作:Breaking the chain

        虽然可以依赖于当前设置的应用程序,但最佳做法是始终将应用程序实例传递给需要它的任何内容。 通常将这种做法称为 app chain,根据传递的应用程序创建一系列实例。 下面的这个实例实不可取的:

from celery import current_app


class Scheduler(object):


    def run(self):

        app = current_app

应该将 app 作为参数进行传递:

class Scheduler(object):

​
def __init__(self, app):

    self.app = app

在celery内部实现中,使用 celery.app_or_default() 函数使得模块级别的 API 也能正常使用:

from celery.app import app_or_default

​

class Scheduler(object):

    def __init__(self, app=None):

        self.app = app_or_default(app)

在开发环境中,可以通过设置 CELERY_TRACE_APP 环境变量在应用实例链被打破时抛出一个异常:

$ CELERY_TRACE_APP=1 celery worker -l info
抽象任务:Abstract Tasks

使用 task() 装饰器创建的所有任务都将从应用程序继承 Task 类。 也可以通过 base 参数进行指定基类:

@app.task(base=OtherTask):

def add(x, y):

    return x + y

创建自定义任务类,需要继承 celery.Task

from celery import Task



class DebugTask(Task):

​

    def __call__(self, *args, **kwargs):

        print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))

        return super(DebugTask, self).__call__(*args, **kwargs)

tip 如果重写 taks 中的 __call__ 函数,就必须要要调用 super() 函数,这样基本调用方法可以设置直接调用任务时使用的默认请求。 基类是比较特殊的,因为它还没有绑定到任何特定的应用程序。一旦任务绑定到应用程序,它将读取配置以设置默认值,等等。 要实现基类,需要使用 app.taks() 装饰器创建任务:

@app.task(base=DebugTask)

def add(x, y):

    return x + y

也可以通过更改应用程序的 app.task() 属性来更改其默认基类:

>>> from celery import Celery, Task

​

>>> app = Celery()

​

>>> class MyBaseTask(Task):

... queue = 'hipri'

​

>>> app.Task = MyBaseTask

>>> app.Task



​

>>> @app.task

... def add(x, y):

... return x + y

​

>>> add



​

>>> add.__class__.mro()

[,

,

,

]


 

 
关注
打赏
1665675218
查看更多评论
立即登录/注册

微信扫码登录

0.0412s