filebeat直连elasticsearch利用pipeline提取message中的字段
2024-10-02 03:21:50
这里使用filebeat直连elasticsearch的形式完成数据传输,由于没有logstash,所有对于原始数据的过滤略显尴尬(logstash的filter非常强大)。 但是由于业务需求,还是需要将message(原始数据)中的某些字段进行提取,具体方式如下:
1. /path/目录下建立pipeline.json文件
{
"description" : "test-pipeline",
"processors" : [
{
"grok" :{
"field" : "message",
"patterns" : ["%{DATA:puid}\\\t%{DATA:datatime}\\\t\\\t%{DATA:content}"]
}
}
]
}
2. 将规则上传至elasticsearch中
curl -H "Content-Type: application/json" -XPUT 'http://localhost:9200/_ingest/pipeline/test-pipeline' -d@/path/pipeline.json
3. filebeat.yml中
filebeat.prospectors:
******
******
output.elasticsearch:
hosts: ["localhost:9200"]
# 加入如下行:
pipeline: "test-pipeline"
4. 测试数据
f1b25095cc823e63389ff299622b7e85 2019/02/27 03:38:54 send packet! opcode:3 message is in lua8282
f1b25095cc823e63389ff299622b7e85 2019/02/27 03:38:54 PacketManager::_onReceivedPacket opcode:3 size:27,rec_len:278282
5. elasticsearch中数据结果
[
{
"_score":1,
"_type":"doc",
"_id":"zWmLj2kB7ah0Pw2MmQGw",
"_source":{
"datatime":"2019/02/27 03:38:54",
"log":{
"file":{
"path":"/path/test_1.log"
}
},
"beat":{
"hostname":":",
"name":":",
"version":"6.6.1"
},
"@timestamp":"2019-03-18T06:44:43.224Z",
"host":{
"name":":"
},
"content":"",
"source":"/path/test_1.log",
"puid":"f1b25095cc823e63389ff299622b7e85",
"offset":0,
"input":{
"type":"log"
},
"message":"f1b25095cc823e63389ff299622b7e85 2019/02/27 03:38:54 send packet! opcode:3 message is in lua",
"prospector":{
"type":"log"
}
},
"_index":"test"
},
{
"_score":1,
"_type":"doc",
"_id":"0GmLj2kB7ah0Pw2MmQGw",
"_source":{
"datatime":"2019/02/27 03:38:54",
"log":{
"file":{
"path":"/path/test_1.log"
}
},
"beat":{
"hostname":":",
"name":":",
"version":"6.6.1"
},
"@timestamp":"2019-03-18T06:44:43.224Z",
"host":{
"name":":"
},
"content":"",
"source":"/path/test_1.log",
"puid":"f1b25095cc823e63389ff299622b7e85",
"offset":318,
"input":{
"type":"log"
},
"message":"f1b25095cc823e63389ff299622b7e85 2019/02/27 03:38:54 PacketManager::_onReceivedPacket| ReceivedPacket size:27",
"prospector":{
"type":"log"
}
},
"_index":"test"
}
]
参考:
- https://note.yuchaoshui.com/blog/post/yuziyue/filebeat-use-ingest-node-dealwith-log-then-load-into-elasticsearch
- http://www.axiaoxin.com/article/236/
- https://blog.csdn.net/spring_ming/article/details/62232331
最新文章
- 【转】【51CTO 网+】怎样做一款让用户来电的产品
- Linux(CentOs6.4)安装Git
- SQLiteOpenHelper的使用
- mysql 导入导出方法。
- Git学习笔记(3)——撤销修改和文件的删除
- 使用Oracle的审计功能记录连接数据库登录失败的用户信息
- C#条件编译,发布多平台和多种选择性的项目
- 微信公共服务平台开发(.Net 的实现)8-------处理图片(上传下载发送)
- 用Tupper自我指涉公式造图
- spf13配置问题
- Spring JdbcTemplate批量操作数据库
- Type Unknown error: java.lang.NullPointerException
- 白皮书之C++学习第一天
- Android如何在一个TextView中实现多种文本风格?
- 可能会导致循环或多重级联路径。请指定 ON DELETE NO ACTION 或 ON UPDATE NO ACTION,或修改其他 FOREIGN KEY 约束。
- [Compression] Hadoop 压缩
- SQL Data Compare 对比 SQLserver数据
- JS禁止右键查看源码,禁止复制,复制内容到剪切板
- PAT 甲 1005. Spell It Right (20) 2016-09-09 22:53 42人阅读 评论(0) 收藏
- socket编程—— 服务器遇到Broken Pipe崩溃