Celery 在使用前必须要进行实例化,实例化之后被称为应用程序(简称 app)。 应用程序是线程安全的,可以配置多个不同配置、组件、任务的 Celery 程序在同一个进程中运行。 创建一个实例:
from celery import Celery
app = Celery()
app
最后一行显示的内容包含 app 类的 Celery 名称,当前主模块 (main) 以及对象的内存地址(0x250bbc65eb8)。
celery的主名称:Main Name其中主模块的名称是比较重要的。 使用 Celery 进行发送任务消息时,消息内容不会含有源代码信息,只有执行任务的名称。这与主机名在互联网上的工作类似:每一个职程(Worker)通过任务名称与实际的功能进行对应,这样的方式被称为 任务注册 。 每当定义任务时,该任务也将添加到本地注册表中:
@app.task
... def add(x, y):
... return x + y
...
add
add.name
'__main__.add'
app.tasks['__main__.add']
在上面的实例中 __main__
再次出现,这是因为 Celery 无法检测到该函数属于哪个模块时,自动会使用主模块的名称作为任务名称的开头。 这只是一组有限实例中的问题:
-
如果在其中定义任务的模块作为程序运行。
-
如果应用程序是在python shell(repl)中创建的。
例如,这里的 tasks 模块还用于用 app.worker_main()
启动一个职程(Worker):
tasks.py
from celery import Celery
app = Celery()
@app.task
def add(x, y):
return x + y
if __name__ == '__main__':
app.worker_main()
当模块执行时,任务名以 __main__
开头,被另外一个模块导入时,例如调用一个任务时,任务的名称将以 tasks
(模块真实的名称)进行开头命名:
>>> from tasks import add
>>> add.name
tasks.add
也可以设置主模块的名称:
>>> app = Celery('tasks')
>>> app.main
'tasks'
>>> @app.task
... def add(x, y):
... return x + y
>>> add.name
tasks.add
celery配置:Configuration
您可以设置几个选项来更改 Celery 的工作方式。这些选项可以直接在app实例上设置,也可以使用专用的配置模块。
你可以设置几个选项来改变 Celery 的工作方式。这些选项可以直接在应用程序实例上设置,也可以使用专用的配置模块。
可以通过配置几个选项来进行配置 Celery,该配置通过直接在程序实例中配置,也可以通过专用的配置模块进行配置。 配置如下 app.conf
:
>>> app.conf.timezone
'Europe/London'
也可以直接设置对应的值:
>>> app.conf.enable_utc = True
也可以通过 update
进行一次设置多个键值:
>>> app.conf.update(
... enable_utc=True,
... timezone='Europe/London',
...)
配置对象由多个字典组成,按以下顺序进行查询:
-
在运行时所做的更改。
-
配置模块(如果有的话)
-
默认配置(celery.app.defaults) 也通过
app.add_defaults()
进行配置新的默认配置。
config_from_object()
可以从配置对象中进行加载配置。
加载的内容可以为配置模块、也可以为用于属性的对象。
注意:config_from_object()
在进行调用时会恢复默认的配置,如果需要设置其他的配置,建议在调用完毕之后进行操作。
案例1:使用模块名称
app.config_from_object()
可以从一个 python 模块中(包含属性名称)进行加载,例如:celeryconfig
、myproj.config.celery
或 myproj.config:CeleryConfig
:
from celery import Celery
app = Celery()
app.config_from_object('celeryconfig')
其中 celeryconfig
模块内容如下:
celeryconfig.py
enable_utc = True
timezone = 'Europe/London'
导入 celeryconfig 程序就可以应用。
案例2:配置模块对象
可以配置模块对象,但不建议这样用。
import celeryconfig
from celery import Celery
app = Celery()
app.config_from_object(celeryconfig)
案例3:使用配置类/对象
from celery import Celery
app = Celery()
class Config:
enable_utc = True
timezone = 'Europe/London'
app.config_from_object(Config)
# or using the fully qualified name of the object:
# app.config_from_object('module:Config')
config_from_envvar
app.config_from_envvar()
可以从环境变量获取信息进行配置, 例如,从名称为 CELERY_CONFIG_MODULE
的环境变量中加载配置:
import os
from celery import Celery
#: Set default configuration module name
os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')
app = Celery()
app.config_from_envvar('CELERY_CONFIG_MODULE')
然后通过指定的环境变量进行配置使用的配置模块:
$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l info
celery过滤配置:Censored configuration
如果您希望将配置作为调试信息或类似信息打印出来,那么您也可能希望过滤掉敏感信息,如密码和API密钥。 Celery 提供了集中打印配置信息工具,其中一个为 humanize()
:
>>> app.conf.humanize(with_defaults=False, censored=True)
该方法将配置信息转换为列表字符串返回,默认情况下,仅包含修改的键值,可以通过 with_defaults
参数进行包含默认的配置信息。 可以通过 table()
方法将返回结果转换为字典:
>>> app.conf.table(with_defaults=False, censored=True)
注意:Celery 不会删除所有的敏感配置信息,通过正则表达式来进行检索通常命名的信息,如果包含敏感信息的自定义配置,Celery 会标识为机密的名称来下进行命名秘钥。
如果命名中含有子字符串,将会进行过滤:
API
、TOKEN
、KEY
、SECRET
、PASS
、SIGNATURE
、DATABASE
懒加载:Laziness
程序是懒加载
的,在没有实际使用的情况下,是不会进行加载的。 创建一个 Celery 程序的流程如下:
-
创建用于事件的逻辑时钟实例
-
创建任务注册表
-
将自身设置为当前应用程序(如果禁用
set_as_current
参数则不会) -
调用
app.on_init()
回调函数(默认情况下不执行任何操作)app.task()
装饰器不会在定义任务时创建任务,创建任务通常在使用该任务或应用程序完成后进行创建。
该实例会显示,在使用任务或访问属性之前,是如何创建任务的:
>>> @app.task
>>> def add(x, y):
... return x + y
>>> type(add)
>>> add.__evaluated__()
False
>>> add # >> add.__evaluated__()
True
应用程序的终结可以通过调用 app.finalize()
显式执行,也可以通过访问 app.tasks
属性隐式执行。 完成创建对象将:
-
复制必须在应用之间共享的任务
默认情况下共享任务,如果装饰器的
shared
参数,该任务为私有任务。 -
评估所有待处理的任务装饰器
-
确保当前所有的人呢我都已经绑定到当前应用程序
任务绑带到应用程序,便于从配置中获取配置信息。
虽然可以依赖于当前设置的应用程序,但最佳做法是始终将应用程序实例传递给需要它的任何内容。 通常将这种做法称为 app chain
,根据传递的应用程序创建一系列实例。 下面的这个实例实不可取的:
from celery import current_app
class Scheduler(object):
def run(self):
app = current_app
应该将 app
作为参数进行传递:
class Scheduler(object):
def __init__(self, app):
self.app = app
在celery内部实现中,使用 celery.app_or_default() 函数使得模块级别的 API 也能正常使用:
from celery.app import app_or_default
class Scheduler(object):
def __init__(self, app=None):
self.app = app_or_default(app)
在开发环境中,可以通过设置 CELERY_TRACE_APP 环境变量在应用实例链被打破时抛出一个异常:
$ CELERY_TRACE_APP=1 celery worker -l info
抽象任务:Abstract Tasks
使用 task()
装饰器创建的所有任务都将从应用程序继承 Task 类。 也可以通过 base
参数进行指定基类:
@app.task(base=OtherTask):
def add(x, y):
return x + y
创建自定义任务类,需要继承 celery.Task
:
from celery import Task
class DebugTask(Task):
def __call__(self, *args, **kwargs):
print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
return super(DebugTask, self).__call__(*args, **kwargs)
tip 如果重写 taks
中的 __call__
函数,就必须要要调用 super()
函数,这样基本调用方法可以设置直接调用任务时使用的默认请求。 基类是比较特殊的,因为它还没有绑定到任何特定的应用程序。一旦任务绑定到应用程序,它将读取配置以设置默认值,等等。 要实现基类,需要使用 app.taks()
装饰器创建任务:
@app.task(base=DebugTask)
def add(x, y):
return x + y
也可以通过更改应用程序的 app.task()
属性来更改其默认基类:
>>> from celery import Celery, Task
>>> app = Celery()
>>> class MyBaseTask(Task):
... queue = 'hipri'
>>> app.Task = MyBaseTask
>>> app.Task
>>> @app.task
... def add(x, y):
... return x + y
>>> add
>>> add.__class__.mro()
[,
,
,
]