设为首页 加入收藏

TOP

Flume Source组件实战—Avro、Spool、Exec(详细图文)
2019-03-25 14:04:22 】 浏览:342
Tags:Flume Source 组件 实战 Avro Spool Exec 详细 图文
版权声明:本文为夏天小厨原创文章,转载引用请注明出处 https://blog.csdn.net/a_drjiaoda/article/details/84954593

在Flume原理剖析和安装部署章节,我们最后提到NetCat Source的实例,实现了监听一个指定的网络端口,只要在应用程序向这个端口里面写数据,这个NetCat Source组件就能获取到信息。本章内容继续讲解Flume Source的其他几个常用组件,Avro Source、Spool Source、Exec Source,后两者是属于自动监控读取文件的source组件。因为本文只测试Source组件,从而这里默认的Channel是 MemoryChannel,sink为LoggerSink。

一、Avro Source实战

Avro Source可以定制avro-client发送一个指定的文件给Flume agent,Avro源使用Avro RPC机制,Flume主要的RPC Source也是 Avro Source,它使用Netty-Avro inter-process的通信(IPC)协议来通信,因此可以用java或JVM语言发送数据到Avro Source端。它的配置文件主要包含三个参数:

  • type: Avro source的别名是avro,也可以使用完整类别名称,org.apache.flume.source.AvroSource;
  • bind: 绑定的IP地址或主机名。使用0.0.0.0绑定机器所有端口
  • port: 绑定监听端口端口

利用Avro Source可以实现多级流动、扇出流、扇入流等效果,下面放一张flume官方经典图,多级流动,如下图所示。另外也可以接受通过flume提供的Avro客户端发送的日志信息(本文的示例就是使用avro-client发送日志信息)。

1、cd /usr/flume/conf 在conf目录下新建avro.conf ,vi avro.conf 。如果你之前已经写过了netcat.conf 你可以使用命令 cp netcat.conf avro.conf 之后修改avro里面的内容即可,不需要每次都进行全段全段的撰写,节约时间。

#example.conf: A single-node flume configuration
#It is Just for test flume spool Example

#Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#定制source,绑定channel、主机以及端口
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

#Describe the sink
a1.sinks.k1.type = logger

#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

整个配置文件内容分为三个大部分:1、从整体上描述代理agent中sources、sinks、channels所涉及到的组件;2、详细描述agent中每一个source、sink与channel的具体实现:即在描述source的时候,需要指定source到底是什么类型的,即这个source是接受文件的、还是接受http的、还是接受thrift的;对于sink则是描述使用什么类型的sink,本例中使用的是Logger Sink,将数据写入日志文件;对于channel需要指定是内存等;3、通过channel将source与sink连接起来。

2、使用命令,启动flume agent

[root@master bin]# flume-ng agent -n a1 -c /usr/flume/conf/ --conf-file /usr/flume/conf/avro.conf -Dflume.root.logger=INFO,console

参数解释:

-n 指定agent名称,要与配置文件一致(等价于--name)

-c 指定配置文件位置(等价于 --conf)

-f 指定agent配置文件(等价 --conf-file)

-Dflume.root.logger=INFO,console 设置日志等级

3、成功启动之后,如下所示,系统程序界面会一直在等待状态,等待接收avro-client发送的日志数据:

4、我们使用SSH 远程连接工具Secure CRT 重新Clone一下新Session,启动avro-client,这里需要指定发给给agent的文件具体位置,并指定端口

[root@master ~]# flume-ng avro-client -c /usr/flume/conf/ -H master -p 4141 -F /usr/flume/logs/avro_test.log

5、最后我们来查看一下结果,在运行上面的命令之后,agent界面中会显示avro_test.log中的文件内容,下图的字比较小,为了更好的理解,我把几张图拼在一起。

二、Spool Source实战

Spool Source监听一个指定的目录,即只要应用程序想这个指定的目录中添加新的文件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channel。等待sink处理完之后,标记该文件已完成处理,文件名添加.completed后缀。虽然是自动监控整个目录,但是只能监控文件,如果以追加的方式向已被处理的文件中添加内容,source并不能识别;换言之,如果在被spool source监控的目录下出现了一个新文件,则立即会被处理。

但值得注意的是:(1)拷贝到spool目录下的文件不可以再打开编辑。(2)spool目录下不可包含相应的子目录,即无法监控子目录的文件夹变动。

1、添加配置文件内容cd /usr/flume/conf 在conf目录下新建spool.conf ,vi spool.conf 。如果你之前已经写过了avro.conf 你可以使用命令 cp avro.conf spool.conf之后修改spool里面的内容即可,不需要每次都进行全段全段的撰写,节约时间。

#example.conf: A single-node flume configuration
#It is Just for test flume spool Example

#Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#指定source 绑定channel和需要监控的文件夹
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /usr/flume/logs
a1.sources.r1.fileHeader = true

#Describe the sink
a1.sinks.k1.type = logger

#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2、启动。使用命令,启动flume agent

[root@master bin]# flume-ng agent -n a1 -c /usr/flume/conf/ --conf-file /usr/flume/conf/spool.conf -Dflume.root.logger=INFO,console  

参数选项就不在这里做过多解释,如有需要,请参考Avro Source实战内容

3、成功启动之后,如下所示,系统程序界面会一直处于监控状态,监控文件夹下文件是否出现变动。在这个界面中明显已经开始读完一个名为avro_test.log的文件,headers为正在读的文件的位置,body为一行的内容,但是显示的只有16位,这里面用黄色框标出的即这个文件夹中的内容,即我们在上面测试用的avro_test.log文件当时存放的正式logs文件夹下。

4、为了测试其工作方式,我们在其他会话窗口创建一个test_spool.log文件,放入到logs文件夹下,并观察agent窗口的变化。

从上图可以看出,测试用的test_spool.log一旦放入到被监测的logs文件夹下之后,会立即被处理,并且被打上处理完成的标记。Tips:我们在测试的时候会通过vi命令来创建一个文件,也可以echo "spool test" >> test_spool.log 以追加的方式写入新内容;或者使用cat命令,cat其他文件的内容追加到文件中。

三、Exec Source实战

监听一个指定的命令,获取一条命令的结果作为它的数据源,source组件从这个命令的结果中取数据。常用的是tail -F 【file】指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容 ,EXEC执行一个给定的命令获得输出的源,如果要使用tail命令,必选使得file足够大才能看到输出内容

1、添加配置文件内容cd /usr/flume/conf 在conf目录下新建exec.conf ,vi exec.conf

#example.conf: A single-node flume configuration
#It is Just for test flume exec Example

#Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#配置source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/flume/logs/exec_tail_test.log
a1.sources.r1.channels = c1

#Describe the sink
a1.sinks.k1.type = logger

#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2、启动。使用命令,启动flume agent

[root@master ~]# flume-ng agent -n a1 -c /usr/flume/conf/ -f /usr/flume/conf/exec.conf -Dflume.root.logger=INFO,console

3、成功启动之后,如下所示,系统程序界面会一直处于监控状态,监控该命令下文件是否出现变动,从上面的日志信息可以看出,相关组件进程均已成功启动。

4、如果要使用tail命令,必选使得file足够大才能看到输出内容,因此为了让agent界面能够监控到tail之后的结果,我们往监控的文件中循环插入100条数据:

5、查看agent中的数据

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇通过kafka,flume消费自己生产的.. 下一篇flume 简单案例 将一个节点的中一..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目