一、前言

最近在做离线数据导入HBase项目,涉及将存储在Mysql中的历史数据通过bulkload的方式导入HBase。由于源数据已经不在DB中,而是以文件形式存储在机器磁盘,此文件是mysqldump导出的格式。如何将mysqldump格式的文件转换成实际的数据文件提供给bulkload作转换,是需要考虑的一个问题。

二、思路

我们知道mysqldump导出的文件主要是Insert,数据库表结构定义语句。而要解析的对象也主要是包含INSERT关键字记录,这样我们就把问题转换成如何从dmp文件解析Insert语句。接触过dmp文件的同学应该了解,其INSERT语句的结构,主要包含表名、字段名、字段值, 这里面主要包含几个关键字:INSERT INTO, VALUES。我们要做的就是把Values括号后的字段值给解析出来,这个过程需要考虑VALUES后面包含的是多少行的记录,有可能导出的记录Values后面包含多行对应mysql中存储的记录。

在解析文件过程中,我自然想到用Python来写,因为Python在处理文件方面有很多优势,也比较简单。在处理DMP文件这块,考虑到字段值间是用逗号分割的,在python中正好一个模块可以很好的来处理此类格式 ,即大家很熟悉的CSV模块,在处理CSV类型的文件有很多优势。在这里我们把CSV模块有在解析dmp文件,同时加一些解析逻辑,可以很好解决此类问题。

同时,我们要处理的dmp文件是经过压缩的,并且单个文件都比较大,都是Gigbytes的,在读取时需要注意机器内存大小,不能一次读出所有的数据,python也考虑到此类问题,采用的方法是惰性取值,即在真正使用时才从磁盘中加载相应的文件数据。如果想加块解析,还可以采集多进程或多线程的方法。

三、方法

处理流程图如下所示:

代码如下图所示:

 #!/usr/bin/env python
import fileinput
import csv
import sys
import gzip # 设定CSV读取的最大容量
csv.field_size_limit(sys.maxsize) def check_insert(line):
"""
返回语句是否以insert into开头,如果是返回true,否则返回false
"""
return line.startswith('INSERT INTO') or False def get_line_values(line):
"""
返回Insert语句中包含Values的部分
"""
return line.partition('VALUES ')[2] def check_values_style(values):
"""
保证INSERT语句满足基本的条件,即包含(右括号
""" if values and values[0] == '(':
return True
return False def parse_line(values):
"""
创建csv对象,读取INSERT VALUES 字段值
"""
latest_row = [] reader = csv.reader([values], delimiter=',',
doublequote=False,
escapechar='\\',
quotechar="'",
strict=True
) for reader_row in reader:
for column in reader_row:
# 判断字段值是否为空或为NULL
if len(column) == 0 or column == 'NULL':
latest_row.append("")
continue # 判断字段开头是否以(开头,如果是则说明此VALUES后面不只包含一行数据,可能有多行,要分别解析
if column[0] == "(":
new_row = False
if len(latest_row) > 0:
#判断行是否包含),如果包含则说明一行数据完毕
if latest_row[-1][-1] == ")":
# 移除)
latest_row[-1] = latest_row[-1][:-1]
if latest_row[-1] == "NULL":
latest_row[-1] = ""
new_row = True
# 如果是新行,则打印该行
if new_row:
line="}}}{{{".join(latest_row)
print "%s<{||}>" % line
latest_row = [] if len(latest_row) == 0:
column = column[1:] latest_row.append(column)
# 判断行结束符
if latest_row[-1][-2:] == ");":
latest_row[-1] = latest_row[-1][:-2]
if latest_row[-1] == "NULL":
latest_row[-1] = "" line="}}}{{{".join(latest_row)
print "%s<{||}>" % line def main(): filename=sys.argv[1]
try:
#惰性取行
with gzip.open(filename,"rb") as f:
for line in f:
if check_insert(line):
values = get_line_values(line)
if check_values_style(values):
parse_line(values)
except KeyboardInterrupt:
sys.exit(0) if __name__ == "__main__":
main()

四、总结

总的说来,主要是利用Python的CSV模块来解析DMP文件的INSERT语句,如果DMP文件不规整,可能还是有些问题。对于dmp文件很大情况,也是需要考虑解析时间效率问题,可以考虑增加多进程或多线程机制。

最新文章

  1. css元素水平居中和垂直居中的方式
  2. 给 C# 开发者的代码审查清单
  3. js正则表达式中test,exec,match方法的区别
  4. Node.js实现CORS跨域资源共享
  5. *HDU 2108 计算几何
  6. D - Half of and a Half 大数
  7. redis的相关知识
  8. Spring AOP术语
  9. IntelliJ IDEA 15 部署Tomcat及创建一个简单的Web工程
  10. 一步步学习ASP.NET MVC3 (10)——@Ajax,JavaScriptResult(1)
  11. 内置方法+lambda是pythonic的利器
  12. [转]IBInspectable / IBDesignable
  13. BZOJ 2783 JLOI 2012 树 乘+二分法
  14. Issue 5158: Modal dialog present (UnexpectedAlertOpen) issue in IE (Similar issue like 3360)
  15. li点击弹出序号
  16. CDH集群安装&amp;测试总结
  17. redis总结问题
  18. flask下载zip文件报错TypeError
  19. php环境搭建及入门
  20. 【转】iOS学习之iOS禁止Touch事件

热门文章

  1. PCA主成分分析 R语言
  2. js 字符串格式化
  3. java 锁的分类
  4. Cookie 基本操作
  5. 样式缩写&mdash;&mdash;css技巧(一)
  6. webrtc前景如何
  7. HDU 1251 统计难题 (裸的字典树)
  8. VMware虚拟机 安装centos7并设置静态ip 连接外网
  9. css3兼容性问题归纳
  10. CCN与CDN区别