轻装上阵Flink--在IDEA上开发基于Flink的实时数据流程序
前言
本文介绍如何在IDEA上快速开发基于Flink框架的DataStream程序。先直接上手!
环境清单
案例是在win7运行。安装VirtualBox,在VirtualBox上安装Centos操作系统。所有资源都在百度云上,有需要请直接下载。安装教程基本都是傻瓜式,文章不做讲述,有需要直接网上搜索。
资源 | 版本 |
VirtualBox | 5.2.16 |
Centos | 6.5 |
Maven | 3.6.3 |
JDK | 8u241 |
IDEA | 2019.3.2 |
Flink | 1.10.0 |
链接:https://pan.baidu.com/s/12rXlY_z_Fck8-NRXdZ5row
提取码:qt2p
轻装上阵
1、IP设置
Centos的设置静态IP为192.168.2.20,关闭防火墙
vi /etc/sysconfig/network-scripts/ifcfg-eth0
DEVICE=eth0
TYPE=Ethernet
ONBOOT=yes #开机启动eth0网卡
NM_CONTROLLED=yes
BOOTPROTO=static
IPADDR=192.168.2.20
GATEWAY=192.168.2.1
NETMASK=255.255.255.0
如果此时ping www.baidu.com等不通,需要我们添加dns服务器。
[root@localhost network-scripts]# vi /etc/resolv.conf
nameserver 192.168.2.1
重新启动网络服务
[root@localhost network-scripts]# service network restart
正在关闭接口 eth0:[确定]
关闭环回接口:[确定]
弹出环回接口:[确定]
弹出界面 eth0:Determining if ip address 192.168.2.20 is already in use for device eth0...
[确定]
关闭防火墙
[root@localhost network-scripts]# service iptables stop
2、创建项目
在win7的命令行下,用mvn命令创建开发模板
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.10.0
这种方式允许你为新项目命名。它将以交互式的方式询问你项目的 groupId、artifactId 和 package 名称。
用tree命令看下,如下结构。项目是一个 Maven project,它包含了两个类:StreamingJob 和 BatchJob
分别是 DataStream and DataSet 程序的基础骨架程序。main 方法是程序的入口,既可用于IDE测试/执行,也可用于部署。
│ pom.xml
└─src
└─main
├─java
│ └─com
│ └─ryan
│ BatchJob.java
│ StreamingJob.java
└─resources
log4j.properties
3、写一个自己的DataStream的程序
功能介绍:WindowWordCount.java,5s为一个时间窗口,摄取数据源的数据,计算单词出现的次数。
实时数据流计算简易架构图:
为了演示方便,这里我们只演示消息队列和Flink Job两个模块,利用nc工具来替代消息队列作为Flink Job摄取的数据源。
代码:
package com.ryan;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("192.168.2.20", 9999)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
dataStream.print();
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
在centos机器上,命令行启动nc
nc -lk 9999
IDEA上直接run main方法,然后在centos机器上,不断输入单词。
[ryan@localhost ~]$ nc -lk 9999
java
java
shen
深圳 深圳
IDEA控制台上输出如下:
注意:第一次在IDEA上运行这个程序,可能会报如下异常
java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/datastream/DataStream
原因是IDEA没有导入flink 的lib下的jar包。导入即可。
4、打包发布到centos平台上的Flink集群
修改pom.xml文件的mainclass的值为com.ryan.WindowWordCount
<mainClass>com.ryan.WindowWordCount</mainClass>
执行mvn clean install,得到flink-demo-1.0-SNAPSHOT.jar,并上传到centos机器上。
mvn clean install
打开两个centos的控制台,一个用于打开nc,一个用于运行我们打包好的Flink jar包。
[ryan@localhost ~]$ nc -lk 9999
java
shen
深圳 深圳 深圳
[root@localhost flink-1.10.0]# bin/flink run flink-demo/flink-demo-1.0-SNAPSHOT.jar
Job has been submitted with JobID 9931a9dfc2eddeb2d0b5ed15578bd488
回到win7上,用浏览器打开http://192.168.2.20:8081/,在Running Jobs上,可以看到一条记录。
在Task Managers上,Stdout模块看到程序输出的结果。
所有代码都上传到github上,有需要的朋友可以下载
https://github.com/qinxiongzhou/flink-demo
至此,我们完成了开发编译调试到最终上线生产运行。喜欢请关注公众号--程序猿牧场,谢谢!
最新文章
- bzoj 3718
- tomcat安装配置.md
- 实用的VS工具
- Windows平台安装Redmine2.5.x
- vim 空格和换行的删除和替换
- jquery实现点击页面空白隐藏指定菜单
- leetcode Largest Rectangle in Histogram 单调栈
- java传递json数据到前台jsp
- Android之 compileSdkVersion, minSdkVersion, and targetSdkVersion
- 网易云课堂_程序设计入门-C语言_第六章:数组_1多项式加法
- Python之路:Python简介
- Fedora Linux 下安装配置C开发环境Code::Blocks
- 【NIO】Java NIO之选择器
- 我的学习之路_第二十一章_JDBC连接池
- Java学习笔记(一)网格袋布局
- postman设置环境变量
- SpringMVC 使用PUT请求遇到的问题小结
- LDAP认证模式简介
- jmeter创建基本的FTP测试计划
- c#批量更新list对象sql
热门文章
- 3dmax2013卸载/安装失败/如何彻底卸载清除干净3dmax2013注册表和文件的方法
- jQuery选择器的效率问题
- 细看Java序列化机制
- WEB前端资源集(二)
- [PyTorch入门之60分钟入门闪击战]之入门
- 查漏补缺:socket编程:TCP粘包问题和常用解决方案(上)
- 如何正确的hook方法objc_msgSend ·; jmpews
- 从国内APP更新“精雕细琢” 看国内外产品理念之差
- 从游戏到汽车 “明星IP”的发财轨迹
- 关于vue+element-ui项目的分页,返回默认显示第一页的问题解决