设为首页 加入收藏

TOP

kafka(十一):Kafka Java Consumer的实现
2019-04-23 14:31:33 】 浏览:54
Tags:kafka 十一 Kafka Java Consumer 实现

1.参考:

http://kafka.apache.org/081/documentation.html#consumerconfigs
http://kafka.apache.org/081/documentation.html#highlevelconsumerapi
http://kafka.apache.org/081/documentation.html#simpleconsumerapi
http://kafka.apache.org/081/documentation.html#apidesign

2.Consumer参数说明

参数名称

默认参数值

备注

group.id

Consumergroupid值,如果多个Consumergroupid的值一样,那么表示这多个Consumer属于同一个group

zookeeper.connect

Kafka元数据Zookeeper存储的url,和配置文件中的参数一样

consumer.id

消费者id字符串,如果不给定的话,默认自动产生一个随机id

socket.timeout.ms

30000

Consumer连接超时时间,实际超时时间是socket.timeout.ms+max.fetch.wait

socket.receive.buffer.bytes

65536

接收数据的缓冲区大小,默认64kb

fetch.message.max.bytes

1048576

指定每个分区每次获取数据的最大字节数,一般该参数要求比message允许的最大字节数要大,否则可能出现producer产生的数据consumer没法消费

num.consumer.fetchers

1

Consumer获取数据的线程数量

auto.commit.enable

true

是否自动提交offset偏移量,默认为true(自动提交)

auto.commit.interval.ms

60000

自动提交offset偏移量的间隔时间

rebalance.max.retries

4

当一个新的Consumer添加到ConsumerGroup的时候,会触发数据消费的rebalance操作;rebalance操作可能会失败,该参数的主要作用是设置rebalance的最大重试次数

fetch.min.bytes

1

一个请求最少返回记录大小,当一个请求中的返回数据大小达到该参数的设置值后,记录数据返回到consumer

fetch.wait.max.ms

100

一个请求等待数据返回的最大停留时间

rebalance.backoff.ms

2000

rebalance重试过程中的间隔时间

auto.offset.reset

largest

指定consumer消费kafka数据的时候offset初始值是啥,可选参数:largestsmallestsmallest指该consumer的消费offset是当前kafka数据中的最小偏移量;largest指该consumer的消费offset是当前kafka数据中的最大偏移量

consumer.timeout.ms

-1

给定当consumer多久时间没有消费数据后,抛出异常;-1表示不抛出异常

zookeeper.session.timeout.ms

6000

zk会话时间

zookeeper.connection.timeout.ms

6000

连接zk过期时间

3.Kafka提供了两种Consumer API

(1)High Level Consumer API:将底层具体获取数据、更新offset、设置偏移量等操作屏蔽掉,直接操作数据流的处理工作。优点是:操作简单;缺点:可操作性太差,无法按照自己的业务场景选择处理方式。(类:ConsumerConnector)

(2)Lower Level Consumer API:通过直接操作底层API获取数据的方式获取Kafka中的数据,需要自行给定分区、偏移量等属性。优点:可操作性强;缺点:代码相对而言比较复杂。(类:SimpleConsumer)

4.代码

package _0807ProducerSelf;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;


public class ConsumerDemo {
    private ConsumerConnector connector = null;
    private String topicName = null;
    private int numThreads = 0;

    public ConsumerDemo(String groupId, String zookeeperUrl, boolean largest, String topicName, int numThreads) {
        this.topicName = topicName;
        this.numThreads = numThreads;

        // 1. 给定Consumer连接的相关参数
        Properties props = new Properties();
        // a. 给定group id
        props.put("group.id", groupId);
        // b. 给定zk连接url
        props.put("zookeeper.connect", zookeeperUrl);
        // c. 给定自动提交offset偏移量间隔时间修改为2s(默认60s)
        props.put("auto.commit.interval.ms", "2000");
        // d. 给定初始化consumer时候的offset值(该值只有在第一次consumer消费数据的时候有效 --> 只要zk中保存了该consumer的offset偏移量信息,那么该参数就无效了)
        if (largest) {
            props.put("auto.offset.reset", "largest");
        } else {
            props.put("auto.offset.reset", "smallest");
        }
        // 2. 创建Consumer上下文
        ConsumerConfig config = new ConsumerConfig(props);
        // 3. 创建Consumer连接器
        this.connector = Consumer.createJavaConsumerConnector(config);
    }

