一.使用jdbcRDD的接口:

 SparkConf conf = new SparkConf();
conf.setAppName("Simple Application").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(conf); //1.直接使用jdbcRDD的构造函数
/**
 * Serializable zero-argument function that opens a JDBC connection; an instance is
 * passed as the {@code getConnection} argument of the {@code JdbcRDD} constructor.
 *
 * <p>Failures are reported by throwing {@link IllegalStateException} with the original
 * cause attached, instead of being swallowed and surfacing later as a {@code null}
 * connection.
 */
class DbConnection extends AbstractFunction0<Connection> implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String driverClassName;
    private final String connectionUrl;
    private final String userName;
    private final String password;

    /**
     * @param driverClassName fully qualified JDBC driver class, loaded via {@code Class.forName}
     * @param connectionUrl   JDBC URL handed to {@code DriverManager}
     * @param userName        value of the {@code user} connection property
     * @param password        value of the {@code password} connection property
     */
    public DbConnection(String driverClassName, String connectionUrl,
                        String userName, String password) {
        this.driverClassName = driverClassName;
        this.connectionUrl = connectionUrl;
        this.userName = userName;
        this.password = password;
    }

    /**
     * Loads the driver class and opens a new connection.
     *
     * @return a freshly opened connection, never {@code null}
     * @throws IllegalStateException if the driver class cannot be loaded or the
     *                               connection cannot be established
     */
    @Override
    public Connection apply() {
        try {
            Class.forName(driverClassName);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("JDBC driver class not found: " + driverClassName, e);
        }
        Properties properties = new Properties();
        properties.setProperty("user", userName);
        properties.setProperty("password", password);
        try {
            return DriverManager.getConnection(connectionUrl, properties);
        } catch (SQLException e) {
            throw new IllegalStateException("Cannot open JDBC connection to " + connectionUrl, e);
        }
    }
}

/**
 * Serializable row mapper passed as the {@code mapRow} argument of the {@code JdbcRDD}
 * constructor; delegates to {@code JdbcRDD.resultSetToObjectArray}.
 */
class MapResult extends AbstractFunction1<ResultSet, Object[]> implements Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public Object[] apply(ResultSet row) {
        return JdbcRDD.resultSetToObjectArray(row);
    }
}

String Connection_url = "jdbc:mysql://ip:port/dbname?useUnicode=true&characterEncoding=utf8";
String Driver = "com.mysql.jdbc.Driver";
String UserName = "root";
String password = "pd";
DbConnection dbConnection = new DbConnection(Driver,
        Connection_url, UserName, password);
// JdbcRDD requires two '?' placeholders for lowerBound and upperBound. Wrapping the
// query and passing 0 for both bounds makes the predicate "0=0 and 0=0", i.e. always
// true, so the whole result set is read in a single partition.
sql = "select * from (" + sql + ") as tmp where 0=? and 0=?";
JdbcRDD<Object[]> jdbcRDD = new JdbcRDD<>(jsc.sc(), dbConnection,
        sql, 0, 0, 1, new MapResult(),
        ClassManifestFactory$.MODULE$.fromClass(Object[].class));
JavaRDD<Object[]> javaRDD = JavaRDD.fromRDD(jdbcRDD,
        ClassManifestFactory$.MODULE$.fromClass(Object[].class));
// Alternative implementation:
/**
 * Connection factory for the Java-facing {@code JdbcRDD.create(...)} overloads.
 * Holds the driver class name, JDBC URL and credentials and opens a new connection
 * on every {@link #getConnection()} call.
 */
class DbConnectionFactory implements JdbcRDD.ConnectionFactory {
    private static final long serialVersionUID = 1L;

    private final String driverClassName;
    private final String connectionUrl;
    private final String userName;
    private final String password;

    /**
     * @param driverClassName fully qualified JDBC driver class, loaded via {@code Class.forName}
     * @param connectionUrl   JDBC URL handed to {@code DriverManager}
     * @param userName        value of the {@code user} connection property
     * @param password        value of the {@code password} connection property
     */
    public DbConnectionFactory(String driverClassName, String connectionUrl,
                               String userName, String password) {
        this.driverClassName = driverClassName;
        this.connectionUrl = connectionUrl;
        this.userName = userName;
        this.password = password;
    }

    /**
     * Loads the driver class and opens a new connection.
     *
     * @throws Exception if the driver class cannot be loaded or the connection fails
     */
    @Override
    public Connection getConnection() throws Exception {
        Class.forName(driverClassName);
        Properties properties = new Properties();
        properties.setProperty("user", userName);
        properties.setProperty("password", password);
        return DriverManager.getConnection(connectionUrl, properties);
    }
}

// This snippet is an alternative to the constructor-based one; Driver, UserName and
// password are the same values used there.
String Connection_url = "jdbc:mysql://ip:port/dbname?useUnicode=true&characterEncoding=utf8";
// Both bounds are 0, so the two placeholders evaluate to "0=0 and 0=0" (always true).
sql = "select * from (" + sql + ") as tmp where 0=? and 0=?";
DbConnectionFactory ConnectFactory = new DbConnectionFactory(Driver,
        Connection_url, UserName, password);
javaRDD = JdbcRDD.create(jsc, ConnectFactory, sql, 0, 0, 1,
        new Function<ResultSet, Object[]>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object[] call(ResultSet resultSet) {
                return JdbcRDD.resultSetToObjectArray(resultSet);
            }
        });
