Grok正则提取日志

环境延续我上一篇ELK单机版的filebeat-->redis-->logstash-->elasticsearch-->kibana环境,详情请参考:

Elasticsearch + Logstash + Kibana +Redis +Filebeat 单机版日志收集环境搭建

正则表达式

普通正则表达式

. 任意一个字符

* 前面一个字符出现0次或者多次

[abc] 中括号内任意一个字符

[^abc] 非中括号内的字符

[0-9] 表示一个数字

[a-z]   小写字母

[A-Z] 大写字母

[a-zA-Z] 所有字母

[a-zA-Z0-9] 所有字母+数字

[^0-9] 非数字

^xx 以xx开头

xx$ 以xx结尾

\d 任何一个数字

\s 任何一个空白字符

扩展正则表达式,在普通正则符号再进行了扩展

? 前面字符出现0或者1次

+ 前面字符出现1或者多次

{n} 前面字符匹配n次

{a,b} 前面字符匹配a到b次

{,b} 前面字符匹配0次到b次

{a,} 前面字符匹配a或a+次

(string1|string2) string1或string2

在Kibana的grokdebugger上进行测试

在编写grok提取正则配置前可以在Kibana的grokdebugger上进行测试:

比如我想提取一个如下的Nginx日志:

192.168.237.1 - - [24/Feb/2019:17:48:47 +0800] "GET /shijiange HTTP/1.1" 404 571 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"

那么可以按照正则表达式和grok语法在grokdebugger进行如下测试:

可以看到我的Grok成功提取了我想要的内容,我的Grok匹配规则如下:

(?<clientip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}) - - \[(?<requesttime>[^ ]+ \+[0-9]+)\] "(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/\d.\d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"

(?<字段名>正则)表示将匹配的内容提取为字段,其他不用提取为字段的地方原样写上或者用正则匹配即可。

在配置文件中引入Grok提取规则

vim ./logstash_grok.conf
# logstash_grok.conf内容
input {
   redis {
        host => '192.168.1.4'
        port => 6379
        key => "queue"
        data_type => "list"
  }
}
filter {
    grok {
        match => {
            "message" => '(?<clientip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}) - - \[(?<requesttime>[^ ]+ \+[0-9]+)\] "(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/\d.\d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"'
        }
    }
}
output {
  elasticsearch {
    hosts => ["http://192.168.1.4:9200"]
  }
}

启动logstah:

/opt/es/logstash-7.2.0/bin/logstash -f ./logstash_grok.conf 

测试是否成功:

echo '192.168.237.1 - - [24/Feb/2019:17:48:47 +0800] "GET /shijiange HTTP/1.1" 404 571 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"' >> example.log

在Kinaba中配置index,方便查看:

点击Management-->Kibana-->Index patterns会出现如下提示:

点击Create Index Pattern按照如下步骤创建新的Kibana索引:

输入索引匹配规则,如logstash*会匹配所有logstash开头的ES索引数据,然后点击下一步选择时间字段用于按时间戳筛选数据:

选择@timestamp将会根据@timestamp字段的时间过滤数据,点击Create index pattern即可在Discover页面选择你创建的Kibana索引来查看数据:

可以看到我配置的Kibana起作用了。这里要注意的是Kibana采用的是UTC时间,比东八区时间要快8个小时,所以在筛选时间范围的时候我将范围往后扩大了一点,否则可能看不到数据。

展开最上方的记录可以看到我刚才插入的数据被成功提取了:

去除字段

通过上面的图可以看到Grok配置的确起作用了,但是Logstash给我们添加了很多字段,有时候这些字段是不必要的,这时候就要可以使用remove_field配置来去除一些字段。

修改配置文件如下:

# logstash_grok.conf内容
input {
redis {
host => '192.168.1.4'
port => 6379
key => "queue"
data_type => "list"
}
}
filter {
grok {
match => {
"message" => '(?<clientip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}) - - \[(?<requesttime>[^ ]+ \+[0-9]+)\] "(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/\d.\d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"'
}
remove_field => ["message","@version","path"]
}
}
output {
elasticsearch {
hosts => ["http://192.168.1.4:9200"]
}
}

重启重复上述测试,数秒后查看Kibana:

Logstash确实为我们去除了我们不需要的字段。

使用日志时间而非插入时间

由于日志输出和消息队列同步以及ELK中的数据传输都是需要时间的,使用ES自动生成的时间戳可能和日志产生的时间并不一致,而且由于时区问题会产生更大的影响,所以需要自定义时间字段,而非使用自动生成的时间字段。

更改配置文件如下:

# logstash_grok.conf内容
input {
redis {
host => '192.168.1.4'
port => 6379
key => "queue"
data_type => "list"
}
}
filter {
grok {
match => {
"message" => '(?<clientip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}) - - \[(?<requesttime>[^ ]+ \+[0-9]+)\] "(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/\d.\d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"'
}
remove_field => ["message","@version","path"]
}
date {
        match => ["requesttime", "dd/MMM/yyyy:HH:mm:ss Z"]
        target => "@timestamp"
    }
} output {
elasticsearch {
hosts => ["http://192.168.1.4:9200"]
}
}

重复上述测试,扩大时间范围只2018年某月可以看到有一条2019年2月的数据:

提取json格式的数据

Grok对于提取非结构化的数据是很方便的,但是对于json格式的数据如果还用Grok来提取未免也太麻烦了点,毕竟采用json这种半结构化数据来输出日志本来就是为了方便处理。还好Logstash早就考虑到了这点,并提供了json格式数据的提取规则。

