情景介绍

公司2000W的数据从mysql 迁移至elasticsearch,以提供微服务。本文基于elasticsearch-py bulk操作实现数据迁移。相比于elasticsearch-dump,自由度更大,并能够进行数据处理。

API 原理

让我们先来看一下官方文档给出的栗子

POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }

我们可以通过kibana试验一下

elasticsearch-py

elasticsearch-py 官方文档

这里实际上我使用的是es-py的接口,栗子如下

def gendata():
mywords = ['foo', 'bar', 'baz']
for word in mywords:
yield {
"_index": "mywords",
"_type": "document",
"doc": {"word": word}, # field1: "value1"
} bulk(es, gendata())

实际操作

涉及到数据读取,以及批量的大小。一般建议是1000-5000个文档,如果你的文档很大,可以适当减少队列,大小建议是5-15MB,默认不能超过100M

import re

from elasticsearch import Elasticsearch
from elasticsearch import helpers
import pymysql es = Elasticsearch()
conn = pymysql.connect('127.0.0.1',"root","root","literature",charset='utf8') def read(conn,tableName):
cursor = conn.cursor()
sql = "show columns from {};".format(tableName)
cursor.execute(sql)
columns = [i[0] for i in cursor.fetchall()] select = "select * from {};".format(tableName)
nums = cursor.execute(select)
for i in range(nums):
yield {k:v for k,v in zip(columns,cursor.fetchone())} def bulk_insert(d):
actions = []
for i in d:
_id = i.get('id')
# 数据处理逻辑
i['autor'] = i.get('autor').split(',')
i['artkeyword'] = re.sub(r'[\[\]\d]',"",str(i.get('artkeyword',""))).strip(';').split(';')
i['dateofpublication'] = i.get('dateofpublication').strftime('%Y-%m-%d') # 注意需要将datetime格式转换成字符串类型
i['dateofsummery'] = i.get('dateofsummery').strftime('%Y-%m-%d %H:%M:%S') # 注意需要将datetime格式转换成字符串类型
#
action = {
"_index":"literature",
"_type":"_doc",
"_id":_id,
}
action.update(i)
actions.append(action)
if len(actions) == 500:
helpers.bulk(es,actions)
actions = []
if (len(actions) > 0):
helpers.bulk(es, actions) if __name__ == "__main__":
d = read(conn,"literature_info")
bulk_insert(d)
conn.close()

最新文章

  1. Redis五种数据结构简介
  2. IE9下css hack写法
  3. css3 tween
  4. Laxcus大数据管理系统单机集群版
  5. 一、spring——helloWorld
  6. 3.5 EF Code First总结
  7. 单一职责原则(SRP)
  8. PHP持续保有长连接,利用flush持续更新浏览器UI,下载进度条实现
  9. Oracle 插入超4000字节的CLOB字段的处理方法
  10. SetupFactory +添加frm2.0工具
  11. poj 1386 Play on Words(有向图欧拉路+并查集)
  12. C++类与对象(05)
  13. 【BZOJ3436】小K的农场(差分约束)
  14. memcached 详解
  15. python安装过程中的一些问题
  16. Hystrix 配置参数全解析
  17. C语言 · 8皇后问题
  18. [Mac]如何让两个窗口各占半个屏幕
  19. 关于引用外部类要用static 的问题
  20. Java基础(2)面向对象和封装,对象的创建和使用、java对象的内存图

热门文章

  1. Java 静态工厂模式的使用
  2. css伪类实现行号自动填充
  3. MySQL通过SHOW TABLE STATUS查看库中所有表的具体信息
  4. python的列表元素输出
  5. G1 垃圾收集器架构和如何做到可预测的停顿(阿里)
  6. 爬虫 crawlSpider 分布式 增量式 提高效率
  7. Python3面向对象编程实例
  8. Luogu P5416 [CTSC2016]时空旅行
  9. 基于web公交查询系统----管理员公交站点管理页面实现
  10. Loj #2568. 「APIO2016」烟花表演