设为首页 加入收藏

TOP

flume自定义file sink,实现按时间创建文件夹,分隔文件以及自定义文件名
2018-12-02 18:38:03 浏览:155
Tags:flume 定义 file sink 实现 时间 创建 文件夹 分隔 文件 以及

1.flume版本:1.8

2.配置文件:

agent.sinks.csvSink.type = com.xxx.xxx.RollingFileSink
agent.sinks.csvSink.sink.directory = /data
# 设为0表示关闭flume默认的按时间滚动,文件改由代码按小时切分
agent.sinks.csvSink.sink.rollInterval = 0
agent.sinks.csvSink.sink.batchSize = 10000

3.pom配置:

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>  
<dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.8.0</version>
        </dependency>
</dependencies>

4.代码:

import java.io.*;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.PathManager;
import org.apache.flume.formatter.output.PathManagerFactory;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flume.serialization.EventSerializer;
import org.apache.flume.serialization.EventSerializerFactory;


/**
 * A Flume sink that writes events into hourly files laid out by date:
 * {@code <sink.directory>/yyyy/yyyyMM/yyyyMMdd/data_yyyyMMddHH.csv}.
 *
 * <p>The stream for the current hour's file is kept open across batches. It is closed and
 * reopened (in append mode) when the wall-clock hour changes, or when the optional
 * {@code sink.rollInterval} timer requests a rotation. Each batch is flushed to the file
 * before the channel transaction is committed.
 *
 * <p>Configuration keys (relative to the sink's context):
 * <ul>
 *   <li>{@code sink.directory} (required) - base output directory</li>
 *   <li>{@code sink.rollInterval} - seconds between forced reopenings; a value {@code <= 0}
 *       disables the timer (default 30)</li>
 *   <li>{@code sink.batchSize}, falling back to {@code batchSize} - maximum events per
 *       transaction, must be positive (default 100)</li>
 *   <li>{@code sink.serializer} - event serializer type (default {@code TEXT})</li>
 *   <li>{@code sink.pathManager} - path manager type (default {@code DEFAULT})</li>
 * </ul>
 */