// Returns a JavaRDD<Object[]> directly. Author's note: underneath this calls
// JdbcRDD(SparkContext sc, Function0<Connection> getConnection, String sql,
//         long lowerBound, long upperBound, int numPartitions,
//         Function1<ResultSet, T> mapRow, ClassTag<T> evidence$1).
//
// A more concise overload (author's note: it delegates to the
// create(JavaSparkContext, ConnectionFactory, String, long, long, int, Function<ResultSet, T>)
// overload used above):
// javaRDD = JdbcRDD.create(jsc, ConnectFactory, sql, 0, 0, 1);

二.使用SparkSession的接口:

 SparkSession ss = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
//读取方式1
String sql = "(select * from xxx) as tmp; //注意这里的sql格式,该sql也可以直接是一个表名
Dataset<Row> df = session.read().format("jdbc")
.option("url", jdbcURL)
.option("driver", driver)
.option("dbtable", sql)
.option("user", username)
.option("password", password)
.load();
// Read mode 2: pass the credentials and driver class through java.util.Properties.
// General form: session.read().jdbc(url, table, properties)
Properties connectionProperties = new Properties();
connectionProperties.setProperty("user", username);
connectionProperties.setProperty("password", password);
connectionProperties.setProperty("driver", driver);
df = session.read().jdbc(jdbcURL, sql, connectionProperties);
// Write mode 1: let the DataFrame writer handle the inserts.
String saveMode = "Overwrite";
df.write().mode(saveMode).jdbc(jdbcURL, tablename, connectionProperties);
// Write mode 2: insert the rows of each partition through a plain JDBC connection
// opened inside foreachPartition.
final String sql = "insert into tab_xxx (c1,c2,c3) values(?,?,?)";
df.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {
    private static final long serialVersionUID = -834520661839866305L;

    @Override
    public void call(Iterator<Row> t) throws Exception {
        // Only three columns are inserted (c1, c2, c3).
        final int columnCount = 3;
        // NOTE(review): the original interval literal is missing from this snippet;
        // 10000 matches the interval used by InsertTmpTable. Tune as needed.
        final int commitInterval = 10000;

        Class.forName(driver);
        // try-with-resources closes the statement and the connection even when an insert fails.
        try (Connection conn = (Connection) DriverManager.getConnection(url, username, password)) {
            conn.setAutoCommit(false);
            try (PreparedStatement pstmt = (PreparedStatement) conn.prepareStatement(sql)) {
                int loop = 0;
                while (t.hasNext()) {
                    Row row = t.next();
                    for (int i = 0; i < columnCount; i++) {
                        // JDBC parameter indexes are 1-based, Row indexes are 0-based.
                        pstmt.setObject(i + 1, row.get(i));
                    }
                    pstmt.executeUpdate();
                    if (++loop % commitInterval == 0) {
                        conn.commit();
                    }
                }
                // Commit whatever is left after the last full interval.
                conn.commit();
            }
        }
    }
});
}
 //写入方法3
3.转换成List<Row>,可以批量写入,但是有可能导致Driver 内存承载过高 private static void InsertTmpTable(Connection conn,Dataset<Row> ndf,final String m_cols,final String tabname) {
System.out.println("InsertTmpTab "+tabname+" start!");
String placeholderStr="values(";
int cnt = m_cols.split(",").length+1;
for(int i = 0;i<cnt;i++){
placeholderStr+="?"+ (i == cnt-1 ? ")" : ",");
} String sql = "insert into "+ tabname+"("+m_cols+",orderby_time)"+placeholderStr;
PreparedStatement pstmt = null; try {
List<Row> lrow=ndf.collectAsList();
pstmt = (PreparedStatement) conn.prepareStatement(sql); for(int j =0;j<lrow.size();j++){
for(int i=0;i<cnt;i++){
pstmt.setObject(i+1,lrow.get(j).get(i));
}
pstmt.executeUpdate();
if(j%10000==0) //批量提交
conn.commit();
}
conn.commit();
pstmt.close();
}catch (Exception e){
System.out.println(e.getMessage());
System.exit(-1);
}
System.out.println("InsertTmpTab "+tabname+" success!");
}

最新文章

  1. .net Socket 通信简单实例(初级入门)
  2. "rel=nofollow"属性
  3. win7默认网关不可用怎么解决
  4. 洛谷⑨月月赛Round2 P3393逃离僵尸岛[最短路]
  5. WebApi 2:属性路由 [Route()],attribute routing
  6. Linux驱动修炼之道-RTC子系统框架与源码分析【转】
  7. HDU 4655 Cut Pieces(数学分析题)
  8. [Java]获取图片高和宽
  9. 编译安装-Apache
  10. document.write 向文档中写内容,包括文本、脚本、元素之类的,但是它在什么时候执行不会覆盖当前页面内容尼?
  11. 使用Discuz!自带参数防御CC攻击以及原理,修改Discuz X 开启防CC攻击后,不影响搜索引擎收录的方法
  12. Block使用中的一些要注意的地方
  13. Python元祖
  14. IMCASH:卖掉了比特币回老家生活的现在怎么样了?
  15. Android补间动画、帧动画和属性动画使用知识介绍
  16. Grape教程-params
  17. mysql中的NULL的判断
  18. 实现checkbox的多选
  19. 关于JS中闭包的问题
  20. UVa 1615 Highway (贪心,区间选点问题)

热门文章

  1. 阿贾克斯(AJAX)
  2. vue中使用window.open会在url前自动添加本地服务器的地址bug修复
  3. 屏蔽JS代码错误
  4. 启动tomcat的时候为啥你启动的是8,启动起来的确实其他的Tomcat
  5. Oracle中字符串截取常用方法总结
  6. Servlet Struts2 SpringMVC 获取参数与导出数据 方法比较
  7. Codeforces Round #487 (Div. 2)
  8. Jquery中复选框选中取消实现文本框的显示隐藏
  9. UIPickerView的简单使用
  10. Linux中Elasticsearch集群部署