1. Task的划分

  在flink中,划分task的依据是发生shuffle(也叫redistrubute),或者是并行度发生变化

  • 1.  wordcount为例

package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector; import java.util.Arrays; public class WordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
Arrays.stream(line.split(" ")).forEach(w -> out.collect(Tuple2.of(w, 1)));
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> filtered = wordAndOne.filter(new FilterFunction<Tuple2<String, Integer>>() {
@Override
public boolean filter(Tuple2<String, Integer> value) throws Exception {
return value.f1 != null;
}
});
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = filtered.keyBy(0);
//SingleOutputStreamOperator并行度为4
SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.sum(1);
result.print(); //sink的并行度也是2
env.execute();
}
}

其dataflow图如下所示

socketTextStream是单并行度source,无论你将并行度设置成多少,其并行度都是1,所以到flatMap算子时,并行度就变成了自己设置的4.整个dataflow分成3个Task,9个subTask。

  • 2.  改变1,在flatMap后加上startNewChain(),即开启一个新的链

     按常理来讲,此处的flatMap算子和filter算子间的链是要断开的,但此处自己测试并没有端,暂时还不知道为什么

  • 3. 改变2   在flatMap后加上disableChaining(),即将概算自前后的OperatorChain都断开,将该算子单独划分一个task

可以发现,Task数由3变成4,subTask也相应的编程了13个

注意:此处为什么要使用startNewChain、disablechaining呢?

  我们在进行计算时,会存在一些计算密集型的算子(比如涉及排序的算子),可以将之独立出来,然后将其调度到某些机器上,这个算子就能独享该机器的cpu,内存的资源,提高效率。

总结:Task划分的依据

(1)并行度发生变化时

(2)keyBy() /window()/apply() 等发生 Rebalance 重新分配(即shuffle)

(3)调用startNewChain()方法,开启一个新的算子链

(4)调用diableChaining()方法,即告诉当前算子操作不适用算子链操作

2. 共享资源槽(Sharing slot)

2.1 简单概念

  每一个TaskManager(Worker)是一个JVM进程,它可能会在独立的线程上执行一个或者多个subtask。为了控制一个worker能接收多少个task,worker通过task slot来进行控制

  每个task slot表示TaskManager拥有资源的一个固定⼤大⼩小的⼦子集。假如⼀一个TaskManager有三个slot,那么它会将其管理理的内存分成三份给各个slot。资源slot化意味着⼀一个subtask将不不需要跟来⾃自其他job的subtask竞争被管理理的内存,取⽽而代之的是它将拥有⼀一定数量量的内存储备。需要注意的是,这⾥里里不不会涉及到CPU的隔离,slot⽬目前仅仅⽤用来隔离task的受管理理的内存。

  默认情况下,flink允许subtasks共享slots,即使它们是不同tasks的subtasks,只要它们来自同一个job。因此,一个slot可能会负责这个job的某个管道(pipeline)。共享资源槽有如下两个优点:

  • Flink 集群需要与 job 中使用的最高并行度一样多的 slots。若是没有sharing slot,就需要计算作业总共包含多少个 tasks,从而判断集群需要多少slots,非常麻烦。
  • 更好的资源利用率。在没有 slot sharing 的情况下,简单的 subtasks(source/map())将会占用和复杂的 subtasks (window)一样多的资源。

如下:

上图是没有采用sharing slot的情况,可见2个TaskManager只能使用两个并行,但若是换成sharing slot,则结果就大不一样,如下

由图可明显看出,同样的slot数,使用sharing slot的情况并行度由2提高到6,这使得效率大大提高。

2.2 进一步理解

  SlotSharingGroup是flink中用来实现slot共享的类,它尽可能的让subtasks共享一个slot。保证同一个group的sub-tasks共享一个slots。默认的slot sharing group名称为default,算子也有自己的名字,默认也是default并且算子只能进入与自己名字相同的slot sharing group(即默认一个job下的subtask都可以共享一个slot)。slot sharing group的名字由第一个进入该slot算子的名称而定,比如第一个进入该slot算子的名称为feng,则这个slot sharing group的名称就为feng。

有些时候不想使用资源共享槽,想让算子单独享用某个managerTask中的slot(比如一些计算密集型的算子,比如排序、机器学习等),即防止不合理的共享,这时候可以人为的强制指定operator的共享组。比如someStream.filter(...).slotSharingGroup("group1");就强制指定了filter的slot共享组为group1。

提交一个wordcount程序,并行度为4

代码如下

package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector; import java.util.Arrays; public class SharingGroupDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //使用Socket创建DataStream
//socketTextStream是一个非并行的Source,不论并行度设置为多少,总是一个并行
//DataStreamSourc并行度为1
DataStreamSource<String> lines = env.socketTextStream("node-1.51doit.cn", 8888); //DataStream的并行度默认是使用你设置的并行度
//DataStream并行度为4
DataStream<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
Arrays.stream(line.split(" ")).forEach(w -> out.collect(Tuple2.of(w, 1)));
}
}); //keyBy属于shuffle(redistribute)算子
//KeyedStream并行度为4
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
//SingleOutputStreamOperator并行度为4
SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.sum(1); result.print(); env.execute(); }
}

