最近遇到一个问题:我们在 exec source 的命令行中配置 “tail -f /export/home/tomcat/logs/*/*.log” 时,没有任何数据被采集。为了解决这个问题,我们自己开发了一个 flume 数据接收端(source),主要实现的功能是:检测一个文件夹下的所有文件,并通过多线程对每个文件执行 tail 命令,将内容读取到 channel 中。
1,实现思路
2,flume规则
自定义flume source结构
/**
 * Skeleton of a custom event-driven Flume source.
 */
public class SourceName extends AbstractSource
    implements EventDrivenSource, Configurable {

  /**
   * Reads the source's configuration.
   *
   * @param context configuration handed to this source
   */
  @Override
  public void configure(Context context) {
  }

  /** Starts collecting data. */
  @Override
  public synchronized void start() {
  }

  /** Stops collecting data. */
  @Override
  public synchronized void stop() {
  }
}
或者
/**
 * Skeleton of a custom pollable Flume source.
 */
public class SourceName extends AbstractSource
    implements Configurable, PollableSource {

  /**
   * Reads the source's configuration.
   *
   * @param context configuration handed to this source
   */
  @Override
  public void configure(Context context) {
  }

  /**
   * Produces the next events; required by {@code PollableSource}.
   *
   * @return {@code Status.READY} in this skeleton (a {@code null} status
   *         would fail as soon as a caller inspects it)
   * @throws EventDeliveryException if events cannot be delivered
   */
  @Override
  public Status process() throws EventDeliveryException {
    return Status.READY;
  }

  /** Starts collecting data. */
  @Override
  public synchronized void start() {
  }

  /** Stops collecting data. */
  @Override
  public synchronized void stop() {
  }
}
自定义flume sink结构
/**
 * Skeleton of a custom Flume sink.
 */
public class SinkName extends AbstractSink implements Configurable {

  // Log under this sink's own category rather than the base class's.
  private static final Logger log = LoggerFactory.getLogger(SinkName.class);

  /** Configuration received in {@link #configure(Context)}. */
  Context c;

  /**
   * Stores the sink's configuration.
   *
   * @param arg0 configuration handed to this sink
   */
  @Override
  public void configure(Context arg0) {
    this.c = arg0;
  }

  /**
   * Outputs data; per the original author's note this is invoked in a loop.
   *
   * @return {@code Status.READY} in this skeleton
   * @throws EventDeliveryException if events cannot be delivered
   */
  @Override
  public Status process() throws EventDeliveryException {
    return Status.READY;
  }

  /** Opens the output; per the original author's note this is called once. */
  @Override
  public synchronized void start() {
    super.start();
  }

