设为首页 加入收藏

TOP

Flume官方文档的理解和配置操作
2019-01-19 14:09:07 】 浏览:104
Tags:Flume 官方 文档 理解 配置 操作

Flume官方文档的理解和配置操作

摘要

Flume是一个分布式的,可靠的,并且具有高可用性的一个服务系统,用于高效地收集、聚合、移动一个大数量的日志文件, 它有一个基于数据流类型的简单且灵活的的架构。 由于它的容错可调 机制和许多故障转移和恢复机制,使得这个架构具有较高的容错性和健壮性。 它使用一个简单的可扩展的数据模型,可以让它在线分析应用程序。

一、概述

Apache Flume是一个分布式的、可靠的、高可用性的一个系统,用于从许多不同来源的一个集中的数据存储中高效的收集、聚合、移动一个大数据量的日志文件(分布式的海量日志的高效收集、聚合、移动/传输的一个框架),特点:简单灵活,是基于一个流式数据处理框架。
Apache Flume 不仅限于日志数据的聚合,由于数据资源是可以定制的,Flume可以用于大量的数据传输服务,包括且不限于网络层流量传输的数据,而且还可以传输社交媒体生成的数据数据, 电子邮件消息数据和几乎任何的数据源。
Apache Flume是Apache软件基金会的顶级项目。
目前发行有两个版本,0.9.x本和1.x版本。
推荐用户们使用1.x版本,以便于利用最新的架构,性能得到了改进且具有配置灵活性。

二、系统需求

1.Java运行环境(jre)1.8版本及以上
2.使用的内存足够用于配置资源
3.磁盘空间足够用于磁盘空间的配置(source5s,channels,sinks)
4.目录权限——读写权限的目录使用代理

三、 体系结构

数据流模型

这里写图片描述
一个Flume数据可以定义为一个单位的字节负载的数据流,并且可以选择字符串属性。一个Flume代理机是一个主机进程从外部源得到数据发送到目的数据源去。
一个Flume source消费数据,是从外部数据源中获取数据 例如一个web服务器,外部源按照格式发送数据到flume ,便于flume识别目标来源。例如,一个Avro flume sources可以用于接收来自Avro客户端的数据或者其他类型的flume代理机, 然后接受到后,AvroChannel发送到Avro Sink。一个类似的流可以被定义用于一个Thrift Flume sources用于接受来自Thrift客户端的数据或者客户段任何语言写好的Flume Thrift协议。一个、、当一个FLume sources接收到一个数据,就把数据储存在一个或者多个Channel中。Channael相当于被动存储这些数据知道这些数据被一个进程发送至Flume sink中去。文件通道就是一个例子,它依赖于本地稳健系统,Sink删除通道中的数据并把它们存储在想HDFS一样的存储结构中,或者将它发送到下一个flume source中等待下一个flume代理机来处理。Source和sink 异步的对channel中的数据进行操作。
互相不影响,channel被动地被处理数据。

理解:

Source:数据从哪里去。去源头读数据,写到Channel中
从数据发生器接收数据,并将接收的数据以Flume的event格式传递给一个或者多个通道channal,Flume提供多种数据接收的方式,比如Avro,Thrift,twitter1%等
Channel:中间管道(内存或磁盘中)。链接Source和Sink,类似一个队列或缓冲区
channel是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉,它在source和sink间起着一共桥梁的作用,channal是一个完整的事务,这一点保证了数据在收发的时候的一致性. 并且它可以和任意数量的source和sink链接. 支持的类型有: JDBC channel , File System channel , Memort channel等.
Sink:写到哪里去。从Channel拉数据,写到HDFS中,或把数据写入到下一个agent的source中。
sink将数据存储到集中存储器比如Hbase和HDFS,它从channals消费数据(events)并将其传递给目标地. 目标地可能是另一个sink,也可能HDFS,HBase.

四、复杂的流

Flume允许用户构建多重流动的数据流, 在这些流被传输到最后重点是,用户可以它进行多重的控制。 它还允许扇入 和扇出流,允许进行路线的选择以便其中一条路线出现故障。

五、可靠性

