〇、概述

1、实现内容

使用Scala编写代码,通过Flink的Source、Sink以及时间语义实现实时销量展示

2、过程

(1)导包并下载依赖

(2)创建数据源数据表并写入数据

(3)在Mysql数据库中创建统计结果表

(4)编写Flink计算代码

a.参考ShopMysqlSource.scala 代码,进行Flink Source 编写,从Mysql读取数据

b.参考GaotuShopFlinkStat.scala代码,进行统计逻辑的编写,进行FlinkSQL 查询

c.参考ShopStatMysqlSink.scala 代码,进行FlinkSink 编写,存入数据到Mysql

一、导包并下载依赖

二、创建数据源数据表并写入数据

参考执行GenerateOrders.scala 代码

package com.gaotu.flink
import java.sql.Connection
import java.sql.DriverManager
import java.util.concurrent.TimeUnit
import org.apache.commons.lang.time.FastDateFormat
import scala.util.Random /**
* 向Mysql数据库中添加数据
* CREATE TABLE `g_orders` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '交易id',
`stock_name` varchar(20)DEFAULT NULL COMMENT '商品名称',
`user_id` int(11) DEFAULT NULL COMMENT '用户id',
`deal_status` varchar(20) DEFAULT NULL COMMENT '交易状态',
`stock_cnt` int(11) DEFAULT NULL COMMENT '商品数量',
`deal_amount` double DEFAULT NULL COMMENT '订单金额',
`deal_time` datetime DEFAULT NULL COMMENT '交易时间',
`oper_time` bigint COMMENT '处理时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=120 DEFAULT CHARSET=utf8;
*
*/
object GenerateOrders {

