1. Logstash: Exceptions and Notes

1.1 Integrating Logstash with the Kafka plugin and Elasticsearch

When Logstash consumes from Kafka and writes to Elasticsearch, you must set the Kafka input plugin parameters session_timeout_ms => "10000" and max_poll_records => "500". If these two values are too high, Logstash cannot finish processing a poll before the session times out, so the offsets in Kafka never advance and the same data is consumed into ES repeatedly.
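A minimal sketch of the relevant kafka input block (the broker address, topic, and group here are placeholders for illustration only):

input {
  kafka {
    bootstrap_servers => ["kafka-host:9092"]   # placeholder broker
    group_id => "logstash"
    topics => ["some-topic"]                   # placeholder topic
    codec => "json"
    # keep these low enough that one batch can be processed before
    # the consumer session times out, otherwise offsets never advance
    session_timeout_ms => "10000"
    max_poll_records => "500"
  }
}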

Here is the Kafka-consuming pipeline config from my production environment.

Personally I think it still has room for optimization.
[www@logstash001 config]$ cat logstash-game-kafka.conf
input {
  kafka {
    id => "game-kafka-input"
    bootstrap_servers => ["10.10.147.43:9092"]
    group_id => "logstash"
    topics => ["java-error", "java-info"]
    codec => "json"
    #auto_offset_reset => "latest"
  }
}

filter {
  json {
    source => "message"
    remove_field => [ "message" ]
  }
  if [jv_class] == "RocketmqClient" or [jv_method] == "sendHeartbeatToAllBroker" {
    drop { }
  }
  if [jv_message] =~ "The requested url:.*.ico" {
    drop { }
  }
  mutate {
    remove_field => ["@version","[beat][name]","[beat][version]","[beat][hostname]"]
  }
  if "beats_input_codec_json_applied" in [tags] {
    mutate {
      remove_tag => ["beats_input_codec_json_applied"]
    }
  }
  date {
    match => [ "jv_time", "ISO8601" ]
  }
  mutate {
    gsub => [
      "nx_upstream_host", "-", "0",
      "nx_upstream_time", "-", "0",
      "nx_upstream_status", "-", "0",
      "nx_upstream_connect_time", "-", "0"
    ]
  }
  mutate {
    convert => {
      "nx_upstream_time" => "float"
      "nx_upstream_response_length" => "integer"
      "nx_upstream_connect_time" => "float"
    }
  }
}

output {
  elasticsearch {
    hosts => ["http://10.10.147.36:9200","http://10.10.147.37:9200","http://10.10.147.38:9200","http://10.10.147.46:9200","http://10.10.147.47:9200"]
    user => "xxxxxxxxxxxxxxxxx"
    password => "xxxxxxxxxxxxxxxxxxxx"
    index => "game-%{lb_log_type}-%{+YYYY.MM.dd}--"
  }
}

1.2 "retrying failed action with response code: 429"

Logstash reports this error because Elasticsearch's bulk operations queue is full. Either lower the flush_size value in the Logstash output, or enlarge Elasticsearch's bulk thread pool queue by adding threadpool.bulk.queue_size: 1000 to the Elasticsearch config file.
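A minimal sketch of the Elasticsearch-side change (note that the exact setting name depends on the ES version: older releases use threadpool.bulk.queue_size, 5.x/6.x use thread_pool.bulk.queue_size, and newer releases renamed the pool to write; the node must be restarted for it to take effect):

# elasticsearch.yml -- enlarge the bulk queue so short bursts are buffered
# instead of being rejected with a 429 response
thread_pool.bulk.queue_size: 1000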

1.3 Logstash inserts data into ES too slowly

Increase the batch size, worker count, and other pipeline parameters in the config file.

Here is some of my production configuration:

[www@logstash001 config]$ cat logstash.yml
# Settings file in YAML
path.data: /data/www/logstash
pipeline.workers: 32
# number of pipeline worker threads
#pipeline.output.workers: 16
pipeline.batch.size: 50000
# number of events sent per batch
pipeline.batch.delay: 5
# delay before a batch is sent
config.reload.automatic: true
config.reload.interval: 10s
log.level: info
path.logs: /log/logstash
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: ["http://xxx.xxx.xxx.xxx:9200"]
xpack.monitoring.elasticsearch.username: "xxxxxxxxxx"
xpack.monitoring.elasticsearch.password: "xxxxxxxxxxxxxxx"

1.4 Filtering with the ruby filter

In Logstash 5.0 and later, the ruby filter sets and reads values through the event API. For example, code => "event.set('server_time', Time.now())" sets server_time to the current time, and code => "event.get('server_time')" reads it back. Before Logstash 5.0 the event was accessed like a hash: code => "event['server_time'] = Time.now()" set the value, and code => "event['server_time']" read it.
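A minimal sketch of a ruby filter using the 5.x event API (server_time is just an illustrative field name):

filter {
  ruby {
    # stamp each event with the time it passed through this filter
    code => "event.set('server_time', Time.now())"
  }
}

The value can later be read back in another ruby block with event.get('server_time').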

1.5 Auto offset commit failed for group clio-consr-biz-go1:

The error message:
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

Committing the consumer group's offsets failed because the partitions had already been reassigned. The common explanation found online: Logstash cannot finish consuming all the data within the allotted time and exceeds the session timeout configured on the Kafka side, so the session is dropped and the offsets of the data already consumed are never returned to Kafka. Kafka then considers that data not correctly consumed and rebalances the partitions. After the timeout, Logstash creates a new consumer to pull data, and because the offsets were never committed, Kafka re-delivers the data that "failed" to be consumed. The fix is to increase session.timeout.ms or reduce max.poll.records. Note that when increasing session.timeout.ms you should also increase request.timeout.ms, and session.timeout.ms must remain smaller than request.timeout.ms.

I added the parameters mentioned above to the Kafka config files:
[www@kafkasrv001 config]$ pwd
/data/soft/kafka/config
[www@kafkasrv001 config]$ vim consumer.properties
session.timeout.ms=7000
[www@kafkasrv001 config]$ pwd
/data/soft/kafka/config
[www@kafkasrv001 config]$ vim producer.properties
request.timeout.ms=10000
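Since Logstash's kafka input runs its own consumer, the same knobs can also be set directly in the input block instead of (or in addition to) the properties files above; a minimal sketch with illustrative values:

input {
  kafka {
    bootstrap_servers => ["10.10.147.43:9092"]
    topics => ["java-error"]
    # session_timeout_ms must stay smaller than request_timeout_ms
    session_timeout_ms => "10000"
    request_timeout_ms => "40000"
    max_poll_records => "500"
  }
}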

1.6 Remember to edit the hosts file on every machine involved!

Otherwise you may be unable to connect to Kafka, because in some cases the connection is made directly by hostname (for example, when the broker advertises its hostname rather than an IP).
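A minimal sketch of the mapping on the Logstash machine, assuming the broker at 10.10.147.43 advertises itself by the hostname kafkasrv001 (taken from the shell prompts above; your hostname may differ):

# /etc/hosts on the Logstash host
10.10.147.43  kafkasrv001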

1.7 Filebeat data entering Logstash

All fields defined in Filebeat are of string type, but once the data passes through Kafka and into Logstash, the fields are recognized and their types converted automatically.
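If you prefer not to rely on automatic detection, the types can be forced explicitly with a mutate convert filter, just like the nginx upstream fields in the pipeline in section 1.1; a minimal sketch with illustrative field names:

filter {
  mutate {
    convert => {
      "response_time" => "float"
      "status_code" => "integer"
    }
  }
}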

