Storm入门到精通(四)---本地实例Demo
单词实时计数
maven项目的结构:
一、Pom.xml
[html] view plain copy
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.casicloud</groupId>
<artifactId>storm</artifactId>
<packaging>war</packaging>
<version>1.0-SNAPSHOT</version>
<name>storm Maven Webapp</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core www.huayu521.com </artifactId>
<version>1.0.4</version>
</dependency>
</dependencies>
<build>
<finalName>storm<www.dfgjpt.com /finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin<www.dfgjyl.cn/ /artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
二、类
2.1 数据源Spout
[java] view plain copy
* @Description: 向后端发射tuple数据流
*/
public class SentenceSpout extends BaseRichSpout {
//BaseRichSpout是ISpout接口和IComponent接口的简单实现,接口对用不到的方法提供了默认的实现
private SpoutOutputCollector collector;
private String[www.yongshiyule178.com ] sentences = {
"my name is soul",
"im a boy",
"i have a dog",
"my dog has fleas",
"my girl friend is beautiful"
};
private int index=0;
/**
* open()方法中是ISpout接口中定义,在Spout组件初始化时被调用。
* open()接受三个参数:一个包含Storm配置的Map,一个TopologyContext对象,提供了topology中组件的信息,SpoutOutputCollector对象提供发射tuple的方法。
* 在这个例子中,我们不需要执行初始化,只是简单的存储在一个SpoutOutputCollector实例变量。
*/
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.collector = spoutOutputCollector;
}
/**
* nextTuple()方法是任何Spout实现的核心。
* Storm调用这个方法,向输出的collector发出tuple。
* 在这里,我们只是发出当前索引的句子,并增加该索引准备发射下一个句子。
*/
public void nextTuple() {
this.collector.emit(new Values(sentences[index]));
index++;
if (index>=sentences.length) {
index=0;
}
Utils.sleep(1);
}
/**
* declareOutputFields是在IComponent接口中定义的,所有Storm的组件(spout和bolt)都必须实现这个接口
* 用于告诉Storm流组件将会发出那些数据流,每个流的tuple将包含的字段
*/
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("sentence"));//告诉组件发出数据流包含sentence字段
}
}
2.2语句分割成单词bolt
[java] view plain copy
* @Description: 订阅sentence spout发射的tuple流,实现分割单词
*/
public class SplitSentenceBolt extends BaseRichBolt {
//BaseRichBolt是IComponent和IBolt接口的实现
//继承这个类,就不用去实现本例不关心的方法
private OutputCollector collector;
/**
* prepare()方法类似于ISpout 的open()方法。
* 这个方法在blot初始化时调用,可以用来准备bolt用到的资源,比如数据库连接。
* 本例子和SentenceSpout类一样,SplitSentenceBolt类不需要太多额外的初始化,
* 所以prepare()方法只保存OutputCollector对象的引用。
*/
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector=outputCollector;
}
/**
* SplitSentenceBolt核心功能是在类IBolt定义execute()方法,这个方法是IBolt接口中定义。
* 每次Bolt从流接收一个订阅的tuple,都会调用这个方法。
* 本例中,收到的元组中查找“sentence”的值,
* 并将该值拆分成单个的词,然后按单词发出新的tuple。
*/
public void execute(Tuple tuple) {
String sentence = tuple.getStringByField("sentence");
String[] words = sentence.split(" ");
for (String word : words) {
this.collector.emit(new Values(word));//向下一个bolt发射数据
}
}
/**
* plitSentenceBolt类定义一个元组流,每个包含一个字段(“word”)。
*/
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word"));//告诉组件发出数据流包含sentence字段
}
}
2.3 单词统计bolt
[java] view plain copy
/**
* @Author
* @Date 2018/2/27
* @Time 10:02
* @Description: 订阅 SplitSentenceBolt的输出流,实现单词计数,并发送当前计数给下一个bolt
*/
public class WordCountBolt extends BaseRichBolt {
private OutputCollector collector;
//存储单词和对应的计数
private HashMap<String, Long> counts = null;//注:不可序列化对象需在prepare中实例化
/**
* 大部分实例变量通常是在prepare()中进行实例化,这个设计模式是由topology的部署方式决定的
* 因为在部署拓扑时,组件spout和bolt是在网络上发送的序列化的实例变量。
* 如果spout或bolt有任何non-serializable实例变量在序列化之前被实例化(例如,在构造函数中创建)
* 会抛出NotSerializableException并且拓扑将无法发布。
* 本例中因为HashMap 是可序列化的,所以可以安全地在构造函数中实例化。
* 但是,通常情况下最好是在构造函数中对基本数据类型和可序列化的对象进行复制和实例化
* 而在prepare()方法中对不可序列化的对象进行实例化。
*/
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
this.counts = new HashMap<String, Long>();
}
/**
* 在execute()方法中,我们查找的收到的单词的计数(如果不存在,初始化为0)
* 然后增加计数并存储,发出一个新的词和当前计数组成的二元组。
* 发射计数作为流允许拓扑的其他bolt订阅和执行额外的处理。
*/
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
Long count = this.counts.get(word);
if (count == null) {
count = 0L;//如果不存在,初始化为0
}
count++;//增加计数
this.counts.put(word, count);//存储计数
this.collector.emit(new Values(word,count));
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
//声明一个输出流,其中tuple包括了单词和对应的计数,向后发射
//其他bolt可以订阅这个数据流进一步处理
outputFieldsDeclarer.declare(new Fields("word","count"));
}
}
2.4 归总打印bolt
[java] view plain copy
* @Description:生成报告
*/
public class ReportBolt extends BaseRichBolt {
private HashMap<String, Long> counts = null;//保存单词和对应的计数
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.counts = new HashMap<String, Long>();
}
public void execute(Tuple input) {
String word = input.getStringByField("word");
Long count = input.getLongByField("count");
this.counts.put(word, count);
//实时输出
System.out.println("结果:"+this.counts);
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
//这里是末端bolt,不需要发射数据流,这里无需定义
}
/**
* cleanup是IBolt接口中定义
* Storm在终止一个bolt之前会调用这个方法
* 本例我们利用cleanup()方法在topology关闭时输出最终的计数结果
* 通常情况下,cleanup()方法用来释放bolt占用的资源,如打开的文件句柄或数据库连接
* 但是当Storm拓扑在一个集群上运行,IBolt.cleanup()方法不能保证执行(这里是开发模式,生产环境不要这样做)。
*/
public void cleanup(){
System.out.println("---------- FINAL COUNTS -----------");
ArrayList<String> keys = new ArrayList<String>();
keys.addAll(this.counts.keySet());
Collections.sort(keys);
for(String key : keys){
System.out.println(key + " : " + this.counts.get(key));
}
System.out.println("----------------------------");
}
}
三、入口类
[java] view plain copy
* @Description: 实现单词计数
*/
public class App {
private static final String SENTENCE_SPOUT_ID = "sentence-spout";
private static final String SPLIT_BOLT_ID = "split-bolt";
private static final String COUNT_BOLT_ID = "count-bolt";
private static final String REPORT_BOLT_ID = "report-bolt";
private static final String TOPOLOGY_NAME = "word-count-topology";
public static void main( String[] args ) throws Exception //throws Exception
{
//System.out.println( "Hello World!" );
//实例化spout和bolt
SentenceSpout spout = new SentenceSpout();
SplitSentenceBolt splitBolt = new SplitSentenceBolt();
WordCountBolt countBolt = new WordCountBolt();
ReportBolt reportBolt = new ReportBolt();
TopologyBuilder builder = new TopologyBuilder();//创建了一个TopologyBuilder实例
//TopologyBuilder提供流式风格的API来定义topology组件之间的数据流
//builder.setSpout(SENTENCE_SPOUT_ID, spout);//注册一个sentence spout
//设置两个Executeor(线程),默认一个
builder.setSpout(SENTENCE_SPOUT_ID, spout,1);
// SentenceSpout --> SplitSentenceBolt
//注册一个bolt并订阅sentence发射出的数据流,shuffleGrouping方法告诉Storm要将SentenceSpout发射的tuple随机均匀的分发给SplitSentenceBolt的实例
//builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
//SplitSentenceBolt单词分割器设置4个Task,2个Executeor(线程)
builder.setBolt(SPLIT_BOLT_ID, splitBolt,2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID);
// SplitSentenceBolt --> WordCountBolt
//fieldsGrouping将含有特定数据的tuple路由到特殊的bolt实例中
//这里fieldsGrouping()方法保证所有“word”字段相同的tuuple会被路由到同一个WordCountBolt实例中
//builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping( SPLIT_BOLT_ID, new Fields("word"));
//WordCountBolt单词计数器设置4个Executeor(线程)
builder.setBolt(COUNT_BOLT_ID, countBolt,4).fieldsGrouping( SPLIT_BOLT_ID, new Fields("word"));
// WordCountBolt --> ReportBolt
//globalGrouping是把WordCountBolt发射的所有tuple路由到唯一的ReportBolt
builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);
Config config = new Config();//Config类是一个HashMap<String,Object>的子类,用来配置topology运行时的行为
if (args != null && args.length > 0) {
// 集群模式
config.setNumWorkers(2); //设置worker数量
StormSubmitter.submitTopology(args[0], config, builder.createTopology());
} else {
// 本地模式
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
Utils.sleep(10000);// 10s后自动结束
cluster.killTopology(TOPOLOGY_NAME);
cluster.shutdown();
}
}
}
四、结果:
....
[html] view plain copy
结果:{a=7023, im=3511, soul=3512, i=3511, is=7023, my=10534, girl=3511, boy=3511, beautiful=3511, name=3512, have=3511, friend=3511, has=3511, dog=7022, fleas=3511}
结果:{a=7023, im=3512, soul=3512, i=3511, is=7023, my=10534, girl=3511, boy=3511, beautiful=3511, name=3512, have=3511, friend=3511, has=3511, dog=7022, fleas=3511}
结果:{a=7023, im=3512, soul=3512, i=3511, is=7023, my=10534, girl=3511, boy=3512, beautiful=3511, name=3512, have=3511, friend=3511, has=3511, dog=7022, fleas=3511}
结果:{a=7023, im=3512, soul=3512, i=3512, is=7023, my=10534, girl=3511, boy=3512, beautiful=3511, name=3512, have=3511, friend=3511, has=3511, dog=7022, fleas=3511}
结果:{a=7024, im=3512, soul=3512, i=3512, is=7023, my=10534, girl=3511, boy=3512, beautiful=3511, name=3512, have=3511, friend=3511, has=3511, dog=7022, fleas=3511}
结果:{a=7024, im=3512, soul=3512, i=3512, is=7023, my=10534, girl=3511, boy=3512, beautiful=3511, name=3512, have=3512, friend=3511, has=3511, dog=7022, fleas=3511}
结果:{a=7024, im=3512, soul=3512, i=3512, is=7023, my=10534, girl=3511, boy=3512, beautiful=3511, name=3512, have=3512, friend=3511, has=3511, dog=7023, fleas=3511}
结果:{a=7024, im=3512, soul=3512, i=3512, is=7023, my=10534, girl=3511, boy=3512, beautiful=3511, name=3512, have=3512, friend=3511, has=3511, dog=7023, fleas=3512}
结果:{a=7024, im=3512, soul=3512, i=3512, is=7023, my=10534, girl=3511, boy=3512, beautiful=3511, name=3512, have=3512, friend=3511, has=3512, dog=7023, fleas=3512}
结果:{a=7024, im=3512, soul=3512, i=3512, is=7023, my=10535, girl=3511, boy=3512, beautiful=3511, name=3512, have=3512, friend=3511, has=3512, dog=7023, fleas=3512}
......
[html] view plain copy
---------- FINAL COUNTS -----------
a : 7026
beautiful : 3513
boy : 3513
dog : 7026
fleas : 3513
friend : 3513
girl : 3513
has : 3513
have : 3513
i : 3513
im : 3513
is : 7026
my : 10539
name : 3513
soul : 3513
----------------------------
源码下载:https://github.com/loujitao/DemoList/tree/master/stormTest
注意事项:
1、jdk版本会有影响,本例使用1.8.0版本的jdk;
2、Utils.sleep(20000);// 20s后自动结束 时间建议设大一些,以免出现莫名异常
最新文章
- iOS中iconfont(图标字体)的基本使用
- centos设置静态ip地址
- java单例-积木系列
- 用HTML/JS/PHP方式实现页面延时跳转
- word文档中的字号和磅的对应关系
- bzoj1267 3784
- 命令提示符CMD远程连接Mysql学习笔记
- 单选框和下拉框的jquery操作
- HDU 2846 Repository(字典树)
- Mac下安装包管理平台Homebrew(Mac 10.12)
- 通过java.net.URLConnection发送HTTP请求的方法
- SQL连接操作
- Parallel.ForEach 多线程 声明失败 ";未将对象引用设置到对象的实例";
- win7访问局域网总提示用户名密码错误解决方案
- 你想了解Go语言开发吗?
- WikiBooks/Cg Programming
- 修改centos的时间,解决时间比本地实际时间快了8小时
- webView内部跳转后 返回不行了
- 使用JSTL的taglib做if判断
- 木马suppoie 处理的几个思路 木马文件的权限所有者 属主数组 定时任务 目录权限
热门文章
- Tetris(俄罗斯方块)
- C/C++作用域运算符::
- 互联网校招面试必备——Java多线程
- 32bit 天堂2 windows 2000 server架设教程
- vscode调试js,安装了nodejs之后还出现无法在Path上找到运行时的node
- Hyperledger Fabric服务器配置及修改Docker容器卷宗存储根目录/位置
- Cocos2d-x的跨平台原理
- excel窗口独立显示/单独显示
- Dingo Api 1.0在laravel5.2中的简单应用
- plsql 不修改tnsnames.ora文件