Flink自定义Sink

Flink 自定义Sink,把socket数据流数据转换成对象写入到mysql存储。

#创建Student类
public class Student { private int id;
private String name;
private int age; @Override
public String toString() {
return "Student{" +
"id=" + id +
", name='" + name + '\'' +
", age=" + age +
'}';
} public int getId() {
return id;
} public void setId(int id) {
this.id = id;
} public String getName() {
return name;
} public void setName(String name) {
this.name = name;
} public int getAge() {
return age;
} public void setAge(int age) {
this.age = age;
}
}

Step2:继承RichSinkFunction,自定义Sink

public class Sink2Mysql extends RichSinkFunction<Student> {

    Connection connection;
PreparedStatement pstmt; private Connection getConnection() {
Connection conn = null;
try {
Class.forName("com.mysql.jdbc.Driver");
String url = "jdbc:mysql://localhost:3306/imooc_flink";
conn = DriverManager.getConnection(url,"root","123456"); } catch (Exception e) {
e.printStackTrace();
} return conn;
} @Override
public void open(Configuration parameters) throws Exception {
super.open(parameters); connection = getConnection();
String sql = "insert into student(id,name,age) values (?,?,?)";
pstmt = connection.prepareStatement(sql); System.out.println("open");
} // 每条记录插入时调用一次
public void invoke(Student value, Context context) throws Exception {
System.out.println("invoke~~~~~~~~~");
// 未前面的占位符赋值
pstmt.setInt(1, value.getId());
pstmt.setString(2, value.getName());
pstmt.setInt(3, value.getAge()); pstmt.executeUpdate(); } @Override
public void close() throws Exception {
super.close(); if(pstmt != null) {
pstmt.close();
} if(connection != null) {
connection.close();
}
}
}

Step3:在mysql创建存储表

create table student (
id int(11) not null auto_increment,
name varchar(25),
age int(10),
primary key (id)
);

Step4:

将socket流转成Student对象,并存储到mysql

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = env.socketTextStream("localhost",7777); SingleOutputStreamOperator<Student> stuStream = source.map(new MapFunction<String, Student>() {
@Override
public Student map(String value) {
String[] splits = value.split(","); Student stu = new Student();
stu.setId(Integer.parseInt(splits[0])) ;
stu.setName(splits[1]);
stu.setAge(Integer.parseInt(splits[2])); return stu;
}
}); stuStream.addSink(new Sink2Mysql()); env.execute("JavaStreamSink2MysqlApp");
}

Step5:测试

在终端开启socket流,并输入数据:

1,xiao,17

2,ming,24

3,uzi,20

查询mysql表:

select * from student;

结果如下:

mysql> select * from student;
+----+------+------+
| id | name | age |
+----+------+------+
| 1 | xiao | 17 |
| 2 | ming | 24 |
| 3 | uzi | 20 |
+----+------+------+
3 rows in set (0.00 sec)

注意一点:

如果运行程序时,报错:

java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

可能原因是没有添加依赖 mysql-jdbc 依赖,需要在pom文件添加:

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>

这里的版本不需要跟mysql版本对应。

最新文章

  1. Android系统默认对话框添加图片
  2. wcf DataTable作为返回类型
  3. 【转】五种常见的 PHP 设计模式
  4. Android中ListView动态加载数据
  5. SQLServer修改表所有者
  6. Unity3D热更新全书-脚本(一) 初识脚本
  7. jquery attr()方法 添加,修改,获取对象的属性值。
  8. Win7 Print Spooler服務自动关闭
  9. [linux]查看文件编码和编码转换
  10. php5 date()获得的时间不是当前时间
  11. 自助用户选择VM Network
  12. CodeIgniter开发实际案例-新闻网站【转】
  13. 2015.7.17( NOI2015 day1 )
  14. YUI Compressor压缩失效的场景-eval和with
  15. Asp.Net MVC以 JSON传值扩展方法
  16. js判断语句关于true和false后面跟数字或字符串的问题
  17. Adams 2013自定义插件方法zz
  18. qml : qml控件自适应;
  19. CSS3-字体渐变色
  20. Python 字符串的操作

热门文章

  1. day6. while双项循环及for循环
  2. ios 浅谈一下UITextFiled UITextView 在tableview的cell上边展示
  3. qt事件过滤器的使用(可以用于控制屏幕背光等)
  4. 003_对go语言中的工作池代码练习的一些思考和改进
  5. java_Scanner类、Random类、ArrayList 类的使用
  6. VMware启动CentOS出错,提示&quot;该虚拟机似乎正在使用中&quot;
  7. C#LeetCode刷题之#54-螺旋矩阵(Spiral Matrix)
  8. C#设计模式之13-职责链模式
  9. Quartz.Net的基础使用方法,多任务执行
  10. SpringMVC的简介和工作流程