Celery

官方

Celery 官网:http://www.celeryproject.org/

Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html

Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

Celery架构

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

使用场景

异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

定时任务:定时执行某件事情,比如每天数据统计

Celery的安装配置

pip install celery

消息中间件:RabbitMQ/Redis

app=Celery('任务名', broker='xxx', backend='xxx')

Celery执行异步任务

包架构封装

project
├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery连接和配置相关文件,且名字必须是celery.py
│ └── tasks.py # 所有任务函数
├── add_task.py # 添加任务
└── get_result.py # 获取结果

基本使用(添加立即执行任务)

执行流程:

​ 1)创建app + 任务

​ 2)启动celery(app)服务:

​ 非windows

​ 命令:celery worker -A celery_task -l info

​ windows:

​ pip3 install eventlet

​ celery worker -A celery_task -l info -P eventlet

​ 3)添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本

​ 4)获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本

celery.py
from celery import Celery

# broker: 任务仓库
broker = 'redis://127.0.0.1:6379/5'
# backend: 任务结果仓库
backend = 'redis://127.0.0.1:6379/6'
# include: 任务(函数)所在文件
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
tasks.py(任务文件)
from .celery import app
import time
@app.task
def add(n, m):
print(n)
print(m)
time.sleep(10)
print('n+m的结果:%s' % (n + m))
return n + m @app.task
def low(n, m):
print(n)
print(m)
print('n-m的结果:%s' % (n - m))
return n - m
add_task.py(添加要执行的任务)
# 右键执行该文件,下面的导入环境是合理的
from celery_task.tasks import add, low # 往celery的Broker中添加立即任务
# 先启动celery: celery worker -A celery_task -l info -P eventlet ,然后右键运行执行
t1 = add.delay(10, 20)
t2 = low.delay(50, 10)
print(t2.id)
get_result.py(查看任务结果)
from celery_task.celery import app

from celery.result import AsyncResult

# 任务执行的id,可从上方任务执行完获取
id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
async = AsyncResult(id=id, app=app)
if async.successful():
# 拿到任务执行完的结果
result = async.get()
print(result)
elif async.failed():
print('任务失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')

高级使用(执行延迟任务)

celery.py
from celery import Celery

# broker:任务仓库
broker = 'redis://127.0.0.1:6379/15'
# backend:任务结果仓库
backend = 'redis://127.0.0.1:6379/15'
# include:任务(函数)所在文件
app = Celery(broker=broker, backend=backend, include=['celery_package.tasks'])
tasks.py
from .celery import app

@app.task
def jump(n1, n2):
res = n1 * n2
print('n1 * n2 = %s' % res)
return res
add_task.py(添加延迟任务)

注:

args是jump任务需要的参数,没有就设置为空()

​ eta是该任务执行的UTC格式的时间

from celery_package.tasks import jump

# # 直接执行函数
# jump(10, 20) # 添加celery立即任务
# jump.delay(10, 20) from datetime import datetime, timedelta
# 以秒为单位添加延迟时间
def eta_second(second):
ctime = datetime.now()
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=second)
return utc_ctime + time_delay # 以天为单位添加延迟时间
def eta_days(days):
ctime = datetime.now()
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(days=days)
return utc_ctime + time_delay # apply_async就是添加延迟任务
jump.apply_async(args=(200, 50), eta=eta_second(10))

高级使用(自动任务)

执行流程:

​ 1)创建app + 任务

​ 2)启动celery(app)服务:

​ 非windows

​ 命令:celery worker -A celery_task -l info

​ windows:

​ pip3 install eventlet

​ celery worker -A celery_task -l info -P eventlet

​ 3)添加任务:自动添加任务,所以要启动一个添加任务的服务

​ 命令:celery beat -A celery_task -l info

​ 4)获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本

celery.py
from celery import Celery

broker = 'redis://127.0.0.1:6379/15'
backend = 'redis://127.0.0.1:6379/15'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks']) # 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False # 自动任务的定时配置
from celery.schedules import crontab
from datetime import timedelta
app.conf.beat_schedule = {
# 定时任务:任务名自定义
'fall_task': {
'task': 'celery_task.tasks.fall', # 任务源
'args': (30, 10), # 任务参数
'schedule': timedelta(seconds=3), # 定时添加任务的时间
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
}
}
tasks.py
from .celery import app

@app.task
def fall(n1, n2):
res = n1 / n2
print('n1 / n2 = %s' % res)
return res
get_result.py
from celery_task.celery import app

from celery.result import AsyncResult

id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
async = AsyncResult(id=id, app=app)
if async.successful():
result = async.get()
print(result)
elif async.failed():
print('任务失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')

django中使用

注意点:

添加自动任务时,需要另外启动一个添加任务的服务,就是再起一个服务端运行下面的命令。

命令:celery beat -A celery_task -l info

celery.py
# 加载django环境
import os, django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")
django.setup() from celery import Celery
# 任务仓库
broker = 'redis://127.0.0.1:6379/15'
# 任务结果仓库
backend = 'redis://127.0.0.1:6379/15'
# include任务函数文件的位置
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks']) # 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False # 自动任务的定时配置
from celery.schedules import crontab
from datetime import timedelta
app.conf.beat_schedule = {
# 定时任务:任务名自定义
'update_banner_cache': {
'task': 'celery_task.tasks.update_banner_cache', # 任务源
'args': (), # 任务参数
'schedule': timedelta(seconds=10), # 定时添加任务的时间
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
}
}
tasks.py
from .celery import app
# 获取项目中的模型类
from api.models import Banner @app.task
def test_django_celery():
banner_query = Banner.objects.filter(is_delete=False).all()
print(banner_query)

最新文章

  1. 定时Job在IIS中潜在危险-IIS 定期回收
  2. 判断是否字符串是否是JSON
  3. PowerDesigner V16.5 安装文件 及 破解文件
  4. getting started with Baxter Research Robot
  5. POJ 3259 Wormholes (Bellman_ford算法)
  6. C++ Primer : 第十三章 : 拷贝控制之拷贝、赋值与销毁
  7. sql server 创建文件组,文件
  8. 关于SQL Server 数据库的备份
  9. js压缩、混淆和加密
  10. 移动web设计稿尺寸,关于移动web尺寸的那点事
  11. USB OTG ID 检测原理
  12. JavaScript之Style属性学习
  13. NetBeans文件被锁,无法修改
  14. Java并发编程之闭锁CountDownLatch简单介绍
  15. IOS获取经度纬度
  16. CSS引入本地字体与在线字体
  17. git push完代码 想撤回 并保留之前修改的代码 / 修改完代码 发现分支不对 想切换分支 /恢复已修改的文件
  18. Jenkins学习
  19. c++入门之内置数组和array比较
  20. django+celery +rabbitmq

热门文章

  1. Apache Solr Velocity模板注入RCE漏洞复现
  2. net core WebApi——公用库April.Util公开及发布
  3. 就因为加了Lombok的@Accessors(chain = true),bean拷贝工具类不干活了
  4. 《Effective Java》 读书笔记(五)使用依赖注入取代原本的资源依赖
  5. js中几种继承实现
  6. Go netpoll I/O 多路复用构建原生网络模型之源码深度解析
  7. 五角场之殇。曾与张江、漕河泾、紫竹齐名。如今,上海四大IT科技园是否还在?
  8. Windows Server 搭建企业无线认证(Radius认证方案)
  9. js清除节点内容(改变标签元素)
  10. Java Part 001( 01_01_Java概述 )