设为首页 加入收藏

TOP

flume高并发优化——(6)开发多文件检索source插件
2019-02-11 14:11:05 】 浏览:111
Tags:flume 并发 优化 开发 文件 检索 source 插件
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/xvshu/article/details/51306413

最近遇到一个问题:我们在 exec source 的命令中配置 “tail -f /export/home/tomcat/logs/*/*.log” 时,没有任何数据被采集。为了解决这个问题,我们自己开发了一个 flume 数据接收端(source),主要实现的功能是:检测一个文件夹下的所有文件,并通过多线程对每个文件分别执行 tail 命令,将读取到的内容写入 channel 中。


1,实现思路




2,flume规则

自定义flume source结构

/**
 * Skeleton of a custom event-driven Flume source.
 *
 * <p>An event-driven source pushes events to its channel processor on its own
 * schedule (for example from a worker thread created in {@link #start()}).
 */
public class SourceName extends AbstractSource implements EventDrivenSource,
        Configurable {

    /**
     * Reads the source's settings from the agent configuration.
     *
     * @param context configuration properties of this source
     */
    @Override
    public void configure(Context context) {

    }

    /**
     * Starts collecting data. Start worker threads / open resources here.
     */
    @Override
    public synchronized void start() {
        // Let the base class record the lifecycle state, as the sink skeleton
        // in this article does with super.start().
        super.start();
    }

    /**
     * Stops collecting data. Stop worker threads / release resources here.
     */
    @Override
    public synchronized void stop() {
        // Let the base class record the lifecycle state.
        super.stop();
    }
}

或者

/**
 * Skeleton of a custom pollable Flume source.
 *
 * <p>A pollable source is driven by the framework, which calls
 * {@link #process()} repeatedly to pull data.
 *
 * <p>NOTE(review): newer Flume releases declare additional back-off accessor
 * methods on PollableSource; check the interface of the Flume version you
 * build against and implement them if required.
 */
public class SourceName extends AbstractSource implements Configurable,
                                PollableSource {

    /**
     * Reads the source's settings from the agent configuration.
     *
     * @param context configuration properties of this source
     */
    @Override
    public void configure(Context context) {

    }

    /**
     * Called repeatedly by the framework; read data and hand it to the
     * channel processor here.
     *
     * @return the status reported back to the framework for this poll
     * @throws EventDeliveryException if events could not be delivered
     */
    @Override
    public Status process() throws EventDeliveryException {
        return Status.READY;
    }

    /**
     * Starts collecting data. Open resources here.
     */
    @Override
    public synchronized void start() {
        // Let the base class record the lifecycle state.
        super.start();
    }

    /**
     * Stops collecting data. Release resources here.
     */
    @Override
    public synchronized void stop() {
        // Let the base class record the lifecycle state.
        super.stop();
    }
}



自定义flume sink结构

/**
 * Skeleton of a custom Flume sink.
 */
public class SinkName extends AbstractSink implements Configurable {
  // Use this class as the logger category so log lines are attributed to the
  // sink itself rather than to the AbstractSink base class.
  private static final Logger log = LoggerFactory.getLogger(SinkName.class);

  /** Configuration handed to {@link #configure(Context)}. */
  Context c;

  /**
   * Stores the sink's configuration for later use.
   *
   * @param arg0 configuration properties of this sink
   */
  @Override
  public void configure(Context arg0) {
    this.c = arg0;
  }

  /**
   * Called repeatedly by the framework to output data.
   *
   * @return the status reported back to the framework for this call
   * @throws EventDeliveryException if events could not be delivered
   */
  @Override
  public Status process() throws EventDeliveryException {
    return Status.READY;
  }

  /**
   * Starts the output stream (called once).
   */
  @Override
  public synchronized void start() {
    super.start();
  }

  /**
   * Ends the output stream (called once).
   */
  @Override
  public synchronized void stop() {
    super.stop();
  }
}


3,实现

/*
 * 作者:许恕
 * 时间:2016年5月3日
 * 功能:实现tail 某目录下的所有符合正则条件的文件
 * Email:xvshu1@163.com
 * To detect all files in a folder
 */