一个数据的代理是在一个通道中, 然后数据被传输到下一个代理机或者终端存储器 例如HDFS。数据仅在他们存到下一个代理机的通道或者到达终端存储器时会被当前的通道移除。这便是为何flume提供的简答的端到端交付数据,信息传输拥有可靠性。
Flume利用类似事务方法保证的传输数据的可靠性,数据源和sinks封装一个事务用于分别检索放在提供的交易通道中,这将保证数据的集合是通过可靠地点到点的流传输。在这样一个多重流传输情况下,之前传到终端存储器的sink和从hop取到的sources都有他们传输进程用来保证数据安全的存储在通道和hop当中。

六、可恢复性

数据存储在通道中,出现故障时可以随时回复,Flume提供了一个持久耐用的文件通道有本地文件系统支持,还有一个记忆通道,简单地将数据存储在记忆队列中,可以加快数据的传输但是任何一个数据遗留在通道中时代理机出现故障,数据无法恢复。

七、配置

7.1 配置一个flume

Flume代理配置存在一个本地配置文件中,这是一个遵循Java属性文件格式的文本文件,可以指定一个或者多个代理的配置位相同的配置文件,配置文件包含每个数据源的性质,说明了他们是如何连接在一起形成数据流的。

7.2配置单个组件

每个组件(source,channel,sink)有一个名称,类型,并且需要设定特定属性的类型和实例化。例如,一个Avro源需要一个主机名(或IP地址)和接受数据的端口号。一个内存通道拥有存一个最大队列的能力,一个HDFS sink需要知道系统URI,按照路径创建文件,设置文件的属性等。所有这些组件的属性都需要有Flume代理机设定。

7.3把组件联合在一起

代理机需要知道每个组件加载是如何连接并形成流。这是通过列出每一个source,sink和channel在代理机中的命名,然后具体地连接这三个组件。例如,一个来自AVro source的代理数据流命名为avroWeb 发送到 名为hdfs-clust1的HDFS sink,这次传输通过名字为file-channel的通道。配置文件将包含这些的名字组件和file-channel avroWeb作为一个公共信道提供给avroWeb源和hdfs-cluster1 sink。

7.4设置一个代理

一个代理使用shell脚本位于flume的bin目录下的名为flume-ng的文件,你需要制定具体的代理名称,配置目录,在命令行配置文件:

现在,代理开始运行source和sink中的配置属性文件

7.5一个简单的例子

这里,我们给出一个示例配置稳健,描述一个单节点flume部署,该配置允许用户生成数据和随后的日志控制台。

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

这个配置定义了一个单一的代理叫a1.a1源在端口44444上获取数据,一个通道在内存中缓冲数据,sink发送日志数据到控制台。配置文件名称和各个组件,然后描述了他们的类型和配置参数。给定配置文件定义的叫代理机,当一个给定的flume过程启动标志这名为代理机表明给定的配置文件可用,我们就可以允许flume了。

有了这个配置文件,我们可以启动Flume如下:

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

需要注意的是,在一个完成的配置中通常包括一个选项:–conf=. The 目录下的包括一个shell脚本flume-enc.Sh和一个潜在的log4j文件。例如,我们通过一个java option强制访问日志控制台 但是我们没有自定义环境脚本。
从一个单独的终端,我们可以通过端口44444发送flume数据

$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK

最终flume中孤单将在一个日志信息输出数据
12/06/19 15:32:19 INFO source.NetcatSource: Source starting
12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Hello world!. }

至此我们就成功配置了一个flume agent。

7.6自己配置的示例

Windows下配置flume

步骤1:安装java,配置环境变量。
步骤2:安装flume,下载后解压。
这里写图片描述
步骤3:创建配置文件
在解压后的文件:apache-flume-1.7.0-bin/conf下创建一个example.conf,内容如下。
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

步骤4:打开cmd进入到apache-flume-1.7.0-bin\bin目录下,运行如下命令
flume-ng.cmd agent -conf ../conf -conf-file ../conf/example.conf -name a1 -property flume.root.logger=INFO,console
结果如下:
这里写图片描述
这里写图片描述
这里写图片描述
步骤5:另外打开一个cmd窗口,运行如下命令。
telnet localhost 44444
如图输入状态的telnet客户端,输入信息后,flume即监听到并打印日志
这里写图片描述
至此完成了Windows系统下的配置过程。

