Agent(代理)中的组件包括 Source、Channel、Sink。
Source 组件可以处理各种类型、各种格式的日志数据。
Flume 中常用的 source:
常用类别 | 描述 |
---|---|
avro | 监听Avro端口并从外部Avro客户端流接收 Event(事件) |
exec | Exec source在启动时运行一个给定的Unix命令,tail -F file |
netcat | 监听给定的端口,并将每一行文本转换为 Event |
soopling directpry | 监控整个目录,读取目录中所有的文件,不支持递归 ,不支持断点续传 |
taildir | 可以监控多个文件或者文件夹的变化,并支持记录读取进度 |
Channel 是位于 Source 和 Sink 之间用于存放 Event 的缓冲区。
Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个 Sink 的读取操作。
Flume 中常用的 Channel:
常用类别 | 描述 |
---|---|
Memary | Event 存储在内存中的队列 |
File | Event 存储在文件中,无需担心数据丢失 |
KafKa | Event 存储在 Kafka 集群中。Kafka 提供了高可用性 |
Sink 将 Channel 中的 Event 批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Flume 中常用的 sink:
常用类别 | 描述 |
---|---|
logger | 控制台输出,日志级别为INFO。通常用于测试/调试 |
hdfs | Event 事件写入HDFS |
avro | 发送到 Sink 的 Flume Event 被转换为 Avro 事件,并发送到配置的主机名/端口 |
HBase | Event 事件写入HBase |
Agent 的组件可以自定义搭配,可以使用不同的 Source、Sink、Channel。
下面罗列几种简单的搭配方式。
exec2logger 表示 exec to logger,2 代表 to,是将 two 音译为 to。常见的有 p2p、log4j 等,都是拟声为数字。
可以先启动 Hadoop 集群:start-dfs.sh
。
复杂的 Flume 启动命令为:
flume-ng agent --conf <Flume配置路径> --conf-file <agent配置文件的绝对路径> --name <agent名称> -logger=INFO,console
以上命令可简写为:
flume-ng agent -c <Flume配置路径> -f <agent配置文件的绝对路径> -n <agent名称> -logger=INFO,console
常用配置项 | 默认值 | 描述 |
---|---|---|
type | - | 将类型配置为 exec |
command | - | 配置执行的命令,比如:tail -F file |
1️⃣ Step1:在 flume 目录下自定义一个目录(我的是 agents 目录),添加配置文件
f
# exec 读取日志文件 将数据发送到 logger sink# Name the components on this agent
exec2logger.sources = r1
exec2logger.sinks = k1
exec2logger.channels = c1# Describe/configure the source
exec2logger.pe = exec
exec2logger.sources.r1mand = tail -F /# Describe the sink
exec2logger.pe = logger# Use a channel which buffers events in memory
exec2logger.pe = memory
exec2logger.channels.c1.capacity = 1000
exec2logger.ansactionCapacity = 100# Bind the source and sink to the channel
exec2logger.sources.r1.channels = c1
exec2logger.sinks.k1.channel = c1
2️⃣ Step2:启动 Flume exec2logger
flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/f -n exec2logger -logger=INFO,console
3️⃣ Step3:测试,并查看控制台输出(其实不用touch创建该文件,>>
是进行流的重定向,如果文件不存在就直接新建了)
常用配置项 | 默认值 | 描述 |
---|---|---|
type | - | 将类型配置为 spooldir |
spoolDir | - | 配置成需要读取的文件(绝对路径) |
1️⃣ Step1:添加配置文件
# spooldir 读取同目录中的多个日志文件 将数据发送到 logger sink# Name the components on this agent
spooldir2logger.sources = r1
spooldir2logger.sinks = k1
spooldir2logger.channels = c1# Describe/configure the source
spooldir2logger.pe = spooldir
spooldir2logger.sources.r1.spoolDir = /root/log# Describe the sink
spooldir2logger.pe = logger# Use a channel which buffers events in memory
spooldir2logger.pe = memory
spooldir2logger.channels.c1.capacity = 1000
spooldir2logger.ansactionCapacity = 100# Bind the source and sink to the channel
spooldir2logger.sources.r1.channels = c1
spooldir2logger.sinks.k1.channel = c1
2️⃣ Step2:启动 Flume spooldir2logger
flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/f -n spooldir2logger -logger=INFO,console
3️⃣ Step3:测试,当文件内的内容输出完成后,会将该文件后缀标记为 COMPLETED
表示已完成的文件。
4️⃣ Step4:如果我们向完成后的文件输入内容,就无法再获取文件变化了。
常用配置项 | 默认值 | 描述 |
---|---|---|
type | - | 将类型配置为 TAILDIR |
filegroups | - | 配置文件组 |
filegroups.<filegroupName> | - | 为文件组配置对应的文件或目录 |
positionFile | ~/.flume/taildir_position.json | tailDir记录文件文件的绝对路径 |
1️⃣ Step1:添加配置文件
# tailDir 读取同目录中的多个日志文件 将数据发送到 logger sink# Name the components on this agent
taildir2logger.sources = r1
taildir2logger.sinks = k1
taildir2logger.channels = c1# Describe/configure the source
taildir2logger.pe = TAILDIR
taildir2logger.sources.r1.filegroups = g1 g2
taildir2logger.sources.r1.filegroups.g1 = /
taildir2logger.sources.r1.filegroups.g2 = /root/log/.*.txt# Describe the sink
taildir2logger.pe = logger# Use a channel which buffers events in memory
taildir2logger.pe = memory
taildir2logger.channels.c1.capacity = 1000
taildir2logger.ansactionCapacity = 100# Bind the source and sink to the channel
taildir2logger.sources.r1.channels = c1
taildir2logger.sinks.k1.channel = c1
2️⃣ Step2:启动 Flume taildir2logger
flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/f -n taildir2logger -logger=INFO,console
3️⃣ Step3:测试1,向文件追加内容时,检测文件变化
4️⃣ Step4:测试2,检测文件夹中的 .txt
文件变化
5️⃣ Step5:taildir记录完文件不会在文件后缀添加 COMPLETED
,而是将记录的位置放在一个json文件中
6️⃣ Step6:格式化该 json 文件,发现一个数组中存储了 json 格式的数据,每个 json 会存储一个唯一标识 inode
、读取的位置 pos
、以及文件的绝对路径 file
[{"inode":134337114,"pos":7,"file":"/"},{"inode":134333732,"pos":12,"file":"/root/"}
]
7️⃣ Step7:向 /
中追加内容,并再次启动 Flume,Flume 会将读取的位置加载一遍,然后接着读,并将新的读取位置记录在 json 中
面试题:日志采集通常用哪种Source?
答:通常使用 tailDir 类型的 Source,之前我们试过 exec、spoolDir,后来应为应用场景当中分的目录比较多,而且为了每次读取能接着上次继续读所以选择了 tailDir,用 tailDir 的主要原因是 tailDir会在 json 文件中记录的位置和读取的文件,如果以后 agent 挂掉时,只需重新启动,就能接着上一次读取的进度继续读
常用配置项 | 默认值 | 描述 |
---|---|---|
type | - | 将类型配置为 memory |
capacity | 100 | channel 中能够存放 Event 的最大数量 |
transactionCapacity | 100 | channel 每次向 sink 发送 Event 的最大数量 |
capacity的估算依据:估算 channel 可能缓存的数据量(通常 2w~20w)与 agent 可用 Jvm 内存(通常20MB)。
默认最大内存为20MB,如果一条日志长度为 512B,那么总共能容纳 4w 多条数据(20x1024x2=40960),但通常 capacity 不会设置太满,太满容易内存溢出。
❓ 默认值既然怎么小,那如何增大 Jvm 内存容量?
⭐️可以通过修改 Flume 启动的配置项实现扩容,vim /flume-1.9.0/bin/flume-ng
可以查看 Flume 的启动脚本并修改 JAVA_OPTS
项:
数据安全性较高的情况下会使用 File 类型的 Channel,避免因程序崩溃而丢失数据。
常用配置项 | 默认值 | 描述 |
---|---|---|
type | - | 将类型配置为 file |
checkpointDir | ~/.flume/file-channel/checkpoint | 检查点文件的绝对路径 |
程序崩溃后再启动时,会先读取 checkpointDir
配置的文件,Sink 将上次未读取完的 Event 从该文件中继续读取。
在 INFO 级别记录事件,通常用于测试/调试目的。所以通常配置 Fume 时,先使用 logger 将 Event 输出到控制台,来测试 Source 是否有问题,然后再修改 Sink 类型。
常用配置项 | 默认值 | 描述 |
---|---|---|
type | - | 将类型配置为 logger |
常用配置项 | 默认值 | 描述 |
---|---|---|
type | - | 将类型配置为 file |
hdfs.path | - | HDFS 存放日志的路径 |
hdfs.fileType | SequenceFile | 配置文件类型,可配置为 DataStream 或 CompressedStream |
hdfs.writeFormat | Writable | 格式化类型,可配置为 Text |
hdfs.useLocalTimeStamp | false | 配置是否使用本地时间戳 |
30 | 配置滚动周期,单位 s | |
1024 | 配置文件到达的指定大小 | |
10 | 配置向文件写入多少条数据,配置为0表示不依据该项 |
1️⃣ Step1:添加配置文件
# taildir 读取同目录中的多个日志文件 将数据发送到 logger sink# Name the components on this agent
taildir2hdfs.sources = r1
taildir2hdfs.sinks = k1
taildir2hdfs.channels = c1# Describe/configure the source
taildir2hdfs.pe = TAILDIR
taildir2hdfs.sources.r1.filegroups = g1 g2
taildir2hdfs.sources.r1.filegroups.g1 = /
taildir2hdfs.sources.r1.filegroups.g2 = /root/log/.*.txt# Describe the sink
taildir2hdfs.pe = hdfs
taildir2hdfs.sinks.k1.hdfs.path = hdfs://node1:8020/flume/%y-%m-%d-%H-%M
# 将hdfs输出文件类型改为数据流
taildir2hdfs.sinks.k1.hdfs.fileType = DataStream
# 输出格式化改为文本格式
taildir2hdfs.sinks.k1.hdfs.writeFormat = Text
taildir2hdfs.sinks.k1.hdfs.useLocalTimeStamp = true# Use a channel which buffers events in memory
taildir2hdfs.pe = memory
taildir2hdfs.channels.c1.capacity = 30000
taildir2hdfs.ansactionCapacity = 3000# Bind the source and sink to the channel
taildir2hdfs.sources.r1.channels = c1
taildir2hdfs.sinks.k1.channel = c1
2️⃣ Step2:一条一条写数据太麻烦,所以写个死循环脚本向文件中输入数据,运行一会就停掉,并查看该文件的大小
[root@node1 ~]# vim printer.sh
[root@node1 ~]# chmod a+x printer.sh
[root@node1 ~]# cat printer.sh
#!/bin/bash
while true
doecho '123123123123123123123123123123123123123123123123123123123123' >>
done
[root@node1 ~]# ll -h
总用量 17M
drwxr-xr-x. 3 root root 65 11月 21 16:51 log
-rw-r--r--. 1 root root 14M 11月 21 23:
-rwxr-xr-x. 1 root root 134 11月 21 23:22 printer.sh
3️⃣ Step3:启动HDFS start-dfs.sh
,启动 Flume taildir2hdfs,可以看到以及已经在疯狂发送日志了
flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/f -n taildir2hdfs -logger=INFO,console
4️⃣ Step4:查看web页面发现 /flume/
下有了一个由当前时间命名的文件,进去却发现全是小文件
5️⃣ Step5:flume 一个快的大小为 128MB ,为了能充分利用空间还需要添加配置信息,
# 设置文件的滚动条件 -------------------------------
# 设置滚动周期 单位s
taildir2hdfs.sinks.llInterval = 300
# 设置文件达到指定大小,128MB为134,217,728B,但一般不会放满
taildir2hdfs.sinks.llSize = 130000000
# 设置文件中写入多少条记录
taildir2hdfs.sinks.llCount = 0
6️⃣ Step6:修改后重新运行 Flume ,并在web端查看变化
当前需要两台服务器,如果只有一台配置了Flume,可以通过 scp -rq <file> <hostname>:<path>
将文件发送至其他服务器。
scp -rq /opt/flume-1.9.0/ node2:/opt/
实现如下拓扑结构:
avro sink 常用配置项 | 默认值 | 描述 |
---|---|---|
type | - | 将类型配置为 avro |
hostname | - | 将类型配置为主机名或 IP 地址 |
port | - | 配置端口号 |
avro source常用配置项 | 默认值 | 描述 |
---|---|---|
type | - | 将类型配置为 avro |
bind | - | 将类型配置为需要监听的主机名或 IP 地址 |
port | - | 配置监听端口号 |
1️⃣ Step1:node1主机,添加配置文件
# taildir 读取同目录中的多个日志文件 将数据发送到 avro sink# Name the components on this agent
taildir2avro.sources = r1
taildir2avro.sinks = k1
taildir2avro.channels = c1# Describe/configure the source
taildir2avro.pe = TAILDIR
taildir2avro.sources.r1.filegroups = g1 g2
taildir2avro.sources.r1.filegroups.g1 = /
taildir2avro.sources.r1.filegroups.g2 = /root/log/.*.txt# Describe the sink
taildir2avro.pe = avro
taildir2avro.sinks.k1.hostname = node2
taildir2avro.sinks.k1.port = 12345# Use a channel which buffers events in memory
taildir2avro.pe = memory
taildir2avro.channels.c1.capacity = 30000
taildir2avro.ansactionCapacity = 3000# Bind the source and sink to the channel
taildir2avro.sources.r1.channels = c1
taildir2avro.sinks.k1.channel = c1
2️⃣ Step2:node2 主机,添加配置文件
# spooldir读取同目录中的多个日志文件 将数据发送到logger sink# Name the components on this agent
avro2hdfs.sources = r1
avro2hdfs.sinks = k1
avro2hdfs.channels = c1# Describe/configure the source
avro2hdfs.pe = avro
avro2hdfs.sources.r1.bind = node2
avro2hdfs.sources.r1.port = 12345# Describe the sink
avro2hdfs.pe = hdfs
avro2hdfs.sinks.k1.hdfs.path = hdfs://node1:8020/flume/%y-%m-%d-%H-%M
avro2hdfs.sinks.k1.hdfs.fileType = DataStream
avro2hdfs.sinks.k1.hdfs.writeFormat = Text
avro2hdfs.sinks.k1.hdfs.useLocalTimeStamp = true
avro2hdfs.sinks.llInterval = 300
avro2hdfs.sinks.llSize = 130000000
avro2hdfs.sinks.llCount = 0# Use a channel which buffers events in memory
avro2hdfs.pe = memory
avro2hdfs.channels.c1.capacity = 30000
avro2hdfs.ansactionCapacity = 3000# Bind the source and sink to the channel
avro2hdfs.sources.r1.channels = c1
avro2hdfs.sinks.k1.channel = c1
3️⃣ Step3:启动 node2 上的 Flume
flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/f -n avro2hdfs -logger=INFO,console
4️⃣ Step4:查看node2 的 12345 端口是否被监听,确定已在监听状态
5️⃣ Step5:运行一会 node1 的 printer.sh
脚本,启动 node1 的 Flume
flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/f -n taildir2avro -logger=INFO,console
6️⃣ Step6:查看 node2 控制台输出,node1 以成功向 node2 的12345端口发送数据了
7️⃣ Step7:查看web端是否有数据提交
已经跑通!
启动程序的时候,有时候会占用终端,关闭终端就会关闭程序,所以有时我们需要提将程序提交到后台执行,那就需要在命令后面添加一个 &
符号。
如果程序中有内容输出到控制台,而有不想一直在控制台输出,可以在命令前面添加 nohup
命令,将输出的内容输入到文件中,实现程序静默。
本文发布于:2024-02-05 06:24:47,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170726203363824.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |