设为首页 加入收藏

TOP

Kafka安装、原理、使用
2019-05-09 02:14:46 】 浏览:69
Tags:Kafka 安装 原理 使用
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/zimiao552147572/article/details/89818714

大数据组件使用 总文章

1.Apache Kafka

 1.Apache Kafka 是一个开源消息系统,由 Scala 写成。是由 Apache 软件基金会开发的一个开源消息系统项目。
 2.Kafka 最初是由 LinkedIn 开发,并于 2011 年初开源。2012 年 10 月从 Apache Incubator 毕业。
  该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
 3.Kafka 是一个分布式消息队列(Queue):生产者、消费者的功能。它提供了类似于 JMS 的特性,但是在设计实现上完全不同,此外它并不是 JMS 规范的实现。
 4.Kafka 对消息保存时根据 Topic(主题) 进行归类
 5.发送消息者称为 Producer,消息接受者称为 Consumer
 6.此外 kafka 集群有多个 kafka 实例组成,每个实例(server)成为 broker。
 7.无论是 kafka 集群, 还是 producer 和 consumer 都依赖于 zookeeper 集群保存一些 meta 信息,来保证系统可用性


2.JMS 是什么

JMS(JAVAMessage Service,java 消息服务)API 是一个消息服务的标准或者说是规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。
它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

 1.JMS 的基础
  JMS 是什么:JMS 是 Java 提供的一套技术规范
  JMS 干什么用:用来异构系统 集成通信,缓解系统瓶颈,提高系统的伸缩性增强系统用户体验,使得系统模块化和组件化变得可行并更加灵活
  通过什么方式:生产消费者模式(生产者、服务器、消费者)

 2.JMS 消息传输模型
  1.点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
   点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端。
   这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理,即使有多个消息监听者也是如此。

  2.发布/订阅模式(一对多,数据生产后,推送给所有订阅者)
   发布订阅模型则是一个基于推送的消息传送模型。
   发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,
   即使当前订阅者不可用,处于离线状态。

 3.JMS 核心组件
  1.Destination:消息发送的目的地,也就是前面说的 队列(Queue) 或 Topic(主题)。
  2.Message:从字面上就可以看出是被发送的消息。
  3.Producer:消息的生产者,要发送一个消息,必须通过这个生产者来发送。
  4.MessageConsumer:与生产者相对应,这是消息的消费者或接收者,通过它来接收一个消息。

 4.常见的类 JMS 消息服务器
  1.JMS 消息服务器 ActiveMQ
   ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1 和 J2EE 1.4 规范的。
   主要特点:
    1.多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议:OpenWire,Stomp REST,WS Notification,XMPP,AMQP
    2.完全支持 JMS1.1 和 J2EE 1.4 规范 (持久化,XA 消息,事务)
    3.对 Spring 的支持,ActiveMQ 可以很容易内嵌到使用 Spring 的系统里面去,而且也支持Spring2.0 的特性
    4.通过 常见 J2EE 服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过 JCA1.5 resource adaptors 的配置,
     可以让 ActiveMQ 可以自动的部署到任何兼容 J2EE 1.4 商业服务器上
    5.支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
    6.支持通过 JDBC 和 journal 提供高速的消息持久化
    7.从设计上保证了高性能的集群,客户端-服务器,点对点
    8.支持 Ajax
    9.支持与 Axis 的整合
    10.可以很容易得调用内嵌 JMS provider,进行测试

  2.分布式消息中间件 Metamorphosis
   Metamorphosis (MetaQ) 是一个高性能、高可用、可扩展的分布式消息中间件,类似于 LinkedIn的 Kafka,
   具有消息存储顺序写、吞吐量大和支持本地和 XA 事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景,
   在淘宝和支付宝有着广泛的应用,现已开源。
   主要特点:
    生产者、服务器和消费者都可分布
    消息存储顺序写
    性能极高,吞吐量大
    支持消息顺序
    支持本地和 XA 事务
    客户端 pull,随机读,利用 sendfile 系统调用,zero-copy ,批量拉数据
    支持消费端事务
    支持消息广播模式
    支持异步发送消息
    支持 http 协议
    支持消息重试和 recover
    数据迁移、扩容对用户透明
    消费状态保存在客户端
    支持同步和异步复制两种 HA
    支持 group commit

  3.分布式消息中间件 RocketMQ
   RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点:
    能够保证严格的消息顺序
    提供丰富的消息拉取模式
    高效的订阅者水平扩展能力
    实时的消息订阅机制
    亿级消息堆积能力
    Metaq3.0 版本改名,产品名称改为 RocketMQ

  4.其他 MQ
   .NET 消息中间件 DotNetMQ
   基于 HBase 的消息队列 HQueue
   Go 的 MQ 框架 KiteQ
   AMQP 消息服务器 RabbitMQ
   MemcacheQ 是一个基于 MemcacheDB 的消息队列服务器。


