; temp.add(new JSONObject(fields).toJSONString());
}
//3)Json obj 拼接
outputBoday=String.join("\n",temp).getBytes();
}catch (Exception e){
System.out.println("输入数据:"+inputeBody);
e.printStackTrace();
}
event.setBody(outputBoday);
return event;
}
/**
* 解析一批event
* @param events
* @return
*/
@Override
public List<Event> intercept(List<Event> events) {
//输出---一批Event
ArrayList<Event> result = new ArrayList<>();
//输入---一批Event
try{
for (Event event : events) {
//一条条解析
Event interceptedEvent = intercept(event);
byte[] interceptedEventBody = interceptedEvent.getBody();
if(interceptedEventBody.length!=0){
String multiEvent = new String(interceptedEventBody, Charsets.UTF_8);
String[] multiEventArr = multiEvent.split("\n");
for (String needEvent : multiEventArr) {
SimpleEvent simpleEvent = new SimpleEvent();
simpleEvent.setBody(needEvent.getBytes());
result.add(simpleEvent);
}
}
}
}catch (Exception e){
e.printStackTrace();
}
return result;
}
/**
* 实现内部类接口
*/
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new ParseLogByRule();
}
@Override
public void configure(Context context) {
}
}
}
插件打包上传
编译打包拦截器插件,然后将打包后的插件和依赖的fastjson一起上传到flume lib目录
配置agent
TAILDIR Source ===>file Channel ===>Kafka Sink
cat agent.conf
# source的名字
agent.sources = s1
# channels的名字
agent.channels = c1
# sink的名字
agent.sinks = r1
# 指定source使用的channel
&n