在spark1.2版本中最令我期待的功能是External Data Sources,通过该API可以直接将External Data Sources注册成一个临时表,该表可以和已经存在的表等通过sql进行查询操作。External Data Sources API代码存放于org.apache.spark.sql包中。

具体的分析可参见OopsOutOfMemory的两篇精彩博文:

http://blog.csdn.net/oopsoom/article/details/42061077

http://blog.csdn.net/oopsoom/article/details/42064075

自己尝试实现了一个简易的读取关系型数据库的外部数据源,代码参见:https://github.com/luogankun/spark-jdbc

支持MySQL/Oracle/DB2,以及几种简单的数据类型,暂时还不支持PrunedScan、PrunedFilteredScan,仅支持TableScan,后续在接着完善。

使用步骤:

1、编译spark-jdbc代码

sbt package

2、添加jar包到spark-env.sh

export SPARK_CLASSPATH=/home/spark/software/source/spark_package/spark-jdbc/target/scala-2.10/spark-jdbc_2.-0.1.jar:$SPARK_CLASSPATH
export SPARK_CLASSPATH=/home/spark/lib/ojdbc6.jar:$SPARK_CLASSPATH
export SPARK_CLASSPATH=/home/spark/lib/db2jcc4.jar:$SPARK_CLASSPATH
export SPARK_CLASSPATH=/home/spark/lib/mysql-connector-java-3.0..jar:$SPARK_CLASSPATH

3、SQL使用:启动spark-sql

参数说明:

url :关系型数据库url

user :关系型数据库用户名

password: 关系型数据库密码

sql:关系型数据库sql查询语句

MySQL:

CREATE TEMPORARY TABLE jdbc_table
USING com.luogankun.spark.jdbc
OPTIONS (
url 'jdbc:mysql://hadoop000:3306/hive',
user 'root',
password 'root',
sql 'select TBL_ID,TBL_NAME,TBL_TYPE FROM TBLS WHERE TBL_ID < 100'
); SELECT * FROM jdbc_table;

Oracle:

CREATE TEMPORARY TABLE jdbc_table
USING com.luogankun.spark.jdbc
OPTIONS (
url 'jdbc:oracle:thin:@hadoop000:1521/ora11g',
user 'coc',
password 'coc',
sql 'select HISTORY_ID, APPROVE_ROLE_ID, APPROVE_OPINION from CI_APPROVE_HISTORY'
); SELECT * FROM jdbc_table;

DB2:

CREATE TEMPORARY TABLE jdbc_table
USING com.luogankun.spark.jdbc
OPTIONS (
url 'jdbc:db2://hadoop000:60000/CI',
user 'ci',
password 'ci',
sql 'select LABEL_ID from coc.CI_APPROVE_STATUS'
); SELECT * FROM jdbc_table;

在测试过程中遇到的问题:

如上的代码在连接MySQL数据库操作时没有问题,但是在操作Oracle或者DB2数据库时,报错如下:

09:56:48,302 [Executor task launch worker-0] ERROR Logging$class : Error in TaskCompletionListener
java.lang.AbstractMethodError: oracle.jdbc.driver.OracleResultSetImpl.isClosed()Z
at org.apache.spark.rdd.JdbcRDD$$anon$1.close(JdbcRDD.scala:99)
at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
at org.apache.spark.rdd.JdbcRDD$$anon$1$$anonfun$1.apply(JdbcRDD.scala:71)
at org.apache.spark.rdd.JdbcRDD$$anon$1$$anonfun$1.apply(JdbcRDD.scala:71)
at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:85)
at org.apache.spark.TaskContext$$anonfun$markTaskCompleted$1.apply(TaskContext.scala:110)
at org.apache.spark.TaskContext$$anonfun$markTaskCompleted$1.apply(TaskContext.scala:108)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.TaskContext.markTaskCompleted(TaskContext.scala:108)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:64)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
09:56:48,302 [Executor task launch worker-1] ERROR Logging$class : Error in TaskCompletionListener

跟了下JdbcRDD源代码发现,问题在于:

我在本案例中使用的oracle的驱动是ojdbc14-10.2.0.3.jar,查阅了些资料说是Oracle的实现类没有该方法;

该issues详见: https://issues.apache.org/jira/browse/SPARK-5239

解决办法:

1)升级驱动包;

2)暂时屏蔽掉这两个isClosed的判断方法(https://github.com/apache/spark/pull/4033)

4、Scala API使用方式

import  com.luogankun.spark.jdbc._
val sqlContext = new HiveContext(sc)
val cities = sqlContext.jdbcTable("jdbc:mysql://hadoop000:3306/test", "root","root","select id, name from city")
cities.collect

后续将会继续完善,现在的实现确实很“丑陋”,凑合着先能使用吧。

最新文章

  1. 分享一些学习资料-大量PDF电子书
  2. Gnome_Terminal
  3. js-倒计时自动隐藏
  4. JAVA集合类汇总
  5. 【复位】FGPA的复位 [部分转]
  6. DB2 Add hidden Identity columns
  7. 【转】稍改进过的ListView,很好用哈
  8. 【转载】主数据管理(MDM)与元数据管理
  9. Centos 6.5(64bit)上安装Vertica single node
  10. Hibernate 注解多对一 要求在多那边产生一个外键而不会另外产生一个表
  11. MarkdownPad2的密钥
  12. 前端worker之web worker
  13. Jquery遍历之获取子级元素、同级元素和父级元素
  14. POJ 3481 Double Queue
  15. PIGS POJ - 1149(水最大流)
  16. 10.9 Xadmin
  17. SQL server类型转换
  18. NETCONF+Yang配置TSN
  19. Linux环境安装MySQL数据库(RPM格式的软件包)
  20. npm使用入门(package.json)

热门文章

  1. Java中interface和abstract class的区别和联系
  2. HDU 1402 fft 模板题
  3. (转)Javascript匿名函数的写法、传参、递归
  4. c#中各类日期的计算方法,收藏
  5. Linux下使用yum安装MariaDB
  6. My first Scratch small game
  7. HDU 1829 分组并查集
  8. HDU 4777 Rabbit Kingdom (2013杭州赛区1008题,预处理,树状数组)
  9. Why NSAttributedString import html must be on main thread?
  10. 多线程、多进程、协程、缓存(memcache、redis)