Spark-Streaming hdfs count 案例
2024-10-20 16:10:30
Streaming hdfs count 需要先启动 hadoop 集群。
# 启动 hadoop 集群
start-dfs.sh
start-yarn.sh # 查看是否启动成功
# 命令 jps
jps
hadoop 启动成功之后,下面就是关于 stream 的代码,stream 统计代码如下,将下面的代码进行打包,上传到服务器上即可。
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext} object HdfsWordCount {
def main(args: Array[String]): Unit = {
if (args.length < 2) {
System.err.println("Usage: HdfsWordCount <directory>")
System.exit(1)
} // StreamingExamples.setStreamingLogLevels()
val sparkConf = new SparkConf().setAppName("HdfsWordCount")
// Create the context
val ssc = new StreamingContext(sparkConf, Seconds(10)) // Create the FileInputDStream on the directory and use the
// stream to count words in new files created
val lines = ssc.textFileStream(args(0))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
// wordCounts.saveAsTextFiles(args(1))
ssc.start()
ssc.awaitTermination()
} }
代码需要传递两个参数,一个是 stream 监控的数据输入目录,一个是输出目录。对应的执行脚本如下。
$SPARK_HOME/bin/spark-submit\
--class com.hw.streaming.HdfsWordCount\
--master yarn-cluster \
--executor-memory 1G \
--total-executor-cores 2 \
--files $HIVE_HOME/conf/hive-site.xml \
--jars $HIVE_HOME/lib/mysql-connector-java-5.1.25-bin.jar,$SPARK_HOME/jars/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/jars/datanucleus-core-3.2.10.jar,$SPARK_HOME/jars/datanucleus-rdbms-3.2.9.jar,$SPARK_HOME/jars/guava-14.0.1.jar \
./SparkPro-1.0-SNAPSHOT-jar-with-dependencies.jar \
hdfs://master:9000/data/input hdfs://master:9000/data/output
执行对应的脚本
# 脚本是跑在 yarn-cluster 上的,所以可以通过 ui 界面查看对应的内容
sh hdfs_run.sh
脚本运行之后,一开始监控的输入目录是没有任何数据的,现在尝试往输入目录上传对应的数据文件,如下。
# 随便上传一个文件,比如这里是 3.txt,对应的内容是
# cat 3.txt
hello world
hello world
hello world
hello world
hello world
hello world
hello world
a
a
a
a
a
a
a b b b
# 将 3.txt 上传到 hdfs
hadoop fs -put 3.txt /data/input
文件上传之后,可以打开浏览器,通过查看日志来看效果。
# 浏览器输入 192.168.56.122:8088
# 点击对应的 application
# 点击对应的 log
# 点击查看 log 详情
# 会看到下面的日志输出 -------------------------------------------
Time: 1564279580000 ms
-------------------------------------------
(b,3)
(hello,7)
(world,7)
(a,7)
以上就是 Streaming hdfs count 的案例,一开始调试的时候没有通过是没有看清楚,是先把数据文件上传到 hdfs 里面了,导致后面统计不出来,后来发现是启动之后监控的,因此,需要先启动,在向里面放数据。查看日志的时候,发现 INFO 也打印出来了,如果不需要看 INFO 信息,可以在 hadoop 配置文件中 log4j.properties 中把日志级别调高,或者去掉 INFO,即可。
最新文章
- snmp ubuntu/centos--
- iOS的数据持久化
- C段旁注工具CCC.exe
- 慕课网-Java入门第一季-7-2 Java 中无参无返回值方法的使用
- Qt connect parent widget 连接父控件的信号槽
- SQL 锁的介绍
- hibernate和mybatis思想,区别,优缺点
- where和having的区别
- jquery实现点击改变背景色,点击其他恢复原来背景色,被点击的改变背景色
- Web端的Tab控件在切换Tab时Load数据出错的处理
- UE4使用C++创建枚举变量适用于C++与蓝图
- 基于N-Gram判断句子是否通顺
- setInterval()与setTimeout()的区别
- Android为TV端助力 很详细的序列化过程Parcelable
- ElasticSearch核心知识总结(一)es的六种搜索方式和数据分析
- Python CBV和FBV
- jsonp跨域设置cookie
- ubuntu上mongodb的安装
- Python 之 Difflib
- 初级字典树查找在 Emoji、关键字检索上的运用 Part-2