1.简略介绍

轻量型日志采集器,用于转发和汇总日志与文件。

2.本文实现的功能

3.事先必备:

至少一台Kafka节点。

4.配置Log4j,自定义代码中日志信息输出格式以及文件名称

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="info" schema="Log4J-V2.0.xsd" monitorInterval="600">
<Properties>
//存放日志的文件夹名称
<Property name="LOG_HOME">logs</Property>
//日志文件名称
<property name="FILE_NAME">collector</property>
//日志格式
//[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] 日志输入时间,东八区
//[%level{length=5}] 日志级别,debug、info、warn、error
//[%thread-%tid] 当前线程信息
//[%logger] 当前日志信息所属类全路径
//[%X{hostName}] 当前节点主机名。需要通过MDC来自定义。
//[%X{ip}] 当前节点ip。需要通过MDC来自定义。
//[%X{applicationName}] 当前应用程序名。需要通过MDC来自定义。
//[%F,%L,%C,%M] %F:当前日志信息所属的文件(类)名,%L:日志信息在所属文件中的行号,%C:当前日志所属文件的全类名,%M:当前日志所属的方法名
//[%m] 日志详情
//%ex 异常信息
//%n 换行
<property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger]
[%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n
</property>
</Properties>
<Appenders>
//日志输出至控制台
<Console name="CONSOLE" target="SYSTEM_OUT">
<PatternLayout pattern="${patternLayout}"/>
</Console>
//全量日志信息
<RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log"
filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
<PatternLayout pattern="${patternLayout}"/>
<Policies>
<TimeBasedTriggeringPolicy interval="1"/>
<SizeBasedTriggeringPolicy size="500MB"/>
</Policies>
<DefaultRolloverStrategy max="20"/>
</RollingRandomAccessFile>
//日志级别是warn以上的日志信息
<RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log"
filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
<PatternLayout pattern="${patternLayout}"/>
<Filters>
<ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
</Filters>
<Policies>
<TimeBasedTriggeringPolicy interval="1"/>
<SizeBasedTriggeringPolicy size="500MB"/>
</Policies>
<DefaultRolloverStrategy max="20"/>
</RollingRandomAccessFile>
</Appenders>
<Loggers>
<!-- 业务相关 异步logger -->
<AsyncLogger name="com.sakura.*" level="info" includeLocation="true">
<AppenderRef ref="appAppender"/>
</AsyncLogger>
<AsyncLogger name="com.sakura.*" level="info" includeLocation="true">
<AppenderRef ref="errorAppender"/>
</AsyncLogger>
<Root level="info">
<Appender-Ref ref="CONSOLE"/>
<Appender-Ref ref="appAppender"/>
<AppenderRef ref="errorAppender"/>
</Root>
</Loggers>
</Configuration>

5.Filebeat安装

#上传Filebeat至任意目录下
cd /usr/local/software
tar -zxvf filebeat-6.6.0-linux-x86_64.tar.gz -C /usr/local/
cd /usr/local
mv filebeat-6.6.0-linux-x86_64/ filebeat-6.6.0
## 配置filebeat
vim /usr/local/filebeat-5.6.2/filebeat.yml
##可参考下方配置信息
启动:
## 检查配置是否正确
cd /usr/local/filebeat-6.6.0
./filebeat -c filebeat.yml -configtest
## Config OK
## 启动filebeat
/usr/local/filebeat-6.6.0/filebeat &
#查看是否启动成功
ps -ef | grep filebeat
Filebeat配置参考信息
###################### Filebeat Configuration Example #########################
filebeat.prospectors: - input_type: log paths:
## app-服务名称.log, 为什么写死,防止发生轮转抓取历史数据
- /usr/local/logs/app-collector.log #日志文件地址
#定义写入 ES 时的 _type 值
document_type: "app-log"
multiline:
#pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})' # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
pattern: '^\[' # 指定匹配的表达式(匹配以 "{ 开头的字符串)。具体以哪种形式进行匹配要根据实际的日志格式来配置。
negate: true # 是否必须匹配到
match: after # 以[开头的多行数据,从第二行开始合并到上一行的末尾
max_lines: 2000 # 最大的行数,多余的不再合并到上一行末尾
timeout: 2s # 如果在规定时间没有新的日志事件就不等待后面的日志,提交数据
fields:
logbiz: collector
logtopic: app-log-collector ## 按服务划分用作kafka topic
evn: dev - input_type: log paths:
- /usr/local/logs/error-collector.log
document_type: "error-log"
multiline:
#pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})' # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
pattern: '^\[' # 指定匹配的表达式(匹配以 "{ 开头的字符串)
negate: true # 是否匹配到
match: after # 合并到上一行的末尾
max_lines: 2000 # 最大的行数
timeout: 2s # 如果在规定时间没有新的日志事件就不等待后面的日志
fields:
logbiz: collector
logtopic: error-log-collector ## 按服务划分用作kafka topic
evn: dev output.kafka:
enabled: true
hosts: ["192.168.204.139:9092"]
topic: '%{[fields.logtopic]}'
partition.hash:
reachable_only: true
compression: gzip
max_message_bytes: 1000000
required_acks: 1
logging.to_files: true

6.在kafka上创建对应的topic

7.启动kafka、代码程序,最后启动Filebeat。

这个时候一切正常的话,Filebeat就会将数据推送至Kafka。可以进入到kafka的“kafka-logs/{topic-partition}”目录下查看日志文件等,当对程序进行访问时相应的日志信息将会被Filebeat采集推送到Kafka指定的topic上。

8.使用Logstash消费Kafka中的数据

A.安装Logstash

Logstash安装及基础命令:https://www.cnblogs.com/monument/p/12950290.html

B.配置Logstash启动脚本

input {
kafka {
## app-log-服务名称
topics_pattern => "app-log-.*"
bootstrap_servers => "192.168.11.51:9092"
codec => json
consumer_threads => ## 因为只设置了一个partition,所以消费者线程数设置为1
decorate_events => true
#auto_offset_rest => "latest"
group_id => "app-log-group"
}
kafka {
## error-log-服务名称
topics_pattern => "error-log-.*"
bootstrap_servers => "192.168.11.51:9092"
codec => json
consumer_threads =>
decorate_events => true
#auto_offset_rest => "latest"
group_id => "error-log-group"
} } filter { ## 时区转换
ruby {
code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
} if "app-log" in [fields][logtopic]{
grok {
## 表达式
match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
}
} if "error-log" in [fields][logtopic]{
grok {
## 表达式
match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
}
} } ## 测试输出到控制台:
output {
stdout { codec => rubydebug }
} ## elasticsearch,未实现:
output { if "app-log" in [fields][logtopic]{
## es插件
elasticsearch {
# es服务地址
hosts => ["192.168.11.35:9200"]
# 用户名密码
user => "elastic"
password => ""
## 索引名,+ 号开头的,就会自动认为后面是时间格式:
## javalog-app-service-2019.01.
index => "app-log-%{[fields][logbiz]}-%{index_time}"
# 是否嗅探集群ip:一般设置true;http://192.168.11.35:9200/_nodes/http?pretty
# 通过嗅探机制进行es集群负载均衡发日志消息
sniffing => true
# logstash默认自带一个mapping模板,进行模板覆盖
template_overwrite => true
}
} if "error-log" in [fields][logtopic]{
elasticsearch {
hosts => ["192.168.11.35:9200"]
user => "elastic"
password => ""
index => "error-log-%{[fields][logbiz]}-%{index_time}"
sniffing => true
template_overwrite => true
}
} }

C.启动Logstash

过程较慢,CPU、内存占用极高。启动完成后可以在控制台(上一步配置的是将消息输出到控制台)看到消息输出。

9.将Logstash消费的数据推送到ElasticSearch

待续(太耗资源了,机器内存有限,装不了这么多节点,待扩容、整理后再写。)。

最新文章

  1. Vue + Webpack + Vue-loader 系列教程(1)功能介绍篇
  2. c#接口与抽象类的区别
  3. CSS Sprite雪碧图应用
  4. Andrew Ng机器学习公开课笔记&ndash;Independent Components Analysis
  5. Keil MDK 5.0发布了
  6. vs: 编译: 计算机中丢失mspdb100.dll
  7. Redhat YUM U盘源配置
  8. 常用JS验证和函数
  9. phpmyadmin数据导入最大限制的解决方法
  10. linux学习笔记&lt;命令介绍&gt;
  11. (Pre sell) ZOPO ZP998 (C2 II) 5.5 inch smart phone True Octa Core MTK6592 1920X1080 FHD screen 401 ppi 2GB/32GB 14.0Mp camera-in Mobile Phones from Electronics on Aliexpress.com
  12. NLTK学习笔记(四):自然语言处理的一些算法研究
  13. 实验四 Android程序设计 实验报告
  14. 使用ajax上传图片,支持图片即时浏览,支持js图片压缩后上传给服务器
  15. Eclipse导入maven项目时,pom-xml文件报错处理方法
  16. 创建一个HTTP接口测试
  17. jQuery下ajax事件的简单分析
  18. Vivado与SDK的联合调试方法-使用ILA
  19. [Node.js]29. Level 6: Socket.io: Setting up Socket.io server-side &amp; Client socket.io setup
  20. JavaScript高级 面向对象(10)--onload与jq中ready的区别

热门文章

  1. Golang协程池(workpool)实现
  2. css自动省略号...,通过css实现单行、多行文本溢出显示省略号
  3. Python优秀开源项目Rich源码解析
  4. HDU3686 Traffic Real Time Query System 题解
  5. jira仪表盘的建立与共享
  6. 【JMICRO】 微服务简介及异步RPC体验
  7. electron设置window系统托盘
  8. Linux系统中到底应该怎么理解系统的平均负载
  9. 机器学习实战基础(二十九):决策树(二)DecisionTreeClassifier与红酒数据集
  10. 浅谈工业4.0背景下的空中数据端口,无人机3D 可视化系统的应用