转载自:https://mp.weixin.qq.com/s?__biz=MzU4MjQ0MTU4Ng==&mid=2247491814&idx=1&sn=3b0f1a3477a79b60b79456958574127f&chksm=fdbaeffbcacd66ede96069fa77ed87f1ba4c94a1db957aa32bff6b5972cf37a9480d6167f6e9&scene=178&cur_album_id=1837018771551485956#rd

fluentd 是一个实时的数据收集系统,不仅可以收集日志,还可以收集定期执行的命令输出和 HTTP 请求内容。数据被收集后按照用户配置的解析规则,形成一系列 event。每一个 event 包含如下内容:

tag = xxx
time = xxx
record = {
"key1": "value1",
"key2": "value2"
}

其中:

  • tag:为数据流的标记。fluentd 中可以具有多个数据源,解析器,过滤器和数据输出。他们之前使用 tag 来对应。类似于数据流按照 tag 分组。数据流向下游的时候只会进入 tag 相匹配的处理器。
  • time:event 产生的时间,该字段通常由日志内的时间字段解析出来。
  • record:日志的内容,为 JSON 格式。

fluentd 支持多种数据的解析过滤和输出操作。其中常用的有:

  • tail 输入:增量读取日志文件作为数据源,支持日志滚动。
  • exec 输入:定时执行命令,获取输出解析后作为数据源。
  • syslog 输出:解析标准的 syslog 日志作为输入。
  • forward 输入:接收其他 fluentd 转发来的数据作为数据源。
  • dummy:虚拟数据源,可以定时产生假数据,用于测试。
  • regexp 解析器:使用正则表达式命名分组的方式提取出日志内容为 JSON 字段。
  • record_transformer 过滤器:人为修改 record 内的字段。
  • file 输出:用于将 event 落地为日志文件。
  • stdout:将 event 输出到 stdout。如果 fluentd 以 daemon 方式运行,输出到 fluentd 的运行日志中。
  • forward:转发 event 到其他 fluentd 节点。
  • copy:多路输出,复制 event 到多个输出端。
  • kafka:输出 event 到 Kafka。
  • webhdfs:输出 event 到 HDFS。
  • elasticsearch:输出 event 到 HDFS。

接下来以官网介绍为基础,穿插自己的理解,介绍下 fluentd 的使用方法。

安装启动方法

官网安装步骤链接:https://docs.fluentd.org/installation/install-by-rpm

下面是精简的在 CentOS 下的安装步骤。打开 shell,执行如下命令:

curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent3.sh | sh

systemctl start td-agent

可以安装并启动 fluentd。

1.配置文件位置

编辑 fluentd 配置文件的方法:

vim /etc/td-agent/td-agent.conf

2.修改运行用户和组

默认来说 fluentd 使用 td-agent 用户启动。如果需要修改 fluentd 的用户,需要执行:

vim /usr/lib/systemd/system/td-agent.service

文件内容如下所示:

[Unit]
Description=td-agent: Fluentd based data collector for Treasure Data
Documentation=https://docs.treasuredata.com/articles/td-agent
After=network-online.target
Wants=network-online.target [Service]
User=td-agent
Group=td-agent
LimitNOFILE=65536
Environment=LD_PRELOAD=/opt/td-agent/embedded/lib/libjemalloc.so
Environment=GEM_HOME=/opt/td-agent/embedded/lib/ruby/gems/2.4.0/
Environment=GEM_PATH=/opt/td-agent/embedded/lib/ruby/gems/2.4.0/
Environment=FLUENT_CONF=/etc/td-agent/td-agent.conf
Environment=FLUENT_PLUGIN=/etc/td-agent/plugin
Environment=FLUENT_SOCKET=/var/run/td-agent/td-agent.sock
Environment=TD_AGENT_LOG_FILE=/var/log/td-agent/td-agent.log
Environment=TD_AGENT_OPTIONS=
EnvironmentFile=-/etc/sysconfig/td-agent
PIDFile=/var/run/td-agent/td-agent.pid
RuntimeDirectory=td-agent
Type=forking
ExecStart=/opt/td-agent/embedded/bin/fluentd --log $TD_AGENT_LOG_FILE --daemon /var/run/td-agent/td-agent.pid $TD_AGENT_OPTIONS
ExecStop=/bin/kill -TERM ${MAINPID}
ExecReload=/bin/kill -HUP ${MAINPID}
Restart=always
TimeoutStopSec=120 [Install]
WantedBy=multi-user.target

