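The first step in developing for Storm is designing a topology. To help newcomers get started, we begin with a simple example: it appends a suffix to each word ("!!!" in the code that follows) and then prints the result. The topology is laid out as follows: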

  The topology has three parts:

  TestWordSpout: the data source; emits words

  ExclamationBolt: appends the suffix to each word

  PrintBolt: prints each word
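
The flow through these three parts can be sketched without Storm at all. The following is a minimal plain-Java illustration, not Storm code: PipelineSketch and its run method are hypothetical names, the word list stands in for the spout, and the "!!!" suffix and dotted tail mirror the ExclamationBolt and PrintBolt listings in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java stand-in for the three-stage flow; no Storm classes are involved.
public class PipelineSketch {

    // Stage 2: the transformation ExclamationBolt applies to each word.
    static final Function<String, String> EXCLAIM = word -> word + "!!!";

    // Feeds every word through stage 2 and collects what stage 3 would log.
    static List<String> run(List<String> words) {
        List<String> printed = new ArrayList<>();
        for (String word : words) {              // Stage 1: the spout hands over one word at a time
            String exclaimed = EXCLAIM.apply(word);
            printed.add(exclaimed + ".......");  // Stage 3: PrintBolt logs the word plus a dotted tail
        }
        return printed;
    }

    public static void main(String[] args) {
        for (String line : run(Arrays.asList("fdfs", "ffsdfs"))) {
            System.out.println(line);
        }
    }
}
```

In the real topology each stage runs as a separate Storm component and tuples travel between them over streams; the sketch only shows what happens to a single word along the way.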

Implementation

1. Create a Maven project in IDEA and add the Maven dependency

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ysl</groupId>
    <artifactId>storm</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/storm/storm -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.ysl.WordsTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

TestWordSpout:

package com.ysl.spouts;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Random;

public class TestWordSpout extends BaseRichSpout {

    private static Logger logger = LoggerFactory.getLogger(TestWordSpout.class);
    private SpoutOutputCollector collector = null;

    // Called once when the spout task starts; keep the collector for emitting.
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    // Called repeatedly by Storm; emits one randomly chosen word per second.
    public void nextTuple() {
        Utils.sleep(1000);
        final String[] words = new String[]{"fdfs", "fdfs", "ffsdfs"};
        final Random random = new Random();
        final String word = words[random.nextInt(words.length)];
        collector.emit(new Values(word));
    }

    // Each emitted tuple has a single field named "word".
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}

ExclamationBolt:

package com.ysl.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class ExclamationBolt extends BaseRichBolt {

    private static Logger logger = LoggerFactory.getLogger(ExclamationBolt.class);
    private OutputCollector collector = null;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    // Emit the word with "!!!" appended, anchored to the input tuple, then ack the input.
    public void execute(Tuple tuple) {
        this.collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        this.collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}

PrintBolt:

package com.ysl.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class PrintBolt extends BaseRichBolt {
    private static Logger logger = LoggerFactory.getLogger(PrintBolt.class);
    private OutputCollector collector = null;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    public void execute(Tuple tuple) {
        logger.info(tuple.getString(0) + ".......");
        this.collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Terminal bolt: it emits nothing, so no output fields are declared.
    }
}

WordsTopology:

package com.ysl;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import com.ysl.bolts.ExclamationBolt;
import com.ysl.bolts.PrintBolt;
import com.ysl.spouts.TestWordSpout;

public class WordsTopology {

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        // Wire word -> exclaim -> print, each with a parallelism hint of 1.
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("word", new TestWordSpout(), 1);
        topologyBuilder.setBolt("exclaim", new ExclamationBolt(), 1).shuffleGrouping("word");
        topologyBuilder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("exclaim");

        Config config = new Config();
        config.setDebug(true);

        if (args != null && args.length > 0) {
            // Cluster mode: args[0] is the topology name.
            config.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], config, topologyBuilder.createTopology());
        } else {
            // Local mode: run in-process for 30 seconds, then kill the topology and shut down.
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("test", config, topologyBuilder.createTopology());
            Utils.sleep(30000);
            localCluster.killTopology("test");
            localCluster.shutdown();
        }
    }
}

2. Package and run

  Package the application with Maven:

mvn clean install

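  Storm can run a topology in two ways. The first is local mode, which suits development and debugging: run the main function directly in IDEA. The local-mode branch of the code sleeps for a fixed time (30 seconds here) and then kills the topology itself.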

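  The second is remote cluster mode. It requires a jar that contains your code together with the libraries your code depends on. The Storm jars themselves need not be included, because the worker nodes add them to the classpath automatically. If you use Maven, the Maven Assembly Plugin can build this jar; see the Maven configuration above.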
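
As a hedged sketch of such a setup: in the POM above the assembly plugin has no execution bound, so mvn clean install builds only the plain target/storm-1.0-SNAPSHOT.jar. The fragment below marks storm-core as provided, so Storm's own jars stay out of the assembled jar, and binds the plugin's single goal to the package phase. Neither the provided scope nor the executions block is in the original POM, and the execution id make-assembly is an arbitrary name.

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.4</version>
        <!-- Assumption: provided keeps Storm's own jars out of the assembled jar -->
        <scope>provided</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.ysl.WordsTopology</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <!-- Assumption: run the assembly automatically during "package" -->
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```

With this in place the build should additionally produce target/storm-1.0-SNAPSHOT-jar-with-dependencies.jar. The trade-off is that a provided dependency is not on the default run classpath, so running the main function locally from IDEA requires the run configuration to include provided dependencies.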

  To run remotely, use the storm command to submit the topology to the Storm cluster:

storm jar /home/workspace/storm/target/storm-1.0-SNAPSHOT.jar com.ysl.WordsTopology testfrfr

After running the command above, log output like the following indicates that the submission succeeded:

346  [main] INFO  backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
351 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar /home/workspace/storm/target/storm-1.0-SNAPSHOT.jar to assigned location: storm-local/nimbus/inbox/stormjar-89514681-2477-44c2-8924-7907d16e3ba1.jar
Start uploading file '/home/workspace/storm/target/storm-1.0-SNAPSHOT.jar' to 'storm-local/nimbus/inbox/stormjar-89514681-2477-44c2-8924-7907d16e3ba1.jar' (6196 bytes)
[==================================================] 6196 / 6196
File '/home/workspace/storm/target/storm-1.0-SNAPSHOT.jar' uploaded to 'storm-local/nimbus/inbox/stormjar-89514681-2477-44c2-8924-7907d16e3ba1.jar' (6196 bytes)
363 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: storm-local/nimbus/inbox/stormjar-89514681-2477-44c2-8924-7907d16e3ba1.jar
363 [main] INFO backtype.storm.StormSubmitter - Submitting topology testfrfr in distributed mode with conf {"topology.workers":3,"topology.debug":true}
448 [main] INFO backtype.storm.StormSubmitter - Finished submitting topology:

Killing a topology

To kill a topology, run:

storm kill {stormname}

Here {stormname} is the name that was given when the topology was submitted to the Storm cluster.

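Storm does not kill the topology immediately. Instead, it first deactivates all spouts so that they emit no new tuples, then waits Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS seconds before killing all worker processes. This gives the topology enough time to finish the tuples that were still in flight when the storm kill command was issued.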
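
The effect of that waiting period can be illustrated with a toy model. This is a plain-Java sketch that calls no Storm API: KillTimelineSketch and completedBeforeKill are hypothetical names, each in-flight tuple is reduced to the number of seconds of work it still needs, and tuples are assumed to be processed independently of one another.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of the kill sequence; it does not talk to a Storm cluster.
public class KillTimelineSketch {

    // Given the seconds of work each in-flight tuple still needs, returns the
    // tuples that finish before the workers are killed at waitSecs.
    static List<Integer> completedBeforeKill(List<Integer> remainingSecs, int waitSecs) {
        List<Integer> completed = new ArrayList<>();
        for (int remaining : remainingSecs) {
            if (remaining <= waitSecs) {
                completed.add(remaining);
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        // 30 stands in for Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS (Storm's default value).
        int waitSecs = 30;
        // Spouts are already deactivated, so only these tuples are still in flight.
        List<Integer> inFlight = Arrays.asList(2, 10, 45);
        System.out.println("finish before kill: " + completedBeforeKill(inFlight, waitSecs));
    }
}
```

In this model the tuple that still needs 45 seconds is cut off when the workers are killed. On a real cluster the wait can be changed per invocation with the -w option of storm kill, which overrides the topology's message timeout for that kill.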
