1、搭建基本spark+Hadoop的本地环境

  https://blog.csdn.net/u011513853/article/details/52865076?tdsourcetag=s_pcqq_aiomsg

2、下载对应的spark与pyspark的版本进行安装

  https://pypi.org/project/pyspark/2.3.0/#history

3、单词统计测试

  a、python版本

import os
import shutil from pyspark import SparkContext inputpath = './data/wc.txt'
outputpath = './data/out.txt' sc = SparkContext('local', 'wordcount') # 读取文件
input = sc.textFile(inputpath)
# 切分单词
words = input.flatMap(lambda line: line.split(' '))
# 转换成键值对并计数
counts = words.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y) # 输出结果
counts.foreach(print) # 删除输出目录
if os.path.exists(outputpath):
shutil.rmtree(outputpath, True) # 将统计结果写入结果文件
counts.saveAsTextFile(outputpath)

  

  b、scala版本

package com.wcount

import java.io.{File, PrintWriter}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext} object ScalaWordCount { def main(args: Array[String]): Unit = {
/**
* SparkConf:表示spark application的参数,
* setMaster:表示运行的模式:
*
* local:本地模式,一般用于测试
* standalone:spark集群自带的资源调度模式
* yarn:hadoop
* mesos:资源调度框架
* setAppName:设置application的名称
*/
val conf = new SparkConf().setMaster("local").setAppName("workJob")
/**
* SparkContext:spark application的上下文环境,通往集群的唯一入口
*/
val sc = new SparkContext(conf) // val session: SparkSession = SparkSession.builder.appName("wc").master("local").getOrCreate() val lines: RDD[String] = sc.textFile("./data/wc.txt")
val words: RDD[String] = lines.flatMap(line => {
println("flatmap...........")
line.split(" ")
})
val tuple: RDD[(String, Int)] = words.map(word => {
println("map............")
new Tuple2(word, 1)
})
val result: RDD[(String, Int)] = tuple.reduceByKey((v1: Int, v2: Int) => v1 + v2)
//result.foreach(println) //文件写入
val outWriter = new PrintWriter(new File("./data/out.txt"))
var wt:String = "" for (item<-result){
wt =item._1.toString+":"+item._2.toString+" "
println(wt)
}
println(wt)
outWriter.println(wt)
outWriter.close() while (true){ }
// sc.textFile("./data/wc").flatMap(line => {line.split(" ")}).map(word => {new Tuple2(word, 1)}).reduceByKey((v1: Int, v2: Int) => v1 + v2).foreach(println)
sc.stop()
}
}

  

最新文章

  1. pagination 分页
  2. C/C++变量名与值的问题
  3. ES6入门系列三(特性总览下)
  4. console 让 js 调试更简单
  5. Linux Kernel中获取当前目录方法(undone)
  6. 用Rufus来制作Windows10的U盘安装盘
  7. 对volatile关键字的理解
  8. RocketMQ三主三从二命名服务平滑版本升级实操
  9. Oracle中的AWR,全称为Automatic Workload Repository
  10. 微信小程序(五) 利用模板动态加载数据
  11. 21-matlab 迷宫题
  12. 使用WPF教你一步一步实现连连看(二)
  13. CenOS下安装 Git,Git的初始设置,添加文件,提交文件
  14. svn冲突问题解决办法
  15. Strict Mode (JavaScript)
  16. UA 用户代理
  17. Python生成器-博文读后感
  18. C++ 递归实现汉诺塔
  19. tornado 多进程模式
  20. python之字符串处理

热门文章

  1. JS获取当前日期和时间的方法,并按照YYYY-MM-DD格式化
  2. centos7 远程桌面连接到xfce桌面
  3. (转)分布式锁的几种使用方式(redis、zookeeper、数据库)
  4. SpringMVC @RequestParam
  5. Kendo UI使用教程:入门指南
  6. ps制作雾的效果
  7. C语言中time函数和localtime获取系统时间和日期
  8. 对Serverless的研究
  9. wed.xml 中 filter、servlet 配置格式
  10. .net2.0 Thread 多线程