1.背景

1.1 简介

Logstash 是一个功能强大的工具,可与各种部署集成。 它提供了大量插件,可帮助业务做解析,丰富,转换和缓冲来自各种来源的数据。

Logstash 是一个数据流引擎

  • 它是用于数据物流的开源流式 ETL(Extract-Transform-Load)引擎
  • 在几分钟内建立数据流管道
  • 具有水平可扩展及韧性且具有自适应缓冲
  • 不可知的数据源
  • 具有 200 多个集成和处理器的插件生态系统
  • 使用 Elastic Stack 监视和管理部署

Logstash 几乎可以摄入各种类别的数据

它可以摄入日志,文件,指标或者网路真实数据。经过 Logstash 的处理,变为可以使用的 Web Apps 可以消耗的数据,也可以存储于数据中心,或变为其它的流式数据。

Logstash 相关概念

  • Logstash 实例是一个正在运行的 Logstash 进程。建议在 Elasticsearch 的单独主机上运行 Logstash,以确保两个组件有足够的计算资源可用。
  • 管道(pipeline)是配置为处理给定工作负载的插件集合。一个 Logstash 实例可以运行多个管道。(彼此独立)
  • 输入插件(input plugins)用于从给定的源系统中提取或接收数据。 Logstash 参考指南中提供了支持的输入插件列表:https://www.elastic.co/guide/en/logstash/current/input-plugins.html
  • 过滤器插件(filter plugin)用于对传入事件应用转换和丰富。 Logstash 参考指南中提供了支持的过滤器插件列表:Filter plugins | Logstash Reference [8.3] | Elastic
  • 输出插件(output plugin)用于将数据加载或发送到给定的目标系统。 Logstash 参考指南中提供了支持的输出插件列表:https://www.elastic.co/guide/en/logstash/current/output-plugins.html
Logstash 包含3个主要部分: 输入(inputs),过滤器(filters)和输出(outputs)。 你必须定义这些过程的配置才能使用 Logstash,尽管不是每一个都必须的。在有些情况下,可以甚至没有过滤器。在过滤器的部分,它可以对数据源的数据进行分析,丰富,处理等。

1.2 学习参考

1.3 本例测试版本

[root@dev1613 study]# sudo -u logstash ../bin/logstash  --version
Using bundled JDK: /opt/logstash/jdk
logstash 7.12.1

2.功能应用

2.1 基础测试

输入测试命令,../bin为当前执行命令所在文件夹,与logstash安装后bin的相对目录位置。
sudo -u logstash ../bin/logstash -e 'input { stdin { } } output { stdout {} }'
执行命令后,输出结果如图:

2.2 Logstash解析日志文件

最原始的 Log 数据,经过 Logstash 的处理,可以把非结构化的数据变成结构化的数据。甚至可以使用 Logstash 强大的 Filter 来对数据继续加工。最终将加工后的数据存储下来,用于分析和搜索。

日志原始内容

2022-07-06 18:48:37.453 ERROR 14677 --- [ dispatcher 108] c.a.c.s.dashboard.metric.MetricFetcher   : Failed to fetch metric from <http://10.32.4.230:8719/metric?startTime=1657104506000&endTime=1657104512000&refetch=false>: socket timeout
2022-07-06 18:48:44.439 ERROR 14677 --- [ dispatcher 109] c.a.c.s.dashboard.metric.MetricFetcher : Failed to fetch metric from <http://10.32.4.230:8719/metric?startTime=1657104513000&endTime=1657104519000&refetch=false>: socket timeout
2022-07-06 18:48:51.514 ERROR 14677 --- [ dispatcher 110] c.a.c.s.dashboard.metric.MetricFetcher : Failed to fetch metric from <http://10.32.4.230:8719/metric?startTime=1657104520000&endTime=1657104526000&refetch=false>: socket timeout

Logstash配置文件

编写日志解析配置文件,并解析时间,错误级别,错误行,错误信息。提取出来变为结构化数据。编写配置文件如下:
配置相关节点参考官方文档:《plugins-inputs-file》
input {
file {
path => "/opt/logstash/study/outlog.log"
start_position => "beginning"
stat_interval => "3"
type => "sentinel-log"
}
} filter {
grok {
match => ["message","%{TIMESTAMP_ISO8601:datetime} %{LOGLEVEL:loglevel} %{NUMBER:textid} %{GREEDYDATA:errormsg}"]
}
json {
source => "request"
}
}
output {
stdout { codec => rubydebug }
}

