场景

现在餐厅老板已经不满足仅仅统计历史用户消费金额总数了,他想知道每个用户半年,每个月,每天,或者一小时消费的总额,来店消费的次数以及平均金额。

给出的例子计算的是每5秒,每30秒,每1分钟的用户消费金额,消费次数,平均消费。

数据格式

{"user":"zhangsan","payment":8}
{"user":"wangwu","payment":7}
....

制作kafka输入数据

与我上篇文章相同

参考这里

处理流程

package StreamingTest

/**
* Created by liangshiwei on 15/9/9.
*/
import net.liftweb.json._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext} object WindowsFunction { //利用用户消费金额总和计算结果以及用户消费次数统计计算结果计算平均消费金额
def avgFunction(sum:DStream[(String,Double)],count:DStream[(String,Int)]): DStream[(String,Double)] = {
val payment = sum.join(count).map(r => {
val user = r._1
val sum = r._2._1
val count = r._2._2
(user,sum/count)
})
payment
} def main (args: Array[String]) { def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf().setAppName("test").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5)) val zkQuorum = "192.168.6.55:2181,192.168.6.56:2181,192.168.6.57:2181"
val consumerGroupName = "user_payment"
val kafkaTopic = "user_payment"
val kafkaThreadNum = 1 val topicMap = kafkaTopic.split(",").map((_, kafkaThreadNum.toInt)).toMap val user_payment = KafkaUtils.createStream(ssc, zkQuorum, consumerGroupName, topicMap).map(x=>{
parse(x._2)
}) //计算每5s每个用户的消费总和
val paymentSum = user_payment.map(jsonLine =>{
implicit val formats = DefaultFormats
val user = (jsonLine \ "user").extract[String]
val payment = (jsonLine \ "payment").extract[String]
(user,payment.toDouble)
}).reduceByKey(_+_) //输出结果
paymentSum.print() //计算每5s每个用户的消费次数
val paymentCount = user_payment.map(jsonLine =>{
implicit val formats = DefaultFormats
val user = (jsonLine \ "user").extract[String]
(user,1)
}).reduceByKey(_+_) // paymentCount.print() //计算每5s每个用户平均的消费金额
val paymentAvg = avgFunction(paymentSum,paymentCount)
// paymentAvg.print() //窗口操作,在其中计算不同时间段的结果,入库的话根据使用场景选择吧
def windowsFunction() {
//每5秒计算最后30秒每个用户消费金额
val windowSum_30 = paymentSum.reduceByKeyAndWindow((a: Double, b: Double) => (a + b),_-_, Seconds(30), Seconds(5))
// windowSum_30.print() //每5秒计算最后30秒每个用户消费次数
val windowCount_30 = paymentCount.reduceByKeyAndWindow((a: Int, b: Int) => (a + b),_-_, Seconds(30), Seconds(5))
// windowCount_30.print() //每5秒计算最后30秒每个用户平均消费
val windowAvg_30 = avgFunction(windowSum_30,windowCount_30)
// windowAvg_30.print() //每5秒计算最后60秒每个用户消费金额
val windowSum_60 = windowSum_30.reduceByKeyAndWindow((a:Double,b:Double)=>(a+b),_-_,Seconds(10),Seconds(5))
// windowSum_60.print() //每5秒计算最后60秒每个用户消费次数
val windowCount_60 = windowCount_30.reduceByKeyAndWindow((a:Int,b:Int) => (a+b),_-_,Seconds(10),Seconds(5))
// windowCount_60.print() //每5秒计算最后60秒每个用户平均消费
val windowAvg_60 = avgFunction(windowSum_60,windowCount_60)
// windowAvg_60.print
} windowsFunction() ssc
} val context = StreamingContext.getOrCreate("checkPoint", functionToCreateContext _) context.start()
context.awaitTermination()
}
}

结果节选

实际效果你们自己去体验吧

//-----------消费总额-------
(zhangsan,146.0)
(lisi,79.0)
(wangwu,99.0)
(zhaoliu,115.0) //-----------消费次数-------
(zhangsan,32)
(lisi,18)
(wangwu,30)
(zhaoliu,24) //-----------平均消费-------
(zhangsan,4.5625)
(lisi,4.388888888888889)
(wangwu,3.3)
(zhaoliu,4.791666666666667)

最新文章

  1. 使用Jackson解析Json示例
  2. 02-Swift初体验
  3. UISegment属性
  4. Bag Problem
  5. iOS 非ARC基本内存管理系列 2-多对象内存管理(2)
  6. B. Fixed Points
  7. 一般处理程序中使用Session出现未将对象引用设置到对象的实例
  8. poj1850
  9. Linux方面收藏的一点儿资料
  10. Hoffmann树
  11. ubuntu 修改root密码
  12. incallui中如何查询联系人数据
  13. 禁止root远程登录
  14. SSH实现无密码验证登录
  15. c++ 指针总结 函数参数指针调用和堆栈内存的分配原理
  16. 【ASP.NET】 Config Error: This configuration section cannot be used at this path.
  17. SpringBoot的学习【3.HelloWorld配置细节】
  18. ubuntu 菜单栏和终端都消失了,鼠标也成了一个× 解决办法!!!
  19. 异步方法(promise版)出错自调用
  20. 向集合中添加Person类型并对其排序

热门文章

  1. C# DateTime类型和时间戳 互相转换
  2. php日期时间函数 整理
  3. Nodejs路由之间的数据传递
  4. 小米1plus MIUI RadioButton的问题
  5. Asp.net MVC23 使用Areas功能的常见错误
  6. 【C++】C++求vector中的最大最小值
  7. JMeter学习-015-JMeter 断言之-Bean Shell Assertion
  8. Finally的执行时机
  9. Exception not a valid month
  10. ps、grep和kill联合使用杀掉进程(转)