3.为什么需要消息队列
消息系统的核心作用就是三点:解耦、异步、并行
以用户注册的案列来说明消息系统的作用

1.用户注册的一般流程


2.用户注册的并行执行


3.用户注册的最终一致


4.Kafka 核心组件
Topic(主题):消息根据 Topic 进行归类
Producer:发送消息者
Consumer:消息接受者
broker(中间人):kafka cluster(kafka群)中的 每个 kafka 实例(server)
Zookeeper:依赖集群保存 meta 信息。


5.Kafka 集群部署

 1.集群部署的基本流程
  下载安装包、解压安装包、修改配置文件、分发安装包、启动集群
 
 2.集群部署的基础环境准备
  1.安装前的准备工作(zk 集群已经部署完毕)
  2.关闭防火墙:chkconfig iptables off && setenforce 0
  3.创建用户:
   groupadd 组名 && useradd 用户名 && usermod -a -G 组名 用户名
   groupadd realtime && useradd realtime && usermod -a -G realtime realtime
  4.创建工作目录并赋权
   mkdir /export
   mkdir /export/servers
   chmod 755 -R /export
  5.切换到 realtime 用户下:su realtime

 3.Kafka 集群部署
  1.下载安装包
   下载地址:http://kafka.apache.org/downloads.html
   在 linux 中使用 wget 命令下载安装包:wget http://mirrors.shuosc.org/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz


2.解压安装包
tar -zxvf kafka_2.11-1.0.0.tgz -C /export/servers/
cd /export/servers/
mv kafka_2.11-1.0.0 kafka

3.修改配置文件

   cp /export/servers/kafka/config/server.properties /export/servers/kafka/config/server.properties.bak
   vi /export/servers/kafka/config/server.properties
   输入以下内容:
    #broker 的全局唯一编号,不能重复
    broker.id=0
    #用来监听链接的端口,producer 或 consumer 将在此端口建立连接
    port=9092
    #处理网络请求的线程数量
    num.network.threads=3
    #用来处理磁盘 IO 的线程数量
    num.io.threads=8
    #发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    #接收套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    #请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    #kafka 运行日志存放的路径
    log.dirs=/export/servers/logs/kafka
    #topic(主题) 在当前 broker(中间人) 上的分片个数
    num.partitions=2
    #用来恢复和清理 data 下数据的线程数量
    num.recovery.threads.per.data.dir=1
    #segment 文件保留的最长时间,超时将被删除
    log.retention.hours=168
    #滚动生成新的 segment 文件的最大时间
    log.roll.hours=168
    #日志文件中每个 segment 的大小,默认为 1G
    log.segment.bytes=1073741824
    #周期性检查文件大小的时间
    log.retention.check.interval.ms=300000
    #日志清理是否打开
    log.cleaner.enable=true
    #broker 需要使用 zookeeper 保存 meta 数据
    zookeeper.connect=192.168.52.106:2181,192.168.52.107:2181,192.168.52.108:2181
    #zookeeper 链接超时时间
    zookeeper.connection.timeout.ms=6000
    #partion buffer 中,消息的条数达到阈值,将触发 flush 到磁盘
    log.flush.interval.messages=10000
    #消息 buffer 的时间,达到阈值,将触发 flush 到磁盘
    log.flush.interval.ms=3000
    #删除 topic,需要 server.properties 中设置 delete.topic.enable=true,否则只是标记删除
    delete.topic.enable=true
    #此处的 host.name 为本机 IP(重要),如果不改,则客户端会抛出:Producer connection to localhost:9092 unsuccessful 错误!
    host.name=kafka01

