sample采样倾斜key并单独进行join代码
2024-08-28 04:35:22
/**
* sample采样倾斜key单独进行join
*/ JavaPairRDD<Long, String> sampledRDD = userid2PartAggrInfoRDD.sample(false, 0.1, 9); JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair( new PairFunction<Tuple2<Long,String>, Long, Long>() { private static final long serialVersionUID = 1L; @Override
public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
throws Exception {
return new Tuple2<Long, Long>(tuple._1, 1L);
} }); JavaPairRDD<Long, Long> computedSampledRDD = mappedSampledRDD.reduceByKey( new Function2<Long, Long, Long>() { private static final long serialVersionUID = 1L; @Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
} }); JavaPairRDD<Long, Long> reversedSampledRDD = computedSampledRDD.mapToPair( new PairFunction<Tuple2<Long,Long>, Long, Long>() { private static final long serialVersionUID = 1L; @Override
public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
throws Exception {
return new Tuple2<Long, Long>(tuple._2, tuple._1);
} }); final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2; JavaPairRDD<Long, String> skewedRDD = userid2PartAggrInfoRDD.filter( new Function<Tuple2<Long,String>, Boolean>() { private static final long serialVersionUID = 1L; @Override
public Boolean call(Tuple2<Long, String> tuple) throws Exception {
return tuple._1.equals(skewedUserid);
} }); JavaPairRDD<Long, String> commonRDD = userid2PartAggrInfoRDD.filter( new Function<Tuple2<Long,String>, Boolean>() { private static final long serialVersionUID = 1L; @Override
public Boolean call(Tuple2<Long, String> tuple) throws Exception {
return !tuple._1.equals(skewedUserid);
} }); JavaPairRDD<String, Row> skewedUserid2infoRDD = userid2InfoRDD.filter( new Function<Tuple2<Long,Row>, Boolean>() { private static final long serialVersionUID = 1L; @Override
public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
return tuple._1.equals(skewedUserid);
} }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() { private static final long serialVersionUID = 1L; @Override
public Iterable<Tuple2<String, Row>> call(
Tuple2<Long, Row> tuple) throws Exception {
Random random = new Random();
List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>(); for(int i = 0; i <; i++) {
int prefix = random.nextInt(100);
list.add(new Tuple2<String, Row>(prefix + "_" + tuple._1, tuple._2));
} return list;
} }); JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair( new PairFunction<Tuple2<Long,String>, String, String>() { private static final long serialVersionUID = 1L; @Override
public Tuple2<String, String> call(Tuple2<Long, String> tuple)
throws Exception {
Random random = new Random();
int prefix = random.nextInt(100);
return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
} }).join(skewedUserid2infoRDD).mapToPair( new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() { private static final long serialVersionUID = 1L; @Override
public Tuple2<Long, Tuple2<String, Row>> call(
Tuple2<String, Tuple2<String, Row>> tuple)
throws Exception {
long userid = Long.valueOf(tuple._1.split("_")[1]);
return new Tuple2<Long, Tuple2<String, Row>>(userid, tuple._2);
} }); JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(userid2InfoRDD); JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2); JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = joinedRDD.mapToPair( new PairFunction<Tuple2<Long,Tuple2<String,Row>>, String, String>() { private static final long serialVersionUID = 1L; @Override
public Tuple2<String, String> call(
Tuple2<Long, Tuple2<String, Row>> tuple)
throws Exception {
String partAggrInfo = tuple._2._1;
Row userInfoRow = tuple._2._2; String sessionid = StringUtils.getFieldFromConcatString(
partAggrInfo, "\\|", Constants.FIELD_SESSION_ID); int age = userInfoRow.getInt(3);
String professional = userInfoRow.getString(4);
String city = userInfoRow.getString(5);
String sex = userInfoRow.getString(6); String fullAggrInfo = partAggrInfo + "|"
+ Constants.FIELD_AGE + "=" + age + "|"
+ Constants.FIELD_PROFESSIONAL + "=" + professional + "|"
+ Constants.FIELD_CITY + "=" + city + "|"
+ Constants.FIELD_SEX + "=" + sex; return new Tuple2<String, String>(sessionid, fullAggrInfo);
} });
最新文章
- iframe关于滚动条的去除和保留
- sqlserver 多库查询 sp_addlinkedserver使用方法(添加链接服务器)
- Android ——单元测试
- Mongodb3.0.6副本集+分片学习笔记
- 《30天自制操作系统》03_day_学习笔记
- C# 多线程网络通信
- Android开发中,那些让你相见恨晚的方法、类或接口
- c++ union学习
- zabbix-check of pre-requisites
- transform的2D部分,嗯…就这个标题了。
- org.xml.sax.SAXParseException; lineNumber: 2; columnNumber: 6; 不允许有匹配 ";[xX][mM][lL]"; 的处理指令目标。
- RESTful相关理解
- 16.xml
- python赋值和生成器
- $.contents().find设置的data在iframe子页面无法获取值
- python的Web框架,中间件middleware及djangoAdmin
- lnmp----------lnmp集成环境使用lnmp安装包安装lnmp集成环境的步骤
- 关于右键属性与du -sh显示的文件大小不一致的解决
- Fiddler 4 抓包(APP HTTPS )
- WinForm 弹窗