ID_Mapping与Oneid的作用

大神告诉我们Oneid能用来做什么

输入数据源格式样例

样例数据图1

整理后数据图2

实现原理

联通图

生成最大联通图

留下耀总的数据给大家练习了

当日代码生成


import java.util.UUID
import cn.scfl.ebt.util.UtilTool
import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.sql.SparkSession
import org.spark_project.jetty.util.StringUtil /**
* @Author: baierfa
* @version: v1.0
* @description: id_mapping 单天实现暂时不加入多天滚动计算 多天计算需要看另一文件YeAndTodayGraphx
* @Date: 2020-07-05 10:24
*/
object TodayGraphx {
def main(args: Array[String]): Unit = { //声明环境变量
val spark = SparkSession
.builder
.appName(s"${this.getClass.getName}")
.master("local[*]")
.getOrCreate()
val sc = spark.sparkContext
val todayPath = "D:\\TESTPATH\\inputpath\\today\\dt=202-07-13"
val outPutPath="D:\\TESTPATH\\outtpath\\today\\dt=202-07-13"
val edgeoutPutPath="D:\\TESTPATH\\edgepath\\today\\dt=202-07-13" todayIdMapping(spark,sc,todayPath,outPutPath,edgeoutPutPath)
spark.close()
} /**
* 功能描述: <输入今天数据路径 按照文件形式输出到指定路径中 并推出今日图计算点与边集合总个数>
* 〈使用今日输入数据转换成唯一数字值 图计算之后再将数值转换回明文 生成唯一uuid〉
* @Param: [spark, sc, todayPath, outPutPath, edgeoutPutPath]
* @Return: void
* @Author: baierfa
* @Date: 2020-08-05 10:18
*/
def todayIdMapping(spark:SparkSession,sc: SparkContext,todayPath: String,outPutPath:String ,edgeoutPutPath:String )={
// 一、数据加载 // 今天数据加载
val todaydf = spark.read.textFile(todayPath)
// 二、处理数据为生成图做准备
// 生成今日点集合
val to_veritx = todaydf.rdd.flatMap(line => {
// 将数据源进行分割
val field = line.split("\t")
//把数据转换成(long,值)要想long值不重复 可以使用hashcode
//本文用于生产环境 使用了md5加密 详细文件请看其他篇章
for (ele <- field if StringUtil.isNotBlank(ele)&&(!"\\N".equals(ele))) yield (UtilTool.getMD5(ele), ele)
})
// 生成今日边集合
val to_edges = todaydf.rdd.flatMap(line => {
// 将数据源进行分割
val field = line.split("\t")
//将数据转换 将值转换成边 用于连线 连线值这边用""想更换看个人意愿
for (i <- 0 to field.length - 2 if StringUtil.isNotBlank(field(i))&&(!"\\N".equals(field(i)))
;j <- i + 1 to field.length - 1 if StringUtil.isNotBlank(field(j))&&(!"\\N".equals(field(j))))
yield Edge(UtilTool.getMD5(field(i)), UtilTool.getMD5(field(j)), "")
})
// 在数据不做多次etl数据操作下可以使用共同出现次数来判定是否归并为同一个用户
// 例如 合并起来用户 mobile 与 device_id 同时出现两次以上才被记入同一个
// .map(edge => (edge, 1))
// .reduceByKey(_ + _)
// .filter(tp => tp._2 > 2)
// .map(tp => tp._1) // 三、汇总各个节点使用图计算生成图
// 单将数据重新赋值适用于以后多数据源合并
val veritx = to_veritx
val edges = to_edges
// 开始使用点集合与边集合进行图计算训练
val graph = Graph(veritx, edges)
// 四、生成最大连通图
val graph2 = graph.connectedComponents()
val vertices = graph2.vertices
// 五、将最小图计算值替换成uuid
val uidRdd = vertices.map(tp => (tp._2, tp._1))
.groupByKey()
.map(tp => (StringUtil.replace(UUID.randomUUID().toString, "-", ""), tp._2))
// 对点与边进行统计作为记录输出 可以用于后期统计检查生成报表警报数据是否异常
val uu = veritx.map(lin=>("vertices",1)).union(edges.map(lin=>("edges",1))).reduceByKey(_ + _)
.map(tp=>tp._1+"\t"+tp._2)
// 将现有的数据转换成铭文识别后展示
// 将各个点的数据汇总到driver端
val idmpMap = veritx.collectAsMap()
// 按照map方式广播出去做转换
val bc = sc.broadcast(idmpMap)
// 将数据的id转换成明文
val ss = uidRdd.mapPartitions(itemap => {
val vert_id_map = bc.value
itemap.map(tp => {
//从广播变量中获取id值的信息并转换
val t2 = for (ele <- tp._2) yield vert_id_map.get(ele).get
//按照将要输出的数据格式进行排版 (uuid mobile1,mobile2,mobile3,device_id1,device_2)
tp._1+"\t"+t2.mkString(",")
})
})
// 数据输出
ss.saveAsTextFile(outPutPath)
uu.saveAsTextFile(edgeoutPutPath)
}
}

