Flink读写mysql

如果是mvn项目的话,需要预先导入相应的包:

        <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.9.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.24</version>
</dependency>

1、读

import java.time.LocalDateTime

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.types.Row
import org.apache.log4j.Logger object OperatorMysql extends Logger("opeartorMysql") {
val log = Logger.getLogger("opeartorMysql") def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://192.168.1.1:3306/ipvacloud?useUnicode=true&characterEncoding=utf-8"
val username = "root"
val password = "123456"
log.info("--------read mysql-----------")
val sql_read = "select relationid,year,month from reid_kequn_arave_times"
readMysql(env, url, driver, username, password, sql_read)
}
/**
* 读mysql
*
* @param env
* @param url
* @param user
* @param pwd
* @param sql
*/
def readMysql(env: ExecutionEnvironment, url: String, driver: String, user: String, pwd: String, sql: String) = {
val dataResult: DataSet[Row] = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
.setDrivername(driver)
.setDBUrl(url)
.setUsername(user)
.setPassword(pwd)
.setQuery(sql)
.setRowTypeInfo(new RowTypeInfo(
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO))
.finish())
dataResult.map(x => {
val relationid = x.getField(0)
val year = x.getField(1)
val month = x.getField(2)
(relationid, year, month)
}).print()
}

运行结果:

2、写

import java.time.LocalDateTime

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.types.Row
import org.apache.log4j.Logger /**
* AUTHOR Guozy
* DATE 2020/3/14-10:44
**/
object OperatorMysql extends Logger("opeartorMysql") {
val log = Logger.getLogger("opeartorMysql") def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://192.168.1.1:3306/ipvacloud?useUnicode=true&characterEncoding=utf-8"
val username = "root"
val password = "winner@001"
log.info("--------write mysql-----------")
val sql_write = "insert into attention_newdata(Hostname,ChannelNum,enabled,LastTime,CreateTime,ModifyTime) values(?,?,?,?,?,?) on duplicate key update ModifyTime=NOW()"
val curTime = LocalDateTime.now().toString.replace("T", " ")
val outputData = env.fromElements(("ttt", "0", "1", curTime, curTime, curTime))
.map(x => {
val row = new Row(6)
row.setField(0, x._1)
row.setField(1, x._2)
row.setField(2, x._3)
row.setField(3, x._4)
row.setField(4, x._5)
row.setField(5, x._6)
row
})
writeMysql(env, outputData, url, username, password, sql_write)
}
// 写mysql
def writeMysql(env: ExecutionEnvironment, outputData: DataSet[Row], url: String, user: String, pwd: String, sql: String) = {
outputData.output(JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl(url)
.setUsername(user)
.setPassword(pwd)
.setQuery(sql)
.finish())
env.execute("insert data to mysql")
print("data write successfully")
}

运行结果:

  

  

最新文章

  1. java nio系列文章
  2. sql server 2008 相关基础(物理备份还原)
  3. RSA算法小记
  4. gulp.spriteSmith使用
  5. crackme_zapline分析
  6. UDF 编写自定函数
  7. hiho一下103周 平衡树&#183;Treap
  8. MM32 RTC学习(兼容STM32)
  9. vs2005 测试 lua环境
  10. FormView分页显示数据的例子
  11. 初探swift语言的学习—Object-C与Swift混编
  12. web前端利用HSTS(新的Web安全协议HTTP Strict Transport Security)漏洞的超级Cookie(HSTS Super Cookie)
  13. cf747 D. Winter Is Coming
  14. Jenkins安装与配置
  15. java四则运算
  16. 如何让div水平居中呢?
  17. 20175126《Java程序设计》第二周学习总结
  18. apicloud 自定义模块的开发与上架注意事项
  19. Ubuntu java install &amp; config
  20. 服务器响应慢的分析与解决(Linux服务器)

热门文章

  1. 力扣 - 92. 反转链表II
  2. 清明|TcaplusDB持续为您保驾护航
  3. 201871030127-王明强 实验三 结对项目—《D{0-1}KP 实例数据集算法实验平台》项目报告
  4. 浅谈 Fresco 框架结构
  5. 2. IntelliJ Idea 常用快捷键列表
  6. 由电脑专卖系统引发的Java设计模式:访问者模式
  7. C++ 内存模型之单独编译
  8. All in All UVA - 10340
  9. JDBC_02_JDBC连接数据库 (INSERT INTO)
  10. 读取ini配置文件 及 UI对象库