设为首页 加入收藏

TOP

Flume+Kafka环境构建和实战
2019-04-23 14:24:17 】 浏览:37
Tags:Flume Kafka 环境 建和 实战
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/wqhlmark64/article/details/77983915


1. 准备工作

apache上下载 apache-flume-1.7.0, apache-kafka_2.12-0.11, apache-zookeeper-3.4.9

下载后分别解压至/home/hadoop/bigdata并重命名目录为flume, kafka, zk, (便于在.bashrc中export各个HOME变量及后续升级)


2. 配置并启动zookeeper

zk配置主要有两个方面

2.1 conf/zoo.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes. // 下面这两个目录要提前手工建好
dataDir=/home/hadoop/bigdata/zk/zkdata
dataLogDir=/home/hadoop/bigdata/zk/zkdatalog

server.1=master:2888:3888
server.2=ndh-slave01:2888:3888
server.3=slave02:2888:3888

# the port at which the clients will connect
clientPort=2181


2.2 zkdata/myid

这里面主要保存与conf/zoo.cfg中server.N对应的数N,如master主机上是server.1, myid中即写1,其它两台机器同理。


2.3 启动并验证zk

把zookeeper目录及刚修改的配置文件一并拷贝到另两台slave机器上,并分别执行下面两个命令,以确保正常启动。

./zkServer.sh start

./zkServer.sh status

如遇到./zkServer.sh status报错问题,通常都是配置和操作的问题,请参考先前文章:http://blog.csdn.net/wqhlmark64/article/details/73250662。


3. flume配置和启动

Flume是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

简要来说,其设计目标和优势如下:

(1) 可靠性
当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。),Store on failure(这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送),Best effort(数据发送到接收方后,不会进行确认)。
(2) 可扩展性
Flume采用了三层架构,分别为agent,collector和storage,每一层均可以水平扩展。其中,所有agent和collector由master统一管理,这使得系统容易监控和维护,且master允许有多个(使用ZooKeeper进行管理和负载均衡),这就避免了单点故障问题。
(3) 可管理性
所有agent和colletor由master统一管理,这使得系统便于维护。多master情况,Flume利用ZooKeeper和gossip,保证动态配置数据的一致性。用户可以在master上查看各个数据源或者数据流执行情况,且可以对各个数据源配置和动态加载。Flume提供了web 和shell script command两种形式对数据流进行管理。
(4) 功能可扩展性
用户可以根据需要添加自己的agent,collector或者storage。此外,Flume自带了很多组件,包括各种agent(file, syslog等),collector和storage(file,HDFS等)。

很明显,使用flume,主要就是确定好数据源和目标接收地,以及flume agent相关的source, channel, sink.


3.1 常用配置模式
主要修改flume/conf/flume-conf.properties文件。安装包中默认的文件名有后缀.template,要copy一份并重命名为flume-conf.properties。

3.1.1 监控特定文件
//声明名字为agent的Agent的source, sink, channel
agent.sources=s1
agent.sinks=k1
agent.channels=c1

// 定义各source,sink,channel的属性
agent.sources.s1.type=exec
agent.sources.s1.command = tail -f -n+1 /home/hadoop/bigdata/spark/logs/spark-hadoop-org.apache.spark.deploy.master.Master-1-master.out  // 这里取了spark的任务日志文件作为监控源,指定任何有流式输入的文件均可
agent.sources.s1.channels=c1
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100

agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink     // lib/flume-ng-kafka-sink-1.7.0.jar, lib/flume-ng-core-1.7.0.jar中有,注意最后的KafkaSink首字母大写,执行过程中曾因把开关的K小写了,导致启动报错找不到类
agent.sinks.k1.brokerList=master:9092
agent.sinks.k1.topic=test0   // 与第4节中Kafka定义的topic相一致
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder

agent.sinks.k1.channel=c1

3.1.2 侦听网络端口数据
Agent名称定义为agent.
Source:可以理解为输入端,定义名称为s1 channel:
传输频道,定义为c1,设置为内存模式
sinks:可以理解为输出端,定义为sk1,

agent.sources = s1
agent.channels = c1
agent.sinks = sk1

#设置Source的netcat 端口为5678,使用的channel为c1
agent.sources.s1.type = netcat
agent.sources.s1.bind = localhost
agent.sources.s1.port = 5678
agent.sources.s1.channels = c1

#设置Sink为logger模式,使用的channel为c1
agent.sinks.sk1.type = logger
agent.sinks.sk1.channel = c1

#设置channel信息
agent.channels.c1.type = memory   #内存模式
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100   #传输参数设置。


3.1.2 特定目录下新增文件
agent.sources = s1
agent.channels = c1
agent.sinks = sk1

#设置spooldir
agent.sources.s1.type = spooldir
agent.sources.s1.spoolDir =/var/logs/xxx
agent.sources.s1.fileHeader = true
agent.sources.s1.channels = c1
agent.sinks.sk1.type = logger
agent.sinks.sk1.channel = c1 #In Memory !!!
agent.channels.c1.type = memory
agent.channels.c1.capacity = 10004
agent.channels.c1.transactionCapacity = 100


3.2 启动flume
因后面要和Kafka配合,这里启动第一种模式的配置。
./bin/flume-ng agent --name agent --conf conf --conf-file conf/flume-conf.properties -Dflume.root.logger=INFO,console &
命令说明:指定Agent name为agent, 与flume-conf.properties中配置的agent名字一致,使用conf/flume-conf.properties配置文件