4.分发安装包
1.scp -r /export/servers/kafka kafka02:/export/servers
然后分别在各机器上创建软连
cd /export/servers/

2.scp -r /export/servers/kafka kafka03:/export/servers
然后分别在各机器上创建软连
cd /export/servers/

5.再次修改配置文件(重要)
依次修改各服务器上配置文件的的 broker.id,分别是 0,1,2 不得重复。

6.启动集群

   1.依次在各节点上启动 kafka
    nohup /export/servers/kafka/bin/kafka-server-start.sh /export/servers/kafka/config/server.properties >/dev/null 2>&1 &
   2.输出错误日志到黑洞
    command >/dev/null 2>&1 &


6.Kafka 常用操作命令

 1.查看当前服务器中的所有 topic
  bin/kafka-topics.sh --list --zookeeper zk01:2181

 2.创建 topic
  bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic test

 3.删除 topic
  bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test
  需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除或者直接重启。

 4.通过 shell 命令发送消息
  bin/kafka-console-producer.sh --broker-list kafka01:9092 --topic test

 5.通过 shell 消费消息
  bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic test

 6.查看消费位置
  bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup

 7.查看某个 Topic 的详情
  bin/kafka-topics.sh --topic test --describe --zookeeper zk01:2181

 8.对分区数进行修改
  bin/kafka-topics.sh --zookeeper zk01 --alter --partitions 2 --topic test

7.Kafka JavaAPI

 1.Kafka 生产者
  <dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>0.11.0.1</version>
  </dependency>

  public static void main(String[] args)
  {
   //1、准备配置文件
   Properties props = new Properties();
   props.put("bootstrap.servers", "node01:9092");
   props.put("acks", "all");
   props.put("retries", 0);
   props.put("batch.size", 16384);
   props.put("linger.ms", 1);
   props.put("buffer.memory", 33554432);
   props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
   props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
   //2、创建 Kafka Producer 生产者
   KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String,String>(props);
   for (int i=0;i<100;i++)
   {
    //3、发送数据
    kafkaProducer.send(new ProducerRecord<String, String>("test","num"+i,"value"+i));
   }
  }

2.Kafka 消费者

  <dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>0.11.0.1</version>
  </dependency>

  public static void main(String[] args)
  {
   // 1、准备配置文件
   Properties props = new Properties();
   props.put("bootstrap.servers", "node01:9092");
   props.put("group.id", "test");
   props.put("enable.auto.commit", "true");
   props.put("auto.commit.interval.ms", "1000");
   props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
   props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
   // 2、创建 Kafka Consumer 消费者
   KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
   // 3、订阅数据,这里的 topic(主题) 可以是多个,可用List封装
   kafkaConsumer.subscribe(Arrays.asList("test"));
   // 4、获取数据
   while (true)
   {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
     System.out.printf("topic = %s,offset = %d, key = %s, value = %s%n",
        record.topic(), record.offset(), record.key(), record.value());
    }
   }
  }

