1.引入依赖

 <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置生成的消息队列

spring:
rabbitmq:
host: 47.113.120.XX
port: 5672
password: XXXX
username: XXXX
virtual-host: XXX # rabbitmq 初始化配置
rabbit-init:
list:
- {exchange: "cs.user.topic",queues: [user.permission] , bindingKey: '#.permission', type: topic }

3.配置类

@ConfigurationProperties("rabbit-init")
@Data
public class RabbitMQInitProperty {
private List<RabbitEntity> list = new ArrayList<>();
}

4.RabbitMqConfig类

@Configuration
@Component
@Slf4j
public class RabbitMQConfig implements RabbitListenerConfigurer { /**
* 回调函数: confirm确认
*/
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
if(!ack){
//可以进行日志记录、异常处理、补偿处理等
System.err.println("异常处理...."+cause);
}else {
//更新数据库,可靠性投递机制
}
}
};
/**
* 回调函数: return返回
*/
public final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey) {
System.err.println("return exchange: " + exchange + ", routingKey: "
+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
}
};
/**
* rabbitmq 初始配置
*/
@Autowired
private RabbitMQInitProperty property ;
/**
*
*/
@Autowired
private ConnectionFactory connectionFactory;
/**
* 增加rabbitTemplate回调函数
*/
@Bean
public RabbitTemplate rabbitTemplate(){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setConfirmCallback(confirmCallback);
return rabbitTemplate;
} /**
*
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(){
return new RabbitAdmin(rabbitTemplate());
} /**
* 初始化消息队列
* @param rabbitAdmin
* @return
*/
@Bean
public RabbitMQInitProperty getRabbitMQProperty(RabbitAdmin rabbitAdmin){
List<RabbitEntity> list = property.getList();
if(StringUtils.isEmpty(list)) {
return null ;
}
list.stream().forEach(entity -> {
List<String> queues = entity.getQueues();
String binding = entity.getBindingKey();
String exchange = entity.getExchange();
String type = !StringUtils.isEmpty(entity.getType())? entity.getType() : ExchangeTypes.DIRECT;
if(StringUtils.isEmpty(queues) || StringUtils.isEmpty(binding)
|| StringUtils.isEmpty(exchange)
|| StringUtils.isEmpty(type)){
return;
}
Exchange exchangeTempt= new ExchangeBuilder(exchange, type).durable(true).build();
rabbitAdmin.declareExchange(exchangeTempt);
for(String str : queues){
Queue queue = QueueBuilder.durable(str).build();
rabbitAdmin.declareQueue(queue);
Binding bind = BindingBuilder.bind(queue).to(exchangeTempt).with(binding).noargs();
rabbitAdmin.declareBinding(bind);
}
});
return this.property ;
} /**
* 对象数据格式化
* @return
*/
@Bean
public MessageConverter messagetConverter() {
MessageConverter converter = new Jackson2JsonMessageConverter();
return converter;
} @Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
} }

5.RabbitMQ 发送进行封装

public class RabbitSender {

    /**
* 自动注入RabbitTemplate模板类
*/
@Autowired
private RabbitTemplate rabbitTemplate; /**
*
* @return
*/
private CorrelationData getCorrelation(){
return new CorrelationData(UUID.randomUUID().toString().replace("-", ""));
} /**
*
* @param exchange
* @param routingKey
* @param message
*/
public void convertAndSend(String exchange,String routingKey, Object message){
CorrelationData correlation = getCorrelation();
log.info("correlation:{},exchange:{},routekey:{},params:{}",correlation.toString(),exchange,
routingKey,message.toString());
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlation);
} /**
*
* @param entity
*/
public void convertAndSend(RabbitSenderEntity entity) {
CorrelationData correlation = getCorrelation();
log.info("correlation:{},exchange:{},routekey:{},params:{}",correlation.toString(),entity.getExchange(),
entity.getRouteKey(),entity.getParams());
rabbitTemplate.convertAndSend(entity.getExchange(), entity.getRouteKey(), entity.getParams(), correlation);
}
}

6.测试使用

 @RequestMapping("/setUserPermission")
public ResultObj setUserPermission(@RequestBody UserInfo user){
try {
Assert.notNull(user);
RabbitSenderEntity entity = RabbitSenderEntity.builder()
.exchange("cs.user.topic")
.routeKey("user.permission")
.params(JsonMapperUtil.toString(user)).build();
sender.convertAndSend(entity);
} catch (Exception e) {
log.error(e.getMessage());
return ResultObj.failObj(e.getMessage());
}
return ResultObj.successObj("权限设置成功");
}
@RabbitListener(queues="user.permission")
public void setUserPermission(Message message, Channel channel) throws IOException {
try {
UserInfo user = RabbitUtil.getMessageBody(message, UserInfo.class);
userInfoService.updateById(user);
} catch (IOException e) {
log.error("消费方法{},爆出错误信息:{}","setUserPermission",e.getMessage());
} finally {
//告诉MQ删除这一条消息,若是true,则是删除所有小于tags的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

最新文章

  1. C#封装C++DLL
  2. 最小的K个数:用快排的思想去解相关问题
  3. [Everyday Mathematics]20150222
  4. centos安装ruby on rails
  5. [改善Java代码] 提倡异常的封装
  6. QLinkedList和std::forward_list(都是双向链表,不支持operator[],好处可能是插入和删除都比较快)
  7. CSS3 background-size:cover/contain
  8. struts1.x mvc简单例程
  9. 【转载】JAVA基础复习与总结&lt;三&gt; Object类的常用方法
  10. 南邮攻防训练平台逆向第四题WxyVM
  11. Python 学习笔记2 变量
  12. 20135202闫佳歆--week3 构造一个简单的Linux系统MenuOs--学习笔记
  13. 数据库行列转换sql
  14. 实习培训——Java基础(4)
  15. [LeetCode] 103. Binary Tree Zigzag Level Order Traversal _ Medium tag: BFS
  16. noip 2012 提高组 day2 部分题解
  17. mac和windows自动清理内存工具
  18. Android学习笔记_47_SIM卡介绍
  19. SQL 视图 局部变量 全局变量 条件语句 事务 触发器
  20. Java I/O 笔记

热门文章

  1. (九)DVWA之SQL Injection--SQLMap&amp;Fiddler测试(High)
  2. (前端)angular报错日常以及记录日常
  3. Python--编码转换
  4. 09.Django-信号
  5. CentOS7——搭建LNMP环境(WordPress案例)
  6. 通过Nginx、Consul、Upsync实现动态负载均衡和服务平滑发布
  7. EIGRP-15-其他和高级的EIGRP特性-1-路由器ID
  8. Jupyter的搭建
  9. mitmproxy的简单使用
  10. cb07a_c++_迭代器和迭代器的范围