设为首页 加入收藏

TOP

了解kafka
2019-04-24 02:34:10 】 浏览:71
Tags:了解 kafka

一:kafka简介
Kafka 是一个分布式的,可划分的,多订阅者,冗余备份的持久性的日志服务,可以简单理解为一个消息中间件。
二:kafka的特点
1:分布式
kafka的producer,consumer,broker都是分布式的,可水平扩展,无需停机。
2:持久化
kafka将日志持久化到磁盘,通过将消息持久化到磁盘(磁盘线性读写)以及它的replication机制,保证数据的不丢失,通常一个topic会有多个分区,不同的分区分布在不同的server上,而每个分区又可以有多个副本,这样既提高了并行处理的能力,又保证了消息的可靠,因为越多的partition意味着可以接受更多的consumer的pull请求。
3:高吞吐
kafka采用磁盘直接进行线性读写而不是随机读写,大大提高了起出来读写请求的速度。
4、kafka消息保障:
1.最多一次 --- 消息可能丢失,但永远不会重发。
我们知道所有的副本都有相同的日志相同的偏移量,当某个消费者在日志中保存了其消费消息的offest但是在消费之前崩溃了,那么其它进程来接管消费的时候,就会获取offest位置之后的消息开始消费,而前面未真正消费的消息就会丢失,这就是‘最多一次’语义
2.至少一次---消息绝不会丢失,但有可能重复
当某个消费者消费了某条消息,但是在日志中保存了其消费消息的offest之前崩溃了,那么其它进程来接管消费的时候,就会消费到已经被消费的消息。
而前面未真正消费的消息就会丢失,这就是‘最多一次’语义
3.恰好一次---每条消息保障只会被传递一次
实现恰好一次语义可以考虑二阶段提交或着让消费者的存储和输出的偏移量用同一个位置
三:kafka的核心概念
Producer 特指消息的生产者
Consumer 特指消息的消费者
Consumer Group 消费者组,可以并行消费Topic中partition的消息
Broker:Kafka 集群中的一台服务器称为一个 broker。
Topic:kafka处理的消息分类,一个消息分类就是一种topic。
Partition:Topic的分区,一个 topic可以分为多个
partition,每个partition是一个有序的队列,分区里面的消息都是按接收的顺序追加的且partition中的每条消息都会被分配一个有序的id(offset)。
Producers:消息和数据生产者,向 Kafka 的一个 topic发布消息的过程叫做 producers。
Consumers:消息和数据消费者,订阅 topics 并处理其发布的消息的过程叫做 consumers。
3.1 kafka的Producers
在kafka中,生产的消息像kafka的topic发送的过程叫做producers,Producer能够决定将此消息发送到topic的哪个分区下面,可以通过配置文件配置也可在api里显示指定,其支持异步批量发送,批量发送可以有效的提高发送效率,先将消息缓存,然后批量发送给topic,减少网络IO.
3.2 kafka的Consumers
在kafka中,一个分区的消息可以被一个消费者组中一个consumer消费,但是一个consumer可以消费多个分区的消息。如果多个消费者同时在一个消费者组中,那么kafka会以轮询的方式,让消息在消费者之间负载均衡
如果不同的消费者存在不同的消费者组中,这就有点像zookeeper里面的发布-订阅模式,不同组的消费者可以同时消费某个分区的消息。
需要注意的是,kafka只能给我们保证某个分区的消息是按顺序被消费的,但它不能保证不同分区的消费按一定顺序。
3.3 kafka的broker
我们可以理解为一台机器就是一个broker,我们发送的消息(message)日志在broker中是以append的方式追加,并且broker会将我们的消息暂时的buffer起来,根据我们的配置,当消息的大小或者是个数达到了配置的值,broker会将消息一次性的刷新到磁盘,有效降低了每次消息磁盘调用的IO次数。kafka中的broker没有状态,如果一个broker挂掉,这里面的消息也会丢,由于broker的无状态,所以消息的消费都记录在消费者那,并不记录在broker。已经被消费了的消息会根据配置在保存一定时间后自动删除,默认是7天。
3.4 kafka的message
在kafka中,一条message消息由几部分组成,offest代表消息的偏移量,MessageSize表示消息的大小,data代表了消息的具体内容,kafka在记录message的时候,还会每隔一定的字节建立一个索引,当消费者需要消费指定某条消息的时候,kafka采用二分法查找索引的位置从而找到你需要消费的消息.
四:kafka常用命令
创建主题

 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions
