版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_1018944104/article/details/85462011
目录
1、数据收集工具/系统产生背景
2、专业的数据收集工具
2.1、Chukwa
2.2、Scribe
2.3、Fluentd
2.4、Logstash
2.5、Apache Flume
3、Flume概述
3.1、Flume概念
3.2、Flume版本介绍
3.3、Flume数据源和输出方式
4、Flume体系结构/核心组件
4.1、概述
4.2、Flume三大核心组件
4.3、Flume经典部署方案
4.3.1、单Agent采集数据
4.3.2、多Agent串联
4.3.3、多Agent合并串联
4.3.4、多路复用
5、Flume实战案例
5.1、安装部署Flume
5.2、Flume实战案例
5.2.1、采集目录到HDFS
5.2.2、采集文件到HDFS
5.2.3、多路复用采集
5.2.4、多Agent串联采集
5.2.5、高可用部署采集
5.2.6、更多Source 和 Sink组件
6、综合案例
6.1、案例场景/需求
6.2、场景分析
6.3、数据处理流程分析
6.4、需求实现
1、数据收集工具/系统产生背景
Hadoop业务的整体开发流程:
任何完整的大数据平台,一般都会包括以下基本处理过程:
- 数据采集
- 数据ETL
- 数据存储
- 数据计算/分析
- 数据展现
其中,数据采集是所有数据系统必不可少的,随着大数据越来越被重视,数据采集的挑战也变的尤为突出。这其中包括:
- 数据源多种多样
- 数据量大,变化快
- 如何保证数据采集的可靠性
- 如何避免重复数据
- 如何保证数据的质量
我们今天就来看看当前可用的一些数据采集的产品,重点关注一些它们是如何做到高可靠, 高性能和高扩展。
总结,数据来源大体包括:
- 业务数据
- 爬虫爬取的网络公开数据
- 购买数据
- 自行采集收集的日志数据
2、专业的数据收集工具
2.1、Chukwa
Apache Chukwa是Apache旗下另一个开源的数据收集平台,它远没有其他几个有名。Chukwa基于Hadoop的HDFS和MapReduce来构建(显而易见,它用Java来实现),提供扩展性和可靠性。Chukwa同时提供对数据的展示,分析和监视。很奇怪的是它的上一次Github的更新事是7年前。可见该项目应该已经不活跃了。
官网:http://chukwa.apache.org/
2.2、Scribe
Scribe是Facebook开源的日志收集系统,在Facebook内部已经得到的应用。它能够从各种日志源上收集日志,存储到一个中央存储系统(可以是NFS,HDFS,或者其他分布式文件系统等)上,以便于进行集中统计分析处理。
官网:https://www.scribesoft.com/
2.3、Fluentd
Fluentd是另一个开源的数据收集框架。Fluentd使用C/Ruby开发,使用JSON文件来统一日志数据。它的可插拔架构,支持各种不同种类和格式的数据源和数据输出。最后它也同时提供了高可靠和很好的扩展性。
官网:https://www.fluentd.org/
2.4、Logstash
Logstash是著名的开源数据栈ELK(ElasticSearch,Logstash,Kibana)中的那个L。几乎在大部分的情况下ELK作为一个栈是被同时使用的。所有当你的数据系统使用ElasticSearch的情况下,Logstash是首选。Logstash用JRuby开发,所以运行时依赖JVM。
官网:https://www.elastic.co/cn/products/logstash
2.5、Apache Flume
Flume是Apache旗下,开源,高可靠,高扩展,容易管理,支持客户扩展的数据采集系统。Flume使用JRuby来构建,所以依赖Java运行环境。Flume最初是由Cloudera的工程师设计 用于合并日志数据的系统,后来逐渐发展用于处理流数据事件。
官网:http://flume.apache.org/
3、Flume概述
3.1、Flume概念
Flume是一个分布式、高可靠、高可用的海量日志聚合系统,支持在系统中定制各类数据发送方,用于收集数据,同时,Flume提供对数据的简单处理,并写到各种数据接收方的能力。
1、Apache Flume是一个分布式、可靠、高可用的海量日志采集、聚合和传输的系统,和Sqoop同属于数据采集系统组件,但是Sqoop用来采集关系型数据库数据,而Flume用来采集流动型数据。
2、Flume名字来源于原始的近乎实时的日志数据采集工具,现在被广泛用于任何流事件数据的采集,它支持从很多数据源聚合数据到HDFS。
3、 一般的采集需求,通过对flume的简单配置即可实现。Flume针对特殊场景也具备良好的自定义扩展能力,因此,flume可以适用于大部分的日常数据采集场景。
4、Flume最初由Cloudera开发,在2011年贡献给了Apache基金会,2012年变成了Apache的顶级项目。Flume OG(Original Generation)是Flume最初版本,后升级换代成Flume NG(Next/New Generation)。
5、Flume的优势:可横向扩展、延展性、可靠性。
3.2、Flume版本介绍
Flume在0.9.x and 1.x之间有较大的架构调整:
- 1.x版本之后的改称Flume NG
- 0.9.x版本称为Flume OG,最后一个版本是0.94,之后是由Apache进行了重构
- N和O的意思就是new和old的意思
官网文档:http://flume.apache.org/FlumeUserGuide.html
3.3、Flume数据源和输出方式
Flume提供了从console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日 志系统,支持TCP和UDP等2种模式),exec(命令执行)等数据源上收集数据的能力,在我们的系统中目前使用exec方式进行日志采集。
Flume的数据接受方,可以是console(控制台)、text(文件)、dfs(HDFS文件)、RPC(Thrift-RPC)和syslogTCP(TCP syslog日志系统)等。最常用的是Kafka。
4、Flume体系结构/核心组件
4.1、概述
Flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。
Flume以agent为最小的独立运行单位。一个agent就是一个JVM。单agent由Source、Sink和Channel三大组件构成。如下图:
组件 |
功能 |
Agent |
使用JVM运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。
|
Client |
生产数据,运行在一个独立的线程。 |
Source |
从Client收集数据,传递给Channel。 |
Sink |
从Channel收集数据,运行在一个独立线程上。 |
Channel |
连接Sources 和 Sinks,这个有点像一个队列。 |
Events |
可以是日志记录、avro对象等。 |
4.2、Flume三大核心组件
Event
- Event是Flume数据传输的基本单元。
- Flume以事件的形式将数据从源头传送到最终的目的地。
- Event由可选的header和载有数据的一个byte array构成。
- 载有的数据度flume是不透明的。
- Header是容纳了key-value字符串对的无序集合,key在集合内是唯一的。
- Header可以在上下文路由中使用扩展
Client
- Client是一个将原始log包装成events并且发送他们到一个或多个agent的实体
- 目的是从数据源系统中解耦Flume
- 在Flume的拓扑结构中不是必须的。
- Client实例
- flume log4j Appender
- 可以使用Client SDK(org.apache.flume.api)定制特定的Client
Agent
- 一个Agent包含source,channel,sink和其他组件。
- 它利用这些组件将events从一个节点传输到另一个节点或最终目的地。
- agent是flume流的基础部分。
- flume为这些组件提供了配置,声明周期管理,监控支持。
Source
- Source负责接收event或通过特殊机制产生event,并将events批量的放到一个或多个
Channel
- 包含event驱动和轮询两种类型。
- 不同类型的Source
- 与系统集成的Source:Syslog,Netcat,监测目录池
- 自动生成事件的Source:Exec
- 用于Agent和Agent之间通信的IPC source:avro,thrift
- source必须至少和一个channel关联
Agent之Channel
- Channel位于Source和Sink之间,用于缓存进来的event
- 当sink成功的将event发送到下一个的channel或最终目的event从channel删除
- 不同的channel提供的持久化水平也是不一样的
- Memory channel:volatile(不稳定的)
- File Channel:基于WAL(预写式日志Write-Ahead logging)实现JDBC
- channel:基于嵌入式database实现
- channel支持事务,提供较弱的顺序保证
- 可以和任何数量的source和sink工作
Agent之Sink
- Sink负责将event传输到下一个Agent或最终目的地,成功后将event从channel移除
- 不同类型的sink
- 存储event到最终目的地终端sink,比如HDFS,HBase
- 自动消耗的sink比如null sink
- 用于agent间通信的IPC:sink:Avro
- 必须作用于一个确切的channel
Iterator
- 作用于Source,按照预设的顺序在必要地方装饰和过滤events
Channel Selector
- 允许Source基于预设的标准,从所有channel中,选择一个或者多个channel
Sink Processor
- 多个sink可以构成一个sink group
- sink processor可以通过组中所有sink实现负载均衡
- 也可以在一个sink失败时转移到另一个
4.3、Flume经典部署方案
4.3.1、单Agent采集数据
4.3.2、多Agent串联
4.3.3、多Agent合并串联
4.3.4、多路复用
5、Flume实战案例
5.1、安装部署Flume
1、Flume的安装非常简单,只需要解压即可,当然,前提是已有Hadoop环境上传安装包到 数据源所在节点上
然后解压tar -zxvf apache-flume-1.8.0-bin.tar.gz
然后进入flume的目录,修改conf下的flume-env.sh,在里面配置JAVA_HOME
2、根据数据采集的需求配置采集方案,描述在配置文件中(文件名可任意自定义)
3、指定采集方案配置文件,在相应的节点上启动flume agent
先用一个最简单的例子来测试一下程序环境是否正常:
1、在$FLUME_HOME/agentconf目录下创建一个数据采集方案,该方案就是从一个网络端口收集数据,也就是创一个任意命名的配置文件如下:netcat-logger.properties文件内容如下:
#定义这个agent中各个组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#描述和配置source组件:r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#描述和配置sink组件:k1
a1.sinks.k1.type = logger
#描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#描述和配置source channel sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
|
2、启动agent去采集数据:
在$FLUME_HOME下执行如下命令:
bin/flume-ng agent -c conf -f agentconf/netcat-logger.properties -n a1 - Dflume.root.logger=INFO,console
-c conf指定flume自身的配置文件所在目录 -f conf/netcat-logger.perproties指定我们所描述的采集方案
-n a1指定我们这个agent的名字
|
3、测试
先要往agent的source所监听的端口上发送数据,让agent有数据可采
例如在本机节点,使用telnet localhost 44444命令就可以
如果这个命令的执行过程中发现抛出异常说:command not found
那么请使用:sudo yum -y install telnet这个命令进行telnet的安装
输入两行数据:
hello huangbo
1234
4、Flume-Agent 接收的结果
5.2、Flume实战案例
5.2.1、采集目录到HDFS
采集需求:某服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就需要把文件采集到HDFS中去
根据需求,首先定义以下3大要素:
- 数据源组件,即source——监控文件目录:spooldir。spooldir特性如下:
- 监视一个目录,只要目录中出现新文件,就会采集文件中的内容
- 采集完成的文件,会被agent自动添加一个后缀:.COMPLETED
- 所监视的目录中不允许重复出现相同文件名的文件
- 下沉组件,即sink——HDFS文件系:hdfs sink
- 通道组件,即channel——可用file channel也可以用内存channel
配置文件编写:spooldir-hdfs.properties
#定义三大组件的名称
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
#配置source组件
agent1.sources.source1.type =spooldir
agent1.sources.source1.spoolDir =/home/hadoop/logs/
agent1.sources.source1.fileHeader = false
#配置拦截器
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname
#配置sink组件
agent1.sinks.sink1.type =hdfs
agent1.sinks.sink1.hdfs.path=hdfs://myha01/flume_log/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = events
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
|
Channel参数解释:
- capacity:默认该通道中最大的可以存储的event数量
- trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量
- keep-alive:event添加到通道中或者移出的允许时间
启动:bin/flume-ng agent -c conf -f agentconf/spooldir-hdfs.properties -n agent1
测试:
- 如果HDFS集群是高可用集群,那么必须要放入core-site.xml和hdfs-site.xml文件到$FLUME_HOME/conf目录中
- 查看监控的/home/hadoop/logs文件夹中的文件是否被正确上传到HDFS上
- 在该目录中创建文件,或者从其他目录往该目录加入文件,验证是否新增的文件能被自动的上传到HDFS
5.2.2、采集文件到HDFS
采集需求:比如业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到HDFS
根据需求,首先定义以下3大要素:
- 采集源,即source--监控文件内容更新:exec "tail -F file"
- 下沉目标,即sink--HDFS文件系:hdfs sink
- Source和sink之间的传递通道——channel,可用file channel也可以用内存channel
配置文件编写:tail-hdfs.properties
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure tail -F source1
agent1.sources.source1.type =exec
agent1.sources.source1.command =tail -F /home/hadoop/logs/catalina.out
agent1.sources.source1.channels = channel1
#configure host for source
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname
# Describe sink1
agent1.sinks.sink1.type =hdfs
#a1.sinks.k1.channel = c1
agent1.sinks.sink1.hdfs.path =hdfs://myha01/weblog/flume-event/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = tomcat_
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
|
启动:bin/flume-ng agent -c conf -f agentconf/tail-hdfs.properties -n agent1
测试:
- 模拟向指定的日志文件/home/hadoop/logs/catalina.out追加内容
- 验证HDFS上的对应文件是否有新增内容
5.2.3、多路复用采集
作业。
5.2.4、多Agent串联采集
架构设计:从hadoop04的flume agent传送数据到hadoop05的flume agent
如现在我在两台机器上的测试,192.168.123.104和192.168.123.105上面做agent的传递,分别是:
hadoop04:tail-avro.properties
- 使用exec “tail-F /home/hadoop/testlog/welog.log”获取采集数据
- 使用avro sink数据都下一个agent
hadoop05:avro-hdfs.properties
- 使用avro接收采集数据
- 使用hdfs sink数据到目的地
第一步:准备hadoop04
在IP为192.168.123.104的hadoop04上的agentconf下创建一个tail-avro.properties:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/testlog/date.log
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = hadoop05
a1.sinks.k1.port = 4141
a1.sinks.k1.batch-size = 2
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
|
第二步:准备hadoop05
再在IP为192.168.123.105的hadoop05机器上配置采集方案avro-hdfs.properties:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Describe k1
a1.sinks.k1.type =hdfs
a1.sinks.k1.hdfs.path =hdfs://myha01/testlog/flume-event/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = date_
a1.sinks.k1.hdfs.maxOpenFiles = 5000
a1.sinks.k1.hdfs.batchSize= 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat =Text
a1.sinks.k1.hdfs.rollSize = 102400
a1.sinks.k1.hdfs.rollCount = 1000000
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
|
第三步:最终测试
1、首先启动hadoop05机器上的agent
bin/flume-ng agent -c conf -n Dflume.root.logger=INFO,console
2、再启动hadoop04上的agent
bin/flume-ng agent -c conf -n Dflume.root.logger=INFO,console
3、执行一个普通的脚本往hadoop04的/home/hadoop/testlog/date.log中追加数据:
#!/bin/bash
while true
do
echo `date` >> /home/hadoop/testlog/date.log
sleep 1
done
|
4、至此会发现在hadoop04 agent发送的数据会转到hadoop05 agent,然后被sink到了HDFS的对应目录hdfs://myha01/testlog/flume-event/
|
5.2.5、高可用部署采集
第一步:
Flume-NG 的高可用架构图
图中,我们可以看出,Flume的存储可以支持多种,这里只列举了HDFS和Kafka(如:存储 最新的一周日志,并给Storm系统提供实时日志流)。
第二步:节点分配
Flume的Agent和Collector分布如下表所示:
名称 |
Host |
角色 |
Agent1 |
hadoop02 |
日志服务器 |
Agent2 |
hadoop03 |
日志服务器 |
Agent3 |
hadoop04 |
日志服务器 |
Collector1 |
hadoop04 |
AgentMaster1 |
Collector2 |
hadoop05 |
AgentMaster2 |
图中所示,Agent1,Agent2,Agent3数据分别流入到Collector1和Collector2,Flume NG本 身提供了Failover机制,可以自动切换和恢复。在上图中,有3个产生日志服务器分布在不 同的机房,要把所有的日志都收集到一个集群中存储。下面我们开发配置Flume NG集群
第三步:配置信息
在下面单点Flume中,基本配置都完成了,我们只需要新添加两个配置文件,它们是ha_agent.properties和ha_collector.properties,其配置内容如下所示:
ha_agent.properties配置:
#agent name: agent1
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2
#set gruop
agent1.sinkgroups = g1
#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /home/hadoop/testlog/testha.log
agent1.sources.r1.interceptors = i1 i2
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = Type
agent1.sources.r1.interceptors.i1.value = LOGIN
agent1.sources.r1.interceptors.i2.type = timestamp
# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = hadoop04
agent1.sinks.k1.port = 52020
# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = hadoop05
agent1.sinks.k2.port = 52020
#set sink group
agent1.sinkgroups.g1.sinks = k1 k2
#set failover
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000
|
ha_collector.properties配置:
#set agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# other node,nna to nns
a1.sources.r1.type = avro
##当前主机为什么,就修改成什么主机名
a1.sources.r1.bind = hadoop04
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
##当前主机为什么,就修改成什么主机名
a1.sources.r1.interceptors.i1.value = hadoop04
a1.sources.r1.channels = c1
#set sink to hdfs
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path= hdfs://myha01/flume_ha/loghdfs
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d
a1.sinks.k1.channel=c1
|
注意:在把ha_collector.properties文件拷贝到另外一台collector的时候,记得更改该配置文 件中的主机名。在该配置文件中有注释
第四步:启动
先启动hadoop04和hadoop05上的collector角色: bin/flume-ng agent -c conf -f agentconf/ha_collector.properties -n a1 - Dflume.root.logger=INFO,console
然后启动hadoop02,hadoop03,hadoop04上的agent角色: bin/flume-ng agent -c conf -f agentconf/ha_agent.properties -n agent1 - Dflume.root.logger=INFO,console
|
5.2.6、更多Source 和 Sink组件
更多Sources:http://flume.apache.org/FlumeUserGuide.html#flume-sources
更多Channels:http://flume.apache.org/FlumeUserGuide.html#flume-channels
更多Sinks:http://flume.apache.org/FlumeUserGuide.html#flume-sinks
6、综合案例
6.1、案例场景/需求
A、B两台日志服务机器实时生产日志主要类型为access.log、nginx.log、web.log。现在要求把A、B机器中的access.log、nginx.log、web.log采集汇总到C机器上然后统一收集到HDFS中。但是在hdfs中要求的目录为:
/source/logs/access/20160101/**
/source/logs/nginx/20160101/**
/source/logs/web/20160101/**
6.2、场景分析
6.3、数据处理流程分析
6.4、需求实现
第一:准备3台服务器
服务器A对应的IP为192.168.123.103,主机名为hadoop03
服务器B对应的IP为192.168.123.104,主机名为hadoop04
服务器C对应的IP为192.168.123.105,主机名为hadoop05
第二:设计采集方案exec_source_avro_sink.properties
在服务器hadoop03和服务器hadoop04上的$FLUME_HOME/agentconf创建采集方案的配置
文件exec_source_avro_sink.properties,文件内容为:
#指定各个核心组件
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1
#准备数据源
## static拦截器的功能就是往采集到的数据的header中插入自己定义的key-value对
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/flume_data/access.log a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /home/hadoop/flume_data/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx
a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /home/hadoop/flume_data/web.log
a1.sources.r3.interceptors = i35
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop05
a1.sinks.k1.port = 41414
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1
|
第三:准备avro_source_hdfs_sink.properties配置文件
在服务器C上的$FLUME_HOME/agentconf中创建配置文件avro_source_hdfs_sink.properties文件内容为:
#定义agent名,source、channel、sink的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#定义source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port =41414
#添加时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type=org.apache.flume.interceptor.TimestampInterceptor$Builder
#定义channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
#定义sink a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://myha01/source/logs/%{type}/%Y%m%d
a1.sinks.k1.hdfs.filePrefix =events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
#时间类型
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件不按条数生成
a1.sinks.k1.hdfs.rollCount = 0
#生成的文件按时间生成
a1.sinks.k1.hdfs.rollInterval = 30
#生成的文件按大小生成
a1.sinks.k1.hdfs.rollSize = 10485760
#批量写入hdfs的个数
a1.sinks.k1.hdfs.batchSize = 20
#flume操作hdfs的线程数(包括新建,写入等)
a1.sinks.k1.hdfs.threadsPoolSize=10
#操作hdfs超时时间
a1.sinks.k1.hdfs.callTimeout=30000
#组装source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
|
第四:启动
配置完成之后,在服务器A和B上的/home/hadoop/data有数据文件access.log、nginx.log、web.log。
先启动服务器C(hadoop05)上的flume,启动命令:在flume安装目录下执行:
bin/flume-ng agent -c conf -f agentconf/avro_source_hdfs_sink.properties -name a1 - Dflume.root.logger=DEBUG,console
然后在启动服务器上的A(hadoop03)和B(hadoop04),启动命令:在flume安装目录下执 行:
bin/flume-ng agent -c conf -f agentconf/exec_source_avro_sink.properties -name a1 - Dflume.root.logger=DEBUG,console
第五:测试
自行测试