Spark setMaster源码

/**
* The master URL to connect to, such as "local" to run locally with one thread, "local[4]" to
* run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
*/
def setMaster(master: String): SparkConf = {
set("spark.master", master)
}

要连接到的主URL,例如“local”用一个线程在本地运行,“local [ 4 ]”用4个内核在本地运行,或者“Spark : / / master : 7077”用Spark独立集群运行。

package cn.rzlee.spark.scala

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext} // object相当于静态的
object ScalaWordCount {
def main(args: Array[String]): Unit = { //创建spark配置,设置应用程序名字
val conf = new SparkConf().setAppName("wordCountApp") // 创建spark执行入口
val sc = new SparkContext() // 指定以后从哪里读取数据创建RDD(弹性分布式数据集)
val lines: RDD[String] = sc.textFile("")
// 切分压平
val words: RDD[String] = lines.flatMap(_.split(" "))
// 将单词和一组合
val wordAndOne: RDD[(String, Int)] = words.map((_, ))
// 按key进行聚合 相同key不变,将value相加
val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
// 排序
val sorted = reduced.sortBy(_._2,false)
// 将结果保存到HDFS中
sorted.saveAsTextFile("")
//释放资源
sc.stop()
}
}

基于排序机制的wordCount

java 版本:

package cn.rzlee.spark.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import scala.actors.threadpool.Arrays; /**
* @Author ^_^
* @Create 2018/11/3
*/
public class SortWordCount {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf); // 创建line RDD
JavaRDD<String> lines = sc.textFile("C:\\Users\\txdyl\\Desktop\\log\\in\\data.txt", 1); // 执行单词计数
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String s) throws Exception {
return Arrays.asList(s.split("\t"));
}
}); JavaPairRDD<String, Integer> pair = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s, 1);
}
}); JavaPairRDD<String, Integer> wordCounts = pair.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
}); // 进行key-value的反转映射
JavaPairRDD<Integer, String> countWords = wordCounts.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
return new Tuple2<>(t._2, t._1);
}
}); // 按照key进行排序
JavaPairRDD<Integer, String> sortedCountWords = countWords.sortByKey(false); // 再次进行key-value的反转映射
JavaPairRDD<String, Integer> sortedWordCounts = sortedCountWords.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
return new Tuple2<>(t._2, t._1);
}
}); // 打印结果
sortedWordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> t) throws Exception {
System.out.println(t._1 + " appears " + t._2+ " times.");
}
});
// 关闭JavaSparkContext
sc.close();
}
}

scala版本:

package cn.rzlee.spark.scala

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext} object SortWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local")
val sc = new SparkContext(conf) val lines = sc.textFile("C:\\Users\\txdyl\\Desktop\\log\\in\\data.txt",1)
val words: RDD[String] = lines.flatMap(line=>line.split("\t"))
val pairs: RDD[(String, Int)] = words.map(word=>(word,1))
val wordCounts: RDD[(String, Int)] = pairs.reduceByKey(_+_)
val countWords: RDD[(Int, String)] = wordCounts.map(wordCount=>(wordCount._2, wordCount._1))
val sortedCountWords = countWords.sortByKey(false)
val sortedWordCounts: RDD[(String, Int)] = sortedCountWords.map(sortedCountWord=>(sortedCountWord._2, sortedCountWord._1))
sortedWordCounts.foreach(sortedWordCount=>{
println(sortedWordCount._1+" appear "+ sortedWordCount._2 + " times.")
}) sc.stop()
} }

最新文章

  1. 利用stack结构,将中缀表达式转换为后缀表达式并求值的算法实现
  2. 反向Ajax,实现服务器向客户端推送消息之 Comet
  3. C#获取硬件信息
  4. 第一章 管理程序流(In .net4.5) 之 实现多线程和异步处理
  5. 将a、b的值进行交换,并且不使用任何中间变量
  6. (转载)Cocos2dx-OpenGL ES2.0教程:你的第一个立方体(5)
  7. Linq学习之旅——LINQ查询表达式
  8. Android开发手记(27) Java多线程的操作
  9. libconfig第一篇———使用指南
  10. adb deviecs时显示的emulator-5554如何删除
  11. Atom本地安装插件右上角出现红色报错解决方案
  12. iframe获取元素
  13. ajax跨域问题及相关解决方案
  14. sas data infile 语句选项
  15. Java注解Annotation学习笔记
  16. [OpenCV] Samples 02: Mat - 图像矩阵
  17. harbor私有镜像仓库的搭建与使用与主从复制
  18. [2017BUAA软工]第3次个人作业
  19. HTML5和XHTML的区别
  20. #include &amp;lt;NOIP2010 Junior&amp;gt; 三国游戏 ——using namespace wxl;

热门文章

  1. 查看Tomcat状态页,管理app,主机管理
  2. vi编辑器命令大全
  3. ArrayList remove注意事项
  4. FPGA开发流程
  5. 一个方便的图片载入框架——ImageViewEx
  6. Android开发 adb命令提示:Permission denied (转)
  7. phalcon builder get raw sql
  8. Android 禁止状态栏下拉
  9. centos7.0 关闭防火墙
  10. EasyNVR无插件摄像机直播之:摄像机网页低延时无插件直播实现