一.基于python实现的三方组件----Celery

1.作用

用于异步周期任务的处理

2.Celery的组成

(1)任务 app
(2)记录任务的缓存(通常用redis或rabbitMQ)
任务记录 -broker
任务返回记录-backend
(3)Worker 员工
主动执行任务
主动反馈结果

3.celery简单实例

s1.py

from celery import Celery
import time #使用redis连接url格式 :redis://:password@hostname:port/db_number
my_task=Celery("my_task",broker="redis://127.0.0.1:6379",backend="redis://127.0.0.1:6379") #括号中task代表你对任务在内部的称呼 @my_task.task
def my_func1(a,b):
time.sleep(10)
return a+b @my_task.task
def my_func2():
time.sleep(10)
return 2 @my_task.task
def my_func3():
time.sleep(10)
return 3

命令行运行

Linux:Linux - celery worker -A s1 -l INFO
Windows:celery worker -A s1 -l INFO -P eventlet
#Windows下需要下载eventlet模块模块,否则celery4的版本不支持windows
#l:日志输出
#c:数量

s2.py

from s1 import my_func1,my_func2,my_func3
pid=my_func1.delay(10,20)
print(pid)
pid=my_func2.delay()
print(pid)
pid=my_func3.delay()
print(pid)

s3.py

from celery.result import AsyncResult
from s1 import my_task
#运行s2.py得到的pid
res=AsyncResult(id='2b36d20f-da07-42fe-b203-1e56fbaafd5e',app=my_task)
if res.successful():
print(res.get())
else:
print("任务正在进行中")

4.爬虫简单应用

在caiji.py中

from flask import Flask,request as requ,jsonify,render_template,send_file
import pymongo
import json
import time
import urllib
import requests
import re
from urllib import request
import uuid
from celery import Celery
import time #使用redis连接url格式 :redis://:password@hostname:port/db_number
my_task=Celery("my_task",broker="redis://127.0.0.1:6379",backend="redis://127.0.0.1:6379") #括号中task代表你对任务在内部的称呼 #获取各种分类的歌曲列表
@my_task.task
def getcontent():
# content=requ.form.get("content")
# print(content)
headers={"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.92 Safari/537.36"}
url="https://www.ximalaya.com/ertong/ertongwenxue"
request=urllib.request.Request(url,headers=headers)
response=urllib.request.urlopen(request) response_text=response.read().decode("utf-8")
title_id_list=re.findall('"album-title line-2 lg.+?" title="(.+?)" href="/ertong/(\d+?)/">',response_text) anthor_list=re.findall('"album-author.+?" title="(.+?)" href',response_text)
response_list=[]
i = 0
for i in range(len(title_id_list)):
response_dict={}
response_dict={
"title":title_id_list[i][0],
"id":title_id_list[i][1],
"author":anthor_list[i]
}
response_list.append(response_dict) # print("返回",response_list)
return response_list #获取music的二进制文件
@my_task.task
def getmusic(id):
print(id)
url="http://m.ximalaya.com/ertong/"+id+"/"
response=requests.get(url)
response.encoding="utf-8"
path=re.findall('"isCopyright":.+?"src":"(.+?)","albumId"',response.text)[0]
print("res",path)
d_data = requests.get(path)
get_str=str(uuid.uuid4())
print(get_str)
name="./music/"+get_str + ".mp3"
with open(name,"wb") as f:
f.write(d_data.content)
return send_file(name)
# getcontent()

在results.py中

from caiji import getcontent,getmusic
res1=getcontent.delay()
print(res1)
for i in res1.get():
res2 = getmusic.delay(i["id"])
print(res2)

5.定时任务(十秒钟后执行函数)

在s4.py中

from celery import Celery
import time
my_task=Celery("task",broker="redis://127.0.0.1:6379",backend="redis://127.0.0.1:6379") #括号中代表你对任务在内部的称呼

@my_task.task
def my_func1(a,b):
return 1

在s5.py中

import datetime
import time
from s4 import my_func1

tp = time.time()
utc_time = datetime.datetime.utcfromtimestamp(tp)
add_time = datetime.timedelta(seconds=10)
utc_time = utc_time + add_time
res = my_func1.apply_async(args=(2,3),eta=utc_time)
print(res)

6.周期任务

task_one.py

from celery import Celery
import time

my_task=Celery("my_task",broker="redis://127.0.0.1:6379",backend="redis://127.0.0.1:6379")
@my_task.task
def my_func1():
time.sleep(10)
return "十秒钟执行的"

task_two.py

import time
from task_one import my_task
@my_task.task
def my_func2():
time.sleep(5)
return "五秒钟执行的"

s6.py

from celery import Celery
from celery.schedules import crontab celery_task = Celery("task",
broker="redis://127.0.0.1:6379",
backend="redis://127.0.0.1:6379",
include=["task_one","task_two"]) #我要要对beat任务生产做一个配置,这个配置的意思就是每10秒执行一次Celery_task.task_one任务参数是(10,10)
celery_task.conf.beat_schedule={
"each10s_task":{
"task":"task_one.my_func1",
"schedule":10, # 每10秒钟执行一次
# "args":(10,20)
},
"each5s_task": {
"task": "task_two.my_func2",
"schedule":5, # 每5秒
# "args": (50, 60)
}, } # celery beat -A Celery_task
# celery worker -A Celery_task -l INFO -P eventlet

celery beat -A Celery_task

7.celery项目目录

在selery.py中

from celery import Celery
my_task = Celery("task",
broker="redis://127.0.0.1:6379",
backend="redis://127.0.0.1:6379",
include=["Celery_task.task_one","Celery_task.task_two"])

在task_one.py中

from Celery_task.celery import my_task

@my_task.task
def func1():
return 1 @my_task.task
def func3():
return 3

在task_two.py中

from Celery_task.celery import my_task

@my_task.task
def func2():
return 2

celery worker -A Celery_task -l INFO -P eventlet

最新文章

  1. dotnet Core Asp.net 项目搭建
  2. Sql server2012 常见异常处理
  3. js声明
  4. jdk和tomcat环境部署
  5. js中arguments,caller,callee,apply的用法小结
  6. JAVA中ArrayList用法
  7. hihoCoder 1040 矩形判断(计算几何)
  8. 第1章 游戏之乐——NIM(3)两堆石头的游戏
  9. 3.4.2内核下的I2C驱动
  10. 【转】Android开发之Bitmap的内存优化详解
  11. LCM Cardinality
  12. RandomAccessFile类进行文件加密
  13. redis字符串结构
  14. 获取网站证书的两种方法(wireshark or firefox nightly)
  15. .Net Core 部署在win10 的IIS上注意问题。
  16. Linux0.11启动过程
  17. SQLServer如果查询表相关的视图以及存储过程
  18. linux 查看并对外开放端口(防火墙拦截处理)
  19. cdnbest里站点域名不同步到节点,报400错误的一般原因
  20. jfixed使固定行列可编辑表格

热门文章

  1. Servlet接口的实现类,路径配置映射,ServletConfig对象,ServletContext对象及web工程中文件的读取
  2. DOS批处理
  3. 《Java并发编程实战》第十二章 测试并发程序 读书笔记
  4. windows 下 TensorFlow(GPU 版)的安装
  5. shell问题集合
  6. P和P1指向了O和O1两个变量(对象)的地址, 而不是O和O1的内容(对象的实际地址)——充分证明@是取变量(对象)的地址,而不是变量里面的内容,够清楚!
  7. MyEclipse和Eclipse区别与联系
  8. c#开发移动APP-Xamarin入门
  9. python3获取天气预报
  10. 常用user agent