现如今,大数据技术的发展和应用有着巨幅的增长,Hadoop和相关平台推动起一波数据分析浪潮,今天产生的数据将用来预测明天发生的事情,我们该如何跨过这个海量数据新时代的门槛?又该如何获取更有价值的数据并将其收入囊中?

Apache Flume正是用来应对这个挑战的,无论数据来自什么企业,或是多大量级,通过部署Flume,可以确保数据都安全、及时地到达你的大数据平台,然后你就可以将精力集中在如何洞悉数据上。那么,下面我将带你揭开Flume的神秘面纱。

一、概述

Flume由Cloudera公司开发,是一种提供高可用、高可靠、分布式海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于采集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接收方的能力,用一句话概括,Flume是实时采集日志的数据采集引擎。

二、Flume特性

(1)可靠性

当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送),Store on failure(这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送),Best effort(数据发送到接收方后,不会进行确认)。

(2)可扩展性

Flume采用了三层架构,分别为agent,collector和storage,每一层均可以水平扩展。其中,所有agent和collector由master统一管理,这使得系统容易监控和维护,且master允许有多个(使用ZooKeeper进行管理和负载均衡),这就避免了单点故障问题。

三、Flume架构

Flume主要分为Source、Channel、Sink三个组件,他们包含在一个Agent中,一个Agent相当于一个独立的application,数据从源头经过Agent的这几个组件最后到达目的地。一个Flume服务可同时运行多个Agent,大致架构如下图:

数据源种类有很多,可以来自directory、http、kafka等,Flume提供了Source组件用来采集数据源。Source种类如下:

(1)spooling directory source:采集目录中的日志

(2)htttp source:采集http中的日志

(3)kafka source:采集kafka中的日志

采集到的日志需要进行缓存,Flume提供了Channel组件用来缓存数据。Channel种类如下:

(1)memory channel:缓存到内存中(最常用)

(2)JDBC channel:通过JDBC缓存到关系型数据库中

(3)kafka channel:缓存到kafka中

缓存的数据最终需要进行保存,Flume提供了Sink组件用来保存数据。Sink种类如下:

(1)HDFS sink:保存到HDFS中

(2)HBase sink:保存到HBase中

(3)Hive sink:保存到Hive中

(4)kafka sink:保存到kafka中

四、适用场景及案例分析

日志—>Flume—>实时计算(如kafka/MQ Storm/Spark Streaming)、日志—>Flume—>离线计算(如ODPS、HDFS、HBase)、日志—>Flume—>ElasticSearch等。

下面结合一个大数据实时处理系统(Flume Kafka Spark Streaming Redis)阐述下Flume在实际应用中所扮演的重要角色。

该实时处理系统整体架构如下:

比如我们要实时统计用户在某个网站上的PV(页面浏览量)、UV(独立访客),那么,对于Flume而言,它的作用就是在于采集用户数据,并且将其发送到kafka集群中指定的topic上。

在我们的场景中,需要配置三个Flume Agent,其中两个Flume Agent分别部署在两台Web服务器上,用来采集Web服务器上的日志数据,然后其数据的下沉方式都发送到另外一个Flume Agent上。

部署在Web服务器上的两个Flume Agent添加配置文件flume-sink-avro.conf,其配置内容如下:

配置完成后,启动Flume Agent,即可对日志文件进行监听:

Flume Consolidation Agent添加配置文件flume-source_avro-sink_kafka.conf,其配置内容如下:

配置完成后,启动Flume Agent,即可对avro的数据进行监听:

完成上述操作后,如果在Web服务器上有新增的日志数据,就会被我们的Flume程序监听到,并且最终会传输到Kafka的f-k-s topic中,通过Flume强大的数据采集功能,为整个实时处理系统提供了数据保障,之后就可以进行后续的一系列操作。

另外,想要利用Flume采集到更有价值、更符合各自业务需求的数据,我们不得不谈到Flume的事务及拦截器的功劳。

 1、事务

事务保证了数据的可用性(有别于数据库中的事务)。下图的数据流是spooling directory source-> memory channel-> kafka sink,其中memory channel维护了两个事务,分别是PUT事务和Take事务。

1)PUT事务

(1)批量数据循环PUT到putList中;

(2)Commit,把putList队列中的数据offer到queue队列中,然后释放信号量,清空(clear)putList队列;

(3)Rollback,清空(clear)putList队列。

2)Take事务

(1)检查takeList队列大小是否够用,从queue队列中poll;

(2)Event到takeList队列中;

(3)Commit,表明被Sink正确消费掉,清空(clear)takeList队列;

(4)Rollback,异常出现,则把takeList队列中的Event返还到queue队列顶部。

 2、拦截器

阿里云-推广AD

有的时候我们希望通过Flume将读取的数据按照业务类型分开存储,或是丢弃或修改一些数据,这时可以考虑使用拦截器Interceptor。

拦截器通过定义类继承org.apache.flume.interceptor.Interceptor接口来实现,用户可以通过该节点定义规则来修改或者丢弃事件。Flume支持链式拦截,通过在配置中指定构建的拦截器类的名称,在source的配置中,拦截器被指定为一个以空格为间隔的列表,它按照指定的顺序调用,一个拦截器返回的事件列表被传递到链中的下一个拦截器,当一个拦截器要丢弃某些事件时,拦截器只需要在返回事件列表时不返回该事件即可,若拦截器要丢弃所有事件,则其返回一个空的事件列表。