4. Kafka配置和启动
4.1 kafka配置和server启动
因为以集群方式启动,在conf下server.properties分别拷贝两份且命名为server-1.properties, server-2.properties.
以server.properties为例,修改其中的属性字段如下
broker.id=0
log.dirs=/tmp/kafka-logs-0

server-1,server-2中分别把对应的0修改为1和2,以便和名字对应方便识别。

然后分别执行以启动3个server进程,与后面使用过程中指定副本对应: 
./bin/kafka-server-start.shconfig/server.properties &
./bin/kafka-server-start.shconfig/server-1.properties &
./bin/kafka-server-start.shconfig/server-2.properties &

4.2 创建topic,读取数据并消费

./bin/kafka-topics.sh --create --zookeeper master:2181 --partitions 1 --replication-factor 3 --topic test0  //创建名字为test0的topic, 1个分区,3份副本

./bin/kafka-topics.sh --list --zookeeper master:2181   // 查看创建的topic

test
test-replica-3
test0
test1

./bin/kafka-topics.sh --describe --zookeeper master:2181 --topic test0  // 查看topic test0的状态信息
Topic:test0	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: test0	Partition: 0	Leader: 0	    Replicas: 0,2,1	Isr: 0,2,1
  • leader:负责处理消息的读和写,leader是从所有节点中随机选择的.
  • replicas:列出了所有的副本节点,不管节点是否在服务中.
  • isr:是正在服务中的节点.

./bin/kafka-console-producer.sh --broker-list master:9092 --topic test0 // 命令行中通过生产者向test0的topic中写数据,即除了flume中向test0写外,命令行中也在写

>iko nknxlr mwqoi

./bin/kafka-console-consumer.sh --zookeeper master:2181 --topic test0 --from-beginning   //消费test0的topic中的数据



17/09/14 19:15:55 INFO master.Master: Registering app Spark Pi
17/09/14 19:15:55 INFO master.Master: Registered app Spark Pi with ID app-20170914191555-0025
17/09/14 19:15:55 INFO master.Master: Launching executor app-20170914191555-0025/0 on worker worker-20170825172933-10.76.9.198-40179
17/09/14 19:37:33 INFO master.Master: Received unregister request from application app-20170914191555-0025
17/09/14 19:37:33 INFO master.Master: Removing app app-20170914191555-0025
17/09/14 19:37:33 INFO master.Master: 10.76.9.230:50653 got disassociated, removing it.
17/09/14 19:37:33 INFO master.Master: 10.76.9.230:59059 got disassociated, removing it.
17/09/14 19:37:34 WARN master.Master: Got status update for unknown executor app-20170914191555-0025/0



iko nknxlr mwqoi


值得提及的是,flume对数据作有存储,建立多个consumer进程同时消费test0 topic中的数据时,每个进程获取的数据也都是相同的。


到此,Flume+Kafka完成,后续将还有至少两方面需要尝试:
a. flume其它两种模式和更多的组合
b. flume+kafka与storm的整合。


5. 期间遇到的问题
5.1 "Unable to load sink type: org.apache.flume.sink.kafka.kafkaSink"
如3.1.1中所说,没有大写kafkaSink中的首字母,导致类找不到。

5.2 "Agent configuration for 'agent' does not contain any channels. Marking it as invalid"
在3.2中启动 flume时没指定 --name参数且名字与flume-conf.properties中的agent不一致。如下
./bin/flume-ng agent --nameagent--conf conf --conf-file conf/flume-conf.properties -Dflume.root.logger=INFO,console &



17/09/14 19:15:55 INFO master.Master: Registering app Spark Pi
17/09/14 19:15:55 INFO master.Master: Registered app Spark Pi with ID app-20170914191555-0025
17/09/14 19:15:55 INFO master.Master: Launching executor app-20170914191555-0025/0 on worker worker-20170825172933-10.76.9.198-40179
17/09/14 19:37:33 INFO master.Master: Received unregister request from application app-20170914191555-0025
17/09/14 19:37:33 INFO master.Master: Removing app app-20170914191555-0025
17/09/14 19:37:33 INFO master.Master: 10.76.9.230:50653 got disassociated, removing it.
17/09/14 19:37:33 INFO master.Master: 10.76.9.230:59059 got disassociated, removing it.
17/09/14 19:37:34 WARN master.Master: Got status update for unknown executor app-20170914191555-0025/0








iko nknxlr mwqoi
17/09/14 19:15:55 INFO master.Master: Registering app Spark Pi
17/09/14 19:15:55 INFO master.Master: Registered app Spark Pi with ID app-20170914191555-0025
17/09/14 19:15:55 INFO master.Master: Launching executor app-20170914191555-0025/0 on worker worker-20170825172933-10.76.9.198-40179
17/09/14 19:37:33 INFO master.Master: Received unregister request from application app-20170914191555-0025
17/09/14 19:37:33 INFO master.Master: Removing app app-20170914191555-0025
17/09/14 19:37:33 INFO master.Master: 10.76.9.230:50653 got disassociated, removing it.
17/09/14 19:37:33 INFO master.Master: 10.76.9.230:59059 got disassociated, removing it.
17/09/14 19:37:34 WARN master.Master: Got status update for unknown executor app-20170914191555-0025/0








iko nknxlr mwqoi
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇kafka文档(3)----0.8.2-kaf.. 下一篇kafkaProducer 1.1 读取文件目录..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目