1、定义拓扑topology

/**
 * Entry point that wires the RabbitMQ spout to the filter bolt and submits the
 * resulting topology.
 *
 * <p>When at least one command-line argument is given, the first argument is
 * used as the topology name and the topology is submitted through
 * {@code StormSubmitter}. With no arguments it is submitted to a
 * {@code LocalCluster} under the name {@code "messageTopology"}.
 */
public class MessageTopology {

    /** Component id of the spout; the bolt subscribes to this id. */
    private static final String SPOUT_ID = "RabbitmqSpout";

    /** Component id of the bolt. */
    private static final String BOLT_ID = "FilterBolt";

    /**
     * Builds and submits the topology.
     *
     * @param args optional; {@code args[0]} is the topology name for cluster submission
     * @throws Exception any submission error other than {@code AlreadyAliveException}
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder wiring = wireComponents();
        Config stormConf = new Config();
        boolean runLocally = args.length == 0;

        try {
            if (runLocally) {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("messageTopology", stormConf, wiring.createTopology());
            } else {
                StormSubmitter.submitTopology(args[0], stormConf, wiring.createTopology());
            }
        } catch (AlreadyAliveException e) {
            e.printStackTrace();
        }
    }

    /** Assembles the topology: one spout feeding one bolt via shuffle grouping. */
    private static TopologyBuilder wireComponents() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUT_ID, new RabbitmqSpout());
        builder.setBolt(BOLT_ID, new FilterBolt()).shuffleGrouping(SPOUT_ID);
        return builder;
    }
}

2、定义数据源RabbitmqSpout

RabbitmqSpout实现了org.apache.storm.topology.IRichSpout接口,需要实现对应的方法:open()、close()、activate()、deactivate()、nextTuple()、ack()、fail(),以及declareOutputFields()和getComponentConfiguration()。

unconfirmedMap对象以spout生成的messageId为键,保存所有已发射出去、等待确认的MQ消息的deliveryTag。当Storm系统回调ack、fail方法后,spout根据messageId取出deliveryTag,分别对MQ消息进行成功确认(basicAck)或失败重回队列(basicNack)操作(Storm系统的回调会在bolt中主动调用ack、fail方法时触发)。

/**
 * Storm spout that polls a RabbitMQ queue and emits each message body as a
 * single-field tuple ({@code "body"}).
 *
 * <p>Messages are fetched with manual acknowledgement. Every emitted tuple is
 * anchored with a generated message id; the RabbitMQ delivery tag for that id
 * is remembered in {@link #unconfirmedMap} until Storm calls {@link #ack} or
 * {@link #fail}, at which point the message is acknowledged or re-queued.
 */
public class RabbitmqSpout implements IRichSpout {
    // Static so the logger is not part of the spout's serialized state.
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitmqSpout.class);

    private static final String QUEUE_NAME = "message_queue";

    /** Pause, in milliseconds, when the queue is empty or the broker call failed. */
    private static final long IDLE_SLEEP_MS = 3000L;

    private Map map;
    private TopologyContext topologyContext;
    private SpoutOutputCollector spoutOutputCollector;

    // Live broker handles are created in open(); they must never be serialized.
    private transient Connection connection;
    private transient Channel channel;

    /** Message id -> RabbitMQ delivery tag for tuples emitted but not yet acked/failed. */
    private final Map<String, Long> unconfirmedMap =
            Collections.synchronizedMap(new HashMap<String, Long>());

    /**
     * Opens the connection and channel and declares the durable queue.
     *
     * @throws IOException      if the broker cannot be reached or the declare fails
     * @throws TimeoutException if the connection attempt times out
     */
    private void connect() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // NOTE(review): host and credentials are hard-coded; consider reading them
        // from the topology configuration passed to open().
        factory.setHost("127.0.0.1");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");

        connection = factory.newConnection();
        channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    }

    /**
     * Stores the Storm handles and connects to RabbitMQ.
     *
     * @throws IllegalStateException if the broker connection cannot be established;
     *         failing here gives a clear cause instead of a later
     *         {@code NullPointerException} in {@link #nextTuple()}
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.map = map;
        this.topologyContext = topologyContext;
        this.spoutOutputCollector = spoutOutputCollector;

        try {
            connect();
        } catch (IOException | TimeoutException e) {
            // Release whatever was partially opened before giving up.
            close();
            throw new IllegalStateException("Unable to connect to RabbitMQ queue " + QUEUE_NAME, e);
        }
    }

    /**
     * Closes the channel and then the connection. Each is closed independently so
     * that a failure closing the channel does not leak the connection; both are
     * tolerated as {@code null} (e.g. when {@link #open} failed part-way).
     */
    @Override
    public void close() {
        if (channel != null) {
            try {
                channel.close();
            } catch (IOException | TimeoutException e) {
                LOGGER.warn("Failed to close RabbitMQ channel", e);
            } finally {
                channel = null;
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                LOGGER.warn("Failed to close RabbitMQ connection", e);
            } finally {
                connection = null;
            }
        }
    }

    /**
     * Fetches at most one message from the queue and emits it. Sleeps for
     * {@link #IDLE_SLEEP_MS} when the queue is empty or the fetch failed.
     */
    @Override
    public void nextTuple() {
        try {
            GetResponse response = channel.basicGet(QUEUE_NAME, false);
            if (response == null) {
                Utils.sleep(IDLE_SLEEP_MS);
                return;
            }

            AMQP.BasicProperties props = response.getProps();
            String messageId = UUID.randomUUID().toString();
            Long deliveryTag = response.getEnvelope().getDeliveryTag();
            // Decode with an explicit charset rather than the platform default.
            // NOTE(review): assumes producers publish UTF-8 text - confirm.
            String body = new String(response.getBody(), java.nio.charset.StandardCharsets.UTF_8);

            // Record the tag before emitting so ack()/fail() can always find it.
            unconfirmedMap.put(messageId, deliveryTag);
            LOGGER.info("RabbitmqSpout: {}, {}, {}, {}", body, messageId, deliveryTag, props);
            this.spoutOutputCollector.emit(new Values(body), messageId);
        } catch (IOException e) {
            LOGGER.error("Failed to fetch a message from queue {}", QUEUE_NAME, e);
            // Back off so a persistent broker error does not become a busy loop.
            Utils.sleep(IDLE_SLEEP_MS);
        }
    }

    /**
     * Acknowledges the RabbitMQ message behind a fully processed tuple.
     *
     * @param o the message id that was passed to {@code emit}
     */
    @Override
    public void ack(Object o) {
        String messageId = o.toString();
        Long deliveryTag = unconfirmedMap.get(messageId);
        LOGGER.info("ack: {}, {}, {}\n\n", messageId, deliveryTag, unconfirmedMap.size());
        if (deliveryTag == null) {
            // Unknown id: nothing to confirm, and unboxing null would throw.
            LOGGER.warn("ack for unknown message id {}", messageId);
            return;
        }
        unconfirmedMap.remove(messageId);
        try {
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            LOGGER.error("basicAck failed for message {} (delivery tag {})", messageId, deliveryTag, e);
        }
    }

    /**
     * Negatively acknowledges the RabbitMQ message behind a failed tuple and asks
     * the broker to re-queue it.
     *
     * @param o the message id that was passed to {@code emit}
     */
    @Override
    public void fail(Object o) {
        String messageId = o.toString();
        Long deliveryTag = unconfirmedMap.get(messageId);
        LOGGER.info("fail: {}, {}, {}\n\n", messageId, deliveryTag, unconfirmedMap.size());
        if (deliveryTag == null) {
            // Unknown id: nothing to re-queue, and unboxing null would throw.
            LOGGER.warn("fail for unknown message id {}", messageId);
            return;
        }
        unconfirmedMap.remove(messageId);
        try {
            channel.basicNack(deliveryTag, false, true);
        } catch (IOException e) {
            LOGGER.error("basicNack failed for message {} (delivery tag {})", messageId, deliveryTag, e);
        }
    }

    /** Declares the single output field {@code "body"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("body"));
    }

    /** Returns {@code null}: this spout supplies no component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /** No-op. */
    @Override
    public void activate() { }

    /** No-op. */
    @Override
    public void deactivate() { }
}

