设为首页 加入收藏

TOP

Flume 自定义拦截器(一)
2018-10-07 10:09:14 】 浏览:288
Tags:Flume 定义 拦截

做项目时遇到一个问题,需要对接收到的日志数据做复杂逻辑处理并将一条转换成多条。


对比了td-agent,filebeat、flume日志采集工具。


td-agent核心部分是用C实现,而插件部分用了ruby,但ruby不熟;filebeat正则匹配很强大,但关于插件相关资料很少;flume插件却可以直接用java实现。于是决定通过自定义flume拦截器实现这一功能。


Flume拦截器


Flume的拦截器可删除或修改Event。


Timestamp 拦截器:在Event Header中添加时间戳。
Host 拦截器:在Event Header中添加agent运行机器的Host或IP。
Static 拦截器:在Event Header中添加自定义静态属性。
Remove Header拦截器:可移除Event Header中指定属性。
UUID拦截器:在Event Header中添加全局唯一UUID。
Search and Replace拦截器:基于正则搜索和替换字符串等。
Regex Filtering拦截器:基于正则过滤或反向过滤Event。
Regex Extractor拦截器:基于正则在Event Header添加指定的Key,并将匹配到的内容作为对应的Value。


自定义Flume拦截器


自定义拦截器,实现给每条数据添加公共字段,并将一条转换成多条。其他复杂逻辑类似。


package com.flumePlugins.interceptor;


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.codec.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.interceptor.Interceptor;


import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;


/**
 * Author: Wang Pei
 * Summary:
 */
public class ParseLogByRule implements Interceptor {
    @Override
    public void initialize() {
        //pass
    }


    @Override
    public void close() {
        //pass


    }


    /**
    * 解析单条event
    * @param event
    * @return
    */
    @Override
    public Event intercept(Event event) {
        //输入
        String inputeBody=null;
        //输出
        byte[] outputBoday=null;
        //解析---这里定义对单条Event处理规则
        try {
            inputeBody=new String(event.getBody(), Charsets.UTF_8);
            ArrayList<String> temp = new ArrayList<>();


            JSONObject bodyObj = JSON.parseObject(inputeBody);


            //1)公共字段
            String host = bodyObj.getString("host");
            String user_id = bodyObj.getString("user_id");
            JSONArray data = bodyObj.getJSONArray("items");


            //2)Json数组=>every json obj
            for (Object item : data) {
                JSONObject itemObj = JSON.parseObject(item.toString());
                HashMap<String, Object> fields = new HashMap<>();
                fields.put("host",host);
                fields.put("user_id",user_id);
                fields.put("item_type",itemObj.getString("item_type"));
                fields.put("active_time",itemObj.getLongValue("active_time"));
               

首页 上一页 1 2 3 下一页 尾页 1/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Elasticsearch Kibana查询语法 下一篇Python基础教程:for 循环语句 与..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目