# -*- coding: utf-8 -*-
# @Time : 2019-11-08 14:04
# @Author : cxa
# @File : mongohelper.py
# @Software: PyCharm
import asyncio
from helper.logger_helper import logger as storage
import datetime
from motor.motor_asyncio import AsyncIOMotorClient
from collections.abc import Iterable  # `collections.Iterable` was removed in Python 3.10
from urllib.parse import quote_plus

try:
    # Use uvloop's event loop policy when it is installed; otherwise keep
    # the default asyncio policy.
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    pass

# Connection settings copied onto every MotorOperation instance as attributes.
# Leave `user` empty to connect without authentication.
db_configs = {
    'host': '127.0.0.1',
    'port': '27017',
    'db_name': 'spider',
    'user': '',
    'passwd': '',  # only used when `user` is non-empty
}


class MotorOperation:
    """Async MongoDB helper (built on Motor) for the spider's seed documents.

    Seed documents carry a ``status`` field: 0 = initial,
    1 = download started, 2 = download finished.
    """

    def __init__(self):
        # Expose every db_configs entry (host, port, db_name, user, passwd)
        # as an instance attribute.
        self.__dict__.update(db_configs)
        self.motor_uri = self._build_motor_uri()
        self.client = AsyncIOMotorClient(self.motor_uri)
        self.mb = self.client[self.db_name]

    def _build_motor_uri(self):
        """Return the MongoDB connection URI for this instance's settings.

        When ``user`` is set, the credentials are percent-escaped (MongoDB
        URIs reject raw characters such as ``@``, ``:`` and ``/`` there) and
        ``authSource`` is pointed at ``db_name``. A missing ``passwd``
        attribute is treated as an empty password.
        """
        if self.user:
            user = quote_plus(self.user)
            passwd = quote_plus(getattr(self, "passwd", ""))
            return (f"mongodb://{user}:{passwd}@{self.host}:{self.port}/"
                    f"{self.db_name}?authSource={self.db_name}")
        return f"mongodb://{self.host}:{self.port}/{self.db_name}"

    @staticmethod
    def _new_status_record(item, now):
        """Return a new document holding timestamps, status 0, then `item`'s fields.

        `item` is applied last, so any of its keys (including ``status``)
        override the defaults.
        """
        record = {"update_time": now, "create_time": now, "status": 0}
        record.update(item)
        return record

    async def save_data_with_status(self, items, col="dianping_seed_data"):
        """Bulk-insert `items` into `col` as new documents with status 0.

        Each document gets ``create_time`` and ``update_time`` set to the same
        timestamp. An empty `items` inserts nothing, because ``insert_many``
        rejects an empty document list.
        """
        tasks = [self._new_status_record(item, datetime.datetime.now()) for item in items]
        print("tasks", tasks)
        if not tasks:
            return
        await self.mb[col].insert_many(tasks)

    async def add_index(self, col="dianping_seed_data"):
        """Create an index on the ``url`` field of `col`."""
        await self.mb[col].create_index('url')

    @staticmethod
    def _iter_documents(items):
        """Normalise `items` to an iterable of documents.

        A single dict is wrapped in a list. It must be checked before
        `Iterable`, because a dict is itself iterable (over its keys). Other
        iterables are returned as-is; anything else yields no documents.
        """
        if isinstance(items, dict):
            return [items]
        if isinstance(items, Iterable):
            return items
        return []

    async def _upsert_one(self, item, col, key):
        """Upsert `item` into `col`, matching on ``item[key]``; log any failure."""
        try:
            # Index (rather than .get) so that a document lacking `key` is
            # reported as an error instead of being upserted under a
            # `{key: None}` filter.
            match_value = item[key]
            await self.mb[col].update_one({key: match_value}, {'$set': item}, upsert=True)
        except Exception as e:
            storage.error(f"数据插入出错:{e.args}此时的item是:{item}")

    async def save_data(self, items, col="dianping_seed_data", key="url"):
        """Upsert one document (a dict) or many (an iterable of dicts) into `col`.

        Documents are matched on their `key` field. Per-document failures are
        logged and skipped, not raised.
        """
        for item in self._iter_documents(items):
            await self._upsert_one(item, col, key)

    async def change_status(self, condition, col="dianping_seed_data", status_code=1):
        """Set ``status`` and refresh ``update_time`` on one document matching `condition`.

        status_code: 0 = initial, 1 = download started, 2 = download finished.
        Failures are logged, not raised.
        """
        item = {"status": status_code, "update_time": datetime.datetime.now()}
        try:
            await self.mb[col].update_one(condition, {'$set': item})
        except Exception as e:
            storage.error(f"修改状态出错:{e.args}此时的数据是:{item}")

    async def get_detail_data(self, col="dianping_seed_data"):
        """Print every status-0 document in `col` and return the cursor.

        NOTE(review): the cursor is fully consumed by the printing loop, so
        iterating the returned cursor yields nothing. Use `find_data` to
        actually consume the documents.
        """
        data = self.mb[col].find({'status': 0})
        async for item in data:
            print(item)
        return data

    async def reset_status(self, col="dianping_seed_data"):
        """Put every status-1 (download started) document back to status 0."""
        await self.mb[col].update_many({'status': 1}, {'$set': {"status": 0}})

    async def reset_all_status(self, col="dianping_seed_data"):
        """Set ``status`` to 0 on every document in `col`."""
        await self.mb[col].update_many({}, {'$set': {"status": 0}})

    async def find_data(self, col="dianping_seed_data"):
        """Return the status-0 documents (without ``_id``) as crawl targets.

        :return: an async generator over the matching documents
        """
        cursor = self.mb[col].find({'status': 0}, {"_id": 0})
        async_gen = (item async for item in cursor)
        return async_gen

    async def delete_old_data(self, col="dianping_seed_data"):
        """Delete documents whose ``update_time`` is at least one day old.

        Prints the deleted count, or the error on failure (not raised).
        """
        try:
            yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
            result = await self.mb[col].delete_many({'update_time': {'$lte': yesterday}})
            print(f"成功删除{result.deleted_count}条")
        except Exception as e:
            print("删除错误", e.args)


