利用logstash从mysql同步数据到ElasticSearch
2024-10-19 11:45:48
前面一篇已经把logstash和logstash-input-jdbc安装好了。
下面就说下具体怎么配置。
1.先在安装目录bin下面(一般都是在bin下面)新建两个文件jdbc.conf和jdbc.sql
2.配置jdbc.conf
input {
stdin {
}
jdbc {
# 连接的数据库地址和哪一个数据库,指定编码格式,禁用SSL协议,设定自动重连
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/microstorage_backend?characterEncoding=UTF-8&useSSL=false&autoReconnect=true"
jdbc_user => "root"
jdbc_password => "123456"
# 下载连接数据库的驱动包,建议使用绝对地址
jdbc_driver_library => "/usr/local/Cellar/logstash/6.5.4/libexec/logstash-core/lib/jars/mysql-connector-java-5.1.42.jar" jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
codec => plain { charset => "UTF-8"} #使用其它字段追踪,而不是用时间
#use_column_value => true //这里如果是用时间追踪比如:数据的更新时间或创建时间等和时间有关的这里一定不能是true, 切记切记切记,我是用update_time来追踪的
#追踪的字段
tracking_column => update_time
record_last_run => true
#上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值 这里说是必须指定初始值,我没指定默认是1970-01-01 08:00:00
last_run_metadata_path => "/usr/local/opt/logstash/lastrun/.logstash_jdbc_last_run" //这里的lastrun文件夹和.logstash_jdbc_last_run是自己创建的 jdbc_default_timezone => "Asia/Shanghai" //设置时区
#statement => SELECT * FROM goods WHERE update_time > :last_sql_value //这里要说明一下如果直接写sql语句,前面这种写法肯定不对的
27 ,加上引号也试过也不对,所以我直接写在jdbc.sql文件中
statement_filepath => "/usr/local/Cellar/logstash/6.5.4/bin/jdbc.sql" #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
clean_run => false # 这是控制定时的,重复执行导入任务的时间间隔,第一位是分钟 不设置就是1分钟执行一次
schedule => "* * * * *"
type => "std"
}
} filter { json { source => "message" remove_field => ["message"] } } output { elasticsearch { # 要导入到的Elasticsearch所在的主机 hosts => "127.0.0.1:9200" # 要导入到的Elasticsearch的索引的名称 index => "goods" # 类型名称(类似数据库表名) document_type => "spu" # 主键名称(类似数据库主键) document_id => "%{id}"
} stdout { # JSON格式输出 codec => json_lines }
}
3.配置jdbc.sql
select id,goods_name,goods_no,price,account_id,create_time,update_time from goods where update_time > :sql_last_value
4.我们来看下 .logstash_jdbc_last_run文件中的内容(网上讲述该配置的时候都没讲到里面具体的内容写法,导致很多人很迷惑,其中我就是)
前面的---具体什么意思,我也不太清楚。
5.启动jdbc.conf配置,开始同步数据
第一次:因为时间是从1970年开始的所以会全部同步一遍。相当于全量同步了
从第二次开始,会从上次最新的一次时间同步,既新增和修改都会同步
遇到的问题:
1.ES中8小时时差的问题?
解决方法:从源头解决问题
在jdbc.conf配置文件中只要是有关时间的字段都手动+8小时
39 filter {
40 json {
41 source => "message"
42 remove_field => ["message"]
43 }
44 // date类型不能省略,不然会报错, 就是把当前字段+8小时后赋值给新的字段,然后再取新字段的值赋值给老的字段,再把新的字段删除
45 date {
46 match => ["message","UNIX_MS"]
47 target => "@timestamp"
48 }
49 ruby {
50 code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
51 }
52 ruby{
53 code => "event.set('@timestamp',event.get('timestamp'))"
54 }
55 mutate{
56 remove_field => ["timestamp"]
57 }
58
59 date {
60 match => ["message","UNIX_MS"]
61 target => "create_time"
62 }
63 ruby {
64 code => "event.set('@create_time', event.get('create_time').time.localtime + 8*60*60)"
65 }
66 ruby {
67 code => "event.set('create_time',event.get('@create_time'))"
68 }
69 mutate {
70 remove_field => ["@create_time"]
71 }
72
73 date {
74 match => ["message","UNIX_MS"]
75 target => "update_time"
76 }
77 ruby {
78 code => "event.set('@update_time', event.get('update_time').time.localtime + 8*60*60)"
79 }
80 ruby {
81 code => "event.set('update_time',event.get('@update_time'))"
82 }
83 mutate {
84 remove_field => ["@update_time"]
85 }
86 }
总结:主要是配置,有什么问题,先检查配置文件。
最新文章
- 1Z0-053 争议题目解析577
- HTML5全局属性和事件详解
- C++软件添加dump调试打印日志
- C# 排序算法记录
- IIS网站部署错误总结
- Eclipse Android源代码新下载方法及关联
- Sql Server 查看所有存储过程或视图的位置及内容
- hdu4635(强连通缩点)
- Oracle得知(十五):分布式数据库
- 傲梅分区助手专业版 v6.2 中文免费版
- 云信推送通知 APN invalid Token
- sql decimal &; float &; celling 介绍
- ASP.NET Web服务(ASMX)学习和代理生成
- 在dropwizard中使用feign,使用hystrix
- Javascript 实现[网红] 时间轮盘
- python3 installed 安装 pip3
- 第一次打开app
- oracle执行计划相关
- LOJ#2170. 「POI2011」木棍 Sticks
- Java网络编程一:基础知识详解
热门文章
- 自学Java第三个星期的总结
- 在centos上搭建JavaWeb环境(jdk+mysql+tomcat)
- QT开发基础教程
- twiested 及其他轮子
- The POM for XXX is invalid, transitive dependencies (if any) will not be available解决方案
- TCP 的那些事儿(上)(转)
- opencv学习之路(9)、对比度亮度调整与通道分离
- fread和fseek的用法
- topcoder srm 687 div1
- Python3基础 dict setdefault 根据键查找值,找不到键会添加