8.Kafka 整体结构图

 1.Producer:消息生产者,就是向 kafka broker(中间人) 发消息的客户端
 2.Consumer:消息消费者,向 kafka broker(中间人) 取消息的客户端
 3.Topic:主题
 4.Consumer Group(CG)消费者组:
  这是 kafka 用来实现一个 topic(主题) 消息的广播(发给所有的 consumer消费者)和 单播(发给任意一个 consumer消费者)的手段。
  一个 topic(主题) 可以有多个 CG消费者组。topic(主题) 的消息会复制(不是真的复制,是概念上的)到所有的 CG消费者组,
  但每个 partion(分区) 只会把消息发给该 CG 中的一个consumer消费者。
  如果需要实现广播,只要每个 consumer消费者 有一个独立的 CG 就可以了。要实现单播只要所有的 consumer 在同一个 CG。
  用 CG消费者组 还可以将 consumer消费者 进行自由的分组,而不需要多次发送消息到不同的 topic(主题)。
 5.Broker:一台 kafka 服务器就是一个 broker(中间人)。一个kafka集群(cluster) 由多个 broker(中间人) 组成。一个 broker 可以容纳多个 topic(主题)。
 6.Partition(分区):
  为了实现扩展性,一个非常大的 topic(主题) 可以分布到多个 broker(即服务器)上,一个topic(主题) 可以分为多个 partition(分区),
  每个 partition(分区) 是一个有序的队列(Queue)。partition(分区) 中的每条消息都会被分配一个有序的 id(offset)。
  kafka 只保证按一个 partition(分区) 中的顺序 将消息发给 consumer消费者,不保证一个 topic 的整体(多个 partition分区 间)的顺序
 7.Offset:
  kafka 的存储文件都是按照 offset.kafka 来命名,用 offset 做名字的好处是方便查找。
  例如你想找位于 2049 的位置,只要找到 2048.kafka 的文件即可。当然 the first offset 就是00000000000.kafka
 8.Replication(复制):
  Kafka 支持以 Partition(分区) 为单位对 Message 进行冗余备份,每个 Partition(分区) 都可以配置至少 1 个 Replication(复制),
  当仅只有 1 个 Replication(复制) 时 即仅该 Partition(分区) 本身。
 9.Leader:
  每个 Replication(复制) 集合中的 Partition(分区) 都会选出一个唯一的 Leader,所有的 读写请求 都由 Leader 处理。
  其他 Replicas(复数的复制品) 从 Leader 处把数据更新同步到本地,过程类似大家熟悉的 MySQL中的 Binlog 同步。
  每个 Cluster(集群) 当中会选举出一个 Broker(中间人) 来担任 Controller,负责处理 Partition(分区) 的 Leader 选举,协调 Partition(分区) 迁移等工作。

 10.ISR(In-Sync Replica):
  是 Replicas(复制品) 的一个子集,表示目前 Alive 且与 Leader 能够“Catch-up(追赶)”的 Replicas(复制品) 集合。
  由于读写都是首先落到 Leader 上,所以一般来说通过同步机制从 Leader 上拉取数据的 Replica(复制品) 都会和 Leader 有一些延迟
  (包括了延迟时间和延迟条数两个维度),任意一个超过阈值都会把该 Replica(复制品) 踢出 ISR。每个 Partition(分区) 都有它自己独立的 ISR。

9.Kafka 配置文件详解


