设为首页 加入收藏

TOP

Kafka的Exactly Once和事务
2019-02-07 02:33:27 】 浏览:108
Tags:Kafka Exactly Once 事务

概述

完整的Exactly Once时非常难以实现的,可以说时分布式消息系统的核心问题。
https://blog.csdn.net/qq_26222859/article/details/54933828
Kafka支持两种事务,单独的producer事务和接收-处理-发送事务,不支持单纯的consumer事务(说白了就是只有Producer提供了事务API)

Kafka的实现

Kafka在0.11.0.0之前的版本中只支持At Least Once和At Most Once语义,尚不支持Exactly Once语义。

但是在很多要求严格的场景下,如使用Kafka处理交易数据,Exactly Once语义是必须的。我们可以通过让下游系统具有幂等性来配合Kafka的At Least Once语义来间接实现Exactly Once。但是:

  • 该方案要求下游系统支持幂等操作(多次发送和一次发送效果相同),限制了Kafka的适用场景
  • 实现门槛相对较高,需要用户对Kafka的工作机制非常了解
  • 对于Kafka Stream而言,Kafka本身即是自己的下游系统(kafka stream的常见场景就是从kafka接数处理完kafka),但Kafka在0.11.0.0版本之前不具有幂等发送能力

在0.11以上版本,Kafka Stream API实现了Exactly Once语义。

在这里插入图片描述

  • PID producer ID,每个producer初始化时会被分配唯一的PID。

  • producer会为要发送的每一个Partition维护一个单调递增的Sequence Number,每发送一次为该值+1。broker也会自己记录该每一个PID对应Sequence Number的初始值并在每一次commit后对该值+1。显然,broker检查发送消息中的Sequence Number应该比broker本地大1,否则证明中间丢数据活着数据乱序,应该producer发出异常。这样的设计解决了两个问题

1. Broker保存消息后,发送ACK前宕机,Producer认为消息未发送成功并重试,造成数据重复
2. 前一条消息发送失败,后一条消息发送成功,前一条消息重试后成功,造成数据乱序
  • Transaction CoordinatorProducer。将多条消息作为一个事务批量发送,要么全部成功要么全部失败。为了实现这一点,Kafka 0.11.0.0引入了一个服务器端的模块Transaction Coordinator,名为,用于管理Producer发送的消息的事务性。

  • 5.1/5.2/5.3是一个典型的2PC过程,用于维持集群对事务的相同状态。

Producer事务

单纯的producer事务将保证发送的消息同时发送成功或者同时无法发送

   Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("transactional.id", "my-transactional-id");
     Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
    
     producer.initTransactions();
    
     try {
         producer.beginTransaction();
         for (int i = 0; i < 100; i++)
             producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
         producer.commitTransaction();
     } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
         // We can't recover from these exceptions, so our only option is to close the producer and exit.
         producer.close();
     } catch (KafkaException e) {
         // For all other exceptions, just abort the transaction and try again.
         producer.abortTransaction();
     }
     producer.close();

Consumer对于事务消息的处理

在上面的例子我们看到,再没有commit之前,producer已经实际上将消息发送到了broker。consumer如果此时取到这些未commmit消息,将无法处理也无法丢弃,只能缓存起来等待broker确认。这显然是一个丑陋的设计。
为此,Kafka添加了一个很重要概念,叫做LSO,即last stable offset。对于同一个TopicPartition,其offset小于LSO的所有transactional message的状态都已确定,要不就是committed,要不就是aborted。而broker对于read_committed的consumer,只提供offset小于LSO的消息。这样就避免了consumer收到状态不确定的消息,而不得不buffer这些消息。
另外,consumer会接收到引入了一种特殊类型的消息,即Control Message。consumer通过这一类消息,Consumer通过该消息过滤掉那些被abort的事务的消息。

接受-处理-发送事务

下面时kafka中最近经典的接受kafka——处理——发送到kafka的例子。

Producer<String, String> producer = new KafkaProducer<String, String>(props);
// 初始化事务,包括结束该Transaction ID对应的未完成的事务(如果有)
// 保证新的事务在一个正确的状态下启动
producer.initTransactions();
// 开始事务
producer.beginTransaction();
// 消费数据,这里的groupid应该和下面producer.sendOffsetsToTransaction指定的groupid一致。
ConsumerRecords<String, String> records = consumer.poll(100);
try{
	// 发送数据给broker
	producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value"));//1
	// 发送消费数据的Offset给consumer的,将上述数据消费与数据发送纳入同一个Transaction内。
	producer.sendOffsetsToTransaction(offsets, "group1");//2
	// 数据发送及Offset发送均成功的情况下,提交事务
	producer.commitTransaction();//3 该句话在最后,来保证对所有消费和发送的处理事务的正确性。
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
	// 数据发送或者Offset发送出现异常时,终止事务
	producer.abortTransaction();
} finally {
	// 关闭Producer和Consumer
	producer.close();
	consumer.close();
}
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇EMQ集成Kafka插件编写过程 emq_pl.. 下一篇Apache Kafka 2.0.0

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目