    public void shutdown() {
        if (this.connector != null) {
            // 当调用shutdown后,KafkaStream所产生的ConsumerIterator迭代器就没有数据了
            this.connector.shutdown();
        }
    }

    public void run() {
        // TODO: topicCountMap给定消费者消费的Topic名称以及消费该Topic使用多少个线程进行数据消费操作;一个消费者可以消费多个Topic的数据 ==> key为topic名称,value为该topic数据消费的线程数
        final Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topicName, numThreads);
        Decoder<String> keyDecoder = new StringDecoder(new VerifiableProperties());
        Decoder<String> valueDecoder = new StringDecoder(new VerifiableProperties());

        // 2. 根据参数创建数据读取流
        // TODO: 该API返回的集合中的数据是一个以Topic名称为Key,以该Topic的读取数据流集合为Value的一个Map集合;
        // TODO: List<KafkaStream<String, String>> ==> 指的其实就是对应Topic消费数据的流,该List集合中的流对象数目和给定的topicCountMap中该topic对应的count值一样
        // TODO: List<KafkaStream<String, String>> ==> 如果一个Topic有多个分区,而且在topicCountMap中该topic给定的count值大于分区数,那么其实表示一个KafkaStream流消费一个Topic分区的数据;这里类似Kafka的Consumer Group Rebalance ===> 一个分区的数据只允许一个KafkaStream消费,但是一个KafkaStream可以消费多个分区的数据(>=0)
        Map<String, List<KafkaStream<String, String>>> consumerStreamsMap = this.connector.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

        // 3. 获取对应Topic的数据消费流
        List<KafkaStream<String, String>> streams = consumerStreamsMap.get(topicName);

        // 4. 数据消费
        int k = 0;
        for (final KafkaStream<String, String> stream : streams) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    int count = 0;
                    String threadNames = Thread.currentThread().getName();
                    ConsumerIterator<String, String> iter = stream.iterator();
                    while (iter.hasNext()) {
                        // 获取数据
                        MessageAndMetadata<String, String> messageAndMetadata = iter.next();

                        // 处理数据
                        StringBuilder sb = new StringBuilder();
                        sb.append("线程").append(threadNames);
                        // 1. 获取元数据
                        long offset = messageAndMetadata.offset();
                        int partitionID = messageAndMetadata.partition();
                        String topicName = messageAndMetadata.topic();
                        // TODO: 元数据存储,方便做容错
                        // TODO: 这里可以将元数据保存zk/redis/mysql...(可以考虑一下怎么保存)
                        sb.append(";元数据=>[").append("offset=").append(offset).append("; partitionID=").append(partitionID).append("; topicName=").append(topicName).append("]");

                        // 2. 获取消息(key/value键值对)
                        String value = messageAndMetadata.message();
                        String key = messageAndMetadata.key();
                        sb.append("; 消息=>[key=").append(key).append("; value=").append(value).append("]");

                        System.out.println(sb.toString());
                        count++;
                    }
                    System.out.println("线程" + threadNames + "总共消费数据" + count + "条!!!");
                }
            }, "Thread-[" + k + "]-[" + topicName + "]").start();
            k++;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        String groupId = "170505_2";
        String zookeeperUrl = "bigdata.ibeifeng.com:2181/kafka08";
        boolean largest = true;
        String topicName = "beifeng1";
        int numThreads = 1;
        ConsumerDemo demo = new ConsumerDemo(groupId, zookeeperUrl, largest, topicName, numThreads);
        demo.run();

        // 休息一段数据后关闭
        Thread.sleep(2000);

        demo.shutdown();
    }
}

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇spark streaming读取kafka示例 下一篇rabbitmq转存入Kafka

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目