import datetime
import sys
import oss2
from itertools import islice
import pandas as pd
import re
import json
from pandas.tseries.offsets import Day
from multiprocessing import Process, JoinableQueue, cpu_count, Manager
import time


def mkbuck(bk):
    """Return an authenticated ``oss2.Bucket`` handle for bucket *bk*.

    Uses the module-level names ``username``, ``password`` and ``address``,
    which are not defined in the visible source — presumably the OSS access
    key pair and endpoint; TODO confirm where they are configured.
    """
    credentials = oss2.Auth(username, password)
    return oss2.Bucket(credentials, address, bk)


# Get the paths of the last hour of the day before yesterday
def last_hour_prefix(keys, pattern):
    """Return ``pattern + '/HH'`` for the latest hour HH found in *keys*.

    Keys are expected to look like ``<pattern>/<HH>/...``: the path segment
    right after *pattern* is parsed as an integer hour. Directory
    placeholders (keys ending in '/') and keys with no integer segment at
    that position are ignored instead of aborting the whole listing.

    Raises:
        ValueError: if no key yields an hour.
    """
    # Index of the hour segment; 4 for the usual 'bucket/YYYY/MM/DD' prefix,
    # but derived from the prefix so deeper prefixes work too.
    depth = len(pattern.rstrip('/').split('/'))
    hours = []
    for key in keys:
        if key.endswith('/'):
            continue
        try:
            hours.append(int(key.split('/')[depth]))
        except (IndexError, ValueError):
            continue
    if not hours:
        raise ValueError('no hourly objects found under prefix %r' % (pattern,))
    return '%s/%02d' % (pattern, max(hours))


def getbflastpt(bucket, bfyespattern):
    """Return the prefix of the last hour stored under *bfyespattern*.

    Lists every object under the day-before-yesterday prefix in *bucket*
    and returns ``bfyespattern + '/HH'`` for the largest hour HH present
    (zero-padded to two digits).

    Raises:
        ValueError: if no hourly object exists under the prefix.
    """
    keys = (obj.key for obj in oss2.ObjectIterator(bucket, prefix=bfyespattern))
    return last_hour_prefix(keys, bfyespattern)


# Get the paths of today's first hour
def first_hour_prefix(keys, pattern):
    """Return ``pattern + '/HH'`` for the earliest hour HH found in *keys*.

    Keys are expected to look like ``<pattern>/<HH>/...``: the path segment
    right after *pattern* is parsed as an integer hour. Directory
    placeholders (keys ending in '/') and keys with no integer segment at
    that position are ignored instead of aborting the whole listing.

    Raises:
        ValueError: if no key yields an hour.
    """
    # Index of the hour segment; 4 for the usual 'bucket/YYYY/MM/DD' prefix,
    # but derived from the prefix so deeper prefixes work too.
    depth = len(pattern.rstrip('/').split('/'))
    hours = []
    for key in keys:
        if key.endswith('/'):
            continue
        try:
            hours.append(int(key.split('/')[depth]))
        except (IndexError, ValueError):
            continue
    if not hours:
        raise ValueError('no hourly objects found under prefix %r' % (pattern,))
    return '%s/%02d' % (pattern, min(hours))


def getnowfirstpt(bucket, nowpattern):
    """Return the prefix of the first hour stored under *nowpattern*.

    Lists every object under today's prefix in *bucket* and returns
    ``nowpattern + '/HH'`` for the smallest hour HH present (zero-padded to
    two digits).

    Raises:
        ValueError: if no hourly object exists under the prefix.
    """
    keys = (obj.key for obj in oss2.ObjectIterator(bucket, prefix=nowpattern))
    return first_hour_prefix(keys, nowpattern)