引用jar包

 <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spark.version>2.4.0</spark.version>
<java.version>1.8</java.version>
</properties> <dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency> <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency> <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
<version>${spark.version}</version>
</dependency> <dependency>
<groupId>com.thoughtworks.paranamer</groupId>
<artifactId>paranamer</artifactId>
<version>2.8</version>
</dependency> </dependencies>
启动命令
spark-submit \
--class IdMapping \
--master yarn \
--deploy-mode cluster \
--num-executors 40 \
--driver-memory 4g \
--executor-memory 6g \
--executor-cores 3 \
--conf spark.default.parallelism=400 \
--conf spark.shuffle.memoryFraction=0.3 \
ID_Mapping_Spark.jar \
hdfs://user/hive/oneid_data/data_origindata_di/dt=2020-07-13 \
hdfs://user/hive/oneid_data/data_sink_id_mapping/dt=2020-07-14 \
hdfs://user/hive/oneid_data/data_sink_edge_vertex/dt=2020-07-14

辛苦码字如有转载请标明出处谢谢!——拜耳法

都看到这里了非常感谢!
本片章暂未完结 有疑问请+vx :baierfa

PS:我要在下一章在我心中不完美的你打一个淋漓尽致的标签

将大神挂在那片白云下: oneid与用户标签之间的相互打通 实现用户标签

最新文章

  1. BZOJ3679 : 数字之积
  2. sql操作table
  3. js 写成类的形式 js 静态变量 js方法 属性 json类
  4. SQL Server中批量替换数据
  5. jQuery无缝间歇向上滚动
  6. CyclicBarrier的介绍和使用
  7. 华为u8800怎样root?
  8. 【常用小命令】解决windows下有些文件文件名识别不了导致删除不了的问题
  9. VirtualBox 上安装CentOS 6.5
  10. Android开发笔记--hello world 和目录结构
  11. Apache .htaccess语法之RewriteRule
  12. Struts2的概述和入门
  13. HttpClient请求详解
  14. spring boot系列03--spring security (基于数据库)登录和权限控制(上)
  15. Java实现贪吃蛇游戏【代码】
  16. Android 开发 实现文本搜索功能
  17. 安装和使用Docker(Windows7)
  18. .net 简单任务调度平台安装简要说明
  19. cvLogPolar函数详解
  20. Android TV开发 焦点控制

热门文章

  1. 10-Python文件操作
  2. Lucas定理 &amp; Catalan Number &amp; 中国剩余定理(CRT)
  3. 利用宝塔和rainloop搭建咱的邮箱
  4. Linux安装禅道项目管理软件
  5. thymeleaf js绑定多个变量参数
  6. 【论文笔记】Self-Supervised GAN :辅助性旋转损失的自监督生成式对抗网络
  7. JVM系列之:String.intern和stringTable
  8. 老男孩武老师的Django笔记
  9. Linux最常用的基本操作复习
  10. Django学习路6_修改数据库为 mysql ,创建mysql及进行迁徙