先说下背景:上周开始给项目添加曾经没有过的消息中间件。虽然说,一路到头非常容易,直接google,万事不愁可是生活远不仅是眼前的“苟且”。首先是想使用其他项目使用过的一套对mq封装的框架,融合进来。虽然折腾了上周六周日两天,总算吧老框架融进项目中了,可是周一来公司和大数据哥们儿一联调发现,收不到数据!所以没办法,当场使用原生那一套撸了个版本出来可是,可是,可是,俗话说得好:生命在于折腾!在上周末融合老框架的时候,我把源码读了遍,发现了很多很好的封装思想,Ok,这周末总算闲了下来,我就运用这个思想,封装一个轻量级的呗,说干就干!

主要思想

说到封装,我想,应该主要是要尽可能减小用户使用的复杂度,尽量少的进行配置,书写,甚至能尽量少的引入第三发或是原生类库。所以在这种想法之下,这套框架的精髓主要在以下几点:

  • 使用注解,减少用户配置
  • 将不同的生产者消费者的初始化方式统一
  • 初次注册生产者或者消费者的时候,进行队列的自动注册
  • 再统一的初始化方式中,使用动态代理的方式,代理到具体的生产者或是消费者的发送接收方法

在这种模式下,我们不用过多的配置,直接建立一个接口,接口上面使用注解声明队列的名称,然后使用同一的Bean进行初始化,就齐活了!

统一初始化Bean的实现

不说啥,直接上代码:


public class RabbitMQProducerFactoryBean<T> extends RabbitMQProducerInterceptor implements FactoryBean<T> { private Logger logger = LoggerFactory.getLogger(getClass()); private Class<?> serviceInterface; @Autowired
private ConnectionFactory rabbitConnectionFactory; @Value("${mq.queue.durable}")
private String durable; @Value("${mq.queue.exclusive}")
private String exclusive; @Value("${mq.queue.autoDelete}")
private String autoDelete; @SuppressWarnings("unchecked") /**
这个方法很特殊,继承自FactoryBean,就是说管理权归属IoC容器。每次注册一个队列的时候,并且注入到具体的service中使用的时候,就会调用这个getObject方法。所以,对于使用本类初始化的bean,其类型并非本类,而是本类的属性serviceInterface类型,因为最终getObject的结果是返回了一个动态代理,代理到了serviceInterface。
**/
@Override
public T getObject() throws Exception { //初始化
if (getQueueName() != null) {
logger.info("指定的目标列队名[{}],覆盖接口定义。", getQueueName());
} else {
RPCQueueName name = serviceInterface.getAnnotation(RPCQueueName.class);
if (name == null)
throw new IllegalArgumentException("接口" + serviceInterface.getCanonicalName() + "没有指定@RPCQueueName");
setQueueName(name.value());
}
//创建队列
declareQueue();
logger.info("建立MQ客户端代理接口[{}],目标队列[{}]。", serviceInterface.getCanonicalName(), getQueueName()); return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class<?>[]{serviceInterface}, this);//动态代理到目标接口
} private void declareQueue() {
Connection connection = rabbitConnectionFactory.createConnection();
Channel channel = connection.createChannel(true);
try {
channel.queueDeclare(getQueueName(), Boolean.valueOf(durable), Boolean.valueOf(exclusive)
, Boolean.valueOf(autoDelete), null);
logger.info("注册队列成功!");
} catch (IOException e) {
logger.warn("队列注册失败", e);
}
}
...... } public class RabbitMQProducerInterceptor implements InvocationHandler { private Logger logger = LoggerFactory.getLogger(getClass()); private String queueName; @Autowired
private AmqpTemplate amqpTemplate; @Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Object sendObj;
Class<?>[] parameterTypes = method.getParameterTypes();
String methodName = method.getName();
boolean isSendOneJson = Objects.nonNull(args) && args.length == 1 && (args[0] instanceof String);
if (isSendOneJson) {
sendObj = args[0];
logger.info("发送单一json字符串消息:{}", (String) sendObj);
} else {
sendObj = new RemoteInvocation(methodName, parameterTypes, args);
logger.info("发送封装消息体:{}", JSONSerializeUtil.jsonSerializerNoType(sendObj));
} logger.info("发送异步消息到[{}],方法名为[{}]", queueName, method.getName());
//异步方式使用,同时要告知服务端不要发送响应
amqpTemplate.convertAndSend(queueName, sendObj);
return null; } ......
}

