spark load data from mysql

code first

本机通过spark-shell.cmd启动一个spark进程

	SparkSession spark = SparkSession.builder().appName("Simple Application").master("local[2]").getOrCreate();

        Map<String, String> map = new HashMap<>();
map.put("url","jdbc:mysql:xxx");
map.put("user", "user");
map.put("password", "pass");
String tableName = "table";
map.put("dbtable", tableName);
map.put("driver", "com.mysql.jdbc.Driver");
String lowerBound = 1 + ""; //低界限
String upperBound = 10000 + ""; //高界限 map.put("fetchsize", "100000"); //实例和mysql服务端单次拉取行数,拉取后才能执行rs.next()
map.put("numPartitions", "50"); //50个分区区间,将以范围[lowerBound,upperBound]划分成50个分区,每个分区执行一次查询
map.put("partitionColumn", "id"); //分区条件列
System.out.println("tableName:" + tableName + ", lowerBound:"+lowerBound+", upperBound:"+upperBound);
map.put("lowerBound", lowerBound);
map.put("upperBound", upperBound); Dataset dataset = spark.read().format("jdbc").options(map).load(); //transform操作
dataset.registerTempTable("tmp__");
Dataset<Row> ds = spark.sql("select * from tmp__"); //transform操作
ds.cache().show(); //action,触发sql真正执行

执行到show时,任务开始真正执行,此时,我们单机debug,来跟踪partitionColumn的最终实现方式

debug类

org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.buildScan

此时parts为size=50的分区列表

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
// Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row]
JDBCRDD.scanTable(
sparkSession.sparkContext,
schema,
requiredColumns,
filters,
parts,
jdbcOptions).asInstanceOf[RDD[Row]]
}

单个分区内的whereClause值

whereCluase="id < 21 or id is null"

继续往下断点,到单个part的执行逻辑,此时代码应该是在Executor中的某个task线程中

org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute

	val myWhereClause = getWhereClause(part)

	val sqlText = s"SELECT $columnList FROM ${options.table} $myWhereClause"
stmt = conn.prepareStatement(sqlText,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
stmt.setFetchSize(options.fetchSize)
rs = stmt.executeQuery()
val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics) CompletionIterator[InternalRow, Iterator[InternalRow]](
new InterruptibleIterator(context, rowsIterator), close())

此时

myWhereClause=WHERE id < 21 or id is null

最终的sql语句

sqlText=SELECT id,xx FROM tablea WHERE id < 21 or id is null

所有part都会经过compute

Executor执行完任务后,将信息发送回Driver

Executor: Finished task 7.0 in stage 2.0 (TID 12). 1836 bytes result sent to driver

总结

  • numPartitions、partitionColumn、lowerBound、upperBound结合后,spark将生成很多个parts,每个part对应一个查询whereClause,最终查询数据将分成numPartitions个任务来拉取数据,因此,partitionColumn必须是索引列,否则,效率将大大降低
  • 自动获取table schema,程序会执行类型select * from tablea where 1=0 来获取字段及类型
  • lowerBound,upperBound仅用来生成parts区间,最终生成的sql中,不会使用它们来作为数据范围的最小或最大值

最新文章

  1. 著名ERP厂商的SSO单点登录解决方案介绍一
  2. Visual Studio的背景插件
  3. Struts2漏洞利用实例
  4. (转)对SQLSERVER数据库事务日志的疑问
  5. 让FineUI数据绑定支持dynamic对象
  6. 基于 REST 的 Web 服务:基础
  7. jsp项目与mysql链接
  8. form表单中的常用控件
  9. android 中文转拼音
  10. fedora 23 安装genymotion解决方案
  11. My-sql #1045 - Access denied for user &#39;root&#39;@&#39;localhost&#39; (using password: NO)
  12. NYOJ737
  13. (转)sql语句中charindex的用法
  14. Linux C 程序的开发环境
  15. JAVA读取、写入Excel表格(含03版)
  16. JAVA之旅(二十四)——I/O流,字符流,FileWriter,IOException,文件续写,FileReader,小练习
  17. 数据库【mysql篇】典型的一些练习题目
  18. 微信公众号_订阅号_爬虫puppeteer
  19. [UE4]Throbber,横向动态图标
  20. 性能测试 Performance Test Report

热门文章

  1. 从 LRU Cache 带你看面试的本质
  2. java中对 闰年的计算 以及月份天数
  3. Azure Storage 系列(六)使用Azure Queue Storage
  4. Anaconda, conda, pyenv, virtualenv的区别
  5. hadoop分布式格式化时出现异常java.net.unknownhostexception
  6. 抛弃vue-webpack-template,踩坑Vue-Cli创建vue项目
  7. 请编写sql多语句表值函数统,计指定年份中每本书的销售总额
  8. 对do{ }while();一直以来的误解 -----如何理解do{ }while( );语句
  9. 洛谷 P6851 【onu】贪心
  10. 推荐Java字节码解析工具classpy