logstash split插件的使用(将一个事件拆分成多个事件)
2024-10-21 11:31:06
kafka中的原始数据格式(1条数据)
{
"body": {
"cwd": "/home/test/",
"monitor": {
"proc_num": 2,
"procs": [{
"cmd": "",
"cpu_usage_rate": 2.0,
"mem_usage_rate": 3.0,
"pid": 4976,
"procname": "test-name"
}, {
"cmd": "/home/test2",
"cpu_usage_rate": 5.0,
"mem_usage_rate": 6.0,
"pid": 4977,
"procname": "test-name2"
}],
"timestamp": 1547124214814
},
"os_tag": "Linux",
"system": {
"connection": {
"haddr": "00:50:56:B3:7E:7A",
"ip": "192.168.21.80",
"name": "ens160"
},
"cpu": ["Intel Xeon", "Intel Xeon", "Intel Xeon", "Intel Xeon"],
"memory": {
"swap_total": "7918841856",
"total": "15600787456"
},
"uname": "Linux Linux 3.10.0-862.el7.x86_64 x86_64 x86_64",
"vendor": "CentOS 7.5.1804"
}
},
"meta": {
"request_id": "3-14865"
}
}
logstash处理后的数据格式(2条数据)
{
"hostname": "test",
"procs": {
"mem_usage_rate": 2.0,
"cpu_usage_rate": 3.0,
"pid": 4976,
"cmd": "",
"procname": "test-name"
},
"@timestamp": "2019-01-11T02:08:57.225Z",
"memory": {
"total": "3975188480",
"swap_total": "4177522688"
},
"connection": {
"ip": "192.168.31.182",
"name": "ens160",
"haddr": "00:50:56:B3:7E:35"
},
"proc_num": 4
}
{
"hostname": "test",
"procs": {
"mem_usage_rate": 5.0,
"cpu_usage_rate": 6.0,
"pid": 4976,
"cmd": "test",
"procname": "test-name"
},
"connection": {
"ip": "192.168.31.182",
"name": "ens160",
"haddr": "00:50:56:B3:7E:35"
},
"proc_num": 4
}
logstash的配置
input {
kafka {
bootstrap_servers=> "192.168.31.92:9092,192.168.31.93:9092,192.168.31.94:9092"
group_id => "test_group"
topics =>"test_topic"
auto_offset_reset => "earliest"
type => "test_type"
consumer_threads => 1
codec => "json"
}
}
filter{
if !([body][monitor][procs]) {
drop { }
}
mutate {
remove_field => ["body[cwd]","body[os_tag]","body[system][filesystem]","body[system][cpu]","body[system][disk]",
"body[system][has_docker]","body[system][if]","body[system][uname]","body[system][vendor]","meta","url","body[configuration]"]
}
date {
match => ["body[monitor][timestamp]","UNIX_MS"]
remove_field => ["body[monitor][timestamp]"]
}
mutate {
add_field => {
"client_id" => "%{params[client_id]}"
"system" => "%{body[system]}"
"monitor" => "%{body[monitor]}"
}
remove_field => ["body","params"]
}
json {
source => "system"
remove_field => ["system"]
}
json {
source => "monitor"
remove_field => ["monitor"]
}
if ([procs]) {
split {
field => "procs"
}
}
}
output {
elasticsearch {
hosts => ["192.168.21.80:9200"]
index => "test_index"
codec => "json"
}
}
最新文章
- Myeclipse开发环境下文件中出现的提示错误与解决方法:The import javax.servlet cannot be resolved?
- 自定义RadioButton样式
- iOS中文网址路径转换URLEncode
- PHP操作Mongodb之高级查询篇
- sort,ksort,asort的区别
- 【BZOJ】【3442】学习小组
- JDBC 与ODBC的区别
- (转)OS X Mountain Lion 系统配置 Apache+Mysql+PHP 详细教程
- Cloud Foundry中warden的网络设计实现——iptable规则配置
- UVA 839 (13.08.20)
- Scrapy详解
- 【转】Cookie/Session机制详解
- js转盘游戏
- Vue2 学习笔记5
- CGI、FAST-CGI、PHP-CGI、PHP-FPM的关系
- 简单说一下UWP中的JumpList
- 坑人的 Javascript 模块化编程 sea.js
- 创建一个Maven Web应用程序
- python openpyxl.md
- vue上传文件
热门文章
- 1082 射击比赛 (20 分)C语言
- C# 获取WebBrowser内容的高度
- springboot2 整合redis
- 洛谷p1502窗口的星星 扫描线
- git 查看修改账号密码
- DirectX11 Windows Windows SDK--28 计算着色器:波浪(水波)
- SqlServer分页存储过程(多表查询,多条件排序),Repeater控件呈现数据以及分页
- Java入门 - 语言基础 - 16.数组
- spring-cloud-gateway报错
- Mysql 8+ 版本完全踩坑记录