设为首页 加入收藏

TOP

Flume 自定义拦截器(二)
2018-10-07 10:09:14 】 浏览:289
Tags:Flume 定义 拦截
; temp.add(new JSONObject(fields).toJSONString());
            }
            //3)Json obj 拼接
            outputBoday=String.join("\n",temp).getBytes();
        }catch (Exception e){
            System.out.println("输入数据:"+inputeBody);
            e.printStackTrace();
        }
        event.setBody(outputBoday);
        return event;
    }


    /**
    * 解析一批event
    * @param events
    * @return
    */
    @Override
    public List<Event> intercept(List<Event> events) {
        //输出---一批Event
        ArrayList<Event> result = new ArrayList<>();
        //输入---一批Event
        try{
            for (Event event : events) {
                //一条条解析
                Event interceptedEvent = intercept(event);
                byte[] interceptedEventBody = interceptedEvent.getBody();
                if(interceptedEventBody.length!=0){
                    String multiEvent = new String(interceptedEventBody, Charsets.UTF_8);
                    String[] multiEventArr = multiEvent.split("\n");
                    for (String needEvent : multiEventArr) {
                        SimpleEvent simpleEvent = new SimpleEvent();
                        simpleEvent.setBody(needEvent.getBytes());
                        result.add(simpleEvent);
                    }


                }
            }


        }catch (Exception e){
            e.printStackTrace();
        }
        return result;
    }


    /**
    * 实现内部类接口
    */
    public static class Builder implements Interceptor.Builder{
        @Override
        public Interceptor build() {
            return new ParseLogByRule();
        }


        @Override
        public void configure(Context context) {


        }
    }


}


插件打包上传


编译打包拦截器插件,然后将打包后的插件和依赖的fastjson一起上传到flume lib目录


配置agent


TAILDIR Source ===>file Channel ===>Kafka Sink


    cat agent.conf
    # source的名字
    agent.sources = s1
    # channels的名字
    agent.channels = c1
    # sink的名字
    agent.sinks = r1


    # 指定source使用的channel
  &n

首页 上一页 1 2 3 下一页 尾页 2/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Elasticsearch Kibana查询语法 下一篇Python基础教程:for 循环语句 与..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目