  /**
   * Writes 10000 randomly generated orders into the MySQL table `g_orders`,
   * one order per second.
   *
   * Each order is sent to the database as soon as it is generated, so a
   * reader polling the table sees new rows while the generator is running.
   * Connection settings are hard-coded for a local `plato` database.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    import java.sql.PreparedStatement
    import scala.util.control.NonFatal

    val driver = "com.mysql.cj.jdbc.Driver"
    val url = "jdbc:mysql://localhost/plato"
    val username = "root"
    val password = "root"

    // Product catalogue; amount(i) is the amount recorded for stocks(i).
    val stocks = Array("HUAWEI Mate40", "Apple iphone13", "Apple MacBook Pro 14",
      "ThinkBook 14p", "RedmiBook Pro14", "飞鹤星飞帆幼儿奶粉", "爱他美 幼儿奶粉",
      "李宁运动男卫裤", "小米踏步机椭圆机", "欧莱雅面膜", "御泥坊面膜", "欧莱雅男士套装", "金六福白酒",
      "牛栏山42度", "茅台飞天")
    val amount = Array[Double](6569.00, 6099.00, 14999.00, 6799.00, 4899.00, 275, 392, 199,
      1299.00, 599, 399, 389, 469, 175, 1399.00)

    val insertSql =
      "insert into g_orders(deal_time,stock_name,user_id,deal_status,stock_cnt,deal_amount,oper_time) " +
        "values(?, ?, ?, ?, ?, ?, ?)"
    // The formatter does not change between rows, so build it once.
    val formater = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

    var connection: Connection = null
    var ps: PreparedStatement = null
    try {
      Class.forName(driver)
      connection = DriverManager.getConnection(url, username, password)
      ps = connection.prepareStatement(insertSql)
      for (_ <- 1 to 10000) {
        // Draw from the whole catalogue; a fixed bound of 14 never selected the last product.
        val item = Random.nextInt(stocks.length)
        val oper_time = System.currentTimeMillis()
        val deal_time = formater.format(oper_time)
        val stock_name = stocks(item)
        val user_id = Random.nextInt(100)
        val deal_status = "有效"
        val stock_cnt = Random.nextInt(20)
        val deal_amount = amount(item)

        ps.setString(1, deal_time)
        ps.setString(2, stock_name)
        ps.setInt(3, user_id)
        ps.setString(4, deal_status)
        ps.setInt(5, stock_cnt)
        ps.setDouble(6, deal_amount)
        ps.setLong(7, oper_time)
        // Execute immediately instead of batching: a batch flushed only after the
        // loop would keep every row out of the table until all sleeps had elapsed.
        ps.executeUpdate()
        println(s"insert into g_orders: deal_time=$deal_time, stock_name=$stock_name, user_id=$user_id, " +
          s"deal_status=$deal_status, stock_cnt=$stock_cnt, deal_amount=$deal_amount, oper_time=$oper_time")
        TimeUnit.SECONDS.sleep(1)
      }
    } catch {
      // Report ordinary failures; fatal errors and interrupts propagate.
      case NonFatal(e) => e.printStackTrace()
    } finally {
      // Release JDBC resources even when connecting or inserting failed.
      try {
        if (ps != null) ps.close()
      } finally {
        if (connection != null) connection.close()
      }
    }
  }
}

三、在Mysql数据库中创建统计结果表

-- 统计结果表 
CREATE TABLE `g_orders_stat` (
`stock_name` varchar(20)DEFAULT NULL COMMENT '商品名称',
`order_cnt` int(11) DEFAULT NULL COMMENT '订单数量',
`stock_sales_cnt` int(11) DEFAULT NULL COMMENT '商品销售数量',
`stock_sales_amt` double DEFAULT NULL COMMENT '商品销售金额'
) ENGINE=InnoDB AUTO_INCREMENT=120 DEFAULT CHARSET=utf8;

四、编写Flink计算代码

1、参考ShopMysqlSource.scala 代码,进行Flink Source 编写,从Mysql读取数据

package com.gaotu.flink

import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.concurrent.TimeUnit /*
* Mysql Source
*/
/**
 * Flink source that polls the MySQL table `plato.g_orders` once per second and
 * emits each new row as `(id, stock_name, stock_cnt, deal_amount, oper_time)`.
 *
 * Only rows whose `id` is greater than the largest id emitted so far are read
 * on each poll, so a row is emitted at most once per run. Polling stops after
 * 500 polls or as soon as the source is cancelled.
 */
class ShopMysqlSource extends RichSourceFunction[(Int, String, Int, Double, Long)] {
  val driver = "com.mysql.cj.jdbc.Driver"
  val url = "jdbc:mysql://localhost/plato"
  val username = "root"
  val password = "root"

  // Cleared by cancel(); volatile because cancel() is invoked from another thread than run().
  @volatile private var running = true

  /**
   * Polls the orders table and forwards new rows to `ctx`.
   *
   * @param ctx source context used to emit the order tuples
   */
  override def run(ctx: SourceContext[(Int, String, Int, Double, Long)]): Unit = {
    // 1. Load the MySQL driver
    Class.forName(driver)
    // 2. Open the MySQL connection
    val connection: Connection = DriverManager.getConnection(url, username, password)
    try {
      // 3. Prepare the query. The id bound is a parameter so that every poll can
      //    supply the current high-water mark; interpolating it into the SQL text
      //    once would freeze the bound at its initial value and re-read all rows.
      val sql = "select id,stock_name,stock_cnt,deal_amount,oper_time from plato.g_orders where id > ? ;"
      val ps: PreparedStatement = connection.prepareStatement(sql)
      try {
        var max_id = 0
        var polls = 0
        while (running && polls < 500) {
          // 4. Run the query for rows newer than the last one emitted
          ps.setInt(1, max_id)
          val queryRequest = ps.executeQuery()
          try {
            // 5. Emit every returned row and advance the high-water mark
            while (queryRequest.next()) {
              val id = queryRequest.getInt("id")
              val stock_name = queryRequest.getString("stock_name")
              val stock_cnt = queryRequest.getInt("stock_cnt")
              val deal_amount = queryRequest.getDouble("deal_amount")
              val deal_time = queryRequest.getLong("oper_time")
              ctx.collect((id, stock_name, stock_cnt, deal_amount, deal_time))
              if (max_id < id) {
                max_id = id
              }
            }
          } finally {
            queryRequest.close()
          }
          polls += 1
          TimeUnit.SECONDS.sleep(1)
        }
      } finally {
        ps.close()
      }
    } finally {
      connection.close()
    }
  }

