elasticsearch bulk
2024-09-01 23:43:30
情景介绍
公司2000W的数据从mysql 迁移至elasticsearch,以提供微服务。本文基于elasticsearch-py bulk操作实现数据迁移。相比于elasticsearch-dump,自由度更大,并能够进行数据处理。
API 原理
让我们先来看一下官方文档给出的栗子
POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
我们可以通过kibana试验一下
elasticsearch-py
elasticsearch-py 官方文档
这里实际上我使用的是es-py的接口,栗子如下
def gendata():
mywords = ['foo', 'bar', 'baz']
for word in mywords:
yield {
"_index": "mywords",
"_type": "document",
"doc": {"word": word}, # field1: "value1"
}
bulk(es, gendata())
实际操作
涉及到数据读取,以及批量的大小。一般建议是1000-5000个文档,如果你的文档很大,可以适当减少队列,大小建议是5-15MB,默认不能超过100M
import re
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import pymysql
es = Elasticsearch()
conn = pymysql.connect('127.0.0.1',"root","root","literature",charset='utf8')
def read(conn,tableName):
cursor = conn.cursor()
sql = "show columns from {};".format(tableName)
cursor.execute(sql)
columns = [i[0] for i in cursor.fetchall()]
select = "select * from {};".format(tableName)
nums = cursor.execute(select)
for i in range(nums):
yield {k:v for k,v in zip(columns,cursor.fetchone())}
def bulk_insert(d):
actions = []
for i in d:
_id = i.get('id')
# 数据处理逻辑
i['autor'] = i.get('autor').split(',')
i['artkeyword'] = re.sub(r'[\[\]\d]',"",str(i.get('artkeyword',""))).strip(';').split(';')
i['dateofpublication'] = i.get('dateofpublication').strftime('%Y-%m-%d') # 注意需要将datetime格式转换成字符串类型
i['dateofsummery'] = i.get('dateofsummery').strftime('%Y-%m-%d %H:%M:%S') # 注意需要将datetime格式转换成字符串类型
#
action = {
"_index":"literature",
"_type":"_doc",
"_id":_id,
}
action.update(i)
actions.append(action)
if len(actions) == 500:
helpers.bulk(es,actions)
actions = []
if (len(actions) > 0):
helpers.bulk(es, actions)
if __name__ == "__main__":
d = read(conn,"literature_info")
bulk_insert(d)
conn.close()
最新文章
- Redis五种数据结构简介
- IE9下css hack写法
- css3 tween
- Laxcus大数据管理系统单机集群版
- 一、spring——helloWorld
- 3.5 EF Code First总结
- 单一职责原则(SRP)
- PHP持续保有长连接,利用flush持续更新浏览器UI,下载进度条实现
- Oracle 插入超4000字节的CLOB字段的处理方法
- SetupFactory +添加frm2.0工具
- poj 1386 Play on Words(有向图欧拉路+并查集)
- C++类与对象(05)
- 【BZOJ3436】小K的农场(差分约束)
- memcached 详解
- python安装过程中的一些问题
- Hystrix 配置参数全解析
- C语言 · 8皇后问题
- [Mac]如何让两个窗口各占半个屏幕
- 关于引用外部类要用static 的问题
- Java基础(2)面向对象和封装,对象的创建和使用、java对象的内存图