前情提要

之前我的项目里有一个合作编辑的功能,多个客户端的用户可以合作写一篇文章的不同部分,而且合作的任意作者互相还可以进行文字通讯。这种需求肯定是首选websocket了,因为服务器需要主动给客户端推送消息,维持一个长连接是最经济实惠的手段。如果一个客户端需要给另一个客户端做推送,那肯定是需要中间服务器作为中转。当两个客户端注册到同一个服务器的时候,发送方带上消息接受者的相关信息,服务器替发送方去对接受者做推送,这样实现了通讯的功能。

那么,这个设计有没有什么问题呢?

在微服务的时代,很多时候需要考虑的是部署多台机器的情况。这时候,上述的思路就不好用了。例如,当客户端A注册到了第一台机器,客户端B注册到了第二台机器,这时候如果客户端A还希望第一台机器为他完成转发的功能,显然是找不到客户端B的。这就是websocket的集群化问题。这问题也让我陷入了深深的思考,那就是产生了一个问题:微信是如何做出来的?

设计个微信?

当很久很久以前,微信还是一个小而美的通讯软件,只有通讯的功能和朋友圈功能。在这个时期,不妨思考思考微信会怎么做。消息通讯的核心仍然是websocket,只不过微信不可能只有一台服务器,微信肯定是一个集群才能正常运转,只有一台机器维持不了那么多的连接。这时候很容易想到微信应该是利用了消息中间件对消息进行了精准投递。对于不在线的用户,微信应该是将离线消息储存到了数据库,用户上线时进行查库,收到消息。

何为“精准投递”?

当客户端A想给客户端B投送消息时,服务器1需要找到客户端B注册到了哪台机器,这时可以考虑用数据库将客户端B建立连接的机器写到数据库里,通过这个数据去找到对应的机器。这时候,消息队列就产生了它应有的作用。核心在于Rabbit MQ的Topic交换机可以绑定多条队列,而且可以选择性推送到指定的队列。所以这时候就产生一个思考,每台机器进行一条队列的绑定,服务器和队列的对应关系可以使用服务器的ip地址,这样保证了唯一性,而且可以复用这个ip标志。

思路设计

当客户端A和服务器1建立连接时,需要往Redis里维持一份会话,这份会话信息保存了客户端A和服务器1建立了连接,也就是保存了服务器1的ip地址。客户端2也是如此。当客户端A想给客户端B发送消息时,先从redis里找到客户端B注册到机器的ip地址,进而确定消息投递的队列。当客户端B收到消息后,将消息推送到客户端B。

代码

那么看看代码吧:

 public static String WS_QUEUE = "wx_queue_";
public static final String WS_TOPIC_EXCHANGE = "wx_topic_exchange";
public static final String WS_BINDING_KEY = "wx_exchange_"; @Bean("WS_QUEUE")
public Queue wsQueue() {
WS_QUEUE += serverIpHost;
return new Queue(WS_QUEUE);
} //交换机
@Bean("WS_TOPIC_EXCHANGE")
public TopicExchange wsExchange() {
return new TopicExchange(WS_TOPIC_EXCHANGE);
} //绑定队列和ES交换机
@Bean
public Binding wsTopicBinding(@Qualifier("WS_QUEUE") Queue wsQueue, @Qualifier("WS_TOPIC_EXCHANGE") TopicExchange wsExchange) {
return BindingBuilder.bind(wsQueue).to(wsExchange).with(WS_BINDING_KEY + serverIpHost);
}

将本机ip写到配置文件里,在初始化队列的时候,将队列名字和该ip关联起来。不同机器这个地方是需要改配置的。也就是一台机器对应一个队列。

对于某一个通讯ws接口:

    @MessageMapping("/chat/{from}/{to}/{blogId}")
