设为首页 加入收藏

TOP

自定义Flume的Interceptor,编写、配置与使用
2019-02-11 14:10:50 】 浏览:49
Tags:定义 Flume Interceptor 编写 配置 使用

基于需求,自己写了一个Flume的Interceptor,主要需求如下:

kafka中数据格式为json字符串,需要利用Flume消费kafka中的数据,并按照指定的key的顺序,将value输出,并用指定分隔符分隔
输入:String JsonString = "{'key1':'a','key2':'b','key3':'c','key4':'d','key5':'e','key6':'f'}";
指定的key的顺序:String[] keys = {"key1","key2","key3","key4","key5","key6"};
输出:a\u0001b\u0001c\u0001d\u0001e\u0001f

具体实现代码如下:

1、Interceptor代码:

package com.bonc.flumeInterceptor;

import java.util.HashMap;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import com.bonc.utils.Json2Map;
import com.bonc.utils.StringUnicodeUtil;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
/**
 * 
 * @author pengzhe
 * 自定义的一个flume拦截器,用来将json字符串转化成map,再按照指定顺序将map中的value按照指定格式输出
 * 例子:
 * 	 String[] keys = {"key1","key2","key3","key4","key5","key6"};
	  输入:String JsonString = "{'key1':'a','key2':'b','key3':'c','key4':'d','key5':'e','key6':'f'}";
	  输出:a\u0001b\u0001c\u0001d\u0001e\u0001f
 *
 */
public class FlumeJsonInterceptor implements Interceptor{

	@Override
	public void close() {	
	}
	@Override
	public void initialize() {
	}
	@Override
	public Event intercept(Event event) {
		//以keys数组中的元素顺序,输出jsonMap中对应key的value的值
		String[] keys = {"key1","key2","key3","key4","key5","key6"};
		String JsonString = new String(event.getBody(),Charsets.UTF_8);
		//将jsonString转化为jsonMap
		HashMap jsonMap = (HashMap) Json2Map.json2Map(JsonString);
		System.out.println(jsonMap);
		//将\u0001转化成字符串,用于拼接
		String separatorStr = StringUnicodeUtil.unicode2String("\u0001");
		//声明一个stringBuffer,来拼接字符串
		StringBuffer finalStr = new StringBuffer();
		
		//按keys数组的顺序输出jsonMap中的value
		for (String keyStr : keys) {
			String jsonValue = (String) jsonMap.get(keyStr);
			finalStr.append(jsonValue).append(separatorStr);
		}
		event.setBody(finalStr.toString().getBytes());
        return event;
	}
	@Override
	public List<Event> intercept(List<Event> events) {
		List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                intercepted.add(interceptedEvent);
            }
        }
        return intercepted;
	}
	public static class Builder implements Interceptor.Builder{
		@Override
		public void configure(Context context) {
		}
		@Override
		public Interceptor build() {
			return new FlumeJsonInterceptor();
		}
	}
}

2、JsonToMap代码

package com.bonc.utils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

public class Json2Map {
	 
    /**
     * 将json字符串转为Map结构
     * 如果json复杂,结果可能是map嵌套map
     * @param jsonStr 入参,json格式字符串
     * @return 返回一个map
     */
    public static Map<String, Object> json2Map(String jsonStr) {
        Map<String, Object> map = new HashMap<>();
        if(jsonStr != null && !"".equals(jsonStr)){
            //最外层解析
            JSONObject json = JSONObject.fromObject(jsonStr);
            for (Object k : json.keySet()) {
                Object v = json.get(k);
                //如果内层还是数组的话,继续解析
                if (v instanceof JSONArray) {
                    List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
                    Iterator<JSONObject> it = ((JSONArray) v).iterator();
                    while (it.hasNext()) {
                        JSONObject json2 = it.next();
                        list.add(json2Map(json2.toString()));
                    }
                    map.put(k.toString(), list);
                } else {
                    map.put(k.toString(), v);
                }
            }
            return map;
        }else{
            return null;
        }
    }
}

3、UNICODE与String相互转化代码

package com.bonc.utils;
 
import java.util.regex.Matcher;  
import java.util.regex.Pattern;  
   
/** 
 *  
 * <p>Title: String 与 Unicode 互相转换的工具类</p> 
 * <p>Description: </p> 
 * <p>Company: SCAU@Copyright</p> 
 * @Copyright 1.0 
 * @author jodenhe (824923282@qq.com) 
 * @version 1.0 
 * @since 2017年8月17日 下午9:42:50 
 */ 
