Storm WordCount 工作过程

Storm 版本:

1、Spout 从外部数据源中读取数据,随机发送一个元组对象出去;

2、SplitBolt 接收 Spout 中输出的元组对象,将元组中的数据切分成单词,并将切分后的单词发射出去;

3、WordCountBolt 接收 SplitBolt 中输出的单词数组,对里面单词的频率进行累加,将累加后的结果输出。

Java 版本:

1、读取文件中的数据,一行一行的读取;

2、将读到的数据进行切割;

3、对切割后的数组中的单词进行计算。

Hadoop 版本:

1、按行读取文件中的数据;

2、在 Mapper()函数中对每一行的数据进行切割,并输出切割后的数据数组;

3、接收 Mapper()中输出的数据数组,在 Reducer()函数中对数组中的单词进行计算,将计算后的统计结果输出。

源代码

storm的配置、eclipse里maven的配置以及创建项目部分省略。

Mainclass

package com.test.stormwordcount;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields; public class MainClass { public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
//创建一个 TopologyBuilder
TopologyBuilder tb = new TopologyBuilder();
tb.setSpout("SpoutBolt", new SpoutBolt(), 2); tb.setBolt("SplitBolt", new SplitBolt(), 2).shuffleGrouping("SpoutBolt");
tb.setBolt("CountBolt", new CountBolt(), 4).fieldsGrouping("SplitBolt", new Fields("word"));
//创建配置
Config conf = new Config();
//设置 worker 数量
conf.setNumWorkers(2);
//提交任务
//集群提交
//StormSubmitter.submitTopology("myWordcount", conf, tb.createTopology());
//本地提交
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("myWordcount", conf, tb.createTopology());
}
}

SplitBolt 部分

package com.test.stormwordcount;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values; public class SplitBolt extends BaseRichBolt{
OutputCollector collector; /** * 初始化 */
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
} /** * 执行方法 */
public void execute(Tuple input) {
String line = input.getString(0);
String[] split = line.split(" ");
for (String word : split) {
collector.emit(new Values(word));
}
} /** * 输出 */
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

CountBolt 部分

package com.test.stormwordcount;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple; public class CountBolt extends BaseRichBolt{ OutputCollector collector;
Map<String, Integer> map = new HashMap<String, Integer>(); /** * 初始化 */
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
} /** * 执行方法 */
public void execute(Tuple input) {
String word = input.getString(0);
if(map.containsKey(word)){
Integer c = map.get(word);
map.put(word, c+1);
}else{
map.put(word, 1);
}
//测试输出
System.out.println("结果:"+map);
} /** * 输出 */
public void declareOutputFields(OutputFieldsDeclarer declarer) { }
}

SpoutBolt 部分

package com.test.stormwordcount;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values; public class SpoutBolt extends BaseRichSpout{ SpoutOutputCollector collector;
/** * 初始化方法 */
public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
} /** * 重复调用方法 */
public void nextTuple() {
collector.emit(new Values("hello world this is a test"));
} /** * 输出 */
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("test"));
}
}

POM.XML 文件内容

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>com.test</groupId>
<artifactId>stormwordcount</artifactId>
<version>0.9.6</version>
<packaging>jar</packaging> <name>stormwordcount</name>
<url>http://maven.apache.org</url> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.6</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.test.stormwordcount.MainClass</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>

遇到的问题

基于Storm的WordCount需要eclipse安装了maven插件,之前的大数据实践安装的eclipse版本为Eclipse IDE for Eclipse Committers4.5.2,这个版本不自带maven插件,后续安装失败了几次(网上很多的教程都已经失效),这里分享一下我成功安装的方法:

使用链接下载,Help->Install New SoftWare



点击Add,name输入随意,在location输入下载eclipse的maven插件,下载地址可以这样获取

点击连接:http://www.eclipse.org/m2e/index.html 进入网站后点击download,拉到最下面可以看到很多eclipse maven插件的版本和发布时间,选在适合eclipse的版本复制链接即可。建议取消选中Contack all update sites during install to find required software(耗时太久)。

但是安装成功后还是无法配置(这里原因不太清楚,没找到解决办法),就直接上官网换成自带maven插件的JavaEE IDE了...

后续的maven的配置这些都比较顺利,第一次创建maven-archetype-quickstat项目报错,试了网上很多办法都还没成功,然后打开 Windows->Preferencs->Maven->Installation发现之前配置了的maven的安装路径没了...重新配置了下就可以创建项目了。

最后运行成功的结果:

最新文章

  1. Copy-On-Write容器
  2. java后台调用url无协议
  3. 网站错误记录:Timeout expired. The timeout period elapsed prior to obtaining a connection from the pool.
  4. 用 .NET Reflector 8 查看 System.Windows.Controls 命名空间下的类
  5. 使用Zen coding高效编写html代码
  6. Android开发-Hello World+phonegap(Cordova)
  7. 【原】WinForm中的DataGridView加入Combbox或者DropDownButton后,操作变慢
  8. 提交jar作业到spark上运行
  9. [改善Java代码]不推荐覆写start方法
  10. Memcached源码分析之memcached.h
  11. Java开源生鲜电商平台-搜索模块的设计与架构(源码可下载)
  12. c#调用word文件
  13. Oracle篇 之 数据操作
  14. windows10上安装mysql(详细步骤)
  15. 记录学习新框架yii
  16. Github+HEXO FATAL bad indentation of a mapping entry at line 84
  17. ionic的基础学习(第一篇)
  18. iOS archive(归档)
  19. 机器学习-ID3决策树算法(附matlab/octave代码)
  20. Centos 7中的网卡一致性命名规则

热门文章

  1. ArcGIS API For Javascript :如何解决跨网不能正常获取依赖项的问题?
  2. Qt Framework 问题之 framework/Versions/A:bundle format unrecognized, invalid, or unsuitable
  3. [Part 3] 在Ubuntu 16.04源码编译PCL 1.8.1支持VTK和QT
  4. Java基础知识总结之垃圾回收机制
  5. Roarctf 几道pwn 复现
  6. odoo12 修行基础篇之 添加字段 (一)
  7. Python3学习-基础
  8. Lua和C交互的简易教程
  9. 利用keytool、openssl生成证书文件
  10. OSI层次模型