Grok日志解析在线测试

基于elastic在线网页,可编写解析日志测试demo。

日志解析结构化输出

运行命令:sudo -u logstash ../bin/logstash -f study-file-es.conf
运行logstash加载配置文件命令,启动测试输出结构化内容如下:

2.3 Logstash-数据库同步

本例将MySql数据表中的数据,基于修改时间同步到es数据存储中心。

基础数据内容

数据源-mysql数据表建表语句:
CREATE TABLE `study_logstash_es` (
`id` int NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`study_code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '编码',
`study_name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '名称',
`study_tag` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '标签',
`study_level` smallint NOT NULL DEFAULT '0' COMMENT '等级,如1,2,3',
`is_delete` tinyint unsigned NOT NULL DEFAULT '0' COMMENT '0 未删除 1 删除',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`operate_user` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '操作人',
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_study_code` (`study_code`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='学习-logstash同步msql数据到es';
目标源-es索引创建脚本:
PUT /study_logstash_es
{
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 1
}
}, "mappings": {
"properties": {
"id": {
"type": "integer"
},
"study_code": {
"type": "text"
},
"study_name": {
"type": "text"
},
"operate_user": {
"type": "text"
},
"study_tag": {
"type": "keyword"
},
"is_delete": {
"type": "integer"
},
"study_level": {
"type": "integer"
},
"mark_time": {
"type": "date",
"format": "epoch_millis"
},
"update_time": {
"type": "date"
}
}
}
}

Logstash配置文件

本例测试的数据库地址,es地址,已经基于xxx脱敏。更多jdbc的配置,请参考官方文档:《plugins-inputs-jdbc》
jdbc_driver_library:为mysql连接包,可在Maven上下载,下载地址参考:《mysql-connector-java.jar 包下载》
input {
jdbc {
jdbc_driver_library => "/opt/logstash/study/mysql-connector-java-8.0.30.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://xxx.xxx.xx.x:3306/study_database?serverTimezone=Asia/Shanghai&allowMultiQueries=true&characterEncoding=utf-8"
jdbc_user => "root"
jdbc_password => "xxxxx"
jdbc_paging_enabled => true
jdbc_page_size => "2"
use_column_value => true
tracking_column => "mark_time"
tracking_column_type => "numeric"
schedule => "* * * * *"
statement => "SELECT id,study_code,study_name,study_tag,study_level,operate_user,update_time,UNIX_TIMESTAMP(update_time) as mark_time from study_logstash_es where UNIX_TIMESTAMP(update_time)>:sql_last_value AND update_time < NOW()"
}
}
output{
elasticsearch{
hosts => ["xxx.xxx.16.4:9200","xxx.xxx.16.xx:9200","192.xxx.xx.xx:9200"]
index => "study_logstash_es"
timeout => 300
user => "xxx"
password => "xxxxx"
}
}

数据同步es

运行命令:sudo -u logstash ../bin/logstash -f study-mysql-es.conf
运行logstash加载配置文件命令,启动运行日志,es同步的数据如下:
es数据查询如下:

2.4 Logstash-kafka消息同步

Logstash的输入项可以监听kafka消息,消费消息记录。
input {
kafka {
bootstrap_servers => "xxx.xxx.xx.4:9092,xxx.xxx.16.4:9093,xxx.xxx.16.4:9094" #kafka服务器地址
topics => "xxxlog"
# batch_size => 5
codec => "json"
group_id => "logstash"
consumer_threads => 3
}
} filter {
# 丢弃所有的header请求
if [request][method] == "HEAD" {
drop { }
}
# 因为[request][querystring]这个玩意中的字段类型可能不一样,所以全部干成字符串
ruby {
code => "event.set('[request][querystring]', event.get('[request][querystring]').to_s) if event.get('[request][querystring]')"
}
if [request][uri] =~ "^/ucenter-admin-view/v3(.*)" {
mutate {
add_field => { "log_source" => "用户中心管理后台" }
add_field => { "log_source_id" => "1" }
}
}
else if [request][uri] =~ "^/ucenter-org-view/v3/(.*)" {
mutate {
add_field => { "log_source" => "用户中心工作台" }
add_field => { "log_source_id" => "2" }
}
}
else if [request][uri] =~ "^/safety-admin-api(.*)" {
mutate {
add_field => { "log_source" => "安全管理平台" }
add_field => { "log_source_id" => "3" }
}
}
else{
mutate {
add_field => { "log_source" => "其他" }
add_field => { "log_source_id" => "0" }
}
} grok {
match => { "[request][uri]" => "%{URIPATH:[request][path]}" }
named_captures_only => false
} }
output{
# stdout {
# codec => json
# }
elasticsearch{
hosts => ["xxx.xxx.xx.4:9200","xxx.xxx.16.13:9200","xxx.xxx.16.14:9200"]
index => "apisixlog"
timeout => 300
user => "elastic"
password => "HApn2xCJMuRlg0UOIV0P"
}

3.总结

  1. Logstash基于 输入(inputs),过滤器(filters)和输出(outputs)可以方便快捷的处理数据,将一些非结构化数据,处理为结构化数据。Logstash支持数据中转,数据同步等场景的应用。本例只是简要测试,在实际业务使用时,可基于某一个输入插件/输出插件参考官方文档,结合项目使用。
  1. 在做一些数据同步工作时,出于性能等各方面考虑,如同步数据表到es中,除了Logstash这种方案,也可以参考其他的方案,如alibaba/DataX
  1. Logstash收集大量日志时,存在耗内存的情况,建议参考官方推荐的FileBeat模式。详情参考文档:《开源日志管理方案 ELK 和 EFK 的区别》,《通过Filebeat把日志传入到Elasticsearch》
  1. Logstash在配置文件调整后,启动命令,可能出现如下报错:
删除掉Logstash/data文件下的缓存文件,即可重新启动成功。
  1. Logstash启动命名常用如下:
sudo -u logstash ../bin/logstash -f study-file-es.conf
表示当前窗口启动,关闭或退出命令行时,logstash实例关闭。 sudo -u logstash ../bin/logstash -f study-file-es.conf --config.reload.automatic
表示当前窗口启动,配置文件变化时,不用重新启动实例,可自动加载。关闭或退出命令行时,logstash实例关闭。 sudo -u logstash ../bin/logstash -f study-mysql-es.conf & test.out --config.reload.automatic
表示后台启动,关闭退出命令,实例在后台一直运行。 ps -ef|grep logstash
kill-9 进程号, 关闭对应的实例
  1. Logstash运行日志查看
查看cat logstash-plain.log 文件,可查看Logstash运行日志记录。

最新文章

  1. java学习笔记之IO一()
  2. FreeBSD暂时用9.X系列为宜
  3. Angularjs checkbox的ng属性
  4. javascript学习笔记(1) 简单html语法
  5. agentX各个角色功能
  6. MySQL常用命令总结2
  7. 手写ButterKnife
  8. hadoop is running beyond virtual memory limits问题解决
  9. SignarL服务器端发送消息给客户端的几种情况
  10. coTurn 运行在Windows平台的方法及服务与客户端运行交互流程和原理
  11. 为github添加ssh key
  12. gauss——seidel迭代
  13. 关于centos7字体缺失导致项目验证码丢失报错500问题
  14. VS2012里面使用EF框架的增删改查和分页的方法
  15. Centos7下yum安装配置nginx与php
  16. 解决excel日期变成数字的问题
  17. [jmeter]linux下自动测试环境+持续集成ant+jmeter+Apache(httpd)环境搭建与使用
  18. 无法获得锁 /var/lib/apt/lists/lock - open (11 资源临时不可用)
  19. FastReport.Net使用:[22]地图(Map)控件
  20. GPU编程自学4 —— CUDA核函数运行参数

热门文章

  1. Node.js amqplib 连接 Rabbitmq 学习笔记
  2. 7.脚本三剑客之awk
  3. BUUCTF-另一个世界
  4. js中通过ajax调用网上接口
  5. Excel表函数自动生成SQL
  6. android stdio开发抖音自动点赞案例
  7. Java 常用Set集合和常用Map集合
  8. postgres备份与恢复
  9. Linux串口编程进阶
  10. python开发环境配置(Windows)