public class StringUnicodeUtil {  
       
    /** 
     * 含有unicode 的字符串转一般字符串 
     * @param unicodeStr 混有 Unicode 的字符串 
     * @return 
     */ 
    public static String unicodeStr2String(String unicodeStr) {  
        int length = unicodeStr.length();  
        int count = 0;  
        //正则匹配条件,可匹配“\\u”1到4位,一般是4位可直接使用 String regex = "\\\\u[a-f0-9A-F]{4}";  
        String regex = "\\\\u[a-f0-9A-F]{1,4}";  
        Pattern pattern = Pattern.compile(regex);  
        Matcher matcher = pattern.matcher(unicodeStr);  
        StringBuffer sb = new StringBuffer();  
           
        while(matcher.find()) {  
            String oldChar = matcher.group();//原本的Unicode字符  
            String newChar = unicode2String(oldChar);//转换为普通字符  
            int index = unicodeStr.indexOf(oldChar);  
               
            sb.append(unicodeStr.substring(count, index));//添加前面不是unicode的字符  
            sb.append(newChar);//添加转换后的字符  
            count = index+oldChar.length();//统计下标移动的位置  
        }  
        sb.append(unicodeStr.substring(count, length));//添加末尾不是Unicode的字符  
        return sb.toString();  
    }  
       
    /** 
     * 字符串转换unicode 
     * @param string 
     * @return 
     */ 
    public static String string2Unicode(String string) {  
        StringBuffer unicode = new StringBuffer();  
        for (int i = 0; i < string.length(); i++) {  
            // 取出每一个字符  
            char c = string.charAt(i);  
            // 转换为unicode  
            unicode.append("\\u" + Integer.toHexString(c));  
        }  
   
        return unicode.toString();  
    }  
   
    /** 
     * unicode 转字符串 
     * @param unicode 全为 Unicode 的字符串 
     * @return 
     */ 
    public static String unicode2String(String unicode) {  
        StringBuffer string = new StringBuffer();  
        String[] hex = unicode.split("\\\\u"); 
        for (int i = 1; i < hex.length; i++) {  
            // 转换出每一个代码点  
            int data = Integer.parseInt(hex[i], 16);  
            // 追加成string  
            string.append((char) data);  
        } 
        return string.toString();  
    } 
}

配置使用:

1、将以上代码打成jar包

此处只需要将src目录下的文件打成jar包即可
鼠标指向src目录->右键选择export->选择java目录下的JARfile->点击next->指定保存目录->finish
程序目录结构

2、flume自定义组件的jar包存放问题

作者:双斜杠少年
来源:CSDN
原文:https://blog.csdn.net/u012373815/article/details/54352177
版权声明:本文为博主原创文章,转载请附上博文链接!

自定义flume 组建后,将项目打成jar 包,关于这个jar 包的管理和使用这里列举了三中方式。这三种方式都可以,随自己喜好使用。

lib 目录

将maven项目打成jar包,将jar 包放到flume的lib 目录下。

此方法简单粗暴,缺点就是jar 包不易管理

官方给定目录

官方建议在flume的 plugins.d (plugins.d 目录需要自己创建)目录下创建 一个自己定义的目录,在自定义的目录下新建 lib 和 libext 文件夹,lib 文件夹为放自定义组件的jar包,libext 文件夹下放 自定义组件的依赖包。

flume-1.7.0/plugins.d/
flume-1.7.0/plugins.d/custom-MysqlSink/
flume-1.7.0/plugins.d/custom-MysqlSink/lib/mysql-sink.jar
flume-1.7.0/plugins.d/custom-MysqlSink/libext/mysql-connector-java-6.0.5.jar

此方法方便,易管理,就是需要自己建目录。

自我管理

把jar包放在自己想放的目录,但是代价就是启动时需要通过 -C 指定jar 包位置。

在启动的时候 直接加载jar 包 ,依赖的jar 包可以用“,”号隔开

./flume-ng agent -c /opt/apps/flume-1.7.0/conf -f /opt/apps/flume-1.7.0/conf/mysqlSink.conf -n agent1 -C /opt/apps/flumeInterceptor.jar  -Dflume.root.logger=INFO,console 

此方法不太方便,需要在启动时指定位置,比较麻烦。

Flume的启动配置文件



第一次编写flume自定义组件,以此纪念。


编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇flume高并发优化——(6)开发多.. 下一篇Windows上搭建Flume运行环境

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }