flume启动命令详解
Usage: . /flume- ng < command> [ options] . . .
commands:
help display this help text #显示帮助信息
agent run a Flume agent #启动flume代理
avro- client run an avro Flume client #启动avro代理
version show Flume version info #显示版本信息
global options:
-- conf, - c < conf> use configs in < conf> directory #指定配置资源的目录,后面跟路径
-- classpath, - C < cp> append to the classpath #追加一个classpath
-- dryrun, - d do not actually start Flume, just print the command #不运行flume指令,只进行打印信息的操作
-- plugins- path < dirs> colon- separated list of plugins. d directories. See the
plugins. d section in the user guide for more details.
Default: $FLUME_HOME/ plugins. d #插件目录,默认$FLUME_HOME/ plugins. d
- Dproperty= value sets a Java system property value #设置一个JAVA的系统属性
- Xproperty= value sets a Java - X option #设置一个JAVA- X的选项
agent options: #启动agent的相关配置
-- name, - n < name> the name of this agent ( required) #配置agent的名称
-- conf- file, - f < file> specify a config file ( required if - z missing) #指定配置文件
-- zkConnString, - z < str> specify the ZooKeeper connection to use ( required if - f missing) #以zk为配置中心,指定zookeeper的连接
-- zkBasePath, - p < path> specify the base path in ZooKeeper for agent configs
-- no- reload- conf do not reload config file if changed
-- help, - h display help text
avro- client options: #启动avro Client的相关配置
-- rpcProps, - P < file> RPC client properties file with server connection params
-- host, - H < host> hostname to which events will be sent #绑定sourceIP地址
-- port, - p < port> port of the avro source #绑定source端口号
-- dirname < dir> directory to stream to avro source #指定source监听的目录
-- filename, - F < file> text file to stream to avro source ( default : std input) #指定source监听的文件
-- headerFile, - R < file> File containing event headers as key/ value pairs on each new line #
-- help, - h display help text
Either -- rpcProps or both -- host and -- port must be specified.
Note that if < conf> directory is specified, then it is always included first
in the classpath.
Agent的启动:
./flume-ng
agent --conf …/conf --conf-file …/conf/hey01.conf --name a1
-Dflume.root.logger=INFO,console
./flume-ng agent -》指定启动的服务端类型
–conf …/conf -》指定配置资源的文件夹
–conf-file …/conf/hey01.conf -》指定配置文件
–name a1 -》定义agent的名字
-Dflume.root.logger=INFO,console -》定义sink为logger时,的输出类型
Avro-Client的启动(当source=avro时):
./flume-ng avro-client --conf …/conf --host 0.0.0.0 --port 44444 --filename /home/nums.txt
./flume-ng avro-client -》启动avro客户端
–conf …/conf -》指定配置资源的文件夹
–host 0.0.0.0 -》指定source IP地址
–port 44444 -》指定source端口号
–filename /home/nums.txt -》指定用avro监听的文件
Flume组件说明
1 Event
一个具有有效荷载的字节数据流和可选的字符串属性集。一条日志在flume中会被转换成一个JSON格式的串来传递,这个JSON串就是一个FlumeEvent,具体的格式为{header:{头信息},body:日志内容},所以简单来说,一条日志在一个Flume就对应一个JSON串,即,一个FlumeEvent。
2 Agent
一个进程承载从外部源事件流到下一个目的地的过程。包含Source、Channel和 Sink,多个Agent之间还可以连接
形成复杂的日志流动的网络。
3 Source
用来收集数据源,接受日志并封装成Event,传输到Channel。
4 Sink
目的地传送槽,获取Agent里面的数据,即消费Channel中的数据,并传送到目的地。
4 Channel
传输通道,被动 接受Source传来的Event数据,暂时存储,相当于对采集到的数据进行简单的缓存,等待 Sink消费。一个channel仅能对一个sink,但是在使用了Processor的前提下,一个channel可以对一个sinkgroup,也就是可以面向多个processor进行操作。
只有在sink将channel中的数据成功发送出去之后,channel才会将临时的event删除,这种机制保证了数据传输的可靠性和安全性。
5 Selector
选择器,主要用在实现扇出过程中实现按照指定方式分发数据。选择器可以工作在 复制 多路复用(路由)模式 下,默认情况下,不配置Selector,则扇出采用复制机制。
6 Interceptor
拦截器可以拦截Event,允许或不允许Event通过,或在允许通过的时,改变Event内容,这种改变包括改变Event的体或头信息。
拦截器可以手动开发,只要实现org.apache.flume.interceptor.Interceptor接口,在其中编写拦截规则即可。
Flume也内置了很多拦截器,可以直接使用。可以同时配置多个拦截器组合成拦截器链,依次拦截Event。
7 Processor
处理器,是Flume用于实现失败恢复、负载均衡的组件,其中包含失败恢复和负载均衡两种模式。
Flume常用范例说明
1 Source
1.1 Avro 序列化信息
监听AVRO端口来接受来自外部AVRO客户端的事件流,是实现多级流动、扇出流、扇入流等效果的基础。另外也可以接受通过flume提供的Avro客户端发送的日志信息。
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
1.2 Exec 将命令的输出作为源
a1.sources.r1.type = exec
a1.sources.r1.command = ping www.baidu.com
1.3 HTTP
a1.sources.r1.type = http
a1.sources.r1.port = 44444
通过curl命令测试
curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' http://hadoop01:44444
1.4 Spooling Directory
将要收集的数据放置到"自动搜集"目录中。这个Source将监视该目录,并将解析新文件的出现。事件处理逻辑是可插拔的,当一个文件被完全读入信道,它会被重命名或可选的直接删除。
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/src/flume/data
2 Sink
2.1 logger
a1.sinks.k1.type = logger
2.2 File Roll 在本地文件系统中存储事件,每隔指定时长生成文件保持这段时间内收集到的日志信息
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/fresult
2.3 HDFS
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flumedata
a1.sinks.k1.hdfs.fileType = DataStream
2.4 Avro
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02
a1.sinks.k1.port = 44444
3 Channel
3.1 Memory
事件将被存储在内存中的具有指定大小的队列中。速度快,但断电会丢失数据。非常适合那些需要高吞吐量但是可以容忍极端情况下会丢失数据的场景下。
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
3.2 File
事件将被存储在磁盘中的文件中,特点是速度慢,但断电不会丢失数据,非常适合那些需要高可靠性 可恢复,但性能要求不高的场景。
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/centos/flume/chk
a1.channels.c1.dataDirs = /home/centos/flume/data
4 Selector
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = gender
a1.sources.r1.selector.mapping.male = c1
a1.sources.r1.selector.mapping.female = c2
a1.sources.r1.selector.default = c1
5 Interceptor
5.1 Timestamp Interceptor
时间戳拦截器,拦截到Event后,允许通过,但在头信息中增加时间戳头信息。
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.header = timestamp
a1.sources.r1.interceptors.i1.preserveExisting = true
5.2 Host Interceptor
主机名拦截器,拦截下Event后,允许通过,但在头信息中增加主机名或IP头信息。
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i1.useIP = true
a1.sources.r1.interceptors.i1.hostHeader = host
5.3 Static Interceptor
静态拦截器,拦截下Event之后,允许通过,但要增加上指定的头和值。
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i2
a1.sources.r1.interceptors.i2.type = static
a1.sources.r1.interceptors.i2.key = country
a1.sources.r1.interceptors.i2.value = China
5.4 UUID Interceptor
UUID拦截器,拦截下Event之后,允许通过,但要在头上增加上一个UUID唯一表示作为头。
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i3
a1.sources.r1.interceptors.i3.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.r1.interceptors.i3.headerName = id
a1.sources.r1.interceptors.i3.preserveExisting = true
a1.sources.r1.interceptors.i3.prefix = ""
5.5 Search and Replace Interceptor
搜索和替换拦截器,拦截下Event后,通过正则匹配日志中的体,将符合正则的部分替换为指定的内容。
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = search_replace
a1.sources.r1.interceptors.i1.searchPattern = \\d
a1.sources.r1.interceptors.i1.replaceString = *
a1.sources.r1.interceptors.i1.charset = UTF-8
5.6 Regex Filtering Interceptor
正则过滤拦截器,拦截下Event之后,利用正则匹配日志的体,根据是否匹配决定是否保留或是否取出当前Event。
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^\\d.*$
a1.sources.r1.interceptors.i1.excludeEvents = true
6 Processor
6.1 失败恢复机制
失败恢复机制下,Processor将会维护一个sink们的优先表。sink们可以被配置一个优先级,数字越大优先级越高。事件将永远将只会发往优先级最高的Sink。只要有一个Sink存活,整个过程仍然可以进行。如果没有指定优先级,则优先级顺序取决于sink们的配置顺序,先配置的默认优先级高于后配置的。
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
6.2 负载均衡机制
Processor的负载均衡机制提供了在多个sink之间实现负载均衡的能力,它维护了一个活动sink的索引列表。
通过Processor动态切换channel在SinkGroup中对Sink的指向,实现数据的负载均衡方式分发。
支持轮询 或 随机方式 的负载均衡,默认值是轮询方式,可以通过配置指定。
负载均衡模式下,如果某个中心服务器宕机,则Processor会将该中心服务器Sink剔除SinkGroup组,并将之前发送失败的数据发给其他仍然存活的Sink,所以可以认为Processor的负载均衡机制自带失败恢复的能力。
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.maxTimeOut = 10000
– 注意,flume默认批处理,即使是轮询,隔得时间过长或过短,都不一定能实现ABABA模式
Flume事务机制
Flume会使用两个独立的事务分别从source到channel ,以及channel到sink 的事件传递。即一旦事务中所有事件全部传递到channel且提交成功,那么source就将该文件标记为完成,否则就进行回滚,同理,如果某种原因使事务从channel到sink的传递过程无法记录,那么事务将会回滚,所有事件都会保持在channel中,等待重新传递。
1 At-least-once提交方式
Flume在传送事务时,保证至少一次到达(at-least-once),也就是说可能重复出现。如果上次处理过程中,有些数据已被处理,但是事务还没有提交(在输出之后提交之前发生故障),则这些时间会被重试,出现重复。
除了at-least-once,还有at-most-once 和 exactly-once:
At most once —Messages may be lost but are never redelivered.
At least once —Messages are never lost but may be redelivered.
Exactly once —this is what people actually want, each message is delivered once and only once
一些传统企业会要求精确的一次到达,但是一次到达需要使用2PC两阶段提交协议,这样的协议开销非常大。所以如有需求,会在数据的其他处理环节对重复数据进行去重,通常是采用MR或Hive处理。
2 批量处理
为了提高效率,Flume尽可能的以事务为单位来处理事件,而不是逐一基于事件进行处理。批处理的设置尤其有利于提高file channle的效率,这样整个事务只需要写入一次本地磁盘,或者调用一次fsync,速度回快很多。