下面是核心的配置文件


<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<beans default-lazy-init="false"
xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <rabbit:connection-factory id="rabbitConnectionFactory"
host="${mq.host}" port="${mq.port}" virtual-host="${mq.vhost}"
username="${mq.username}" password="${mq.password}" /> <!-- 供自动创建队列 -->
<rabbit:admin connection-factory="rabbitConnectionFactory" /> <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"/> <!-- 创建生产者 -->
<bean id="sendMsg" class="com.example.demo.RabbitMQProducerFactoryBean">
<property name="serviceInterface" value="com.example.demo.ISendMsg" />
</bean> </beans>

说明:每次要使用mq,直接导入这个基本配置,和基础jar包即可。对于配置文件中的生产者声明,已经直接简化到三行,这一部分可以单独创建一个类似于producer-config.xml专门的配置文件。

附属类

这里主要就是涉及一个注解类:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RPCQueueName { String value();
}

说明:主要用于队列名称的声明。可以拓展的再建立其他的注解类,并在RabbitMQProducerFactoryBean中进行具体的逻辑实现。对于未来功能添加,起到了非常好的解耦效果。

具体的接口:

@RPCQueueName("test.demo.ISendMsg")
public interface ISendMsg { void sendMsg(String msg);
}

说明:这样,就声明了个队列名叫test.demo.ISendMsg的生产者,每次讲IsendMsg注入到要发送消息的Service里面,直接调用sendMsg即可向注解声明的队列发送消息了。

恩,开源

写了个springboot的小demo:

github地址

接下来我会更新消费者的封装,今天先放一放,出去动动。。哈哈

最新文章

  1. C#写入和读出文本文件
  2. 关于scrolltop 兼容 IE6/7/8, Safari,FF的方法
  3. getGLES1ExtensionString: Could not find GLES 1.x config!
  4. [algorithm] My rookie plan to start
  5. Moon.Orm 5.0(MQL版)分页功能的设计(求指教,邀请您的加入)
  6. GTD时间管理---非行动性
  7. hdu - 4608 - I-number
  8. HTML 中 META的作用
  9. python脚本工具 - 3 目录遍历
  10. bzoj 1800: [Ahoi2009]fly 飞行棋 暴力
  11. ArrayList 、Vector、 LinkList
  12. Poj3484-Showstopper(二分脑洞题)
  13. BZOJ 3503: [Cqoi2014]和谐矩阵( 高斯消元 )
  14. 【Selenium】idea的selenium环境配置
  15. TCP状态转换
  16. deep learning RNN
  17. C# -- 索引器、枚举类型
  18. java之jdbc使用
  19. 系统引导修复,grub2下的各种骚作
  20. Android面试之HashMap的实现原理

热门文章

  1. Python实现一些常用排序算法
  2. 使用everything把一个文件夹里(包含子目录)的所有图片拷贝到另一个文件夹
  3. Error Correct System CodeForces - 527B
  4. Excel2010 日文显示乱码
  5. Struts2【拦截器】就是这么简单
  6. 【BZOJ1087】【SCOI2005】互不侵犯(状态压缩,动态规划)
  7. JavaScript之BOM
  8. Java interview questions(No1)
  9. 创建Maven项目时提示web.xml is missing and &lt;failOnMissingWebXml&gt; is set to true错误解决方案
  10. Loadrunner11不能调用IE8解决方法大全