4 --topic test

查询集群描述

bin/kafka-topics.sh --describe --zookeeper localhost:2181

生产者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties

消费者

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties

显示某个组的消费者详情

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group

平衡leader

bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port

查看某个分区的log和index

 bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /bigdata/kafka/kafka-logs/test_find_2-0/00000000000000000000.log  --print-data-log


五:kafka在zookeeper中的节点存储结构
zookeeper在kafka中扮演了举足轻重的作用(0.9版本后offest不放在zk上,由kafka内部topic自己保存),kakfa的 broker,消费者等相关的信息都存在zk的节点上,zk还提供了对kafka的动态负载均衡等机制,下面我们一一介绍
5.1:broker注册
在kafka中,每当一个broker启动后,会在zk的节点下存储相关的信息,这是一个临时节点(如果不清楚zk的节点类型,可以参考其官网介绍),所以当某个broker宕掉后,其对应的临时节点会消失.

[zk: ip:2181(CONNECTED) 0] ls /brokers
[seqid, topics, ids]
[zk:
ip:2181(CONNECTED) 1] ls /brokers/topics
[testKafka, __consumer_offsets,
test, consumer_offsets, myfirstTopic, first_topic, kafka_first_topic]
[zk: ip:2181(CONNECTED) 2] ls /brokers/ids
[2, 1, 0]

可以看到,brokers节点下存储目前有多少台broker,该broker下有哪些topic,每个broker可以通过get来获取详细信息

[zk: 192.168.8.88:2181(CONNECTED) 0] get /brokers/ids/2
{"jmx_port":-1,"timestamp":"1461828196728","endpoints":["PLAINTEXT://192.168.8.88:9094"],"host":"192.168.8.88","version":2,"port":9094}
cZxid = 0x490000006b
ctime = Thu Apr 28 19:23:16 GMT+12:00 2016
mZxid =
0x490000006b
mtime = Thu Apr 28 19:23:16 GMT+12:00 2016
pZxid =
0x490000006b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner =
0x3545bbe83450003
dataLength = 135
numChildren = 0
[zk:
192.168.8.88:2181(CONNECTED) 1]

这里面有每个broker的ip,port,版本等信息,每个topic的分区消息分布在不同的broker上,

[zk: 192.168.8.88:2181(CONNECTED) 1] get
/brokers/topics/kafka_first_topic
{"version":1,"partitions":{"2":[0,1,2],"1":[2,0,1],"0":[1,2,0]}}
cZxid =
0x4100000236
ctime = Sat Apr 16 04:48:10 GMT+12:00 2016
mZxid =
0x4100000236
mtime = Sat Apr 16 04:48:10 GMT+12:00 2016
pZxid =
0x410000023a
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner =
0x0
dataLength = 64
numChildren = 1
[zk: 192.168.8.88:2181(CONNECTED) 2]

意思是说,kafka_first_topic这个topic有3个分区,每个不同的分区有3个备份,分别备份在broker id为0,1,2的机器上
5.2 消费者注册
前面已经说到,消费者有消费者组的概念,kafka会为每个消费者组分配一个唯一的ID,也为会每个消费者分配一个ID,

[zk: 192.168.8.88:2181(CONNECTED) 40] ls
/consumers/console-consumer-85351/ids
[console-consumer-85351_hadoop-1461831147033-3265fdb3]

