原理场景

MQ在所有项目里面都很常见,

1、减少非紧急性任务对整个业务流程造成的延时;

2、减少高并发对系统所造成的性能上的影响;

举例几个场景:

1、给注册完成的用户派发优惠券、加积分、发消息等(派发优惠券、加积分、发消息这些属于非紧急性任务,可交由MQ进行处理,先让用户完成注册)

2、实时收集用户运动数据,并且收集数据后还需要比较复杂和耗时的操作才能完成业务处理(实时的数据采集任务一般并发量都是很高的,我们就应该先发送到MQ,再进行有序的处理)

另外说明一下,高并发的问题有很多种处理手段,而MQ是我认为的最稳健、简单的手段之一,所以我会优先使用

大概流程

首先用docker安装RabbitMQ,快捷

进入控制台,下图简单介绍下各个功能模块

大致流程

1、发送消息到MQ

2、MQ接收并保存消息等待消费

3、消费者有序地进行消息处理

Springboot中的使用

springboot中使用MQ超级简单

1、配置

#RabbitMq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=test
spring.rabbitmq.listener.simple.retry.max-attempts=5
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=5000
spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.rabbitmq.listener.simple.prefetch=1 #这个根据自己业务情况来定

2、发送

 @Autowired
private AmqpTemplate amqpTemplate; @Test
public void testMq(){
User user = new User();
user.setName("cjh陈"); amqpTemplate.convertAndSend("exchange_01","que_01", JSON.toJSONString(JSONResult.SUCC("",user)));
}

3、消费

  @RabbitListener(containerFactory = "containerFactory",concurrency = "10",bindings = {@QueueBinding(declare = "false",value = @Queue(value = "que_01"), exchange = @Exchange(declare = "false",value = "exchange_01",type = "fanout"))})
public void process(Message message, com.rabbitmq.client.Channel channel) { Long deliveryTag = null;
String data = null;
try { deliveryTag = message.getMessageProperties().getDeliveryTag();
data = new String(message.getBody(), "UTF-8"); //你的业务处理
test01(data); } catch (Exception e) {
logger.warn("MQ异常 {} , {} , {}" , e.getMessage(),e.getCause(),data);
}finally {
logger.warn("======消息处理结束");
} try {
channel.basicAck(deliveryTag, false); // 确认消息成功消费(配置需要开启消费确认模式)
} catch (IOException e) {
logger.warn("======MQ应答出错,请检查");
}
}

3步完成使用

特别说明几点

1、AmqpTemplate 好像做不到发布确认,要用RabbitTemplate,发布确认我主要用在分布式事务的场景

2、containerFactory可以不配置,根据实际情况来,下面再说明

3、concurrency指每个listener在初始化的时候设置的并发消费者的个数(注意是每个),如果需求是一个队列只能有一个消费者的情况可以在 @RabbitListener 里面设置 exclusive = true, concurrency = 1

简单画了一下,,,留着自己看,,

4、declare = "false",很常用,意思就是说加入你MQ控制台里面已经新建好队列或者交换机了,这里就应该配false表示程序不再进行重新定义,不然容易发生报错(当重新定义的参数与已定义的参数不一致时就会报错)

5、权限方面的处理以后再记录,,

回到上面第2点,containerFactory什么时候会用到?

某些配置需要自定义,比如线程池的大小,

当concurrency数值放大的时候,比如100,我发现大部分的消费者并没有工作,这是因为被线程池的大小所限制,网上的人说线程池大小默认是50,我也没去查估计差不多也就这个数,那么这个时候我们就需要自定义的containerFactory:

package xxx;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; /**
* @author cjh
* @Package xxx
* @Description:
* @date: 2019/6/30 20:19
*/
@Configuration
@EnableRabbit
public class MqContainerFactory implements RabbitListenerConfigurer { /**
* containerFactory
* @Description: 自定义配置
* @param
* @return
*/
@Bean
public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) throws Exception{
SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//必须是concurrency的两倍以上
ExecutorService service=Executors.newFixedThreadPool(200);
factory.setTaskExecutor(service);
factory.setPrefetchCount(1);
return factory;
} @Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(new MappingJackson2MessageConverter());
return factory;
} @Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
} }

记录完成

有错欢迎指正,转载请注明博客出处:http://www.cnblogs.com/cjh-notes/

最新文章

  1. Information Management Policy(信息管理策略)的使用范例
  2. GitHub注册账号
  3. 为什么没有MMU的处理器无法安装操作系统?
  4. 框架基础——全面解析Java注解
  5. XAF使用数据库访问层缓存的提升性能
  6. ASP.NET 5概观 (ASP.NET 5 Overview)
  7. 十八、Android引导界面
  8. hdu 4740 The Donkey of Gui Zhou(暴力搜索)
  9. word模版另存为网页(*.htm,*.html),转为jsp页面并加入数据后导出成word
  10. Netty方法误解ChannelHandlerContext.writeAndFlush(Object msg)
  11. java集合系列——Map介绍(七)
  12. lnmp一键安装的卸载
  13. 大数据处理的三种框架:Storm,Spark和Samza
  14. SQL Server 取日期时间格式 日期与字符串之间的转换
  15. Testlink1.9.17使用方法(第六章 测试计划制定)
  16. 使用cmd命令删除文件夹下所有文件
  17. chrome升级后出现滚动条无法滚动
  18. ie6 表格td中无内容时不显示边框的解决办法
  19. Flask使用SQLAlchemy两种方式
  20. cordova 插件 调用iOS社交化分享(ShareSDK:微信QQ分享)

热门文章

  1. 错误:The following error occurred attempting to run the DNX design time process (dnx-clr-win-x86.1.0.0-rc1-final)
  2. caffe笔记
  3. Java面试题之Java虚拟机垃圾回收
  4. 如何创建一个前端 React 组件并发布到 NPM
  5. js页面内容只读,不可复制
  6. 【Git】六、分支管理&冲突解决
  7. vue-element-admin 之改变登录界面input的光标颜色
  8. PAT Basic 1072 开学寄语 (20 分)
  9. 树的总结(遍历,BST,AVL原型,堆,练习题)
  10. ACM-ICPC 2018 徐州赛区网络预赛 J. Maze Designer (最大生成树+LCA求节点距离)