Flink自定义Sink
2024-09-02 18:43:30
Flink自定义Sink
Flink 自定义Sink,把socket数据流数据转换成对象写入到mysql存储。
#创建Student类
public class Student {
private int id;
private String name;
private int age;
@Override
public String toString() {
return "Student{" +
"id=" + id +
", name='" + name + '\'' +
", age=" + age +
'}';
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
Step2:继承RichSinkFunction,自定义Sink
public class Sink2Mysql extends RichSinkFunction<Student> {
Connection connection;
PreparedStatement pstmt;
private Connection getConnection() {
Connection conn = null;
try {
Class.forName("com.mysql.jdbc.Driver");
String url = "jdbc:mysql://localhost:3306/imooc_flink";
conn = DriverManager.getConnection(url,"root","123456");
} catch (Exception e) {
e.printStackTrace();
}
return conn;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = getConnection();
String sql = "insert into student(id,name,age) values (?,?,?)";
pstmt = connection.prepareStatement(sql);
System.out.println("open");
}
// 每条记录插入时调用一次
public void invoke(Student value, Context context) throws Exception {
System.out.println("invoke~~~~~~~~~");
// 未前面的占位符赋值
pstmt.setInt(1, value.getId());
pstmt.setString(2, value.getName());
pstmt.setInt(3, value.getAge());
pstmt.executeUpdate();
}
@Override
public void close() throws Exception {
super.close();
if(pstmt != null) {
pstmt.close();
}
if(connection != null) {
connection.close();
}
}
}
Step3:在mysql创建存储表
create table student (
id int(11) not null auto_increment,
name varchar(25),
age int(10),
primary key (id)
);
Step4:
将socket流转成Student对象,并存储到mysql
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.socketTextStream("localhost",7777);
SingleOutputStreamOperator<Student> stuStream = source.map(new MapFunction<String, Student>() {
@Override
public Student map(String value) {
String[] splits = value.split(",");
Student stu = new Student();
stu.setId(Integer.parseInt(splits[0])) ;
stu.setName(splits[1]);
stu.setAge(Integer.parseInt(splits[2]));
return stu;
}
});
stuStream.addSink(new Sink2Mysql());
env.execute("JavaStreamSink2MysqlApp");
}
Step5:测试
在终端开启socket流,并输入数据:
1,xiao,17
2,ming,24
3,uzi,20
查询mysql表:
select * from student;
结果如下:
mysql> select * from student;
+----+------+------+
| id | name | age |
+----+------+------+
| 1 | xiao | 17 |
| 2 | ming | 24 |
| 3 | uzi | 20 |
+----+------+------+
3 rows in set (0.00 sec)
注意一点:
如果运行程序时,报错:
java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
可能原因是没有添加依赖 mysql-jdbc 依赖,需要在pom文件添加:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
这里的版本不需要跟mysql版本对应。
最新文章
- Android系统默认对话框添加图片
- wcf DataTable作为返回类型
- 【转】五种常见的 PHP 设计模式
- Android中ListView动态加载数据
- SQLServer修改表所有者
- Unity3D热更新全书-脚本(一) 初识脚本
- jquery attr()方法 添加,修改,获取对象的属性值。
- Win7 Print Spooler服務自动关闭
- [linux]查看文件编码和编码转换
- php5 date()获得的时间不是当前时间
- 自助用户选择VM Network
- CodeIgniter开发实际案例-新闻网站【转】
- 2015.7.17( NOI2015 day1 )
- YUI Compressor压缩失效的场景-eval和with
- Asp.Net MVC以 JSON传值扩展方法
- js判断语句关于true和false后面跟数字或字符串的问题
- Adams 2013自定义插件方法zz
- qml : qml控件自适应;
- CSS3-字体渐变色
- Python 字符串的操作
热门文章
- day6. while双项循环及for循环
- ios 浅谈一下UITextFiled UITextView 在tableview的cell上边展示
- qt事件过滤器的使用(可以用于控制屏幕背光等)
- 003_对go语言中的工作池代码练习的一些思考和改进
- java_Scanner类、Random类、ArrayList 类的使用
- VMware启动CentOS出错,提示";该虚拟机似乎正在使用中";
- C#LeetCode刷题之#54-螺旋矩阵(Spiral Matrix)
- C#设计模式之13-职责链模式
- Quartz.Net的基础使用方法,多任务执行
- SpringMVC的简介和工作流程