Topic :producer-0 PartitionCount :3 ReplicationFactor :2 Configs :
Topic : producer-0 Partition : 0 Leader : 2 Replicas : 2,1 Isr : 2,1
Topic : producer-0 Partition : 1 Leader : 3 Replicas : 3,2 Isr : 3,2
Topic : producer-0 Partition : 2 Leader : 1 Replicas : 1,3 Isr : 1,3
从上一篇文章我们可以知道,名为producer-0的topic有3的partition,分别是partition:0
,partition:1
和partition:2
,并且他们分别在不同的机器上。
在这里我们先讲讲如何将消息发送到指定的partition中,然后在讲默认的partition分配策略(即DefaultPartitioner.java
)
指定的partition发送
话不多说,直接上代码。(提示:在启动生产者之前,先启动消费者。 )
生产者代码如下:
/**
* 将消息发送到指定的partition中
* @author yangyaming
*/
public class PartitionProducer {
public static final String TOPIC_NAME = "producer-0" ;
private static Properties props = new Properties();
static {
props.put("bootstrap.servers" , "192.168.1.3:9092,192.168.1.128:9092,192.168.1.130:9092" );
props.put("acks" , "all" );
props.put("retries" , 0 );
props.put("batch.size" , 16384 );
props.put("linger.ms" , 1 );
props.put("buffer.memory" , 33554432 );
props.put("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer" );
props.put("value.serializer" , "org.apache.kafka.common.serialization.StringSerializer" );
}
public static void main (String[] args) {
Producer<String, String> producer = new KafkaProducer<>(props);
List<PartitionInfo> partitions = producer.partitionsFor(TOPIC_NAME);
printAllPartition(partitions);
/**将消息发送分片列表的到第二个partition中**/
int sendPartition = partitions.get(1 ).partition();
System.out.println("消息将发送到partition:" + sendPartition + "中." );
for (int i = 100 ; i < 200 ; i++)
producer.send(new ProducerRecord<String, String>(TOPIC_NAME,sendPartition, Integer.toString(i), Integer.toString(i)));
System.out.println("发送完成" );
producer.close();
}
private static void printAllPartition (List<PartitionInfo> partitions) {
if (partitions == null || partitions.size() <= 0 )
return ;
System.out.println("topic:" + TOPIC_NAME + ",所有的partition如下:" );
partitions.forEach((partition) -> System.out.println("partition:" + partition.partition()));
}
}
示例源码:https://github.com/Mryangtaofang/sample
执行的结果如下图,可以看到消息全部发送到了序号为1的partition中:
消费者代码如下:
/**
* kafka消费者
*/
public class PartitionConsumer {
private static Properties props = new Properties();
static {
props.put("bootstrap.servers" , "192.168.1.3:9092,192.168.1.128:9092,192.168.1.130:9092" );
props.put("group.id" , "test" );
props.put("enable.auto.commit" , "true" );
props.put("auto.commit.interval.ms" , "1000" );
props.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" );
props.put("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" );
}
public static void main (String args[]){
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(PartitionProducer.TOPIC_NAME));
while (true ) {
ConsumerRecords<String, String> records = consumer.poll(100 );
for (ConsumerRecord<String, String> record : records)
System.out.printf("partition = %d offset = %d, key = %s, value = %s%n" ,record.partition(), record.offset(), record.key(), record.value());
}
}
}
执行结果如下,可以看到消费的所有消息都是来自于partition-1中:
很明显,在实际应用中,我们绝对不会将所有的消息都放在一个partition中,这会导致所有的消息将只能存放在一台机器上。
producer.send(new ProducerRecord<String , String >(TOPIC_NAME,sendPartition, Integer .toString(i), Integer .toString(i)));
截取生产者的发送代码,如果我们不指定sendPartition这个入参,那么所有的消费该如何分配到各个partition中呢
kafka2.0默认的partition分片器(DefaultPartitioner)
分析kafka客户端源码,当没有指定Partitioner.class
配置时,会使用默认的分片规则,代码实现如下:
/**
* 默认的分片策略:
* <ul>
* <li>如果你指定了一个partition,用你指定的partition,如上面例子,我指定了partition-1
* <li>如果没有指定partition,但是你指定了key,那么会根据key进行哈希,分配到对应的partition中
* <li>如果partition和key都没指定,会使用round-robin算法
*/
public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
public void configure (Map<String, > configs) {}
public int partition (String topic, Object key, byte [] keyBytes, Object value, byte [] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null ) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0 ) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue (String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null ) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
public void close () {}
}
重点分析partition()
方法:
如果你手动指定了分区(就像上面我的例子一样),根本就不会进这个方法,因为不需要进行分区。
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
如果你指定了key这个参数,那么会针对key进行hash,也就是上面这段代码,其实通过Utils.murmur2(keyBytes)
可以看出,他是根据key的序列化之后的字节码进行hash的,所以间接地,一个消息被分到哪个partition上跟序列化方式也有关系。
Utils.murmur2
是在kafka中实现的,源码如下。
/**
* Generates 32 bit murmur2 hash from byte array
* @param data byte array to hash
* @return 32 bit hash of the given array
*/
public static int murmur2 (final byte [] data) {
int length = data.length;
int seed = 0x9747b28c ;
final int m = 0x5bd1e995 ;
final int r = 24 ;
int h = seed ^ length;
int length4 = length / 4 ;
for (int i = 0 ; i < length4; i++) {
final int i4 = i * 4 ;
int k = (data[i4 + 0 ] & 0xff ) + ((data[i4 + 1 ] & 0xff ) << 8 ) + ((data[i4 + 2 ] & 0xff ) << 16 ) + ((data[i4 + 3 ] & 0xff ) << 24 );
k *= m;
k ^= k >>> r;
k *= m;
h *= m;
h ^= k;
}
switch (length % 4 ) {
case 3 :
h ^= (data[(length & ~3 ) + 2 ] & 0xff ) << 16 ;
case 2 :
h ^= (data[(length & ~3 ) + 1 ] & 0xff ) << 8 ;
case 1 :
h ^= data[length & ~3 ] & 0xff ;
h *= m;
}
h ^= h >>> 13 ;
h *= m;
h ^= h >>> 15 ;
return h;
}
上面使用的是MurmurHash2,常用的另一种hash算法是djb。MurmurHash算法的复杂度很高,所以很难理解上面这段代码,有兴趣的可以去研究下,下面是MurmurHash2一段介绍。
MurmurHash 是一种非加密型哈希函数,适用于一般的哈希检索操作。 由Austin Appleby在2008年发明,
并出现了多个变种,都已经发布到了公有领域。与其它流行的哈希函数相比,对于规律性较强的key,MurmurHash的随机分布特征表现更良好。
Redis在实现字典时用到了两种不同的哈希算法,MurmurHash便是其中一种(另一种是djb),在Redis中应用十分广泛,包括数据库、集群、哈希键、阻塞操作等功能都用到了这个算法。发明算法的作者被邀到google工作,该算法最新版本是MurmurHash3,基于MurmurHash2改进了一些小瑕疵,使得速度更快,实现了32位(低延时)、128位HashKey,尤其对大块的数据,具有较高的平衡性与低碰撞率。
无论是哪一种hash算法,我们都希望能将消息尽量快速的,均匀的分配到不同的partition中。
如果你没有指定key,那么会采用round-robin算法,其实就是轮询算法,每一个partition依次排队,一个一个的分配。
代码如下:
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster .availablePartitionsForTopic(topic);
if (availablePartitions.size () > 0 ) {
int part = Utils.toPositive(nextValue) % availablePartitions.size ();
return availablePartitions.get(part).partition ();
} else {
return Utils.toPositive(nextValue) % numPartitions;
}
看nextValue()
的实现,很容易知道,map针对每一个topic,都有一个序号,在一开始为空的时候,就随机生成一个数,然后再自增。
然后就是Utils.toPositive(nextValue)
,这个方法在kafka中实现的,其实就是把负数转为正数,如果你测试一下你会发现Utils.toPositive(-1)
返回的是2147483647。Utils.toPositive()
的实现如下:
public static int toPositive (int number) {
return number & 0x7fffffff ;
}
其实就是将number
的符号未设置为0,由于java 中表示数使用的补码,所以-1的符号位是1,而剩下的31为与0x7fffffff
相同,所以Utils.toPositive(-1)=0x7fffffff
。
所以,在没有key的情况下,是采用依次轮询调度算法。以上源码是kafka2.0版本的,低版本的可能会有所不同。