之前看了视频学习第一个flink  word count使用,但是对于socket发送数据作为数据源我这里有点忘记了,加上最近有个项目要发布,一直在忙,所以迟迟无法完成;

1、首先我们要有数据源,因为不论是流计算处理还是批次处理,都需要数据源,然后经过transformation转换成我们想要的数据输出到某个地方,这里我们就输出到控制台即可;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket; public class SocketTalkServer { public static void main(String[] args) {
try {
ServerSocket server = null;
// 创建一个端口为9000监听客户端请求的serversocket
try {
server = new ServerSocket(9000);
System.out.println("服务端启动成功:服务端端口号为9000");
} catch (IOException e) {
// 如果连接不上,打印出错信息
System.out.println("can not listen to:"+e);
}
Socket serverSocket = null;
try {
// 使用accept()阻塞等待客户请求,有客户请求则产生一个Socket对象,并继续执行
serverSocket = server.accept();
// 有客户端连接
System.out.println("有个客户端连接:"+serverSocket.getInetAddress()+":"+serverSocket.getPort());
} catch (IOException e) {
// 客户端请求异常
System.out.println(e);
}
String line;
// 通过Socket对象得到输出流,构造printwriter对象
PrintWriter serverPrintWriter = new PrintWriter(serverSocket.getOutputStream());
// 通过控制台构造bufferedreader对象
BufferedReader serverInput = new BufferedReader(new InputStreamReader(System.in));
// 服务端控制台上输入的数据源字符串
String serverLine = serverInput.readLine();
// 如果输入bye,停止循环
while (!serverLine.equals("bye")){
// 向客户端输出字符串
serverPrintWriter.println(serverLine);
// 刷新输出流
serverPrintWriter.flush();
// 在系统控制台上打印输入的内容;
System.out.println("Server:"+serverLine);
// 继续输入然后重新读取字符串
serverLine = serverInput.readLine();
}
serverPrintWriter.close();
serverSocket.close();
server.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}

2、编写flink计算程序,也是我的第一个程序,这里有几个步骤,我觉着视频中的老师写的非常好,就抄过来了,十分易于理解:

package com.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector; public class SocketWindowWordCountJava {
public static void main(String[] args) throws Exception {
// 获取所需要的端口号
int port = 9000;
// try{
// ParameterTool parameterTool = ParameterTool.fromArgs(args);
// port = parameterTool.getInt("port");}
// catch (Exception e){
// System.err.println("no port specified. use default 9000");
// port = 9000;
// }
// 获取flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String hostname = "127.0.0.1";
String delimiter = "\n";
// 链接socket获取输入的数据
DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
DataStream<WordIsCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordIsCount>() {
@Override
public void flatMap(String value, Collector<WordIsCount> out) throws Exception {
String[] words = value.split("\\s");
for (String word : words) {
out.collect(new WordIsCount(word, 1L));
}
}
}).keyBy("word").timeWindow(Time.seconds(2), Time.seconds(1))// 指定时间窗口大小为2秒,指定时间间隔为1秒
.sum("count");// 在这里使用sum或者reduce都可以
// 将数据打印到控制台,并设置并行度
windowCounts.print().setParallelism(1); // 这一行代码一定要实现,否则不执行
env.execute("socket window count"); } public static class WordIsCount{
public String word;
public long count; public WordIsCount(String word, long count) {
this.word = word;
this.count = count;
} public WordIsCount() {
} @Override
public String toString() {
return "WordIsCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}

最新文章

  1. HTML5 input placeholder 颜色修改
  2. Xcode无法启动ios模拟器的问题
  3. intel vt-x处于禁用状态下如何处理
  4. 在linux的shell里访问一个URL
  5. [转载]用.NET开发的磁力搜索引擎——Btbook.net
  6. Web之CSS开发技巧: CSS 居中大全
  7. LNMP环境出现”504 gateway time-out”错误的解决方案
  8. 【转】ffmpeg中的sws_scale算法性能测试
  9. c#控制台实现post网站登录
  10. Sping IOC
  11. sh, 批量执行Linux命令
  12. ASP.NET Core 实现带认证功能的Web代理服务器
  13. NGUI之实现连连看小游戏
  14. sql字符串根据日期产生日期+自增长标志
  15. 二:Recovery models(恢复模式)
  16. npm安装vue
  17. jupyter notebook添加Anaconda虚拟环境的python kernel
  18. linux笔记_day12_shell编程
  19. _ENV和_G
  20. Bootstrap学习笔记01

热门文章

  1. oo 第一次博客作业
  2. python———day02
  3. nginx的命令
  4. PHP超全局变量$_SERVER分析
  5. [HTML]音乐自动播放(兼容微信)
  6. linux脚本启动应用
  7. windows,mac os与 linux 3系统共存
  8. 如何设置.net控件SplitContainer平均分配
  9. camstart API 服务器负载均衡
  10. 计算系统中互联设备Survey