前言

本文介绍如何在IDEA上快速开发基于Flink框架的DataStream程序。先直接上手!

环境清单

案例是在win7运行。安装VirtualBox,在VirtualBox上安装Centos操作系统。所有资源都在百度云上,有需要请直接下载。安装教程基本都是傻瓜式,文章不做讲述,有需要直接网上搜索。

资源 版本
VirtualBox 5.2.16
Centos 6.5
Maven 3.6.3
JDK 8u241
IDEA 2019.3.2
Flink 1.10.0

链接:https://pan.baidu.com/s/12rXlY_z_Fck8-NRXdZ5row

提取码:qt2p

轻装上阵

1、IP设置

Centos的设置静态IP为192.168.2.20,关闭防火墙

 vi /etc/sysconfig/network-scripts/ifcfg-eth0
DEVICE=eth0
TYPE=Ethernet
ONBOOT=yes #开机启动eth0网卡
NM_CONTROLLED=yes
BOOTPROTO=static
IPADDR=192.168.2.20
GATEWAY=192.168.2.1
NETMASK=255.255.255.0
     如果此时ping www.baidu.com等不通,需要我们添加dns服务器。
 [root@localhost network-scripts]# vi /etc/resolv.conf
nameserver 192.168.2.1
  重新启动网络服务
 [root@localhost network-scripts]# service network restart
正在关闭接口 eth0:[确定]
关闭环回接口:[确定]
弹出环回接口:[确定]
弹出界面 eth0:Determining if ip address 192.168.2.20 is already in use for device eth0...
[确定]
      关闭防火墙
 [root@localhost network-scripts]# service iptables stop

2、创建项目

在win7的命令行下,用mvn命令创建开发模板

 mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.10.0
这种方式允许你为新项目命名。它将以交互式的方式询问你项目的 groupId、artifactId 和 package 名称。
用tree命令看下,如下结构。项目是一个 Maven project,它包含了两个类:StreamingJob 和 BatchJob
分别是 DataStream and DataSet 程序的基础骨架程序。main 方法是程序的入口,既可用于IDE测试/执行,也可用于部署。
 │  pom.xml
└─src
└─main
├─java
│ └─com
│ └─ryan
│ BatchJob.java
│ StreamingJob.java
└─resources
log4j.properties
3、写一个自己的DataStream的程序

功能介绍:WindowWordCount.java,5s为一个时间窗口,摄取数据源的数据,计算单词出现的次数。

实时数据流计算简易架构图:

为了演示方便,这里我们只演示消息队列和Flink Job两个模块,利用nc工具来替代消息队列作为Flink Job摄取的数据源。

代码:

 package com.ryan;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("192.168.2.20", 9999)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
dataStream.print();
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}

在centos机器上,命令行启动nc

 nc -lk 9999

IDEA上直接run main方法,然后在centos机器上,不断输入单词。

 [ryan@localhost ~]$ nc -lk 9999
java
java
shen
深圳 深圳
IDEA控制台上输出如下:

注意:第一次在IDEA上运行这个程序,可能会报如下异常

 java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/datastream/DataStream

原因是IDEA没有导入flink 的lib下的jar包。导入即可。

4、打包发布到centos平台上的Flink集群

修改pom.xml文件的mainclass的值为com.ryan.WindowWordCount

 <mainClass>com.ryan.WindowWordCount</mainClass>

执行mvn clean install,得到flink-demo-1.0-SNAPSHOT.jar,并上传到centos机器上。

 mvn clean install

打开两个centos的控制台,一个用于打开nc,一个用于运行我们打包好的Flink jar包。

 [ryan@localhost ~]$ nc -lk 9999
java
shen
深圳 深圳 深圳
 [root@localhost flink-1.10.0]# bin/flink run flink-demo/flink-demo-1.0-SNAPSHOT.jar
Job has been submitted with JobID 9931a9dfc2eddeb2d0b5ed15578bd488
  回到win7上,用浏览器打开http://192.168.2.20:8081/,在Running Jobs上,可以看到一条记录。

在Task Managers上,Stdout模块看到程序输出的结果。

所有代码都上传到github上,有需要的朋友可以下载

 https://github.com/qinxiongzhou/flink-demo

至此,我们完成了开发编译调试到最终上线生产运行。喜欢请关注公众号--程序猿牧场,谢谢!

最新文章

  1. bzoj 3718
  2. tomcat安装配置.md
  3. 实用的VS工具
  4. Windows平台安装Redmine2.5.x
  5. vim 空格和换行的删除和替换
  6. jquery实现点击页面空白隐藏指定菜单
  7. leetcode Largest Rectangle in Histogram 单调栈
  8. java传递json数据到前台jsp
  9. Android之 compileSdkVersion, minSdkVersion, and targetSdkVersion
  10. 网易云课堂_程序设计入门-C语言_第六章:数组_1多项式加法
  11. Python之路:Python简介
  12. Fedora Linux 下安装配置C开发环境Code::Blocks
  13. 【NIO】Java NIO之选择器
  14. 我的学习之路_第二十一章_JDBC连接池
  15. Java学习笔记(一)网格袋布局
  16. postman设置环境变量
  17. SpringMVC 使用PUT请求遇到的问题小结
  18. LDAP认证模式简介
  19. jmeter创建基本的FTP测试计划
  20. c#批量更新list对象sql

热门文章

  1. 3dmax2013卸载/安装失败/如何彻底卸载清除干净3dmax2013注册表和文件的方法
  2. jQuery选择器的效率问题
  3. 细看Java序列化机制
  4. WEB前端资源集(二)
  5. [PyTorch入门之60分钟入门闪击战]之入门
  6. 查漏补缺:socket编程:TCP粘包问题和常用解决方案(上)
  7. 如何正确的hook方法objc_msgSend &middot; jmpews
  8. 从国内APP更新“精雕细琢” 看国内外产品理念之差
  9. 从游戏到汽车 “明星IP”的发财轨迹
  10. 关于vue+element-ui项目的分页,返回默认显示第一页的问题解决