10.Consumer消费者 与 topic主题 关系

 1.本质上 kafka 只支持 Topic主题
 2.每个 group 中可以有多个 consumer消费者,每个 consumer消费者 属于一个 consumer group;
  通常情况下,一个 group 中会包含多个 consumer,这样不仅可以提高 topic主题 中消息的并发消费能力,而且还能提高"故障容错"性,
  如果 group 中的某个 consumer消费者 失效,那么其消费的 partitions分区 将会有其他 consumer 自动接管。
 3.对于 Topic主题 中的一条特定的消息,只会被订阅此 Topic主题 的每个 group 中的其中一个 consumer消费者,此消息不会发送给一个 group 的多个 consumer;
  那么一个 group 中所有的 consumer消费者 将会交错的消费整个 Topic主题,每个 group 中 consumer 消息消费 互相独立,我们可以认为一个 group 是一个"订阅"者。
 4.在 kafka 中,一个 partition分区 中的消息只会被 group 中的一个 consumer 消费(同一时刻);
  一个 Topic主题 中的每个 partions分区,只会被一个"订阅者"中的一个 consumer 消费,不过一个 consumer 可以同时消费多个 partitions分区 中的消息。
 5.kafka 的设计原理决定,对于一个 topic主题, 同一个 group 中不能有多于 partitions分区 个数的 consumer 同时消费,否则将意味着某些 consumer 将无法得到消息。
 6.kafka 只能保证一个 partition分区 中的消息被某个 consumer 消费时是顺序的;事实上,从 Topic主题 角度来说,当有多个 partitions分区 时,消息仍不是全局有序的。


11.Kafka 消息的分发

 1.Producer 客户端负责消息的分发
 2.kafka 集群中的任何一个 broker中间者 都可以向 producer 提供 metadata 信息,这些 metadata 中包含"集群中存活的 servers 列表" / "partitions分区 leader 列表"等信息;
 3.当 producer 获取到 metadata 信息之后, producer 将会和 Topic 下所有 partition分区 leader 保持socket 连接;
 4.消息由 producer 直接通过 socket 发送到 broker中间者,中间不会经过任何"路由层",事实上,消息被路由到哪个 partition分区 上由 producer 客户端决定;
  比如可以采用"random"、"key-hash"、"轮询"等,如果一个 topic主题 中有多个 partitions分区,那么在 producer 端实现"消息均衡分发"是必要的。
 5.在 producer 端的配置文件中,开发者可以指定 partition 路由的方式。
 6.Producer 消息发送的应答机制
  设置发送数据是否需要服务端的反馈,有三个值 0、1、-1
   0:producer 不会等待 broker中间者 发送 ack(命令正确应答)
   1:当 leader 接收到消息之后发送 ack(命令正确应答)
   -1:当所有的 follower 都同步消息成功后发送 ack(命令正确应答)
  request.required.acks=0


12.Consumer消费者 的负载均衡

 当一个 group 中,有 consumer消费者 加入或者离开时,会触发 partitions分区 均衡。均衡的最终目的,是提升 topic主题 的并发消费能力,步骤如下:
 1.假如 topic主题 具有如下 partitions分区:P0、P1、P2、P3
 2.加入 group 中,有如下 consumer消费者:C1、C2
 3.首先根据 partition分区 索引号对 partitions分区 排序:P0、P1、P2、P3
 4.根据 consumer.id 排序:C0、C1
 5.计算倍数:M = [P0,P1,P2,P3].size / [C0,C1].size,本例值 M=2(向上取整)
 6.然后依次分配 partitions分区:C0 = [P0,P1],C1 = [P2,P3],即 Ci = [P(i * M),P((i + 1) * M -1)]


13.Kafka 文件存储机制

1.Kafka 文件存储基本结构
1.在Kafka文件存储中,同一个 topic主题 下有多个不同 partition分区, 每个 partition分区 为一个目录, partiton分区 命名规则为 topic主题名称 + 有序序号,
第一个 partiton分区 序号从 0 开始,序号最大值为 partitions分区 数量减 1。
2.每个 partion分区(目录) 相当于一个巨型文件被平均分配到多个 大小相等 segment(段)数据文件中。
但每个段 segment file 消息数量不一定相等,这种特性方便 old segment file 快速被删除。
默认保留 7 天的数据。

3.每个 partiton分区 只需要支持顺序读写就行了,segment 文件生命周期由服务端配置参数决定。(什么时候创建,什么时候删除)
问:数据有序的讨论?一个 partition分区 的数据是否是有序的?
答:间隔性有序,不连续针对一个 topic主题 里面的数据,只能做到 partition分区 内部有序,不能做到全局有序。
问:特别加入消费者的场景后,如何保证消费者 消费的数据 全局有序的?
答:伪命题。
问:只有一种情况下才能保证全局有序?
答:就是只有一个 partition。

