设为首页 加入收藏

TOP

Kafka Offset管理及语义概念的理解
2019-05-12 14:36:23 】 浏览:118
Tags:Kafka Offset 管理 语义 概念 理解
引语
消费者需要自己保留一个offset,从kafka 获取消息时,只拉去当前offset 以后的消息。 kafka offset的管理方式分为两种保存offset和不保存offset,一般保存offset采用的是外部存储保护,这都要根据具体的业务情况来定。使用外部存储保存,我们可把offset保存到Checkpoint, Hbase, Zookeeper, Kafka,接下来我们就来offset保存的方式,各种方式使用的场景,关于至少一次语义,最多一次语义以及一次仅一次语义的一些相关概念,以及解决至少一次语义存在的数据重复和之多一次语义存在的数据丢失问题的方法。
Kafka Offset管理-Checkpoint

1、启用Spark Streaming的checkpoint是存储偏移量最简单的方法

2、流式checkpoint专门用于保存应用程序的状态,比如保存在HDFS上,在故障时能恢复

3、Spark Streaming的checkpoint无法跨越应用程序进行恢复(这个机器上保存的offset,想在另外一台机器上恢复这个offset)

4、Spark升级也将导致无法恢复(升级(API、版本迭代、逻辑修改)后会导致以前的checkpoint无法使用)

5、在关键生产应用,不建议使用spark检查点管理offset

Kafka Offset管理-Zookeeper

receiver会自动将offset维护到zookeeper中。这里的主要讲的是用Direct方式,手动地将offset的值维护到zookeeper中。

1、路径 val zkPath = s"{kafkaOffsetRootPath}/{groupName}/{o.partition}/{o.partition}"

2、如果Zookeeper中未保存offset,根据kafkaParam的配置使用最新或者最旧的offset

3、如果Zookeeper中有保存offset,我们会利用这个offset作为kafka的起始位置

Kafka Offset管理-Hbase

1、基于Hbase的通用设计,使用同一张表保存可以跨越多个spark Streaming程序的topic的offset

2、rowkey = topic名称 + groupID + Streaming的batchtime.milliSeconds.尽管batchtime.milliSeconds不是必须的,但是它可以看到历史的批处理任务对offset的管理情况。

3、kafka的offset保存在下面的表中,列簇为offsets, 30天后自动过期Hbase表结构 create spark_kafka_offsets,{NAME => offset, TTL => 2592000}

4、offset的获取场景

场景1:Streaming作业首次启动。通过zookeeper来查找给定topic中分区的数量,然后返回“0”作为所有topic分区的offset

场景2:长时间运行的Streaming作业已经停止,新的分区被添加到kafka的topic中。通过zookeeper来查找给定topic中分区数量,对于所有旧的topic分区,将offset设置为Hbase中的最新偏移量。对于所有新的topic分区,它将返回“0”作为offset

场景3:长时间运行的Streaming作业已经停止,topic的分区没有任何更改。在这种情况下,Hbase中发现的最新偏移量作为每个topic分区的offset返回。

Spark Streaming消费数据反写kafka

实现流程:

1、flume将socket流数据采集到kafka

2、Streaming读取kafka的数据进行清洗

3、将清洗后的数据再次写到kafka

推荐方式

1、将kafkaProducer对象广播到所有executor节点,这样就可以在每个executor节点将数据插入到kafka

2、用partition的方式,一个rdd的partition对应一个kafkaProducer

生产环境中存在问题分析

kafka的保存offset过期问题(也称offset越界问题)

原因:segment过期导致offset在实际数据的offset之前(segment过期导致数据不存在了,但是在其他地方还存在offset,当再次消费这个数据取出offset的时候就会出现数据找不到问题)

解决方法:实现手动解决offset越界问题,需要把kafkaCluster类的内容拿过来,并且把包访问权限去掉,具体实现查看MyCluster

数据峰值期间如何限速

场景:Streaming宕机一段时间或数据峰值期间都会造成kafka数据积压,如果不对Streaming的批次间隔做限速处理,在批次数据中会拉取很多的数据,这样会影响处理效率。

解决办法:进行限速。限速参数:spark.streaming.kafka.maxRatePartition 每秒每个分区获取的记录数

kafka的消息传递语义

消息传递语义有:至少一次语义(at-least-once)、最多一次语义(at-most-once)、一次仅一次语义(exactly-once)。其中at-least-once和at-most-once,它们的使用会存在数据重复和数据丢失问题,可能出现这种情况的原因图解如下:

exactly-once

由于,至少一次语义会导致数据重复,最多一次语义会导致数据的丢失,所以提出了一次仅一次语义,就可以很好地解决了数据重复和数据丢失问题。那么怎么来实现一次仅一次语义?小编总结总结如下:

1、幂等写入:当获取到数据后,先写到MySQL,再保存offset,如果在写到MySQL数据后,在保存offset之前宕机,重启作业后也不会影响一次语义,因为会在MySQL重复更新需要设置好唯一的主键。比如Redis、MySQL,再比如每次往一个目录覆写数据,这样主键不容易获取

注:在软件开发领域,幂等写入即为同样的请求被执行一次与连续执行多次的效果是一样的,服务器的状态也是一样的,实际上就是接口的可重复调用(包括时间和空间的两个维度)

2、事物控制:保证数据和offset在同一个事务里面,比如用mysql这样需要事务存储的支持。

3、自定义实现:offset和数据绑定保存等

总结
在offset的管理当中,使用checkepoint的方式来管理offset简单易实现,但是如果进行程序迭代后其他应用程序是获取不到的,因此在实际生产环境中如果Streaming流式处理的复杂度不高,处理的数据比较小可以使用checkpoint来进行offset的管理。一般情况下,推荐使用zookeeper来进行offset的管理,尽管使用起来比checkpoint复杂,但是这种方式适用于比较复杂的Streaming,当机器升级或者应用程序改变时,其他程序任然可以获取到zookeeper中的offset值。


更新时间
第一次更新时间 :2018-12-09 增加了总结

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇编写Makefile简单文件 下一篇Junit测试Controller(MockMVC使..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目