  /** Requests the polling loop in `run` to stop before its next poll. */
  override def cancel(): Unit = {
    running = false
  }
}

2、参考GaotuShopFlinkStat.scala代码,进行统计逻辑的编写,进行FlinkSQL 查询

package com.gaotu.flink

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._ /**
* 实时统计逻辑
*/
object GaotuShopFlinkStat {

  /**
   * Builds and runs the streaming job: reads orders from [[ShopMysqlSource]],
   * assigns event-time timestamps and watermarks, aggregates per product over
   * 5-second tumbling windows with SQL, and writes each result row through
   * [[ShopStatMysqlSink]].
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // 1. Create the streaming environment
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Use event time as the stream time characteristic
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 3. Obtain the Table environment
    val tblEnv = TableEnvironment.getTableEnvironment(streamEnv)
    // 4. Attach the custom MySQL source
    val orders = streamEnv.addSource(new ShopMysqlSource)

    // 5. Watermark assigner: the watermark trails the largest timestamp seen by 2000
    //    (2 seconds if the timestamps are epoch milliseconds).
    val lateTolerantAssigner: AssignerWithPeriodicWatermarks[(Int, String, Int, Double, Long)] =
      new AssignerWithPeriodicWatermarks[(Int, String, Int, Double, Long)] {
        val delayTime = 2000
        var currentTimestamp: Long = _

        // Take the event timestamp from the fifth tuple field and remember the largest one seen.
        override def extractTimestamp(element: (Int, String, Int, Double, Long), previousElementTimestamp: Long): Long = {
          currentTimestamp = math.max(currentTimestamp, element._5)
          currentTimestamp
        }

        // Emit a watermark that lags the largest timestamp by delayTime.
        override def getCurrentWatermark: Watermark =
          new Watermark(currentTimestamp - delayTime)
      }
    val watermarkDataStream: DataStream[(Int, String, Int, Double, Long)] =
      orders.assignTimestampsAndWatermarks(lateTolerantAssigner)

    // 6. Register the stream as a Flink SQL table; createTime is the rowtime attribute
    tblEnv.registerDataStream("g_order", watermarkDataStream, 'id, 'stock_name, 'stock_cnt, 'deal_amount, 'oper_time, 'createTime.rowtime)

    // 7. Per-product statistics over 5-second tumbling windows
    val statSql =
      """
        | select
        | stock_name
        | ,count(id) as order_cnt
        | ,sum(stock_cnt) as stock_sales_cnt
        | ,sum(deal_amount) as stock_sales_amt
        | from g_order
        | group by
        | tumble(createTime, interval '5' second ),
        | stock_name
        |""".stripMargin

    // 8. Run the query
    val statTable: Table = tblEnv.sqlQuery(statSql)
    statTable.printSchema()

    // 9. Convert the result to an append stream and write it to MySQL
    val statStream = tblEnv.toAppendStream[(String, Long, Int, Double)](statTable)
    statStream.addSink(new ShopStatMysqlSink)

    // 10. Execute the job
    streamEnv.execute()
  }
}

3、参考ShopStatMysqlSink.scala 代码,进行FlinkSink 编写,存入数据到Mysql

package com.gaotu.flink

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import java.sql.{Connection, DriverManager, PreparedStatement} /**
* Mysql Sink
*
* CREATE TABLE `g_orders_stat` (
`stock_name` varchar(20)DEFAULT NULL COMMENT '商品名称',
`order_cnt` int(11) DEFAULT NULL COMMENT '订单数量',
`stock_sales_cnt` int(11) DEFAULT NULL COMMENT '商品销售数量',
`stock_sales_amt` double DEFAULT NULL COMMENT '商品销售金额'
) ENGINE=InnoDB AUTO_INCREMENT=120 DEFAULT CHARSET=utf8;
*
*/
/**
 * Flink sink that inserts each `(stock_name, order_cnt, stock_sales_cnt, stock_sales_amt)`
 * tuple as one row of the MySQL table `g_orders_stat`.
 *
 * The JDBC connection and the prepared insert statement are created in `open`
 * and released in `close`.
 */
class ShopStatMysqlSink extends RichSinkFunction[(String, Long, Int, Double)] {
  val driver = "com.mysql.cj.jdbc.Driver"
  val url = "jdbc:mysql://localhost/plato"
  val username = "root"
  val password = "root"
  var connection: Connection = null
  var ps: PreparedStatement = null

