大数据开发-Spark-Streaming处理数据到mysql
2024-08-28 03:10:49
前面一篇讲到streamin读取kafka数据加工处理后写到kafka数据,大数据开发-Spark-开发Streaming处理数据 && 写入Kafka是针对比如推荐领域,实时标签等场景对于实时处理结果放到mysql也是一种常用方式,假设一些车辆调度的地理位置信息处理后写入到mysql
1.说明
数据表如下:
create database test;
use test;
DROP TABLE IF EXISTS car_gps;
CREATE TABLE IF NOT EXISTS car_gps(
deployNum VARCHAR(30) COMMENT '调度编号',
plateNum VARCHAR(10) COMMENT '车牌号',
timeStr VARCHAR(20) COMMENT '时间戳',
lng VARCHAR(20) COMMENT '经度',
lat VARCHAR(20) COMMENT '纬度',
dbtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '数据入库时间',
PRIMARY KEY(deployNum, plateNum, timeStr))
2.编写程序
首先引入mysql的驱动
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
2.1 jdbc写入mysql
package com.hoult.Streaming.work
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
import com.hoult.structed.bean.BusInfo
import org.apache.spark.sql.ForeachWriter
class JdbcHelper extends ForeachWriter[BusInfo] {
var conn: Connection = _
var statement: PreparedStatement = _
override def open(partitionId: Long, epochId: Long): Boolean = {
if (conn == null) {
conn = JdbcHelper.openConnection
}
true
}
override def process(value: BusInfo): Unit = {
//把数据写入mysql表中
val arr: Array[String] = value.lglat.split("_")
val sql = "insert into car_gps(deployNum,plateNum,timeStr,lng,lat) values(?,?,?,?,?)"
statement = conn.prepareStatement(sql)
statement.setString(1, value.deployNum)
statement.setString(2, value.plateNum)
statement.setString(3, value.timeStr)
statement.setString(4, arr(0))
statement.setString(5, arr(1))
statement.executeUpdate()
}
override def close(errorOrNull: Throwable): Unit = {
if (null != conn) conn.close()
if (null != statement) statement.close()
}
}
object JdbcHelper {
var conn: Connection = _
val url = "jdbc:mysql://hadoop1:3306/test?useUnicode=true&characterEncoding=utf8"
val username = "root"
val password = "123456"
def openConnection: Connection = {
if (null == conn || conn.isClosed) {
val p = new Properties
Class.forName("com.mysql.jdbc.Driver")
conn = DriverManager.getConnection(url, username, password)
}
conn
}
}
2.2 通过foreach来写入mysql
package com.hoult.Streaming.work
import com.hoult.structed.bean.BusInfo
import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
object KafkaToJdbc {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
//1 获取sparksession
val spark: SparkSession = SparkSession.builder()
.master("local[*]")
.appName(KafkaToJdbc.getClass.getName)
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
//2 定义读取kafka数据源
val kafkaDf: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "linux121:9092")
.option("subscribe", "test_bus_info")
.load()
//3 处理数据
val kafkaValDf: DataFrame = kafkaDf.selectExpr("CAST(value AS STRING)")
//转为ds
val kafkaDs: Dataset[String] = kafkaValDf.as[String]
//解析出经纬度数据,写入redis
//封装为一个case class方便后续获取指定字段的数据
val busInfoDs: Dataset[BusInfo] = kafkaDs.map(BusInfo(_)).filter(_ != null)
//将数据写入MySQL表
busInfoDs.writeStream
.foreach(new JdbcHelper)
.outputMode("append")
.start()
.awaitTermination()
}
}
2.4 创建topic和从消费者端写入数据
kafka-topics.sh --zookeeper linux121:2181/myKafka --create --topic test_bus_info --partitions 2 --replication-factor 1
kafka-console-producer.sh --broker-list linux121:9092 --topic test_bus_info
吴邪,小三爷,混迹于后台,大数据,人工智能领域的小菜鸟。
更多请关注
最新文章
- spring mvc学习笔记一:hello world
- [LeetCode_1] twoSum
- 在 SharePoint Server 2013 中配置建议和使用率事件类型
- BZOJ-4010 菜肴制作 贪心+堆+(拓扑图拓扑序)
- 设计模式-中介者模式(Mediator)
- 使用Json让Java和C#沟通的方法
- 解决 spring mvc 3.+ 结合 hibernate3.+ 使用<;tx:annotation-driven>;声明式事务无法提交的问题
- 【Python@Thread】threading模块
- S3C2440串口及其中断系统详解
- 细品 - 逻辑回归(LR)
- struts文件异常Included file cannot be found
- oracle 内存不足处理
- CLASS 类 __getattr__
- ****** 四十九 ******、软设笔记【UML分析和意义】-建模的意义,UML的特点、结构,用例图
- isNAN的使用方法及介绍
- 662. Maximum Width of Binary Tree二叉树的最大宽度
- SQL Server2008 R2 安装失败后的解决办法
- js判断浏览器语言实现网站国际化
- LoadRunner常用知识点-----LoadRunner日志输出
- Oracle约束的启用和停用
热门文章
- Pytest(7)自定义用例顺序pytest-ordering
- CF-311B Cats Transport(斜率优化DP)
- HDU6430 Problem E. TeaTree【dsu on tree】
- 【洛谷 p3371】模板-单源最短路径(图论)
- 【uva 714】Copying Books(算法效率--二分+贪心)
- P1108 低价购买(DP)
- AcWing 239.奇偶游戏 (带权并查集/种类并查集)
- BIM轻量化——浏览器展示
- shapefile 输出的地理处理注意事项(转载)
- 51nod 1073约瑟夫环 递归公式法