# Collect all of yesterday's paths and merge them into the full path list and its count
def getfullnum(bk, bfyespattern, nowpattern, yespattern):
    """List every object key in the cross-midnight read window.

    The window is the last hour found under *bfyespattern* (via
    getbflastpt), everything under *yespattern*, and the first hour found
    under *nowpattern* (via getnowfirstpt), listed in that order. Directory
    placeholder keys are included as-is.

    Returns:
        tuple: ``(keys, count)`` — the list of object keys and its length.
    """
    bucket = mkbuck(bk)
    prefixes = (
        getbflastpt(bucket, bfyespattern),
        yespattern,
        getnowfirstpt(bucket, nowpattern),
    )
    keys = [
        obj.key
        for prefix in prefixes
        for obj in oss2.ObjectIterator(bucket, prefix=prefix)
    ]
    return keys, len(keys)


# Inter-process communication below: a producer/consumer model
def getfull(bk, bfyespattern, nowpattern, yespattern, q):
    """Producer: enqueue every object key in the read window, then wait.

    Args:
        bk: bucket name passed on to getfullnum().
        bfyespattern, nowpattern, yespattern: day prefixes for the day before
            yesterday, today and yesterday (see getfullnum).
        q: JoinableQueue drained by consumer() workers.

    Returns only after every queued key has been acknowledged with
    ``q.task_done()``, so joining this process means all work is finished.
    """
    keys, _count = getfullnum(bk, bfyespattern, nowpattern, yespattern)
    for key in keys:
        q.put(key)
    q.join()


def extract_records(text, pattern, starttime, endtime):
    """Return the JSON records in *text* whose ``created_at`` is in range.

    Args:
        text: decoded object contents.
        pattern: compiled regex whose matches are candidate JSON objects.
        starttime, endtime: inclusive bounds compared directly with
            ``created_at``. The __main__ block passes '%Y-%m-%d %H:%M:%S'
            strings; this assumes ``created_at`` uses the same format so that
            string order equals time order — TODO confirm against the data.

    Matches that are not valid JSON, lack ``created_at``, or whose
    ``created_at`` cannot be compared with the bounds are skipped rather than
    aborting the whole object.
    """
    records = []
    for candidate in pattern.findall(text):
        try:
            record = json.loads(candidate)
            in_window = starttime <= record['created_at'] <= endtime
        except (ValueError, KeyError, TypeError):
            continue
        if in_window:
            records.append(record)
    return records


def consumer(bk, q, d, window=None):
    """Worker: download each queued object and keep its in-window records.

    Args:
        bk: bucket name passed to mkbuck().
        q: JoinableQueue of object keys fed by getfull().
        d: Manager dict shared with the main process; ``d[key]`` receives one
            DataFrame per downloaded object.
        window: optional ``(starttime, endtime)`` pair. When omitted, the
            module-level ``starttime``/``endtime`` set in the __main__ block
            are used; those exist in the child only when processes are forked,
            so pass *window* explicitly under the 'spawn' start method.

    Loops forever; the __main__ block starts it as a daemon process.
    """
    lo, hi = window if window is not None else (starttime, endtime)
    bucket = mkbuck(bk)
    pattern = re.compile('{.*"adadji",.*}')
    while True:
        key = q.get()
        try:
            if key.endswith('/'):
                continue  # directory placeholder: nothing to download
            text = bucket.get_object(key).read().decode('utf-8')
            records = extract_records(text, pattern, lo, hi)
            # 'created_at' is kept because the main process sorts on it.
            d[key] = pd.DataFrame(records, columns=['dd', 'cc', 'created_at'])
        except Exception as exc:
            # Keep the worker alive: a dead worker leaves queued keys
            # unprocessed, and the producer's q.join() would hang forever.
            print('consumer: failed to process %s: %r' % (key, exc), file=sys.stderr)
        finally:
            # Signal q.join() that this item is done, even on skip or failure.
            q.task_done()


