设为首页 加入收藏

TOP

说说 MQ 之 Kafka(二)(三)
2018-10-28 10:11:10 】 浏览:505
Tags:说说 Kafka
ommit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("topic1", "topic2", "topic3")); while (true) { ConsumerRecords<Integer, String> records = consumer.poll(100); for (ConsumerRecord<Integer, String> record : records) { System.out.println("Received message: (" + String.format("%05d", record.key()) + ", " + record.value() + ") at offset " + record.offset()); } } } }

消费者还有旧的 API,比如 Consumer 和 SimpleConsumer API,这些都可以从 Kafka 代码的 kafka-example 中找到,上述的两个例子也是改写自 kafka-example。使用新旧 API 在功能上都能满足消息收发的需要,但新 API 只依赖 kafka-clients,打包出来的 jar 包会小很多,以我的测试,新 API 的消费者 jar 包大约有 2M 左右,而旧 API 的消费者 jar 包接近 16M。

其实,Kafka 也提供了按分区订阅,可以一次订阅多个分区 TopicPartition[];也支持手动提交 offset,需要调用 consumer.commitSync

Kafka 似乎没有公开 Topic 创建以及修改的 API(至少我没有找到),如果生产者向 Broker 写入的 Topic 是一个新 Topic,那么 Broker 会创建这个 Topic。创建的过程中会使用默认参数,例如,分区个数,会使用 Broker 配置中的 num.partitions 参数(默认1);副本个数,会使用 default.replication.factor 参数。但是通常情况下,我们会需要创建自定义的 Topic,那官方的途径是使用 Kafka 的工具。也有一些非官方的途径 7,例如可以这样写,

String[] options = new String[]{
        "--create",
        "--zookeeper",
        "192.168.232.23:2181",
        "--partitions",
        "2",
        "--replication-factor",
        "3",
        "--topic",
        "topic1"
};
TopicCommand.main(options);

但是这样写有一个问题,在执行完 TopicCommand.main(options); 之后,系统会自动退出,原因是执行完指令之后,会调用 System.exit(exitCode); 系统直接退出。这样当然不行,我的办法是,把相关的执行代码挖出来,写一个 TopicUtils 类,如下,

import joptsimple.OptionSpecBuilder;
import kafka.admin.TopicCommand;
import kafka.admin.TopicCommand$;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import scala.runtime.Nothing$;
public class TopicUtils {
    // from: http://blog.csdn.net/changong28/article/details/39325079
    // from: http://www.cnblogs.com/davidwang456/p/4313784.html
    public static void createTopic(){
        String[] options = new String[]{
                "--create",
                "--zookeeper",
                KafkaProperties.ZOOKEEPER_URL,
                "--partitions",
                "2",
                "--replication-factor",
                "3",
                "--topic",
                KafkaProperties.TOPIC
        };
//        TopicCommand.main(options);
        oper(options);
    }
    public static void listTopic(){
        String[] options = new String[]{
                "--list",
                "--zookeeper",
                KafkaProperties.ZOOKEEPER_URL
        };
//        TopicCommand.main(options);
        oper(options);
    }
    public static void deleteTopic(){
        String[] options = new String[]{
                "--delete",
                "--zookeeper",
                KafkaProperties.ZOOKEEPER_URL,
                "--topic",
                KafkaProperties.TOPIC
        };
//
首页 上一页 1 2 3 4 5 下一页 尾页 3/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Git内部原理之Git引用 下一篇说说 MQ 之 Kafka(一)

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目