Celery简介

Celery是一个强大的 分布式任务队列 的 异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)

Celery 官网:http://www.celeryproject.org/

Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html

Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

Celery构成

Task

任务模块, 包含异步任务和定时任务, 异步任务通常在业务逻辑中被触发并发往Broker任务队列, 定时任务由Celery Beat 进程周期性地发往Broker任务队列

Broker

消息中间件, 就是任务调度队列, 用来接收任务, 将任务存储到队列中, 就像是生产者与消费者模型中的队列一样

Celery本身不提供Broker, 官方推荐使用RabbitMQ和Redis

Worker

任务的执行单元, 它实时监控Broker队列, 获取队列中的任务, 并执行, 可以看做是生产者与消费者模型中的消费者

Backend

任务执行结果的存储单元, 用来存储任务结果, 以便查询

Celery本身不提供Backend, 官方推荐使用RabbitMQ和Redis

Celery使用

安装

python安装Celery: pip install celery

我们使用Redis作为Broker和Backend, 因此确保你的设备配置了Redis环境

基本使用

"""
project
├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery连接和配置相关文件,且名字必须是celery.py
│ └── tasks.py # 所有任务函数
├── add_task.py # 添加任务
└── get_result.py # 获取结果 """

异步任务: delay

"""
1.执行add_task.py将任务添加到队列
2.cmd切换至所在文件目录celery_task下运行worker:
>: celery worker -A celery_task -l info -P eventlet
3.执行get_result.py获取任务结果 """
# celery.py
from celery import Celery # 配置消息中间件, 用来接收任务
broker = 'redis://127.0.0.1:6379/0' # 配置backend, 用来存储任务执行结果
backend = 'redis://127.0.0.1:6379/1' # worker, 任务执行单元
app = Celery(broker=broker, backend=backend)
# tasks.py
from .celery import app # 定义任务
@app.task
def add(x, y):
res = x + y
print(f'{x}+{y}={res}')
return res @app.task
def minus(x, y):
res = x - y
print(f'{x}-{y}={res}')
return res
# add_task.py
from .tasks import add, minus # 在业务逻辑中触发异步任务
add_results = add.delay(10, 20) # 任务执行结果的id
print(add_results.id)
# get_result.py
from .celery import app
from .add_tasks import add_results
from celery.result import AsyncResult if __name__ == '__main__':
# 获取异步任务结果对象, 参数:id, app
async = AsyncResult(id=add_results.id, app=app) if async.successful():
result = async.get()
print('任务执行成功')
print(result)
elif async.failed():
print('任务失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')

延迟任务: apply_async

from celery_task.tasks import add
from datetime import timedelta, datetime # 添加延时任务, 10秒后执行
add_results = add.apply_async(args=(10, 20), eta=datetime.utcnow() + timedelta(seconds=10)) # 任务执行结果的id
print(add_results.id)

周期任务: beat_schedule

注意: 周期任务是通过 celery beat 来周期性添加的, , 因此启动worker服务之后, 还要重开一个cmd窗口启动beat服务: celery beat -A celery_task -l info

# celery.py
from celery import Celery
from datetime import timedelta
from celery.schedules import crontab # 配置消息中间件, 用来接收任务
broker = 'redis://127.0.0.1:6379/0' # 配置backend, 用来存储任务执行结果
backend = 'redis://127.0.0.1:6379/1' # worker, 任务执行单元
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks']) # 时区
app.conf.timezone = 'Asia/Shanghai' app.conf.enable_utc = False app.conf.beat_schedule = {
'add-task': {
'task': 'celery_task.tasks.add',
# 每10秒添加一次任务
'schedule': timedelta(seconds=10),
'args': (10, 20)
},
'minus-task': {
'task': 'celery_task.tasks.minus',
# 每周一八点半添加一次任务
'schedule': crontab(hour=8, minute=30, day_of_week=1),
'args': (20, 10)
} }

Django配置Celery

在根目录下创建包文件夹 celery_task

"""
project
├── celery_task # celery包
├── __init__.py # 包文件
├── celery.py # celery连接和配置相关文件,且名字必须是celery.py
└── tasks.py # 所有任务函数
"""

celery.py

# 1.加载Django配置环境
import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'luffyapi.settings.dev') # 2.加载Celery配置环境
from celery import Celery broker = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks.py']) # 时区
app.conf.timezone = 'Asia/Shanghai'
# UTC
app.conf.enable_utc = False from datetime import timedelta
app.conf.beat_schedules = {
'update-banner-cache': {
'task': 'celery_task.tasks.py.update_banner_cache',
# 每10秒添加一次
'schedule': timedelta(seconds=10),
'args': ()
}
}

task.py

from .celery import app
from home.models import Banner
from home.serializers import BannerModerSerializer
from django.conf import settings
from django.core.cache import cache @app.task
def update_banner_cache():
print('lalal')
banner_query = Banner.objects.filter(is_delete=False, is_show=True).order_by('-order').all()
serializer_obj = BannerModerSerializer(data=banner_query, many=True)
banner_data = serializer_obj.data
for banner in banner_data:
banner['image'] = settings.BASE_URL + banner.get('image')
cache.set('banner_cache', banner_data)
return True

最新文章

  1. 冰冻三尺非一日之寒--web来了
  2. PHP 汉字拼音互转
  3. .net core 基本概念
  4. 转换成的jar文件接收后台的信息乱码cmd解决办法
  5. spring mvc 删除返回字符串中值为null的字段
  6. iReport 开发和运行所用版本不一致导致设置字体大小不起作用
  7. PHP延迟静态绑定 static关键字
  8. centos添加PATH环境变量
  9. MSsql 服务器之间远程及其链接查询
  10. 如何用js获取当前url的参数值
  11. bzoj 1412 [ZJOI2009]狼和羊的故事(最小割)
  12. Linux下限制Shell:Rssh和Scponly
  13. oracle 包,函数,过程,块的创建和执行及在java中执行(转)
  14. Android更新UI的几种方法
  15. vue中computed、metfods、watch的区别
  16. 【Spring学习】Spring的源码解析之路
  17. Faster_RCNN 4.训练模型
  18. Win10系列:C#应用控件基础8
  19. CSS 常用技巧
  20. Caltech数据使用详情

热门文章

  1. 原生js用div实现简单的轮播图
  2. mybatis 一对多查询 集合创建空对象的问题
  3. Native memory allocation (mmap) failed to map 142606336 bytes for committing reserved memory.
  4. [转]在Windows中安装Memcached
  5. H3C 单区域OSPF配置示例一(续)
  6. 2019-8-30-C#-反射调用私有事件
  7. java 九个预定义Class对象
  8. P1034 台阶问题一
  9. 2019-8-6-在-Gitlab-开启-MatterMost-机器人
  10. 微信小程序开发踩坑之旅