Flume Plugin Development
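Flume plugins come in three types: Source, Channel and Sink. A Source continuously produces data and sends it to a Channel, and a Sink continuously reads data from the Channel; together they form a complete pipeline. All three types can be implemented as custom plugins, but in practice we usually only write Sources and Sinks, because the built-in Channels already meet most needs and rarely have to be redeveloped. This article walks through the whole process in detail: how plugins are loaded, how they are written, and how they are run.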
1 How Plugins Are Loaded
1.1 Overview
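Once a plugin has been written, Flume loads it internally with Class.forName("fully-qualified plugin class name") and then invokes it polymorphically through its interface. Sources, channels and sinks are all loaded in org.apache.flume.node.AbstractConfigurationProvider. The following sections analyze each type of plugin in turn.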
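A framework-free sketch of this mechanism, under stated assumptions: the interfaces Plugin, Pollable and EventDriven and the class DemoSource are hypothetical stand-ins for Flume's Source, PollableSource, EventDrivenSource and a user plugin, not Flume's API. The code loads a class from its name with Class.forName, creates an instance through its public no-argument constructor, and from then on uses it only through the interface, branching on which sub-interface it implements.

```java
class PluginLoaderDemo {
  // Hypothetical stand-ins for Flume's Source / PollableSource / EventDrivenSource
  interface Plugin { String name(); }
  interface Pollable extends Plugin { }
  interface EventDriven extends Plugin { }

  // A "plugin" that the framework only knows by its class name
  public static class DemoSource implements Pollable {
    public DemoSource() { }
    @Override public String name() { return "demo"; }
  }

  // Load by binary class name, instantiate, and view the object only through the interface
  static Plugin load(String className) {
    try {
      Class<?> clazz = Class.forName(className);
      return (Plugin) clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot load plugin " + className, e);
    }
  }

  // Decide how to drive the plugin from the interface it implements
  static String runnerFor(Plugin p) {
    if (p instanceof Pollable) return "polling";
    if (p instanceof EventDriven) return "event-driven";
    throw new IllegalArgumentException("No known runner type for " + p.name());
  }

  public static void main(String[] args) {
    // getName() returns the binary name, e.g. "PluginLoaderDemo$DemoSource"
    Plugin p = load(DemoSource.class.getName());
    System.out.println(p.name() + " -> " + runnerFor(p)); // prints "demo -> polling"
  }
}
```

The caller never references DemoSource's type after loading; swapping in another class name is enough to change the plugin, which is the property the type setting in the agent configuration relies on.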
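2 How Plugins Are Invoked
2.1 Source Plugin Invocation
At line 299 of org.apache.flume.node.AbstractConfigurationProvider (in the Flume version examined here) there is the following line of code: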
sourceRunnerMap.put(comp.getComponentName(),SourceRunner.forSource(source));
In the forSource method of org.apache.flume.SourceRunner, the loaded plugin is driven in one of two ways, depending on its type:
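// Excerpt from SourceRunner.forSource(): choose a runner from the interface the source implements
public static SourceRunner forSource(Source source) {
  SourceRunner runner = null;
  if (source instanceof PollableSource) {
    // pull model: the runner's own thread calls process() repeatedly
    runner = new PollableSourceRunner();
    ((PollableSourceRunner) runner).setSource((PollableSource) source);
  } else if (source instanceof EventDrivenSource) {
    // push model: the source produces events by itself
    runner = new EventDrivenSourceRunner();
    ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
  } else {
    throw new IllegalArgumentException("No known runner type for source "
        + source);
  }
  return runner;
}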
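One kind of source implements the EventDrivenSource interface; the other implements the PollableSource interface. When a source that implements EventDrivenSource is started, EventDrivenSourceRunner's start() method is called: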
public class EventDrivenSourceRunner extends SourceRunner {
  @Override
  public void start() {
    Source source = getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    // the plugin's start() is called exactly once; no polling thread is created here
    source.start();
    lifecycleState = LifecycleState.START;
  }
}
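The source plugin's start() method is called only once, so an event-driven source has to start a separate thread of its own that keeps sending data into Flume.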
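To illustrate why an event-driven source needs its own thread, here is a minimal sketch that does not depend on Flume. TickSource is hypothetical: its start() returns immediately after launching a worker thread, and a BlockingQueue stands in for the ChannelProcessor/Channel. A real plugin would call getChannelProcessor().processEvent(...) or processEventBatch(...) instead of put().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class EventDrivenDemo {
  // Hypothetical event-driven source: start() is called once, so it spawns its own producer
  static class TickSource {
    private final BlockingQueue<String> channel; // stand-in for the ChannelProcessor
    private final AtomicBoolean running = new AtomicBoolean(false);
    private Thread worker;

    TickSource(BlockingQueue<String> channel) { this.channel = channel; }

    synchronized void start() {
      running.set(true);
      worker = new Thread(() -> {
        int n = 0;
        while (running.get()) {
          try {
            channel.put("event-" + n++); // a real plugin would hand the event to its ChannelProcessor
            Thread.sleep(10);            // simulate waiting for new data
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
      }, "TickSource-worker");
      worker.start();
    }

    synchronized void stop() {
      running.set(false);
      worker.interrupt();
      try {
        worker.join();                   // wait for the worker to exit
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  // Starts the source, waits for the first event, then stops the source
  static String firstEvent() {
    BlockingQueue<String> channel = new LinkedBlockingQueue<>();
    TickSource source = new TickSource(channel);
    source.start();                      // returns at once; the worker keeps producing
    try {
      return channel.poll(1, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    } finally {
      source.stop();
    }
  }

  public static void main(String[] args) {
    System.out.println(firstEvent());    // prints "event-0"
  }
}
```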
For a source that implements the PollableSource interface, the runner's start() method is:
public class PollableSourceRunner extends SourceRunner {
  @Override
  public void start() {
    PollableSource source = (PollableSource) getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start();
    // create the polling task and give it the source, the metrics and the stop flag
    runner = new PollingRunner();
    runner.source = source;
    runner.counterGroup = counterGroup;
    runner.shouldStop = shouldStop;
    // run it on a dedicated thread named after the runner and the source
    runnerThread = new Thread(runner);
    runnerThread.setName(getClass().getSimpleName() + "-" +
        source.getClass().getSimpleName() + "-" + source.getName());
    runnerThread.start();
    lifecycleState = LifecycleState.START;
  }
}
This starts a PollingRunner thread that runs continuously. The core of PollingRunner is:
// Nested in PollableSourceRunner
public static class PollingRunner implements Runnable {
  private PollableSource source;
  private AtomicBoolean shouldStop;
  private CounterGroup counterGroup;
  @Override
  public void run() {
    while (!shouldStop.get()) {
      counterGroup.incrementAndGet("runner.polls");
      try {
        if (source.process().equals(PollableSource.Status.BACKOFF)) {
          // no data: sleep longer after each consecutive BACKOFF, up to the maximum interval
          counterGroup.incrementAndGet("runner.backoffs");
          Thread.sleep(Math.min(
              counterGroup.incrementAndGet("runner.backoffs.consecutive")
              * source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));
        } else {
          // READY: reset the consecutive-backoff counter and poll again right away
          counterGroup.set("runner.backoffs.consecutive", 0L);
        }
      } catch (InterruptedException e) {
        counterGroup.incrementAndGet("runner.interruptions");
      } catch (EventDeliveryException e) {
        counterGroup.incrementAndGet("runner.deliveryErrors");
      } catch (Exception e) {
        // unexpected error: wait the maximum backoff interval before polling again
        counterGroup.incrementAndGet("runner.errors");
        try {
          Thread.sleep(source.getMaxBackOffSleepInterval());
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }
}
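When process() returns Status.BACKOFF, the PollingRunner thread sleeps for min(consecutive backoffs × getBackOffSleepIncrement(), getMaxBackOffSleepInterval()) milliseconds; when it returns READY, the consecutive-backoff counter is reset to 0 and process() is called again immediately. This is how the source is polled continuously without spinning when there is no data.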
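The backoff arithmetic can be replayed in isolation. The sketch below is a re-implementation for illustration, not Flume code: it takes a hypothetical sequence of process() results and computes the sleep the runner would take after each one, using min(consecutive × increment, max) and resetting the counter on READY. The increment and cap values are illustrative, not Flume's defaults.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BackoffDemo {
  enum Status { READY, BACKOFF }

  // Sleep (ms) taken after each process() result, following the PollingRunner logic:
  // the consecutive-backoff counter grows on BACKOFF and is reset to 0 on READY
  static List<Long> sleeps(List<Status> results, long increment, long max) {
    List<Long> out = new ArrayList<>();
    long consecutive = 0;
    for (Status s : results) {
      if (s == Status.BACKOFF) {
        consecutive++;
        out.add(Math.min(consecutive * increment, max));
      } else {
        consecutive = 0;
        out.add(0L); // READY: poll again immediately
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Status> results = Arrays.asList(Status.BACKOFF, Status.BACKOFF, Status.BACKOFF,
        Status.READY, Status.BACKOFF);
    System.out.println(sleeps(results, 1000L, 2500L)); // prints [1000, 2000, 2500, 0, 1000]
  }
}
```

The third BACKOFF is capped at the maximum, and the single READY brings the next BACKOFF back down to one increment.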
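2.2 Sink Plugin Invocation
A Flume Sink plugin generally takes only one form: a class that extends AbstractSink. The polling thread, however, is started by org.apache.flume.SinkRunner rather than by AbstractSink itself: SinkRunner's start() launches a PollingRunner thread that repeatedly calls the SinkProcessor's process() method, which forwards the call to the process() method of the sink (or of one of the sinks in a sink group). The code is as follows: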
// Nested in SinkRunner; backoffSleepIncrement and maxBackoffSleep are defined in SinkRunner
public static class PollingRunner implements Runnable {
  private SinkProcessor policy;
  private AtomicBoolean shouldStop;
  private CounterGroup counterGroup;
  @Override
  public void run() {
    while (!shouldStop.get()) {
      try {
        // the SinkProcessor forwards the call to the sink's process()
        if (policy.process().equals(Sink.Status.BACKOFF)) {
          counterGroup.incrementAndGet("runner.backoffs");
          Thread.sleep(Math.min(
              counterGroup.incrementAndGet("runner.backoffs.consecutive")
              * backoffSleepIncrement, maxBackoffSleep));
        } else {
          counterGroup.set("runner.backoffs.consecutive", 0L);
        }
      } catch (InterruptedException e) {
        counterGroup.incrementAndGet("runner.interruptions");
      } catch (Exception e) {
        if (e instanceof EventDeliveryException) {
          counterGroup.incrementAndGet("runner.deliveryErrors");
        } else {
          counterGroup.incrementAndGet("runner.errors");
        }
        // on any error, wait the maximum backoff before trying again
        try {
          Thread.sleep(maxBackoffSleep);
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
      }
    }
    logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
  }
}
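In Flume the sink-side PollingRunner lives in org.apache.flume.SinkRunner and talks to a SinkProcessor, which forwards each call to a sink's process(). That delegation chain can be sketched without Flume as follows; MiniSink, MiniSinkProcessor, SingleSinkProcessor and ListSink are hypothetical stand-ins, and backoff sleeping is omitted to keep the loop short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class SinkDelegationDemo {
  enum Status { READY, BACKOFF }

  // Hypothetical stand-ins for Flume's Sink and SinkProcessor (not the real interfaces)
  interface MiniSink { Status process(); }
  interface MiniSinkProcessor { Status process(); }

  // Single-sink processor: simply forwards each call to its one sink
  static class SingleSinkProcessor implements MiniSinkProcessor {
    private final MiniSink sink;
    SingleSinkProcessor(MiniSink sink) { this.sink = sink; }
    @Override public Status process() { return sink.process(); }
  }

  // Toy sink that "writes" one pending item per call; for the demo it raises the
  // stop flag once nothing is left, so the runner loop terminates
  static class ListSink implements MiniSink {
    final List<String> pending;
    final List<String> written = new ArrayList<>();
    private final AtomicBoolean shouldStop;
    ListSink(List<String> pending, AtomicBoolean shouldStop) {
      this.pending = pending;
      this.shouldStop = shouldStop;
    }
    @Override public Status process() {
      if (pending.isEmpty()) {
        shouldStop.set(true);
        return Status.BACKOFF;
      }
      written.add(pending.remove(0));
      return Status.READY;
    }
  }

  // Runner loop in the style of PollingRunner: it only talks to the processor
  static int run(MiniSinkProcessor policy, AtomicBoolean shouldStop) {
    int polls = 0;
    while (!shouldStop.get()) {
      policy.process();
      polls++;
    }
    return polls;
  }

  public static void main(String[] args) {
    AtomicBoolean stop = new AtomicBoolean(false);
    ListSink sink = new ListSink(new ArrayList<>(Arrays.asList("a", "b")), stop);
    int polls = run(new SingleSinkProcessor(sink), stop);
    System.out.println(polls + " polls, written " + sink.written); // prints "3 polls, written [a, b]"
  }
}
```

Because the runner only sees the processor, a processor that chooses among several sinks (failover or load balancing) can be substituted without changing the loop.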
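That concludes the analysis of how source and sink plugins are invoked. The next section explains in detail how to write each of the two kinds of plugin.
3 Writing Plugins
3.1 Writing a Source Plugin
There are two kinds of source plugin. The first extends AbstractSource and implements the Configurable and EventDrivenSource interfaces; it starts its own thread that keeps sending data into the Channel. The second extends AbstractSource and implements the Configurable and PollableSource interfaces, and lets the framework's polling thread call its process() method. Skeleton code for both follows.
Option 1 (EventDrivenSource):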
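import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;
public class SpoolDirectorySource extends AbstractSource
    implements Configurable, EventDrivenSource {
  @Override
  public synchronized void configure(Context context) {
    // read the plugin's parameters from the agent configuration
  }
  @Override
  public synchronized void start() {
    // start a worker thread that calls
    // getChannelProcessor().processEventBatch(events) to send data to the channel
    super.start();
  }
  @Override
  public synchronized void stop() {
    // signal the worker thread to stop and wait for it
    super.stop();
  }
}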
Option 2 (PollableSource):
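import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;
public class TestSource extends AbstractSource implements Configurable, PollableSource {
  @Override
  public synchronized void configure(Context context) {
    // read the plugin's parameters from the agent configuration
  }
  @Override
  public Status process() throws EventDeliveryException {
    // send a batch with getChannelProcessor().processEventBatch(events);
    // return Status.READY if data was sent, Status.BACKOFF if there was none
    return Status.BACKOFF;
  }
  // Backoff settings read by PollingRunner (section 2.1); the values are illustrative
  @Override
  public long getBackOffSleepIncrement() { return 1000L; }
  @Override
  public long getMaxBackOffSleepInterval() { return 5000L; }
  @Override
  public synchronized void start() {
    super.start();
  }
  @Override
  public synchronized void stop() {
    super.stop();
  }
}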
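For the second option, the key contract is that process() returns READY when it delivered data and BACKOFF when it had nothing to send, so that PollingRunner sleeps only when the source is idle. The following self-contained sketch models that with hypothetical names: BufferSource, a Deque as the external data source, and a list of batches in place of processEventBatch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class PollableProcessDemo {
  enum Status { READY, BACKOFF }

  // Hypothetical pollable source: each process() call moves up to batchSize records from
  // an external buffer into the "channel" and tells the runner whether it found any
  static class BufferSource {
    private final Deque<String> external;      // stand-in for the real data source
    private final List<List<String>> channel;  // one entry per processEventBatch(...) call
    private final int batchSize;

    BufferSource(Deque<String> external, List<List<String>> channel, int batchSize) {
      this.external = external;
      this.channel = channel;
      this.batchSize = batchSize;
    }

    Status process() {
      List<String> batch = new ArrayList<>();
      while (batch.size() < batchSize && !external.isEmpty()) {
        batch.add(external.poll());
      }
      if (batch.isEmpty()) {
        return Status.BACKOFF; // nothing to send: let the runner sleep before retrying
      }
      channel.add(batch);      // a real source would call processEventBatch(events) here
      return Status.READY;
    }
  }

  public static void main(String[] args) {
    Deque<String> external = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
    List<List<String>> channel = new ArrayList<>();
    BufferSource source = new BufferSource(external, channel, 2);
    List<Status> results = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      results.add(source.process());
    }
    System.out.println(results + " " + channel); // prints "[READY, READY, BACKOFF] [[a, b], [c]]"
  }
}
```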
3.2 Writing a Sink Plugin
A Sink plugin has only one form: it extends AbstractSink and implements the Configurable interface.
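import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
public class LoggerSink extends AbstractSink implements Configurable {
  private int batchSize;
  @Override
  public synchronized void configure(Context context) {
    // "batchSize" is an illustrative parameter name for this example
    batchSize = context.getInteger("batchSize", 100);
  }
  @Override
  public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    Event event = null;
    try {
      transaction.begin();
      int taken = 0;
      // take up to batchSize events; take() returns null when the channel is empty
      while (taken < batchSize && (event = channel.take()) != null) {
        // write or otherwise handle the event here
        taken++;
      }
      transaction.commit();
      // nothing taken: ask the runner to back off before the next call
      return taken == 0 ? Status.BACKOFF : Status.READY;
    } catch (Exception e) {
      // undo the takes so the events stay in the channel for a retry
      transaction.rollback();
      throw new EventDeliveryException("Failed to log event: " + event, e);
    } finally {
      transaction.close();
    }
  }
  @Override
  public synchronized void start() {
    super.start();
  }
  @Override
  public synchronized void stop() {
    super.stop();
  }
}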
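The begin/take/commit/rollback pattern in process() matters because a rollback returns the taken events to the channel, so a failed write does not lose data. The sketch below demonstrates that behaviour with a hypothetical ToyChannel; it is a simplified single-threaded model, not Flume's Channel or Transaction implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class TxnDemo {
  // Hypothetical in-memory channel: events taken inside a transaction are only removed
  // for good on commit(); rollback() puts them back at the head in their original order
  static class ToyChannel {
    private final Deque<String> queue = new ArrayDeque<>();
    private final Deque<String> taken = new ArrayDeque<>();
    void put(String e) { queue.addLast(e); }
    void begin() { taken.clear(); }
    String take() {
      String e = queue.pollFirst(); // null when empty, like Channel.take()
      if (e != null) taken.addLast(e);
      return e;
    }
    void commit() { taken.clear(); }
    void rollback() {
      while (!taken.isEmpty()) queue.addFirst(taken.pollLast());
    }
    int size() { return queue.size(); }
  }

  // Same shape as the sink's process(): begin, take a batch, write it, commit;
  // on any failure roll back so the events remain available for a retry
  static boolean drainBatch(ToyChannel ch, int batchSize, List<String> out, boolean failWrite) {
    ch.begin();
    List<String> batch = new ArrayList<>();
    try {
      while (batch.size() < batchSize) {
        String e = ch.take();
        if (e == null) break;       // channel is empty
        batch.add(e);
      }
      if (failWrite) throw new IllegalStateException("simulated write failure");
      out.addAll(batch);
      ch.commit();
      return true;
    } catch (RuntimeException ex) {
      ch.rollback();
      return false;
    }
  }

  public static void main(String[] args) {
    ToyChannel ch = new ToyChannel();
    for (String e : new String[] {"a", "b", "c"}) ch.put(e);
    List<String> out = new ArrayList<>();
    boolean ok1 = drainBatch(ch, 2, out, true);  // fails: a and b are rolled back
    boolean ok2 = drainBatch(ch, 2, out, false); // succeeds: a and b are written
    System.out.println(ok1 + " " + ok2 + " " + out + " left=" + ch.size());
    // prints "false true [a, b] left=1"
  }
}
```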
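4 Running Plugins
Package the plugin as a jar and copy it, together with the libraries it depends on, into Flume's lib directory; then edit the agent configuration file to use the plugin and start the agent.
The main setting to change is the component's type field: set it to the plugin's fully-qualified class name.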
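The agent configuration file uses the Java properties format, so a minimal configuration for the two plugins written above might look like the key/value lines embedded in the sketch below. The agent name a1, the component names r1/c1/k1 and the package com.example.flume are hypothetical; memory is Flume's built-in memory channel. The Java wrapper only parses the lines with java.util.Properties so that the type entries can be checked.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class FlumeConfigDemo {
  // Hypothetical agent "a1" wiring the custom plugins; com.example.flume is a placeholder package
  static final String CONFIG = String.join("\n",
      "a1.sources = r1",
      "a1.channels = c1",
      "a1.sinks = k1",
      "a1.sources.r1.type = com.example.flume.TestSource",
      "a1.sources.r1.channels = c1",
      "a1.channels.c1.type = memory",
      "a1.sinks.k1.type = com.example.flume.LoggerSink",
      "a1.sinks.k1.channel = c1");

  // Parse with java.util.Properties (same key = value syntax as the agent's config file)
  static Properties parse(String text) {
    Properties p = new Properties();
    try {
      p.load(new StringReader(text));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return p;
  }

  public static void main(String[] args) {
    Properties p = parse(CONFIG);
    System.out.println(p.getProperty("a1.sources.r1.type")); // prints "com.example.flume.TestSource"
    System.out.println(p.getProperty("a1.sinks.k1.type"));   // prints "com.example.flume.LoggerSink"
  }
}
```

Note that sources list their channels under the plural key channels, while a sink is bound to exactly one channel under the singular key channel.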
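That concludes this introduction to Flume plugin development. Admittedly, running Flume standalone in this way is fairly uncommon; Flume is more often used by integrating it into a Java application, but plugin development does not change, and plugins are used in exactly the same way.
If anything is unclear, feel free to leave me a comment and I'll reply when I have time.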