1 storm求和简单操作

主要逻辑,就是spout发送数据源,blot进行处理数据,主要注意的点就是 spout这有个nextTuple自旋,和使用父类的declare..方法声明要发送到下游的名称,然后blot execute接受到进行执行

1.1代码实现

package com.xiaodao.big;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils; import java.util.Map; /**
* 累积求和
*/
public class LocalSumStormTopology { /**
* spout 需要继承baserichspout
* 数据源需要产生并发送数据
*/
public static class DataSourceSpout extends BaseRichSpout{ private SpoutOutputCollector collector;
/**
* 初始化方法只会被调用一次
*
* @param conf 配置参数
* @param context 上下文
* @param collector 数据发射器
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
} int num = 0;
/**
* 会产生数据,在生产上肯定是从消息队列中获取数据
* 这个方法是一个死循环,是因为storm一直运行,会一直不行的执行
*/
public void nextTuple() {
collector.emit( new Values(num++));
System.out.println("Spout:发送 "+ num);
Utils.sleep(2000); } /**
* 声明下一个blot接受的名称,不然blot不知道接受到了什么
* @param declarer
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("no"));
}
} /**
* 数据的累积求和 blot,接受数据,并处理
*/
public static class SumBlot extends BaseRichBolt{ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { } int sum =0;
/**
* 也是一个自旋锁.(死循环)
* @param input
*/
public void execute(Tuple input) {
//这里获取方式有很多
Integer no = input.getIntegerByField("no");
sum +=no;
System.out.println("Blot: sum = ["+ sum+"]"); } public void declareOutputFields(OutputFieldsDeclarer declarer) { }
} public static void main(String[] args) {
//任何一个作业都需要topology
//需要控制好blot spout 顺序
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout",new DataSourceSpout(),1);
builder.setBolt("SumBlot",new SumBlot(),1).shuffleGrouping("DataSourceSpout");
Config conf = new Config();
conf.setNumWorkers(2);
//如果到200个消息就不发送了
conf.setMaxSpoutPending(200);
//创建一个本地的模式,不需要搭建
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalSumStormTopology",conf,builder.createTopology());
}
}

执行运行就可以

最新文章

  1. 配置linux----------------ip
  2. sqlserver 作业调度(作业常用的几个步骤)
  3. ETHREAD APC 《寒江独钓》内核学习笔记(4)
  4. HTML5入门7---"session的会话缓存"和"localStorage的cookie"缓存数据
  5. DJANGO中filter_horizontal和raw_id_fields的作用
  6. PHP socket类
  7. paip.将数据导入到在英语语音数据库mysql道路解决空原则问题
  8. java GUI简单记事本
  9. javascript 删除 url 中指定参数,并返回 url
  10. Python列表操作集合
  11. SQL SERVER 行列转换(动态)
  12. docker版本升级
  13. 20165308 预备作业3 Linux安装及学习
  14. 【Ansible 文档】配置
  15. Spark记录-官网学习配置篇(一)
  16. APMServ5.2.6 升级php5.2 到 5.3版本及Memcache升级!
  17. HDU 3726 Graph and Queries (离线处理+splay tree)
  18. Codeforces Round #346 (Div. 2) E. New Reform dfs
  19. golang的beego框架开发时出现的问题纪录
  20. BZOJ1934:[SHOI2007]善意的投票 & BZOJ2768:[JLOI2010]冠军调查——题解

热门文章

  1. SQL 高级查询(层次化查询,递归)
  2. 新更新kb4493472导致无法正常开机
  3. Python + PyQt5 实现美剧爬虫可视工具(二)
  4. DotNetCore跨平台~EFCore2.0连接Mysql的烦恼-已解决
  5. JavaScript夯实基础系列(三):this
  6. 阿里 Java 手册系列教程:为啥强制子类、父类变量名不同?
  7. bootsect及setup
  8. Redis协议规范(RESP)
  9. Linux下PAM模块学习总结
  10. ASP.NET Core 基于JWT的认证(一)