设为首页 加入收藏

TOP

flume的安装及使用及各种案例
2019-03-12 14:10:43 】 浏览:93
Tags:flume 安装 使用 及各种 案例

1.下载flume-1.6.0(apache-flume-1.6.0-bin.tar.gz)

http://flume.apache.org/download.html

2.上传到集群(本文上传到hadoop04节点)

3.解压安装包

tar -zxvf apache-flume-1.6.0-bin.tar.gz -C apps/

并且修改文件夹名字

mvapache-flume-1.6.0-binflume-1.6.0

4.进入conf文件夹修改配置信息

cd/home/hadoop/apps/flume-1.6.0/conf

5.修改配置文件

[hadoop@hadoop04 conf]$cp flume-env.sh.template flume-env.sh

vimflume-env.sh

6.配置环境变量

vim .bashrc

增加下面的内容

export FLUME_HOME=/home/hadoop/apps/flume-1.6.0/bin

export PATH=$PATH:$FLUME_HOME/bin

source .bashrc

7.测试是否成功

[hadoop@hadoop04 ~]$flume-ng version

8.flume的使用

官方帮助文档点击打开链接

Source:读数据

Avro Source:

监听Avro的一个端口和接收事件从外部的Avro client获取流数据(监听指定的端口,并且在指定端口上接收数据),在linux里面有一个nc命令(可以向指定端口发送数据,就跟ping baidu.com一样,即通过nc向端口发送数据,Avro Source就可以在这个端口接收到数据)

Spooling Directory Source: (日志常用)

