Flink支持广播变量,就是将数据广播到具体的taskmanager上,数据存储在内存中,这样可以减缓大量的shuffle操作;

比如在数据join阶段,不可避免的就是大量的shuffle操作,我们可以把其中一个dataSet广播出去,一直加载到taskManager的内存中,可以直接在内存中拿数据,避免了大量的shuffle,导致集群性能下降;

注意因为广播变量是要把dataset广播到内存中,所以广播的数据量不能太大,否则会出现OOM这样的问题

Broadcast:Broadcast是通过withBroadcastSet(dataset,string)来注册的

Access:通过getRuntimeContext().getBroadcastVariable(String)访问广播变量
/**
* Created by angel;
*/
object BrodCast {
def main(args: Array[String]): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//TODO data2 join data3的数据,使用广播变量完成
val data2 = new mutable.MutableList[(Int, Long, String)]
data2.+=((1, 1L, "Hi"))
data2.+=((2, 2L, "Hello"))
data2.+=((3, 2L, "Hello world"))
val ds1 = env.fromCollection(Random.shuffle(data2))
val data3 = new mutable.MutableList[(Int, Long, Int, String, Long)]
data3.+=((1, 1L, 0, "Hallo", 1L))
data3.+=((2, 2L, 1, "Hallo Welt", 2L))
data3.+=((2, 3L, 2, "Hallo Welt wie", 1L))
val ds2 = env.fromCollection(Random.shuffle(data3))
//todo 使用内部类RichMapFunction,提供open和map,可以完成join的操作
val result = ds1.map(new RichMapFunction[(Int , Long , String) , ArrayBuffer[(Int , Long , String , String)]] { var brodCast:mutable.Buffer[(Int, Long, Int, String, Long)] = null override def open(parameters: Configuration): Unit = {
import scala.collection.JavaConverters._
//asScala需要使用隐式转换
brodCast = this.getRuntimeContext.getBroadcastVariable[(Int, Long, Int, String, Long)]("ds2").asScala
}
override def map(value: (Int, Long, String)):ArrayBuffer[(Int , Long , String , String)] = {
val toArray: Array[(Int, Long, Int, String, Long)] = brodCast.toArray
val array = new mutable.ArrayBuffer[(Int , Long , String , String)]
var index = 0 var a:(Int, Long, String, String) = null
while(index < toArray.size){
if(value._2 == toArray(index)._5){
a = (value._1 , value._2 , value._3 , toArray(index)._4)
array += a
}
index = index + 1
}
array
}
}).withBroadcastSet(ds2 , "ds2")
println(result.collect())
}
}

最新文章

  1. 【NLP】揭秘马尔可夫模型神秘面纱系列文章(一)
  2. 自定义iOS7导航栏背景,标题和返回按钮文字颜色
  3. MySQL知识树-查询语句
  4. gitlab配置邮件通知功能操作记录
  5. SharedPreference写入-读取
  6. Java 序列化Serializable接口
  7. 智能设备逆向工程之外部Flash读取与分析篇
  8. VMWARE + CENTOS在windows下配置cocos2d-x android开发环境
  9. iOS:WebKit内核框架的应用与解析
  10. java作用域public ,private ,protected 及不写时的区别(转)
  11. java--vo
  12. C++11下的线程池以及灵活的functional + bind + lamda
  13. UVA10304---(区间DP)
  14. 如何让ios app支持32位和64位?
  15. jdbc操作步骤和preparedStatment相比Statment的好处
  16. 拾遗----javascript一些实用方法
  17. 网页偶现性崩溃-chrome
  18. saltstack SLS
  19. jQuery two way bindings(双向数据绑定插件)
  20. C#基础巩固(3)-Linq To XML 读取XML

热门文章

  1. ansible笔记(4):常用模块之文件操作
  2. 自定义admin(self_admin)
  3. python 安装mysql报错
  4. 06 元祖 字典 集合set
  5. Poco::Crypto--加解密_RSA
  6. centos6.5 python命令行模式左右建无法使用
  7. oracle数据库定时任务dbms_job的用法详解
  8. mongoDB基础使用
  9. vue.js----之router详解(一)
  10. Python操作MySQL案例