设为首页 加入收藏

TOP

Flume_拦截器、选择器、Sink组
2018-11-28 18:07:42 】 浏览:115
Tags:Flume_ 拦截 选择 Sink
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/Regan_Hoo/article/details/78728642
  1. 拦截器(interceptor)

    拦截器是简单插件式组件,设置在Source和Source写入数据的Channel之间。每个拦截器实例只处理同一个Source接收到的事件。

    因为拦截器必须在事件写入channel之前完成转换操作,只有当拦截器已成功转换事件后,channel(和任何其他可能产生超时的source)才会响应发送事件的客户端或sink,因此在拦截器中进行大量重量级的处理并不是一个好主意。如果拦截器中的处理是重量级的、耗时的,那么需要相应的调整超时时间属性。

    拦截器一般用于分析事件以及在需要的时候丢弃事件,通常情况是,拦截器给事件插入报头,这些事件后续用于HDFS Sink(用于时间戳或者用于基于报头的分桶)、HBase Sink(用于行键)等。这些事件报头也经常在复杂Channel处理器中用于将流分为多个流的分支,或者基于优先级将事件发送到不同的Sink中,这些事件报头是拦截器分析的内容。

    这里写图片描述
    ①时间拦截器(timestamp interceptor)
    用于将时间戳插入到Flume的事件报头中,HDFS Sink可以根据时间戳进行分桶

    这里写图片描述
    ②主机拦截器(host interceptor)
    用于将服务器的IP地址或者主机名注入到Flume的事件报头中

    这里写图片描述
    ③静态拦截器
    将固定报头的键和值插入拦截的每个事件中,报头键和值是可配置的。

    这里写图片描述
    下面的配置将触发报头中包含以book为键,以usingFlume为值的事件:

    这里写图片描述
    ④正则过滤拦截器
    过滤操作基于配置文件中提供的正则表达式(regex),每个正则过滤器将事件体转换为UTF-8字符串,并根据提供的正则表达式匹配字符串。

    这里写图片描述
    ⑤Morphline拦截器
    Morphline Commands实现了Flume、MapReduce、HBase、Spark到Apache Solr的数据ETL。该拦截器获取使用哪些morphline文件和使用文件中的哪些morphline去处理事件的信息。

    这里写图片描述
    ⑥UUID拦截器
    类似solr这样的系统,需要每个文档写入时都带有唯一ID。UUID(通用唯一标识符)拦截器可以用于为每个事件生成这样的唯一标识符。

    这里写图片描述
    ⑦编写拦截器
    以下展示了所有拦截器必须实现的Interceptor接口:

    这里写图片描述
    定制拦截器可以部署在plugins.d目录

  2. Channel选择器

    Channel选择器是决定Source接收的一个特定事件写入哪些Channel的组件,它们告知Channel处理器,然后由其将事件写入到每个Channel。

    Flume内置两种选择器:replicating和multiplexing。如果source的配置中没有指定选择器,那么会自动使用复制Channel选择器。

    这里写图片描述
    ① 复制Channel选择器
    该选择器复制每个事件到通过Source的channels参数所指定的所有的Channels中。复制Channel选择器还有一个可选参数optional,该参数是空格分隔的channel名字列表。此参数指定的所有channel都认为是可选的,所以如果事件写入这些channel时,若有失败发生,会忽略。而写入其他channel失败时会抛出异常。

    这里写图片描述
    以上的配置使得写入C3出现问题时不会抛出异常,并且source将通知前一阶段写入是成功的。
    可选channel必须罗列在source的channels参数中。

    ② 多路复用channel选择器
    多路复用channel选择器是一种专门用于动态路由事件的channel选择器,通过选择事件应该写入的channel,基于一个特定的事件头的值进行路由。结合拦截器,可以在事件上做一些分析,然后决定应该写入哪一个channel。
    被写入的channel列表由配置文件中的每个报头的值指定,如果配置未指定一个特定事件中报头的值,则该事件写入到默认的channel

    这里写图片描述
    ③ 自定义channel选择器
    需要实现channelSelector接口或继承AbstractChannelSelector类。对于每个事件,channel处理器调用channel选择器的getRequiredChannels和getOptionalChannels方法,返回需要的和可选的将要写入事件的channel列表。如果写入到任何所需的channel的操作失败,channel处理器将会抛出异常,并使source重试。任何写入到可选channel的操作失败都会被忽略。

  3. Sink组和Sink处理器

    Flume框架为每个sink组实例化一个sink运行器,来运行sink组。每个sink组可以包含任意数量的sink。sink运行器持续请求sink组,要求其中的一个sink从自己的channel中读取事件。sink组通常用于RPC Sink,在层之间以负载均衡或故障转移方式发送数据。
    定义sink组:

    这里写图片描述
    下面展示用sink集合配置sg1和sg2:

    这里写图片描述
    sink组中的每个sink必须单独进行配置,包括,sink从哪个channel读取,写数据到哪些主机或集群等。
    sink处理器决定任何时候哪个sink是活跃的组件。
    sink处理器与sink运行器不同。sink运行器实际上是运行sink的,而sink处理器决定了哪个sink应该从自己的channel中拉取事件。
    Flume自带了两类sink处理器:load-balancing sink处理器和failover sink处理器
    配置sink处理器:

    这里写图片描述
    ① load-balancing sink处理器
    假设第一层100个agent,第二层有4个agent。第一层每个agent将有4个avro sink用来推送数据到第二层的每个agent。该工作正常运行,直到其中第二层的一个agent失败。此时,配置发送数据的sink 将不会发送任何数据,直到第二层失败的agent重新上线。
    除了该sink耗尽了agent上的几个线程这样一个事实(一个用于sink runner,另一个用于使用netty发送数据的线程池)浪费了CPU周期,直到第二层agent启动并运行,通过创建事件的事务且回滚。该sink也会给channel造成额外的压力,对于File channel,尽管事务没有被提交,许多将要写入文件的读取操作(即使事务没有被提交,这些读取操作也要被写入文件),也造成了I/O成本和磁盘空间成本。

    这里写图片描述
    为了避免这样的问题,sink组使用load-balancing sink处理器是一个好主意,它将从sink组所有的sink中选择一个sink,处理来自channel的事件
    sink选择的顺序可以配置为random或round-Robin。sink处理器可以配置将失败的sink加入黑名单,回退时间以指数方式增长直到达到上限值。这能确保相同的额sink不会循环重复尝试且不浪费资源,直到回退时间过期。

    这里写图片描述
    该配置意味着,在任何时候每个agent只有一个sink写数据。可以通过添加多个有相似配置的load-balancing sink处理器的sink组进行修改。注意,可能会有多个agent尝试写入数据到第二层每个agent
    【注意】因为每个avro sink对avro source保持持续开放的连接,拥有写入到相同agent的多个sink会增加更多的socket连接,且在第二层agent上占据更多的资源。对相同的agent增加大量sink组之前必须要谨慎考虑。

    ② 编写sink选择器
    自定义选择器必须实现LoadBalancingSinkProcessor$SinkSelector接口:

    这里写图片描述
    这里写图片描述
    ③ Failover Sink处理器
    Failover Sink处理器从sink组中以优先级的顺序选择sink。拥有最高优先级的sink先写数据直到它失败,然后选择组中其他sink中拥有最高优先级的sink。这意味着,即使已经失败的最高优先级的agent重新上线,Failover Sink处理器也不会让写入该agent的sink激活,直到目前活跃的sink遇到一个错误。

    这里写图片描述
    如果两个sink有相同的优先级,只会激活sink组中首先指定的sink

    这里写图片描述

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Flume或Kafka和Elasticsearch整合 下一篇Flume常用命令

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目