一、JDBC数据源案例

1、概述

Spark SQL支持使用JDBC从关系型数据库(比如MySQL)中读取数据。读取的数据,依然由DataFrame表示,可以很方便地使用Spark Core提供的各种算子进行处理。

这里有一个经验之谈,实际上用Spark SQL处理JDBC中的数据是非常有用的。比如说,你的MySQL业务数据库中,有大量的数据,比如1000万,然后,你现在需要编写一个程序,
对线上的脏数据某种复杂业务逻辑的处理,甚至复杂到可能涉及到要用Spark SQL反复查询Hive中的数据,来进行关联处理。 那么此时,用Spark SQL来通过JDBC数据源,加载MySQL中的数据,然后通过各种算子进行处理,是最好的选择。因为Spark是分布式的计算框架,对于1000万数据,肯定是分布式处理的。
而如果你自己手工编写一个Java程序,那么不好意思,你只能分批次处理了,先处理2万条,再处理2万条,可能运行完你的Java程序,已经是几天以后的事情了。 Java版本
Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:mysql://spark1:3306/testdb");
options.put("dbtable", "students");
DataFrame jdbcDF = sqlContext.read().format("jdbc"). options(options).load(); Scala版本
val jdbcDF = sqlContext.read.format("jdbc").options(
Map("url" -> "jdbc:mysql://spark1:3306/testdb",
"dbtable" -> "students")).load() 案例:查询分数大于80分的学生信息

#授权表权限
grant all on testdb.* to ''@'spark1' with grant option;
flush privileges;

2、准备数据

mysql> create database testdb;
mysql> use testdb; mysql> create table student_infos(name varchar(20), age integer); mysql> create table student_scores(name varchar(20), score integer); mysql> insert into student_infos values('leo', 18),('marry', 17),('jack', 19); mysql> insert into student_scores values('leo', 88),('marry', 99),('jack', 60); mysql> create table good_student_infos(name varchar(20), age integer, score integer);

3、java案例实现

package cn.spark.study.sql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType; import scala.Tuple2; /**
* JDBC数据源
* @author Administrator
*
*/
public class JDBCDataSource { public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("JDBCDataSource");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc); // 总结一下
// jdbc数据源
// 首先,是通过SQLContext的read系列方法,将mysql中的数据加载为DataFrame
// 然后可以将DataFrame转换为RDD,使用Spark Core提供的各种算子进行操作
// 最后可以将得到的数据结果,通过foreach()算子,写入mysql、hbase、redis等等db / cache中 // 分别将mysql中两张表的数据加载为DataFrame
Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:mysql://spark1:3306/testdb");
options.put("dbtable", "student_infos"); DataFrame studentInfosDF = sqlContext.read().format("jdbc")
.options(options).load(); options.put("dbtable", "student_scores");
DataFrame studentScoresDF = sqlContext.read().format("jdbc")
.options(options).load(); // 将两个DataFrame转换为JavaPairRDD,执行join操作
JavaPairRDD<String, Tuple2<Integer, Integer>> studentsRDD = studentInfosDF.javaRDD().mapToPair( new PairFunction<Row, String, Integer>() { private static final long serialVersionUID = 1L; @Override
public Tuple2<String, Integer> call(Row row) throws Exception {
return new Tuple2<String, Integer>(row.getString(0),
Integer.valueOf(String.valueOf(row.get(1))));
} })
.join(studentScoresDF.javaRDD().mapToPair( new PairFunction<Row, String, Integer>() { private static final long serialVersionUID = 1L; @Override
public Tuple2<String, Integer> call(Row row) throws Exception {
return new Tuple2<String, Integer>(String.valueOf(row.get(0)),
Integer.valueOf(String.valueOf(row.get(1))));
} })); // 将JavaPairRDD转换为JavaRDD<Row>
JavaRDD<Row> studentRowsRDD = studentsRDD.map( new Function<Tuple2<String,Tuple2<Integer,Integer>>, Row>() { private static final long serialVersionUID = 1L; @Override
public Row call(
Tuple2<String, Tuple2<Integer, Integer>> tuple)
throws Exception {
return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2);
} }); // 过滤出分数大于80分的数据
JavaRDD<Row> filteredStudentRowsRDD = studentRowsRDD.filter( new Function<Row, Boolean>() { private static final long serialVersionUID = 1L; @Override
public Boolean call(Row row) throws Exception {
if(row.getInt(2) > 80) {
return true;
}
return false;
} }); // 转换为DataFrame
List<StructField> structFields = new ArrayList<StructField>();
structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
StructType structType = DataTypes.createStructType(structFields); DataFrame studentsDF = sqlContext.createDataFrame(filteredStudentRowsRDD, structType); Row[] rows = studentsDF.collect();
for(Row row : rows) {
System.out.println(row);
} // 将DataFrame中的数据保存到mysql表中
// 这种方式是在企业里很常用的,有可能是插入mysql、有可能是插入hbase,还有可能是插入redis缓存
studentsDF.javaRDD().foreach(new VoidFunction<Row>() { private static final long serialVersionUID = 1L; @Override
public void call(Row row) throws Exception {
String sql = "insert into good_student_infos values("
+ "'" + String.valueOf(row.getString(0)) + "',"
+ Integer.valueOf(String.valueOf(row.get(1))) + ","
+ Integer.valueOf(String.valueOf(row.get(2))) + ")"; Class.forName("com.mysql.jdbc.Driver"); Connection conn = null;
Statement stmt = null;
try {
conn = DriverManager.getConnection(
"jdbc:mysql://spark1:3306/testdb", "", "");
stmt = conn.createStatement();
stmt.executeUpdate(sql);
} catch (Exception e) {
e.printStackTrace();
} finally {
if(stmt != null) {
stmt.close();
}
if(conn != null) {
conn.close();
}
}
} }); sc.close();
} }

最新文章

  1. 室内定位系列(一)——WiFi位置指纹(译)
  2. 图片拾取器-PicPicker
  3. Theano3.5-练习之深度卷积网络
  4. Ubuntu 更改默认apt-get源
  5. html5 +css3 第一章学习和笔记
  6. Java多线程——&lt;三&gt;简单的线程执行:Executor
  7. NSOutputStream\NSInputStream
  8. mysql-error --(ERROR 1135 (HY000): Can't create a new thread (errno 11); if you are not out)
  9. Python 3.5.1 Syntax &amp; APIs(Continue Updating..
  10. MySQL常用命令总结3
  11. React-Native 开发(二) 在react-native 中 运用 redux
  12. QueryError:Incorrect result size: expected 1, actual 0
  13. [CQOI2010]内部白点
  14. Linux:ssh_config快速访问服务器
  15. Scrapyd 改进第二步: Web Interface 添加 STOP 和 START 超链接, 一键调用 Scrapyd API
  16. Kafka技术内幕 读书笔记之(四) 新消费者——心跳任务
  17. 【hdu5306】 Gorgeous Sequence
  18. Linux系统无线网卡的安装【转】
  19. Android开发基于百度地图的乘车助手
  20. VIM 乱码终极解决

热门文章

  1. new Image 读取宽高为0——onload
  2. .net core 定时程序
  3. Java调用WebService方法总结(9,end)--Http方式调用WebService
  4. 关于移动端图片浏览,previewimage的使用
  5. TR-业务流程图
  6. Qt 接受拖放
  7. 2019.7月-前端面试总结(H5+C3+JS+ES6+Vue+浏览器)
  8. WebSocket转载
  9. pod健康检查(liveness probe存活探针&amp;&amp;readiness probe 可读性探针)
  10. 【HCIA Gauss】学习汇总-数据库基础介绍-1