现将并行度沈设置成2,能发现有2个slot是空置的

维持并行度为2,但是修改Task资源槽的名称,让该Task对应的subTask独立出来

此处在sum算子上打标签,即(sum.slotSharingGroup("doit")),sum包括其后面的算子名称都变为doit,但此处keyed为什么会变doit就不清楚了。

3.Flink的容错

3.1 State状态

  Flink实时计算程序为了保证计算过程中,出现异常可以容错,就要将中间的计算结果数据存储起来,这些中间数据就叫做state。state可以是多种类型的,默认是保存在JobManager的内存中,也可以保存到TaskManager本地文件系统或HDFS这样的分布式文件系统。

3.2 StateBackEnd

  用来保存的存储后端就叫做StateBackEnd,默认是保存在JobManager的内存中,也可以保存本地系统或HDFS这样的分布式文件系统。

3.3 CheckPointing

  Flink实时计算为了容错,可以将中间数据定期保存下来,这种定期触发保存中间结果的机制叫CheckPointing,CheckPointing是周期性执行的,具体的过程是JobManager定期向TaskManager中的SubTask发送RPC消息,SubTask将其计算的State保存到StateBackEnd中,并且向JobManager响应Checkpoint是否成功。如果程序出现异常或者重启,TaskManager中的SubTask可以从上一次成功的CheckPointing的state恢复,具体见下图

注意:JobManager只有在接收到所有subTask将计算结果的state成功保存到StateBackEnd的消息后,才会标记checkpoint成功。

3.4 重启策略

  Flink实时计算程序为了容错,需要开启CheckPointing,一旦开启CheckPointing,如果没有重启策略,默认的重启策略是无限重启,也可以设置成其他的重启策略,如:重启固定次数以及重启间的间隔时间

3.5 CheckPointingMode

  • exactly-once

  精确一次性语义,可以保证数据消费且消费一次,但是要结合对应的数据源,比如Kafla支持exactly-once

  • ar-least-once

  至少消费一次,可能会重复消费,但是效率要比exactly-once高

4  state案例

4.1 简单概念

(1)state概念:

  State是Flink计算过程的中间结果和状态信息,为了容错,必须把状态持久化到一个外部系统中去

(2)state的分类

值得看的博客:https://www.lizenghai.com/archives/46460.html(下图来自此博客)

  • KeyState:调用keyBy方法后,每个分区中相互独立的state
  • Operatecast state:没有分组,每一个subTask自己维护一个状态

(3)state的使用

  • 先定义一个状态描述器
  • 通过context获取state
  • 对数据处理后要更新数据

案例1:重启策略

RestartStrages

package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector; import java.util.Arrays; public class RestartStrages {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
// 开启checkpoint,索九checkpoint一次
env.enableCheckpointing(5000);
// 默认的重启策略就是无限重启
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
if (word.equals("feng")) {
int i = 1 / 0;
}
return Tuple2.of(word, 1);
}
});
//keyBy属于shuffle(redistribute)算子
//KeyedStream并行度为4
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
//SingleOutputStreamOperator并行度为4
SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.sum(1);
result.print();
env.execute();
}
}

发现程序中断后会重启,并且重启后,前面的计算结果还能被复用(sum算子内部实现了state的保存)

案例2:能否自己实现sum算子,既能正确的累加单词的次数,还能在程序出现异常时容错

MyHashMapDemo

package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.HashMap; public class MyHashMapDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
//开启checkpoint
env.enableCheckpointing(5000); //开启checkpoint,默认的重启策略就是无限重启
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
if (word.equals("laoduan")) {
int i = 1 / 0; //模拟出现错误,任务重启
}
return Tuple2.of(word, 1);
}
});
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
private HashMap<String, Integer> state = new HashMap<>();
@Override
public Tuple2<String, Integer> map(Tuple2<String, Integer> input) throws Exception {
String currentKey = input.f0;
Integer currentCount = input.f1;
Integer historyCount = state.get(currentKey);
if (historyCount == null) {
historyCount = 0;
}
int sum = historyCount + currentCount; //累加
//更新状态数据(我自己实现的计数器)
state.put(currentKey, sum);
return Tuple2.of(currentKey, sum); //输出结果
}
});
result.print();
env.execute();
}
}

此种定义hashMap的形式只能正确的累加单词的次数,并不能实现容错。