修改Service部分User和Group配置项可以更改 fluentd 进程的用户和组。

3.检测配置文件是否正确的方法

在 shell 中运行:

/opt/td-agent/embedded/bin/fluentd -c /etc/td-agent/td-agent.conf

观察输出,如果有错误会给出对应提示。

4.数据流逻辑

fluentd 以 tag 值为基准,决定数据的流经哪些处理器。

数据的流向为:source -> parser -> filter -> output

input 配置

1.tail

增量读取日志文件。需要提供一个用于标记已经读取到位置的文件(position file)所在的路径。

tail 针对日志滚动的支持:tail 方式采用跟踪文件 inode 的方式进行。比如日志名为app.log,如果日志发生滚动,被重命名为app.log.1。文件重命名的时候 inode 是不会改变的。因此发生滚动时写入到旧文件末尾的日志也可以被收集到。tail 会跟踪旧文件的 inode 一段时间(rotate_wait配置),这段时间过去之后,tail 不再监听app.log.1,开始监听新的app.log文件。

tail 方式的示例配置:

<source>
@type tail
path /var/log/httpd-access.log
pos_file /var/log/td-agent/httpd-access.log.pos
tag apache.access
<parse>
@type apache2
</parse>
</source>

注意:如果文件发生修改会输出全量文件内容。

配置项解释

  • tag:数据源的 tag 值。*号可以扩展为 path(/替换为.)。例如
path /path/to/file
tag foo.*

tag 会被扩展为foo.path.to.file

  • path:配置读取的路径。可以使用*或者是strftime。例如:
