package wikipedia

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.log4j.{Level, Logger}

/**
 * One Wikipedia article.
 *
 * @param title the article's title
 * @param text  the article's body text
 */
case class WikipediaArticle(title: String, text: String) {

  /**
   * Checks whether this article's text mentions `lang` as a whole token.
   *
   * The text is split on the single space character only, and a token must equal
   * `lang` exactly (case-sensitive), so "Java" does not match "JavaScript".
   * NOTE(review): tokens glued to punctuation or line breaks (e.g. "Scala," or a
   * "Scala" followed by a newline) are not recognised — confirm this is acceptable.
   *
   * @param lang language to look for (e.g. "Scala")
   * @return whether the text of this article mentions `lang`
   */
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

object WikipediaRanking {

  // Silence the logging of every "org.*" logger (Spark included) below ERROR level.
  Logger.getLogger("org").setLevel(Level.ERROR)

  /** The languages to rank. */
  val langs: List[String] = List(
    "JavaScript", "Java", "PHP", "Python", "C#", "C++", "Ruby", "CSS",
    "Objective-C", "Perl", "Scala", "Haskell", "MATLAB", "Clojure", "Groovy")

  /** Spark configuration: run locally on all available cores, app name "Wikipedia". */
  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Wikipedia")

  val sc: SparkContext = new SparkContext(conf)

  /** Every line of the file at `WikipediaData.filePath`, parsed by `WikipediaData.parse`. */
  val wikiRdd: RDD[WikipediaArticle] = sc.textFile(WikipediaData.filePath).map(WikipediaData.parse)

  /**
   * Returns the number of articles in `rdd` whose text mentions `lang`.
   *
   * Each call runs one Spark job (a `count` action) over `rdd`. The Long count is
   * narrowed to Int, so it would wrap for more than Int.MaxValue matching articles.
   *
   * @param lang language to look for
   * @param rdd  articles to scan
   * @return number of articles mentioning `lang`
   */
  def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int =
    rdd.filter(_.mentionsLanguage(lang)).count().toInt

  /**
   * (1) Ranks `langs` by the number of articles that mention each language at
   * least once, most-mentioned first.
   *
   * Every language triggers its own pass over `rdd` (one `occurrencesOfLang` job
   * each), so this is the slowest strategy. Languages with zero mentions are kept
   * (with count 0). Ties keep the relative order they have in `langs`.
   *
   * Side effect: if `rdd` is not persisted yet, it is marked for in-memory caching
   * and stays cached after this method returns.
   *
   * @param langs languages to rank
   * @param rdd   articles to scan
   * @return (language, article count) pairs in decreasing count order
   */
  def rankLangs(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = {
    // The RDD is scanned once per language, so keep it in memory. An RDD that was
    // never persisted reports StorageLevel.NONE, which is not `isValid`. Skipping an
    // already-persisted RDD avoids Spark's error on changing its storage level.
    if (!rdd.getStorageLevel.isValid) rdd.cache()

    langs
      .map(lang => (lang, occurrencesOfLang(lang, rdd)))
      .sortBy(_._2)(Ordering[Int].reverse)
  }

  /**
   * Builds an inverted index mapping each language to the articles that mention it.
   *
   * An article appears once under every language in `langs` that it mentions.
   * Languages mentioned by no article do not appear as keys.
   *
   * @param langs languages to index
   * @param rdd   articles to index
   * @return language -> articles mentioning it
   */
  def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] =
    rdd
      .flatMap(article => langs.filter(article.mentionsLanguage).map(lang => (lang, article)))
      .groupByKey()

  /**
   * (2) Ranks languages using a precomputed inverted index (see `makeIndex`), most
   * articles first.
   *
   * Languages absent from the index (no mentions) are absent from the result.
   *
   * @param index language -> articles mentioning it
   * @return (language, article count) pairs in decreasing count order
   */
  def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] =
    index.mapValues(_.size).sortBy(_._2, ascending = false).collect().toList

  /**
   * (3) Ranks `langs` in a single pass by emitting (language, 1) per mention and
   * summing with `reduceByKey`, most-mentioned first.
   *
   * Unlike `makeIndex`'s `groupByKey`, which shuffles whole articles, `reduceByKey`
   * combines partial counts within each partition before the shuffle. Languages
   * with zero mentions are absent from the result.
   *
   * @param langs languages to rank
   * @param rdd   articles to scan
   * @return (language, article count) pairs in decreasing count order
   */
  def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] =
    rdd
      .flatMap(article => langs.filter(article.mentionsLanguage).map(lang => (lang, 1)))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
      .collect()
      .toList

  /**
   * Runs the three ranking strategies and prints how long each one took.
   *
   * The SparkContext is stopped even if one of the rankings throws.
   * NOTE: Part 1 leaves `wikiRdd` cached, which also benefits Parts 2 and 3.
   */
  def main(args: Array[String]): Unit = {
    try {
      /* Languages ranked according to (1) */
      val langsRanked: List[(String, Int)] =
        timed("Part 1: naive ranking", rankLangs(langs, wikiRdd))

      /* An inverted index mapping languages to wikipedia pages on which they appear.
       * Declared as a `def` so that it is evaluated inside the Part 2 `timed` block. */
      def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)

      /* Languages ranked according to (2), using the inverted index */
      val langsRanked2: List[(String, Int)] =
        timed("Part 2: ranking using inverted index", rankLangsUsingIndex(index))

      /* Languages ranked according to (3) */
      val langsRanked3: List[(String, Int)] =
        timed("Part 3: ranking using reduceByKey", rankLangsReduceByKey(langs, wikiRdd))

      /* Output the speed of each ranking */
      println(timing)
    } finally {
      sc.stop()
    }
  }

  /** One line is appended per successful `timed` call; `main` prints it at the end. */
  val timing: StringBuffer = new StringBuffer

  /**
   * Evaluates `code`, appends its elapsed time (in whole milliseconds) under
   * `label` to `timing`, and returns its result.
   *
   * If `code` throws, nothing is appended and the exception propagates.
   */
  def timed[T](label: String, code: => T): T = {
    // nanoTime is monotonic, unlike currentTimeMillis, which follows the wall clock.
    val start = System.nanoTime()
    val result = code
    val elapsedMs = (System.nanoTime() - start) / 1000000L
    timing.append(s"Processing $label took $elapsedMs ms.\n")
    result
  }
}

