Kafka 的工具和编程接口
Kafka 的工具
Kafka 提供的工具还是比较全的,bin/
目录下的工具有以下一些,
bin/connect-distributed.sh bin/kafka-consumer-offset-checker.sh bin/kafka-replica-verification.sh bin/kafka-verifiable-producer.sh bin/connect-standalone.sh bin/kafka-consumer-perf-test.sh bin/kafka-run-class.sh bin/zookeeper-security-migration.sh bin/kafka-acls.sh bin/kafka-mirror-maker.sh bin/kafka-server-start.sh bin/zookeeper-server-start.sh bin/kafka-configs.sh bin/kafka-preferred-replica-election.sh bin/kafka-server-stop.sh bin/zookeeper-server-stop.sh bin/kafka-console-consumer.sh bin/kafka-producer-perf-test.sh bin/kafka-simple-consumer-shell.sh bin/zookeeper-shell.sh bin/kafka-console-producer.sh bin/kafka-reassign-partitions.sh bin/kafka-topics.sh bin/kafka-consumer-groups.sh bin/kafka-replay-log-producer.sh bin/kafka-verifiable-consumer.sh
我常用的命令有以下几个,
bin/kafka-server-start.sh -daemon config/server.properties & bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1 bin/kafka-topics.sh --list --zookeeper 192.168.232.23:2181 bin/kafka-topics.sh --delete --zookeeper 192.168.232.23:2181 --topic topic1 bin/kafka-topics.sh --create --zookeeper 192.168.232.23:2181 --replication-factor 3 --partitions 2 --topic topic1 bin/kafka-console-consumer.sh --zookeeper 192.168.232.23:2181 --topic topic1 --from-beginning bin/kafka-console-producer.sh --broker-list 192.168.232.23:9092 --topic topic1
kafka-server-start.sh
是用于 Kafka 的 Broker 启动的,主要就一个参数 config/server.properties
,该文件中的配置项待会再说.还有一个 -daemon
参数,这个是将 Kafka 放在后台用守护进程的方式运行,如果不加这个参数,Kafka 会在运行一段时间后自动退出,据说这个是 0.10.0.0 版本才有的问题 5。kafka-topics.sh
是用于管理 Topic 的工具,我主要用的 --describe
、--list
、--delete
、--create
这4个功能,上述的例子基本是不言自明的,--replication-factor 3
、--partitions 2
这两个参数分别表示3个副本(含 Leader),和2个分区。kafka-console-consumer.sh
和 kafka-console-producer.sh
是生产者和消费者的简易终端工具,在调试的时候比较有用,我常用的是 kafka-console-consumer.sh
。我没有用 Kafka 自带的 zookeeper,而是用的 zookeeper 官方的发布版本 3.4.8,端口是默认2181,与 Broker 在同一台机器上。
下面说一下 Broker 启动的配置文件 config/server.properties
,我在默认配置的基础上,修改了以下一些,
broker.id=0 listeners=PLAINTEXT://192.168.232.23:9092 log.dirs=/tmp/kafka-logs delete.topic.enable=true
broker.id
是 Kafka 集群中的 Broker ID,不可重复,我在多副本的实验中,将他们分别设置为0、1、2;listeners
是 Broker 监听的地址,默认是监听 localhost:9092
,因为我不是单机实验,所以修改为本机局域网地址,当然,如果要监听所有地址的话,也可以设置为 0.0.0.0:9092
,多副本实验中,将监听端口分别设置为 9092、9093、9094;log.dirs
是 Broker 的 log 的目录,多副本实验中,不同的 Broker 需要有不同的 log 目录;delete.topic.enable
设为 true 后,可以删除 Topic,并且连带 Topic 中的消息也一并删掉,否则,即使调用 kafka-topics.sh --delete
也无法删除 Topic,这是一个便利性的设置,对于开发环境可以,生产环境一定要设为 false(默认)。实验中发现, 如果有消费者在消费这个 Topic,那么也无法删除,还是比较安全的。
剩下的工具多数在文档中也有提到。如果看一下这些脚本的话,会发现多数脚本的写法都是一致的,先做一些参数的校验,最后运行 exec $base_dir/kafka-run-class.sh XXXXXXXXX "$@"
,可见,这些工具都是使用运行 Java Class 的方式调用的。
Kafka 的 Java API
在编程接口方面,官方提供了 Scala 和 Java 的接口,社区提供了更多的其他语言的接口,基本上,无论用什么语言开发,都能找到相应的 API。下面说一下 Java 的 API 接口。
生产者的 API 只有一种,相对比较简单,代码如下,
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.ap