我们都知道Kafka的吞吐量很大,但是Kafka究竟会不会丢失消息呢?又会不会重复消费消息呢?
? 有很多公司因为业务要求必须保证消息不丢失、不重复的到达,比如无人机实时监控系统,当无人机闯入机场区域,我们必须立刻报警,不允许消息丢失。而无人机离开禁飞区域后我们需要将及时报警解除。如果消息重复了呢,我们是否需要复杂的逻辑来自己处理消息重复的情况呢,这种情况恐怕相当复杂而难以处理。但是如果我们能保证消息exactly once,那么一切都容易得多。
图 无人机实时监控
下面我们来简单了解一下消息传递语义,以及kafka的消息传递机制。
首先我们要了解的是message delivery semantic 也就是消息传递语义。
这是一个通用的概念,也就是消息传递过程中消息传递的保证性。
分为三种:
最多一次(at most once): 消息可能丢失也可能被处理,但最多只会被处理一次。
可能丢失 不会重复
至少一次(at least once): 消息不会丢失,但可能被处理多次。
可能重复 不会丢失
精确传递一次(exactly once): 消息被处理且只会被处理一次。
不丢失 不重复 就一次
而kafka其实有两次消息传递,一次生产者发送消息给kafka,一次消费者去kafka消费消息。
两次传递都会影响最终结果,
两次都是精确一次,最终结果才是精确一次。
两次中有一次会丢失消息,或者有一次会重复,那么最终的结果就是可能丢失或者重复的。
一、Produce端消息传递
这是producer端的代码:
Properties properties = new Properties();
properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
for (int i = 1; i <= 600; i++) {
kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i));
System.out.println("testkafka"+i);
}
kafkaProducer.close();
其中指定了一个参数acks 可以有三个值选择:
0: producer完全不管broker的处理结果 回调也就没有用了 并不能保证消息成功发送 但是这种吞吐量最高
? all或者-1: leader broker会等消息写入 并且ISR都写入后 才会响应,这种只要ISR有副本存活就肯定不会丢失,但吞吐量最低。
? 1: 默认的值 leader broker自己写入后就响应,不会等待ISR其他的副本写入,只要leader broker存活就不会丢失,即保证了不丢失,也保证了吞吐量。
所以设置为0时,实现了at most once,而且从这边看只要保证集群稳定的情况下,不设置为0,消息不会丢失。
但是还有一种情况就是消息成功写入,而这个时候由于网络问题producer没有收到写入成功的响应,producer就会开启重试的操作,直到网络恢复,消息就发送了多次。这就是at least once了。
kafka producer 的参数acks 的默认值为1,所以默认的producer级别是at least once。并不能exactly once。
图kafka-apis
二、Consumer端消息传递
consumer是靠offset保证消息传递的。
consumer消费的代码如下:
Properties props = new Properties();
props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset","earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
try{
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(