path /path/to/%Y/%m/%d/*

如果今天是 2020 年 1 月 2 日,fluentd 会读取/path/to/2020/01/02目录下的内容。也可以配置多个路径,使用逗号分隔:

path /path/to/a/*,/path/to/b/c.log
  • exclude_path:排除部分目录或文件,使用数组格式配置。
path /path/to/*
exclude_path ["/path/to/*.gz", "/path/to/*.zip"]
  • refresh_interval:多长时间刷新一次文件监听列表,配合*使用才有意义。

  • pos_file:位置文件地址。这个文件保存了监听的日志文件已经读取到第几行。该项一定要配置。注意,不要在多个 source 之间共用 pos file,否则会出现问题。

  • pos_file_compaction_interval:pos file 文件压缩时间间隔。用于压缩 pos file 中不再监听的记录,不可解析的记录以及重复的记录。

  • parse 标签:用于指定 log 的解析器(必须的配置项)。例如:

# json
<parse>
@type json
</parse> # regexp
<parse>
@type regexp
expression ^(?<name>[^ ]*) (?<user>[^ ]*) (?<age>\d*)$
</parse>
  • path_key:如果配置此项,监控文件的 path 会在 event 中,此项的 key 为path_key。例如:
path /path/to/access.log
path_key tailed_path

生成的数据如下所示:

{"tailed_path":"/path/to/access.log","k1":"v1",...,"kN":"vN"}
  • rotate_wait:日志发生滚动的时候,可能会有部分日志仍然输出在旧的日志文件,此时需要保持监听旧日志文件一段时间,这个时间配置就是rotate_wait。

exec

周期性执行命令,抽取命令输出为 event。

示例配置:

<source>
@type exec
command cmd arg arg
<parse>
keys k1,k2,k3
</parse>
<extract>
tag_key k1
time_key k2
time_format %Y-%m-%d %H:%M:%S
</extract>
run_interval 10s
</source>

以上命令的含义为每 10 秒钟执行cmd arg arg命令,提取命令执行结果,以空白字符分隔三个字段的值为 k1,k2,k3。其中 k1 的值作为 tag,k2 作为时间字段,使用%Y-%m-%d %H:%M:%S格式。

一个例子,周期获取系统的平均负载。配置方法如下:

<source>
@type exec
tag system.loadavg
command cat /proc/loadavg | cut -d ' ' -f 1,2,3
run_interval 1m
<parse>
@type tsv
keys avg1,avg5,avg15
delimiter " "
</parse>
</source>

输出的日志格式为:

2018-06-29 17:27:35.115878527 +0900 system.loadavg: {"avg1":"0.30","avg5":"0.20","avg15":"0.05"}

syslog

连接 rsyslog。可以作为 rsyslog 的接收端。

一个配置的例子:

<source>
@type syslog
port 5140
bind 0.0.0.0
tag system
</source>

fluentd 打开 5140 端口监听 rsyslog 发来的 log。

rsyslog 配置文件/etc/rsyslog.conf设置为:

# Send log messages to Fluentd
*.* @127.0.0.1:5140

fluentd 解析到的 event 格式如下:

tag = "#{@tag}.#{facility}.#{priority}"
time = 1353436518,
record = {
"host": "host",
"ident": "ident",
"pid": "12345",
"message": "text"
}

dummy

专用于测试的数据源。周期产生假数据。

配置举例:

<source>
@type dummy
dummy {"hello":"world"}
</source>

dummy 常用参数:

  • tag: 标记值
  • size:每次发送的 event 数量
  • rate:每秒产生多少个 event
  • auto_increment_key:自增键名。如果配置了此项,会有一个 key 为该配置项值的自增键
  • suspend:重启后自增值是否重新开始
  • dummy:测试数据内容

forward

用于接收其他 fluentd forward 过来的 event。

示例配置:

<source>
@type forward
port 24224
bind 0.0.0.0
</source>

output 配置

1.file

输出 event 为文件。默认每天输出一个日志文件。

示例配置:

<match pattern>
@type file
path /var/log/fluent/myapp
compress gzip
<buffer>
timekey 1d
timekey_use_utc true
timekey_wait 10m
</buffer>
</match>

包含的参数类型:

  • path:path 支持 placeholder,可以在日志路径中嵌入时间,tag 和 record 中的字段值。例如:
path /path/to/${tag}/${key1}/file.%Y%m%d
<buffer tag,time,key1>
# buffer parameters
</buffer>

注意:buffer 标签后面的内容为 buffer chunk key。Buffer 根据这些 key 分段。

  • append:flush 的 chuck 是否追加到已存在的文件后。默认为 false,便于文件的并行处理。
  • format 标签,用来规定文件内容的格式,默认值为 out_file。
  • inject 标签,用来为 event 增加 time 和 tag 等字段。
  • add_path_suffix:是否增加 path 后缀
  • path_suffix:path 后缀内容,默认为.log。
  • compress:采用什么压缩格式,默认不压缩。
  • recompress:是否在 buffer chunk 已经压缩的情况再次压缩,默认为 false。

forward

将 event 转发到其他的 fluentd 节点。如果配置了多个 fluentd 节点,会使用负载均衡和支持容错的方式发送。如果需要发送多份数据,需要使用 copy。

配置示例:

<match pattern>
@type forward
send_timeout 60s
recover_wait 10s
hard_timeout 60s <server>
name myserver1
host 192.168.1.3
port 24224
weight 60
</server>
<server>
name myserver2
host 192.168.1.4
port 24224
weight 60
</server>
... <secondary>
@type file
path /var/log/fluent/forward-failed
</secondary>
</match>

server 标签内可以配置如下字段:

  • host
  • name
  • port
  • shared_key
  • username
  • password
  • standby 标记 server 为备用,只有其他 node 不可用的时候才会启用 standby 的 node
  • weight 负载均衡的权重配置

copy

多路输出(复制 event 到多个输出端)

示例配置

<match pattern>
@type copy
<store>
@type file
path /var/log/fluent/myapp1
...
</store>
<store>
...
</store>
<store>
...
</store>
</match>

其中每一个 store 是一路输出。

重要参数:

  • copy_mode:复制模式。可选值有

    • no_copy:每路输出共享 event。
    • shallow:浅拷贝,如果不修改嵌套字段可以使用。
    • deep:深拷贝,使用msgpack-ruby方式。
    • marshal:深拷贝,使用marshal方式。
  • store 标签的 ignore_error 参数:如果被标记 ignore_error 的 store 出现错误,不会影响其他的 store。官网的例子为:

<match app.**>
@type copy
<store>
@type plugin1
</store>
<store>
@type plugin2
</store>
</match>

假如 plugin1 出现错误,plugin2 也不会执行。如果在 plugin1 的 store 添加上 ignore_error 参数,如下所示:

<match app.**>
@type copy
<store ignore_error>
@type plugin1
</store>
<store>
@type plugin2
</store>
</match>

上述情况 plugin2 的运行不受影响。通常为不重要的 store 添加 ignore_error 参数。

http

通过 http 请求的方式发送 event。payload 的格式由 format 标签决定。

示例配置:

<match pattern>
@type http endpoint http://logserver.com:9000/api
open_timeout 2 <format>
@type json
</format>
<buffer>
flush_interval 10s
</buffer>
</match>

该例子使用 http 方式将 event 发送到http://logserver.com:9000/api,使用 post 方式,连接超时时间为 2 秒。输出格式为 json,每 10 秒钟输出一次。

注意:如果使用 JSON 的方式发送,HTTP 请求的 content-type 为 application/x-ndjson (newline-delimited JSONs)。如果用 spring mvc 接收会提示不支持。可以使用HTTPServletRequest接收 request body。

stdout

标准输出的模式,如果使用后台模式运行 fluentd,输出到 fluentd 的日志。多用于 debug 的时候。

配置方法:

<match pattern>
@type stdout
</match>

elasticsearch

输出 event 到 elasticsearch。

示例配置:

<match my.logs>
@type elasticsearch
host localhost
port 9200
logstash_format true
</match>

可选参数:

  • host:单个 elasticsearch 节点地址
  • port:单个 elasticsearch 节点的端口号
  • hosts:elasticsearch 集群地址。格式为 ip1:port1,ip2:port2...
  • user 和 password:elasticsearch 的认证信息
  • scheme:使用 https 还是 http。默认为 http 模式
  • path:REST 接口路径,默认为空
  • index_name:index 名称
  • logstash_format:index 是否使用 logstash 命名方式(logstash-%Y.%m.%d),默认不启用
  • logstash_prefix:logstash_format 启用的时候,index 命名前缀是什么。默认为logstash

kafka

把 event 输出到 kafka。

示例配置如下:

<match pattern>
@type kafka2 # list of seed brokers
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>
use_event_time true # buffer settings
<buffer topic>
@type file
path /var/log/td-agent/buffer/td
flush_interval 3s
</buffer> # data type settings
<format>
@type json
</format> # topic settings
topic_key topic
default_topic messages # producer settings
required_acks -1
compression_codec gzip
</match>

重要的参数为:

  • brokers:Kafka brokers 的地址和端口号
  • topic_key:record 中哪个 key 对应的值用作 Kafka 消息的 key
  • default_topic:如果没有配置 topic_key,默认使用的 topic 名字
  • format 标签:确定发送的数据格式
  • use_event_time:是否使用 fluentd event 的时间作为 Kafka 消息的时间。默认为 false。意思为使用当前时间作为发送消息的时间
  • required_acks:producer acks 的值
  • compression_codec:压缩编码方式

webhdfs

event 通过 REST 方式写入到 HDFS。

HADOOP 启用 webhdfs 的方法

core-site.xml

<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://10.180.210.172:9000</value>
</property>
</configuration>

hdfs-site.xml

<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.http.address</name>
<value>0.0.0.0:50070</value>
</property> <property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.support.append</name>
<value>true</value>
</property>
<property>
<name>dfs.support.broken.append</name>
<value>true</value>
</property>
</configuration>

最后执行$HADOOP_HOME/sbin/httpfs.sh start命令启动 webhdfs 支持。

注意:此时 webhdfs 的端口号为 50070。

示例配置和参数

示例配置:

<match access.**>
@type webhdfs
host namenode.your.cluster.local
port 50070
path "/path/on/hdfs/access.log.%Y%m%d_%H.#{Socket.gethostname}.log"
<buffer>
flush_interval 10s
</buffer>
</match>

注意:需要保证 HDFS 的目标目录具有写入权限。debug 过程发现 fluentd 请求 webhdfs 没有使用 user proxy,HDFS 认为操作的用户为 dr.who,无法创建文件。为了解决这个问题,设置 HDFS 目标目录的权限为 777。

重要参数:

  • host:namenode 的地址
  • port:namenode 的端口号
  • path:写入文件路径。可以使用占位符或者 ruby 表达式。可以使用如下方式表示时间:
\%Y: year including the century (at least 4 digits)
\%m: month of the year (01..12)
\%d: Day of the month (01..31)
\%H: Hour of the day, 24-hour clock (00..23)
\%M: Minute of the hour (00..59)
\%S: Second of the minute (00..60)

输出参数:

  • timekey:多久输出一次文件到 HDFS。如果 path 中没有配置占位符,默认为 86400(1 天)。如果指定了和时间相关的占位符,则文件输出周期自动和最小的时间占位符单位一致
  • timekey_wait:允许等待来迟日志的最长时间
  • flush_interval:flush 间隔时间,默认为不设置
  • flush_at_shutdown:关闭的时候是否 flush。如果使用内存类型的 buffer,需要配置为 true

parser 配置

1.regexp

使用正则表达式命名分组的方式从日志(一行或多行)中提取信息。可以通过 time_key 指定 event 的 time 字段的名字。名字为 time 字段名的分组内容会被抽取为 event 时间。

一个在线测试正则表达式的工具:http://fluentular.herokuapp.com/

基本配置格式:

<parse>
@type regexp
expression /.../
</parse>

正则表达式可以添加额外的参数:忽略大小写:/.../i 多行匹配:/.../m。注意,此时.匹配新行 同时使用忽略大小写和多行匹配:/.../im

一个例子,示例配置如下:

<parse>
@type regexp
expression /^\[(?<logtime>[^\]]*)\] (?<name>[^ ]*) (?<title>[^ ]*) (?<id>\d*)$/
time_key logtime
time_format %Y-%m-%d %H:%M:%S %z
types id:integer
</parse>

如下的数据:

[2013-02-28 12:00:00 +0900] alice engineer 1

会被解析为:

time:
1362020400 (2013-02-28 12:00:00 +0900) record:
{
"name" : "alice",
"title": "engineer",
"id" : 1
}

filter 配置

1.record_transformer

record_transformer 用来修改 event 的结构,增加或修改字段。

一个 record_transformer 的例子:

<filter foo.bar>
@type record_transformer
<record>
hostname "#{Socket.gethostname}"
tag ${tag}
</record>
</filter>

这个 filter 匹配 tag 为foo.bar的 source。event 增加了两个新的字段:hostname 和 tag。

其中 hostname 这里使用了 ruby 表达式。tag 使用了字符串插值。

如果数据为:

{ "message": "hello world!" }

会被转换为:

{
"message": "hello world!",
"hostname": "db001.internal.example.com",
"tag": "foo.bar"
}

可以通过添加 enable_ruby 配置,在${}中使用 ruby 表达式。

例如:

<filter foo.bar>
@type record_transformer
enable_ruby
<record>
avg ${record["total"] / record["count"]}
</record>
</filter>

如下输入:

{ "total": 100, "count": 10 }
``` 会被转换为:
```
{ "total": 100, "count": 10, "avg": "10" }
``` 注意,可以启用auto_typecast true配置实现自动类型转换。 修改字段的例子:
```
<filter foo.bar>
@type record_transformer
<record>
message yay, ${record["message"]}
</record>
</filter>
``` 如下输入:
```
{ "message": "hello world!" }
``` 会被修改为:
```
{ "message": "yay, hello world!" }
``` 可以在表达式中配置 tag_parts 变量,引用 tag 的第 n 部分。如下所示: ```
<filter web.*>
@type record_transformer
<record>
service_name ${tag_parts[1]}
</record>
</filter>
``` 如果遇到 tag 为web.auth的数据:
```
{ "user_id": 1, "status": "ok" }
``` 会被转换为:
```
{ "user_id": 1, "status": "ok", "service_name": "auth" }
``` 2.record 标签 record 标签的语法为:
```
<record>
NEW_FIELD NEW_VALUE
</record>
``` 表达式中可以配置如下变量:
- record:获取 record 中某些字段的内容。例如record["count"]
- tag:获取 tag 的内容
- time:获取日志的时间戳
- hostname:获取主机名字,和#{Socket.gethostname}作用一样
- tag_parts[N]:tag 以.分隔,获取 tag 的第 N 部分
- tag_prefix[N]:获取 tag 的 0-N 部分
- tag_suffix[N]:获取 tag 的 N-结尾部分 例如 tag 为debug.my.app,tag_parts[1]返回my。tag_prefix和tag_suffix的结果如下: ```
tag_prefix[0] = debug tag_suffix[0] = debug.my.app
tag_prefix[1] = debug.my tag_suffix[1] = my.app
tag_prefix[2] = debug.my.app tag_suffix[2] = app
``` 2.配置文件使用通配符和扩展 `<match>`和`<filter>`标签可以使用通配符和扩展。 tag 以`.`为分隔符,分隔为多个部分。 fluentd 支持的通配符和扩展有:
- `*`:只匹配一个部分。比如`a.*`匹配a.b,但是不匹配a或a.b.c。
- `**`:匹配 0 个或多个部分。比如`a.**`匹配a,a.b和a.b.c。
- `{X,Y,Z}`:匹配 X 或 Y 或 Z。
- `#{expression}`:使用嵌入的 ruby 表达式。有一些快捷变量可以直接使用,例如`#{hostname}`和`#{worker_id}`。
- `${..}`:使用变量值,tag,record 可以使用如下的方式指定默认值。例如:`#{ENV["FOOBAR"] || use_default}`。如果 FOOBAR 环境变量不存在,则使用use_default这个值。 注意:match 标签的匹配过程是有顺序的。比如说下面的例子:
```
<match **>
@type blackhole_plugin
</match> <match myapp.access>
@type file
path /var/log/fluent/access
</match>
``` 因为上面的 match 总是能被匹配到,下面的 match 永远没有机会执行。 ## Buffer buffer 为 fluentd 很关键的配置,意为缓冲区。可以决定收集的数据存入什么介质,多长时间输出一次等。 buffer 标签必须配置在 match 标签内(即在输出端配置)。 buffer 具有一个`@type` 属性,用来配置 buffer 的储存介质:
```
<buffer>
@type file
</buffer>
``` @type` 有两个值:
- file:存入文件
- memory:存入内存,这个是默认值 buffer 标签后面可以跟随 chunk keys,用来决定 buffer 以 record 的什么字段来分段存放。例如:
```
<buffer ARGUMENT_CHUNK_KEYS>
# ...
</buffer>
``` 注意:
- 可以指定多个 buffer chunk keys,使用逗号分隔。
- 如果没有配置 chunk key,所有的 event 都会写入同一个 chunk file,直到 buffer 滚动。 buffer 如果使用 time 作为 chunk key,可以按照时间对 buffer 进行分段。其中:
- timekey:时间的跨度
- timekey_wait:flush 延迟时间,用于等待迟到的数据 官网的例子如下: ```
<match tag.**>
# ...
<buffer time>
timekey 1h # chunks per hours ("3600" also available)
timekey_wait 5m # 5mins delay for flush ("300" also available)
</buffer>
</match> # Time chunk key: events will be separated for hours (by timekey 3600) 11:59:30 web.access {"key1":"yay","key2":100} ------> CHUNK_A 12:00:01 web.access {"key1":"foo","key2":200} --|
|---> CHUNK_B
12:00:25 ssh.login {"key1":"yay","key2":100} --|
``` 部分经常用到的配置参数:
- timekey_use_utc:使用国际标准时间还是当地时间,默认是使用当地时间。
- timekey_zone:指定时区。
- chunk_limit_size:chunk 大小限制,默认 8MB。
- chunk_limit_records:chunk event 条数限制。
- total_limit_size:总 buffer 大小限制。
- chunk_full_threshold:chunk 大小超过 chunk_limit_size * chunk_full_threshold 时会自动 flush。
- queued_chunks_limit_size:限制队列中的 chunk 数目,防止频繁 flush 产生过多的 chunk。
- compress:压缩格式,可使用 text 或 gzip。默认为 text。
- flush_at_shutdown:关闭时候是否 flush。对于非持久化 buffer 默认值为 true,持久化 buffer 默认值为 false。
- flush_interval:多长时间 flush 一次。
- retry_timeout:重试 flush 的超时时间。在这个时间后不再会 retry。
- retry_forever:是否永远尝试 flush。如果设置为 true 会忽略 retry_timeout 的配置。
- retry_max_times:重试最大次数。
- retry_type:有两个配置值:retry 时间间隔,指数级增长或者是固定周期重试。
- retry_wait:每次重试等待时间。
- retry_exponential_backoff_base:retry 时间指数扩大倍数。
- retry_max_interval:最长 retry 时间间隔。
- retry_randomize:是否随机 retry 时间间隔。 ## 配置文件重用 可以通过`@include` 配置文件路径方式,引用其他配置文件片段到 fluentd 主配置文件中。 配置文件路径可以使用绝对路径或相对路径。相对路径的基准路径为 fluentd 主配置文件所在的路径。 `@include`可以出现在主配置文件的任何位置。 ## Docker 日志输出到 fluentd 通过配置 fluentd logging driver 的方式实现。该 driver 发送的 log 信息包含:
![](https://img2022.cnblogs.com/blog/794174/202206/794174-20220624103918550-1205185008.png) 1.全局配置方式 修改/etc/docker/daemon.json,增加如下内容:
```
{
"log-driver": "fluentd",
"log-opts": {
"fluentd-address": "fluentdhost:24224"
}
}
``` 然后重启 docker daemon 使配置生效。 也可以通过添加`--log-driver`和`--log-opt`参数的方式指定某个 container 使用 fluentd logging driver。如下所示:
```
docker run --log-driver=fluentd --log-opt fluentd-address=fluentdhost:24224
``` 可以通过在`--log-opt`后指定 tag 的方式,确定 source 的 tag。 Docker 官网参考链接:https://docs.docker.com/config/containers/logging/fluentd/ ## 配置实例 1.实例 1 采集/root/my.txt文件(内容格式为 key value),并发送到http://localhost:9090/。 fluentd 的配置文件如下:
```
<source>
@type tail
path /root/my.txt
pos_file /root/my.txt.pos
tag my
<parse>
@type regexp
expression /(?<key>\w+)\s(?<value>\w+)/
</parse>
</source> <match my>
@type http endpoint http://localhost:9090/
open_timeout 2
http_method post <format>
@type json
</format>
<buffer>
flush_interval 3s
</buffer>
</match>
``` 2.实例 2 提取用户操作记录,打印到 fluentd 日志。
```
<source>
@type tail
# 这里使用HISTFILE环境变量,如果没有设置,使用默认值/root/.bash_history
path "#{ENV["HISTFILE"] || /root/.bash_history}"
pos_file /root/.bash_history.pos
tag history
<parse>
@type none
</parse>
</source> <filter history>
@type record_transformer
<record>
hostname ${hostname}
</record>
</filter> <match history>
@type stdout
</match>
``` 3.实例 3 收集用户操作记录转发到另一个 fluentd 节点,同时将数据发送到 Kafka 和存入 HDFS。 数据流为:fluentd 采集端 -> fluentd 收集端 -> kafka 和 HDFS 示例用户操作记录数据为:
```
root pts/1 2020-03-26 10:59 (10.180.206.1):root 2020-03-26 11:00:09 130 tail -f /var/log/command.his.log
``` 采集节点的配置:
```
<source>
@type tail
path /var/log/command.his.log
pos_file /var/log/command.his.log.pos
tag history
<parse>
@type regexp
# 使用正则解析日志文件
expression /^(?<who_user>\w+)\s(?<pts>\S+)\s(?<who_time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2})\s\((?<remote_ip>\d+\.\d+\.\d+\.\d+)\):(?<user>\w+)\s(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\s(?<res>\d+)\s(?<command>.+)$/
time_key time
</parse>
</source>
<filter history>
@type record_transformer
<record>
# event内容增加hostname这一行
hostname ${hostname}
</record>
</filter> <match history>
@type forward
send_timeout 60s
recover_wait 10s
hard_timeout 60s
<buffer>
# 1秒钟向另一个fluentd节点转发一次
flush_interval 1s
</buffer>
<server>
name myserver1
host 10.180.210.172
port 24225
weight 60
</server>
</match>
``` fluentd 收集节点的配置:
```
<source>
@type forward
port 24225
bind 0.0.0.0
tag remote
</source> <match remote>
# 使用copy方式,分两路输出
@type copy
<store>
@type kafka2 brokers 10.180.210.172:9092
use_event_time true <buffer topic>
@type file
path /var/log/td-agent/buffer/td
flush_interval 3s
</buffer> <format>
@type json
</format> default_topic history required_acks -1
</store>
<store>
@type webhdfs
host 10.180.210.172
port 50070
path "/history/access.log.%Y%m%d_%H.#{Socket.gethostname}.log"
<buffer>
flush_interval 60s
</buffer>
</store>
</match>
```

最新文章

  1. Ubuntu16.04安装Screenlets
  2. scala-协变、逆变、上界、下界
  3. web设计经验&lt;三&gt;值得你深入了解的交互设计5大支柱
  4. spring中文乱码问题
  5. MySQL性能参数详解 - max_connect_errors
  6. 解决Windows时间同步失败问题!系统时间同步设置!
  7. 简易CSS3 Tab菜单 Tab切换滑块动画
  8. codeforces 660C Hard Process
  9. ThreadLocal 笔记
  10. openStack工具集
  11. Python 自带IDLE中调试程序
  12. scala map
  13. 记录idea maven项目打包部署web项目mapper扫描失败
  14. 分布式代码管理系统Git实践
  15. setting.py
  16. 自定义select标签箭头样式
  17. 【python小工具】linux 低权限密码记录 提权小套路
  18. net core体系-web应用程序-4net core2.0大白话带你入门-2asp.net core新建项目
  19. emacs 集成astyle
  20. 洛谷P1126 机器人搬重物【bfs】

热门文章

  1. Learning Latent Graph Representations for Relational VQA
  2. 用Bootstrap4写了一个WordPress主题Writing
  3. IDEA的概述和IDEA的安装
  4. 微信开发在Pc端调用公众号粉丝发送过来的图片素材
  5. DNS 系列(三):如何免受 DNS 欺骗的侵害
  6. 4-10 CS后台项目练习-3 || Redis
  7. gdb调试器在windows下的相关内容
  8. 类型转换_float()函数
  9. File类创建删除功能的方法和File类遍历目录功能
  10. React报错之Objects are not valid as a React child