监控指定文件的数据,在磁盘上去写一个指定的spooling的目录,这个Source可以观测指定的目录的新文件并且把这个事件粘贴到新文件里面。(监控指定的文件夹或者目录,这个文件夹下面是没有子文件夹的,一旦这个目录被监控以后,往该目录下写文件,这个文件将被Spooling DirectorySource读取文件中的内容,读取完之后将文件名增加后缀.COMPLETED

Exec Source:(日志常用)

运行在一个给定的Unix或者Linux命令,当命令执行时候,Exec Source会去读取接收命令所接受到的数据(比如说cat,tail命令),其实就是使用一个linux命令读取数据信息,然后通过Exec Source就可以获取出来,其实也是写日志。(tail命令就是获取

获取最后多少行,获取最新的新增加的)

Exec Source 和Spool Source 比较

1、ExecSource可以实现对日志的实时收集,但是存在Flume不运行或者指令执行出错时,将无法收集到日志数据,无法何证日志数据的完整性。

2、SpoolSource虽然无法实现实时的收集数据,但是可以使用以分钟的方式分割文件,趋近于实时。

3、总结:如果应用无法实现以分钟切割日志文件的话,可以两种 收集方式结合使用。


Channel:消息队列,缓存

Memory Channel:

让内存作为我们中间的缓存,所有的事件数据都放到内存队列里去,可以去配置最大的内存大小,可以实现高速的吞吐,但是无法保证数据的完整性。

JDBC Channel:

Kafaka Channel:

File Channel:文件也可以作为Channel,保证数据的完整性与一致性。在具体配置不现的FileChannel时,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。

Spillable Memory Channel:可分割内存的Channel

MemoryRecoverChannel在官方文档的建议上已经建义使用FileChannel来替换。

Sink:将Channel中的数据写入到目的地,进行相应的存储文件系统,数据库,或者提交到远程服务器。

HDFS Sink:把事件写到HDFS上,支持创建新的文本和序列化文件和压缩格式,还支持回滚(最大的好处就是从HDFS中直接分区导入到Hive中)

Hive Sink:

Logger Sink:将日志事件在信息的级别去显示(可以显示到控制台)

Avro Sink:写到指定的端口去写数据

Thrift Sink:

flume部署种类:

单一代理流程:

多代理流程:

流合并:(多代理的一种)

多路复用:从一个地方收集信息最终写到不同的地方去,(多路复制是接收到的数据都是一样的)

注意:一个sink只能指定一个channel

下面是模板:


  1. #文件名后缀是.properties

  2. #agent的名称可以自定义 a1 ,sources、sinks、channels的名称也可以自定义

  3. a1.sources = s1

  4. a1.sinks = k1

  5. a1.channels = c1

  6. #为source指定他的channel

  7. a1.sources.s1.channels = c1

  8. #为sink指定他的channel

  9. a1.sinks.k1.channel = c1

流配置:

单一代理流配置:

Spooling Directory Source,Memory Channel,Logger Sink的案例:


  1. #监控指定的目录,如果有新文件产生,那么将文件的内容显示到控制台(Spooling Directory Source,Memory Channel,Logger Sink)

  2. #运行命令:flume-ng agent --conf conf --conf-file case_single.properties --name a1 -Dflume.hadoop.logger=INFO,console

  3. #运行命令解释:flume-ng agent --conf conf --conf-file HDFS上的配置文件的位置 --name agent的名称 打印出来的日志打印到控制台当中

  4. #agent的名称可以自定义 a1 ,sources、sinks、channels的名称也可以自定义

  5. a1.sources = s1

  6. a1.sinks = k1

  7. a1.channels = c1

  8. #配置source

  9. a1.sources.s1.type = spooldir

  10. a1.sources.s1.spoolDir = /home/hadoop/flumetest

  11. #配置channel

  12. a1.channels.c1.type = memory

  13. #配置sink

  14. a1.sinks.k1.type = logger

  15. #为source指定他的channel

  16. a1.sources.s1.channels = c1

  17. #为sink指定他的channel

  18. a1.sinks.k1.channel = c1

把上面的代码复制到case_single.properties文本文件中,然后上传到hadoop04家目录下面,并且在hadoop用户家目录下新建一个flumetest的文件夹(mkdir flumetest)

使用命令:[hadoop@hadoop04 ~]$ flume-ng agent --conf conf --conf-file case_single.properties --name a1 -Dflume.hadoop.logger=INFO,console

然后再hadoop04上复制一个新的连接窗口,使用 vim t 在里面写入hello flume 然后保存。执行下面的命令

[hadoop@hadoop04 ~]$ mv t flumetest/

此时hadoop04的flume窗口会自动出现下图所示样子:

到此测试成功!

NetCat TCP Source,Memory Channel,Logger Sink的案例


  1. #通过NetCat TCP Source读取指定端口的输入数据到控制台显示(NetCat TCP Source,Memory Channel,Logger Sink)

  2. #往端口输入数据 可以通过nc命令,telnet命令也可以去发送数据的

  3. #运行命令:flume-ng agent --conf conf --conf-file case_tcp.properties --name a1 -Dflume.hadoop.logger=INFO,console

  4. #运行命令解释:flume-ng agent --conf conf --conf-file 配置文件的位置 --name agent的名称 打印出来的日志打印到控制台当中

  5. a1.sources = s1

  6. a1.sinks = k1

  7. a1.channels = c1

  8. #配置source

  9. a1.sources.s1.type = netcat

  10. a1.sources.s1.bind = 192.168.123.104

  11. a1.sources.s1.port = 55555

  12. #配置channel

  13. a1.channels.c1.type = memory

  14. #配置sink

  15. a1.sinks.k1.type = logger

  16. #为source指定他的channel

  17. a1.sources.s1.channels = c1

  18. #为sink指定他的channel

  19. a1.sinks.k1.channel = c1

复制上面的代码到case_tcp.properties文本文件中,然后从hadoo04节点上传到hadoop用户的根目录下。执行下面的命令

[hadoop@hadoop04 ~]$ flume-ng agent --conf conf --conf-file case_tcp.properties --name a1 -Dflume.hadoop.logger=INFO,console

然后再复制hadoop04节点连接会话,输入telnet hadoop04 55555

[hadoop@hadoop04 ~]$ sudo yum install telnet

安装好telnet后使用下面命令发送数据:

[hadoop@hadoop04 ~]$ telnet hadoop04 55555

然后就可以输入数据了

另一端就能接收到:

Avro Source,Memory Channel,Logger Sink的案例


  1. #通过Avro Source读取指定端口的输入数据到控制台显示(Avro Source,Memory Channel,Logger Sink)

  2. #往端口输入数据 可以通过nc命令,telnet命令也可以去发送数据的

  3. #运行命令:flume-ng agent --conf conf --conf-file case_avro.properties --name a1 -Dflume.hadoop.logger=INFO,console

  4. #运行命令解释:flume-ng agent --conf conf --conf-file 配置文件的位置 --name agent的名称 打印出来的日志打印到控制台当中

  5. a1.sources = s1

  6. a1.sinks = k1

  7. a1.channels = c1

  8. #配置source

  9. a1.sources.s1.type = avro

  10. a1.sources.s1.bind = 192.168.123.104

  11. a1.sources.s1.port = 55555

  12. #配置channel

  13. a1.channels.c1.type = memory

  14. #配置sink

  15. a1.sinks.k1.type = logger

  16. #为source指定他的channel

  17. a1.sources.s1.channels = c1

  18. #为sink指定他的channel

  19. a1.sinks.k1.channel = c1

复制上面的代码到case_avro.properties文本文件中,然后从hadoo04节点上传到hadoop用户的根目录下。执行下面的命令

[hadoop@hadoop04 ~]$ flume-ng agent --conf conf --conf-file case_avro.properties --name a1 -Dflume.hadoop.logger=INFO,console

然后再复制hadoop04节点连接会话,输入echo "hello avro" | nc hadoop04 55555

[hadoop@hadoop04 ~]$ sudo yum install nc

输入nc命令查看是否安装成功

再次输入

[hadoop@hadoop04 ~]$ echo "hello avro" | nc hadoop04 55555

先创建一个vim t.log 然后写qwert world

[hadoop@hadoop04 ~]$ flume-ng avro-client -c ./conf -H hadoop04 -p 55555 -F t.log

Exec Source,Memory Channel,hdfs-Sink的案例

准备:

模拟服务器的业务系统不断的写日志,不断的往里面写入数据,最终通过flume写到HDFS上面

只要这个窗口在就不断的往里面写数据

注意这个窗口别关!!

复制一个新的窗口输入下面的命令:

[hadoop@hadoop04 tom]$ tail -F catalina.out

查看是不是正常状态,这个窗口可以结束。

正文:

首先启动整个集群,在HDFS下面建一个flumetest的文件夹

hadoop fs -mkdir /flumetest

然后新建一个case_hdfs.properties的文本文件复制下面的代码:


  1. #使用Exec Source通过Linux命令去取数据,然后不断的写入HDFS中(Exec Source,Memory Channel,hdfs-Sink)

  2. #运行命令:flume-ng agent --conf conf --conf-file case_hdfs.properties --name a1 -Dflume.hadoop.logger=INFO,console

  3. #运行命令解释:flume-ng agent --conf conf --conf-file 配置文件的位置 --name agent的名称 打印出来的日志打印到控制台当中

  4. a1.sources = s1

  5. a1.sinks = k1

  6. a1.channels = c1

  7. #配置source

  8. a1.sources.s1.type = exec

  9. #tail -F 和-f的区别是 -F追踪的是文件名字。-f是追踪id。因为.out文件满了就变成了.out.1了,再会去重建一个.out文件

  10. a1.sources.s1.command = tail -F /home/hadoop/tom/catalina.out

  11. #配置channel

  12. a1.channels.c1.type = memory

  13. #配置sink

  14. a1.sinks.k1.type = hdfs

  15. a1.sinks.k1.hdfs.path = /flumetest/%Y-%m-%d/%H%M

  16. #设置目录回滚(首先根据时间创建个文件夹,写了一分钟后我们就需要重新建文件夹了,所以我们要指定回滚为true)

  17. a1.sinks.k1.hdfs.round = true

  18. a1.sinks.k1.hdfs.roundValue = 1

  19. a1.sinks.k1.hdfs.roundUnit = minute

  20. a1.sinks.k1.hdfs.useLocalTimeStamp = true

  21. a1.sinks.k1.hdfs.filePrefix = taobao

  22. a1.sinks.k1.hdfs.fileSuffix = log

  23. #设置文件的回滚(下面三个条件只要有一个满足都会进行回滚)

  24. #每过多少秒回滚一次,所以一分钟内我们写6个文件(满足)

  25. a1.sinks.k1.hdfs.rollInterval = 10

  26. #文件多大的时候再回滚

  27. a1.sinks.k1.hdfs.rollSize = 1024

  28. #写了多少条就回滚(满足)

  29. a1.sinks.k1.hdfs.rollCount = 10

  30. #设置压缩格式为不压缩

  31. a1.sinks.k1.hdfs.fileType = DataStream

  32. #为source指定他的channel

  33. a1.sources.s1.channels = c1

  34. #为sink指定他的channel

  35. a1.sinks.k1.channel = c1

然后上传到hadoop04的家目录下,执行命令:

[hadoop@hadoop04 ~]$ flume-ng agent --conf conf --conf-file case_hdfs.properties --name a1 -Dflume.hadoop.logger=INFO,console

flume会不断的在写之前准备阶段的数据到HDFS上面的flumetest文件夹里面,并且,每个文件夹下面有6个文件

单代理多流配置

单代理指的是一个agent,多流指的是多个Source,多个Channel,多个sinks。注意单代理端口不能冲突

多代理流程

分别在hadoop04和hadoop05上跑代码

在hadoop04下面的代码:


  1. #运行命令:flume-ng agent --conf conf --conf-file case_source.properties --name a1 -Dflume.hadoop.logger=INFO,console

  2. #运行命令解释:flume-ng agent --conf conf --conf-file 配置文件的位置 --name agent的名称 打印出来的日志打印到控制台当中

  3. a1.sources = s1

  4. a1.sinks = k1

  5. a1.channels = c1

  6. a1.sources.s1.type = netcat

  7. a1.sources.s1.bind = 192.168.123.104

  8. a1.sources.s1.port = 44455

  9. a1.channels.c1.type = memory

  10. a1.sinks.k1.type = avro

  11. a1.sinks.k1.hostname = 192.168.123.105

  12. a1.sinks.k1.port = 44466

  13. a1.sources.s1.channels = c1

  14. a1.sinks.k1.channel = c1

新建一个名为case_source.properties的文本文件

复制上面的代码并上传至hadoop04上面

在hadoop05上面执行下面的代码:


  1. #运行命令:flume-ng agent --conf conf --conf-file case_sink.properties --name a1 -Dflume.hadoop.logger=INFO,console

  2. #运行命令解释:flume-ng agent --conf conf --conf-file 配置文件的位置 --name agent的名称 打印出来的日志打印到控制台当中

  3. a1.sources = s1

  4. a1.sinks = k1

  5. a1.channels = c1

  6. a1.sources.s1.type = avro

  7. a1.sources.s1.bind = 192.168.123.105

  8. a1.sources.s1.port =44466

  9. a1.channels.c1.type = memory

  10. a1.sinks.k1.type = logger

  11. a1.sources.s1.channels = c1

  12. a1.sinks.k1.channel = c1

新建一个名为case_sink.properties的文本文件

复制上面的代码并上传至hadoop05上面

注意:先执行hadoop05上面的代码:

[hadoop@hadoop05 ~]$ flume-ng agent --conf conf --conf-file case_sink.properties --name a1 -Dflume.hadoop.logger=INFO,console

在执行hadoop04上面的代码:

[hadoop@hadoop04 ~]$ flume-ng agent --conf conf --conf-file case_source.properties --name a1 -Dflume.hadoop.logger=INFO,console

启动之后请回到hadoop05查看日志是不是增加了下图所示

现在开始写入数据:

复制hadoop04的连接并执行下面的命令:

telnet hadoop04 44455

然后输入hello

然后再hadoop05可以看到下图的消息(消息传输没想象中那么快):

多路复制流

目的:把hadoop04的数据一模一样的发送给hadoop05和hadoop03

把下面的代码上传到hadoop04上去:

新建case_replicate.properties 文件:


  1. #2个channel和2个sink的配置文件

  2. a1.sources = s1

  3. a1.sinks = k1 k2

  4. a1.channels = c1 c2

  5. # Describe/configure the source

  6. a1.sources.s1.type = netcat

  7. a1.sources.s1.port = 44455

  8. a1.sources.s1.bind = 192.168.123.104

  9. #默认就是replicating就是复制,multiplexing是复用

  10. a1.sources.s1.selector.type = replicating

  11. a1.sources.s1.channels = c1 c2

  12. # Describe the sink

  13. #发送给hadoop05

  14. a1.sinks.k1.type = avro

  15. a1.sinks.k1.channel = c1

  16. a1.sinks.k1.hostname = 192.168.123.105

  17. a1.sinks.k1.port = 44466

  18. #发送给hadoop03

  19. a1.sinks.k2.type = avro

  20. a1.sinks.k2.channel = c2

  21. a1.sinks.k2.hostname = 192.168.123.103

  22. a1.sinks.k2.port = 44466

  23. # Use a channel which buffers events in memory

  24. a1.channels.c1.type = memory

  25. a1.channels.c1.capacity = 1000

  26. a1.channels.c1.transactionCapacity = 100

  27. a1.channels.c2.type = memory

  28. a1.channels.c2.capacity = 1000

  29. a1.channels.c2.transactionCapacity = 100

把下面的代码上传到hadoop05上面去:

新建case_replicate_s1.properties 文件


  1. # Name the components on this agent

  2. a1.sources = s1

  3. a1.sinks = k1

  4. a1.channels = c1

  5. # Describe/configure the source

  6. a1.sources.s1.type = avro

  7. a1.sources.s1.channels = c1

  8. a1.sources.s1.bind = 192.168.123.105

  9. a1.sources.s1.port = 44466

  10. # Describe the sink

  11. a1.sinks.k1.type = logger

  12. a1.sinks.k1.channel = c1

  13. # Use a channel which buffers events in memory

  14. a1.channels.c1.type = memory

  15. a1.channels.c1.capacity = 1000

  16. a1.channels.c1.transactionCapacity = 100

把下面的代码放到hadoop03上面去:

新建case_replicate_s2.properties 文件:


  1. # Name the components on this agent

  2. a1.sources = s1

  3. a1.sinks = k1

  4. a1.channels = c1

  5. # Describe/configure the source

  6. a1.sources.s1.type = avro

  7. a1.sources.s1.channels = c1

  8. a1.sources.s1.bind = 192.168.123.103

  9. a1.sources.s1.port = 44466

  10. # Describe the sink

  11. a1.sinks.k1.type = logger

  12. a1.sinks.k1.channel = c1

  13. # Use a channel which buffers events in memory

  14. a1.channels.c1.type = memory

  15. a1.channels.c1.capacity = 1000

  16. a1.channels.c1.transactionCapacity = 100

然后执行

在hadoop05上执行:

[hadoop@hadoop05 ~]$ flume-ng agent --conf conf --conf-file case_replicate_s1.properties --name a1 -Dflume.hadoop.logger=INFO,console

在hadoop03上执行:

[hadoop@hadoop03 ~]$ flume-ng agent --conf conf --conf-file case_replicate_s2.properties --name a1 -Dflume.hadoop.logger=INFO,console

在hadoop04上执行:

[hadoop@hadoop04 ~]$ flume-ng agent --conf conf --conf-file case_replicate.properties --name a1 -Dflume.hadoop.logger=INFO,console

同时hadoop03增加显示:

同时hadoop05增加显示:

然后在hadoop04上写数据:

然后观察hadoop03和hadoop05都接受到下图所示同一数据:

多路复用:

在hadoop04上新建 case_multi_sink.properties 文件


  1. #2个channel和2个sink的配置文件

  2. a1.sources = s1

  3. a1.sinks = k1 k2

  4. a1.channels = c1 c2

  5. # Describe/configure the source

  6. a1.sources.s1.type = org.apache.source.http.HTTPSource

  7. a1.sources.s1.port = 44455

  8. a1.sources.s1.host = 192.168.123.104

  9. #默认就是replicating就是复制,multiplexing是复用

  10. a1.sources.s1.selector.type = multiplexing

  11. a1.sources.s1.channels = c1 c2

  12. #state就是发送信息的头部信息,是CZ开头往c1走,默认往c1走

  13. a1.sources.s1.selector.header = state

  14. a1.sources.s1.selector.mapping.CZ = c1

  15. a1.sources.s1.selector.mapping.US = c2

  16. a1.sources.s1.selector.default = c1

  17. # Describe the sink

  18. #发送给hadoop05

  19. a1.sinks.k1.type = avro

  20. a1.sinks.k1.channel = c1

  21. a1.sinks.k1.hostname = 192.168.123.105

  22. a1.sinks.k1.port = 44466

  23. #发送给hadoop03

  24. a1.sinks.k2.type = avro

  25. a1.sinks.k2.channel = c2

  26. a1.sinks.k2.hostname = 192.168.123.103

  27. a1.sinks.k2.port = 44466

  28. # Use a channel which buffers events in memory

  29. a1.channels.c1.type = memory

  30. a1.channels.c1.capacity = 1000

  31. a1.channels.c1.transactionCapacity = 100

  32. a1.channels.c2.type = memory

  33. a1.channels.c2.capacity = 1000

  34. a1.channels.c2.transactionCapacity = 100

在hadoop05上新建case_multi_s1.properties 文件:


  1. # Name the components on this agent

  2. a1.sources = s1

  3. a1.sinks = k1

  4. a1.channels = c1

  5. # Describe/configure the source

  6. a1.sources.s1.type = avro

  7. a1.sources.s1.channels = c1

  8. a1.sources.s1.bind = 192.168.123.105

  9. a1.sources.s1.port = 44466

  10. # Describe the sink

  11. a1.sinks.k1.type = logger

  12. a1.sinks.k1.channel = c1

  13. # Use a channel which buffers events in memory

  14. a1.channels.c1.type = memory

  15. a1.channels.c1.capacity = 1000

  16. a1.channels.c1.transactionCapacity = 100

在hadoop03上新建case_multi_s2.properties 文件


  1. # Name the components on this agent

  2. a1.sources = s1

  3. a1.sinks = k1

  4. a1.channels = c1

  5. # Describe/configure the source

  6. a1.sources.s1.type = avro

  7. a1.sources.s1.channels = c1

  8. a1.sources.s1.bind = 192.168.123.103

  9. a1.sources.s1.port = 44466

  10. # Describe the sink

  11. a1.sinks.k1.type = logger

  12. a1.sinks.k1.channel = c1

  13. # Use a channel which buffers events in memory

  14. a1.channels.c1.type = memory

  15. a1.channels.c1.capacity = 1000

  16. a1.channels.c1.transactionCapacity = 100

按顺序执行下面的命令:

[hadoop@hadoop03 ~]$ flume-ng agent --conf conf --conf-file case_multi_s2.properties --name a1 -Dflume.hadoop.logger=INFO,console

[hadoop@hadoop05 ~]$ flume-ng agent --conf conf --conf-file case_multi_s1.properties --name a1 -Dflume.hadoop.logger=INFO,console

[hadoop@hadoop04 ~]$ flume-ng agent --conf conf --conf-file case_multi_sink.properties --name a1 -Dflume.hadoop.logger=INFO,console

在hadoop04上发送数据

curl-X POST -d '[{ "headers" :{"state" :"CZ"},"body" : "TEST1"}]' http://localhost:44455

curl-X POST -d '[{ "headers" :{"state" :"US"},"body" : "TEST2"}]' http://localhost:44455

curl-X POST -d '[{ "headers" :{"state" :"cn"},"body" : "TEST2"}]' http://localhost:44455

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇flume   三大核心组件 下一篇flume读取日志数据写入kafka &nbs..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目