celery+RabbitMQ 实战记录2—工程化使用
2024-10-14 08:38:37
上篇文章中,已经介绍了celery和RabbitMQ的安装以及基本用法。
本文将从工程的角度介绍如何使用celery。
1.配置和启动RabbitMQ
2. 安装和使用celery
2.1 创建虚拟环境,并安装celery
$ mkdir celery_demo
$ cd celery_demo
$ virtualenv -p python3 venv3
$ ./venv3/bin/pip install celery
项目的目录结构说明:
-- celery_demo
-- api.py
-- celeryconfig.py
-- rocket
-- celery.py
-- tasks.py
2.2 配置celery
创建配置文件 celeryconfig.py,里面包含CELERY_IMPORTS
、BROKER_URL
、CELERYD_LOG_FORMAT
、CELERY_ROUTES
.
# celeryconfig.py
RABBIT_MQ = {
'HOST': '127.0.0.1',
'PORT': 5672,
'USER': 'test',
'PASSWORD': '123456'
}
CELERY_IMPORTS = ("rocket.tasks", )
BROKER_URL = 'amqp://%s:%s@%s:%s/myvhost' % (RABBIT_MQ['USER'], RABBIT_MQ['PASSWORD'], RABBIT_MQ['HOST'], RABBIT_MQ['PORT'])
CELERYD_LOG_FORMAT = '[%(asctime)s] [%(levelname)s] %(message)s'
CELERY_ROUTES = {
'rocket.tasks.add': {'queue': 'sunday'},
}
其中,参数定义如下:
CELERY_IMPORTS
导入的taskBROKER_URL
指定了broker信息,即消息队列的地址。CELERYD_LOG_FORMAT
指定了日志格式。CELERY_ROUTES
指定了路由信息,即调用rocket.tasks.add
后,消息具体放入哪个队列,这里是队列名称为sunday
。
2.3 创建celery实例
创建目录
mkdir rocket
在rocket目录下,创建文件celery.py
from celery import Celery
app = Celery("orange", backend='amqp')
app.config_from_object("celeryconfig")
创建实例,并读取配置。
2.4 定义tasks
在rocket目录下,创建文件tasks.py
from rocket.celery import app
@app.task
def add(x, y):
return x + y
定义tasks。
2.5 启动celery(即消费者)
确认当前所在的目录:
$ pwd
/workspace/celery_demo
启动消费者
$ ./venv3/bin/celery worker -A rocket -Q sunday --loglevel=info -f app.log
/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/amqp.py:67: CPendingDeprecationWarning:
The AMQP result backend is scheduled for deprecation in version 4.0 and removal in version v5.0. Please use RPC backend or a persistent backend.
alternative='Please use RPC backend or a persistent backend.')
celery@admindeMacBook-Pro-2.local v4.3.0 (rhubarb)
Darwin-18.2.0-x86_64-i386-64bit 2019-04-22 20:53:56
[config]
.> app: orange:0x10abdb6d8
.> transport: amqp://test:**@127.0.0.1:5672/myvhost
.> results: amqp://
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> sunday exchange=sunday(direct) key=sunday
[tasks]
. rocket.tasks.add
2.6 生产者
创建文件api.py,会调用apply_async
向队列中投放消息。
# api.py
from rocket.tasks import add
print("start...")
result = add.apply_async((1, 2), expires=10)
print("result:", result)
print(result.ready())
print("end...")
执行api.py,启动生产者
./venv3/bin/python api.py
start...
result: 6ef453e6-1797-4dd7-a2fc-bd6ef8096fde
True
end...
2.6 查看结果
查看日志文件app.log
[2019-04-23 09:53:46,406] [INFO] Connected to amqp://test:**@127.0.0.1:5672/myvhost
[2019-04-23 09:53:46,420] [INFO] mingle: searching for neighbors
[2019-04-23 09:53:47,455] [INFO] mingle: all alone
[2019-04-23 09:53:47,474] [INFO] celery@admindeMacBook-Pro-2.local ready.
[2019-04-23 09:53:55,850] [INFO] Received task: rocket.tasks.add[6ef453e6-1797-4dd7-a2fc-bd6ef8096fde] expires:[2019-04-23 01:54:05.817853+00:00]
[2019-04-23 09:53:55,876] [INFO] Task rocket.tasks.add[6ef453e6-1797-4dd7-a2fc-bd6ef8096fde] succeeded in 0.023095715092495084s: 3
参考
最新文章
- 国内最全最详细的hadoop2.2.0集群的HA高可靠的最简单配置
- Java @Override报错
- hadoop的dfs工具类一个【原创】
- 【网络流24题】No.1 搭配飞行员(飞行员配对方案问题)
- 去掉搜狗拼音烦人的x+;进入搜狗搜索
- 图示Servelet请求过程
- js实现多行文本超出一定字数显示省略号功能
- 201521123060《Java程序设计》第2周学习总结
- CSS文字不换行,溢出省略
- C++11的原子量与内存序浅析
- C++中的stack类、QT中的QStack类
- 关于Box Anemometer的安装配置遇到的几个坑
- Docker 从入门到实践(一)Docker 简介
- 小学生都看得懂的C语言入门(1): 基础/判别/循环
- Ubuntu14.04 安装MySQL 及Can‘t connect to local MYSQL server through socket ’/var/run/mysqld/mysqld.sock‘ (2)
- 【转载】Java8 HashMap之tableSizeFor
- oracle版本兼容问题
- Spring-security-Oauth2.0
- JavaWeb基础—EL表达式与JSTL标签库
- java中的变量各占得字节数
热门文章
- react native获取组件高度,宽 度等
- 安装Python mysqlclient出现“OSError: mysql_config not found”错误
- 马凯军201771010116《面向对象与程序设计Java》第十三周学习总结
- Spring学习札记(一)
- 解决Caused by: java.lang.IllegalArgumentException: A universal match pattern ('/**') is defined before other patterns in the filter chain, causing them to be ignored. Please check the ordering in your
- PHP 基于pdo的数据库操作类
- IDEA 编译时 报 “常量字符串过长” 解决办法
- Toggle Slow Animations
- 几个NAND/NOR门可以表示一个XOR门?
- webservice接口和http接口介绍---更新版