需求:input为json,output为ES,需使用filter提取json中的某个字段,并执行加法、加法、乘法等算法操作

思路:mutate过滤器+ruby过滤器实现

避坑:根据ES及Logstash版本 参考官网API

配置:

input

{"timestamp": 1538545228,"rect_id": 205,"serial_no":"OSDC5W9O","location_id":"214_332_123","device_item_id": 113,"device_id": 13,"tenant_id": 2324,"mac_address": "sd3lk2l4l2b4","slave_id": "004","iot_id":"8bf6f72267d951cb87ffd72f982959e1","metric": {"device_status": 0,"g_sensor": 0,"weight": 420,"humidity": 50,"env_temp": 50,"pig_temp": 45,"sow_action": 3,"boar_action": 3,"distance_1": 120,"distance_2": 100,"distance_vertical": 80,"NH3": 0.05,"CO2": 0.1,"illumination": 8.1}}

output

{
"timestamp" => 1538582828,
"rect_id" => 200,
"device_id" => 13,
"@version" => "1",
"tenant_id" => 2324,
"device_item_id" => 113,
"@timestamp" => 2018-10-05T00:45:39.857Z,
"iot_id" => "8bf6f72267d951cb87ffd72f982959e1",
"type" => "logstash-kafka",
"metric" => {
"device_status" => 0,
"g_sensor" => 0,
"humidity" => 50,
"distance_1" => 120,
"weight" => 420,
"pig_temp" => 45,
"sow_action" => 3,
"boar_action" => 3,
"distance_vertical" => 80,
"CO2" => 0.1,
"illumination" => 8.1,
"distance_2" => 100,
"env_temp" => 50,
"NH3" => 0.05
},
"serial_no" => "OSDC5W9O",
"mac_address" => "sd3lk2l4l2b4",
"location_id" => "214_332_123",
"timestamp_origin" => 1538554028,
"slave_id" => "004"
}
input {
kafka {
bootstrap_servers => ["xxxxxx:9092,xxxxxx:9092"] #替换为自己的Kafka集群地址
client_id => "logstash-172.19.100.180"
group_id => "logstash-dev"
auto_offset_reset => "latest" #smallest
consumer_threads => 5
decorate_events => true
topics => ["zds-iot-topic"]
type => "logstash-kafka"
codec => "json"
}
}
filter {
if [type] == "logstash-kafka" {
mutate {
copy => { "timestamp" => "timestamp_origin" }
}
# Add 8 hours
ruby {code => "event.set('timestamp', event.get('timestamp').to_i + 28800)"}
}
} output {
if [type] == "java_log" {
elasticsearch {
hosts => ["xx.xx.xx.xx:9200"]
index => "javaapp_log_index"
}
}
if [type] == "logstash-kafka" {
elasticsearch {
hosts => ["xx.xx.xx.xx:9200"]
index => "iot_data"
document_type => "sensor"
}
}
stdout { }
}

最新文章

  1. linux下python调用c模块
  2. ndk学习9: 动态使用共享库
  3. VTK 6 和 VTK 5 的不同
  4. 设置DataGridView 显示自己添加编辑的列名,不动态显示数据库本身
  5. BootStrap2学习日记22---点击展开
  6. Azure支持docker简介以及使用指南
  7. 配置RHadoop与运行WordCount例子
  8. 解决 nginx 返回数据不完整的方法
  9. C互质个数
  10. 爬虫框架Scrapy 之(一) --- scrapy初识
  11. Vertica系列: Vertica DB连接负载均衡
  12. ACM-ICPC 2018 南京赛区网络预赛 L 【分层图最短路】
  13. 腾讯地图打开地图选取位置 withMap
  14. Gym 101775B - Scapegoat - [贪心+优先队列]
  15. 多任务Forth系统内存布局
  16. Parallel Programming for FPGAs 学习笔记(1)
  17. 735. Asteroid Collision彗星相撞后的消失数组
  18. Javascript高级编程学习笔记(11)—— 垃圾回收机制
  19. centos 报错 “Job for iptables.service failed because the control process exited with error code.”的解决办法
  20. php 之数组

热门文章

  1. [POJ] The Triangle
  2. GDI/GDI+这些破事
  3. processlist中最哪些状态要引起关注
  4. leetcode705
  5. Python2处理字符集问题
  6. 利用fetch进行POST传参
  7. JAVA集合中的迭代器的遍历
  8. 【LA3713 训练指南】宇航员分组 【2-sat】
  9. selenium3 下载、配置
  10. 搭建yum本地源_阿里云CentOS服务器初始化设置