  /**
   * Loads the driver, opens the connection and prepares the insert statement.
   *
   * @param parameters Flink configuration (unused)
   */
  override def open(parameters: Configuration): Unit = {
    Class.forName(driver) // 1. Load the driver
    connection = DriverManager.getConnection(url, username, password) // 2. Open the connection
    val sql = "insert into g_orders_stat (stock_name,order_cnt,stock_sales_cnt,stock_sales_amt) values (?,?,?,?);" // 3. Prepare the statement
    ps = connection.prepareStatement(sql)
  }

  /**
   * Inserts one statistics row. A failure is reported on stdout and the record is skipped.
   *
   * @param value tuple of (stock_name, order_cnt, stock_sales_cnt, stock_sales_amt)
   */
  override def invoke(value: (String, Long, Int, Double)): Unit = { // 4. Insert the row
    try {
      ps.setString(1, value._1)
      ps.setLong(2, value._2)
      ps.setInt(3, value._3)
      ps.setDouble(4, value._4)
      ps.execute()
    } catch {
      case e: Exception => println(e.getMessage)
    }
  }

  /**
   * 5. Releases the JDBC resources: the statement first, then the connection
   * that owns it. The connection is closed even if closing the statement fails.
   */
  override def close(): Unit = {
    try {
      if (ps != null) {
        ps.close()
      }
    } finally {
      if (connection != null) {
        connection.close()
      }
    }
  }
}

最新文章

  1. 《CLR.via.C#第三版》第二部分第4,5章节读书笔记(二)
  2. 【代码笔记】iOS-浮动的云
  3. iOS-设置UIPageControl 显示图片
  4. hessian接口参数,子类与父类不能有同名字段解决方法
  5. Dagger学习笔记
  6. Ajax解决缓存的5种方法
  7. concurrent实用类
  8. mfc---右键蹦出菜单
  9. IDL 创建数组
  10. ubuntu 使用sudo apt-get update 出现 被配置多次导致无法升级错误解决方法
  11. 织梦dedecms如何去除版权中的Power by DedeCms
  12. hdu 2044 递推
  13. 五、LCD屏填充纯色
  14. Python——with语句、context manager类型和contextlib库
  15. 20145127《java程序设计》第二次实验
  16. Session、Cookie简单理解
  17. golang socket 实现分析(一)
  18. ubuntu开启sshd服务(转)
  19. artemplate模板
  20. GO 功能注释

热门文章

  1. Linux下从零开始创建lvm虚拟磁盘阵列+脚本化解决方案
  2. windows bat文件设置环境变量
  3. 2_爬豆瓣电影_ajax动态加载
  4. 手把手教你玩转 Gitea|使用 Docker 安装 Gitea
  5. 2022.9.17 Java第二次课总结
  6. 数据结构之单链表(基于Java实现)
  7. Bing 广告平台迁移到 .net6
  8. BigDecimal 用法总结
  9. JAVA员工名字 年龄 工资 工种
  10. 部署 LNMP(源码安装版本)shell脚本