八、有环境变量配置文件

Flume有能力配置环境变量,举个例子:
a1.sources = r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = ${NC_PORT}
a1.sources.r1.channels = c1

注意:

它仅仅对value值进行操作,对key值无效。(仅是在右边的等同于对配置行做标记)
这个可以通过使用java系统代理文件读取来设置属性并且实现= org.apache.flume.node.EnvVarResolverProperties.
举个例子
$ NC_PORT=44444 bin/flume-ng agent –conf conf –conf-file example.conf –name a1
Dflume.root.logger=INFO,console
DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties
需要注意的是以上的都是例子,环境变量也可以通过其他方式来配置,包括用过conf/flume-env.sh来配置。
以上说明了flume自带配置环境变量的功能,例子中说明了操作方式和限制,当然我们完全可以使用别的方式配置相应的环境变量。

九、记录原始数据

记录原始数据流是通过接收数据传输路径中的数据,这不是很多开发环境中预期的操作,因为这可能会导致敏感数据泄露或者影响安全相关配置,例如密码和flume日志文件。在编码时,Flume不会记录这些信息。从另一个方面来看,如果数据传输路径故障,Flume会尝试为调试解决故障提供线索。
调试数据传输路径故障的另一个方法是配置一个额外的内存通道来作为记录器 Sink,这个记录器会输出所有的事件数据和 Flume日志。然而在一些情况下,这个办法也有缺陷。
启动配置日志记录和设置java系统属性-Dorg.apache.flume.log.rawdata=true
这些要么再命令行上传递要么再flume-env.sh中的JAVA_OPTS变量中设置它。
要进行数据的记录或者java系统属性的设置-Dorg.apache.flume.log.rawdata=true通过以上描述的两种方式完成。对于大多数的组件,log4j的日志级别必须由DEBUG或者TRACE来配置以使得特定事件的日志记录出现在Flume日志中。
以下是一个既配置日志有记录然是数据的例子,同时通过DEBUG来设置log4j的日志级别,最后由控制台输出结果 :
$ bin/flume-ng agent –conf conf –conf-file example.conf –name a1 -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true

十、Zookeeper的基础配置

Flume支持通过Zookeeper代理配置,这是一个新开发的特色。配置文件需要由Zookeeper来上传,在一个可配置的前缀下。配置文件存储在Zookeeper节点数据中。下面就是Zookeeper节点数怎么样来代理a1和a2配置文件。
- /flume
|- /a1 [Agent config file]
|- /a2 [Agent config file]
一旦配置文件上传了,就开始代理以下的选项
$ bin/flume-ng agent –conf conf -z zkhost:2181,zkhost1:2181 -p /flume –name a1 -Dflume.root.logger=INFO,console
这里写图片描述

十一.安装第三方插件

Flume有一个完整的plugin-based 架构。当Flume用sources,channels,sinks,serializers之类的外部资源传输数据时,大量存在实现flume分离式传输。
虽然它总是包含定制的flume组件,通过在jars中增加FLUME_CLASSPATH 变量扩充flume-env.sh文件。Fulme现在支持一种名为plugins.d的特殊目录,该目录可以自动打包在一个特定的插件以一种特定的格式。这可以更简单地管理插件包的问题,并且更简单地调试和排除故障,特别是库依赖输出。

十二、多智能体流设置

这里写图片描述
为了是流数据跨越多重代理和目标地,过去的代理机的sink和当前资源的来源需要向avro类型的sink只想主机名(或IP地址)还有端口来源。

十三、多重复杂流

Flume支持多路传输数据流至一个或者多个目标地。这是通过定义流多路复用器实现的,该多路复用器可以复制或有选择地将事件数据传输到一个或多个通道。
这里写图片描述
上面的示例显示了来自代理“foo”的源,将流展开到三个不同的通道。这个扇形式分流可以复制或多路复用。在复制流的情况下,每个事件都被发送到所有三个通道。多路复用的情况下,事件交付给一个子集事件时可用频道的属性匹配一个预配置的值。例如,如果一个事件属性称为“txnType”设置为“客户”,那么它应该去channel1 channel3,如果是“供应商”然后它应该去channel2,否则channel3。映射可以在代理的配置设置文件。

