设为首页 加入收藏

TOP

flume 一级配置和多级配置详解
2018-11-28 17:47:09 】 浏览:35
Tags:flume 一级 配置 多级 详解
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/nimoyaoww/article/details/80614215

个人不太喜欢过多的介绍理论,理论在任何一个博客中都能查到,如果下个了解flume 的工作原理,请到别处寻找,如果子昂要找到方案的解决办法,恭喜你找对了。同时本人不喜欢专门排版,太浪费时间,还不如利用时间,多研究一下干货。望谅解。

在实际应用中,主要多级flume搭建比较常用,在此仅仅以多级 flume 为例,进行配置和研究。

1 Flume的安装

1.1 安装JDK

具体方法略。网上很多,另觅他处

1.2 下载安装包并解压

$ wget http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.7.1.tar.gz

$ tar -xvf flume-ng-1.6.0-cdh5.7.1.tar.gz

$ rm flume-ng-1.6.0-cdh5.7.1.tar.gz

$ mv apache-flume-1.6.0-cdh5.7.1-binflume-1.6.0-cdh5.7.1

1.3 配置环境变量

$ cd /home/hadoop

配置环境变量

$ vim .bash_profile

export FLUME_HOME=/home/hadoop/app/cdh/flume-1.6.0-cdh5.7.1

export PATH=$PATH:$FLUME_HOME/bin

使环境变量生效

$ source .bash_profile

1.4 配置flume-env.sh文件

$ cd app/cdh/flume-1.6.0-cdh5.7.1/conf/

$ cp flume-env.sh.template flume-env.sh

$ vim flume-env.sh

export JAVA_HOME=/home/hadoop/app/jdk1.7.0_79

export HADOOP_HOME=/home/hadoop/app/cdh/hadoop-2.6.0-cdh5.7.1

1.5 版本验证

$ flume-ng version

[root@master conf]# flume-ng version
Flume 1.6.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080
Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015
From source with checksum b29e416802ce9ece3269d34233baf43f

恭喜你,安装成功,接下来主要配置


2 多级 flume 配置的信息


2.1 一级 flume ,从 网络端口 或者 文件中获取获取日志

在/apache-flume-1.6.0-bin/conf 目录下创建,netcat-logger.conf,文件中的内容如下

[root@slave1 conf]# more netcat-logger.conf

n/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1
########
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source 从文件中获取日志
# get log from file
#a1.sources.r1.type = exec
#a1.sources.r1.command = tail -F /opt/123.txt
#a1.sources.r1.channels = c1

#get log from network port 从网路端口中获取日志
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.2 一级 flume 配置多个 source ,共同利用一个 channel 和一个 sink 将数据写入 hdfs 中

########
# Name the components on this agent
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# get log from file
#a1.sources.r1.type = exec
#a1.sources.r1.command = tail -F /opt/123.txt
#a1.sources.r1.channels = c1

#get log from network port
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sources.r2.type = netcat
a1.sources.r2.bind = localhost
a1.sources.r2.port = 44445

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = master

a1.sinks.k1.port = 4141


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r2.channels = c1

2.3 二级 flume 配置。读取到文件的内容之后,将文件的内容存储到 hdfs 或者控制台


#定义三大组件的名称
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# 配置source组件
agent1.sources.source1.type = avro
agent1.sources.source1.channels = channel1
agent1.sources.source1.bind = 0.0.0.0
agent1.sources.source1.port = 4141
#agent1.sources.source1.spoolDir = /home/hadoop/logs/
#agent1.sources.source1.fileHeader = false

# 配置sink组件
# 将日志打印到控制台
#agent1.sinks.sink1.type = logger


# 将文件存储到 hdfs 中国
agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
# 文件的路径 文件下沉目标
#agent1.sinks.sink1.hdfs.path =hdfs://master:8022/flume/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.path =hdfs://master:8022/flume/dt=%y-%m-%d
# 文件名的前缀
#agent1.sinks.sink1.hdfs.filePrefix = access_log
#agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
# 5 个事件就往里面写入
agent1.sinks.sink1.hdfs.batchSize= 100


