一直对rabbitmq p2p 模式的多服务器下做相同配置的 各个服务器数据接受情况比较好奇

今天有空测试了下

xml 文件

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  4. xsi:schemaLocation="http://www.springframework.org/schema/beans
  5. http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
  6. http://www.springframework.org/schema/beans
  7. http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
  8. http://www.springframework.org/schema/rabbit
  9. http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
  10. <!--配置connection-factory,指定连接rabbit server参数 -->
  11. <rabbit:connection-factory id="connectionFactory"
  12. port="5672"  username="guest" password="guest" host="127.0.0.1"
  13. />
  14. <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
  15. <rabbit:admin connection-factory="connectionFactory" />
  16. <!--定义queue -->
  17. <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" />
  18. <!-- 定义direct exchange,绑定queueTest -->
  19. <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false">
  20. <rabbit:bindings>
  21. <rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding>
  22. </rabbit:bindings>
  23. </rabbit:direct-exchange>
  24. <bean id="jsonMessageConverter"
  25. class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"></bean>
  26. <!--定义rabbit template用于数据的接收和发送 -->
  27. <rabbit:template id="amqpTemplate"  connection-factory="connectionFactory"
  28. exchange="exchangeTest"   message-converter="jsonMessageConverter" />
  29. <!-- 消息接收者 -->
  30. <bean id="messageReceiver" class="com.bimatrix.revit.mq.MessageConsumer"></bean>
  31. <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
  32. <rabbit:listener-container connection-factory="connectionFactory">
  33. <rabbit:listener queues="queueTest" ref="messageReceiver"/>
  34. </rabbit:listener-container>
  35. </beans>
  1. </pre><pre code_snippet_id="1702272" snippet_file_name="blog_20160531_4_8975174" name="code" class="html">
  1. </pre><p></p><p>消费者:</p><p></p><pre code_snippet_id="1702272" snippet_file_name="blog_20160531_5_4133388" name="code" class="java">@Component
  2. public class MessageConsumer implements MessageListener {
  3. private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
  4. int receiveNum =0 ;
  5. @Override
  6. public void onMessage(Message message) {
  7. receiveNum++ ;
  8. //logger.info("receive message:{}",message);
  9. //      System.out.println("receive message:{}"+message);
  10. System.out.println("receive: "+receiveNum);
  11. }
  12. }

测试代码:

  1. @RunWith(value = SpringJUnit4ClassRunner.class)
  2. @ContextConfiguration(locations = { "classpath*:config/applicationContext.xml" })
  3. public class TestQueue {
  4. @Autowired
  5. AmqpTemplate amqpTemplate;
  6. final String queue_key = "queueTestKey";
  7. @Test
  8. public void send() {
  9. try {
  10. //System.out.println("-----------------------------------------");
  11. for(int i=1;i<=1000;i++){
  12. amqpTemplate.convertAndSend(queue_key, i);
  13. //System.out.println(i);
  14. }
  15. //System.out.println("-----------------------------------------");
  16. } catch (Exception e) {
  17. //          LOGGER.error(e);
  18. }
  19. }

连续发送1000条数据给消费者,同一台机器8080 9080端口各启动一个tomcat 代码完全一样,

接受情况

A 机器 receive: 481

receive: 57  不知道为什么分两次打出来 ,在两个输出界面一个tomcat console 一个是junit的测试输出console

B 机器 receive: 462

三者合计就是1000  再次测试总和也是对的

说明两台服务器在p2p的配置下 各接受了一部分生产者发送来的数据

再多服务器下配置p2p 相当于阻塞队列 多个线程同时处理,各吃掉一部分队列的数据,起到了分流的效果

当然每个消费者那里还可以在起动线程池来处理各自接收的数据。

高并发情况下 这种配置也是可以减轻各个服务器压力

最新文章

  1. 【Linux】Vim语法高亮显示
  2. struts2 中属性驱动(其实就是struts2 action 中处理 request 的参数【old 方式servlet api 封装数据到javabean中(or beanutils)】),这里属性驱动是新方式
  3. ODOO从哪里开始??OpenERP的第一根线头儿
  4. OpenCV中的全景拼接例程
  5. mmc生产运输问题
  6. 转:ASCII码表_全_完整版
  7. 【Vue】详解Vue生命周期
  8. socket字符流循环截取
  9. Java中的io流学习(了解四大基类和基本步骤)
  10. [OutLook]关闭Outlook时最小化
  11. Maximum-SubsequenceSum
  12. 雷林鹏分享:XML 简介
  13. ONVIF协议学习笔记
  14. word2vec训练中文模型
  15. JSON实例(单对象)
  16. zabbix安装故障点分析
  17. hdu 5894(组合数取模)
  18. 【Lua】linux下lua+mod_lwt环境搭建
  19. 在Dubbo中使用高效的Java序列化(Kryo和FST)
  20. Sharepoint2013搜索学习笔记之自定义查询规则(十)

热门文章

  1. MAVEN项目(仓库中没有jar包)
  2. google浏览器插件安装
  3. PAT 1137 Final Grading[一般][排序]
  4. 线程、进程、daemon、GIL锁、线程锁、递归锁、信号量、计时器、事件、队列、多进程
  5. Python虚拟环境的安装
  6. Vue学习笔记之Vue组件
  7. mysql Alter 的问题
  8. Python isspace()方法--转载
  9. 反射中的 Class.forName() 与 ClassLoader.loadClass() 的区别
  10. Spring IOC 源码简单分析 01 - BeanFactory