public interface Interceptor {

public void initialize();

public Event intercept(Event event);

public List intercept(List events);

public void close();

public interface Builder extends Configurable {

public Interceptor build();

}

}

Flume内置拦截器列举如下:

(1)时间戳拦截器

该拦截器的作用是将时间戳插入到Flume的事件报头中。Source连接到时间戳拦截器的配置:

a1.sources.r1.interceptors=timestamp a1.sources.r1.interceptors.timestamp.type=timestamp a1.sources.r1.interceptors.timestamp.preserveExisting=false

(2)主机拦截器

该拦截器插入服务器的ip地址或者主机名,agent将这些内容插入到事件的报头中。Source连接到主机拦截器的配置:

a1.sources.r1.interceptors=host

a1.sources.r1.interceptors.host.type=host

a1.sources.r1.interceptors.host.useIP=false a1.sources.r1.interceptors.timestamp.preserveExisting=true

(3)静态拦截器

该拦截器的作用是将k/v插入到事件的报头中。Source连接到静态拦截器的配置:

a1.sources.r1.interceptors = static

a1.sources.r1.interceptors.static.type=static

a1.sources.r1.interceptors.static.key=logs a1.sources.r1.interceptors.static.value=logFlume a1.sources.r1.interceptors.static.preserveExisting=false

(4)正则过滤拦截器

该拦截器可以过滤掉不需要的日志,也可以根据需要收集满足正则条件的日志。

Source连接到正则过滤拦截器的配置:

a1.sources.r1.interceptors=regex

a1.sources.r1.interceptors.regex.type=REGEX_FILTER a1.sources.r1.interceptors.regex.regex=.* a1.sources.r1.interceptors.regex.excludeEvents=false

其中regex=.*匹配除“\n”之外的任何字符,excludeEvents=false默认收集匹配到的事件;若为true,则会删除匹配到的event,收集未匹配到的。

五、注意事项

1、Flume的停止

使用kill停止Flume进程,不可使用kill -9,因为Flume内部注册了很多钩子函数执行善后工作,如果使用kill -9会导致钩子函数不执行,使用kill时,Flume内部进程会监控到用户的操作,然后调用钩子函数,执行一些善后操作,正常退出。

2、Flume数据丢失问题

Flume可能丢失数据的情况是Channel采用memoryChannel,agent宕机导致数据丢失,或者Channel存储数据已满,导致Source不再写入,未写入的数据丢失。另外,Flume有可能造成数据的重复,例如数据已经成功由Sink发出,但是没有接收到响应,Sink会再次发送数据,此时可能会导致数据的重复。

3、Sink从Channel中读取数据的方式

默认情况下,Sink获取数据的方式是:当Source向Channel发送一条数据的时候,Sink会通过循环的方式获取一条数据,然后再发送给客户端。

Sink可以分为KafkaSink和AvroSink, 它们都是通过循环的方式获取数据,但是 KafkaSink可以通过配置topic进行批量从客户端读取。但原理还是一条一条的从Channel读取数据,只是在Sink中存在缓存机制,当数据量达到某一数量的时候,会将数据批量发送到客户端。

  4、CPU占用过高的问题

若程序运行出现CPU占用过高的现象,则可以在代码中加入休眠sleep,这样的话,就可以释放CPU资源,注意,内存资源不会释放,因为线程还未结束,是可用状态。

六、性能调优

Flume经常被用在生产环境中收集后端产生的日志,一个Flume进程就是一个Agent,要充分发挥Flume的性能最主要的是要调好Flume的配置参数。

Flume agent配置分为三个部分:Source、Channel、Sink。

1、Source

(1)增加Source个数(使用tairDirSource时可增加filegroups个数)可以增大Source读取数据的能力。例如:当某一个目录产生的文件过多时需要将这个文件目录拆分成多个文件目录,同时配置好多个Source以保证Source有足够的能力获取到新产生的数据。

(2)batchSize参数决定Source一次批量传输到Channel的event条数,适当调大这个参数可以提高Source搬运Event到Channel时的性能。

2、Channel 

(1)type选择memory时Channel的性能最好,但是如果Flume进程意外挂掉可能会丢失数据;type选择file时Channel的容错性更好,但是性能上会比memory channel差。使用file Channel时dataDirs配置多个不同盘下的目录可以提高性能。

(2)capacity参数决定Channel可容纳最大的event条数;transactionCapacity参数决定每次Source往channel里面写的最大event条数和每次Sink从channel里面读的最大event条数;transactionCapacity需要大于Source和Sink的batchSize参数;byteCapacity是Channel的内存大小,单位是byte。

3、Sink 

(1)增加Sink的个数可以增加Sink消费event的能力。当然Sink也不是越多越好,够用就行,过多的Sink会占用系统资源,造成系统资源不必要的浪费。

(2)batchSize参数决定Sink一次批量从Channel读取的event条数,适当调大这个参数可以提高Sink从Channel搬出event的性能。