3、定义数据流处理FilterBolt

/**
 * Bolt that reads the {@code "body"} field of each incoming tuple, logs it, and
 * acks the tuple. It does not emit anything downstream.
 */
public class FilterBolt implements IRichBolt {
    // Static so the logger is not part of the bolt's serialized state.
    private static final Logger LOGGER = LoggerFactory.getLogger(FilterBolt.class);

    private Map map;
    private TopologyContext topologyContext;
    private OutputCollector outputCollector;

    /** Stores the configuration, context and collector handed over by Storm. */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.map = map;
        this.topologyContext = topologyContext;
        this.outputCollector = outputCollector;
    }

    /**
     * Logs the tuple's {@code "body"} value and acks the tuple.
     *
     * @param tuple the incoming tuple; must carry a string field named {@code "body"}
     */
    @Override
    public void execute(Tuple tuple) {
        String value = tuple.getStringByField("body");
        LOGGER.info("FilterBolt:{}", value);
        outputCollector.ack(tuple);
    }

    /**
     * Declares the output field {@code "body"}.
     *
     * <p>NOTE(review): {@link #execute} never emits, so this declaration is
     * currently unused - confirm whether a downstream emit is planned.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("body"));
    }

    /** Returns {@code null}: this bolt supplies no component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /** No-op; the bolt holds no resources to release. */
    @Override
    public void cleanup() { }
}

最新文章

  1. thinkPHP的用法之创建新项目
  2. jquery版时钟(css3实现)
  3. JMeter设置集合点
  4. 文件上传限制大小 dotnet/C#
  5. easyui datagrid使用
  6. Java Hour 26 Execution
  7. T-SQL JOIN
  8. 蘑菇街iOS客户端应用源码
  9. JavaScript的function对象
  10. JUnit4基础 使用JUnit4进行单元测试
  11. lsof查看进程打开了哪些文件目录套接字
  12. map对象建立家族姓氏查询
  13. WeQuant交易策略—KDJ
  14. java课设
  15. J2EE规范标准
  16. ansible基本使用方法
  17. sql对于字符串的处理
  18. kolla-ansible 一键安装openstack
  19. 《完全用Linux工作》作者:王垠
  20. MySQL Transaction--RC事务隔离级别下加锁测试

热门文章

  1. Backbone学习笔记 - Collection及Router篇
  2. CentOS IPv6设置
  3. Python 断言 assert 的用法
  4. tzhpxc
  5. Linux core 文件 gdb
  6. tomcat安装配置常见问题详解
  7. jQuery核心函数的四种不同用法
  8. WPF一步步开发XMPP IM客户端1:入门
  9. Java并发编程实践读书笔记(4)任务取消和关闭
  10. (转)inspect — Inspect live objects