蒋振飞的博客 - Django框架17:Celery的使用   
正在加载蒋振飞的博客...
V3.0
蒋振飞的博客

Django框架17:Celery的使用

发布时间: 2018年09月26日 发布人: 蒋振飞 热度: 325 ℃ 评论数: 0

一、Celery介绍

1.什么是Celery

    Celery是一个python模块,它在官网的定义:Celery is asynchronous task queue/job based on distributed message passing. It is focused on real-time operation, but supports scheduling as well.
    这里强调的概念包括:异步任务队列,分布式消息传递,实时或调度任务。

2.为什么要使用Celery

    Celery是一个使用Python开发的分布式任务调度模块,因此对于大量使用Python构建的系统,可以说是无缝衔接,使用起来很方便。Celery专注于实时处理任务,同时也支持任务的定时调度。因此适合实时异步任务定时任务等调度场景。

3.Celery任务队列

    任务队列是一种在进程或机器之间分发任务的机制。 
    任务队列的输入是被称为任务(task)的工作单元。专用的工作进程会时刻监控任务队列,来获取要执行的任务。
    celery的client和worker通过消息来“沟通“。Celery需要依靠RabbitMQ等作为消息代理,同时也支持Redis甚至是Mysql,Mongo等,当然,官方默认推荐的是RabbitMQ。
    为了开始一个任务,client需要向队列中发送任务消息,然后broker会把任务投递给worker处理。一个celery系统可以包含多个worker和broker,以便实现高可用和可水平扩展。

4.官方网站http://www.celeryproject.org/

5.中文文档:http://docs.jinkan.org/docs/celery/

二、Celery配置

参考Github实践例子:https://github.com/celery/celery/tree/3.1/examples/django

1.导入celery的app

    在包含settings文件夹proj的__init__.py中添加

# proj/__init__.py
from __future__ import absolute_import
from .celery import app as celery_app

2.新建celery.py文件

    同样在包含settings文件夹下进行操作

# proj/celery.py
# coding:utf8
from __future__ import absolute_import

import os

from celery import Celery
from django.conf import settings

# set the default Django settings module for the 'celery' program.

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

3.新增celery的配置

    在settings.py文件中,添加一些celery的配置。

# proj/settings.py
# 修改点1
INSTALLED_APP += [
    'kombu.transport.django',
    'djcelery'
]

# 修改点2
# 生产环境我使用redis作为broker
BROKER_URL = 'redis://172.23.18.116:6379/0'
# 开发环境可以直接用django作为broker
BROKER_URL = 'django://'

# 修改点3
#: Only add pickle to this list if your broker is secured
#: from unwanted access (see userguide/security.html)
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

4.建立task任务

    做好上面的步骤之后,就可以在自己的app中新增tasks.py文件,里面用shared_task来装饰celery任务,如demoapp/tasks.py。

# demoapp/tasks.py
from __future__ import absolute_import
from celery import shared_task


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)

三、celery测试及使用

1.启动worker

# 这里指定了配置,默认的话忽略info后面
python manage.py celery worker --loglevel=info --settings=proj.settings

2.Django环境下测试

    使用下面的命令也就是在django环境中执行,常用来测试django模块功能。

# 同上忽略指定的配置文件
python manage.py shell --settings=proj.settings

    试着执行shared_task add()

from demoapp.tasks import add
add.delay(3, 3)

    上面的worker log中出现add()的return也就是6,说明celery配置正确。

3.celery的定时任务

    这一步是celery beat定时或者定间隔给celery发送task。

# proj/settings.py
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'12

    接下来,将目标djcelery的定时文件迁移一下,即可在admin后台配置定时任务时间间隔等内容。

python manage.py migrate

    再然后

python manage.py celery beat

    可以看到celery beat开始按之前admin后台设置的时间间隔或者crontab开始发送task给celery了,此时自然也需要celery worker在running状态。celery beat发送的任务return的结果在celery log中可以看到,说明django-celery 定时任务配置成功。

打赏 蒋振飞

取消

感谢您的支持,我会继续努力的!

扫码支持
一分也是爱     一块不嫌多

点击 支付宝 或 微信 打赏蒋振飞

打开支付宝扫一扫,即可进行扫码打赏哦

评论列表