Spark 操作数据库的几种方法
2024-09-04 12:17:16
一. 使用 JdbcRDD 的接口:
SparkConf conf = new SparkConf();
conf.setAppName("Simple Application").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(conf); //1.直接使用jdbcRDD的构造函数
/**
 * Serializable zero-argument function that opens a JDBC connection.
 * It is handed to the JdbcRDD constructor as the connection supplier.
 */
class DbConnection extends AbstractFunction0<Connection> implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String driverClassName;
    private final String connectionUrl;
    private final String userName;
    private final String password;

    /**
     * @param driverClassName fully qualified JDBC driver class to load
     * @param connectionUrl   JDBC URL passed to {@link DriverManager}
     * @param userName        value of the "user" connection property
     * @param password        value of the "password" connection property
     */
    public DbConnection(String driverClassName, String connectionUrl,
                        String userName, String password) {
        this.driverClassName = driverClassName;
        this.connectionUrl = connectionUrl;
        this.userName = userName;
        this.password = password;
    }

    /**
     * Loads the driver class and opens a new connection.
     *
     * @return a freshly opened connection, never {@code null}
     * @throws IllegalStateException if the driver class cannot be loaded or
     *                               the connection cannot be opened
     */
    @Override
    public Connection apply() {
        try {
            Class.forName(driverClassName);
        } catch (ClassNotFoundException e) {
            // Fail loudly: swallowing this only defers the failure to getConnection.
            throw new IllegalStateException(
                    "JDBC driver class not found: " + driverClassName, e);
        }
        Properties properties = new Properties();
        properties.setProperty("user", userName);
        properties.setProperty("password", password);
        try {
            return DriverManager.getConnection(connectionUrl, properties);
        } catch (SQLException e) {
            // Returning null here would hide the real cause from the caller.
            throw new IllegalStateException(
                    "Cannot open JDBC connection to " + connectionUrl, e);
        }
    }
}

/** Serializable row mapper that turns the current ResultSet row into an Object[]. */
class MapResult extends AbstractFunction1<ResultSet, Object[]> implements Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public Object[] apply(ResultSet row) {
        return JdbcRDD.resultSetToObjectArray(row);
    }
}

String Connection_url = "jdbc:mysql://ip:port/dbname?useUnicode=true&characterEncoding=utf8";
String Driver = "com.mysql.jdbc.Driver";
String UserName = "root";
String password = "pd";
DbConnection dbConnection = new DbConnection(Driver,
        Connection_url, UserName, password);
// JdbcRDD requires two '?' placeholders for the partition bounds. Wrapping the
// query and passing lowerBound = upperBound = 0 makes "0=? and 0=?" always true,
// so the whole result set is read. NOTE(review): this relies on a single
// partition (numPartitions = 1) - confirm before raising it.
sql = "select * from (" + sql + ") as tmp where 0=? and 0=?";
JdbcRDD<Object[]> jdbcRDD = new JdbcRDD<>(jsc.sc(), dbConnection,
        sql, 0, 0, 1, new MapResult(),
        ClassManifestFactory$.MODULE$.fromClass(Object[].class));
JavaRDD<Object[]> javaRDD = JavaRDD.fromRDD(jdbcRDD,
        ClassManifestFactory$.MODULE$.fromClass(Object[].class));
// Alternative implementation:
/**
 * JdbcRDD.ConnectionFactory implementation that loads the configured driver
 * and opens a connection with user/password properties.
 */
class DbConnectionFactory implements JdbcRDD.ConnectionFactory {
    private static final long serialVersionUID = 1L;

    private String driverClassName;
    private String connectionUrl;
    private String userName;
    private String password;

    /** Stores the driver class name, JDBC URL and credentials for later use. */
    public DbConnectionFactory(String driverClassName, String connectionUrl,
                               String userName, String password) {
        this.driverClassName = driverClassName;
        this.connectionUrl = connectionUrl;
        this.userName = userName;
        this.password = password;
    }

    /** Loads the driver class, then opens and returns a new connection. */
    public Connection getConnection() throws Exception {
        Class.forName(this.driverClassName);
        Properties credentials = new Properties();
        credentials.setProperty("user", this.userName);
        credentials.setProperty("password", this.password);
        return DriverManager.getConnection(this.connectionUrl, credentials);
    }
}

String Connection_url = "jdbc:mysql://ip:port/dbname?useUnicode=true&characterEncoding=utf8";
// Same always-true bound trick: both placeholders are bound to 0.
sql = "select * from (" + sql + ") as tmp where 0=? and 0=?";
DbConnectionFactory ConnectFactory = new DbConnectionFactory(Driver,
        Connection_url, UserName, password);
javaRDD = JdbcRDD.create(jsc, ConnectFactory, sql, 0, 0, 1,
        new Function<ResultSet, Object[]>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object[] call(ResultSet resultSet) {
                return JdbcRDD.resultSetToObjectArray(resultSet);
            }
        });
