设为首页 加入收藏

TOP

流式处理的新贵 Kafka Stream - Kafka设计解析(七)(四)
2019-09-17 18:55:57 】 浏览:103
Tags:流式 处理 新贵 Kafka Stream 设计 解析
(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic)

3.4 聚合与乱序处理

聚合操作可应用于KStream和KTable。当聚合发生在KStream上时必须指定窗口,从而限定计算的目标数据集。

需要说明的是,聚合操作的结果肯定是KTable。因为KTable是可更新的,可以在晚到的数据到来时(也即发生数据乱序时)更新结果KTable。

这里举例说明。假设对KStream以5秒为窗口大小,进行Tumbling Time Window上的Count操作。并且KStream先后出现时间为1秒, 3秒, 5秒的数据,此时5秒的窗口已达上限,Kafka Stream关闭该窗口,触发Count操作并将结果3输出到KTable中(假设该结果表示为<1-5,3>)。若1秒后,又收到了时间为2秒的记录,由于1-5秒的窗口已关闭,若直接抛弃该数据,则可认为之前的结果<1-5,3>不准确。而如果直接将完整的结果<1-5,4>输出到KStream中,则KStream中将会包含该窗口的2条记录,<1-5,3>, <1-5,4>,也会存在肮数据。因此Kafka Stream选择将聚合结果存于KTable中,此时新的结果<1-5,4>会替代旧的结果<1-5,3>。用户可得到完整的正确的结果。

这种方式保证了数据准确性,同时也提高了容错性。

但需要说明的是,Kafka Stream并不会对所有晚到的数据都重新计算并更新结果集,而是让用户设置一个retention period,将每个窗口的结果集在内存中保留一定时间,该窗口内的数据晚到时,直接合并计算,并更新结果KTable。超过retention period后,该窗口结果将从内存中删除,并且晚到的数据即使落入窗口,也会被直接丢弃。

3.5 容错

Kafka Stream从如下几个方面进行容错

  • 高可用的Partition保证无数据丢失。每个Task计算一个Partition,而Kafka数据复制机制保证了Partition内数据的高可用性,故无数据丢失风险。同时由于数据是持久化的,即使任务失败,依然可以重新计算。
  • 状态存储实现快速故障恢复和从故障点继续处理。对于Join和聚合及窗口等有状态计算,状态存储可保存中间状态。即使发生Failover或Consumer Rebalance,仍然可以通过状态存储恢复中间状态,从而可以继续从Failover或Consumer Rebalance前的点继续计算。
  • KTable与retention period提供了对乱序数据的处理能力。

4 Kafka Stream应用示例

下面结合一个案例来讲解如何开发Kafka Stream应用。本例完整代码可从作者Github获取。

订单KStream(名为orderStream),底层Topic的Partition数为3,Key为用户名,Value包含用户名,商品名,订单时间,数量。用户KTable(名为userTable),底层Topic的Partition数为3,Key为用户名,Value包含性别,地址和年龄。商品KTable(名为itemTable),底层Topic的Partition数为6,Key为商品名,价格,种类和产地。现在希望计算每小时购买产地与自己所在地相同的用户总数。

首先由于希望使用订单时间,而它包含在orderStream的Value中,需要通过提供一个实现TimestampExtractor接口的类从orderStream对应的Topic中抽取出订单时间。

public class OrderTimestampExtractor implements TimestampExtractor {

  @Override
  public long extract(ConsumerRecord<Object, Object> record) {
    if(record instanceof Order) {
      return ((Order)record).getTS();
    } else {
      return 0;
    }
  }
}

接着通过将orderStream与userTable进行Join,来获取订单用户所在地。由于二者对应的Topic的Partition数相同,且Key都为用户名,再假设Producer往这两个Topic写数据时所用的Partitioner实现相同,则此时上文所述Join条件满足,可直接进行Join。

orderUserStream = orderStream
    .leftJoin(userTable, 
         // 该lamda表达式定义了如何从orderStream与userTable生成结果集的Value
        (Order order, User user) -> OrderUser.fromOrderUser(order, user), 
         // 结果集Key序列化方式
        Serdes.String(),
         // 结果集Value序列化方式
         SerdesFactory.serdFrom(Order.class))
    .filter((String userName, OrderUser orderUser) -> orderUser.userAddress != null)

从上述代码中,可以看到,Join时需要指定如何从参与Join双方的记录生成结果记录的Value。Key不需要指定,因为结果记录的Key与Join Key相同,故无须指定。Join结果存于名为orderUserStream的KStream中。

接下来需要将orderUserStream与itemTable进行Join,从而获取商品产地。此时orderUserStream的Key仍为用户名,而itemTable对应的Topic的Key为产品名,并且二者的Partition数不一样,因此无法直接Join。此时需要通过through方法,对其中一方或双方进行重新分区,使得二者满足Join条件。这一过程相当于Spark的Shuffle过程和Storm的FieldGrouping。

orderUserStrea
    .through(
        // Key的序列化方式
        Serdes.String(),
        // Value的序列化方式 
        SerdesFactory.serdFrom(OrderUser.class), 
        // 重新按照商品名进行分区,具体取商品名的哈希值,然后对分区数取模
        (String key, OrderUser orderUser, int numPartitions) -> (orderUser.getItemName().hashCode() & 0x7FFFFFFF) % numPartitions, 
        "orderuser-repartition-by-item")
    .leftJoin(itemTable, (OrderUser orderUser, Item item) -> OrderUserItem.fromOrderUser(orderUser, item), Serd
首页 上一页 1 2 3 4 5 下一页 尾页 4/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇理论篇:关注点分离(Separation o.. 下一篇(*(工)*):目录

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目