1.Celery模块调用

既然celery是一个分布式的任务调度模块,那么celery是如何和分布式挂钩呢,celery可以支持多台不通的计算机执行不同的任务或者相同的任务。

如果要说celery的分布式应用的话,就要提到celery的消息路由机制,AMQP协议。具体的可以查看AMQP的文档。简单地说就是可以有多个消息队列(Message Queue),不同的消息可以指定发送给不同的Message Queue,而这是通过Exchange来实现的。发送消息到Message Queue中时,可以指定routiing_key,Exchange通过routing_key来把消息路由(routes)到不同的Message Queue中去。

多worker,多队列,实例:

1.在服务器上编写文件tasks.py。首先定义一个Celery的对象,然后通过celeryconfig.py对celery对象进行设置。之后又分别定义了三个task,分别是taskA, taskB和add。

#!/usr/bin/env
#-*-conding:utf-8-*-
from celery import Celery,platforms
platforms.C_FORCE_ROOT = True app = Celery()
app.config_from_object("celeryconfig") @app.task
def tashA(x,y):
return x*y @app.task
def taskB(x,y,z):
return x+y+z @app.task
def add(x,y):
return x+y

2.编写celeryconfig.py文件。

#!/usr/bin/env python
#-*- coding:utf-8 -*-
from kombu import Exchange,Queue
from celery import platforms
platforms.C_FORCE_ROOT = True BROKER_URL = "redis://localhost:6379/7" 
CELERY_RESULT_BACKEND = "redis://localhost:6379/8" CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),
Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B") 
) CELERY_ROUTES = {
'tasks.taskA':{"queue":"for_task_A","routing_key":"for_task_A"},
'tasks.taskB':{"queue":"for_task_B","routing_key":"for_task_B"}
}

3.启动worker来指定task

celery -A tasks worker -l info -n workerA.%h -Q for_task_A

celery -A tasks worker -l info -n workerB.%h -Q for_task_B

4.传入参数

将上面两个文件导出到pycharm中:

编写文件传参:

from tasks import *
re1 = taskA.delay(100, 200)
re2 = taskB.delay(1,2, 3)
print(re3.status) #查看re3的状态
print(re3.id)               #查看re3的id

运行之后可见:taskA,taskB都已正常执行。

5.我们可以看到add(re3)的状态是PENDING,表示没有执行,这个是因为没有celeryconfig.py文件中指定改route到哪一个Queue中,所以会被发动到默认的名字celery的Queue中,但是我们还没有启动worker执行celery中的任务。下面,我们来启动一个worker来执行celery队列中的任务。

celery -A tasks worker -l info -n worker.%h -Q celery

这样我们再次运行pycharm就可以看见add也被运行了,并且redis数据库中也有该id了。

2.Celery与定时任务

1.在celery中执行定时任务非常简单,只需要设置celery对象中的CELERYBEAT_SCHEDULE属性即可。
下面我们接着在celeryconfig.py中添加CELERYBEAT_SCHEDULE变量:

CELERY_TIMEZONE = 'UTC'
CELERYBEAT_SCHEDULE = {
    'taskA_schedule' : {
        'task':'tasks.taskA',
        'schedule':20,
        'args':(5,6)
    },
    'taskB_scheduler' : {
        'task':"tasks.taskB",
        "schedule":200,
        "args":(10,20,30)
    },
    'add_schedule': {
        "task":"tasks.add",
        "schedule":10,
        "args":(1,2)
    }
}

2.Celery启动定时任务

celery -A tasks worker -l info -n workerA.%h -Q for_task_A -B

启动完成后:

taskA每20秒执行一次taskA.delay(5, 6)

taskB每200秒执行一次taskB.delay(10, 20, 30)

Celery每10秒执行一次add.delay(1, 2)

最新文章

  1. 常用str函数
  2. python小程序
  3. 从头学Qt Quick(2)-- QML语法从一个简单的例子说起
  4. hiho_1054_滑动解锁
  5. Control.Refresh Control.Invalidate 和 Control.OnPaint之间的联系和区别
  6. maven安装和环境变量配置
  7. Web字体库下载及转换工具
  8. SqlServer经典函数之数字去零
  9. sql 分割字符串 存储过程
  10. 安卓WindowManager注入事件如何跳出进程间安全限制
  11. 第三节:Creating API Endpoints (创建API路由)
  12. Visual Studio 2019 使用 Live Share
  13. node,npm,vue的全局升级
  14. 为什么阿里巴巴禁止开发人员使用isSuccess作为变量名
  15. js運算符
  16. [POJ题目分类][转]
  17. spring controller 获取context
  18. hdu 6406 Taotao Picks Apples 线段树 单点更新
  19. 解决MySQL复制出错 Last_SQL_Errno:1146
  20. 中文乱码—Servlet—SpringMVC

热门文章

  1. jenkins和gitlab结合的时候出错
  2. 树莓派(raspberry pi)学习11: 将树莓派变成一个Web服务器(转)
  3. Ubuntu菜鸟入门(十一)—— windows 和 ubuntu时间冲突解决
  4. 【Android】播放音频的几种方式介绍
  5. 《JAVA与模式》之简单工厂与工厂方法
  6. the most beautiful media player on the linux platform.
  7. ss 命令
  8. [转]用GSON 五招之内搞定任何JSON数组
  9. Python Tensorflow CNN 识别验证码
  10. jmeter 签名MD5生成