time^B1493534543940^Aid^B02CD^Aasr^B叫爸爸^Anlp^B{"domain":"com.abc.system.chat","intent":"chat","slots":{"tts":"爸爸","asr":"叫爸爸"},"voice":"叫爸爸","confidence":1.0,"cloud":false,"posStart":0,"posEnd":0}^Adomain^Bcom.abc.chat^Aintent^Bchat python版spark代码如下 from operator import add
import time def getInfo(str, sep1, sep2):
thedate = 'today'
sn = 'default'
if str is not None:
fields = str.split(sep1)
if len(fields) > 1:
for field in fields:
if field is not None:
kv = field.split(sep2)
if len(kv) == 2:
if kv[0] == 'time':
timestamp = int(kv[1]) / 1000
time_local = time.localtime(timestamp)
thedate = time.strftime("%Y-%m-%d", time_local)
if kv[0] == 'id':
sn = kv[1]
if thedate is not None and sn is not None:
res = thedate + "|" + sn
return res rdd1 = sc.textFile("/Users/zhangzhenghai/example.log")
rdd2 = rdd1.map(lambda x: (getInfo(x,'\u0001','\u0002'),1))
rdd3 = rdd2.reduceByKey(add)
rdd4 = rdd3.map(lambda x: (x[1],x[0]))
rdd5 = rdd4.sortByKey(False)
rdd6 = rdd5.map(lambda x:(x[1],x[0]))



