
Flume Learning, Part 2: Installing and Using Flume
Copyright notice: this is an original post by the author and may not be reproduced without permission. https://blog.csdn.net/Love_JavaProgram/article/details/51173196

1. Let's start simple: a single-node Flume configuration

example.conf (a netcat source you can test with telnet):
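The post does not include the file itself; below is a minimal sketch modeled on the standard Flume quick-start example. The agent name a1 matches the --name a1 in the command below and the netcat source listens on port 44444, the port used in the telnet test; the component names r1/k1/c1 are assumptions.

# name the components of agent a1
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# netcat source listening on localhost:44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# logger sink: prints received events to the agent's log/console
a1.sinks.k1.type = logger

# in-memory channel buffering up to 1000 events
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# wire source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1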

./bin/flume-ng agent --conf conf --conf-file ./conf/example.conf --name a1 -Dflume.root.logger=INFO,console

PS: -Dflume.root.logger=INFO,console is for debugging only; do not copy it blindly into production, or large volumes of log output will be dumped to your terminal.

-c/--conf takes the configuration directory, -f/--conf-file the specific configuration file, and -n/--name specifies the agent name.

Open another shell window, telnet to the port the configuration listens on, and send a message to see the result:

[root@10.10.73.58]$ telnet localhost 44444

Trying 127.0.0.1...

Connected to localhost.localdomain (127.0.0.1).

Escape character is '^]'.

hello word

OK

The Flume terminal window will then print something like the following, which means it worked:

2016-02-29 20:12:00,719 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:150)] Source starting

2016-02-29 20:12:00,735 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:164)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]

2016-02-29 20:12:22,744 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 64 0D hello word. }

2. Single-node Flume writing directly to HDFS

flume_directHDFS.conf:

# Define a memory channel called ch1 on agent1

agent1.channels.ch1.type = memory

agent1.channels.ch1.capacity = 100000

agent1.channels.ch1.transactionCapacity = 100000

agent1.channels.ch1.keep-alive = 30

# Define an Avro source called avro-source1 on agent1 and tell it

# to bind to 0.0.0.0:41414. Connect it to channel ch1.

#agent1.sources.avro-source1.channels = ch1

#agent1.sources.avro-source1.type = avro

#agent1.sources.avro-source1.bind = 127.0.0.1

#agent1.sources.avro-source1.port = 44444

#agent1.sources.avro-source1.threads = 5

#define source monitor a file

agent1.sources.avro-source1.type = exec

agent1.sources.avro-source1.shell = /bin/bash -c

agent1.sources.avro-source1.command = tail -n +0 -F /home/storm/tmp/id.txt

agent1.sources.avro-source1.channels = ch1

agent1.sources.avro-source1.threads = 5

# Define a logger sink that simply logs all events it receives

# and connect it to the other end of the same channel.

agent1.sinks.log-sink1.channel = ch1

agent1.sinks.log-sink1.type = hdfs

agent1.sinks.log-sink1.hdfs.path = hdfs://192.168.1.111:8020/flumeTest

agent1.sinks.log-sink1.hdfs.writeFormat = Text

agent1.sinks.log-sink1.hdfs.fileType = DataStream

agent1.sinks.log-sink1.hdfs.rollInterval = 0

agent1.sinks.log-sink1.hdfs.rollSize = 1000000

agent1.sinks.log-sink1.hdfs.rollCount = 0

agent1.sinks.log-sink1.hdfs.batchSize = 1000

agent1.sinks.log-sink1.hdfs.txnEventMax = 1000

agent1.sinks.log-sink1.hdfs.callTimeout = 60000

agent1.sinks.log-sink1.hdfs.appendTimeout = 60000

# Finally, now that we've defined all of our components, tell

# agent1 which ones we want to activate.

agent1.channels = ch1

agent1.sources = avro-source1

agent1.sinks = log-sink1

Start it with the following command and you will see the data appear on HDFS.

../bin/flume-ng agent --conf ../conf/ -f flume_directHDFS.conf -n agent1 -Dflume.root.logger=INFO,console

PS: A common real-world requirement is to tail logs on multiple agents and ship them to a collector, which aggregates the data and writes it to HDFS, rolling a new file whenever the current HDFS file exceeds a certain size or a configured time interval has elapsed.
Flume implements two triggers for this: SizeTrigger (while writing to the HDFS output stream it keeps a running total of the bytes written; once the total exceeds a threshold it creates a new file and output stream, redirects subsequent writes to the new stream, and closes the old one) and TimeTrigger (a timer fires at the configured moment, a new file and output stream are created, new writes are redirected to it, and the old stream is closed).
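In configuration terms these two triggers map to the HDFS sink's roll properties; a minimal sketch (the sink name hdfs-sink is illustrative):

# size trigger: roll a new HDFS file once 128 MB have been written ...
agent1.sinks.hdfs-sink.hdfs.rollSize = 134217728
# ... or time trigger: roll after 600 seconds, whichever fires first
agent1.sinks.hdfs-sink.hdfs.rollInterval = 600
# disable rolling by event count
agent1.sinks.hdfs-sink.hdfs.rollCount = 0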

3. A common architecture: multiple agents aggregating and writing to HDFS
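Sections 4 and 5 below configure the two halves of this architecture; the topology looks roughly like this:

webserver 1..N, each running a client agent
    (spooldir source -> file channel -> two load-balanced avro sinks k1/k2)
        |
        v
collector agents on flume115 / flume116
    (avro source -> file channel -> HDFS sinks)
        |
        v
      HDFS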

4. Configure the Flume client on each web-server log machine

# clientMainAgent

clientMainAgent.channels = c1

clientMainAgent.sources = s1

clientMainAgent.sinks = k1 k2

# clientMainAgent sinks group

clientMainAgent.sinkgroups = g1

# clientMainAgent Spooling Directory Source

clientMainAgent.sources.s1.type = spooldir

clientMainAgent.sources.s1.spoolDir =/dsap/rawdata/

clientMainAgent.sources.s1.fileHeader = true

clientMainAgent.sources.s1.deletePolicy =immediate

clientMainAgent.sources.s1.batchSize =1000

clientMainAgent.sources.s1.channels =c1

clientMainAgent.sources.s1.deserializer.maxLineLength =1048576

# clientMainAgent FileChannel

clientMainAgent.channels.c1.type = file

clientMainAgent.channels.c1.checkpointDir = /var/flume/fchannel/spool/checkpoint

clientMainAgent.channels.c1.dataDirs = /var/flume/fchannel/spool/data

clientMainAgent.channels.c1.capacity = 200000000

clientMainAgent.channels.c1.keep-alive = 30

clientMainAgent.channels.c1.write-timeout = 30

clientMainAgent.channels.c1.checkpoint-timeout=600

# clientMainAgent Sinks

# k1 sink

clientMainAgent.sinks.k1.channel = c1

clientMainAgent.sinks.k1.type = avro

# connect to CollectorMainAgent

clientMainAgent.sinks.k1.hostname = flume115

clientMainAgent.sinks.k1.port = 41415

# k2 sink

clientMainAgent.sinks.k2.channel = c1

clientMainAgent.sinks.k2.type = avro

# connect to CollectorBackupAgent

clientMainAgent.sinks.k2.hostname = flume116

clientMainAgent.sinks.k2.port = 41415

# clientMainAgent sinks group

clientMainAgent.sinkgroups.g1.sinks = k1 k2

# load_balance type

clientMainAgent.sinkgroups.g1.processor.type = load_balance

clientMainAgent.sinkgroups.g1.processor.backoff = true

clientMainAgent.sinkgroups.g1.processor.selector = random

../bin/flume-ng agent --conf ../conf/ -f flume_Consolidation.conf -n clientMainAgent -Dflume.root.logger=DEBUG,console

5. Configure the Flume server on the aggregation nodes

# collectorMainAgent

collectorMainAgent.channels = c2

collectorMainAgent.sources = s2

collectorMainAgent.sinks =k1 k2

# collectorMainAgent AvroSource

#

collectorMainAgent.sources.s2.type = avro

collectorMainAgent.sources.s2.bind = flume115

collectorMainAgent.sources.s2.port = 41415

collectorMainAgent.sources.s2.channels = c2

# collectorMainAgent FileChannel

#

collectorMainAgent.channels.c2.type = file

collectorMainAgent.channels.c2.checkpointDir =/opt/var/flume/fchannel/spool/checkpoint

collectorMainAgent.channels.c2.dataDirs = /opt/var/flume/fchannel/spool/data,/work/flume/fchannel/spool/data

collectorMainAgent.channels.c2.capacity = 200000000

collectorMainAgent.channels.c2.transactionCapacity=6000

collectorMainAgent.channels.c2.checkpointInterval=60000

# collectorMainAgent hdfsSink

collectorMainAgent.sinks.k2.type = hdfs

collectorMainAgent.sinks.k2.channel = c2

collectorMainAgent.sinks.k2.hdfs.path = hdfs://db-cdh-cluster/flume%{dir}

collectorMainAgent.sinks.k2.hdfs.filePrefix =k2_%{file}

collectorMainAgent.sinks.k2.hdfs.inUsePrefix =_

collectorMainAgent.sinks.k2.hdfs.inUseSuffix =.tmp

collectorMainAgent.sinks.k2.hdfs.rollSize = 0

collectorMainAgent.sinks.k2.hdfs.rollCount = 0

collectorMainAgent.sinks.k2.hdfs.rollInterval = 240

collectorMainAgent.sinks.k2.hdfs.writeFormat = Text

collectorMainAgent.sinks.k2.hdfs.fileType = DataStream

collectorMainAgent.sinks.k2.hdfs.batchSize = 6000

collectorMainAgent.sinks.k2.hdfs.callTimeout = 60000

collectorMainAgent.sinks.k1.type = hdfs

collectorMainAgent.sinks.k1.channel = c2

collectorMainAgent.sinks.k1.hdfs.path = hdfs://db-cdh-cluster/flume%{dir}

collectorMainAgent.sinks.k1.hdfs.filePrefix =k1_%{file}

collectorMainAgent.sinks.k1.hdfs.inUsePrefix =_

collectorMainAgent.sinks.k1.hdfs.inUseSuffix =.tmp

collectorMainAgent.sinks.k1.hdfs.rollSize = 0

collectorMainAgent.sinks.k1.hdfs.rollCount = 0

collectorMainAgent.sinks.k1.hdfs.rollInterval = 240

collectorMainAgent.sinks.k1.hdfs.writeFormat = Text

collectorMainAgent.sinks.k1.hdfs.fileType = DataStream

collectorMainAgent.sinks.k1.hdfs.batchSize = 6000

collectorMainAgent.sinks.k1.hdfs.callTimeout = 60000

../bin/flume-ng agent --conf ../conf/ -f flume_Consolidation.conf -n collectorMainAgent -Dflume.root.logger=DEBUG,console

The above is essentially a client/server-style setup: the Flume agents on each machine first funnel their logs to the consolidation nodes, which then write to HDFS in one place. Load balancing is used here, and you can also configure a high-availability (failover) mode, among other options.
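For example, a failover sink group can be used instead of the load-balancing processor above; a minimal sketch reusing the same sink names (the priority values are illustrative):

clientMainAgent.sinkgroups.g1.processor.type = failover
# the higher-priority sink is used; k2 only takes over when k1 fails
clientMainAgent.sinkgroups.g1.processor.priority.k1 = 10
clientMainAgent.sinkgroups.g1.processor.priority.k2 = 5
# maximum backoff period (ms) for a failed sink before it is retried
clientMainAgent.sinkgroups.g1.processor.maxpenalty = 10000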

6. Sending logs to Flume with log4j

1) Flume configuration:

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = avro

#a1.sources.r1.type = netcat

a1.sources.r1.bind=10.10.73.58

a1.sources.r1.port=44446

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

2) Start the agent:

./bin/flume-ng agent --conf conf --conf-file ./conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,LOGFILE

3) log4j configuration:

log4j.properties (log4j.xml works just as well):

### set log levels ###

log4j.rootLogger=INFO, Console, file, flume

log4j.logger.per.flume=INFO

#Console

log4j.appender.Console=org.apache.log4j.ConsoleAppender

log4j.appender.Console.Target=System.out

log4j.appender.Console.layout=org.apache.log4j.PatternLayout

log4j.appender.Console.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n

#log4j.appender.Console.layout.ConversionPattern=%m%n

log4j.logger.com.test=Console

#log4j.logger.com.gongpb.framework.exception=Console

### flume ###

log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender

log4j.appender.flume.layout=org.apache.log4j.PatternLayout

log4j.appender.flume.Hostname=10.10.73.58

log4j.appender.flume.Port=44446

### file ###

log4j.appender.file=org.apache.log4j.DailyRollingFileAppender

log4j.appender.file.Threshold=INFO

log4j.appender.file.File=/opt/logs/test.log

log4j.appender.file.Append=true

log4j.appender.file.DatePattern='.'yyyy-MM-dd

log4j.appender.file.layout=org.apache.log4j.PatternLayout

log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n
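Note that Log4jAppender is not part of log4j itself; it ships with Flume's client libraries, so the application needs flume-ng-log4jappender (and its dependencies) on the classpath. A sketch of the Maven coordinates, with the version to be matched to your Flume release:

<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.6.0</version> <!-- match your Flume version -->
</dependency>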

4) The sending program:

package com.test.flume;

import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WriteLog {

    protected static final Logger logger = LoggerFactory.getLogger(WriteLog.class);

    public static void main(String[] args) throws Exception {
        // String configFile = WriteLog.class.getResource("/").getPath() + "/log4j.properties";
        // PropertyConfigurator.configure(configFile);

        // emit the current timestamp once per second; the flume log4j appender
        // configured above forwards every log event to the Flume agent
        while (true) {
            logger.info(String.valueOf(new Date().getTime()));
            Thread.sleep(1000);
        }
    }
}

7. Problems you may run into

1) OOM problems:

Flume throws an error such as:

java.lang.OutOfMemoryError: GC overhead limit exceeded

or:

java.lang.OutOfMemoryError: Java heap space

Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.OutOfMemoryError: Java heap space

Flume's default maximum heap size at startup is only 20 MB, which OOMs very easily in production, so you need to add JVM options in flume-env.sh:

JAVA_OPTS="-Xms8192m -Xmx8192m -Xss256k -Xmn2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"

Then be sure to pass the -c conf option when starting the agent, otherwise the settings in flume-env.sh will not be loaded.

2) JDK version incompatibility:

2014-07-07 14:44:17,902 (agent-shutdown-hook) [WARN - org.apache.flume.sink.hdfs.HDFSEventSink.stop(HDFSEventSink.java:504)] Exception while closing hdfs://192.168.1.111:8020/flumeTest/FlumeData. Exception follows.

java.lang.UnsupportedOperationException: This is supposed to be overridden by subclasses.

at com.google.protobuf.GeneratedMessage.getUnknownFields(GeneratedMessage.java:180)

at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$GetFileInfoRequestProto.getSerializedSize(ClientNamenodeProtocolProtos.java:30108)

at com.google.protobuf.AbstractMessageLite.toByteString(AbstractMessageLite.java:49)

at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.constructRpcRequest(ProtobufRpcEngine.java:149)

at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:193)

Try swapping your JDK 7 for JDK 6.

3) Small files being slow to appear on HDFS

As already explained in section 2 above, Flume's HDFS sink implements the main roll (persistence) triggers: by size, by time interval, by event count, and so on. If your file is too small and never seems to get persisted to HDFS, it is simply because none of the roll conditions have been met yet, e.g. the configured event count or size threshold has not been reached. You can tweak the section 2 configuration accordingly, for example:

agent1.sinks.log-sink1.hdfs.rollInterval = 20

If new log lines stop arriving for a while and you still want a quick flush, this makes the sink roll (persist) the file every 20 seconds; the agent evaluates all the configured conditions and fires whichever trigger is satisfied first.

Here are some common roll (persistence) trigger settings:

# Number of seconds to wait before rolling the current file (here 600 seconds)

agent.sinks.sink.hdfs.rollInterval=600

# File size to trigger roll, in bytes (256 MB)

agent.sinks.sink.hdfs.rollSize = 268435456

# never roll based on number of events

agent.sinks.sink.hdfs.rollCount = 0

# Timeout after which inactive files get closed (in seconds)

agent.sinks.sink.hdfs.idleTimeout = 3600

agent.sinks.sink.hdfs.batchSize = 1000

4) Duplicate writes and data loss

Flume's HDFS sink wraps both writes into and reads out of the channel in a transaction; when the transaction fails it rolls back and retries. But since HDFS files cannot be modified in place, suppose 10,000 lines are to be written and the write fails at line 5,000 because of a network problem: the transaction rolls back, all 10,000 records are then rewritten successfully, and the first 5,000 lines end up duplicated. This is a consequence of how HDFS is designed and cannot be fixed with a simple bugfix. The only options are to disable batched writes and guarantee one event per transaction, or to enable monitoring and reconcile record counts at both ends.
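If you do choose to disable batching, the relevant knob on the HDFS sink is batchSize (the sink name from section 2 is used here for illustration); expect a significant throughput cost:

# one event per HDFS write transaction: shrinks the duplication window, at the cost of throughput
agent1.sinks.log-sink1.hdfs.batchSize = 1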

The memory channel and the exec source can both lose data; the file channel provides an end-to-end reliability guarantee, but its performance is worse than the other two.

With the end-to-end and store-on-failure modes, setting the ACK confirmation timeout too short (especially at peak times) can also cause duplicate writes.

5) Resuming tail from a checkpoint:

You can record the line number while tailing and, on the next run, resume from the recorded position, something like:

agent1.sources.avro-source1.command = /usr/local/bin/tail -n +$(tail -n1 /home/storm/tmp/n) --max-unchanged-stats=600 -F /home/storm/tmp/id.txt | awk 'ARGIND==1{i=$0;next}{i++; if($0~/文件已截断/)i=0; print i >> "/home/storm/tmp/n";print $1"---"i}' /home/storm/tmp/n -
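A commented breakdown of the awk program above (it reads the checkpoint file /home/storm/tmp/n as its first input and tail's output on stdin as its second; the /文件已截断/ pattern matches tail's localized "file truncated" message):

# first input (the checkpoint file): seed the line counter i with the saved position
ARGIND == 1 { i = $0; next }
# every subsequent line comes from tail: advance the counter,
# reset it to 0 if tail reports the file was truncated (rotated),
# append the new position to the checkpoint file, then emit the line with its number
{
    i++
    if ($0 ~ /文件已截断/) i = 0
    print i >> "/home/storm/tmp/n"
    print $1 "---" i
}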

A few things to watch out for:

(1) When the file is rotated, update your checkpoint "pointer" at the same time.

(2) Track files by file name.

(3) After Flume goes down, keep accumulating the resume "pointer" on restart.

(4) If the file happens to be rotated while Flume is down, there is a risk of losing data; the only remedies are to monitor and restart quickly, or to add logic that checks the file size and resets the pointer.

(5) Mind your tail version: update the coreutils package to the latest.

6) How do I modify, drop, or route data into predefined categories in Flume?

For this you need Flume's interceptor mechanism, as sketched below.
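A minimal sketch using two of Flume's built-in interceptors (the agent/source names reuse those from section 6; the DEBUG pattern is illustrative):

# drop events whose body matches a pattern (here: anything containing DEBUG)
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = DEBUG
a1.sources.r1.interceptors.i1.excludeEvents = true
# add a timestamp header so a downstream HDFS sink can bucket files by time in hdfs.path
a1.sources.r1.interceptors.i2.type = timestamp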
