设为首页 加入收藏

TOP

kafka进击之路(三) ——consumer high API开发
2018-11-30 10:47:16 】 浏览:85
Tags:kafka consumer high API 开发

说明

kafka版本:kafka_2.10-0.8.2.1(kafka0.9.xx版本提供了新的API)
IED环境:intellij14 + maven3.3
语言:java

consumer high API开发

高级别consumer特性

  1. 我们不需要自己管理消息的offset
  2. 高级别consumer会将从特定的partition读到的最后的offset保存至zookeeper, 且存储的offset是基于consumer提供的consumer group的名字
  3. 一个consumer group中的consumer不能重复消费,不同的consumer group之间可以重复消费
  4. consumer group名在整个kafka集群是全部可见的,因此在启动以同一个consumer group的程序时其他老的程序会被关闭。当以相同的consumer group的逻辑消费者启动后,kafka会将线程加入可消费该topic的线程集合并且触发re-balance。此时,kafka会将可用的partition分配给可用的线程,因此如果混合了新的和老的消费逻辑,可能会出现一些消息发给了老的消费逻辑。
  5. 高级别consumer是按topic的partition来进行消费的。如果消费程序线程数大于partition个数,一些线程将永远收不到消息;如果消费程序线程数小于partition个数,一些线程会收到多个partition的消息,并且这些消息从哪个partition消费可能是不确定的。

高级别API需要的配置

配置项 说明
zookeeper.connect zookeeper地址
group.id consumer group name
zookeeper.session.timeout.ms 连接zookeeper的session超时时间
zookeeper.sync.time.ms zookeeper同步时间
auto.commit.interval.ms consumer自动同步offset的间隔时间

程序示例

  1. maven
      <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
      </dependency>
  1. 程序包组织
    这里写图片描述
  2. 配置文件consumer.properties
#kafka zookeeper config
zookeeper.connect=xxxx:2181,xxxx:2181,xxxx:2181
group.id=consumer-group-name
zookeeper.session.timeout.ms=4000
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
auto.offset.reset=largest
#kafka consumer config
topic=myTopic
threadNum=10
  1. KafkaConfig.java
package kafka.high;

public class KafkaConfig {
    public String topic = null;
    public int threadNum = 0;
}
  1. KafkaMsgTask.java
package kafka.high;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.Arrays;

/**
 * 处理kafka消息的线程
 */
public class KafkaMsgTask implements Runnable {
    private static final Logger logger = LogManager.getLogger(KafkaMsgTask.class);

    private KafkaStream stream = null;
    private int threadNum = 0;

    public KafkaMsgTask(KafkaStream stream, int num) {
        this.stream = stream;
        this.threadNum = num;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> iter = stream.iterator();
        while (iter.hasNext()) {
            String message = Arrays.toString(iter.next().message());
            // todo 处理消息逻辑
        }
        logger.info("shutting down thread :" + threadNum);
    }
}
  1. KafkaHighConsumer.java
package kafka.high;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.FileInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * kafka消费者
 */
public class KafkaHighConsumer {
    private static final Logger logger = LogManager.getLogger(KafkaHighConsumer.class);

    protected ConsumerConnector consumer = null;
    protected ConsumerConfig consumerConfig = null;
    protected ExecutorService executor = null;
    protected KafkaConfig kafkaConfig = null;

    public KafkaHighConsumer() {  }

    /**
     * 初始化,加载consumer配置,topic,线程个数
     * @param confFile
     * @throws Exception
     */
    public void init(String confFile) throws Exception {
        // load config file
        Properties props = new Properties();
        FileInputStream in = new FileInputStream(confFile);
        props.load(in);
        kafkaConfig = new KafkaConfig();
        kafkaConfig.topic = props.getProperty("topic");
        kafkaConfig.threadNum = Integer.parseInt(props.getProperty("threadNum")); // 线程个数可以和partition个数一致
        // init consumer config
        consumerConfig = new ConsumerConfig(props);
        consumer = Consumer.createJavaConsumerConnector(consumerConfig);
        logger.info("KafkaHighConsumer init success");
    }

    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(this.kafkaConfig.topic, kafkaConfig.threadNum);
        // create a list of message streams  for each topic, using the default decoder
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = this.consumer.createMessageStreams(topicCountMap);
        // 一个topic的数据流
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(kafkaConfig.topic);
        // start all thread
        executor = Executors.newFixedThreadPool(kafkaConfig.threadNum);

        int threadNumber = 0;
        // 每个数据流开启一个消费者线程
        for (final KafkaStream stream : streams) {
            executor.submit(new KafkaMsgTask(stream, threadNumber));
            logger.info("start kafka message task thread. threadnum :" + threadNumber);
            threadNumber++;
        }
    }

    public void shutdown() {
        if (consumer != null) {
            consumer.shutdown();
        }
        if (executor != null) {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                    logger.warn("timed out waitting for consumer threads to shutdown");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  1. Main.java
package kafka.high;

public class Main {
    public static void main(String[] args) {
        try {
            KafkaHighConsumer consumer = new KafkaHighConsumer();
            consumer.init("./consumer.properties");
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            consumer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

何时使用高级别API开发

当我们不太关心offset,不想自己控制offset的时候,一般对消息的处理要求不严格的时候,可以采用这种方式,简单高效。


更多参考:
1. http://kafka.apache.org/documentation.html#api
2. https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇MongoDB和数据流:使用MongoDB作.. 下一篇golang连接kafka消费进ES

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目