#下沉后, 生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
#文件滚动大的大小限制(bytes)
agent1.sinks.sink1.hdfs.rollSize = 1024
#写入多少个 event 数据后滚动文件(事件个数)
agent1.sinks.sink1.hdfs.rollCount = 100
#文件回滚之前等待的时间
agent1.sinks.sink1.hdfs.rollInterval = 60

# 10 分钟就创建文件
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
#用本地时间格式化目录
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1


启动 flume 命令:

flume-ng agent --conf conf --conf-file avro-file_roll.conf --name agent1


3 常见的 source 类型

3.1 : avro source

avro可以监听和收集指定端口的日志,使用avro的source需要说明被监听的主机ip和端口号,下面给出一个具体的例子:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

3.2: exec source

exec可以通过指定的操作对日志进行读取,使用exec时需要指定shell命令,对日志进行读取,下面给出一个具体的例子:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

3.3: spooling-directory source

spo_dir可以读取文件夹里的日志,使用时指定一个文件夹,可以读取该文件夹中的所有文件,需要注意的是该文件夹中的文件在读取过程中不能修改,同时文件名也不能修改。下面给出一个具体的例子:

agent-1.channels = ch-1

agent-1.sources = src-1


agent-1.sources.src-1.type = spooldir
agent-1.sources.src-1.channels = ch-1
agent-1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
agent-1.sources.src-1.fileHeader = true

3.4: syslog source

syslog可以通过syslog协议读取系统日志,分为tcp和udp两种,使用时需指定ip和端口,下面给出一个udp的例子:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

4 常见的 channel 类型


Flume的channel种类并不多,最常用的是memory channel,下面给出例子:
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

5 常见的 sink 类型



5.1 logger sink

logger顾名思义,就是将收集到的日志写到flume的log中,是个十分简单但非常实用的sink
简单实用,但是在实际项目中没有太大用处。
只需将类型指定为 logger 就可以了
agent1.sinks.sink1.type = logger

5.2: avro sink

avro可以将接受到的日志发送到指定端口,供级联agent的下一跳收集和接受日志,使用时需要指定目的ip和端口:例子如下:
上面我给出的例子就是用的是这种方法

a1.channels = c1
a1.sinks = k1

a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10(目标主机的IP 和 端口号)
a1.sinks.k1.port = 4545

5.3: file roll sink

file_roll可以将一定时间内收集到的日志写到一个指定的文件中,具体过程为用户指定一个文件夹和一个周期,然后启动agent,这时该文件夹会产生一个文件将该周期内收集到的日志全部写进该文件内,直到下一个周期再次产生一个新文件继续写入,以此类推,周而复始。下面给出一个具体的例子:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

5.4:hdfs sink

hdfs与file roll有些类似,都是将收集到的日志写入到新创建的文件中保存起来,但区别是file roll的文件存储路径为系统的本地路径,而hdfs的存储路径为分布式的文件系统hdfs的路径,同时hdfs创建新文件的周期可以是时间,也可以是文件的大小,还可以是采集日志的条数。具体实例如下:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute


这里主要对 sink 配置的参数进行详解

有哪个参数不明白,都可以在此查询到

agent_lxw1234.sinks.sink1.type = hdfs
agent_lxw1234.sinks.sink1.hdfs.path = hdfs://cdh5/tmp/lxw1234/%Y%m%d

hdfs 文件名前缀
agent_lxw1234.sinks.sink1.hdfs.filePrefix = log_%Y%m%d_%H

写入的 hdfs 文件名后缀
agent_lxw1234.sinks.sink1.hdfs.fileSuffix = .lzo


临时文件名前缀
inUsePrefix


默认值:.tmp 临时文件的文件名后缀
inUseSuffix

