Kafka Java API Explained

Original article: http://blog.csdn.net/honglei915/article/details/37697655

Kafka Producer APIs

The new Producer API provides the following features:
  1. Multiple messages can be buffered in a local queue and then sent to the broker asynchronously in batches, which is enabled with the parameter producer.type=async. The buffering can be tuned with parameters such as queue.time and batch.size. A background thread (kafka.producer.async.ProducerSendThread) takes the data off the queue and has kafka.producer.EventHandler send the messages to the broker. You can also plug in a custom handler via the event.handler parameter and register handlers for the different stages of processing on the producer side, for example to add logging or monitoring; just implement the kafka.producer.async.CallbackHandler interface and configure it with callback.handler. (A hedged config sketch follows this list.)
  2. You can write your own Encoder to serialize messages; just implement the interface below. The default Encoder is kafka.serializer.DefaultEncoder.
    interface Encoder<T> {
      public Message toMessage(T data);
    }
  3. Broker auto-discovery via Zookeeper is available through the zk.connect parameter. If you are not using Zookeeper, you can instead provide a static list of brokers with the broker.list parameter; messages are then sent to a randomly chosen broker, and if that broker fails, the send fails as well.
  4. Messages are partitioned with a partition function, the kafka.producer.Partitioner class:
    interface Partitioner<T> {
       int partition(T key, int numPartitions);
    }
    The partition function takes two arguments, the key and the number of available partitions; it picks one partition from the partition list and returns its id. The default partitioning strategy is hash(key) % numPartitions; if the key is null, a partition is chosen at random. A custom partition function can be configured through the partitioner.class parameter.
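
Not from the original article: a minimal sketch of enabling the async batching described in item 1, using the same Kafka 0.8 producer as the example below. The keys queue.buffering.max.ms and batch.num.messages are assumed here to be the 0.8 spellings of the queue.time and batch.size parameters mentioned above; verify them against your Kafka version.

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

public class AsyncProducerConfigSketch {
    public static Producer<String, String> buildAsyncProducer(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList); // static broker list, no Zookeeper lookup needed
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "async"); // buffer messages locally and send them in batches
        props.put("queue.buffering.max.ms", "5000"); // assumed 0.8 name for the queue.time knob above
        props.put("batch.num.messages", "200"); // assumed 0.8 name for the batch.size knob above
        return new Producer<String, String>(new ProducerConfig(props));
    }
}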

A complete example using the new API:

package com.cuicui.kafkademon;


import java.util.ArrayList;
import java.util.List;
import java.util.Properties;


import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


/**
* @author <a href="mailto:leicui001@126.com">崔磊</a>
* @date Nov 4, 2015 11:44:15 AM
*/
public class MyProducer {


  public static void main(String[] args) throws InterruptedException {


    Properties props = new Properties();
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("metadata.broker.list", KafkaProperties.BROKER_CONNECT);
    props.put("partitioner.class", "com.cuicui.kafkademon.MyPartitioner");
    props.put("request.required.acks", "1");
    ProducerConfig config = new ProducerConfig(props);
    Producer<String, String> producer = new Producer<String, String>(config);


    // Send messages one at a time
    for (int i = 0; i <= 1000000; i++) {
      KeyedMessage<String, String> message =
          new KeyedMessage<String, String>(KafkaProperties.TOPIC, i + "", "Message" + i);
      producer.send(message);
      Thread.sleep(5000);
    }


    // Send messages in batches
    List<KeyedMessage<String, String>> messages = new ArrayList<KeyedMessage<String, String>>(100);
    for (int i = 0; i <= 10000; i++) {
      KeyedMessage<String, String> message =
          new KeyedMessage<String, String>(KafkaProperties.TOPIC, i + "", "Message" + i);
      messages.add(message);
      if (i % 100 == 0) {
        producer.send(messages);
        messages.clear();
      }
    }
    producer.send(messages);
  }
}
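
The examples in this article reference a KafkaProperties constants class that is never shown. A minimal sketch of what it might contain follows; the broker list, Zookeeper connect string, and topic name below are placeholders, not values from the source.

package com.cuicui.kafkademon;

/**
 * Hypothetical constants class referenced by the examples in this article.
 * Replace the placeholder values with your own broker list, Zookeeper
 * connect string, and topic name.
 */
public class KafkaProperties {
    public static final String BROKER_CONNECT = "host1:9092,host2:9092,host3:9092";
    public static final String ZK_CONNECT = "host1:2181,host2:2181,host3:2181";
    public static final String TOPIC = "my-topic";
}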

The partition function used by the producer above (note that it assumes the keys are numeric strings, which matches the producer example where the key is i + ""):

package com.cuicui.kafkademon;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;


public class MyPartitioner implements Partitioner {
  public MyPartitioner(VerifiableProperties props) {
    // The old producer requires this constructor and passes the producer configuration in here.
  }


  /*
  * @see kafka.producer.Partitioner#partition(java.lang.Object, int)
  */
  @Override
  public int partition(Object key, int partitionCount) {
    return Integer.valueOf((String) key) % partitionCount;
  }
}


Kafka Consumer APIs

The Consumer API has two levels. The low-level API keeps a connection to a specific broker and closes the connection after the messages have been received; this level is stateless, and every read carries the offset with it.

The high-level API hides the details of connecting to the brokers, so you can talk to the cluster without worrying about the server-side layout. It also maintains the consumption state for you, and it lets you subscribe to specific topics by conditions such as a whitelist, a blacklist, or a regular expression.

The low-level API

package com.cuicui.kafkademon;


import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;


/**
* The offset is maintained by the client itself; the target topic and partition are also chosen by the client
*
* @author <a href="mailto:leicui001@126.com">崔磊</a>
* @date Nov 4, 2015 11:44:15 AM
*
*/
public class MySimpleConsumer {


  public static void main(String[] args) {
    new MySimpleConsumer().consume();
  }


  /**
  * Consume messages
  */
  public void consume() {
    int partition = 0;


    // Find the leader broker for this partition
    Broker leaderBroker = findLeader(KafkaProperties.BROKER_CONNECT, KafkaProperties.TOPIC, partition);


    // Consume from the leader
    SimpleConsumer simpleConsumer =
        new SimpleConsumer(leaderBroker.host(), leaderBroker.port(), 20000, 10000, "mySimpleConsumer");
    long startOffset = 1;
    int fetchSize = 1000;


    while (true) {
      long offset = startOffset;
      // addFetch specifies the target topic, partition, starting offset and fetch size (in bytes); multiple fetches can be added
      FetchRequest req =
          new FetchRequestBuilder().addFetch(KafkaProperties.TOPIC, partition, startOffset, fetchSize).build();


      // Pull the messages
      FetchResponse fetchResponse = simpleConsumer.fetch(req);


      ByteBufferMessageSet messageSet = fetchResponse.messageSet(KafkaProperties.TOPIC, partition);
      for (MessageAndOffset messageAndOffset : messageSet) {
        Message mess = messageAndOffset.message();
        ByteBuffer payload = mess.payload();
        byte[] bytes = new byte[payload.limit()];
        payload.get(bytes);
        String msg = new String(bytes);


        offset = messageAndOffset.offset();
        System.out.println("partition : " + partition + ", offset : " + offset + " mess : " + msg);
      }
      // Continue with the next batch
      startOffset = offset + 1;
    }
  }


  /**
  * Find the leader broker of the specified partition
  *
  * @param brokerHosts broker addresses, in the form "host1:port1,host2:port2,host3:port3"
  * @param topic topic
  * @param partition partition
  * @return the leader broker
  */
  public Broker findLeader(String brokerHosts, String topic, int partition) {
    Broker leader = findPartitionMetadata(brokerHosts, topic, partition).leader();
    System.out.println(String.format("Leader for topic %s, partition %d is %s:%d", topic, partition, leader.host(),
        leader.port()));
    return leader;
  }


  /**
  * Find the metadata of the specified partition
  *
  * @param brokerHosts broker addresses, in the form "host1:port1,host2:port2,host3:port3"
  * @param topic topic
  * @param partition partition
  * @return partition metadata
  */
  private PartitionMetadata findPartitionMetadata(String brokerHosts, String topic, int partition) {
    PartitionMetadata returnMetaData = null;
    for (String brokerHost : brokerHosts.split(",")) {
      SimpleConsumer consumer = null;
      String[] splits = brokerHost.split(":");
      consumer = new SimpleConsumer(splits[0], Integer.valueOf(splits[1]), 100000, 64 * 1024, "leaderLookup");
      List<String> topics = Collections.singletonList(topic);
      TopicMetadataRequest request = new TopicMetadataRequest(topics);
      TopicMetadataResponse response = consumer.send(request);
      List<TopicMetadata> topicMetadatas = response.topicsMetadata();
      for (TopicMetadata topicMetadata : topicMetadatas) {
        for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
          if (partitionMetadata.partitionId() == partition) {
            returnMetaData = partitionMetadata;
          }
        }
      }
      if (consumer != null)
        consumer.close();
    }
    return returnMetaData;
  }


  /**
  * Find the offset for a client to consume from, based on a timestamp
  *
  * @param consumer SimpleConsumer
  * @param topic topic
  * @param partition partition
  * @param clientID client ID
  * @param whichTime timestamp
  * @return offset
  */
  public long getLastOffset(SimpleConsumer consumer, String topic, int partition, String clientID, long whichTime) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
        new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientID);
    OffsetResponse response = consumer.getOffsetsBefore(request);
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
  }
}
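
The getLastOffset method above is defined but never called in consume(). Below is a hedged sketch, not part of the original article, of how it could be used to start from the earliest retained offset instead of the hard-coded startOffset of 1; EarliestTime() and LatestTime() are the special timestamps on kafka.api.OffsetRequest, and the client id used here is a placeholder.

package com.cuicui.kafkademon;

import kafka.cluster.Broker;
import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetBootstrapSketch {
    public static void main(String[] args) {
        MySimpleConsumer helper = new MySimpleConsumer();
        int partition = 0;

        // Reuse the helper methods from MySimpleConsumer to reach the partition leader.
        Broker leader = helper.findLeader(KafkaProperties.BROKER_CONNECT, KafkaProperties.TOPIC, partition);
        SimpleConsumer consumer =
                new SimpleConsumer(leader.host(), leader.port(), 20000, 10000, "offsetBootstrap");

        // EarliestTime() asks for the oldest retained offset, LatestTime() for the current log end.
        long earliest = helper.getLastOffset(consumer, KafkaProperties.TOPIC, partition, "offsetBootstrap",
                kafka.api.OffsetRequest.EarliestTime());
        long latest = helper.getLastOffset(consumer, KafkaProperties.TOPIC, partition, "offsetBootstrap",
                kafka.api.OffsetRequest.LatestTime());
        System.out.println("earliest offset : " + earliest + ", latest offset : " + latest);
        consumer.close();
    }
}
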
The low-level API is the foundation on which the high-level API is built; it also serves scenarios with special requirements around maintaining consumption state, such as offline consumers like the Hadoop consumer.

The high-level API

package com.cuicui.kafkademon;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;


/**
 * Offsets are recorded in Zookeeper, keyed by group.id. The mapping between partitions and consumers is maintained by Kafka.
 * 
 * @author <a href="mailto:leicui001@126.com">崔磊</a>
 * @date Nov 4, 2015 11:44:15 AM
 */
public class MyHighLevelConsumer {

    /**
     * The group ID this consumer belongs to
     */
    private String groupid;

    /**
     * The ID of this consumer
     */
    private String consumerid;

    /**
     * How many threads to start per topic
     */
    private int threadPerTopic;

    public MyHighLevelConsumer(String groupid, String consumerid, int threadPerTopic) {
        super();
        this.groupid = groupid;
        this.consumerid = consumerid;
        this.threadPerTopic = threadPerTopic;
    }

    public void consume() {
        Properties props = new Properties();
        props.put("group.id", groupid);
        props.put("consumer.id", consumerid);
        props.put("zookeeper.connect", KafkaProperties.ZK_CONNECT);
        props.put("zookeeper.session.timeout.ms", "60000");
        props.put("zookeeper.sync.time.ms", "2000");
        // props.put("auto.commit.interval.ms", "1000");

        ConsumerConfig config = new ConsumerConfig(props);
        ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

        // Set how many threads to start per topic
        topicCountMap.put(KafkaProperties.TOPIC, threadPerTopic);

        // Get the streams
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap);

        // Start one thread per stream to consume its messages
        for (KafkaStream<byte[], byte[]> stream : streams.get(KafkaProperties.TOPIC)) {
            new MyStreamThread(stream).start();
        }
    }

    /**
     * Worker thread inside each consumer
     * 
     * @author cuilei05
     *
     */
    private class MyStreamThread extends Thread {
        private KafkaStream<byte[], byte[]> stream;

        public MyStreamThread(KafkaStream<byte[], byte[]> stream) {
            super();
            this.stream = stream;
        }

        @Override
        public void run() {
            ConsumerIterator<byte[], byte[]> streamIterator = stream.iterator();

            // Process messages one by one
            while (streamIterator.hasNext()) {
                MessageAndMetadata<byte[], byte[]> message = streamIterator.next();
                String topic = message.topic();
                int partition = message.partition();
                long offset = message.offset();
                String key = new String(message.key());
                String msg = new String(message.message());
                // Process the message here; this example simply prints it
                // If consumption fails, the information above can be written to a log, or sent out as an alert SMS or email, for later handling
                System.out.println("consumerid:" + consumerid + ", thread : " + Thread.currentThread().getName()
                        + ", topic : " + topic + ", partition : " + partition + ", offset : " + offset + " , key : "
                        + key + " , mess : " + msg);
            }
        }
    }

    public static void main(String[] args) {
        String groupid = "myconsumergroup";
        MyHighLevelConsumer consumer1 = new MyHighLevelConsumer(groupid, "myconsumer1", 3);
        MyHighLevelConsumer consumer2 = new MyHighLevelConsumer(groupid, "myconsumer2", 3);

        consumer1.consume();
        consumer2.consume();
    }
}
This API revolves around iterators implemented by KafkaStream. Each stream represents a series of messages merged from one or more partitions on one or more brokers, and each stream is consumed by one thread, so a client can specify how many streams it wants when they are created. A stream is a merge of multiple partitions and multiple brokers, but the messages of any given partition only ever flow into a single stream.

Every call to createMessageStreams registers the consumer with the topic, and the load between the consumer and the brokers is then rebalanced. To reduce this rebalancing, the API encourages creating all of the topic streams in a single call. The createMessageStreamsByFilter method registers listeners so that new topics matching the filter are picked up as well; a hedged sketch of it follows.
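
The following sketch is not from the original article: it shows subscribing by regular expression with createMessageStreamsByFilter. The whitelist pattern and group id below are placeholders, and kafka.consumer.Whitelist is assumed to be the topic filter class shipped with the old consumer.

package com.cuicui.kafkademon;

import java.util.List;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class MyFilterConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("group.id", "myfilterconsumergroup");
        props.put("zookeeper.connect", KafkaProperties.ZK_CONNECT);

        ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // One stream covering every topic whose name matches the regex; topics
        // created later that also match are picked up automatically.
        List<KafkaStream<byte[], byte[]>> streams =
                connector.createMessageStreamsByFilter(new Whitelist("mytopic.*"), 1);

        for (KafkaStream<byte[], byte[]> stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> mm = it.next();
                System.out.println("topic : " + mm.topic() + ", partition : " + mm.partition()
                        + ", offset : " + mm.offset() + ", mess : " + new String(mm.message()));
            }
        }
    }
}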