// Returns a JavaRDD<Object[]> directly. Underneath it calls
// JdbcRDD(SparkContext sc, Function0<Connection> getConnection, String sql,
//         long lowerBound, long upperBound, int numPartitions,
//         Function1<ResultSet, T> mapRow, ClassTag<T> evidence$1).
// A shorter overload delegates to the seven-argument create(...) used above:
// javaRDD = JdbcRDD.create(jsc, ConnectFactory, sql, 0, 0, 1);
二. 通过 SparkSession 的接口:
SparkSession ss = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
//读取方式1
String sql = "(select * from xxx) as tmp; //注意这里的sql格式,该sql也可以直接是一个表名
Dataset<Row> df = session.read().format("jdbc")
.option("url", jdbcURL)
.option("driver", driver)
.option("dbtable", sql)
.option("user", username)
.option("password", password)
.load();
// Read variant 2: session.read().jdbc(url, table, properties)
Properties connectionProperties = new Properties();
connectionProperties.put("user", username);
connectionProperties.put("password", password);
connectionProperties.put("driver", driver);
df = session.read().jdbc(jdbcURL, sql, connectionProperties);
// Write variant 1: DataFrameWriter.jdbc with an explicit save mode.
String saveMode = "Overwrite";
df.write().mode(saveMode).jdbc(jdbcURL, tablename, connectionProperties);
// Write variant 2: insert rows over plain JDBC, opening one connection per partition.
final String sql = "insert into tab_xxx (c1,c2,c3) values(?,?,?)";
df.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {
    private static final long serialVersionUID = -834520661839866305L;

    @Override
    public void call(Iterator<Row> t) throws Exception {
        // The insert statement binds exactly three columns.
        final int columnCount = 3;
        // NOTE(review): the original interval literal was lost; 10000 is assumed
        // to match the collect-based variant in this file - confirm.
        final int commitInterval = 10000;

        Class.forName(driver);
        // try-with-resources closes the statement and connection even when an insert fails.
        try (Connection conn = (Connection) DriverManager.getConnection(url, username, password)) {
            conn.setAutoCommit(false);
            try (PreparedStatement pstmt = (PreparedStatement) conn.prepareStatement(sql)) {
                int loop = 0;
                while (t.hasNext()) {
                    Row row = t.next();
                    for (int i = 0; i < columnCount; i++) {
                        // JDBC parameter indexes are 1-based.
                        pstmt.setObject(i + 1, row.get(i));
                    }
                    pstmt.executeUpdate();
                    if (++loop % commitInterval == 0) {
                        conn.commit();
                    }
                }
                // Commit the rows left over after the last full interval.
                conn.commit();
            }
        }
    }
});
}
//写入方法3
3.转换成List<Row>,可以批量写入,但是有可能导致Driver 内存承载过高 private static void InsertTmpTable(Connection conn,Dataset<Row> ndf,final String m_cols,final String tabname) {
System.out.println("InsertTmpTab "+tabname+" start!");
String placeholderStr="values(";
int cnt = m_cols.split(",").length+1;
for(int i = 0;i<cnt;i++){
placeholderStr+="?"+ (i == cnt-1 ? ")" : ",");
} String sql = "insert into "+ tabname+"("+m_cols+",orderby_time)"+placeholderStr;
PreparedStatement pstmt = null; try {
List<Row> lrow=ndf.collectAsList();
pstmt = (PreparedStatement) conn.prepareStatement(sql); for(int j =0;j<lrow.size();j++){
for(int i=0;i<cnt;i++){
pstmt.setObject(i+1,lrow.get(j).get(i));
}
pstmt.executeUpdate();
if(j%10000==0) //批量提交
conn.commit();
}
conn.commit();
pstmt.close();
}catch (Exception e){
System.out.println(e.getMessage());
System.exit(-1);
}
System.out.println("InsertTmpTab "+tabname+" success!");
}
最新文章
- .net Socket 通信简单实例(初级入门)
- "rel=nofollow" 属性
- win7默认网关不可用怎么解决
- 洛谷⑨月月赛Round2 P3393逃离僵尸岛[最短路]
- WebApi 2:属性路由 [Route()],attribute routing
- Linux驱动修炼之道-RTC子系统框架与源码分析【转】
- HDU 4655 Cut Pieces(数学分析题)
- [Java]获取图片高和宽
- 编译安装-Apache
- document.write 向文档中写内容,包括文本、脚本、元素之类的,但是它在什么时候执行不会覆盖当前页面内容尼?
- 使用Discuz!自带参数防御CC攻击以及原理,修改Discuz X 开启防CC攻击后,不影响搜索引擎收录的方法
- Block使用中的一些要注意的地方
- Python元祖
- IMCASH:卖掉了比特币回老家生活的现在怎么样了?
- Android补间动画、帧动画和属性动画使用知识介绍
- Grape教程-params
- mysql中的NULL的判断
- 实现checkbox的多选
- 关于JS中闭包的问题
- UVa 1615 Highway (贪心,区间选点问题)