Django-django-celery的配置
2024-10-21 14:32:15
1.安装django-celery
pip install django-celery
2.添加配置 demos/demos/settings.py(我的项目名为demos,这里只是示范,切勿搞混)
# celery 相关配置
# 配置celery时区,默认时UTC。
CELERY_TIMEZONE = TIME_ZONE
# 任务队列的链接地址 celery配置redis作为broker。redis有16个数据库,编号0~15。
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/2'
# 设置存储结果的后台 结果队列的链接地址
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/3'
# 可接受的内容格式
CELERY_ACCEPT_CONTENT = ["json"]
# 任务序列化数据格式
CELERY_TASK_SERIALIZER = "json"
# 结果序列化数据格式
CELERY_RESULT_SERIALIZER = "json"
在项目同名目录下创建 demos/demos/celery.py(必须按照要求来)
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# 指定Django默认配置文件模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demos.settings')
# 为项目demos创建一个Celery实例。
app = Celery('demos')
# 这里指定从django的settings.py里读取celery配置,且setting中的配置文件必须以'CELERY_'开头
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动从所有已注册的django app中加载任务
app.autodiscover_tasks()
在已经注册app目录中,创建名为tasks.py的文件(如果取别的名字,celery就扫描不到了)
将程序导入项目同名目录下的dmos/demos/init.py中
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ('celery_app',)
3.启动worker
1.先启动redis server
2.cd到项目目录
celery -A demos worker --loglevel=INFO
windows用户:celery -A demos worker --loglevel=INFO -P eventlet
我的版本是,python3.9+django4.0+celery5.0+redis5.0
如果是按照上述方法执行,还有报错,那就极有可能是版本不兼容的问题了。建议去官方文档查看!
4.celery定时任务
from django.shortcuts import HttpResponse
from api import tasks
import datetime
def create_task(request):
# 1.立即执行
# result = tasks.add.delay(2, 2)
# 2.定时执行
# 获取当前的本地时间
ctime = datetime.datetime.now()
# 本地时间转换为utc时间
utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())
# 当前utc时间加十秒
utc10 = datetime.timedelta(seconds=10)
# 得到目标时间
target_time = utc_ctime + utc10
# 调用函数,传入参数,以及函数执行时间
result = tasks.add.apply_async(args=[20, 3], eta=target_time)
return HttpResponse(result.id)
def get_result(request):
"""
result_object.status # 获取状态
result_object.get() # 获取数据
result_object.forget() # 把数据在backend中移除
result_object.revoke() # 取消任务
result_object.revoke(terminate=True) # 强制取消任务
"""
nid = request.GET.get('nid')
from celery.result import AsyncResult
from demos import celery_app
result_object = AsyncResult(id=nid, app=celery_app)
if result_object.successful():
data = result_object.get()
result_object.forget()
return HttpResponse(data)
elif result_object.failed():
pass
else:
pass
最新文章
- 大数据之Oozie——源码分析(一)程序入口
- 代码的坏味道(5)——数据泥团(Data Clumps)
- 2016-WAS
- Struts2 源码分析——调结者(Dispatcher)之action请求
- caffe的python接口学习(8):caffemodel中的参数及特征的抽取
- 解决Windows Server2008R2中导入Excel不能使用Jet 4.0
- Visual Studio配色方案
- 样式:让div里的两个控件在一行的操作
- Selenium2+python自动化20-Excel数据参数化
- 回顾bidirectional path tracing
- Execl DataTime Format Number
- python Sina微博自动转发带抽奖字样的微博,添加关注,取消关注
- Windows Server 2012 R2 服务器管理器介绍和配置使用
- jquery调用wcf案例
- asp 301代码
- JSP/Servlet(一)
- json-lib之复杂数据类型的转换
- angular 实现导航ng-click切换
- JAVA基础第三组(5道题)
- C语言之二分猜数字游戏