【大数据课程】高途课程实践-Day03:Scala实现商品实时销售统计
2024-10-20 03:29:22
〇、概述
1、实现内容
使用Scala编写代码,通过Flink的Source、Sink以及时间语义实现实时销量展示
2、过程
(1)导包并下载依赖
(2)创建数据源数据表并写⼊数据
(3)在Mysql数据库中创建统计结果表
(4)编写Flink计算代码
a.参考ShopMysqlSource.scala 代码,进⾏Flink Source 编写,从Mysql读取数据
b.参考GaotuShopFlinkStat.scala代码,进⾏统计逻辑的编写,进⾏FlinkSQL 查询
c.参考ShopStatMysqlSink.scala 代码,进⾏FlinkSink 编写,存⼊数据到Mysql
一、导包并下载依赖
二、创建数据源数据表并写⼊数据
参考执⾏GenerateOrders.scala 代码
package com.gaotu.flink
import org.apache.commons.lang.time.FastDateFormat import java.sql.DriverManager
import java.sql.Connection
import java.util.concurrent.TimeUnit
import scala.util.Random /**
* 向Mysql数据库中添加数据
* CREATE TABLE `g_orders` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '交易id',
`stock_name` varchar(20)DEFAULT NULL COMMENT '商品名称',
`user_id` int(11) DEFAULT NULL COMMENT '用户id',
`deal_status` varchar(20) DEFAULT NULL COMMENT '交易状态',
`stock_cnt` int(11) DEFAULT NULL COMMENT '商品数量',
`deal_amount` double DEFAULT NULL COMMENT '订单金额',
`oper_time` bigint COMMENT '处理时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=120 DEFAULT CHARSET=utf8;
*
*/
object GenerateOrders { def main(args: Array[String]): Unit = {
val driver = "com.mysql.cj.jdbc.Driver"
val url = "jdbc:mysql://localhost/plato"
val username = "root"
val password = "root"
var connection:Connection = null //商品数组
val stocks = Array("HUAWEI Mate40","Apple iphone13","Apple MacBook Pro 14"
,"ThinkBook 14p","RedmiBook Pro14","飞鹤星飞帆幼儿奶粉","爱他美 幼儿奶粉"
,"李宁运动男卫裤","小米踏步机椭圆机","欧莱雅面膜","御泥坊面膜","欧莱雅男士套装","金六福白酒"
,"牛栏山42度","茅台飞天")
val amount = Array(6569.00,6099.00,14999.00,6799.00,4899.00,275,392,199,1299.00,599,399,389,469,175,1399.00) try {
Class.forName(driver)
connection = DriverManager.getConnection(url, username, password)
val ps = connection.createStatement()
for (i <- 1 to 10000){
val formater = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
val item = scala.util.Random.nextInt(14)
val deal_time = formater.format(System.currentTimeMillis())
val stock_name = stocks(item)
val user_id = Random.nextInt(100)
val deal_status = "有效"
val stock_cnt = Random.nextInt(20)
val deal_amount = amount(item)
val oper_time = System.currentTimeMillis()
val query = s"insert into g_orders(deal_time,stock_name,user_id,deal_status,stock_cnt,deal_amount,oper_time) values('$deal_time', '$stock_name', $user_id, '$deal_status', $stock_cnt, $deal_amount, $oper_time)"
ps.addBatch(query)
println(query) TimeUnit.SECONDS.sleep(1)
}
ps.executeBatch()
} catch {
case e => e.printStackTrace
}
connection.close()
}
}
三、在Mysql数据库中创建统计结果表
-- 统计结果表
CREATE TABLE `g_orders_stat` (
`stock_name` varchar(20)DEFAULT NULL COMMENT '商品名称',
`order_cnt` int(11) DEFAULT NULL COMMENT '商品数量',
`stock_sales_cnt` int(11) DEFAULT NULL COMMENT '商品数量',
`stock_sales_amt` double DEFAULT NULL COMMENT '商品数量'
) ENGINE=InnoDB AUTO_INCREMENT=120 DEFAULT CHARSET=utf8;
四、编写Flink计算代码
1、参考ShopMysqlSource.scala 代码,进⾏Flink Source 编写,从Mysql读取数据
package com.gaotu.flink import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.concurrent.TimeUnit /*
* Mysql Source
*/
class ShopMysqlSource extends RichSourceFunction[(Int, String, Int, Double, Long)] {
val driver = "com.mysql.cj.jdbc.Driver"
val url = "jdbc:mysql://localhost/plato"
val username = "root"
val password = "root" override def run(ctx: SourceContext[(Int, String, Int, Double, Long)]): Unit = {
// 1. 加载MYSQL驱动
Class.forName(driver)
// 2. 建立MYSQL链接
var connection: Connection = DriverManager.getConnection(url, username, password)
var max_id = 0
// 3. 创建PreparedStatement
val sql = s"select id,stock_name,stock_cnt,deal_amount,oper_time from plato.g_orders where id > ${max_id} ;"
// 4. 执行Sql查询
val ps: PreparedStatement = connection.prepareStatement(sql)
for(i <- 0 until 500) {
val queryRequest = ps.executeQuery()
// 5. 遍历结果
while (queryRequest.next()) {
val id = queryRequest.getInt("id")
val stock_name = queryRequest.getString("stock_name")
val stock_cnt = queryRequest.getInt("stock_cnt")
val deal_amount = queryRequest.getDouble("deal_amount")
val deal_time = queryRequest.getLong("oper_time")
ctx.collect((id, stock_name, stock_cnt, deal_amount, deal_time)) if(max_id < id){
max_id = id
}
}
TimeUnit.SECONDS.sleep(1)
}
}
override def cancel(): Unit = {}
}
2、参考GaotuShopFlinkStat.scala代码,进⾏统计逻辑的编写,进⾏FlinkSQL 查询
package com.gaotu.flink import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._ /**
* 实时统计逻辑
*/
object GaotuShopFlinkStat { def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment // 1.创建流处理环境
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 2.设置处理时间为事件
val tableEnv = TableEnvironment.getTableEnvironment(env) // 3.获取Table运行环境
val orderDataStream = env.addSource(new ShopMysqlSource) // 4.获取自定义数据源
// 5.添加水印,允许延迟2秒
val watermarkDataStream:DataStream[(Int, String, Int, Double, Long)] = orderDataStream.assignTimestampsAndWatermarks(
new AssignerWithPeriodicWatermarks[(Int, String, Int, Double, Long)]{
var currentTimestamp: Long = _
val delayTime = 2000 // 允许延迟2秒
override def getCurrentWatermark: Watermark = { // 生成一个水印时间
val watermark = new Watermark(currentTimestamp - delayTime) // 让时间窗口延迟2秒计算
watermark
}
// 从订单中获取对应的时间戳
override def extractTimestamp(element: (Int, String, Int, Double, Long), previousElementTimestamp: Long): Long = {
val oper_time = element._5 // 获取订单的事件时间(下单时的时间)
currentTimestamp = Math.max(currentTimestamp, oper_time) // 当前时间与事件时间对比,获取最大值
currentTimestamp
}
} ) // 6.注册LinkSQL数据表
tableEnv.registerDataStream("g_order", watermarkDataStream, 'id,'stock_name,'stock_cnt,'deal_amount,'oper_time,'createTime.rowtime)
// 7.编写SQL语句进行统计
val sql =
"""
| select
| stock_name
| ,count(id) as order_cnt
| ,sum(stock_cnt) as stock_sales_cnt
| ,sum(deal_amount) as stock_sales_amt
| from g_order
| group by
| tumble(createTime, interval '5' second ),
| stock_name
|""".stripMargin
// 8.执行sql语句
val table: Table = tableEnv.sqlQuery(sql)
table.printSchema() // 9.将SQL的执行结果写入到Mysql中
tableEnv.toAppendStream[(String, Long, Int, Double)](table).addSink(new ShopStatMysqlSink)
// 10.执行任务
env.execute()
}
}
3、参考ShopStatMysqlSink.scala 代码,进⾏FlinkSink 编写,存⼊数据到Mysql
package com.gaotu.flink import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import java.sql.{Connection, DriverManager, PreparedStatement} /**
* Mysql Sink
*
* CREATE TABLE `g_orders_stat` (
`stock_name` varchar(20)DEFAULT NULL COMMENT '商品名称',
`order_cnt` int(11) DEFAULT NULL COMMENT '商品数量',
`stock_sales_cnt` int(11) DEFAULT NULL COMMENT '商品数量',
`stock_sales_amt` double DEFAULT NULL COMMENT '商品数量'
) ENGINE=InnoDB AUTO_INCREMENT=120 DEFAULT CHARSET=utf8;
*
*/
class ShopStatMysqlSink extends RichSinkFunction[(String, Long, Int, Double)] { val driver = "com.mysql.cj.jdbc.Driver"
val url = "jdbc:mysql://localhost/plato"
val username = "root"
val password = "root"
var connection: Connection = null
var ps: PreparedStatement = null override def open(parameters: Configuration): Unit = {
Class.forName(driver) // 1. 加载驱动
connection = DriverManager.getConnection(url, username, password) // 2. 创建连接
val sql = "insert into g_orders_stat (stock_name,order_cnt,stock_sales_cnt,stock_sales_amt) values (?,?,?,?);" // 3. 获得执行语句
ps = connection.prepareStatement(sql)
}
override def invoke(value: (String, Long, Int, Double)): Unit = {// 4. 插入数据
try {
ps.setString(1, value._1)
ps.setLong(2, value._2)
ps.setInt(3, value._3)
ps.setDouble(4, value._4)
ps.execute()
} catch {
case e: Exception => println(e.getMessage)
}
}
// 5. 关闭连接操作
override def close(): Unit = {
if(connection != null) {
connection.close()
}
if(ps != null) {
ps.close()
}
}
}
最新文章
- 《CLR.via.C#第三版》第二部分第4,5章节读书笔记(二)
- 【代码笔记】iOS-浮动的云
- iOS-设置UIPageControl 显示图片
- hessian接口参数,子类与父类不能有同名字段解决方法
- Dagger学习笔记
- Ajax解决缓存的5种方法
- concurrent实用类
- mfc---右键蹦出菜单
- IDL 创建数组
- ubuntu 使用sudo apt-get update 出现 被配置多次导致无法升级错误解决方法
- 织梦dedecms如何去除版权中的Power by DedeCms
- hdu 2044 递推
- 五、LCD屏填充纯色
- Python——with语句、context manager类型和contextlib库
- 20145127《java程序设计》第二次实验
- Session、Cookie简单理解
- golang socket 实现分析(一)
- ubuntu开启sshd服务(转)
- artemplate模板
- GO 功能注释