前言

之前介绍了使用devTools进行索引库数据的crud,这里使用的是java程序,使用中间件activeMQ进行数据库和索引库数据的同步。主要是用来完成对数据库的修改来完成对索引库的同步。

正文

前提准备:

1. 索引信息:

结构化的索引,在索引的setting中,使用的是ik分词器,级别是ik-max-word。

mapping映射信息中,使用的dynamic = false,如果不能匹配到结构化索引的字段,则不进行数据的添加和更新。这样的好处是:不添加多余的不需要索引的字段,并且如果添加的字段多的话可以过滤掉无用的而不报错。对数据库添加的很多字段,只添加需要索引的结构化的映射信息。

2. elasticsearch信息

使用的是一台机器上的三个elasticsearch服务端的集群,分别使用的是9301,9300,9302的tcp端口。

创建项目进行数据库的操作

1. 对数据的删除

boolean flag = bookMapper.deleteByPrimaryKey(book.getId()) > 0 ? true : false;
logger.info("删除数据返回的结果:" + flag);
//将消息放入队列中,esQueueDelete:发送消息的类型,消息内容:book.getId().toString()
if (flag) {
producer.sendMessage(esQueueDelete, book.getId().toString());
}
return flag;

2. 对数据的修改

boolean flag = bookMapper.updateByPrimaryKeySelective(book) > 0 ? true : false;
logger.info("更新数据返回的结果:" + flag);
//将消息放入队列中,esQueueUpdate:消息的类型,消息体:将更新的数据转换成json放入消息体
if (flag) {
producer.sendMessage(esQueueUpdate,JSONObject.toJSONStringWithDateFormat(book, Constant.DATETIME_FORMATTION));
}
return flag;

3. 对数据的添加

boolean flag = bookMapper.insertSelective(book) > 0 ? true : false;
logger.info("插入的数据返回的id是{}", book.getId());
/**
* 将数据插入到消息队列中。
* 如果索引创建的时候dynamic字段是false,可以直接传整对象,
* 如果不是,请自行过滤之后传入对象,否则会报错或者添加需要无用字段并被索引。
*/
if (flag) {
producer.sendMessage(esQueueCreate, JSONObject.toJSONStringWithDateFormat(book, Constant.DATETIME_FORMATTION));
}
return flag;

ES项目接受消息进行索引库操作

    /**
* 根据activeMQ消息订阅,自动将数据插入到索引库
* @param receiveMsg 监听接收到的消息
*/
@JmsListener(destination = JmsConfig.QUEUE_CREATE,containerFactory = "jmsListenerContainerQueue")
public void autoInsertData(String receiveMsg){
if (StringUtils.isBlank(receiveMsg)){
logger.error("自动插入数据到索引库====>从消息队列中接收到的消息为空。监听的为:{}", JmsConfig.QUEUE_CREATE);
}
logger.info("自动插入数据到索引库====>从消息队列中接收到的消息为:{}", receiveMsg);
String id = dataService.insertData(receiveMsg, defaultIndex, defaultType);
logger.info("时间:{},监听消息队列:{},插入到索引库成功,id为:{}",LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE), JmsConfig.QUEUE_CREATE, id);
}

需要注意的是:

   @JmsListener(destination = JmsConfig.QUEUE_CREATE,containerFactory = "jmsListenerContainerQueue"),其中:JmsConfig.QUEUE_CREATE必须是常量,表示监听的消息类型。使用的是containerFactory 是jmsListenerContainerQueue

  直接拿到receiveMsg,就是接收的消息体。

具体插入索引库代码

@Override
public String insertData(String content, String index, String type) {
JSONObject jsonObject = JSONObject.parseObject(content);
String id = jsonObject.get(primaryKey).toString();
IndexResponse indexResponse = transportClient.prepareIndex(index, type, id).setSource(jsonObject.toJSONString(), XContentType.JSON).get();
logger.info("插入数据库的数据是:{},插入索引库返回状态:{},插入的id为{}",content, indexResponse.status(), indexResponse.getId());
return indexResponse.getId();
}

