我们在学习ack机制的时候,我们知道Storm的Bolt有BaseBasicBolt和BaseRichBolt。
在BaseBasicBolt中,BasicOutputCollector在emit数据的时候,会自动和输入的tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack。
在使用BaseRichBolt需要在emit数据的时候,显示指定该数据的源tuple要加上第二个参数anchor tuple,以保持tracker链路,即collector.emit(oldTuple, newTuple);并且需要在execute执行成功后调用OutputCollector.ack(tuple), 当失败处理时,执行OutputCollector.fail(tuple);

那么我们来看看BasicBolt的源码是不是这样的,不能因为看到别人的帖子说是这样的,我们就这样任务,以讹传讹,我们要To see is to believe。

为了方便看源代码,我先上我们的继承类:

public class SplitSentenceBolt extends BaseBasicBolt {  public void prepare(Map stormConf, TopologyContext context) {
super.prepare(stormConf, context);
}

  //5:执行我们自己的逻辑处理方法,接收传入的参数。
  public void execute(Tuple input, BasicOutputCollector collector) {
String sentence = (String)input.getValueByField("sentence");
String[] words = sentence.split(" ");
for (String word : words) {
word = word.trim();
word = word.toLowerCase();
collector.emit(new Values(word,1));//这个地方就是调用OutputCollector的包装类,来发消息
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","num"));
}
}

通过打断点,我们发现,bolt的task会创建这个类下面会标准执行顺序

public class BasicBoltExecutor implements IRichBolt {
public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class); private IBasicBolt _bolt;
private transient BasicOutputCollector _collector;
//1:创建该对象,然后把我们写的SplitSentenceBolt对象赋给父类IBasicBolt。
public BasicBoltExecutor(IBasicBolt bolt) {
_bolt = bolt;
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
_bolt.declareOutputFields(declarer);//这里就是调用SplitSentenceBolt对象的方法了。
}
 //2:给BasicOutputCollector _collector字段赋值,BasicOutputCollector就是对OutputCollector类的包装。
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
_bolt.prepare(stormConf, context);
_collector = new BasicOutputCollector(collector);
}
  //3:然后程序执行该方法,input的值source: spout1:4, stream: default, id: {}, [+ - * % /]
public void execute(Tuple input) {
_collector.setContext(input);//把接收到的tuple值设置给BasicOutputCollector中inputTuple字段。
try {
_bolt.execute(input, _collector);//这个地方是调用我们实现类SplitSentenceBolt的ececute方法。
_collector.getOutputter().ack(input);//这个地方就是响应
} catch(FailedException e) {
if(e instanceof ReportedFailedException) {
_collector.reportError(e);
}
_collector.getOutputter().fail(input);//这个地方就是响应
}
}
public void cleanup() {
_bolt.cleanup();
}
public Map<String, Object> getComponentConfiguration() {
return _bolt.getComponentConfiguration();
}
}
public class BasicOutputCollector implements IBasicOutputCollector {
private OutputCollector out;
private Tuple inputTuple;
public BasicOutputCollector(OutputCollector out) {
this.out = out;
}
//4:把收到的tuple数据赋值给inputTuple,这个时候BasicOutputCollector对象的字段都具有值了。
   public void setContext(Tuple inputTuple) {
this.inputTuple = inputTuple;
}
   //6:这里我们发送新的(转换后的)tuple数据,看他内部的调用,其实他也会发送一个anchor tuple来保持tracker链路,
而这个anchor tuple就是bolt接收到转换前的源tuple数据。
  public List<Integer> emit(List<Object> tuple) {
     return emit(Utils.DEFAULT_STREAM_ID, tuple);
   }
public List<Integer> emit(String streamId, List<Object> tuple) {
return out.emit(streamId, inputTuple, tuple);
}
public void emitDirect(int taskId, String streamId, List<Object> tuple) {
out.emitDirect(taskId, streamId, inputTuple, tuple);
}
public void emitDirect(int taskId, List<Object> tuple) {
emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
}
protected IOutputCollector getOutputter() {
return out;
}
public void reportError(Throwable t) {
out.reportError(t);
}
}

这里大家不要纠结bolt的启动时从哪里开始的,我后面会讲的,这里我们关注的是,BasicBoltExecutor对象创建后的执行过程,以这我们来看执行的过程。在BasicBoltExecutor的execute方法中,我们看到了ack和fail方法会被自动调用的,当我们的程序抛出异常则会执行fail方法的。

这个

最新文章

  1. java中的注解(Annotation)
  2. 安装R语言扩展包diveRsity-1
  3. php的数据循环 之li的3个类判断
  4. java中打印变量地址
  5. .net开发笔记(十六) 对前部分文章的一些补充和总结
  6. uboot命令及内核启动参数
  7. SharePoint 2013 JQuery Asset Picket
  8. 3.3.1实现Servlet
  9. 霍夫变换(hough transform)
  10. Error -27791: Server xx has shut down the connection prematurely
  11. 自定义控件【圆形】圆角 BitmapShader
  12. Spring 学习笔记01
  13. Spring 之 注解详解
  14. TensorFlow 深度学习笔记 Stochastic Optimization
  15. jQuery事件绑定方法bind、 live、delegate和on的区别
  16. 仿QQ空间视差效果,ListView.setHeader( )
  17. 百度api集合!
  18. 前端开发面试题总结之——JAVASCRIPT(一)
  19. iOS关于图片点到像素转换之杂谈
  20. C#设计模式(13)——代理模式(Proxy Pattern)(转)

热门文章

  1. VVDocumenter 使用
  2. 有关bootstrap之排版
  3. [原创]直播服务器简单实现 http_flv和hls 内网直播桌面
  4. EasyUI DateTimeBox设置默认时间的注意点
  5. Neutron 默认安全组规则 - 每天5分钟玩转 OpenStack(115)
  6. SQL Server-数据库架构和对象、定义数据完整性(二)
  7. JQuery EasyUI datagrid 复杂表头处理
  8. 当Azure里的虚拟机网卡被禁用了之后……
  9. PHP面试题目搜集
  10. Android中TextView setText int 报错