14.Kafka Partition Segment

1.Segment file 组成:
由 2 大部分组成,分别为 index file 和 data file,此 2 个文件一一对应,成对出现,后缀".index"和“.log”分别表示为 segment 索引文件、数据文件。

2.Segment 文件命名规则:
partion分区 全局的第一个 segment 从 0 开始, 后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。
数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。
3.索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中 message 的物理偏移地址。
下述图中索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中 message 的物理偏移地址。
其中以索引文件中元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton 表示第 368772 个 message)、以及该消息的物理偏移地址为 497。

4.segment data file 由许多 message 组成,物理结构如下:

15.Kafka 查找 message
1.读取 offset=368776 的 message,需要通过下面 2 个步骤查找。

 2.查找 segment file
  00000000000000000000.index 表示最开始的文件,起始偏移量(offset)为 0
  00000000000000368769.index 的消息量起始偏移量为 368770 = 368769 + 1
  00000000000000737337.index 的起始偏移量为 737338 = 737337 + 1
  其他后续文件依次类推。
  以起始偏移量命名并排序这些文件,只要根据 offset **二分查找**文件列表,就可以快速定位到具体文件。
  当 offset = 368776 时定位到 00000000000000368769.index 和对应 log 文件。

 3.通过 segment file 查找 message
  当 offset=368776 时,依次定位到 00000000000000368769.index 的元数据物理位置 和 00000000000000368769.log 的物理偏移地址,
  然后再通过 00000000000000368769.log 顺序查找直到 offset=368776 为止。


16.Kafka 自定义 Partition
如果指定 partition,就用 partition
如果指定 key,使用 key 进行 hash 取模。
如果没有指定 key,使用轮询的方式。

 public class DefaultPartitioner implements Partitioner
 {
  private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
  public void configure(Map<String, > configs) {}
  /**
  * 计算给定记录的分区
  *
  * @param topic 主题名称
  * @param key 分区上的键(或如果没有键,则为空)
  * @param keyBytes 序列化的分区键(或如果没有键,则为空)
  * @param value 分区或无效的值
  * @param valueBytes 分区 序列化值 或 无效
  * @param cluster 当前集群 元数据
  */
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
  {
   List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
   int numPartitions = partitions.size();
   if (keyBytes == null)
   {
    int nextValue = nextValue(topic);
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (availablePartitions.size() > 0)
    {
     int part = Utils.toPositive(nextValue) % availablePartitions.size();
     return availablePartitions.get(part).partition();
    }
    else
    {
     // 没有分区可用,给出非可用分区
     return Utils.toPositive(nextValue) % numPartitions;
    }
   }
   else
   {
    // 散列键值以选择分区
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
   }
  }

  private int nextValue(String topic)
  {
   AtomicInteger counter = topicCounterMap.get(topic);
   if (null == counter)
   {
    counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
    AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
    if (currentCounter != null)
    {
     counter = currentCounter;
    }
   }
   return counter.getAndIncrement();
  }
  public void close() {}
 }