删除和更新类似,直接上代码了。

    /**
* 根据activeMQ消息订阅,自动将数据插入到索引库
* @param receiveMsg 监听接收到的消息
*/
@JmsListener(destination = JmsConfig.QUEUE_CREATE,containerFactory = "jmsListenerContainerQueue")
public void autoInsertData(String receiveMsg){
if (StringUtils.isBlank(receiveMsg)){
logger.error("自动插入数据到索引库====>从消息队列中接收到的消息为空。监听的为:{}", JmsConfig.QUEUE_CREATE);
}
logger.info("自动插入数据到索引库====>从消息队列中接收到的消息为:{}", receiveMsg);
String id = dataService.insertData(receiveMsg, defaultIndex, defaultType);
logger.info("时间:{},监听消息队列:{},插入到索引库成功,id为:{}",LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE), JmsConfig.QUEUE_CREATE, id);
} /**
* 根据订阅的消息,自动从索引库删除数据
* @param receiveMsg
*/
@JmsListener(destination = JmsConfig.QUEUE_DELETE,containerFactory = "jmsListenerContainerQueue")
public void autoDeleteData(String receiveMsg){
if (StringUtils.isBlank(receiveMsg)){
logger.error("自动从索引库删除数据===>从消息队列中接收到的消息为空。监听的为:{}", JmsConfig.QUEUE_DELETE);
}
boolean flag = dataService.deleteData(receiveMsg, defaultIndex, defaultType);
logger.info("自动从索引库删除数据===>从消息队列中接收到的消息为:{},删除结果为:{}", receiveMsg, flag);
} /**
* 根据订阅的消息,自动从索引库更新数据
* @param receiveMsg
*/
@JmsListener(destination = JmsConfig.QUEUE_UPDATE,containerFactory = "jmsListenerContainerQueue")
public void autoUpdateData(String receiveMsg){
if (StringUtils.isBlank(receiveMsg)){
logger.error("自动从索引库更新数据===>从消息队列中接收到的消息为空。监听的为:{}", JmsConfig.QUEUE_DELETE);
}
logger.info("自动更新数据到索引库====>从消息队列中接收到的消息为:{}", receiveMsg);
boolean flag = dataService.updateData(receiveMsg, defaultIndex, defaultType);
logger.info("自动从索引库更新数据===>从消息队列中接收到的消息为:{},更新结果为:{}", receiveMsg, flag);
} @Override
public String insertData(String content, String index, String type) {
JSONObject jsonObject = JSONObject.parseObject(content);
String id = jsonObject.get(primaryKey).toString();
IndexResponse indexResponse = transportClient.prepareIndex(index, type, id).setSource(jsonObject.toJSONString(), XContentType.JSON).get();
logger.info("插入数据库的数据是:{},插入索引库返回状态:{},插入的id为{}",content, indexResponse.status(), indexResponse.getId());
return indexResponse.getId();
} @Override
public boolean deleteData(String receiveMsg, String index, String type) {
DeleteResponse deleteResponse = transportClient.prepareDelete(index, type, receiveMsg).execute().actionGet();
logger.info("删除索引的结果为:{},删除的索引数据的id为:{}", deleteResponse.status().getStatus(), deleteResponse.getId());
return deleteResponse.status().getStatus() == 200 ? true : false;
} @Override
public boolean updateData(String receiveMsg, String defaultIndex, String defaultType) {
JSONObject jsonObject = JSONObject.parseObject(receiveMsg);
String id = jsonObject.get(primaryKey).toString();
UpdateRequest updateRequest = new UpdateRequest().index(defaultIndex).type(defaultType).id(id).doc(jsonObject.toJSONString(), XContentType.JSON);
UpdateResponse updateResponse = transportClient.update(updateRequest).actionGet();
int status = updateResponse.status().getStatus();
logger.info("更新索引结果:{},更新之后为:{}", status, receiveMsg);
return status == 200 ? true : false;
}

到目前为止,已经可以进行操作数据库同步进行索引库的操作了。但是还是存在很多问题的,主要在于activeMQ的一些问题。以后在解决。

最新文章

  1. Xamarin.ios 目录结构
  2. iOS开发中获取文本的宽高的方式
  3. js 十分精确身份证验证
  4. 使用Android点击按钮跳转页面
  5. Python 之我见
  6. 【HDOJ】5632 Rikka with Array
  7. JavaScript学习总结【12】、JS AJAX应用
  8. BZOJ 1036: [ZJOI2008]树的统计Count( 树链剖分 )
  9. 从零开始用 Flask 搭建一个网站(三)
  10. Unity3D拖尾组件在Ui界面下正常显示
  11. jmeter按比例执行业务场景
  12. [UWP]了解IValueConverter
  13. MariaDB存储引擎
  14. 8.6 GOF设计模式四: 策略模式… Strategy Pattern
  15. c# 关于Threading.ApartmentState
  16. 【python】gevent协程例子
  17. 解决audio和video在手机端无法自动播放问题
  18. SCRAM
  19. Ubuntu做Tomcat服务:insserv: warning: script 'tomcat' missing LSB tags and overrides
  20. sqlyog注册码

热门文章

  1. hihoCoder 1636 Pangu and Stones
  2. Windows 环境下安装 Oracle JDK
  3. Redis Commands(1)
  4. 『cs231n』作业3问题2选讲_通过代码理解LSTM网络
  5. python-day15函数递归
  6. SQL Server数据库 优化查询速度
  7. javascript primise本质——为了简化异步编码而针对异步操作的代理
  8. POJ 2896 另解暴力
  9. 快速切题 sgu116. Index of super-prime bfs+树思想
  10. VMware Station NAT上网模式配置