原文:http://www.cnblogs.com/kqdongnanf/p/4778672.html

------------------------------------------------------------------------------------------------------------------------------------

关于Storm tick

1. tick的功能

Apache Storm中内置了一种定时机制——tick,它能够让任何bolt的所有task每隔一段时间(精确到秒级,用户可以自定义)收到一个来自__systemd的__tick stream的tick tuple,bolt收到这样的tuple后可以根据业务需求完成相应的处理。

Tick功能从Apache Storm 0.8.0版本开始支持,本文在Apache Storm 0.9.1上测试。

2. 在代码中使用tick及其作用

在代码中如需使用tick,可以参照下面的方式:

2.1. 为bolt设置tick

若希望某个bolt每隔一段时间做一些操作,那么可以将bolt继承BaseBasicBolt/BaseRichBolt,并重写getComponentConfiguration()方法。在方法中设置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS的值,单位是秒。

getComponentConfiguration()是backtype.storm.topology.IComponent接口中定义的方法,在此方法的实现中可以定义以”Topology.*”开头的此bolt特定的Config

这样设置之后,此bolt的所有task都会每隔一段时间收到一个来自__systemd的__tick stream的tick tuple,因此execute()方法可以实现如下:

2.2. 为Topology全局设置tick

若希望Topology中的每个bolt都每隔一段时间做一些操作,那么可以定义一个Topology全局的tick,同样是设置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS的值:

2.3. tick设置的优先级

与Linux中的环境变量的优先级类似,storm中的tick也有优先级,即全局tick的作用域是全局bolt,但对每个bolt其优先级低于此bolt定义的tick。

这个参数的名字TOPOLOGY_TICK_TUPLE_FREQ_SECS具有一定的迷惑性,一眼看上去应该是Topology全局的,但实际上每个bolt也可以自己定义。

2.4. tick的精确度

Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS是精确到秒级的。例如某bolt设置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS为10s,理论上说bolt的每个task应该每个10s收到一个tick tuple。实际测试发现,这个时间间隔的精确性是很高的,一般延迟(而不是提前)时间在1ms左右。测试环境:3台虚拟机做supervisor,每台配置:4Cpu、16G内存、千兆网卡。

3. storm tick的实现原理

在bolt中的getComponentConfiguration()定义了该bolt的特定的配置后,storm框架会在TopologyBuilder.setBolt()方法中调用bolt的getComponentConfiguration()方法,从而设置该bolt的配置。

调用路径为:TopologyBuilder.setBolt()

-> TopologyBuilder.initCommon()

-> getComponentConfiguration()

4. 附件

测试使用的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package storm.starter;
 
import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.starter.spout.RandomSentenceSpout;
 
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
 
 
public class MyTickTestTopology {
 
  public static class WordCount extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();
     
     
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
       
      if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
         && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){
          System.out.println("################################WorldCount bolt: "
                                 new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));
      }
      else{
          collector.emit(new Values("a"1));
      }
    }
 
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word""count"));
    }
     
    @Override
    public Map<String, Object> getComponentConfiguration() {
            Config conf = new Config();
            conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS,10);
            return conf;
    }
  }
   
  public static class TickTest extends BaseBasicBolt{
       
       
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            // 收到的tuple是tick tuple
          if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
             && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){
              System.out.println("################################TickTest bolt: "
                                  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));
            }
            // 收到的tuple时正常的tuple
          else{
              collector.emit(new Values("a"));
            }
           
        }
 
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("test"));
        }
         
        @Override
        public Map<String, Object> getComponentConfiguration() {
                Config conf = new Config();
                conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS,20);
                return conf;
        }
      }
 
  public static void main(String[] args) throws Exception {
 
    TopologyBuilder builder = new TopologyBuilder();
 
    builder.setSpout("spout"new RandomSentenceSpout(), 3);
    builder.setBolt("count"new WordCount(), 3).shuffleGrouping("spout");
    builder.setBolt("tickTest"new TickTest(), 3).shuffleGrouping("count");
 
    Config conf = new Config();
    conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 7);
    conf.setDebug(false);
 
    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);
      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    else {
      conf.setMaxTaskParallelism(3);
 
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("word-count", conf, builder.createTopology());
 
//      Thread.sleep(10000);
//      cluster.shutdown();
    }
  }
}

  

原创文章,转载请注明:转载自kqdongnanf-博客园;Email:kqdongnanf@yahoo.com。

最新文章

  1. C#-WinForm-ListView-表格式展示数据、如何将数据库中的数据展示到ListView中、如何对选中的项进行修改
  2. uc_client是如何与UCenter进行通信的
  3. js代码优化
  4. 在SQL语言中,join什么时候用,什么时候不用啊?请高手举例解释一下。谢谢
  5. poj 3250 栈应用
  6. 关于AFinal的混淆
  7. SharePoint 2013 强制安装解决方案
  8. 使用ConcurrentDictionary实现轻量缓存
  9. 5.7.1.3 Global 对象的属性
  10. Windows 7上使用HP QC的问题
  11. iOS项目评估报告
  12. windows下pycharm远程调试pyspark
  13. JAVA线程池应用的DEMO
  14. 201521123105 《Java程序设计》第1周学习总结
  15. xamarin android viewpager的用法
  16. ubuntu下加载mcypt
  17. Python中文词频统计
  18. iOS NSDictionary JSON 相互转换
  19. 17秋 软件工程 团队第五次作业 Alpha Scrum1
  20. json文件不能有注释

热门文章

  1. react中的jsx详细理解
  2. 《少年先疯队》第九次团队作业:Beta冲刺第三天
  3. echart-柱状图
  4. ERROR StatusLogger Log4j2 could not find a logging implementation. Please add log4j-core to the classpath
  5. Eclipse Code Recommenders 自动补全(联想)神器
  6. js模块化入门与commonjs解析与应用
  7. Word转html并移植到web项目
  8. Spring上传报错413
  9. java中HashMap,LinkedHashMap,TreeMap,HashTable的区别
  10. ajax 请求json数据中json对象的构造获取问题