  /** Closes the output; per the original author's note this is called once. */
  @Override
  public synchronized void stop() {
    super.stop();
  }
}
3,实现
/*
* 作者:许恕
* 时间:2016年5月3日
* 功能:实现tail 某目录下的所有符合正则条件的文件
* Email:xvshu1@163.com
* To detect all files in a folder
*/
package org.apache.flume.source;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.SystemClock;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.tools.HostUtils;
import org.mortbay.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* step:
* 1,config one path
* 2,find all file with RegExp
* 3,tail one children file
* 4,batch to channal
*
* demo:
* demo.sources.s1.type = org.apache.flume.source.ExecTailSource
* demo.sources.s1.filepath=/export/home/tomcat/logs/
* demo.sources.s1.filenameRegExp=(.log{1})$
*/
public class ExecTailSource extends AbstractSource implements EventDrivenSource,
Configurable {
private static final Logger logger = LoggerFactory
.getLogger(ExecTailSource.class);
private SourceCounter sourceCounter;
private ExecutorService executor;
private List<ExecRunnable> listRuners;
private List<Future<>> listFuture;
private long restartThrottle;
private boolean restart;
private boolean logStderr;
private Integer bufferCount;
private long batchTimeout;
private Charset charset;
private String filepath;
private String filenameRegExp;
@Override
public void start() {
logger.info("ExecTail source starting with filepath:{}", filepath);
List<String> listFiles = getFileList(filepath);
if(listFiles==null || listFiles.isEmpty()){
Preconditions.checkState(listFiles != null && !listFiles.isEmpty(),
"The filepath's file not have fiels with filenameRegExp");
}
executor = Executors.newFixedThreadPool(listFiles.size());
listRuners = new ArrayList<ExecRunnable>();
listFuture = new ArrayList<Future<>>();
logger.info("files size is {} ", listFiles.size());
// FIXME: Use a callback-like executor / future to signal us upon failure.
for(String oneFilePath : listFiles){
ExecRunnable runner = new ExecRunnable(getChannelProcessor(), sourceCounter,
restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset,oneFilePath);
listRuners.add(runner);
Future<> runnerFuture = executor.submit(runner);
listFuture.add(runnerFuture);
logger.info("{} is begin running",oneFilePath);
}
/*
* NB: This comes at the end rather than the beginning of the method because
* it sets our state to running. We want to make sure the executor is alive
* and well first.
*/
sourceCounter.start();
super.start();
logger.debug("ExecTail source started");
}
@Override
public void stop() {
if(listRuners !=null && !listRuners.isEmpty()){
for(ExecRunnable oneRunner : listRuners){
if(oneRunner != null) {
oneRunner.setRestart(false);
oneRunner.kill();
}
}
}
if(listFuture !=null && !listFuture.isEmpty()){
for(Future<> oneFuture : listFuture){
if (oneFuture != null) {
logger.debug("Stopping ExecTail runner");
oneFuture.cancel(true);
logger.debug("ExecTail runner stopped");
}
}
}
executor.shutdown();
while (!executor.isTerminated()) {
logger.debug("Waiting for ExecTail executor service to stop");
try {
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
logger.debug("Interrupted while waiting for ExecTail executor service "
+ "to stop. Just exiting.");
Thread.currentThread().interrupt();
}
}
sourceCounter.stop();
super.stop();
}
@Override
public void configure(Context context) {
filepath = context.getString("filepath");
Preconditions.checkState(filepath != null,
"The parameter filepath must be specified");
logger.info("The parameter filepath is {}" ,filepath);
filenameRegExp = context.getString("filenameRegExp");
Preconditions.checkState(filenameRegExp != null,
"The parameter filenameRegExp must be specified");
logger.info("The parameter filenameRegExp is {}" ,filenameRegExp);
restartThrottle = context.getLong(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE,
ExecSourceConfigurationConstants.DEFAULT_RESTART_THROTTLE);
restart = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_RESTART,
ExecSourceConfigurationConstants.DEFAULT_RESTART);
logStderr = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_LOG_STDERR,
ExecSourceConfigurationConstants.DEFAULT_LOG_STDERR);
bufferCount = context.getInteger(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE,
ExecSourceConfigurationConstants.DEFAULT_BATCH_SIZE);
batchTimeout = context.getLong(ExecSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT,
ExecSourceConfigurationConstants.DEFAULT_BATCH_TIME_OUT);
charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET,
ExecSourceConfigurationConstants.DEFAULT_CHARSET));
if (sourceCounter == null) {
sourceCounter = new SourceCounter(getName());
}
}
/**
* 获取指定路径下的所有文件列表
*
* @param dir 要查找的目录
* @return
*/
public List<String> getFileList(String dir) {
List<String> listFile = new ArrayList<String>();
File dirFile = new File(dir);
//如果不是目录文件,则直接返回
if (dirFile.isDirectory()) {
//获得文件夹下的文件列表,然后根据文件类型分别处理
File[] files = dirFile.listFiles();
if (null != files && files.length > 0) {
//根据时间排序
Arrays.sort(files, new Comparator<File>() {
public int compare(File f1, File f2) {
return (int) (f1.lastModified() - f2.lastModified());
}
public boolean equals(Object obj) {
return true;
}
});
for (File file : files) {
//如果不是目录,直接添加
if (!file.isDirectory()) {
String oneFileName = file.getName();
if(match(filenameRegExp,oneFileName)){
listFile.add(file.getAbsolutePath());
logger.info("filename:{} is pass",oneFileName);
}
} else {
//对于目录文件,递归调用
listFile.addAll(getFileList(file.getAbsolutePath()));
}
}
}
}else{
logger.info("FilePath:{} is not Directory",dir);
}
return listFile;
}
/**
* @param regex
* 正则表达式字符串
* @param str
* 要匹配的字符串
* @return 如果str 符合 regex的正则表达式格式,返回true, 否则返回 false;
*/
private boolean match(String regex, String str) {
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(str);
return matcher.find();
}
private static class ExecRunnable implements Runnable {
public ExecRunnable( ChannelProcessor channelProcessor,
SourceCounter sourceCounter, boolean restart, long restartThrottle,
boolean logStderr, int bufferCount, long batchTimeout, Charset charset,String filepath) {
this.channelProcessor = channelProcessor;
this.sourceCounter = sourceCounter;
this.restartThrottle = restartThrottle;
this.bufferCount = bufferCount;
this.batchTimeout = batchTimeout;
this.restart = restart;
this.logStderr = logStderr;
this.charset = charset;
this.filepath=filepath;
this.command = command+filepath;
}
private String command="tail -f ";
private final ChannelProcessor channelProcessor;
private final SourceCounter sourceCounter;
private volatile boolean restart;
private final long restartThrottle;
private final int bufferCount;
private long batchTimeout;
private final boolean logStderr;
private final Charset charset;
private Process process = null;
private SystemClock systemClock = new SystemClock();
private Long lastPushToChannel = systemClock.currentTimeMillis();
ScheduledExecutorService timedFlushService;
ScheduledFuture<> future;
private String filepath;
private static String getDomain(String filePath){
String[] strs = filePath.split("/");
String domain ;
domain=strs[strs.length-2];
if(domain==null || domain.isEmpty()){
domain=filePath;
}
return domain;
}
@Override
public void run() {
do {
String exitCode = "unknown";
BufferedReader reader = null;
String line = null;
final List<Event> eventList = new ArrayList<Event>();
timedFlushService = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(
"timedFlushExecService" +
Thread.currentThread().getId() + "-%d").build());
try {
String[] commandArgs = command.split("\\s+");
process = new ProcessBuilder(commandArgs).start();
reader = new BufferedReader(
new InputStreamReader(process.getInputStream(), charset));
// StderrLogger dies as soon as the input stream is invalid
StderrReader stderrReader = new StderrReader(new BufferedReader(
new InputStreamReader(process.getErrorStream(), charset)), logStderr);
stderrReader.setName("StderrReader-[" + command + "]");
stderrReader.setDaemon(true);
stderrReader.start();
future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
synchronized (eventList) {
if(!eventList.isEmpty() && timeout()) {
flushEventBatch(eventList);
}
}
} catch (Exception e) {
logger.error("Exception occured when processing event batch", e);
if(e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
}
},
batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);
while ((line = reader.readLine()) != null) {
synchronized (eventList) {
sourceCounter.incrementEventReceivedCount();
HashMap body = new HashMap();
body.put("context",line.toString());
body.put("filepath", filepath);
body.put("created", System.currentTimeMillis());
body.put("localHostIp", HostUtils.getLocalHostIp());
body.put("localHostName", HostUtils.getLocalHostName());
body.put("domain", getDomain(filepath));
body.put("command", command);
String context = line.toString();
String bodyjson = JSON.toString(body);
Event oneEvent = EventBuilder.withBody(bodyjson.getBytes(charset));
eventList.add(oneEvent);
if(eventList.size() >= bufferCount || timeout()) {
flushEventBatch(eventList);
}
}
}
synchronized (eventList) {
if(!eventList.isEmpty()) {
flushEventBatch(eventList);
}
}
} catch (Exception e) {
logger.error("Failed while running command: " + command, e);
if(e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException ex) {
logger.error("Failed to close reader for ExecTail source", ex);
}
}
exitCode = String.valueOf(kill());
}
if(restart) {
logger.info("Restarting in {}ms, exit code {}", restartThrottle,
exitCode);
try {
Thread.sleep(restartThrottle);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
logger.info("Command [" + command + "] exited with " + exitCode);
}
} while(restart);
}
private void flushEventBatch(List<Event> eventList){
channelProcessor.processEventBatch(eventList);
sourceCounter.addToEventAcceptedCount(eventList.size());
eventList.clear();
lastPushToChannel = systemClock.currentTimeMillis();
}
private boolean timeout(){
return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout;
}
private static String[] formulateShellCommand(String shell, String command) {
String[] shellArgs = shell.split("\\s+");
String[] result = new String[shellArgs.length + 1];
System.arraycopy(shellArgs, 0, result, 0, shellArgs.length);
result[shellArgs.length] = command;
return result;
}
public int kill() {
if(process != null) {
synchronized (process) {
process.destroy();
try {
int exitValue = process.waitFor();
// Stop the Thread that flushes periodically
if (future != null) {
future.cancel(true);
}
if (timedFlushService != null) {
timedFlushService.shutdown();
while (!timedFlushService.isTerminated()) {
try {
timedFlushService.awaitTermination(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
logger.debug("Interrupted while waiting for ExecTail executor service "
+ "to stop. Just exiting.");
Thread.currentThread().interrupt();
}
}
}
return exitValue;
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
return Integer.MIN_VALUE;
}
return Integer.MIN_VALUE / 2;
}
public void setRestart(boolean restart) {
this.restart = restart;
}
}
private static class StderrReader extends Thread {
private BufferedReader input;
private boolean logStderr;
protected StderrReader(BufferedReader input, boolean logStderr) {
this.input = input;
this.logStderr = logStderr;
}
@Override
public void run() {
try {
int i = 0;
String line = null;
while((line = input.readLine()) != null) {
if(logStderr) {
// There is no need to read 'line' with a charset
// as we do not to propagate it.
// It is in UTF-16 and would be printed in UTF-8 format.
logger.info("StderrLogger[{}] = '{}'", ++i, line);
}
}
} catch (IOException e) {
logger.info("StderrLogger exiting", e);
} finally {
try {
if(input != null) {
input.close();
}
} catch (IOException ex) {
logger.error("Failed to close stderr reader for ExecTail source", ex);
}
}
}
}
}
总结:
在开源的路上,我们越来越多喜悦,因为我们发现我们的自我定制功能可以最大程度达到我们的要求,而且对于企业而言,依赖性变小,可控性增强,风险降低,现在我们就可以窥见,不远的将来,我们的生活会异常幸福!