前面一篇已经把logstash和logstash-input-jdbc安装好了。

下面就说下具体怎么配置。

1.先在安装目录bin下面(一般都是在bin下面)新建两个文件jdbc.conf和jdbc.sql

2.配置jdbc.conf

 input {
stdin {
}
jdbc {
# 连接的数据库地址和哪一个数据库,指定编码格式,禁用SSL协议,设定自动重连
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/microstorage_backend?characterEncoding=UTF-8&useSSL=false&autoReconnect=true"
jdbc_user => "root"
jdbc_password => "123456"
# 下载连接数据库的驱动包,建议使用绝对地址
jdbc_driver_library => "/usr/local/Cellar/logstash/6.5.4/libexec/logstash-core/lib/jars/mysql-connector-java-5.1.42.jar" jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
codec => plain { charset => "UTF-8"} #使用其它字段追踪,而不是用时间
#use_column_value => true //这里如果是用时间追踪比如:数据的更新时间或创建时间等和时间有关的这里一定不能是true, 切记切记切记,我是用update_time来追踪的
#追踪的字段
tracking_column => update_time
record_last_run => true
#上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值 这里说是必须指定初始值,我没指定默认是1970-01-01 08:00:00
last_run_metadata_path => "/usr/local/opt/logstash/lastrun/.logstash_jdbc_last_run" //这里的lastrun文件夹和.logstash_jdbc_last_run是自己创建的 jdbc_default_timezone => "Asia/Shanghai" //设置时区
#statement => SELECT * FROM goods WHERE update_time > :last_sql_value //这里要说明一下如果直接写sql语句,前面这种写法肯定不对的
27                                                 ,加上引号也试过也不对,所以我直接写在jdbc.sql文件中
statement_filepath => "/usr/local/Cellar/logstash/6.5.4/bin/jdbc.sql" #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
clean_run => false # 这是控制定时的,重复执行导入任务的时间间隔,第一位是分钟 不设置就是1分钟执行一次
schedule => "* * * * *"
type => "std"
}
} filter { json { source => "message" remove_field => ["message"] } } output { elasticsearch { # 要导入到的Elasticsearch所在的主机 hosts => "127.0.0.1:9200" # 要导入到的Elasticsearch的索引的名称 index => "goods" # 类型名称(类似数据库表名) document_type => "spu" # 主键名称(类似数据库主键) document_id => "%{id}"
} stdout { # JSON格式输出 codec => json_lines }
}

3.配置jdbc.sql

 select id,goods_name,goods_no,price,account_id,create_time,update_time from goods where update_time > :sql_last_value

4.我们来看下 .logstash_jdbc_last_run文件中的内容(网上讲述该配置的时候都没讲到里面具体的内容写法,导致很多人很迷惑,其中我就是)

前面的---具体什么意思,我也不太清楚。

5.启动jdbc.conf配置,开始同步数据

第一次:因为时间是从1970年开始的所以会全部同步一遍。相当于全量同步了

从第二次开始,会从上次最新的一次时间同步,既新增和修改都会同步

遇到的问题:

1.ES中8小时时差的问题?

解决方法:从源头解决问题

在jdbc.conf配置文件中只要是有关时间的字段都手动+8小时

 39 filter {
40 json {
41 source => "message"
42 remove_field => ["message"]
43 }
44    // date类型不能省略,不然会报错, 就是把当前字段+8小时后赋值给新的字段,然后再取新字段的值赋值给老的字段,再把新的字段删除
45 date {
46 match => ["message","UNIX_MS"]
47 target => "@timestamp"
48 }
49 ruby {
50 code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
51 }
52 ruby{
53 code => "event.set('@timestamp',event.get('timestamp'))"
54 }
55 mutate{
56 remove_field => ["timestamp"]
57 }
58
59 date {
60 match => ["message","UNIX_MS"]
61 target => "create_time"
62 }
63 ruby {
64 code => "event.set('@create_time', event.get('create_time').time.localtime + 8*60*60)"
65 }
66 ruby {
67 code => "event.set('create_time',event.get('@create_time'))"
68 }
69 mutate {
70 remove_field => ["@create_time"]
71 }
72
73 date {
74 match => ["message","UNIX_MS"]
75 target => "update_time"
76 }
77 ruby {
78 code => "event.set('@update_time', event.get('update_time').time.localtime + 8*60*60)"
79 }
80 ruby {
81 code => "event.set('update_time',event.get('@update_time'))"
82 }
83 mutate {
84 remove_field => ["@update_time"]
85 }
86 }

总结:主要是配置,有什么问题,先检查配置文件。

最新文章

  1. 1Z0-053 争议题目解析577
  2. HTML5全局属性和事件详解
  3. C++软件添加dump调试打印日志
  4. C# 排序算法记录
  5. IIS网站部署错误总结
  6. Eclipse Android源代码新下载方法及关联
  7. Sql Server 查看所有存储过程或视图的位置及内容
  8. hdu4635(强连通缩点)
  9. Oracle得知(十五):分布式数据库
  10. 傲梅分区助手专业版 v6.2 中文免费版
  11. 云信推送通知 APN invalid Token
  12. sql decimal & float & celling 介绍
  13. ASP.NET Web服务(ASMX)学习和代理生成
  14. 在dropwizard中使用feign,使用hystrix
  15. Javascript 实现[网红] 时间轮盘
  16. python3 installed 安装 pip3
  17. 第一次打开app
  18. oracle执行计划相关
  19. LOJ#2170. 「POI2011」木棍 Sticks
  20. Java网络编程一:基础知识详解

热门文章

  1. 自学Java第三个星期的总结
  2. 在centos上搭建JavaWeb环境(jdk+mysql+tomcat)
  3. QT开发基础教程
  4. twiested 及其他轮子
  5. The POM for XXX is invalid, transitive dependencies (if any) will not be available解决方案
  6. TCP 的那些事儿(上)(转)
  7. opencv学习之路(9)、对比度亮度调整与通道分离
  8. fread和fseek的用法
  9. topcoder srm 687 div1
  10. Python3基础 dict setdefault 根据键查找值,找不到键会添加