十四、代理配置

正如前面提到的部分,Flume代理配置是读取一个文件,就像java属性文件格式层次属性设置。

十五丶整合

日志收集中的一个非常常见的场景是大量的日志生成客户机将数据发送给连接到存储子系统的几个客户代理。例如,从几百个web服务器上收集的日志发送到12个代理,这些代理将写入HDFS集群。
这里写图片描述
这可以在Flume配置一定数量的第一层代理avro sink时实现,均指向的avro来源的单一代理(同样可以在这种情况下使用thrift源/汇/客户)。第二个层代理的这个源将接收到的事件合并到一个通道中,该通道被一个接收器所消耗到它的最终目的地。

十六、定义流

要在单个代理中定义流,需要通过通道链接源接收。您需要列出给定代理的源、接收器和通道,然后将源和接收器指向一个通道。源实例可以指定多个通道,但是接收器实例只能指定一个通道。格式如下:
这里写图片描述
例如,一个名为agent_foo的代理正在从外部avro客户端读取数据,并通过内存通道将其发送到HDFS。博客的配置文件。配置可能看起来类似于:
这里写图片描述
这将使事件通过内存通道mem-channel-1从avro- appsrv源流到hdfs-Cluster1-sink。当代理从weblog开始时。配置的配置文件,它将实例化流。

十七、 配置单个组件

定义流之后,需要设置每个源、接收器和通道的属性。这是按照相同的层次命名空间方式进行的,您在其中为每个组件特定的属性设置组件类型和其他值:
这里写图片描述
需要为Flume的每个组件设置属性“type”,以了解它需要是什么样的资源。每种源、接收器和信道类型都有自己的一组属性,这些属性是它正常工作所必需的。所有这些都需要根据需要设定。在前面的例子中,我们有一个从avro - AppSrv - source到HDFS - cluster 1 - sink的流,通过内存通道mem - channel - 1。下面是一个示例,显示了这些组件的配置:
这里写图片描述

十八 、在代理中添加多个流

一个单独的Flume代理可以包含几个独立的流。您可以在配置中列出多个源、接收器和通道。这些组件可以连接成多个流:
这里写图片描述
然后,您可以将源和接收器链接到相应的通道(用于源)(用于接收器),以设置两个不同的流。例如,如果需要在代理中设置两个流,一个从外部的avro客户端到外部的HDFS,另一个从尾部的输出到avro sink,那么这里有一个配置:
这里写图片描述

十九、配置多重代理流

设置一个多层的流,你就需要一个avro /thrift第一跳指着接收器sink avro /thrift的下一个hop。这将导致第一个Flume代理将事件转发给下一个Flume代理。例如,如果您正在使用avro客户端定期向本地Flume代理发送文件(每个事件一个文件),那么这个本地代理可以将文件转发给另一个挂载用于存储的代理。
Weblog 代理配置:
这里写图片描述
HDFS 代理配置:
这里写图片描述
在这里,我们将来自weblog代理的avro-forward-sink与hdfs代理的avro-collection-source连接起来。这将导致来自外部appserver源的事件最终存储在HDFS中。

二十、扇出流