if __name__ == '__main__':
    async def _main():
        # Build the client inside the coroutine so it is created while
        # asyncio.run()'s loop is running (older Motor versions capture the
        # current loop at construction time).
        m = MotorOperation()
        await m.delete_old_data(col="dianping_seed_data")

    # asyncio.run replaces get_event_loop(), which is deprecated when no
    # loop is running.
    asyncio.run(_main())

最新文章

  1. 【工匠大道】一些Vim(Linux)不常见但很逼格的命令(不断更新中)
  2. MySQL 主从配置
  3. 第七天 面向对象进阶与socket编程
  4. Jmeter—5 关联 响应数据传递-正则表达式提取器
  5. Guaranteeing message processing —— 可靠的消息处理
  6. RobotFramework 安装配置(一)
  7. mysql datetime设置now()无效,直接用程序设置默认值比较好
  8. Facebook内部分享:25个高效工作的小技巧
  9. hairline!ios实现边框0.5px
  10. $_GLOBALS超全局数组和global定义的全局变量区别?
  11. nginx随着passenger构造ruby on rails页
  12. Nginx+Python+uwsgi+Django的web开发环境安装及配置
  13. requireJS对文件合并与压缩(二)
  14. BZOJ1093 ZJOI2007最大半连通子图(缩点+dp)
  15. 解决winscp中普通用户无法上传、删除、移动文件
  16. HTML5超酷秒表动画 可暂停和重置秒表
  17. [UFLDL] Linear Regression & Classification
  18. 缓存中使用的ReentrantReadWriteLock锁
  19. Redis哈希
  20. NET Core应用框架之BitAdminCore框架应用篇系列

热门文章

  1. SQL语法基础之ALTER语句
  2. 【JQ】jq动态绑定事件.on()、解绑事件off()
  3. 【JS】js将 /Date(1518943415760+0800)/ 转换为2018-2-18 16:43:35格式
  4. 如何优雅地使用Sublime Text3
  5. zookeeper安装(单机版)
  6. Kafka权威指南 读书笔记之(一)初识Kafka
  7. Jsp的四大作用域与九大对象
  8. npm离线安装插件
  9. java代码实现ftp服务器的文件上传和下载
  10. 【五】服务熔断、降级 —— Hystrix(豪猪)