最近做一个新需求,用户发布了动态,前台需要查询,为了用户读取信息响应速度更快(MySQL很难实现或者说实现起来很慢),所以在用户动态发布成功后,利用消息机制异步构建 redis缓存 和 elasticsearch索引 。

开发环境

rabbitMQ服务端,docker安装

拉取rabbit-mq镜像
docker pull hub.c..com/library/rabbitmq:3.6.-management 运行镜像
docker run -d --name rabbitmq --publish : --publish : --publish : --publish : --publish : --publish : hub.c..com/library/rabbitmq:3.6.-management 后台地址:
http://192.168.1.8:15672

消息生产端(PHP):

composer 安装 rabbitmq客户端
composer require php-amqplib/php-amqplib 生产广播消息官方demo
https://github.com/php-amqplib/php-amqplib/blob/master/demo/amqp_publisher_fanout.php

应用中代码

<?php
/**
* User: szliugx@gmail.com
* Date: 2018/6/18
* Time: 下午1:54
*/ namespace App\ThirdParty\Message; use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage; class AmqpPublisher
{
public function send($content)
{
$exchange = 'message.fanout.exchange';
// 创建连接
$connection = new AMQPStreamConnection(
config('app.mq_host'),
config('app.mq_port'),
config('app.mq_user'),
config('app.mq_pass'),
config('app.mq_vhost')
);
$channel = $connection->channel();
/*
name: $exchange
type: fanout
passive: false // don't check is an exchange with the same name exists
durable: false // the exchange won't survive server restarts
auto_delete: true //the exchange will be deleted once the channel is closed.
*/
$channel->exchange_declare($exchange, 'fanout', false, true, false);
$messageBody = $content;
$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain'));
$channel->basic_publish($message, $exchange);
// 关闭通道
$channel->close();
// 关闭连接
$connection->close();
}
}

消息消费端(Java):

引入maven依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置广播队列信息

package cn.taxiong.release.config;

import cn.taxiong.release.constant.QueueConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; /**
* RabbitMQFanout模式配置
*
* @author szliugx@gmail.com
* @create 2018-06-18 下午4:04
**/
@Slf4j
@Configuration
public class RabbitMQFanoutConfig { @Bean
public Queue createFanoutQueueCache() {
log.info( "创建了FanoutQueue cache 缓存 队列" );
return new Queue(QueueConstants.MESSAGE_QUEUE_RELEASE_CACHE_NAME);
} @Bean
public Queue createFanoutQueueIndex() {
log.info( "创建了FanoutQueue index 缓存 队列" );
return new Queue(QueueConstants.MESSAGE_QUEUE_RELEASE_INDEX_NAME);
} @Bean
public FanoutExchange fanoutExchangeRelease() {
log.info( "创建了fanoutExchange交换机" );
return new FanoutExchange( QueueConstants.MESSAGE_FANOUT_EXCHANGE);
} @Bean
public Binding fanoutExchangeCacheQueueBinding() {
log.info( "将FanoutQueue cache 队列绑定到交换机fanoutExchange" );
return BindingBuilder.bind( createFanoutQueueCache() ).to( fanoutExchangeRelease() );
} @Bean
public Binding fanoutExchangeIndexQueueBinding() {
log.info( "将FanoutQueue index 队列绑定到交换机fanoutExchange" );
return BindingBuilder.bind( createFanoutQueueIndex() ).to( fanoutExchangeRelease() );
}
}

队列常量信息

package cn.taxiong.release.constant;

/**
* 队列常量
*
* @author szliugx@gmail.com
* @create 2018-06-14 下午7:02
**/
public interface QueueConstants {/**
* 消息交换
*/
String MESSAGE_FANOUT_EXCHANGE = "message.fanout.exchange"; /**
* 发布缓存消息队列名称
*/
String MESSAGE_QUEUE_RELEASE_CACHE_NAME = "message.release.cache.queue"; /**
* 发布索引消息队列名称
*/
String MESSAGE_QUEUE_RELEASE_INDEX_NAME = "message.release.index.queue";
}

缓存(cache)服务消费消息:

package cn.taxiong.release.message;

import cn.taxiong.release.constant.QueueConstants;
import cn.taxiong.release.service.OperateReleaseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component; /**
* 消息消费
*
* @author szliugx@gmail.com
* @create 2018-06-14 下午7:14
**/
@Slf4j
@Component
@RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_RELEASE_CACHE_NAME)
public class MessageConsumer { @Autowired
private OperateReleaseService operateReleaseService; @RabbitHandler
public void handler(@Payload String message) {
// operateReleaseService.storeReleaseRedisCache(message);
log.info("缓存cache消息消费1:{}", message);
}
}

索引(index)服务消费消息:

package cn.taxiong.release.message;

import cn.taxiong.release.constant.QueueConstants;
import cn.taxiong.release.service.OperateReleaseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component; /**
* 消息消费
*
* @author szliugx@gmail.com
* @create 2018-06-14 下午7:14
**/
@Slf4j
@Component
@RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_RELEASE_INDEX_NAME)
public class MessageConsumer2 { @Autowired
private OperateReleaseService operateReleaseService; @RabbitHandler
public void handler(@Payload String message) {
log.info("索引消息 index 消费2:{}", message);
}
}

最新文章

  1. 敏捷开发与jira之燃烧图
  2. Postgresql-xl 调研
  3. nyoj 925 国王的烦恼(最小生成树)
  4. [Android Pro] StarUML 版本破解
  5. BizTalk动手实验(十二)WCF-Oracle适配器使用
  6. 【转】发布一个基于NGUI编写的UI框架
  7. Eclipse 的 Debug 介绍与技巧
  8. JavaWeb学习记录(十七)——JSP九大隐式对象
  9. 2014 ACM/ICPC Asia Regional Anshan Online
  10. 分享:Perl打开与读取文件的方法
  11. str_replace使用array替换
  12. 转载 C#中静态类和非静态类比较
  13. Silverlight中DataGrid的显示指定列、修改默认列名和格式化日期数据和小数数据
  14. C++代码覆盖率工具Coverage Validator
  15. Oracle EBS-SQL (OM-4):检查发运网络.sql
  16. 配置系统引导启动SuperScoekt
  17. 【原创】整合Spring4+Hibernate4+Struts2时NullPointerException问题解决
  18. 初探Google Guava
  19. 第四周学习总结-HTML
  20. (String中)正则表达式使用如下

热门文章

  1. 微信小程序:WXSS 样式
  2. HeyWeGo第三周项目总结
  3. NOIP 2018 兔纸旅游记
  4. Android中的代理模式
  5. 浅谈Android Studio3.0更新之路(遇坑必入)
  6. vijos 1082
  7. HDU 4826 (分类DP)
  8. 探究JS中对象的深拷贝和浅拷贝
  9. C primer plus 5 读书笔记3
  10. bzoj1074