简介
首先简单说下对kafka的理解:
1、kafka是一个分布式的消息缓存系统;
2、kafka集群中的服务器节点都被称作broker
3、kafka的客户端分为:一是producer(消息生产者)负责往消息队列中放入消息;另一类是consumer(消息消费者)负责从消息队列中取消息。客户端和服务器之间的通信采用tcp协议
4、kafka中不同业务系统的消息可以通过topic(主题)进行区分,也就是说一个主题就是一个消息队列,而且每一个消息topic都会被分区,以分担消息读写的负载
5、parition(分区)是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件。每一个分区都可以有多个副本,以防止数据的丢失
6、某一个分区中的数据如果需要更新,都必须通过该分区所有副本中的leader来更新
7、消费者可以分组,每一个consumer属于特定的组,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。比如有两个消费者组A和B,共同消费一个topic:topic-1,A和B所消费的消息不会重复.
比如 topic-1中有100个消息,每个消息有一个id,编号从0-99,那么,如果A组消费0-49号,B组就消费50-99号
8、消费者在具体消费某个topic中的消息时,可以指定起始偏移量
集群安装、启动
1、下载安装包并解压
tar xf kafka_2 .10 -0.8 .1.1 .tgz
cd kafka_2 .10 -0.8 .1.1
2、修改config/server.properties配置文件
broker.id =1
zookeeper.connect =192.168 .2 .100 :2181 , 192.168 .2 .110 :2181 , 192.168 .2 .120 :2181
注:kafka集群依赖zookeeper集群,所以此处需要配置zookeeper集群;zookeeper集群配置请参见:http://www.cnblogs.com/skyfeng/articles/6701458.html
3、将kafka解压包使用scp命令拷贝至集群其他节点,命令:
scp -r kafka_2.10 -0 .8.1 .1 / 192.168 .2.110 ://home/hadoop/app
4、将zookeeper集群启动,请参见:http://www.cnblogs.com/skyfeng/articles/6701458.html
5、在每一台节点上启动broker
bin/kafka-server -start.sh config/server .properties
bin/kafka-server -start.sh config/server .properties 1 >/dev/null 2 >&1 &
[hadoop@hadoop1-1 kafka_2.10 -0.8 .1 .1 ]$ jps
2400 Jps
2360 Kafka
2289 QuorumPeerMain
**
简单测试
**
1、在kafka集群中创建一个topic
[ - . - . . . ] - . - - - - . . . - - - - - - -
.
replication-factor:表示副本数量
partitions :分区数量
2、用一个producer向某一个topic中写入消息
[hadoop@hadoop1-1 kafka_2.10 -0.8 .1 .1 ]$ bin/kafka-console-producer.sh --broker-list 192.168 .2 .100 :9092 --topic topictest
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder" .
SLF4J: Defaulting to no-operation (NOP ) logger implementation
SLF4J: See http://www.slf 4j.org /codes.html #StaticLoggerBinder for further details.
3、用一个comsumer从某一个topic中读取信息
[hadoop@hadoop1-2 kafka_2.10 -0.8 .1 .1 ]$ bin/kafka-console-consumer.sh --zookeeper 192.168 .2 .100 :2181 --from-beginning --topic topictest
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder" .
SLF4J: Defaulting to no-operation (NOP ) logger implementation
SLF4J: See http://www.slf 4j.org /codes.html #StaticLoggerBinder for further details.
4、查看一个topic的分区及副本状态信息
[hadoop@hadoop1 -3 kafka_2.10 -0 .8.1 .1 ]$ bin/kafka-topics.sh --describe --zookeeper 192.168 .2.110 : 2181 --topic topictest
Topic :topictest PartitionCount : 1 ReplicationFactor : 3 Configs :
Topic : topictest Partition : 0 Leader : 1 Replicas : 1 ,0 ,2 Isr : 1 ,0 ,2
[hadoop@hadoop1 -3 kafka_2.10 -0 .8.1 .1 ]$ bin/kafka-topics.sh --describe --zookeeper 192.168 .2.100 : 2181 --topic topictest
Topic :topictest PartitionCount : 1 ReplicationFactor : 3 Configs :
Topic : topictest Partition : 0 Leader : 1 Replicas : 1 ,0 ,2 Isr : 1 ,0 ,2
[hadoop@hadoop1 -3 kafka_2.10 -0 .8.1 .1 ]$ bin/kafka-topics.sh --describe --zookeeper 192.168 .2.120 : 2181 --topic topictest
Topic :topictest PartitionCount : 1 ReplicationFactor : 3 Configs :
Topic : topictest Partition : 0 Leader : 1 Replicas : 1 ,0 ,2 Isr : 1 ,0 ,2
[hadoop@hadoop1 -3 kafka_2.10 -0 .8.1 .1 ]$
5、查看topic
- . - - - - . . .