学习storm实现求和操作
2024-08-25 15:20:26
1 storm求和简单操作
主要逻辑,就是spout发送数据源,blot进行处理数据,主要注意的点就是 spout这有个nextTuple自旋,和使用父类的declare..方法声明要发送到下游的名称,然后blot execute接受到进行执行
1.1代码实现
package com.xiaodao.big; import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils; import java.util.Map; /**
* 累积求和
*/
public class LocalSumStormTopology { /**
* spout 需要继承baserichspout
* 数据源需要产生并发送数据
*/
public static class DataSourceSpout extends BaseRichSpout{ private SpoutOutputCollector collector;
/**
* 初始化方法只会被调用一次
*
* @param conf 配置参数
* @param context 上下文
* @param collector 数据发射器
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
} int num = 0;
/**
* 会产生数据,在生产上肯定是从消息队列中获取数据
* 这个方法是一个死循环,是因为storm一直运行,会一直不行的执行
*/
public void nextTuple() {
collector.emit( new Values(num++));
System.out.println("Spout:发送 "+ num);
Utils.sleep(2000); } /**
* 声明下一个blot接受的名称,不然blot不知道接受到了什么
* @param declarer
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("no"));
}
} /**
* 数据的累积求和 blot,接受数据,并处理
*/
public static class SumBlot extends BaseRichBolt{ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { } int sum =0;
/**
* 也是一个自旋锁.(死循环)
* @param input
*/
public void execute(Tuple input) {
//这里获取方式有很多
Integer no = input.getIntegerByField("no");
sum +=no;
System.out.println("Blot: sum = ["+ sum+"]"); } public void declareOutputFields(OutputFieldsDeclarer declarer) { }
} public static void main(String[] args) {
//任何一个作业都需要topology
//需要控制好blot spout 顺序
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout",new DataSourceSpout(),1);
builder.setBolt("SumBlot",new SumBlot(),1).shuffleGrouping("DataSourceSpout");
Config conf = new Config();
conf.setNumWorkers(2);
//如果到200个消息就不发送了
conf.setMaxSpoutPending(200);
//创建一个本地的模式,不需要搭建
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalSumStormTopology",conf,builder.createTopology());
}
}
执行运行就可以
最新文章
- 配置linux----------------ip
- sqlserver 作业调度(作业常用的几个步骤)
- ETHREAD APC 《寒江独钓》内核学习笔记(4)
- HTML5入门7---";session的会话缓存";和";localStorage的cookie";缓存数据
- DJANGO中filter_horizontal和raw_id_fields的作用
- PHP socket类
- paip.将数据导入到在英语语音数据库mysql道路解决空原则问题
- java GUI简单记事本
- javascript 删除 url 中指定参数,并返回 url
- Python列表操作集合
- SQL SERVER 行列转换(动态)
- docker版本升级
- 20165308 预备作业3 Linux安装及学习
- 【Ansible 文档】配置
- Spark记录-官网学习配置篇(一)
- APMServ5.2.6 升级php5.2 到 5.3版本及Memcache升级!
- HDU 3726 Graph and Queries (离线处理+splay tree)
- Codeforces Round #346 (Div. 2) E. New Reform dfs
- golang的beego框架开发时出现的问题纪录
- BZOJ1934:[SHOI2007]善意的投票 &; BZOJ2768:[JLOI2010]冠军调查——题解