整个项目工程如下

__init__.py

"""
注意点:python3.7 需要执行 pip install --upgrade https://github.com/celery/celery/tarball/master
否则会报 from . import async, base SyntaxError: invalid syntax celery -A __init__ worker --concurrency=5 -l INFO -Q celery,save_redis
celery -A __init__ worker -l info -Q save_mongo
cd /Users/admin/PycharmProjects/function_test/adminSyS/mq&&celery -A __init__ worker --concurrency=5 celery 启动 --autoscale=10,3 当worker不足时自动加3-10个
celery -A __init__ worker --concurrency=5 -l INFO -Q celery,save_redis2,save_redis --autoscale=10,3 supervisor 配置
[program:celery]
directory = /data/app/adminSyS/mq
command = /data/app/adminSyS/venv/bin/celery -A __init__ worker --concurrency=5 -l INFO -Q celery,save_redis
autostart = true
autorestart = true
startsecs = 5
startretries = 3 监控:
文档 https://flower-docs-cn.readthedocs.io/zh/latest/install.html
pip install flower
celery flower --broker=redis://:z1234567@47.93.235.228:6379/5 --port=5555
测试 http://47.93.235.228:9114
"""
import os
import sys
sys.path.append(os.path.abspath(os.path.dirname(__file__)) + "/..")
from celery import Celery app = Celery(include=["mq.tasks"])
app.config_from_object("celeryconfig") # 指定配置文件

celeryconfig.py

import os
import sys sys.path.append(os.path.abspath(os.path.dirname(__file__)) + "/..")
from kombu import Queue, Exchange BROKER_URL = "redis://:redis@127.0.0.1:6379/5" # 消息代理
CELERY_RESULT_BACKEND = "redis://:redis@127.0.0.1:6379/6" # 任务执行结果存放地址
CELERY_TASK_SERIALIZER = "json" # 任务序列化和翻序列化的方案
CELERY_RESULT_SERIALIZER = "json" # 读取任务结果序列化方式
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
CELERY_ACCEPT_CONTENT = ["json", "msgpack"] # 接受的任务类型
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 86400} # 任务重发时间
CELERYD_MAX_TASKS_PER_CHILD = 400 # 每个worker执行了多少任务就会自动重启防止内存泄漏 # 指定任务队列 save_redis save_redis2
CELERY_QUEUES = (
Queue("default", Exchange("default"), routing_key="default"),
Queue("save_redis", Exchange("save_redis"), routing_key="save_redis"),
Queue("save_redis2", Exchange("save_redis2"), routing_key="save_redis2")
) # mq.tasks.taskA mq.tasks.taskB 具体函数名
CELERY_ROUTES = {
'mq.tasks.taskA': {"queue": "save_redis", "routing_key": "save_redis"},
'mq.tasks.taskB': {"queue": "save_redis2", "routing_key": "save_redis2"}
}

tasks.py

from mq import app
import redis
import time rds = redis.Redis(
host="localhost",
port=6379,
db=7,
password="redis") # 客户端 @app.task
def taskA(x, y):
time.sleep(10)
rds.setex(name="taskA", value=x + y, time=3600)
return x + y @app.task
def taskB(x, y, c):
time.sleep(10)
rds.setex(name="taskB{}".format(c), value=x + y, time=3600)
return x + y

test.py

import sys
import os sys.path.append(os.path.abspath(os.path.dirname(__file__)) + "/..")
from mq.tasks import taskA, taskB #re1 = taskA.delay(100, 200)
# print(re1)
# print(re1.status) # 服务端 for i in range(10):
re2 = taskB.delay(100, i, i)
print(re2)
print(re2.status)

celery客户端启动  save_redis2,save_redis 代表函数名    --autoscale=10,3 当work不够用时自动起3-10个进程

celery -A __init__  worker --concurrency=5 -l INFO -Q celery,save_redis2,save_redis --autoscale=10,3

#启动方式

../venv/bin/celery -A __init__ worker --concurrency=5 -l INFO -Q celery,save_redis2,save_redis --autoscale=10,3


# 启动方式打印log
command=/home/op/saiyan_game_center/venv/bin/celery -A __init__ worker --concurrency=8 -l INFO -Q upload_box_task --autoscale=5,3 --logfile=/home/op/saiyan_game_center/logs/log.log

celery web监控

文档:https://flower-docs-cn.readthedocs.io/zh/latest/install.html

安装:pip install flower

启动:celery flower --broker=代理url --port=5555

celery flower --broker=redis://:redis@127.0.0.1:6379/5 --port=5555

最新文章

  1. jQueryt过滤选择器
  2. appsrequest v3 发布了
  3. sqlmap的一些小技巧
  4. ubuntu解决libstdc++.so.6: cannot open shared object file: No such file or directory:问题
  5. dedecms程序给栏目增加缩略图的方法
  6. 四、Linux/UNIX操作命令积累【chmod、chown、tail】
  7. MongoDB安全策略
  8. Windows Developer Day - Adaptive Cards
  9. Hadoop基础知识串烧
  10. python3 报错
  11. js监听手机端点击物理返回键或js监听pc端点击浏览器返回键
  12. MT【13】三角函数求范围
  13. WebSocket.之.基础入门-建立连接
  14. stl集合算法
  15. 树莓派上启动nfs server
  16. python之模块pprint之常见用法
  17. android在不加载图片的前提下获得图片的宽高
  18. centos图形界面的开启和关闭
  19. JUC——TimeUnit工具类(二)
  20. C#版-Redis缓存服务器在Windows下的使用

热门文章

  1. Python卸载不干净?苹果电脑卸载python教程
  2. Linux不管上一条命令成功还是失败都执行下一个命令的方法
  3. 【转载】python2x与3x下urlretrieve的使用
  4. 图论 - PAT乙级 1029 旧键盘 (C++ python3)
  5. oracle彻底删除干净
  6. Flash完美跨域访问的方法
  7. python应用-表格式输出一组数据
  8. Spring Cloud Sleuth 知识点
  9. spark延迟调度与动态资源管理
  10. tomcat catalina