if __name__ == '__main__':
    s1 = time.time()
    now_time = datetime.datetime.now()  # current local time
    bfyes_time = (now_time - 2 * Day()).strftime('%Y/%m/%d')
    yes_time = (now_time - 1 * Day()).strftime('%Y/%m/%d')
    # Only records created during yesterday are kept.
    endtime = (now_time - 1 * Day()).strftime('%Y-%m-%d 23:59:59')
    starttime = (now_time - 1 * Day()).strftime('%Y-%m-%d 00:00:00')
    nowdate = now_time.strftime('%Y/%m/%d')
    bk = 'xxx'
    bfyespattern = '%s/%s' % (bk, bfyes_time)
    yespattern = '%s/%s' % (bk, yes_time)
    nowpattern = '%s/%s' % (bk, nowdate)
    q = JoinableQueue(cpu_count())
    # NOTE(review): the processes use bucket 'xx' while the key prefixes use
    # bk = 'xxx'; both look like redacted placeholders — confirm.
    with Manager() as m:
        d = m.dict()  # Manager-backed dict collecting every worker's results
        p1 = Process(target=getfull, args=('xx', bfyespattern, nowpattern, yespattern, q))
        # Start at least one consumer: with cpu_count() == 1 the original
        # cpu_count() - 1 started none and the producer blocked forever.
        consumers = []
        for _ in range(max(1, cpu_count() - 1)):
            worker = Process(target=consumer, args=('xx', q, d, (starttime, endtime)))
            worker.daemon = True
            consumers.append(worker)
        for p in [p1] + consumers:
            p.start()
        p1.join()
        # Copy results out before the manager process shuts down.
        frames = list(d.values())
    if frames:
        df1 = pd.concat(frames, ignore_index=True)
    else:
        df1 = pd.DataFrame(columns=['dd', 'cc', 'created_at'])
    df1.sort_values('created_at', inplace=True)
    print(time.time() - s1)
    print('=' * 20)
    print(df1)

  说明:需求只是获取昨日的数据。由于 OSS 实时数据的存储时间可能存在提前或延迟的情况,因此程序读取前天的最后一小时、昨日全部以及当天最开始一小时的数据,再按 created_at 字段筛选出昨日 00:00:00 至 23:59:59 之间的记录。读者可根据自身情况进行修改。

最新文章

  1. [tem]线段树(白书版)
  2. Windows8.1画热度图 - 坑
  3. 将Microsoft Ajax Minifier集成到VS2013对JS、CSS进行编译时压缩
  4. Android 用户界面---拖放(Drag and Drop)(二)
  5. jquery1.9学习笔记 之层级选择器(二)
  6. (简单) POJ 3368 Frequent values,RMQ。
  7. Bear and Three Balls
  8. 并发库应用之十二 & 常用集合问题汇总
  9. 微信内分享第三方H5链接无法使用内置浏览器打开的解决方案
  10. eureka相关异常
  11. div中的相对定位与绝对定位
  12. .Net文件压缩
  13. 使用Jackson解析首字母大写的json字符串
  14. ios 学习路线总结
  15. 今天就整一个bug了
  16. 整合elk(2)(十三)
  17. Alpha 任务状态总览(持续更新)
  18. centos 7 jenkins 部署
  19. 2018 Arab Collegiate Programming Contest (ACPC 2018) H - Hawawshi Decryption 数学 + BSGS
  20. 面试必会函数源代码 strcpy/memcpy/atoi/kmp/quicksort

热门文章

  1. SDUT 2142 数据结构实验之图论二:基于邻接表的广度优先搜索遍历
  2. HUST高级软件工程--测试管理工具实践--Day2
  3. Linux上jmeter-server启动失败
  4. CentOS6.5上Zabbix3.0的RPM安装【二】-汉化
  5. 关于执行memcached报错问题
  6. 小程序开发笔记【二】,抽奖结果json数据拼装bug解决
  7. [SinGuLaRiTy] NOIP模拟赛(TSY)-Day 1
  8. [Django笔记] 从已有的数据库构建应用
  9. 浅识J2EE十三个规范
  10. THINKPHP 框架的模板技术