Streaming hdfs count 需要先启动 hadoop 集群。

# 启动 hadoop 集群
start-dfs.sh
start-yarn.sh # 查看是否启动成功
# 命令 jps
jps

  hadoop 启动成功之后,下面就是关于 stream 的代码,stream 统计代码如下,将下面的代码进行打包,上传到服务器上即可。

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext} object HdfsWordCount {
def main(args: Array[String]): Unit = {
if (args.length < 2) {
System.err.println("Usage: HdfsWordCount <directory>")
System.exit(1)
} // StreamingExamples.setStreamingLogLevels()
val sparkConf = new SparkConf().setAppName("HdfsWordCount")
// Create the context
val ssc = new StreamingContext(sparkConf, Seconds(10)) // Create the FileInputDStream on the directory and use the
// stream to count words in new files created
val lines = ssc.textFileStream(args(0))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
// wordCounts.saveAsTextFiles(args(1))
ssc.start()
ssc.awaitTermination()
} }

  代码需要传递两个参数,一个是 stream 监控的数据输入目录,一个是输出目录。对应的执行脚本如下。

$SPARK_HOME/bin/spark-submit\
--class com.hw.streaming.HdfsWordCount\
--master yarn-cluster \
--executor-memory 1G \
--total-executor-cores 2 \
--files $HIVE_HOME/conf/hive-site.xml \
--jars $HIVE_HOME/lib/mysql-connector-java-5.1.25-bin.jar,$SPARK_HOME/jars/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/jars/datanucleus-core-3.2.10.jar,$SPARK_HOME/jars/datanucleus-rdbms-3.2.9.jar,$SPARK_HOME/jars/guava-14.0.1.jar \
./SparkPro-1.0-SNAPSHOT-jar-with-dependencies.jar \
hdfs://master:9000/data/input hdfs://master:9000/data/output

  执行对应的脚本

# 脚本是跑在 yarn-cluster 上的,所以可以通过 ui 界面查看对应的内容
sh hdfs_run.sh

  脚本运行之后,一开始监控的输入目录是没有任何数据的,现在尝试往输入目录上传对应的数据文件,如下。

# 随便上传一个文件,比如这里是 3.txt,对应的内容是
# cat 3.txt
hello world
hello world
hello world
hello world
hello world
hello world
hello world
a
a
a
a
a
a
a b b b
# 将 3.txt 上传到 hdfs
hadoop fs -put 3.txt /data/input

  文件上传之后,可以打开浏览器,通过查看日志来看效果。

# 浏览器输入 192.168.56.122:8088
# 点击对应的 application
# 点击对应的 log
# 点击查看 log 详情
# 会看到下面的日志输出 -------------------------------------------
Time: 1564279580000 ms
-------------------------------------------
(b,3)
(hello,7)
(world,7)
(a,7)

  以上就是 Streaming hdfs count 的案例,一开始调试的时候没有通过是没有看清楚,是先把数据文件上传到 hdfs 里面了,导致后面统计不出来,后来发现是启动之后监控的,因此,需要先启动,在向里面放数据。查看日志的时候,发现 INFO 也打印出来了,如果不需要看 INFO 信息,可以在 hadoop 配置文件中 log4j.properties 中把日志级别调高,或者去掉 INFO,即可。

  

最新文章

  1. snmp ubuntu/centos--
  2. iOS的数据持久化
  3. C段旁注工具CCC.exe
  4. 慕课网-Java入门第一季-7-2 Java 中无参无返回值方法的使用
  5. Qt connect parent widget 连接父控件的信号槽
  6. SQL 锁的介绍
  7. hibernate和mybatis思想,区别,优缺点
  8. where和having的区别
  9. jquery实现点击改变背景色,点击其他恢复原来背景色,被点击的改变背景色
  10. Web端的Tab控件在切换Tab时Load数据出错的处理
  11. UE4使用C++创建枚举变量适用于C++与蓝图
  12. 基于N-Gram判断句子是否通顺
  13. setInterval()与setTimeout()的区别
  14. Android为TV端助力 很详细的序列化过程Parcelable
  15. ElasticSearch核心知识总结(一)es的六种搜索方式和数据分析
  16. Python CBV和FBV
  17. jsonp跨域设置cookie
  18. ubuntu上mongodb的安装
  19. Python 之 Difflib
  20. 初级字典树查找在 Emoji、关键字检索上的运用 Part-2

热门文章

  1. Laravel入门及实践,快速上手ThinkSNS+二次开发
  2. MySQL Install--编译安装MySQL 5.7
  3. 什么影响了mysql的性能-硬件资源及系统方面优化
  4. lvm逻辑卷扩容报错解决
  5. Linux访问控制列表(Access Control List,简称ACL)
  6. C++(五十) — 容器中元素满足的条件
  7. [AI] 切换cuda版本的万金油
  8. jmeter脚本中请求参数获取的几种方式
  9. siblings,next,prev
  10. shell脚本中向hive动态分区插入数据