public void chat(String msg, @DestinationVariable String from, @DestinationVariable Long to, @DestinationVariable Long blogId) {
UserEntityVo userEntityVo = MyUtils.jsonToObj(redisTemplate.opsForHash().get(Const.CO_PREFIX + blogId, to.toString()), UserEntityVo.class);
if (userEntityVo != null) {
String toServerIpHost = userEntityVo.getServerIpHost();
String idStr = blogId.toString();
String toStr = to.toString();
ChatDto dto = MyUtils.transferToDto(Message.class, ChatDto.class, new Object[]{msg, from, toStr, idStr},
new Class[]{msg.getClass(), from.getClass(), toStr.getClass(), idStr.getClass()});
rabbitTemplate.convertAndSend(
RabbitConfig.WS_TOPIC_EXCHANGE,RabbitConfig.WS_BINDING_KEY + toServerIpHost,
dto);
}
}

即是从redis查出对方所在机器的ip地址,然后投递到那个队列里。


@Bean("WSMessageListener")
//processMessage作为listener
MessageListenerAdapter wSMessageListener(WSMessageHandler wSMessageHandler) {
return new MessageListenerAdapter(wSMessageHandler, "processMessage");
} //在container内将queue和listener绑定
@Bean("WSMessageListenerContainer")
SimpleMessageListenerContainer wSMessageListenerContainer(ConnectionFactory connectionFactory,
@Qualifier("WSMessageListener") MessageListenerAdapter listenerAdapter,
@Qualifier("WS_QUEUE") Queue queue) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queue.getName());
container.setMessageListener(listenerAdapter);
return container;
} public void processMessage(MessageDto msg) { String methodName = msg.getMethodName(); switch (methodName) {
...
case "chat":
Container<Message> containerV4 = msg.getData();
Message message = containerV4.getData();
String id = message.getBlogId();
String to = message.getTo();
simpMessagingTemplate.convertAndSendToUser(to, "/" + id + "/queue/chat", message);
break;
... }

这里监听到方法的机器其实就是客户端B所在的机器,其他机器不监听这个队列,是不会做消息转发的尝试的。这里直接推送给客户端B就可以了。

方案的优点

这个方案的优点就是理论上集群可以无限扩张,每台机器的代码并不需要改,只是配置文件会有一些差异。当然也可以将ip地址换成uuid,只要有唯一性就可以了。

最新文章

  1. ASP.NET Core 中文文档 第三章 原理(5)错误处理
  2. jQuery的使用
  3. 谷歌的网页排序算法(PageRank Algorithm)
  4. 更快、更强——解析Hadoop新一代MapReduce框架Yarn(CSDN)
  5. Linux: .vimrc
  6. SQL 2005 日志损坏的恢复方法
  7. hdu 4081 Qin Shi Huang&#39;s National Road System(最小生成树+dp)2011 Asia Beijing Regional Contest
  8. linux进程间通信--无名管道
  9. C++ I/O标准库
  10. range和xrange的区别详解
  11. Java 程序员必须了解的 7 个性能指标
  12. Python3个人学习笔记--每天一点一滴成长!
  13. .10-浅析webpack源码之graceful-fs模块
  14. pandas数组(pandas Series)-(2)
  15. Breathing During Sleep
  16. 工程脚本插件方案 - c集成Python基础篇
  17. 算法笔记_193:历届试题 连号区间数(Java)
  18. (转)ALSA配置文件(alsa.conf, asoundrc, asound.conf)及其自动加载 And HDMI Adiuo
  19. sqli-labs学习笔记 DAY2
  20. win 7 64 安装 tensorflow

热门文章

  1. windows server 2012以上版本离线安装 net framework3.5 方法
  2. 》》》Java利用aspose-words将word文档转换成pdf(破解 无水印)
  3. 前端导出文件流[object Object]
  4. vue打包后打开index.html文件显示空白页问题
  5. IMX6Ull驱动
  6. js中的this的指向问题
  7. 【NPDP专项练习】第五章 工具与绩效度量
  8. 手机安装python环境
  9. WPF_MVVM框架(5)
  10. 解决 Outlook 的 Teams 会议加载项ID/链接等问题