案例3:使用keyState实现sum,能满足需求

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
//开启checkpoint
env.enableCheckpointing(5000); //开启checkpoint,默认的重启策略就是无限重启
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000)); SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
if (word.equals("laoduan")) {
int i = 1 / 0; //模拟出现错误,任务重启
}
return Tuple2.of(word, 1);
}
}); KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0); SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { private transient ValueState<Integer> countState; //在构造器方法之后,map方法之前执行一次
@Override
public void open(Configuration parameters) throws Exception {
//初始化状态或恢复状态
//使用状态的步骤:
//1.定义一个状态描述器,状态的名称,存储数据的类型等
ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>(
"wc-state",
Integer.class
);
//2.使用状态描述从对应的StateBack器获取状态
countState = getRuntimeContext().getState(stateDescriptor);
} @Override
public Tuple2<String, Integer> map(Tuple2<String, Integer> input) throws Exception {
String currentKey = input.f0;
Integer currentCount = input.f1;
Integer historyCount = countState.value();
if(historyCount == null) {
historyCount = 0;
}
int sum = historyCount + currentCount;
//更新state
countState.update(sum);
return Tuple2.of(currentKey, sum);
}
}); result.print(); env.execute();

 案例4:OperatorState

自定义Source

MyAtLeastOnceSource

package cn._51doit.flink.day03;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import java.io.RandomAccessFile; public class MyAtLeastOnceSource extends RichParallelSourceFunction<String> implements CheckpointedFunction { private transient ListState<Long> listState; private boolean flag = true;
private Long offset = 0L; //在构造方法之后,open方法之前执行一次,用于初始化Operator State或恢复Operator State
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
//定义一个状态描述器
ListStateDescriptor<Long> stateDescriptor = new ListStateDescriptor<>(
"offset-state",
Long.class
);
//listState中存储的就是一个long类型的数值
listState = context.getOperatorStateStore().getListState(stateDescriptor); //从ListState中恢复数据
if(context.isRestored()) {
for (Long first : listState.get())
offset = first;
}
} //snapshotState方法是在checkpoint时,会调用
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
//将上一次checkpoint的数据清除
listState.clear();
//将最新的偏移量保存到ListState中
listState.add(offset);
} @Override
public void run(SourceContext<String> ctx) throws Exception {
int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
RandomAccessFile raf = new RandomAccessFile("/Users/xing/Desktop/data/" + taskIndex + ".txt", "r");
//从指定的位置读取数据
raf.seek(offset);
//获取一个checkpoint的锁
final Object checkpointLock = ctx.getCheckpointLock();
while (flag) {
String line = raf.readLine();
if(line != null) {
//获取最新的偏移量
synchronized (checkpointLock) {
line = new String(line.getBytes("ISO-8859-1"), "UTF-8");
offset = raf.getFilePointer();
ctx.collect(taskIndex + ".txt => " + line);
}
} else {
Thread.sleep(1000);
}
} } @Override
public void cancel() {
flag = false;
}
}

MyAtLeastOnceSourceDemo

package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class MyAtLeastOnceSourceDemo { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
env.enableCheckpointing(30000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000)); //自定义一个多并行的Source
DataStreamSource<String> lines1 = env.addSource(new MyAtLeastOnceSource()); DataStreamSource<String> lines2 = env.socketTextStream("localhost", 8888); SingleOutputStreamOperator<String> error = lines2.map(new MapFunction<String, String>() {
@Override
public String map(String line) throws Exception {
if (line.startsWith("error")) {
int i = 1 / 0;
}
return line;
}
}); DataStream<String> union = lines1.union(error); union.print(); env.execute(); }
}

两次checkpoint之间的数据会被重复读,所以是AtListOnce

  

MyHashMaoDemo

最新文章

  1. 再次思考 classpath 环境变量 等
  2. [NHibernate]存储过程的使用(二)
  3. android上的图片占用内存问题
  4. sql-char和varchar,nvarchar的区别
  5. 【前端学习】sublime开启vim模式
  6. java的几种for循环方法
  7. 表达式:使用API创建表达式树(4)DynamicExpression
  8. Talking about Health
  9. 一周一个小demo — 前端后台的交互实例
  10. AngularJS高级程序设计读书笔记 -- 控制器篇
  11. Kindeditor编辑插件的使用
  12. zookeeper 环境搭建
  13. 使用go-template自定义kubectl get输出
  14. python全栈开发笔记---------函数
  15. Centos 7 64位 minimal 最小化安装的系统中静默安装oracle 11g r2
  16. ambari安装 QA
  17. JEECG-P3首个开源插件诞生!CMS网站插件 Jeecg-p3-biz-cms1.0版本发布!
  18. Linux 变量的使用
  19. Vue + Element UI 实现权限管理系统 前端篇(八):管理应用状态
  20. django学习--2 模板

热门文章

  1. Bzoj P2054 疯狂的馒头 | 并查集
  2. nod_1009 数字1的数量(分析题)
  3. DeWeb和WebXone的区别
  4. Centos7 升级过内核 boot分区无法挂载修
  5. centos安装pm2报错
  6. selenium2.x 与 selenium3.x 最大区别
  7. 12组-Alpha冲刺-2/6
  8. Docker Compose 容器编排 NET Core 6+MySQL 8+Nginx + Redis
  9. 最大连续子数组和--dp
  10. Android LayoutInflater(布局填充器)