Flume Plugin Development in Detail: From How Plugins Are Invoked, to Writing Plugins, to Running Them
2018-12-13 17:38:52
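Flume plugins come in three types: Source, Channel, and Sink. A Source continuously produces data and sends it to a Channel, and a Sink continuously reads data from the Channel; together they form the complete flow. All three can be replaced with custom implementations, but in practice we usually only write Sources and Sinks, because the Channels that ship with Flume already cover our needs and do not have to be redeveloped. This article walks through the whole process in detail, from how plugins are loaded, to how they are written, to how they are run.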

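1 Plugin loading
1.1 Overview
Once a plugin is written, Flume loads it internally with Class.forName("fully qualified plugin class name") and then calls it polymorphically through the component interface. Sources, channels, and sinks are all loaded in org.apache.flume.node.AbstractConfigurationProvider. The following sections analyze how Flume invokes its plugins.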
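The load-by-name idea can be illustrated without any Flume dependency. The following is only a minimal sketch: the Plugin interface and the HelloPlugin class are hypothetical stand-ins for Flume's component interfaces and for your own plugin class, and this is not Flume's actual loading code.

```java
class PluginLoaderDemo {
    // Hypothetical stand-in for a Flume component interface such as Source or Sink.
    interface Plugin {
        String describe();
    }

    // Hypothetical plugin implementation; in Flume this would be your own Source or Sink.
    public static class HelloPlugin implements Plugin {
        public HelloPlugin() {
        }

        @Override
        public String describe() {
            return "hello from HelloPlugin";
        }
    }

    // Load a class by its fully qualified name, check that it implements
    // the interface, and create an instance through the no-arg constructor.
    static Plugin load(String className) throws ReflectiveOperationException {
        Class<? extends Plugin> cls = Class.forName(className).asSubclass(Plugin.class);
        return cls.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        // The name is only a string here, just like the type value in a config file.
        String name = PluginLoaderDemo.class.getName() + "$HelloPlugin";
        Plugin plugin = load(name);
        // The caller only knows the interface; the concrete class is chosen at runtime.
        System.out.println(plugin.describe());
    }
}
```

Because the caller depends only on the interface, a new plugin can be swapped in by changing the class name string, which is why a configuration change is enough to activate a plugin.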

2 Plugin invocation
2.1 Source plugin invocation
Line 299 of org.apache.flume.node.AbstractConfigurationProvider (in the version examined here) contains the following statement:

     sourceRunnerMap.put(comp.getComponentName(),SourceRunner.forSource(source));

After the plugin is loaded, the forSource method of org.apache.flume.SourceRunner chooses one of two ways to run it. The code is as follows:

    if (source instanceof PollableSource) {
      runner = new PollableSourceRunner();
      ((PollableSourceRunner) runner).setSource((PollableSource) source);
    } else if (source instanceof EventDrivenSource) {
      runner = new EventDrivenSourceRunner();
      ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
    } else {
      throw new IllegalArgumentException("No known runner type for source "
          + source);
    }

    return runner;

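A source either implements the EventDrivenSource interface or implements the PollableSource interface. When a source that implements EventDrivenSource is started, EventDrivenSourceRunner calls its start() method, as follows: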

public class EventDrivenSourceRunner extends SourceRunner {
  @Override
  public void start() {
    Source source = getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start();
    lifecycleState = LifecycleState.START;
  }
}

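The source plugin's start() method is called only once, so the plugin has to start its own thread to keep sending data into Flume.
A source that implements the PollableSource interface is started by PollableSourceRunner, as follows: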

public class PollableSourceRunner extends SourceRunner {
  @Override
  public void start() {
    PollableSource source = (PollableSource) getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start();
    runner = new PollingRunner();
    runner.source = source;
    runner.counterGroup = counterGroup;
    runner.shouldStop = shouldStop;
    runnerThread = new Thread(runner);
    runnerThread.setName(getClass().getSimpleName() + "-" + 
        source.getClass().getSimpleName() + "-" + source.getName());
    runnerThread.start();
    lifecycleState = LifecycleState.START;
  }
}

This starts a PollingRunner thread that runs continuously. The main code of PollingRunner is as follows:

  public static class PollingRunner implements Runnable {

    private PollableSource source;
    private AtomicBoolean shouldStop;
    private CounterGroup counterGroup;

    @Override
    public void run() {

      while (!shouldStop.get()) {
        counterGroup.incrementAndGet("runner.polls");
        try {
          if (source.process().equals(PollableSource.Status.BACKOFF)) {
            counterGroup.incrementAndGet("runner.backoffs");
            Thread.sleep(Math.min(
                counterGroup.incrementAndGet("runner.backoffs.consecutive")
                * source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));
          } else {
            counterGroup.set("runner.backoffs.consecutive", 0L);
          }
        } catch (InterruptedException e) {
          counterGroup.incrementAndGet("runner.interruptions");
        } catch (EventDeliveryException e) {
          counterGroup.incrementAndGet("runner.deliveryErrors");
        } catch (Exception e) {
          counterGroup.incrementAndGet("runner.errors");
          try {
            Thread.sleep(source.getMaxBackOffSleepInterval());
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
          }
        }
      }
    }

  }

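When process() returns Status.BACKOFF, the PollingRunner thread sleeps before polling again; otherwise it calls process() again immediately. This is how the source is invoked continuously.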
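The sleep time in the loop above is min(consecutive backoffs * increment, maximum interval). Here is a small worked example of that arithmetic; the 1000 ms increment and 5000 ms maximum are assumed values chosen for illustration, since the real values come from source.getBackOffSleepIncrement() and source.getMaxBackOffSleepInterval().

```java
class BackoffSketch {
    // Same formula as in PollingRunner: grow linearly, then cap at the maximum.
    static long backoffMillis(long consecutiveBackoffs, long increment, long maxInterval) {
        return Math.min(consecutiveBackoffs * increment, maxInterval);
    }

    public static void main(String[] args) {
        long increment = 1000L;   // assumed value for illustration
        long maxInterval = 5000L; // assumed value for illustration
        // Prints 1000, 2000, 3000, 4000, 5000, 5000, 5000 (one per line).
        for (long consecutive = 1; consecutive <= 7; consecutive++) {
            System.out.println(backoffMillis(consecutive, increment, maxInterval));
        }
    }
}
```

As soon as process() returns anything other than BACKOFF, the consecutive counter is reset to 0, so the next backoff starts again from one increment.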
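2.2 Sink invocation
A Flume sink plugin has only one form: it extends AbstractSink. The start method of SinkRunner starts a PollingRunner thread that keeps calling process through the SinkProcessor. The code is as follows: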

 public static class PollingRunner implements Runnable {

    private SinkProcessor policy;
    private AtomicBoolean shouldStop;
    private CounterGroup counterGroup;

    @Override
    public void run() {
      while (!shouldStop.get()) {
        try {
          if (policy.process().equals(Sink.Status.BACKOFF)) {
            counterGroup.incrementAndGet("runner.backoffs");
            Thread.sleep(Math.min(
                counterGroup.incrementAndGet("runner.backoffs.consecutive")
                * backoffSleepIncrement, maxBackoffSleep));
          } else {
            counterGroup.set("runner.backoffs.consecutive", 0L);
          }
        } catch (InterruptedException e) {
          counterGroup.incrementAndGet("runner.interruptions");
        } catch (Exception e) {
          if (e instanceof EventDeliveryException) {
            counterGroup.incrementAndGet("runner.deliveryErrors");
          } else {
            counterGroup.incrementAndGet("runner.errors");
          }
          try {
            Thread.sleep(maxBackoffSleep);
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
          }
        }
      }
      logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
    }

  }

That covers how source and sink plugins are invoked. The next section explains in detail how to write each of the two kinds of plugin.

3 Writing plugins

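3.1 Writing a source plugin
There are two kinds of source plugin. The first extends AbstractSource and implements the Configurable and EventDrivenSource interfaces, and starts a new thread that keeps sending data to the Channel. The second extends AbstractSource and implements the Configurable and PollableSource interfaces. Skeleton code for both follows.
Approach 1: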

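import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;

public class SpoolDirectorySource extends AbstractSource
    implements Configurable, EventDrivenSource {

  @Override
  public synchronized void configure(Context context) {
    // Read plugin settings from the context here.
  }

  @Override
  public synchronized void start() {
    // Start a new thread here that calls
    // getChannelProcessor().processEventBatch(events) to send data to the channel.
    super.start();
  }

  @Override
  public synchronized void stop() {
    // Stop the thread started in start() here.
    super.stop();
  }

}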
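The body of start() for the first kind of source (EventDrivenSource) could be filled in along the following lines. This is a sketch that runs without Flume: the BlockingQueue is a hypothetical stand-in for the channel, and the event strings and the 10 ms pause are made up for the demonstration. A real source would call getChannelProcessor().processEventBatch(events) instead of offering to a queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class EventDrivenSketch {
    // Stand-in for the channel (an assumption made for this sketch).
    final BlockingQueue<String> channel = new LinkedBlockingQueue<>();
    private final AtomicBoolean running = new AtomicBoolean(false);
    private Thread worker;

    // Called once by the runner, so it must return quickly:
    // the continuous work moves to a separate thread.
    public synchronized void start() {
        running.set(true);
        worker = new Thread(() -> {
            int n = 0;
            while (running.get()) {
                channel.offer("event-" + n++);
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "event-driven-sketch");
        worker.start();
    }

    // Signal the worker to finish and wait until it has exited.
    public synchronized void stop() throws InterruptedException {
        running.set(false);
        if (worker != null) {
            worker.interrupt();
            worker.join();
        }
    }

    public static void main(String[] args) throws Exception {
        EventDrivenSketch source = new EventDrivenSketch();
        source.start();
        System.out.println(source.channel.take()); // prints "event-0"
        source.stop();
    }
}
```

The flag plus interrupt combination lets stop() end the worker promptly even while it is sleeping.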

Approach 2:

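import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;

public class testsource extends AbstractSource implements Configurable, PollableSource {

  @Override
  public synchronized void configure(Context context) {
  }

  @Override
  public Status process() throws EventDeliveryException {
    // Call getChannelProcessor().processEventBatch(events) to send data to the channel.
    // BACKOFF tells the runner to sleep before the next call;
    // return Status.READY instead when events were delivered.
    return Status.BACKOFF;
  }

  // The PollingRunner shown earlier calls the next two methods.
  // The values are illustrative assumptions, in milliseconds.
  @Override
  public long getBackOffSleepIncrement() {
    return 1000L;
  }

  @Override
  public long getMaxBackOffSleepInterval() {
    return 5000L;
  }

  @Override
  public synchronized void start() {
    super.start();
  }

  @Override
  public synchronized void stop() {
    super.stop();
  }

}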

3.2 Writing a sink plugin
There is only one kind of sink plugin: it extends AbstractSink and implements the Configurable interface.

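import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class LoggerSink extends AbstractSink implements Configurable {

  // Illustrative batch size (an assumption; the original loop bound was left unspecified).
  private static final int BATCH_SIZE = 100;

  @Override
  public synchronized void configure(Context context) {
  }

  @Override
  public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    Event event = null;
    Status status = Status.READY;
    try {
      transaction.begin();
      for (int i = 0; i < BATCH_SIZE; i++) {
        event = channel.take();
        if (event == null) {
          // The channel is empty: ask the runner to back off.
          status = Status.BACKOFF;
          break;
        }
        // Handle the event here, for example write it to the destination.
      }
      transaction.commit();
    } catch (Exception e) {
      // Roll back so the events stay in the channel for a later retry.
      transaction.rollback();
      throw new EventDeliveryException("Failed to log event: " + event, e);
    } finally {
      transaction.close();
    }
    return status;
  }

  @Override
  public synchronized void start() {
    super.start();
  }

  @Override
  public synchronized void stop() {
    super.stop();
  }
}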
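To see why the rollback in the catch block matters, here is a toy model of the take/commit/rollback cycle that runs without Flume. ToyChannel, deliver, and the failOn parameter are hypothetical names invented for this sketch, and the model only imitates the observable behavior; it is not how Flume's channels are implemented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class TransactionSketch {
    // Toy stand-in for a channel with a single open transaction.
    static class ToyChannel {
        private final Deque<String> queue = new ArrayDeque<>();
        private final Deque<String> taken = new ArrayDeque<>();

        void put(String event) {
            queue.addLast(event);
        }

        // Remove the next event but remember it until commit or rollback.
        String take() {
            String event = queue.pollFirst();
            if (event != null) {
                taken.addLast(event);
            }
            return event;
        }

        // The taken events are gone for good.
        void commit() {
            taken.clear();
        }

        // Put the taken events back at the head of the queue in their original order.
        void rollback() {
            while (!taken.isEmpty()) {
                queue.addFirst(taken.pollLast());
            }
        }

        int size() {
            return queue.size();
        }
    }

    // Deliver up to batchSize events into out; failOn simulates a failing destination.
    static boolean deliver(ToyChannel channel, int batchSize, String failOn, List<String> out) {
        List<String> batch = new ArrayList<>();
        try {
            for (int i = 0; i < batchSize; i++) {
                String event = channel.take();
                if (event == null) {
                    break;
                }
                if (event.equals(failOn)) {
                    throw new IllegalStateException("cannot deliver " + event);
                }
                batch.add(event);
            }
            out.addAll(batch);
            channel.commit();
            return true;
        } catch (RuntimeException e) {
            channel.rollback();
            return false;
        }
    }

    public static void main(String[] args) {
        ToyChannel channel = new ToyChannel();
        channel.put("a");
        channel.put("b");
        channel.put("c");
        List<String> out = new ArrayList<>();
        System.out.println(deliver(channel, 3, "b", out) + " " + channel.size()); // false 3
        System.out.println(deliver(channel, 3, null, out) + " " + out);          // true [a, b, c]
    }
}
```

The failed first attempt leaves all three events in the channel, so the second attempt delivers them in order and nothing is lost.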

4 Running the plugin

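After packaging the plugin as a jar, copy it and its dependency libraries into Flume's lib directory, then change the configuration file to use the plugin's type, and it is ready to run.
(Figure: Flume plugin configuration)

The main setting to change is the type field: set it to the plugin's fully qualified class name.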
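As a sketch of what such a configuration might look like: the agent name a1, the component names r1, c1, and k1, and the package com.example.flume are all assumptions made for illustration, while testsource and LoggerSink are the classes from the previous section and memory is Flume's built-in in-memory channel.

```properties
# Hypothetical agent "a1" with one source, one channel, and one sink.
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Custom source: type is the fully qualified class name (package is assumed).
a1.sources.r1.type = com.example.flume.testsource
a1.sources.r1.channels = c1

# Built-in memory channel; no custom channel is needed.
a1.channels.c1.type = memory

# Custom sink: type is again the fully qualified class name (package is assumed).
a1.sinks.k1.type = com.example.flume.LoggerSink
a1.sinks.k1.channel = c1
```

Note that a source lists its channels under the plural key channels, while a sink reads from exactly one channel under the singular key channel.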

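That concludes this introduction to Flume plugin development. Running Flume standalone like this is actually fairly uncommon; it is usually used by integrating it into a Java application. Plugin development does not change in that case, and plugins are written and used in the same way.
If anything is unclear, leave me a comment and I will reply when I have time.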
