package com.iclick.spark.wordSegment

import org.apache.log4j.{ Level, Logger }
import org.apache.spark.{ SparkConf, SparkContext }
import com.iclick.spark.wordSegment.util.CounterMap
import scala.collection.mutable.ArrayBuffer
import com.google.common.collect.Maps
import java.text.SimpleDateFormat
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable.Map
import com.iclick.spark.wordSegment.util.AtomsUitl
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode
import com.iclick.spark.wordSegment.util.ConterHashSet
import org.apache.commons.lang.StringUtils
import com.mysql.jdbc.Driver

// /tmp/yuming/webtable/ds=16-04-17   Hadoop data directory
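// Overview of the steps below: candidate words are generated as substrings of length <= maxLen
// from the cleaned input lines, then scored by
//   - cohesion (PMI)            : how strongly a candidate's two halves stick together
//   - left/right branch entropy : how varied the neighboring characters are on each side
// Candidates whose PMI and entropy exceed the given thresholds are kept, together with their
// frequency and document frequency, and written to HDFS.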
object WordSegment {
  def main(args: Array[String]): Unit = {
    // Suppress noisy logging
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // master
    if (args.length < 5) {
      System.err.println("Usage: path, maxLen, pmi, info, shuffle_count [save_path]")
      System.exit(1)
    }
    val path = args(0).toString
    val maxLen = args(1).toInt
    val pmi = args(2).toDouble
    val info = args(3).toDouble
    val shuffle_count = args(4).toInt
    val save_path_result = if (args.length >= 6) args(5).toString else "/tmp/wilson/"

    val conf = new SparkConf().set("spark.driver.maxResultSize", "10g").
      set("spark.sql.shuffle.partitions", s"${shuffle_count}").set("spark.network.timeout", "850s").
      set("spark.shuffle.compress", "true").set("spark.shuffle.spill.compress", "true").set("spark.shuffle.manager", "sort")
    if (System.getProperty("local") != null) {
      conf.setMaster("local").setAppName("wordSegname")
    }
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // local
    /* val conf = new SparkConf().setAppName("wordSegname").setMaster("local[4]").
      set("spark.sql.shuffle.partitions", "10").set("spark.network.timeout", "30s")
      .set("spark.shuffle.compress", "true").set("spark.shuffle.spill.compress", "true")
      .set("spark.shuffle.manager", "sort")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val path = "D:\\wilson.zhou\\Downloads\\西游记.txt"
    val maxLen = 6
    val path1 = "D:\\temp\\text.txt"
    val pmi = 0
    val info = 0
    val save_path_result = "/tmp/wilson/" */
    // val word = scala.io.Source.fromFile("D:\\wilson.zhou\\Downloads\\红楼梦.txt").getLines().mkString("")

    val sdf = new java.text.SimpleDateFormat("yyyy-MM-dd:HH:mm:ss")
    val start = sdf.format(System.currentTimeMillis())
    // Load and clean the input: replace stopwords, punctuation, and whitespace/control characters with spaces
    val word1 = sc.textFile(path).map { x =>
      val x_filter = x.replaceAll("[" + AtomsUitl.stopwords + "]", " ").replaceAll("\\p{Punct}", " ").replaceAll("\\pP", " ")
        .replaceAll(" ", " ").replaceAll("\\p{Blank}", " ").replaceAll("\\p{Space}", " ").replaceAll("\\p{Cntrl}", " ")
      x_filter
    }
    val sum_document = word1.count()

    // Pair each non-blank token with the index of the document (line) it came from
    val word_document = word1.zipWithIndex.filter { x => !StringUtils.isBlank(x._1) }.flatMap { x =>
      val arr = ArrayBuffer[(String, Int)]()
      val line = x._1.split(" ")
      for (i <- line) {
        arr += ((i, x._2.toInt))
      }
      arr
    }.map { x => (x._1.trim, x._2) }.filter(x => !StringUtils.isBlank(x._1))

    println("Calculate the items' document frequency")
    val word_document_caculate = word_document.map { x => ("$" + x._1 + "$", x._2) }.flatMap { x =>
      var arr = ArrayBuffer[(String, Int)]()
      for (y <- 1 to AtomsUitl.len(x._1) - 2) {
        arr += ((AtomsUitl.substring(x._1, y, Math.min(maxLen + y, AtomsUitl.len(x._1))), x._2))
      }
      arr
    }.sortBy(x => x._1)
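    // Candidate generation: each token is wrapped in "$" sentinels, and for every start position y
    // the suffix beginning at y, truncated to at most maxLen characters, is emitted together with
    // its document index. The stages below count the prefixes of these strings, which together
    // cover every substring of up to maxLen characters.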
    println("Document-frequency calculation will start")
    // Group candidates by their first character, then count, for every prefix, the number of
    // distinct documents it appears in (ConterHashSet keeps a set of document indices)
    val word_document_result = word_document_caculate.map { x =>
      val first = AtomsUitl.substring(x._1, 0, 1)
      (first, x._1, x._2)
    }.groupBy((f: (String, String, Int)) => f._1).map {
      x => x._2
    }.flatMap { x =>
      val document = Maps.newHashMap[String, ConterHashSet]
      var arrBuff = ArrayBuffer[(String, Int)]()
      for (curr <- x) {
        for (ii <- 1 to AtomsUitl.len(curr._2) - 1) {
          val w1 = AtomsUitl.substring(curr._2, 0, ii)
          if (document.containsKey(w1)) {
            document.get(w1).addelment(curr._3.asInstanceOf[java.lang.Integer])
          } else {
            val cm = new ConterHashSet()
            cm.addelment(curr._3.asInstanceOf[java.lang.Integer])
            document.put(w1, cm)
          }
        }
      }
      val document_iter = document.keySet.iterator
      while (document_iter.hasNext()) {
        val w = document_iter.next()
        val freq = document.get(w).getsize()
        arrBuff += ((w, freq))
      }
      arrBuff
    }
    // word_document_result.take(20).foreach(println)
    // println("word_document_result's count: " + word_document_result.count())
    println("Information entropy and mutual information")
    val word = word1.flatMap { x =>
      val line = x.split(" ")
      line
    }.filter(x => !StringUtils.isBlank(x))

    // Prepare for the left-neighbor entropy calculation
    println("Calculate the left-neighbor information entropy .....")
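    // Branch entropy: for a candidate w, collect the characters that appear immediately next to it
    // and compute H(w) = -sum_c p(c|w) * ln p(c|w). High entropy on both sides means w occurs in
    // many different contexts, which is evidence that w is a free-standing word. The left side is
    // handled by reversing every token (AtomsUitl.reverse) and reusing the right-neighbor logic.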
    val wordleft = word.map(x => AtomsUitl.reverse(x)).map { x => "$" + x + "$" }.flatMap { x =>
      var arr = ArrayBuffer[String]()
      for (y <- 1 to AtomsUitl.len(x) - 2) {
        // arr += x.substring(y, Math.min(maxLen + y, x.length()))
        arr += AtomsUitl.substring(x, y, Math.min(maxLen + y, AtomsUitl.len(x)))
      }
      arr
    }.sortBy(x => x)

    val wordleft_caculate = wordleft.map { s =>
      // val first = s.substring(0, 1).toString()
      val first = AtomsUitl.substring(s, 0, 1).toString
      (first, s)
    }.groupBy((f: (String, String)) => f._1).map {
      x => x._2
    }.flatMap { x =>
      // For every prefix w, count the characters that follow it (the left neighbors of the
      // original, un-reversed word), then turn the counts into an entropy value
      val stat = Maps.newHashMap[String, CounterMap]()
      var arrBuff = ArrayBuffer[(String, Double)]()
      for (curr <- x) {
        for (ii <- 1 to AtomsUitl.len(curr._2) - 1) {
          // val w = curr._2.substring(0, ii)
          val w = AtomsUitl.substring(curr._2, 0, ii)
          // val suffix = curr._2.substring(ii).substring(0, 1)
          val suffix = AtomsUitl.substring(AtomsUitl.substring(curr._2, ii), 0, 1)
          if (stat.containsKey(w)) {
            stat.get(w).incr(suffix)
          } else {
            val cm = new CounterMap()
            cm.incr(suffix)
            stat.put(w, cm)
          }
        }
      }
      var iterator_stat = stat.keySet().iterator()
      while (iterator_stat.hasNext()) {
        var w = iterator_stat.next()
        var cm = stat.get(w)
        var freq = 0
        var re = 0.0
        var cm_iter = cm.countAll().keySet().iterator()
        while (cm_iter.hasNext()) {
          freq += cm.get(cm_iter.next())
        }
        var cm_iter1 = cm.countAll().keySet().iterator()
        while (cm_iter1.hasNext()) {
          var p = cm.get(cm_iter1.next()) * 1.0 / freq
          re += -1 * Math.log(p) * p
        }
        // print("freq value: " + freq + "    ")
        // println("re value: " + re)
        arrBuff += ((AtomsUitl.reverse(w), re))
      }
      arrBuff
    }
    // wordleft_caculate.take(20).foreach(println)
    // println("number of left-neighbor entries: " + wordleft_caculate.count())
    // println(wordleft_caculate.map(x => x._1).distinct().count())
    // println("wordleft's count -----> " + wordleft.count)
    // Prepare for the right-neighbor entropy calculation
    println("Calculate the right-neighbor information entropy .....")
    val wordright = word.map { x => "$" + x + "$" }.flatMap { x =>
      var arr = ArrayBuffer[String]()
      // AtomsUitl.len(x) - 2
      for (y <- 1 to AtomsUitl.len(x) - 2) {
        // arr += x.substring(y, java.lang.Math.min(maxLen + y, x.length()))
        arr += (AtomsUitl.substring(x, y, Math.min(maxLen + y, AtomsUitl.len(x))))
      }
      arr
    }.sortBy(x => x)
    // Compute the right-neighbor entropy (and the candidate frequency)
    val wordright_caculate = wordright.map { s =>
      // val first = s.substring(0, 1).toString()
      val first = AtomsUitl.substring(s, 0, 1).toString()
      (first, s)
    }.groupBy((f: (String, String)) => f._1).map {
      x => x._2
    }.flatMap { x =>
      var stat = Maps.newHashMap[String, CounterMap]()
      var arrBuff = ArrayBuffer[(String, Int, Double)]()
      for (curr <- x) {
        for (i <- 1 to AtomsUitl.len(curr._2) - 1) {
          // val w = curr._2.substring(0, i)
          val w = AtomsUitl.substring(curr._2, 0, i)
          // val suffix = curr._2.substring(i).substring(0, 1)
          val suffix = AtomsUitl.substring(AtomsUitl.substring(curr._2, i), 0, 1).toString
          if (stat.containsKey(w)) {
            stat.get(w).incr(suffix)
          } else {
            val cm = new CounterMap()
            cm.incr(suffix)
            stat.put(w, cm)
          }
        }
      }
      var iterator_stat = stat.keySet().iterator()
      while (iterator_stat.hasNext()) {
        var w = iterator_stat.next()
        var cm = stat.get(w)
        var freq = 0
        var re = 0.0
        var cm_iter = cm.countAll().keySet().iterator()
        while (cm_iter.hasNext()) {
          freq += cm.get(cm_iter.next())
        }
        var cm_iter1 = cm.countAll().keySet().iterator()
        while (cm_iter1.hasNext()) {
          var p = cm.get(cm_iter1.next()) * 1.0 / freq
          re += -1 * Math.log(p) * p
        }
        // print("w value: " + w + " ")
        // print("freq value: " + freq + " ")
        // println("re value: " + re)
        arrBuff += ((w, freq, re))
      }
      arrBuff
    }
    // println("first 20 right-neighbor entries")
    // wordright_caculate.take(20).foreach(println)
    // println("total number of right-neighbor entries: " + wordright_caculate.count())
    // Merge the left and right results
    println("Merge will now be calculated..............")
    import sqlContext.implicits._
    /* val word_caculate_total1 = wordright_caculate.union(wordleft_caculate).sortBy(x => x).groupBy((f: (String, Int, Double)) => f._1, 20).map(x => x._2)
    val word_caculate_total = word_caculate_total1.map { x =>
      val hashtable = new java.util.Hashtable[String, String]()
      hashtable.put("name", "null")
      hashtable.put("freq", "0")
      hashtable.put("e", java.lang.Double.MAX_VALUE.toString())
      for (str <- x) {
        hashtable.put("name", str._1)
        if (str._2 != -20) {
          hashtable.put("freq", String.valueOf(str._2))
        }
        if (str._3 < java.lang.Double.parseDouble(hashtable.get("e"))) {
          hashtable.put("e", String.valueOf(str._3))
        }
      }
      (hashtable.get("name"), hashtable.get("freq").toInt, hashtable.get("e").toDouble)
    }.filter(x => !StringUtils.isBlank(x._1) && x._1.length > 1) */
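    // The right-side result carries (word, frequency, right entropy) and the left-side result
    // carries (word, left entropy). They are joined on the word, the smaller of the two entropies
    // is kept as the final "info" value, and single-character candidates are dropped.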
    val wordright_caculate_todf = wordright_caculate.toDF("right_name", "freq", "right_info")
    val wordleft_caculate_todf = wordleft_caculate.toDF("left_name", "left_info")
    val udf_get_min: ((Double, Double) => Double) = (arg1: Double, arg2: Double) => Math.min(arg1, arg2)
    val sqlfunctin = udf(udf_get_min)
    val word_caculate_total = wordright_caculate_todf.join(wordleft_caculate_todf, wordright_caculate_todf("right_name") === wordleft_caculate_todf("left_name"), "left").
      withColumn("info", sqlfunctin(col("right_info"), col("left_info"))).drop("right_info").
      drop("left_name").drop("left_info").filter(length(wordright_caculate_todf("right_name")) > 1).rdd
    // wordright_caculate.union(wordleft_caculate).groupBy((f: (String, Int, Double)) => f._1).map(x => x._2).take(20).foreach(println)

    println("Calculate the cohesion (PMI)")
    val size_pmi = wordright_caculate.count()
    println("Total number of candidates in the final step: " + size_pmi)
    println("map_total is done")
    // Compute the cohesion (PMI)
    // For every candidate w, enumerate every split point into a left part lw and a right part rw
    val last = word_caculate_total.flatMap { x =>
      var w = x.apply(0).toString
      var f = x.apply(1).toString.toInt
      var e = x.apply(2).toString.toDouble
      // var w = x._1
      // var f = x._2
      // var e = x._3
      var arr = ArrayBuffer[(String, Int, Double, String, String)]()
      for (s <- 1 to AtomsUitl.len(w) - 1) {
        // var lw = w.substring(0, s)
        try {
          var lw = AtomsUitl.substring(w, 0, s)
          // var rw = w.substring(s)
          var rw = AtomsUitl.substring(w, s)
          arr += ((w, f, e, lw, rw))
        } catch {
          case e: Exception => arr += (("", 0, 0.0, "", ""))
        }
      }
      arr
    }.filter(f => !StringUtils.isBlank(f._4) && !StringUtils.isBlank(f._5))
    println("DataFrame merge will now be calculated..............")
    // last.take(30).foreach(println)
    val df = last.toDF("w_total", "f", "e", "lw", "rw")
    val df1 = wordright_caculate.toDF("w", "freq", "re")
    // Look up the frequency of the left part (lw_freq) and the right part (rw_freq) of each split
    val df2_drop = df.join(df1, df("lw") === df1("w"), "left").drop("re").drop("w").withColumnRenamed("freq", "lw_freq")
    // val df2_drop = df2.drop("re").drop("w").withColumnRenamed("freq", "lw_freq")
    val df3_drop = df2_drop.join(df1, df2_drop("rw") === df1("w"), "left").drop("re").drop("w").withColumnRenamed("freq", "rw_freq")
    // val df3_drop = df3.drop("re").drop("w").withColumnRenamed("freq", "rw_freq")
    // 948014
    // Cohesion (PMI) calculation
    /* val result = df3_drop.rdd.groupBy { f => f(0) }.map { x =>
      val map = new java.util.HashMap[String, String]()
      map.put("max", "1")
      for (i <- x._2) {
        map.put("w_total", i.apply(0).toString)
        map.put("f", i.apply(1).toString)
        map.put("e", i.apply(2).toString)
        var ff: java.lang.Long = try {
          i.apply(5).toString.toLong * i.apply(6).toString.toLong
        } catch {
          case e: Exception => 1l
        }
        if (ff > map.get("max").toLong) {
          map.put("max", ff.toString)
        }
      }
      var pf = map.get("f").toLong * size_pmi * 1.0 / map.get("max").toLong
      var pmi = Math.log(pf)
      var w_total = map.get("w_total")
      var f = map.get("f").toInt
      var e = map.get("e").toDouble
      map.clear()
      (w_total, f, pmi, e, 0)
      // (map.get("w_total"), map.get("f").toInt, pmi, map.get("e").toDouble, 0)
    }.filter(f => f._3 > pmi && f._4 > info && !StringUtils.isBlank(f._1))
    val resultToDf = result.toDF("name", "freq", "pmi", "info", "zero")
    */
    println("DataFrame join is done")
    // Compute the cohesion using the DataFrame API instead
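    // Cohesion for a candidate w with split (lw, rw):
    //   pmi(w) = ln( freq(w) * N / (freq(lw) * freq(rw)) )
    // where N = size_pmi is the total number of candidate entries. The sort on pmi descending
    // followed by dropDuplicates is intended to keep a single (highest-pmi) split per candidate,
    // and a candidate survives only if it passes both the pmi and the info (entropy) thresholds.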
    val udf_get_pmi = (arg1: Int, arg2: Int, arg3: Int) => Math.log((arg1.toLong * size_pmi.toLong * 1.0) / (arg2.toLong * arg3.toLong))
    val udf_get_pmi_udf = udf(udf_get_pmi)
    val resultToDf = df3_drop.withColumn("pmi", udf_get_pmi_udf(col("f"), col("rw_freq"), col("lw_freq"))).withColumn("zero", col("f") * 0).
      drop("rw_freq").drop("lw_freq").drop("lw").drop("rw").sort($"w_total", $"pmi".desc).dropDuplicates(Array("w_total")).
      filter($"pmi" > pmi && $"e" > info).withColumnRenamed("w_total", "name").withColumnRenamed("f", "freq").withColumnRenamed("e", "info")
    println("The final result will be calculated")
    val word_document_resultToDf = word_document_result.toDF("name1", "document")
    val resultToDf2 = resultToDf.join(word_document_resultToDf, word_document_resultToDf("name1") === resultToDf("name"), "left").
      withColumn("documentcount", col("zero") + sum_document).drop("zero").drop("name1")
    // val resultToDf2 = resultToDf1.withColumn("documentcount", col("zero") + sum_document).drop("zero").drop("name1")
    // resultToDf2.show(20)
    // mutual information / cohesion: pmi
    // left/right entropy: e

    // Store the results in HDFS
    println("Results will be stored into HDFS.")
    val sdf1 = new SimpleDateFormat("yy-MM-dd")
    val save_path = save_path_result + sdf1.format(System.currentTimeMillis())
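    // Each line written below is "name,freq,pmi,entropy,documentFrequency,totalDocuments".
    // Note that in the map below the local variable "info" reads the pmi column and "entropy"
    // reads the info (entropy) column, so the emitted order is as listed here.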
    try {
      resultToDf2.rdd.map { x =>
        var name = x.apply(0).toString
        var freq = x.apply(1).toString
        var entropy = x.apply(2).toString
        var info = x.apply(3).toString
        var document = x.apply(4).toString
        var documenttotal = x.apply(5).toString
        s"${name},${freq},${info},${entropy},${document},${documenttotal}"
      }.saveAsTextFile(save_path)
      println("....................success.............")
      // resultToDf2.rdd.repartition(1).saveAsTextFile(save_path)
    } catch {
      case e: Exception => println("some errors happened when saving the final data")
    }
    // Insert the results into MySQL
    /* val driver = "com.mysql.jdbc.Driver"
    Class.forName(driver)
    val url = "jdbc:mysql://10.1.1.28:3306/spark"
    val pro = new java.util.Properties
    pro.setProperty("user", "usr_dba")
    pro.setProperty("password", "4rfv%TGB^YHN")
    pro.setProperty("use_unicode", "true")
    pro.setProperty("characterEncoding", "utf8")
    resultToDf2.write.mode(SaveMode.Overwrite).jdbc(url, "wordsegment", pro)
    */
    println(start)
    println(sdf.format(System.currentTimeMillis()))
    sc.stop()
  }
}
