基于需求,自己写了一个Flume的Interceptor,主要需求如下:
kafka中数据格式为json字符串,需要利用Flume消费kafka中的数据,并按照指定的key的顺序,将value输出,并用指定分隔符分隔
输入:String JsonString = "{'key1':'a','key2':'b','key3':'c','key4':'d','key5':'e','key6':'f'}";
指定的key的顺序:String[] keys = {"key1","key2","key3","key4","key5","key6"};
输出:a\u0001b\u0001c\u0001d\u0001e\u0001f
具体实现代码如下:
1、Interceptor代码:
package com.bonc.flumeInterceptor;
import java.util.HashMap;
import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import com.bonc.utils.Json2Map;
import com.bonc.utils.StringUnicodeUtil;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
/**
*
* @author pengzhe
* 自定义的一个flume拦截器,用来将json字符串转化成map,再按照指定顺序将map中的value按照指定格式输出
* 例子:
* String[] keys = {"key1","key2","key3","key4","key5","key6"};
输入:String JsonString = "{'key1':'a','key2':'b','key3':'c','key4':'d','key5':'e','key6':'f'}";
输出:a\u0001b\u0001c\u0001d\u0001e\u0001f
*
*/
public class FlumeJsonInterceptor implements Interceptor{
@Override
public void close() {
}
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
//以keys数组中的元素顺序,输出jsonMap中对应key的value的值
String[] keys = {"key1","key2","key3","key4","key5","key6"};
String JsonString = new String(event.getBody(),Charsets.UTF_8);
//将jsonString转化为jsonMap
HashMap jsonMap = (HashMap) Json2Map.json2Map(JsonString);
System.out.println(jsonMap);
//将\u0001转化成字符串,用于拼接
String separatorStr = StringUnicodeUtil.unicode2String("\u0001");
//声明一个stringBuffer,来拼接字符串
StringBuffer finalStr = new StringBuffer();
//按keys数组的顺序输出jsonMap中的value
for (String keyStr : keys) {
String jsonValue = (String) jsonMap.get(keyStr);
finalStr.append(jsonValue).append(separatorStr);
}
event.setBody(finalStr.toString().getBytes());
return event;
}
@Override
public List<Event> intercept(List<Event> events) {
List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
for (Event event : events) {
Event interceptedEvent = intercept(event);
if (interceptedEvent != null) {
intercepted.add(interceptedEvent);
}
}
return intercepted;
}
public static class Builder implements Interceptor.Builder{
@Override
public void configure(Context context) {
}
@Override
public Interceptor build() {
return new FlumeJsonInterceptor();
}
}
}
2、JsonToMap代码
package com.bonc.utils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
public class Json2Map {
/**
* 将json字符串转为Map结构
* 如果json复杂,结果可能是map嵌套map
* @param jsonStr 入参,json格式字符串
* @return 返回一个map
*/
public static Map<String, Object> json2Map(String jsonStr) {
Map<String, Object> map = new HashMap<>();
if(jsonStr != null && !"".equals(jsonStr)){
//最外层解析
JSONObject json = JSONObject.fromObject(jsonStr);
for (Object k : json.keySet()) {
Object v = json.get(k);
//如果内层还是数组的话,继续解析
if (v instanceof JSONArray) {
List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
Iterator<JSONObject> it = ((JSONArray) v).iterator();
while (it.hasNext()) {
JSONObject json2 = it.next();
list.add(json2Map(json2.toString()));
}
map.put(k.toString(), list);
} else {
map.put(k.toString(), v);
}
}
return map;
}else{
return null;
}
}
}
3、UNICODE与String相互转化代码
package com.bonc.utils;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
*
* <p>Title: String 与 Unicode 互相转换的工具类</p>
* <p>Description: </p>
* <p>Company: SCAU@Copyright</p>
* @Copyright 1.0
* @author jodenhe (824923282@qq.com)
* @version 1.0
* @since 2017年8月17日 下午9:42:50
*/
public class StringUnicodeUtil {
/**
* 含有unicode 的字符串转一般字符串
* @param unicodeStr 混有 Unicode 的字符串
* @return
*/
public static String unicodeStr2String(String unicodeStr) {
int length = unicodeStr.length();
int count = 0;
//正则匹配条件,可匹配“\\u”1到4位,一般是4位可直接使用 String regex = "\\\\u[a-f0-9A-F]{4}";
String regex = "\\\\u[a-f0-9A-F]{1,4}";
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(unicodeStr);
StringBuffer sb = new StringBuffer();
while(matcher.find()) {
String oldChar = matcher.group();//原本的Unicode字符
String newChar = unicode2String(oldChar);//转换为普通字符
int index = unicodeStr.indexOf(oldChar);
sb.append(unicodeStr.substring(count, index));//添加前面不是unicode的字符
sb.append(newChar);//添加转换后的字符
count = index+oldChar.length();//统计下标移动的位置
}
sb.append(unicodeStr.substring(count, length));//添加末尾不是Unicode的字符
return sb.toString();
}
/**
* 字符串转换unicode
* @param string
* @return
*/
public static String string2Unicode(String string) {
StringBuffer unicode = new StringBuffer();
for (int i = 0; i < string.length(); i++) {
// 取出每一个字符
char c = string.charAt(i);
// 转换为unicode
unicode.append("\\u" + Integer.toHexString(c));
}
return unicode.toString();
}
/**
* unicode 转字符串
* @param unicode 全为 Unicode 的字符串
* @return
*/
public static String unicode2String(String unicode) {
StringBuffer string = new StringBuffer();
String[] hex = unicode.split("\\\\u");
for (int i = 1; i < hex.length; i++) {
// 转换出每一个代码点
int data = Integer.parseInt(hex[i], 16);
// 追加成string
string.append((char) data);
}
return string.toString();
}
}
配置使用:
1、将以上代码打成jar包
此处只需要将src目录下的文件打成jar包即可
鼠标指向src目录->右键选择export->选择java目录下的JARfile->点击next->指定保存目录->finish
2、flume自定义组件的jar包存放问题
作者:双斜杠少年
来源:CSDN
原文:https://blog.csdn.net/u012373815/article/details/54352177
版权声明:本文为博主原创文章,转载请附上博文链接!
自定义flume 组建后,将项目打成jar 包,关于这个jar 包的管理和使用这里列举了三中方式。这三种方式都可以,随自己喜好使用。
lib 目录
将maven项目打成jar包,将jar 包放到flume的lib 目录下。
此方法简单粗暴,缺点就是jar 包不易管理
官方给定目录
官方建议在flume的 plugins.d (plugins.d 目录需要自己创建)目录下创建 一个自己定义的目录,在自定义的目录下新建 lib 和 libext 文件夹,lib 文件夹为放自定义组件的jar包,libext 文件夹下放 自定义组件的依赖包。
flume-1.7.0/plugins.d/
flume-1.7.0/plugins.d/custom-MysqlSink/
flume-1.7.0/plugins.d/custom-MysqlSink/lib/mysql-sink.jar
flume-1.7.0/plugins.d/custom-MysqlSink/libext/mysql-connector-java-6.0.5.jar
此方法方便,易管理,就是需要自己建目录。
自我管理
把jar包放在自己想放的目录,但是代价就是启动时需要通过 -C 指定jar 包位置。
在启动的时候 直接加载jar 包 ,依赖的jar 包可以用“,”号隔开
./flume-ng agent -c /opt/apps/flume-1.7.0/conf -f /opt/apps/flume-1.7.0/conf/mysqlSink.conf -n agent1 -C /opt/apps/flumeInterceptor.jar -Dflume.root.logger=INFO,console
此方法不太方便,需要在启动时指定位置,比较麻烦。
Flume的启动配置文件
第一次编写flume自定义组件,以此纪念。