上篇文章中,已经介绍了celery和RabbitMQ的安装以及基本用法。

本文将从工程的角度介绍如何使用celery。

1.配置和启动RabbitMQ

请参考celery+RabbitMQ实战记录

2. 安装和使用celery

2.1 创建虚拟环境,并安装celery

$ mkdir celery_demo
$ cd celery_demo
$ virtualenv -p python3 venv3
$ ./venv3/bin/pip install celery

项目的目录结构说明:

-- celery_demo
-- api.py
-- celeryconfig.py
-- rocket
-- celery.py
-- tasks.py

2.2 配置celery

创建配置文件 celeryconfig.py,里面包含CELERY_IMPORTSBROKER_URLCELERYD_LOG_FORMATCELERY_ROUTES.

# celeryconfig.py

RABBIT_MQ = {
'HOST': '127.0.0.1',
'PORT': 5672,
'USER': 'test',
'PASSWORD': '123456'
} CELERY_IMPORTS = ("rocket.tasks", ) BROKER_URL = 'amqp://%s:%s@%s:%s/myvhost' % (RABBIT_MQ['USER'], RABBIT_MQ['PASSWORD'], RABBIT_MQ['HOST'], RABBIT_MQ['PORT']) CELERYD_LOG_FORMAT = '[%(asctime)s] [%(levelname)s] %(message)s' CELERY_ROUTES = {
'rocket.tasks.add': {'queue': 'sunday'},
}

其中,参数定义如下:

  • CELERY_IMPORTS 导入的task

  • BROKER_URL指定了broker信息,即消息队列的地址。

  • CELERYD_LOG_FORMAT 指定了日志格式。

  • CELERY_ROUTES 指定了路由信息,即调用rocket.tasks.add后,消息具体放入哪个队列,这里是队列名称为sunday

2.3 创建celery实例

创建目录

mkdir rocket

在rocket目录下,创建文件celery.py

from celery import Celery

app = Celery("orange", backend='amqp')
app.config_from_object("celeryconfig")

创建实例,并读取配置。

2.4 定义tasks

在rocket目录下,创建文件tasks.py

from rocket.celery import app

@app.task
def add(x, y):
return x + y

定义tasks。

2.5 启动celery(即消费者)

确认当前所在的目录:

$ pwd
/workspace/celery_demo

启动消费者

$ ./venv3/bin/celery worker  -A rocket -Q sunday --loglevel=info  -f app.log

/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/amqp.py:67: CPendingDeprecationWarning:
The AMQP result backend is scheduled for deprecation in version 4.0 and removal in version v5.0. Please use RPC backend or a persistent backend. alternative='Please use RPC backend or a persistent backend.') celery@admindeMacBook-Pro-2.local v4.3.0 (rhubarb) Darwin-18.2.0-x86_64-i386-64bit 2019-04-22 20:53:56 [config]
.> app: orange:0x10abdb6d8
.> transport: amqp://test:**@127.0.0.1:5672/myvhost
.> results: amqp://
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker) [queues]
.> sunday exchange=sunday(direct) key=sunday [tasks]
. rocket.tasks.add

2.6 生产者

创建文件api.py,会调用apply_async向队列中投放消息。

# api.py
from rocket.tasks import add print("start...") result = add.apply_async((1, 2), expires=10) print("result:", result)
print(result.ready()) print("end...")

执行api.py,启动生产者

./venv3/bin/python api.py
start...
result: 6ef453e6-1797-4dd7-a2fc-bd6ef8096fde
True
end...

2.6 查看结果

查看日志文件app.log

[2019-04-23 09:53:46,406] [INFO] Connected to amqp://test:**@127.0.0.1:5672/myvhost
[2019-04-23 09:53:46,420] [INFO] mingle: searching for neighbors
[2019-04-23 09:53:47,455] [INFO] mingle: all alone
[2019-04-23 09:53:47,474] [INFO] celery@admindeMacBook-Pro-2.local ready.
[2019-04-23 09:53:55,850] [INFO] Received task: rocket.tasks.add[6ef453e6-1797-4dd7-a2fc-bd6ef8096fde] expires:[2019-04-23 01:54:05.817853+00:00]
[2019-04-23 09:53:55,876] [INFO] Task rocket.tasks.add[6ef453e6-1797-4dd7-a2fc-bd6ef8096fde] succeeded in 0.023095715092495084s: 3

参考

using celery in project

最新文章

  1. 国内最全最详细的hadoop2.2.0集群的HA高可靠的最简单配置
  2. Java @Override报错
  3. hadoop的dfs工具类一个【原创】
  4. 【网络流24题】No.1 搭配飞行员(飞行员配对方案问题)
  5. 去掉搜狗拼音烦人的x+;进入搜狗搜索
  6. 图示Servelet请求过程
  7. js实现多行文本超出一定字数显示省略号功能
  8. 201521123060《Java程序设计》第2周学习总结
  9. CSS文字不换行,溢出省略
  10. C++11的原子量与内存序浅析
  11. C++中的stack类、QT中的QStack类
  12. 关于Box Anemometer的安装配置遇到的几个坑
  13. Docker 从入门到实践(一)Docker 简介
  14. 小学生都看得懂的C语言入门(1): 基础/判别/循环
  15. Ubuntu14.04 安装MySQL 及Can‘t connect to local MYSQL server through socket ’/var/run/mysqld/mysqld.sock‘ (2)
  16. 【转载】Java8 HashMap之tableSizeFor
  17. oracle版本兼容问题
  18. Spring-security-Oauth2.0
  19. JavaWeb基础—EL表达式与JSTL标签库
  20. java中的变量各占得字节数

热门文章

  1. react native获取组件高度,宽 度等
  2. 安装Python mysqlclient出现“OSError: mysql_config not found”错误
  3. 马凯军201771010116《面向对象与程序设计Java》第十三周学习总结
  4. Spring学习札记(一)
  5. 解决Caused by: java.lang.IllegalArgumentException: A universal match pattern ('/**') is defined before other patterns in the filter chain, causing them to be ignored. Please check the ordering in your
  6. PHP 基于pdo的数据库操作类
  7. IDEA 编译时 报 “常量字符串过长” 解决办法
  8. Toggle Slow Animations
  9. 几个NAND/NOR门可以表示一个XOR门?
  10. webservice接口和http接口介绍---更新版