本文来自官网: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#physical-partitioning

Flink还通过以下函数对转换后的数据精确流分区进行低级控制(如果需要)。

1、自定义分区

  使用用户定义的分区程序为每个元素选择目标任务。

dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)

如简单的hash 分区(下面的实例不是官网):

val input = env.addSource(source)
.map(json => {
// json : {"id" : 0, "createTime" : "2019-08-24 11:13:14.942", "amt" : "9.8"}
val id = json.get("id").asText()
val createTime = json.get("createTime").asText()
val amt = json.get("amt").asText()
LateDataEvent("key", id, createTime, amt)
})
.setParallelism(1)
.partitionCustom(new Partitioner[String] {
override def partition(key: String, numPartitions: Int): Int = {
// numPartitions 是下游算子的并发数
key.hashCode % numPartitions
}
}, "id")
.map(l => {
LateDataEvent(l.key, l.id, l.amt, l.createTime)
})
.setParallelism(3)

注:key 是传入的field 的类型

2、随机分区

根据均匀分布随机分配元素(类似于: random.nextInt(5),0 - 5 在概率上是均匀的)

dataStream.shuffle()

源码:

@Internal
public class ShufflePartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L; private Random random = new Random(); @Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
// 传入下游分区数
return random.nextInt(numberOfChannels);
} @Override
public StreamPartitioner<T> copy() {
return new ShufflePartitioner<T>();
} @Override
public String toString() {
return "SHUFFLE";
}
}

3、均匀分区  rebalance

分区元素循环,每个分区创建相等的负载。在存在数据偏斜时用于性能优化。

dataStream.rebalance()

源码:

public class RebalancePartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L; private int nextChannelToSendTo; @Override
public void setup(int numberOfChannels) {
super.setup(numberOfChannels); nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
} @Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
// 轮训的发往下游分区
nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
return nextChannelToSendTo;
} public StreamPartitioner<T> copy() {
return this;
} @Override
public String toString() {
return "REBALANCE";
}
}

4、rescale

分区元素循环到下游操作的子集。如果您希望拥有管道,例如,从源的每个并行实例扇出到多个映射器的子集以分配负载但又不希望发生rebalance()会产生完全重新平衡,那么这非常有用。这将仅需要本地数据传输而不是通过网络传输数据,具体取决于其他配置值,例如TaskManagers的插槽数。
上游操作发送元素的下游操作的子集取决于上游和下游操作的并行度。例如,如果上游操作具有并行性2并且下游操作具有并行性4,则一个上游操作将元素分配给两个下游操作,而另一个上游操作将分配给另外两个下游操作。另一方面,如果下游操作具有并行性2而上游操作具有并行性4,那么两个上游操作将分配到一个下游操作,而另外两个上游操作将分配到其他下游操作。在不同并行度不是彼此的倍数的情况下,一个或多个下游操作将具有来自上游操作的不同数量的输入。

dataStream.rescale()

源码:

public class RescalePartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L; private int nextChannelToSendTo = -1; @Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
if (++nextChannelToSendTo >= numberOfChannels) {
nextChannelToSendTo = 0;
}
return nextChannelToSendTo;
} public StreamPartitioner<T> copy() {
return this;
} @Override
public String toString() {
return "RESCALE";
}
}

很遗憾这段代码只能看出,上游分区往下游分区发的时候,每个上游分区内部的数据是轮训发到下游分区的(没找到具体分配的地方,从这段代码debug,一直往上,找到分区出现在 RuntimeEnvironment 的对象里面,找不具体分配的地方)。

5、广播

向每个分区广播元素。

dataStream.broadcast()

最新文章

  1. SAMSUNG某型号一千短信成功记录!对比其他软件恢复不成功的案列!
  2. php和syslog
  3. 《OOC》笔记(3)——C语言变长参数va_list的用法
  4. windows上安装numpy,scipy
  5. jQuery 预习视频
  6. 配置 Hdp 4 Window 中的一些问题
  7. Windows phone 8 学习笔记(5) 图块与通知(转)
  8. ext2磁盘布局
  9. UVALive 4959 Jumping monkey
  10. php对数组中指定键值排序
  11. Android学习-应用程序管理
  12. Android L(5.0)源码之手势识别onTouchEvent
  13. 偶遇event.target
  14. Mybatis按顺序获取数据
  15. ext window嵌jsp页面自适应
  16. 通达OA数据库优化方案之_历史数据清理
  17. 开窗函数 函数() OVER()
  18. 91平台iOS接入demo
  19. JavaScript学习总结(四)——逻辑OR运算符详解
  20. Win7系统安装 Photoshop CC 中文完全破解版

热门文章

  1. Python之sort()函数详解
  2. bzoj 4128: Matrix ——BSGS&amp;&amp;矩阵快速幂&amp;&amp;哈希
  3. [React] Use the React Effect Hook in Function Components
  4. js判断radio选中状态
  5. PHP 源码安装常用配置参数和说明
  6. shell脚本之文件操作
  7. LibreOJ #6191. 「美团 CodeM 复赛」配对游戏
  8. luogu P1160 队列安排
  9. 【概率论】5-7:Gama分布(The Gamma Distributions Part II)
  10. Redash(开源轻量级商业智能) 生产环境部署及实践 (without docker)