正如前面讨论的,Flume支持将流从一个源分散到多个通道。扇出有两种模式,复制和多路复用。在复制流中,事件被发送到所有配置的通道。在多路复用的情况下,事件只被发送到限定通道的一个子集。要扇出流,需要指定源的通道列表和扇出源的策略。这是通过添加可以复制或多路复用的通道“选择器”来实现的。然后进一步指定选择规则,如果它是一个多路复用器。如果没有指定选择器,那么默认情况下它是复制的:
这里写图片描述
多路复用选择具有进一步的属性集来对流进行分叉。这需要为通道指定一个事件属性的映射。选择器检查事件头中的每个配置属性。如果它匹配指定的值,那么该事件将被发送到映射到该值的所有通道。如果没有匹配,则将事件发送到配置为默认的通道集:
这里写图片描述
映射允许为每个值重叠通道。
下面的示例有一个多路复用的流。名为agent_foo的代理具有一个avro源和两个连接到两个接收器的通道:
这里写图片描述
选择器检查一个名为“State”的标题。如果值是“CA”,则发送到mem-channel-1,如果是“AZ”,则发送到file-channel-2,如果是“NY”,则两者都发送。如果“State”标头没有设置或与这三个标头中的任何一个都不匹配,那么它将转到mem-channel-1,该标头被指定为“default”。
选择器还支持可选通道。要为头指定可选通道,配置参数“optional”的用法如下:
这里写图片描述
选择器将首先尝试写入所需的通道,如果这些通道中有一个没有使用事件,则事务将失败。在所有通道上重新尝试事务。一旦所有必需的通道都使用了事件,那么选择器将尝试写入可选通道。任何可选通道使用事件的失败都会被忽略,不会被重试。
如果可选通道和特定标头所需的通道之间有重叠,则认为通道是必需的,通道中的失败将导致重新尝试所需的整个通道集。例如,在上面的示例中,对于头“CA”mem-channel-1,尽管它被标记为required和optional,但是对于头“CA”来说,它被认为是一个必需的通道,而写入该通道的失败将导致在为选择器配置的所有通道上重试该事件。
注意,如果消息头没有任何必需的通道,那么事件将被写入默认通道,并将尝试将其写入到该头的可选通道。如果没有指定必要的通道,指定可选通道仍然会使事件被写入默认通道。如果没有指定通道为默认通道,并且不需要通道,则选择器将尝试将事件写入可选通道。在这种情况下,任何失败都会被忽略。

二十一 、Flume组件

下图展示了Flume各组件功能及扩展的展示
这里写图片描述

21.1 Source

NetCat Source:绑定的端口(tcp、udp),将流经端口的每一个文本行数据作为Event输入;

type:source的类型,必须是netcat。

bind:要监听的(本机的)主机名或者ip。此监听不是过滤发送方。一台电脑不是说只有一个IP。有多网卡的电脑,对应多个IP。

port:绑定的本地的端口。

Avro Source:监听一个avro服务端口,采集Avro数据序列化后的数据;

type:avrosource的类型,必须是avro。

bind:要监听的(本机的)主机名或者ip。此监听不是过滤发送方。一台电脑不是说只有一个IP。有多网卡的电脑,对应多个IP。

port:绑定的本地的端口。

Exec Source:于Unix的command在标准输出上采集数据;

type:source的类型:必须是exec。

command:要执行命令。

现在选择其中的Avro Source进行说明:

监听Avro端口并从外部Avro客户端流接收事件。当与另一个(前一个hop)Flume代理上的内置Avro接收器配对时,它可以创建分层集合拓扑。所需的属性是blod
这里写图片描述
a1代理的例子:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

ipFilterRules的例子:

ipFilterRules defines N netty ipFilters separated by a comma a pattern rule must be in this format.

<’allow’ or deny>:<’ip’ or ‘name’ for computer name>:<pattern> or allow/deny:ip/name:pattern

example: ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*

请注意,匹配的第一个规则将应用于本地主机上的客户端所显示的示例。
这将允许本地主机上的客户端拒绝来自任何其他ip的客户端:名称:localhost,deny:ip:“这将拒绝本地主机上的客户端允许来自任何其他ip的客户机”拒绝:名称:localhost, Allow:ip:“

21.2 sink

HDFS Sink:将数据传输到hdfs集群中。

type:sink的类型 必须是hdfs。

hdfs.path:hdfs的上传路径。

hdfs.filePrefix:hdfs文件的前缀。默认是:FlumeData

hdfs.rollInterval:间隔多久产生新文件,默认是:30(秒) 0表示不以时间间隔为准。

hdfs.rollSize:文件到达多大再产生一个新文件,默认是:1024(bytes)0表示不以文件大小为准。

hdfs.rollCount:event达到多大再产生一个新文件,默认是:10(个)0表示不以event数目为准。

