设为首页 加入收藏

TOP

【Flume】文件收集框架Flume
2019-05-12 14:12:58 】 浏览:127
Tags:Flume 文件 收集 框架
版权声明:本文为博主原创文章,转载请注明出处。 https://blog.csdn.net/gongxifacai_believe/article/details/80956067

1、Flume架构

Flume是Cloudera 开发的框架,用于从文件中实时收集数据。

角色 描述 常用形式
Source Source用于采集数据,Source是产生数据流的地方,同时Source会将产生的数据流传输到Channel Avro Source、Exec Source、Spooling Directory Source、Kafka Source、Netcat Source、Syslog Sources、HTTP Source
Channel 连接 sources 和 sinks ,这个有点像一个队列 Memory Channel、JDBC Channel、Kafka Channel、File Channel
Sink 从Channel收集数据,将数据写到目标源,可以是下一个Source也可以是HDFS或者HBase HDFS Sink、Hive Sink、HBase Sinks、MorphlineSolrSink、ElasticSearchSink、Kafka Sink

Events:Event是Flume数据传输的基本单元,Flume以事件的形式将数据从源头传送到最终的目的。Event由可选的header和载有数据的一个byte array构成,载有的数据对flume是不透明的,Header是容纳了key-value字符串对的无序集合,key在集合内是唯一的,Header可以在上下文中使用扩展。

Source、Channel、Sink、Event的执行流程如下:

2、Flume的安装

(1)Flume的运行环境:
1)运行在logs收集的地方,将生成的logs在Source中收集起来;
2)系统:只支持Linux;
3)需要JVM/JDK环境;
4)是一种轻量级服务,其他的轻量级服务还有:zookeeper,journalnode,zkfc,sqoop等。
(2)Flume的安装:
1)上传下载好的flume压缩包到Linux系统中,并为压缩包赋予执行权限:
$ chmod u+x flume-ng-1.5.0-cdh5.3.6.tar.gz
2)解压flume安装包:
$ tar -zxf flume-ng-1.5.0-cdh5.3.6.tar.gz -C /opt/cdh-5.3.6/
3)将flume文件夹改名:
cdh-5.3.6]$ mv apache-flume-1.5.0-cdh5.3.6-bin/ flume-1.5.0-cdh5.3.6
4)修改配置文件/opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/conf/flume-env.sh,指定JAVA_HOME。
flume-1.5.0-cdh5.3.6]$ cd conf conf]$ cp flume-env.sh.template flume-env.sh

export JAVA_HOME=/opt/modules/jdk1.7.0_67

5)验证安装是否成功:
flume-1.5.0-cdh5.3.6]$ bin/flume-ng version
出现如下结果说明安装成功:

Flume 1.5.0-cdh5.3.6
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: e97c9912e8b940cf493c2392d2b113b97194cffb
Compiled by jenkins on Tue Jul 28 15:21:40 PDT 2015
From source with checksum e4d03999f62abb1c9e9f34054fe59f06

3、Flume命令

[beifeng@hadoop-senior flume-1.5.0-cdh5.3.6]$ bin/flume-ng 
Usage: bin/flume-ng <command> [options]...

commands:
  agent                     run a Flume agent

global options:
  --conf,-c <conf>          use configs in <conf> directory
  -Dproperty=value          sets a Java system property value

agent options:
  --name,-n <name>          the name of this agent (required)
  --conf-file,-f <file>     specify a config file (required if -z missing)

4、Flume使用示例一

Source为Telnet输入,Channel为内存,Sink存储为日志,debug信息输出到屏幕上。
1)配置文件a1.conf:

# define agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# define sources
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop-senior.ibeifeng.com
a1.sources.r1.port = 44444

# define sinks
a1.sinks.k1.type = logger
a1.sinks.k1.maxBytesToLog = 1024

# define channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# define bindings
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2)运行命令:

bin/flume-ng agent \
-c conf \
-n a1 \
-f conf/a1.conf \
-Dflume.root.logger=DEBUG,console

5、Flume使用示例二

Source为执行命令,Channel为内存,Sink存储在hdfs上,debug信息输出到屏幕上。
1)配置文件flume-tail.conf:

# define agent
a2.sources = r2
a2.channels = c2
a2.sinks = k2

# define sources
a2.sources.r2.type = exec
a2.sources.r2.command = tail -f /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
a2.sources.r2.shell = /bin/bash -c

# define sinks
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop-senior.ibeifeng.com:8020/user/beifeng/flume/hive-logs/
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.writeFormat = Text
a2.sinks.k2.hdfs.batchSize = 10

# define channels
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# define bindings
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

2)运行命令:

bin/flume-ng agent \
-c conf \
-n a2 \
-f conf/flume-tail.conf \
-Dflume.root.logger=DEBUG,console

6、Flume使用示例三

Source为Spooling,Channel为file channel,Sink存储到hdfs上,debug信息输出到屏幕上。选择非.log结尾的文件传输,传输后原文件加上后缀.delete。Sink在hdfs上根据当前日期创建文件夹,输出文件结果。
1)配置文件flume-app.conf:

# define agent
a3.sources = r3
a3.channels = c3
a3.sinks = k3

# define sources
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/spoollogs
a3.sources.r3.ignorePattern = ^(.)*\\.log$
a3.sources.r3.fileSuffix = .delete

# define sinks
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop-senior.ibeifeng.com:8020/user/beifeng/flume/splogs/%Y%m%d
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.writeFormat = Text
a3.sinks.k3.hdfs.batchSize = 10

# define channels
a3.channels.c3.type = file
a3.channels.c3.checkpointDir = /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/filechannel/checkpoint
a3.channels.c3.dataDirs = /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/filechannel/data

# define bindings
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

其中,spoolDir为监控的日志目录,抽取完整的日志文件,写的日志文件不抽取; 使用FileChannel,本地文件系统缓冲,比内存安全性更高;数据存储在HDFS上,存储对应hive表的目录或者hdfs目录。
2)运行命令:

bin/flume-ng agent \
-c conf \
-n a3 \
-f conf/flume-app.conf \
-Dflume.root.logger=DEBUG,console

7、Flume架构的使用场景




】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Mybatis解决属性名与字段名不一致 下一篇解决flume运行中的一个异常问题!

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目