public class RollingFileSink extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory
            .getLogger(RollingFileSink.class);
    private static final long defaultRollInterval = 30;
    private static final int defaultBatchSize = 100;

    private int batchSize = defaultBatchSize;

    private File directory;
    private long rollInterval;
    private OutputStream outputStream;
    private ScheduledExecutorService rollService;

    private String serializerType;
    private Context serializerContext;
    // Only used from process(); SimpleDateFormat instances are not shared across threads here.
    private SimpleDateFormat yearFormat;
    private SimpleDateFormat monthFormat;
    private SimpleDateFormat dayFormat;
    private SimpleDateFormat hourFormat;

    private EventSerializer serializer;

    private SinkCounter sinkCounter;

    private PathManager pathController;
    private String dir;
    // Set by the roll timer thread, consumed by process().
    private volatile boolean shouldRotate;
    // File that outputStream currently writes to; volatile because the roll timer logs it.
    private volatile File currentFile;

    public RollingFileSink() {
        shouldRotate = false;
    }

    /**
     * Reads the sink configuration.
     *
     * @param context the sink's configuration context
     * @throws IllegalArgumentException if {@code sink.directory} is missing or the batch
     *                                  size is not positive
     * @throws NumberFormatException    if {@code sink.rollInterval} is not a valid long
     */
    @Override
    public void configure(Context context) {
        String pathManagerType = context.getString("sink.pathManager", "DEFAULT");
        dir = context.getString("sink.directory");
        String rollInterval = context.getString("sink.rollInterval");

        serializerType = context.getString("sink.serializer", "TEXT");
        serializerContext =
                new Context(context.getSubProperties("sink." +
                        EventSerializer.CTX_PREFIX));

        Context pathManagerContext =
                new Context(context.getSubProperties("sink." +
                        PathManager.CTX_PREFIX));
        pathController = PathManagerFactory.getInstance(pathManagerType, pathManagerContext);

        Preconditions.checkArgument(dir != null, "Directory may not be null");
        Preconditions.checkNotNull(serializerType, "Serializer type is undefined");

        if (rollInterval == null) {
            this.rollInterval = defaultRollInterval;
        } else {
            this.rollInterval = Long.parseLong(rollInterval);
        }

        // "sink.batchSize" takes precedence; plain "batchSize" is accepted as a fallback so
        // that configurations written without the "sink." prefix are not silently ignored.
        batchSize = context.getInteger("sink.batchSize",
                context.getInteger("batchSize", defaultBatchSize));
        Preconditions.checkArgument(batchSize > 0,
                "batchSize must be positive, got %s", batchSize);

        this.directory = new File(dir);
        if (sinkCounter == null) {
            sinkCounter = new SinkCounter(getName());
        }

        // Formats for the year / month / day directory levels and the hourly file name.
        yearFormat = new SimpleDateFormat("yyyy");
        monthFormat = new SimpleDateFormat("yyyyMM");
        dayFormat = new SimpleDateFormat("yyyyMMdd");
        hourFormat = new SimpleDateFormat("yyyyMMddHH");
    }

    /**
     * Starts the sink. When {@code rollInterval > 0}, a single-threaded timer periodically
     * requests that the current file be closed and reopened.
     */
    @Override
    public void start() {
        logger.info("Starting {}...", this);

        sinkCounter.start();
        super.start();

        pathController.setBaseDirectory(directory);
        if (rollInterval > 0) {

            rollService = Executors.newScheduledThreadPool(
                    1,
                    new ThreadFactoryBuilder().setNameFormat(
                            "rollingFileSink-roller-" +
                                    Thread.currentThread().getId() + "-%d").build());

            /*
             * Every N seconds, mark that it's time to rotate. We purposefully do NOT
             * touch anything other than the indicator flag to avoid error handling
             * issues (e.g. IO exceptions occurring in two different threads).
             * Resist the urge to actually perform rotation in a separate thread!
             */
            rollService.scheduleAtFixedRate(() -> {
                logger.debug("Marking time to rotate file {}", currentFile);
                shouldRotate = true;
            }, rollInterval, rollInterval, TimeUnit.SECONDS);
        } else {
            logger.info("RollInterval is not valid, file rolling will not happen.");
        }
        logger.info("RollingFileSink {} started.", getName());
    }

    /**
     * Drains up to {@code batchSize} events from the channel into the current hour's file.
     *
     * @return {@link Status#BACKOFF} if the channel ran out of events during this batch,
     *         otherwise {@link Status#READY}
     * @throws EventDeliveryException if the file cannot be opened or rotated, or if writing
     *                                the batch fails (the transaction is rolled back)
     */
    @Override
    public Status process() throws EventDeliveryException {
        File targetFile = resolveTargetFile(new Date());
        rotateIfNeeded(targetFile);
        if (outputStream == null) {
            openOutputStream(targetFile);
        }

        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Status result = Status.READY;
        try {
            transaction.begin();
            int eventAttemptCounter = 0;
            for (int i = 0; i < batchSize; i++) {
                Event event = channel.take();
                if (event == null) {
                    // No events found, request back-off semantics from runner
                    result = Status.BACKOFF;
                    break;
                }
                sinkCounter.incrementEventDrainAttemptCount();
                eventAttemptCounter++;
                serializer.write(event);
            }
            // Push buffered bytes to the file (no fsync) before the events leave the channel.
            serializer.flush();
            outputStream.flush();
            transaction.commit();
            sinkCounter.addToEventDrainSuccessCount(eventAttemptCounter);
        } catch (Exception ex) {
            // Nothing after commit() in the try block can throw, so the transaction is still
            // open here and may be rolled back.
            transaction.rollback();
            throw new EventDeliveryException("Failed to process transaction", ex);
        } finally {
            transaction.close();
        }

        return result;
    }

    /**
     * Returns the hourly output file for {@code now}:
     * {@code directory/yyyy/yyyyMM/yyyyMMdd/data_yyyyMMddHH.csv}.
     */
    private File resolveTargetFile(Date now) {
        String dayPath = yearFormat.format(now) + File.separator
                + monthFormat.format(now) + File.separator
                + dayFormat.format(now);
        return new File(new File(directory, dayPath), "data_" + hourFormat.format(now) + ".csv");
    }

    /**
     * Closes the open stream if it writes to a different file than {@code targetFile}
     * (the hour rolled over) or if the roll timer requested a rotation.
     */
    private void rotateIfNeeded(File targetFile) throws EventDeliveryException {
        boolean rotateRequested = shouldRotate;
        if (rotateRequested) {
            shouldRotate = false;
        }
        if (outputStream == null) {
            return;
        }
        if (rotateRequested || !targetFile.equals(currentFile)) {
            logger.debug("Closing file {}", currentFile);
            try {
                closeOutputStream();
            } catch (IOException e) {
                sinkCounter.incrementConnectionFailedCount();
                throw new EventDeliveryException("Unable to rotate file "
                        + currentFile + " while delivering event", e);
            }
        }
    }

    /**
     * Creates the parent directories if needed and opens {@code targetFile} in append mode.
     * The stream and serializer fields are only assigned once both were created
     * successfully; on failure the partially opened stream is closed.
     *
     * <p>If the serializer writes a header in {@code afterCreate()}, that header is written
     * again each time an existing hourly file is reopened.
     */
    private void openOutputStream(File targetFile) throws EventDeliveryException {
        File parentDir = targetFile.getParentFile();
        if (!parentDir.isDirectory()) {
            if (parentDir.mkdirs()) {
                logger.info("create dir {}", parentDir);
            } else if (!parentDir.isDirectory()) {
                // Not fatal here: opening the file below reports the real IOException.
                logger.warn("Unable to create dir {}", parentDir);
            }
        }

        logger.debug("Opening output stream for file {}", targetFile);
        OutputStream out = null;
        boolean opened = false;
        try {
            // Append mode: reopening the same hourly file must not truncate earlier data.
            out = new BufferedOutputStream(new FileOutputStream(targetFile, true));
            EventSerializer newSerializer = EventSerializerFactory.getInstance(
                    serializerType, serializerContext, out);
            newSerializer.afterCreate();
            outputStream = out;
            serializer = newSerializer;
            currentFile = targetFile;
            opened = true;
            sinkCounter.incrementConnectionCreatedCount();
        } catch (IOException e) {
            sinkCounter.incrementConnectionFailedCount();
            throw new EventDeliveryException("Failed to open file "
                    + targetFile + " while delivering event", e);
        } finally {
            if (!opened && out != null) {
                try {
                    out.close();
                } catch (IOException closeError) {
                    logger.warn("Unable to close partially opened file " + targetFile, closeError);
                }
            }
        }
    }

    /**
     * Flushes and closes the current serializer/stream pair. Both fields are cleared first,
     * so the next batch always opens a fresh stream even if closing fails. The file handle
     * is released even when the serializer throws.
     *
     * @throws IOException if flushing, finishing or closing the stream fails
     */
    private void closeOutputStream() throws IOException {
        OutputStream out = outputStream;
        EventSerializer ser = serializer;
        outputStream = null;
        serializer = null;
        try {
            ser.flush();
            ser.beforeClose();
        } finally {
            out.close();
        }
        sinkCounter.incrementConnectionClosedCount();
    }

    /**
     * Stops the sink: closes the current file (errors are logged, not thrown) and shuts
     * down the roll timer if one was started.
     */
    @Override
    public void stop() {
        logger.info("RollingFile sink {} stopping...", getName());
        sinkCounter.stop();
        super.stop();

        if (outputStream != null) {
            logger.debug("Closing file {}", currentFile);
            try {
                closeOutputStream();
            } catch (IOException e) {
                sinkCounter.incrementConnectionFailedCount();
                logger.error("Unable to close output stream. Exception follows.", e);
            }
        }
        if (rollService != null) {
            rollService.shutdown();
            try {
                while (!rollService.isTerminated()) {
                    rollService.awaitTermination(1, TimeUnit.SECONDS);
                }
            } catch (InterruptedException e) {
                logger.debug("Interrupted while waiting for roll service to stop. " +
                        "Please report this.", e);
                // Preserve the interrupt for the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            rollService = null;
        }
        logger.info("RollingFile sink {} stopped. Event metrics: {}",
                getName(), sinkCounter);
    }

    /** Returns the base output directory. */
    public File getDirectory() {
        return directory;
    }

    /** Sets the base output directory; takes effect for the next file that is opened. */
    public void setDirectory(File directory) {
        this.directory = directory;
    }

    /** Returns the roll timer interval in seconds ({@code <= 0} means disabled). */
    public long getRollInterval() {
        return rollInterval;
    }

    /** Sets the roll timer interval in seconds; only read when the sink is started. */
    public void setRollInterval(long rollInterval) {
        this.rollInterval = rollInterval;
    }
}

5.打包成jar包(如执行mvn package),并将jar包放到flume的lib目录下。注意:配置中的type必须是该类的全限定类名,因此代码开头需要加上对应的package声明(如package com.xxx.xxx;)。

【打印繁体】【投稿】【收藏】【推荐】【举报】【评论】【关闭】【返回顶部】
上一篇Flume 1.5.2 日志中出现agent-sh.. 下一篇Flume Sink Group

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目