修改配置文件以提取json数据:

# logstash_json.conf
input {
redis {
host => '192.168.1.4'
port => 6379
key => "queue"
data_type => "list"
}
}
filter {
json {
source => "message"
remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"]
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
}
}
output {
elasticsearch {
hosts => ["http://192.168.1.4:9200"]
}
}

测试是否配置成功:

以logstash_json.conf启动Logstash:

 /opt/es/logstash-7.2.0/bin/logstash -f ./logstash_json.conf

向example.log追加一条json数据:

echo '{"@timestamp":"24/Feb/2019:21:08:34 +0800","clientip":"192.168.1.5","status":0,"bodysize":"1m","referer":"http_referer","ua":"http_user_agent","handletime":"request_time","url":"uri"}' >> example.log

数秒后在kibana查看数据:

可以看到我们的配置是生效的。

filebeat监视多个日志

修改filebeat配置文件:

vim ./filebeat-7.2.0-linux-x86_64/filebeat.yml
# filebeat.yml内容
filebeat.inputs:
- type: log
tail_files: true
backoff: "1s"
paths:
- /opt/es/example.log
fields:
type: example
fields_under_root: true
- type: log
tail_files: true
backoff: "1s"
paths:
- /opt/es/example2.log
fields:
type: example2
fields_under_root: true
output:
redis:
hosts: ["192.168.1.4:6379"]
key: 'queue'

修改logstash配置文件:

vim ./logstash-7.2.0/config/logstash_muti.conf
# logstash_muti.conf内容
input {
redis {
host => '192.168.1.4'
port => 6379
key => "queue"
data_type => "list"
}
}
filter {
if [type] == "example" {
json {
source => "message"
remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"]
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
}
}else if [type] == "example2"{
grok {
match => {
"message" => '(?<clientip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}) - - \[(?<timestamp>[^ ]+ \+[0-9]+)\] "(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/\d.\d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"'
}
remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"]
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
}
}
}
output {
elasticsearch {
hosts => ["http://192.168.1.4:9200"]
}
}

以新的配置启动filebeat和logstash:

# 以logstash_muti.conf启动logstash
/opt/es/logstash-7.2.0/bin/logstash -f ./logstash_muti.conf
# 以filebeat.yml启动filebeat
./filebeat-7.2.0-linux-x86_64/filebeat -e -c ./filebeat-7.2.0-linux-x86_64/filebeat.yml

新建一个example2.log文件,路径要和filebeat配置的路径一致,然后在终端执行以下命令进行测试:

# 测试example.log收集情况
echo '{"timestamp":"11/Apr/2019:21:08:34 +0800","clientip":"192.168.1.5","status":0,"bodysize":"1m","referer":"http_referer","ua":"http_user_agent","handletime":"request_time","url":"uri"}' >> example.log
# 测试example2.log收集情况
echo '192.168.237.1 - - [24/Apr/2019:17:48:47 +0800] "GET /shijiange HTTP/1.1" 404 571 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"' >>example2.log

数秒后查看kibana数据:

example.log输入产生的数据:

example2.log测试产生的数据:

可以看到我们的配置是成功的。

这里我们把两个日志都输出到了同一个索引下,生产中一般都会加以区分的,一般是每个日志对应一个索引,要实现这种效果可以在logstash的配置文件的output中进行如下配置:

output{
if [type] == "example" {
elasticsearch {
hosts => ["http://192.168.1.4:9200"]
index => "example-%{+YYYY.MM.dd}"
}
}
else if [type] == "example2" {
elasticsearch {
hosts => ["http://192.168.1.4:9200"]
index => "example2-%{+YYYY.MM.dd}"
}
}

最新文章

  1. JavaScript String对象
  2. 处理Assetbundle依赖关系时想到的一道题
  3. Hive 分组问题
  4. 循序渐进 Jprofiler
  5. C输入输出函数与缓冲区
  6. 21Spring_JdbcTemplatem模板工具类的使用——配置文件(连接三种数据库连接池)
  7. 基于.NET平台常用的框架和开源程序整理
  8. 怎么用visual studio2010编写c++程序|用visual studio2010编写c++程序的步骤
  9. drupal 连表查询+分页
  10. 转载:MAT Memory Analyzer Tool使用示例
  11. Hibernate学习笔记二
  12. UNIX环境高级编程——信号
  13. TEAMWORK1
  14. RabbitMQ 学习专栏
  15. Python Web简单加法器的实现--Python
  16. PAT 1016 部分A+B(15)(C++&JAVA&&Python)
  17. 第10月第6天 lua 闭包
  18. 2015.7.11js-10(无缝滚动)
  19. 不用快捷键就能使用Eclipse的自动完成功能
  20. esxcli software vib 命令为 ESXi 5.x/6.x 主机安装补丁程序 (2008939)

热门文章

  1. 201871010203-陈鹏昱 实验三 结对项目—《D{0-1}KP 实例数据集算法实验平台》项目报告
  2. 2020 OO 第四单元总结 UML
  3. Hibernate(十四篇)
  4. Day03_17_数组
  5. 分解uber依赖注入库dig-使用篇
  6. Python爬虫之使用正则表达式抓取数据
  7. 栈(Stack) --- C# 自定义和微软官方的区别
  8. Mysql连接查询示例语句
  9. ERROR: Failed to Setup IP tables: Unable to enable SKIP DNAT rule
  10. 【Redis】redis异步消息队列+Spring自定义注解+AOP方式实现系统日志持久化