17.Kafka 为什么那么快

 1.Broker中间者
  1.不同于 Redis 和 MemcacheQ 等内存消息队列,Kafka 的设计是把所有的 Message 都要写入速度低容量大的硬盘,以此来换取更强的存储能力。
  2.实际上,Kafka 使用硬盘并没有带来过多的性能损失,“规规矩矩”的抄了一条“近道”。
   首先,说“规规矩矩”是因为 Kafka 在磁盘上只做 Sequence顺序 I/O,由于消息系统读写的特殊性,这并不存在什么问题。
   关于磁盘 I/O 的性能,引用一组 Kafka 官方给出的测试数据(Raid-5,7200rpm):Sequence I/O: 600MB/s、Random I/O: 100KB/s。
   所以通过只做 Sequence顺序 I/O 的限制,规避了磁盘访问速度低下对性能可能造成的影响。

 2.接下来我们再聊一聊 Kafka 是如何“抄近道的”。
  1.首先,Kafka 重度依赖底层操作系统提供的 Page Cache页面缓存 功能。当上层有写操作时,操作系统只是将数据写入 Page Cache页面缓存,
   同时标记 Page页面 属性为 Dirty脏的。
  2.当读操作发生时,先从 Page Cache页面缓存 中查找,如果发生 缺页才进行磁盘调度,最终返回需要的数据。
   实际上 Page Cache页面缓存 是把尽可能多的空闲内存 都当做了磁盘缓存来使用。
   同时如果有其他进程申请内存,回收 Page Cache页面缓存 的代价又很小,所以现代的 OS 都支持 Page Cache页面缓存。
  3.使用 Page Cache页面缓存 功能同时可以避免在 JVM 内部缓存数据,JVM 为我们提供了强大的 GC(垃圾回收机制) 能力,同时也引入了一些问题不适用与 Kafka 的设计。
   如果在 Heap堆 内管理缓存,JVM 的 GC(垃圾回收机制) 线程会频繁扫描 Heap堆 空间,带来不必要的开销。
   如果 Heap堆过大,执行一次 Full GC 对系统的可用性来说将是极大的挑战。所有在 JVM 内的对象都不免带有一个 Object Overhead(对象开销),
   内存的有效空间利用率会因此降低。
  4.所有的 In-Process Cache进程内缓存 在 OS 中都有一份同样的 Page Cache页面缓存。 所以通过将缓存只放在 Page Cache页面缓存,
   可以至少让可用缓存空间翻倍。
  5.如果 Kafka 重启,所有的 In-Process Cache进程内缓存 都会失效,而 OS 管理的 Page Cache页面缓存 依然可以继续使用。
   Page Cache页面缓存 还只是第一步,Kafka 为了进一步的优化性能还采用了 Sendfile 技术。
   在解释 Sendfile 之前,首先介绍一下传统的网络 I/O 操作流程,大体上分为以下 4 步。
   1.OS 从硬盘 把数据读到 内核区的 Page Cache页面缓存。
   2.用户进程 把 数据 从 内核区 Copy 到 用户区。
   3.然后 用户进程 再把 数据 写入到 Socket,数据 流入 内核区的 Socket Buffer 上。
   4.OS 再把 数据从 Buffer 中 Copy 到 网卡的 Buffer 上,这样完成一次发送。


   整个过程共经历两次 Context Switch上下文切换,四次 System Call系统调用。
   同一份数据在内核 Buffer 与用户 Buffer 之间重复拷贝,效率低下。
   其中 2、3 两步没有必要,完全可以直接在内核区完成数据拷贝。这也正是 Sendfile 所解决的问题,经过 Sendfile 优化后,整个 I/O 过程就变成了下面这个样子。

18.Kafka 最佳实践


19.Kafka 监控工具

 1.Kafka Web Console:监控功能较为全面,可以预览消息,监控 Offset、Lag 等信息,但存在 bug,不建议在生产环境中使用。
 2.Kafka Manager:偏向 Kafka 集群管理,若操作不当,容易导致集群出现故障。对 Kafka 实时生产和消费消息是通过 JMX 实现的。没有记录 Offset、Lag 等信息。
 3.KafkaOffsetMonitor:程序一个 jar 包的形式运行,部署较为方便。只有监控功能,使用起来也较为安全。
 4.若只需要监控功能,推荐使用 KafkaOffsetMonito,若偏重 Kafka 集群管理,推荐使用 KafkaManager。
  因为都是开源程序,稳定性欠缺。


20.配置一键启动脚本

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Error while fetching metadata w.. 下一篇        ..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目