设为首页 加入收藏

TOP

kafka2.0-producer如何将消息分配到partition中_05
2019-04-23 14:30:43 】 浏览:107
Tags:kafka2.0-producer 如何 消息 分配 partition _05
Topic:producer-0    PartitionCount:3    ReplicationFactor:2 Configs:
    Topic: producer-0   Partition: 0    Leader: 2   Replicas: 2,1   Isr: 2,1
    Topic: producer-0   Partition: 1    Leader: 3   Replicas: 3,2   Isr: 3,2
    Topic: producer-0   Partition: 2    Leader: 1   Replicas: 1,3   Isr: 1,3

从上一篇文章我们可以知道,名为producer-0的topic有3的partition,分别是partition:0partition:1partition:2,并且他们分别在不同的机器上。
在这里我们先讲讲如何将消息发送到指定的partition中,然后在讲默认的partition分配策略(即DefaultPartitioner.java

指定的partition发送

话不多说,直接上代码。(提示:在启动生产者之前,先启动消费者。
生产者代码如下:

/**
 * 将消息发送到指定的partition中
 * @author yangyaming
 */
public class PartitionProducer {

    public static final String TOPIC_NAME = "producer-0"; 

    private  static Properties props = new Properties();

    static{
         props.put("bootstrap.servers", "192.168.1.3:9092,192.168.1.128:9092,192.168.1.130:9092");
         props.put("acks", "all");
         props.put("retries", 0);
         props.put("batch.size", 16384);
         props.put("linger.ms", 1);
         props.put("buffer.memory", 33554432);
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    public static void main(String[] args) {
         Producer<String, String> producer = new KafkaProducer<>(props);

         //获取该topic的所有partition信息
         List<PartitionInfo> partitions = producer.partitionsFor(TOPIC_NAME);
         printAllPartition(partitions);

         /**将消息发送分片列表的到第二个partition中**/
         int sendPartition = partitions.get(1).partition();

         System.out.println("消息将发送到partition:" + sendPartition + "中.");

         for (int i = 100; i < 200; i++)
             producer.send(new ProducerRecord<String, String>(TOPIC_NAME,sendPartition, Integer.toString(i), Integer.toString(i)));
         System.out.println("发送完成");
         producer.close();
    }

    //打印所有的partition
    private static void printAllPartition(List<PartitionInfo> partitions) {
        if(partitions == null || partitions.size() <= 0)
            return;

        System.out.println("topic:" + TOPIC_NAME + ",所有的partition如下:");

        partitions.forEach((partition) -> System.out.println("partition:" + partition.partition()));
    }

}

示例源码:https://github.com/Mryangtaofang/sample

执行的结果如下图,可以看到消息全部发送到了序号为1的partition中:
这里写图片描述
消费者代码如下:

/**
 * kafka消费者
 */
public class PartitionConsumer {

    private  static Properties props = new Properties();

    static{
         props.put("bootstrap.servers", "192.168.1.3:9092,192.168.1.128:9092,192.168.1.130:9092");
         props.put("group.id", "test");
         props.put("enable.auto.commit", "true");
         props.put("auto.commit.interval.ms", "1000");
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public  static void main(String args[]){
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList(PartitionProducer.TOPIC_NAME));
         while (true) {
             ConsumerRecords<String, String> records = consumer.poll(100);
             for (ConsumerRecord<String, String> record : records)
                 System.out.printf("partition = %d offset = %d, key = %s, value = %s%n",record.partition(), record.offset(), record.key(), record.value());
         }
    }
}

执行结果如下,可以看到消费的所有消息都是来自于partition-1中:
这里写图片描述

很明显,在实际应用中,我们绝对不会将所有的消息都放在一个partition中,这会导致所有的消息将只能存放在一台机器上。

producer.send(new ProducerRecord<String, String>(TOPIC_NAME,sendPartition, Integer.toString(i), Integer.toString(i)));

截取生产者的发送代码,如果我们不指定sendPartition这个入参,那么所有的消费该如何分配到各个partition中呢

kafka2.0默认的partition分片器(DefaultPartitioner)

分析kafka客户端源码,当没有指定Partitioner.class配置时,会使用默认的分片规则,代码实现如下:

/**
 * 默认的分片策略:
 * <ul>
 * <li>如果你指定了一个partition,用你指定的partition,如上面例子,我指定了partition-1
 * <li>如果没有指定partition,但是你指定了key,那么会根据key进行哈希,分配到对应的partition中
 * <li>如果partition和key都没指定,会使用round-robin算法
 */
public class DefaultPartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, > configs) {}

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

    public void close() {}

}

重点分析partition()方法:

如果你手动指定了分区(就像上面我的例子一样),根本就不会进这个方法,因为不需要进行分区。

 // hash the keyBytes to choose a partition
 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

如果你指定了key这个参数,那么会针对key进行hash,也就是上面这段代码,其实通过Utils.murmur2(keyBytes)可以看出,他是根据key的序列化之后的字节码进行hash的,所以间接地,一个消息被分到哪个partition上跟序列化方式也有关系。
Utils.murmur2是在kafka中实现的,源码如下。

 /**
  * Generates 32 bit murmur2 hash from byte array
  * @param data byte array to hash
  * @return 32 bit hash of the given array
  */
 public static int murmur2(final byte[] data) {
     int length = data.length;
     int seed = 0x9747b28c;
     // 'm' and 'r' are mixing constants generated offline.
     // They're not really 'magic', they just happen to work well.
     final int m = 0x5bd1e995;
     final int r = 24;

     // Initialize the hash to a random value
     int h = seed ^ length;
     int length4 = length / 4;

     for (int i = 0; i < length4; i++) {
         final int i4 = i * 4;
         int k = (data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
         k *= m;
         k ^= k >>> r;
         k *= m;
         h *= m;
         h ^= k;
     }

     // Handle the last few bytes of the input array
     switch (length % 4) {
         case 3:
             h ^= (data[(length & ~3) + 2] & 0xff) << 16;
         case 2:
             h ^= (data[(length & ~3) + 1] & 0xff) << 8;
         case 1:
             h ^= data[length & ~3] & 0xff;
             h *= m;
     }

     h ^= h >>> 13;
     h *= m;
     h ^= h >>> 15;

     return h;
  }

上面使用的是MurmurHash2,常用的另一种hash算法是djb。MurmurHash算法的复杂度很高,所以很难理解上面这段代码,有兴趣的可以去研究下,下面是MurmurHash2一段介绍。

MurmurHash 是一种非加密型哈希函数,适用于一般的哈希检索操作。 由Austin Appleby在2008年发明,
并出现了多个变种,都已经发布到了公有领域。与其它流行的哈希函数相比,对于规律性较强的key,MurmurHash的随机分布特征表现更良好。
Redis在实现字典时用到了两种不同的哈希算法,MurmurHash便是其中一种(另一种是djb),在Redis中应用十分广泛,包括数据库、集群、哈希键、阻塞操作等功能都用到了这个算法。发明算法的作者被邀到google工作,该算法最新版本是MurmurHash3,基于MurmurHash2改进了一些小瑕疵,使得速度更快,实现了32位(低延时)、128位HashKey,尤其对大块的数据,具有较高的平衡性与低碰撞率。

无论是哪一种hash算法,我们都希望能将消息尽量快速的,均匀的分配到不同的partition中。

如果你没有指定key,那么会采用round-robin算法,其实就是轮询算法,每一个partition依次排队,一个一个的分配。

代码如下:

int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
    int part = Utils.toPositive(nextValue) % availablePartitions.size();
    return availablePartitions.get(part).partition();
} else {
    // no partitions are available, give a non-available partition
    return Utils.toPositive(nextValue) % numPartitions;
}

nextValue()的实现,很容易知道,map针对每一个topic,都有一个序号,在一开始为空的时候,就随机生成一个数,然后再自增。

然后就是Utils.toPositive(nextValue),这个方法在kafka中实现的,其实就是把负数转为正数,如果你测试一下你会发现Utils.toPositive(-1)返回的是2147483647。Utils.toPositive()的实现如下:

  public static int toPositive(int number) {
      return number & 0x7fffffff;
  }

其实就是将number的符号未设置为0,由于java中表示数使用的补码,所以-1的符号位是1,而剩下的31为与0x7fffffff相同,所以Utils.toPositive(-1)=0x7fffffff

所以,在没有key的情况下,是采用依次轮询调度算法。以上源码是kafka2.0版本的,低版本的可能会有所不同。

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇kafka源码分析之kafkaApis 下一篇kafka中关于消息的命令

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目