最新文章

  1. vector - vector product
  2. 一篇笔记整理JVM工作原理
  3. Nordic Semiconductor nRF52832 蓝牙智能多协议单芯片解决方案荣获《中国电子商情》编辑选择奖
  4. 基于select模型的udp客户端实现超时机制
  5. Web前端优化,提高加载速度 css
  6. iOS高仿城觅应用客户端项目(开发思路和代码)
  7. 宏HASH_DELETE
  8. AFNetworking2.x 使用过程中遇到的问题
  9. wpf 依赖性属性
  10. ActiveReports 交互式报表之向下钻取解决方案
  11. 防止ajax非正常访问
  12. Go 从入门到精通(三)字符串,时间,流程控制,函数
  13. Jmeter使用代理服务器录制脚本
  14. Xposed快速hook关键点
  15. Archlinux安装指南(uefi+gpt)
  16. EasyUI 使用tabs切换后datagrid显示不了内容
  17. SoapUI 5.1.2(专业版)下载(含破解文件)
  18. Confluence 6 编辑站点欢迎消息
  19. git 新建工程
  20. LinearLayout 线性布局

热门文章

  1. C# WinForm 可折叠的树形控件
  2. 虚拟化代码博客 good
  3. Memcached在Linux系统下的安装和PHP开启 Memcached的 扩展 超级解决方案
  4. 客户端JavaScript线程模型
  5. 记一次腾讯IEG面试失败经历
  6. 10 jQuery的事件绑定和解绑
  7. SpringBoot启动访问JSP页面,直接进入页面或者访问不到,报404,并且加载tomcat插件tomcat-embed-jasper也不行
  8. 微信小程序商城 带java后台源码
  9. 你竟然没用 Maven 构建项目?
  10. 微服务SpringCloud之服务调用与负载均衡