hdfs.batchSize:每次往hdfs里提交多少个event,默认为100

hdfs.fileType:hdfs文件的格式主要包括:SequenceFile, DataStream ,CompressedStream,如果使用了CompressedStream就要设置压缩方式。

hdfs.codeC:压缩方式:gzip, bzip2, lzo, lzop, snappy

注:%{host}可以使用header的key。以及%Y%m%d来表示时间,但关于时间的表示需要在header里有timestamp这个key。

Logger Sink将数据作为日志处理(根据flume中的设置的日志方式来显示)

要在控制台显示在运行agent的时候加入:-Dflume.root.logger=INFO,console 。

type:sink的类型:必须是 logger。

maxBytesToLog:打印body的最长的字节数 默认为16

Avro Sink:数据被转换成Avro Event,然后发送到指定的服务端口上。

type:sink的类型:必须是 avro。

hostname:指定发送数据的主机名或者ip

port:指定发送数据的端口

下面以HDFS sink为例进行说明

这个接收器将事件写入Hadoop分布式文件系统(HDFS)。它目前支持创建文本和序列文件。它支持两种文件类型的压缩。可以根据数据的运行时间、大小或事件的数量周期性地滚动文件(关闭当前文件并创建新文件)。它还根据事件发生的时间戳或机器等属性对数据进行分段或分区。HDFS目录路径可能包含格式化转义序列,这些转义序列将被HDFS接收器替换为生成一个目录/文件名来存储事件。使用这个接收器需要安装hadoop,以便Flume可以使用hadoop jar与HDFS集群通信。注意,需要一个支持sync()调用的Hadoop版本。
以下是支持的转义序列:这里写图片描述
注意:转义字符串%[localhost]、%[IP]和%[FQDN]都依赖于Java获取主机名的能力,这在某些网络环境中可能会失败。
正在使用的文件将包含“.tmp”。一旦文件关闭,此扩展将被删除。这允许在目录中不包括部分完整的文件。所需的属性是bold
注意,对于所有与时间相关的转义序列,必须在事件的头部(除非是hdfs)中存在一个带有“时间戳”键的头部(useLocalTimeStamp被设置为true)。自动添加这个的一种方法是使用TimestampInterceptor。
这里写图片描述
a1代理的例子:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

上述配置将把时间戳缩短到最后10分钟。例如,2012年6月12日上午11:54:34的事件将导致hdfs路径变成/flume/events/2012-06-12/1150/00。

21.3. channel

Memory Channel使用内存作为数据的存储。

Type channel的类型:必须为memory

capacity:channel中的最大event数目

transactionCapacity:channel中允许事务的最大event数目

File Channel 使用文件作为数据的存储

Type channel的类型:必须为 file

checkpointDir :检查点的数据存储目录

dataDirs :数据的存储目录

transactionCapacity:channel中允许事务的最大event数目

Spillable Memory Channel 使用内存作为channel超过了阀值就存在文件中

Type channel的类型:必须为SPILLABLEMEMORY

memoryCapacity:内存的容量event数

overflowCapacity:数据存到文件的event阀值数

checkpointDir:检查点的数据存储目录

dataDirs:数据的存储目录

下面以Memory Channel为例进行说明:

事件存储在内存队列中,具有可配置的最大大小。对于需要更高吞吐量并准备在代理失败时丢失阶段数据的流来说,这是理想的。所需的属性是bold
这里写图片描述
a1代理的例子:

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

21.4. Flume Channel Selector

Multiplexing Channel Selector 根据header的key的值分配channel

selector.type 默认为replicating

selector.header:选择作为判断的key

selector.default:默认的channel配置

selector.mapping.*:匹配到的channel的配置

下面以Multiplexing Channel Selector为例进行说明

所需属性是bold
这里写图片描述
代理a1的例子,它的来源是r1:

a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3

在上面的配置中,c3是一个可选的通道。对c3的写入失败被简单地忽略。由于c1和c2没有被标记为可选的,因此不向这些通道写入将导致事务失败。

21.5 Flume Sink Processor

Default sink processor
Failover Sink Processor
Load balancing sink processor
Custom sink processors

21.6 Event Serializer