意思是在消费者组console-consumer-85351下有一个id为如上的消费者,而一个消费者组里面的某个消费者消费某个分区的消息,在zk中是这样记录的

[zk: 192.168.8.88:2181(CONNECTED) 4]
get /consumers/console-consumer-85351/owners/kafka_first_topic/2
console-consumer-85351_hadoop-1461831147033-3265fdb3-0
cZxid =
0x490000010f
ctime = Thu Apr 28 20:12:28 GMT+12:00 2016
mZxid =
0x490000010f
mtime = Thu Apr 28 20:12:28 GMT+12:00 2016
pZxid =
0x490000010f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner =
0x1545bbe6fb80015
dataLength = 54
numChildren = 0

表示在消费者组console-consumer-85351的topic为kafka_first_topic的第二个分区下,有一个消费者id为 console-consumer-85351_hadoop-1461831147033-3265fdb3-0正在消费
每个topic有不同的分区,每个分区存储了消息的offest,当消费者重启后能够从该节点记录的值后开始继续消费

[zk:
192.168.8.88:2181(CONNECTED) 37] get
/consumers/console-consumer-85351/offsets/kafka_first_topic/0
38
cZxid =
0x4900000117
ctime = Thu Apr 28 20:13:27 GMT+12:00 2016
mZxid =
0x4900000117
mtime = Thu Apr 28 20:13:27 GMT+12:00 2016
pZxid =
0x4900000117
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner =
0x0
dataLength = 2
numChildren = 0

表示分区0的消息的消费偏移到了38这个位置,需要注意的是,这是一个临时节点,当我关了消费者经常,你会发现在consumers里面没有刚才那个消费者组了.
5.3 controller
zookeeper节点contorller主要存储的是中央控制器(可以理解为leader)所在机器的信息,下面表示此集群leader是broker id为0这台机器

[zk: 192.168.8.88:2181(CONNECTED) 33] get /controller
{"version":1,"brokerid":0,"timestamp":"1461828122648"}
cZxid =
0x4900000007
ctime = Thu Apr 28 19:22:02 GMT+12:00 2016
mZxid =
0x4900000007
mtime = Thu Apr 28 19:22:02 GMT+12:00 2016
pZxid =
0x4900000007
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner =
0x2545bbe6fda0000
dataLength = 54
numChildren = 0
[zk:
192.168.8.88:2181(CONNECTED) 34]


5.4:controller_epoch
zookeeper里面的controller_epoch存储的是leader选举的次数,比如一开始只有broker0这台机器,后面加入了broker1,那么就会重新进行leader选择,次数就会+1,这样依次类推

[zk: 192.168.8.88:2181(CONNECTED) 2] get /controller_epoch
72
cZxid =
0x3700000014
ctime = Tue Apr 05 19:09:47 GMT+12:00 2016
mZxid =
0x4900000008
mtime = Thu Apr 28 19:22:02 GMT+12:00 2016
pZxid =
0x3700000014
cversion = 0
dataVersion = 71
aclVersion = 0
ephemeralOwner =
0x0
dataLength = 2
numChildren = 0

5.5 动态负载均衡
5.5.1消费者负载均衡
消费者在注册的时候,会使用zookeeper的watcher监听机制来实现对消费者分组里的各个消费者,以及broker节点注册监听,
一旦某个消费者组里的某个消费者宕掉,或者某个broker宕掉,消费者会收到了事件监听回复,就会根据需要触发消费者负载均衡。
5.5.2生产这负载均衡
生产者在将消息发送给broker的时候,会注册watcher监听,监听broker节点的变化,每个topic的新增和减少,以便合理的发送消息到broker.生产者可以将将消息随机或者以hash(key)或者指定某个分区发送,在客户端控制消息的负载均衡。

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇用嵌套的两个kafka实现突发性高并.. 下一篇Kafka跨集群同步工具——MirrorMa..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目