package org.apache.flume.source;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.SystemClock;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.tools.HostUtils;
import org.mortbay.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 *  step:
 *    1,config one path
 *    2,find all file with RegExp
 *    3,tail one children file
 *    4,batch to channal
 *
 *  demo:
 *    demo.sources.s1.type = org.apache.flume.source.ExecTailSource
 *    demo.sources.s1.filepath=/export/home/tomcat/logs/
 *    demo.sources.s1.filenameRegExp=(.log{1})$
 */
public class ExecTailSource extends AbstractSource implements EventDrivenSource,
Configurable {

  private static final Logger logger = LoggerFactory
      .getLogger(ExecTailSource.class);

  private SourceCounter sourceCounter;
  private ExecutorService executor;
  private List<ExecRunnable> listRuners;
  private List<Future<>> listFuture;
  private long restartThrottle;
  private boolean restart;
  private boolean logStderr;
  private Integer bufferCount;
  private long batchTimeout;
  private Charset charset;
  private String filepath;
  private String filenameRegExp;

  @Override
  public void start() {
    logger.info("ExecTail source starting with filepath:{}", filepath);

    List<String> listFiles = getFileList(filepath);
    if(listFiles==null || listFiles.isEmpty()){
      Preconditions.checkState(listFiles != null && !listFiles.isEmpty(),
              "The filepath's file not have fiels with filenameRegExp");
    }

    executor = Executors.newFixedThreadPool(listFiles.size());

    listRuners = new ArrayList<ExecRunnable>();
    listFuture = new ArrayList<Future<>>();

    logger.info("files size is {} ", listFiles.size());
    // FIXME: Use a callback-like executor / future to signal us upon failure.
    for(String oneFilePath : listFiles){
      ExecRunnable runner = new ExecRunnable(getChannelProcessor(), sourceCounter,
              restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset,oneFilePath);
      listRuners.add(runner);
      Future<> runnerFuture = executor.submit(runner);
      listFuture.add(runnerFuture);
      logger.info("{} is begin running",oneFilePath);
    }

    /*
     * NB: This comes at the end rather than the beginning of the method because
     * it sets our state to running. We want to make sure the executor is alive
     * and well first.
     */
    sourceCounter.start();
    super.start();

    logger.debug("ExecTail source started");
  }

  @Override
  public void stop() {

    if(listRuners !=null && !listRuners.isEmpty()){
      for(ExecRunnable oneRunner : listRuners){
        if(oneRunner != null) {
          oneRunner.setRestart(false);
          oneRunner.kill();
        }
      }
    }


    if(listFuture !=null && !listFuture.isEmpty()){
      for(Future<> oneFuture : listFuture){
        if (oneFuture != null) {
          logger.debug("Stopping ExecTail runner");
          oneFuture.cancel(true);
          logger.debug("ExecTail runner stopped");
        }
      }
    }

    executor.shutdown();
    while (!executor.isTerminated()) {
      logger.debug("Waiting for ExecTail executor service to stop");
      try {
        executor.awaitTermination(500, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        logger.debug("Interrupted while waiting for ExecTail executor service "
            + "to stop. Just exiting.");
        Thread.currentThread().interrupt();
      }
    }

    sourceCounter.stop();
    super.stop();


  }

  @Override
  public void configure(Context context) {

    filepath = context.getString("filepath");
    Preconditions.checkState(filepath != null,
        "The parameter filepath must be specified");
    logger.info("The parameter filepath is {}" ,filepath);

    filenameRegExp = context.getString("filenameRegExp");
    Preconditions.checkState(filenameRegExp != null,
            "The parameter filenameRegExp must be specified");
    logger.info("The parameter filenameRegExp is {}" ,filenameRegExp);

    restartThrottle = context.getLong(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE,
        ExecSourceConfigurationConstants.DEFAULT_RESTART_THROTTLE);

    restart = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_RESTART,
        ExecSourceConfigurationConstants.DEFAULT_RESTART);

    logStderr = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_LOG_STDERR,
        ExecSourceConfigurationConstants.DEFAULT_LOG_STDERR);

    bufferCount = context.getInteger(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE,
        ExecSourceConfigurationConstants.DEFAULT_BATCH_SIZE);

    batchTimeout = context.getLong(ExecSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT,
        ExecSourceConfigurationConstants.DEFAULT_BATCH_TIME_OUT);

    charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET,
        ExecSourceConfigurationConstants.DEFAULT_CHARSET));


    if (sourceCounter == null) {
      sourceCounter = new SourceCounter(getName());
    }
  }

  /**
   * 获取指定路径下的所有文件列表
   *
   * @param dir 要查找的目录
   * @return
   */
  public  List<String> getFileList(String dir) {
    List<String> listFile = new ArrayList<String>();
    File dirFile = new File(dir);
    //如果不是目录文件,则直接返回
    if (dirFile.isDirectory()) {
      //获得文件夹下的文件列表,然后根据文件类型分别处理
      File[] files = dirFile.listFiles();
      if (null != files && files.length > 0) {
        //根据时间排序
        Arrays.sort(files, new Comparator<File>() {
          public int compare(File f1, File f2) {
            return (int) (f1.lastModified() - f2.lastModified());
          }

          public boolean equals(Object obj) {
            return true;
          }
        });
        for (File file : files) {
          //如果不是目录,直接添加
          if (!file.isDirectory()) {
            String oneFileName = file.getName();
            if(match(filenameRegExp,oneFileName)){
              listFile.add(file.getAbsolutePath());
              logger.info("filename:{} is pass",oneFileName);
            }
          } else {
            //对于目录文件,递归调用
            listFile.addAll(getFileList(file.getAbsolutePath()));
          }
        }
      }
    }else{
      logger.info("FilePath:{} is not Directory",dir);
    }
    return listFile;
  }

  /**
   * @param regex
   * 正则表达式字符串
   * @param str
   * 要匹配的字符串
   * @return 如果str 符合 regex的正则表达式格式,返回true, 否则返回 false;
   */
  private boolean match(String regex, String str) {
    Pattern pattern = Pattern.compile(regex);
    Matcher matcher = pattern.matcher(str);
   return matcher.find();
  }


  private static class ExecRunnable implements Runnable {

    public ExecRunnable( ChannelProcessor channelProcessor,
        SourceCounter sourceCounter, boolean restart, long restartThrottle,
        boolean logStderr, int bufferCount, long batchTimeout, Charset charset,String filepath) {

      this.channelProcessor = channelProcessor;
      this.sourceCounter = sourceCounter;
      this.restartThrottle = restartThrottle;
      this.bufferCount = bufferCount;
      this.batchTimeout = batchTimeout;
      this.restart = restart;
      this.logStderr = logStderr;
      this.charset = charset;
      this.filepath=filepath;
      this.command = command+filepath;

    }


    private  String command="tail -f ";
    private final ChannelProcessor channelProcessor;
    private final SourceCounter sourceCounter;
    private volatile boolean restart;
    private final long restartThrottle;
    private final int bufferCount;
    private long batchTimeout;
    private final boolean logStderr;
    private final Charset charset;
    private Process process = null;
    private SystemClock systemClock = new SystemClock();
    private Long lastPushToChannel = systemClock.currentTimeMillis();
    ScheduledExecutorService timedFlushService;
    ScheduledFuture<> future;
    private String filepath;

    private static String getDomain(String filePath){
      String[] strs = filePath.split("/");
      String domain ;
      domain=strs[strs.length-2];
      if(domain==null || domain.isEmpty()){
        domain=filePath;
      }
      return domain;
    }

    @Override
    public void run() {
      do {
        String exitCode = "unknown";
        BufferedReader reader = null;
        String line = null;
        final List<Event> eventList = new ArrayList<Event>();

        timedFlushService = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat(
                "timedFlushExecService" +
                Thread.currentThread().getId() + "-%d").build());
        try {
          String[] commandArgs = command.split("\\s+");
          process = new ProcessBuilder(commandArgs).start();
          reader = new BufferedReader(
              new InputStreamReader(process.getInputStream(), charset));
          // StderrLogger dies as soon as the input stream is invalid
          StderrReader stderrReader = new StderrReader(new BufferedReader(
              new InputStreamReader(process.getErrorStream(), charset)), logStderr);
          stderrReader.setName("StderrReader-[" + command + "]");
          stderrReader.setDaemon(true);
          stderrReader.start();
          future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
              @Override
              public void run() {
                try {
                  synchronized (eventList) {
                    if(!eventList.isEmpty() && timeout()) {
                      flushEventBatch(eventList);
                    }
                  }
                } catch (Exception e) {
                  logger.error("Exception occured when processing event batch", e);
                  if(e instanceof InterruptedException) {
                      Thread.currentThread().interrupt();
                  }
                }
              }
          },
          batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);
          while ((line = reader.readLine()) != null) {
            synchronized (eventList) {
              sourceCounter.incrementEventReceivedCount();

              HashMap body = new HashMap();
              body.put("context",line.toString());
              body.put("filepath", filepath);
              body.put("created", System.currentTimeMillis());
              body.put("localHostIp", HostUtils.getLocalHostIp());
              body.put("localHostName", HostUtils.getLocalHostName());
              body.put("domain", getDomain(filepath));
              body.put("command", command);
              String context = line.toString();
              String bodyjson = JSON.toString(body);
              Event oneEvent = EventBuilder.withBody(bodyjson.getBytes(charset));
              eventList.add(oneEvent);
              if(eventList.size() >= bufferCount || timeout()) {
                flushEventBatch(eventList);
              }
            }
          }

          synchronized (eventList) {
              if(!eventList.isEmpty()) {
                flushEventBatch(eventList);
              }
          }
        } catch (Exception e) {
          logger.error("Failed while running command: " + command, e);
          if(e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
          }
        } finally {
          if (reader != null) {
            try {
              reader.close();
            } catch (IOException ex) {
              logger.error("Failed to close reader for ExecTail source", ex);
            }
          }
          exitCode = String.valueOf(kill());
        }
        if(restart) {
          logger.info("Restarting in {}ms, exit code {}", restartThrottle,
              exitCode);
          try {
            Thread.sleep(restartThrottle);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        } else {
          logger.info("Command [" + command + "] exited with " + exitCode);
        }
      } while(restart);
    }

    private void flushEventBatch(List<Event> eventList){
      channelProcessor.processEventBatch(eventList);
      sourceCounter.addToEventAcceptedCount(eventList.size());
      eventList.clear();
      lastPushToChannel = systemClock.currentTimeMillis();
    }

    private boolean timeout(){
      return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout;
    }

    private static String[] formulateShellCommand(String shell, String command) {
      String[] shellArgs = shell.split("\\s+");
      String[] result = new String[shellArgs.length + 1];
      System.arraycopy(shellArgs, 0, result, 0, shellArgs.length);
      result[shellArgs.length] = command;
      return result;
    }

    public int kill() {
      if(process != null) {
        synchronized (process) {
          process.destroy();

          try {
            int exitValue = process.waitFor();

            // Stop the Thread that flushes periodically
            if (future != null) {
                future.cancel(true);
            }

            if (timedFlushService != null) {
              timedFlushService.shutdown();
              while (!timedFlushService.isTerminated()) {
                try {
                  timedFlushService.awaitTermination(500, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                  logger.debug("Interrupted while waiting for ExecTail executor service "
                    + "to stop. Just exiting.");
                  Thread.currentThread().interrupt();
                }
              }
            }
            return exitValue;
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
          }
        }
        return Integer.MIN_VALUE;
      }
      return Integer.MIN_VALUE / 2;
    }
    public void setRestart(boolean restart) {
      this.restart = restart;
    }
  }
  private static class StderrReader extends Thread {
    private BufferedReader input;
    private boolean logStderr;

    protected StderrReader(BufferedReader input, boolean logStderr) {
      this.input = input;
      this.logStderr = logStderr;
    }



    @Override
    public void run() {
      try {
        int i = 0;
        String line = null;
        while((line = input.readLine()) != null) {
          if(logStderr) {
            // There is no need to read 'line' with a charset
            // as we do not to propagate it.
            // It is in UTF-16 and would be printed in UTF-8 format.
            logger.info("StderrLogger[{}] = '{}'", ++i, line);
          }
        }
      } catch (IOException e) {
        logger.info("StderrLogger exiting", e);
      } finally {
        try {
          if(input != null) {
            input.close();
          }
        } catch (IOException ex) {
          logger.error("Failed to close stderr reader for ExecTail source", ex);
        }
      }
    }
  }
}


总结:

在开源的路上,我们越来越多喜悦,因为我们发现我们的自我定制功能可以最大程度达到我们的要求,而且对于企业而言,依赖性变小,可控性增强,风险降低,现在我们就可以窥见,不远的将来,我们的生活会异常幸福!








】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇nginx ---->flume ----->ka.. 下一篇自定义Flume的Interceptor,编写..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目