BodyText Serializer
Flume Event” Avro Event Serializer
Avro Event Serializer

21.7 Interceptor

Timestamp Interceptor 时间戳拦截器 在header里加入key为timestamp,value为当前时间。

type:拦截器的类型,必须为timestamp

preserveExisting:如果此拦截器增加的key已经存在,如果这个值设置为true则保持原来的值,否则覆盖原来的值。默认为false

Host Interceptor 主机名或者ip拦截器,在header里加入ip或者主机名

type:拦截器的类型,必须为host

preserveExisting:如果此拦截器增加的key已经存在,如果这个值设置为true则保持原来的值,否则覆盖原来的值。默认为false

useIP:如果设置为true则使用ip地址,否则使用主机名,默认为true

hostHeader:使用的header的key名字,默认为host

Static Interceptor 静态拦截器,是在header里加入固定的key和value。

type:avrosource的类型,必须是static。

preserveExisting:如果此拦截器增加的key已经存在,如果这个值设置为true则保持原来的值,否则覆盖原来的值。默认为false

key:静态拦截器添加的key的名字

value:静态拦截器添加的key对应的value值

下面我们以Timestamp Interceptor为例进行展示

这个拦截器将插入到事件头中,即它处理事件的时间。这个拦截器插入一个标头,标头的键时间戳(或标头属性指定的时间戳)的值是相关的时间戳。如果已经存在于配置中,此拦截器可以保存现有的时间戳。
这里写图片描述
a1代理的例子:

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

一些其他的例子

Example 1:
如果Flume事件体包含1:2:2:3 .4foobar5,则使用以下配置:

a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three

提取的事件将包含相同的主体,但以下标题将被添加1 =>1,2 =>2,3 =>3

Example2:
如果Flume事件主体包含2012-10-18 18:47:57,614,则使用以下配置:

a1.sources.r1.interceptors.i1.regex = ^(:\\n)(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.type =   org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm

提取的事件将包含相同的主体,但将添加以下头部:timestamp=>1350611220000

Component Summary

下面表格包含了所有组件名称信息
这里写图片描述
这里写图片描述

对于所有组件的介绍,可登录http://flume.apache.org/FlumeUserGuide.html查看所有信息

二十二、Tools(工具)

File Channel Integrity Tool

文件通道完整性工具在文件通道中验证单个事件的完整性,并清除损坏的事件。
工具的运行方式如下:

$bin/flume-ng tool --conf ./conf FCINTEGRITYTOOL -l ./datadir

其中datadir是要验证的数据目录的逗号分隔列表。
以下是可供选择的选项。
这里写图片描述

Event Validator Tool

事件验证器工具可用于以应用程序特定的方式验证文件通道事件。该工具对每个事件应用用户提供程序验证登录,并删除不确认逻辑的事件。
工具的运行方式如下:

$bin/flume-ng tool --conf ./conf FCINTEGRITYTOOL -l ./datadir -e org.apache.flume.MyEventValidator -DmaxSize 2000

其中datadir是要验证的数据目录的逗号分隔列表。
以下是可供选择的选项。
这里写图片描述
事件验证器实现必须实现EventValidator接口。建议不要从实现中抛出任何异常,因为它们被视为无效事件。额外的参数可以通过-D选项传递给EventValitor实现。
让我们看一个简单的基于大小的事件验证器示例,它将拒绝指定的大于最大大小的事件。

public static class MyEventValidator implements EventValidator {

private int value = 0;

private MyEventValidator(int val) {
value = val;

}

@Override
public boolean validateEvent(Event event) {
    return event.getBody() <= value;

}

public static class Builder implements EventValidator.Builder {

     private int sizeva lidator = 0;

@Override
public EventValidator build() {
  return new DummyEventVerifier(sizeva lidator);
}

@Override
public void configure(Context context) {
  binaryValidator = context.getInteger("maxSize");
}

}
}
总体而言Flume是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,具备很强的健壮性(能有快速有效地解决故障),以上内容是本人对Flume官网内容的内化理解,如有不当,欢迎留言指正。

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇flume配置及问题处理 下一篇flume中:memory channel,file c..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目