Spring boot 集成MQ
2024-08-31 23:54:18
import lombok.extern.java.Log;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; /**
* @author 周志伟
* @projectname 项目名称: ${project_name}
* @classname: RabbitConfig
* @description:
* @date 2018/6/22:10:00
*/
@Configuration
@Log
public class RabbitConfig {
@Qualifier("firstConnectionFactory")
@Autowired
public ConnectionFactory connectionFactory; @Autowired
public MQProperties mqProperties; @Bean(name="firstRabbitTemplate")
public RabbitTemplate firstRabbitTemplate(){
log.info("########################初始化rabitTemplate################################");
RabbitTemplate firstRabbitTemplate = new RabbitTemplate(connectionFactory);
return firstRabbitTemplate;
} @Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
} @Bean
public TopicExchange defaultExchange() {
return new TopicExchange(mqProperties.getCar_exchange());
}
package com.car.config; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; /**
* @author 周志伟
* @projectname 项目名称: ${project_name}
* @classname: ConnectionFactoryConfig
* @description:初始化数据源
* @date 2018/6/22:11:30
*/
@Configuration
public class ConnectionFactoryConfig {
@Bean(name="firstConnectionFactory")
public ConnectionFactory firstConnectionFactory(@Value("${spring.rabbitmq.car_host}") String host,
@Value("${spring.rabbitmq.car_port}") int port,
@Value("${spring.rabbitmq.car_username}") String username,
@Value("${spring.rabbitmq.car_password}") String password,
@Value("${spring.rabbitmq.car_vhost}") String virtualhost
){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualhost);
return connectionFactory;
}
}
package com.car.config; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component; /**
* @author 周志伟
* @projectname 项目名称: ${project_name}
* @classname: RabbitMQ_Queue
* @description:创建队列
* @date 2018/6/22:13:09
*/
@Configuration
@Component
public class RabbitMQQueue extends RabbitConfig{
Logger logger = LoggerFactory.getLogger(RabbitMQQueue.class); public RabbitMQQueue(){
logger.info("############### RabbitMQ Queue 创建 ###############");
} @Bean
public Queue CreationCarQueue() {
logger.info("创建队列:{}", mqProperties.getCar_queuename());
return new Queue(mqProperties.getCar_queuename());
} @Bean
public Binding binding() {
logger.info("绑定队列,exchange:{},routingKey:{},queueName:{}", mqProperties.getCar_exchange(), mqProperties.getCar_routingkey(), mqProperties.getCar_queuename());
return BindingBuilder.bind(CreationCarQueue()).to(defaultExchange()).with(mqProperties.getCar_routingkey());
}
}
package com.car.modules.MqMsg.mq; import com.carloan.feign.info.InserttheorderServiceFeign;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; /**
* @author 周志伟
* @projectname 项目名称: ${project_name}
* @classname: ReqCarMqMsg
* @description:配置监听
* @date 2018/5/28:10:50
*/ @Component
public class ReqCarMqMsg{ private Logger logger = LoggerFactory.getLogger(ReqCarMqMsg.class); @RabbitListener(queues = "${spring.rabbitmq.car_queuename}", containerFactory = "rabbitListenerContainerFactory")
public void process(@Payload byte[] bytes) throws UnsupportedEncodingException {
String result = null;
try {
result = new String(bytes, "UTF8");
logger.info("监听到消息==========="+ result);
} catch (UnsupportedEncodingException e) {
logger.error("监听车贷门店Mq: insuranceTrial UnsupportedEncodingException error :{}",e);
} catch (Exception e){
logger.error("监听车贷门店Mq: insuranceTrial Exception error :{}",e);
}
}
}
package com.car.modules.MqMsg.mq; import com.car.config.MQProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component; import java.util.UUID; /**
* @author 周志伟
* @projectname 项目名称: ${project_name}
* @classname: PushMq
* @description:推送消息
* @date 2018/6/25:9:52
*/
@Component
public class PushMq {
Logger logger = LoggerFactory.getLogger(PushMq.class); @Qualifier("firstRabbitTemplate")
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
public MQProperties mqProperties; public void pushmessage(String message){
try{
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(mqProperties.getCar_exchange(), mqProperties.getCar_put_routingkey(), message,correlationId);
logger.info("推送消息:{}", message);
}catch (Exception e){
logger.error("推送异常:{}",e);
} }
}
最新文章
- Bzoj1426 收集邮票
- Angularjs ng-if和ng-show的区别
- composer 一些使用说明
- js倒计时跳转链接
- iOS Apple Pay
- 自定义view获取宽高
- TCP/IP详解学习笔记(1)-基本概念
- Javascript基本格式
- javascript数据变量类型判断(JS变量是否是数组,是否是函数的判断)
- hdu 3836 Equivalent Sets(tarjan+缩点)
- 探索Windows命令行系列(7):通过命令编译C#类和Java类
- 使用javaMail实现简单邮件发送
- 关于陌生的依赖模块,如withStyles、react-apollo等
- flex实现三栏等分布局
- Qt基础学习---滑动条之QSlider
- Java ClassLoad详解
- SSIS 控制流和数据流
- Window 10 单机配置MYSQL主从同步
- is_array判断是否为数组
- ***XX-net 和 proxyee-down