agent_lxw1234.sinks.sink1.hdfs.useLocalTimeStamp = true
agent_lxw1234.sinks.sink1.hdfs.writeFormat = Text
agent_lxw1234.sinks.sink1.hdfs.fileType = CompressedStream
agent_lxw1234.sinks.sink1.hdfs.rollCount = 0

默认值:1024
当临时文件达到该大小(单位:bytes)时,滚动成目标文件;
如果设置成0,则表示不根据临时文件大小来滚动文件;
agent_lxw1234.sinks.sink1.hdfs.rollSize = 0


默认值:10
当events数据达到该数量时候,将临时文件滚动成目标文件;
如果设置成0,则表示不根据events数据来滚动文件;
agent_lxw1234.sinks.sink1.hdfs.rollInterval = 600


文件压缩格式,包括:gzip, bzip2, lzo, lzop, snappy
agent_lxw1234.sinks.sink1.hdfs.codeC = lzop


默认值:100
每个批次刷新到HDFS上的events数量;
agent_lxw1234.sinks.sink1.hdfs.batchSize = 100

agent_lxw1234.sinks.sink1.hdfs.threadsPoolSize = 10



fileType

默认值:SequenceFile
文件格式,包括:SequenceFile, DataStream,CompressedStream
当使用DataStream时候,文件不会被压缩,不需要设置hdfs.codeC;
当使用CompressedStream时候,必须设置一个正确的hdfs.codeC值;



maxOpenFiles
默认值:5000
最大允许打开的HDFS文件数,当打开的文件数达到该值,最早打开的文件将会被关闭;




默认值:0
当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,
则将该临时文件关闭并重命名成目标文件;
agent_lxw1234.sinks.sink1.hdfs.idleTimeout = 0


writeFormat
写sequence文件的格式。包含:Text, Writable(默认)


callTimeout
默认值:10000
执行HDFS操作的超时时间(单位:毫秒);


threadsPoolSize
默认值:10
hdfs sink启动的操作HDFS的线程数。


rollTimerPoolSize
默认值:1
hdfs sink启动的根据时间滚动文件的线程数




kerberosPrincipal
HDFS安全认证kerberos配置;


kerberosKeytab
HDFS安全认证kerberos配置;


proxyUser
代理用户


round
默认值:false
是否启用时间上的”舍弃”,这里的”舍弃”,类似于”四舍五入”,后面再介绍。如果启用,则会影响除了%t的其他所有时间表达式;


roundValue
默认值:1
时间上进行“舍弃”的值;


roundUnit
默认值:seconds
时间上进行”舍弃”的单位,包含:second,minute,hour
示例:
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

当时间为2015-10-16 17:38:59时候,hdfs.path依然会被解析为:
/flume/events/20151016/17:30/00
因为设置的是舍弃10分钟内的时间,因此,该目录每10分钟新生成一个。

timeZone
默认值:Local Time
时区。

useLocalTimeStamp
默认值:flase
是否使用当地时间。

closeTries
默认值:0
hdfs sink关闭文件的尝试次数;
如果设置为1,当一次关闭文件失败后,hdfs sink将不会再次尝试关闭文件,这个未关闭的文件将会一直留在那,并且是打开状态。
设置为0,当一次关闭失败后,hdfs sink会继续尝试下一次关闭,直到成功。


retryInterval
默认值:180(秒)
hdfs sink尝试关闭文件的时间间隔,如果设置为0,表示不尝试,相当于于将hdfs.closeTries设置成1.

serializer
默认值:TEXT
序列化类型。其他还有:avro_event或者是实现了EventSerializer.Builder的类名。


5.5:hbase sink

hbase是一种数据库,可以储存日志,使用时需要指定存储日志的表名和列族名,然后agent就可以将收集到的日志逐条插入到数据库中。例子如下:

a1.channels = c1
a1.sinks = k1

a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1










】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇FLume:通过端口监控flume的运行.. 下一篇利用flume增量采集关系数据库的配..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目