设为首页 加入收藏

TOP

Flume开发笔记一
2019-04-24 02:10:41 】 浏览:98
Tags:Flume 开发 笔记

需求:通过配置参数,生成flume配置文件,并且上传至目标服务器,且执行flume命令,完成flume日志采集工作。

1、生成配置文件(使用flume+hbase)

# Agent "a1": tail a local log file and write each line to HBase.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source: run `tail -f` on the log file and emit every new line as a Flume event.
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /usr/logTest/1.log
a1.sources.r1.checkperiodic = 1000
a1.sources.r1.channels =c1

# Channel: in-memory buffer (up to 10000 events, 1000 per transaction).
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# Sink: asynchronous HBase sink writing to table "flumeLog", column family "log",
# serialized by the custom AsyncHbaseEventSerializer implementation below.
a1.sinks.k1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
a1.sinks.k1.channel = c1
a1.sinks.k1.table = flumeLog
a1.sinks.k1.columnFamily = log
a1.sinks.k1.serializer.columns = logTime,logLevel,logContent
a1.sinks.k1.serializer = com.pactera.test.serializer.AsyncHbaseLogEventSerializer

2、实现AsyncHbaseEventSerializer自定义HbaseSink类

/**
 * Custom serializer for Flume's AsyncHBaseSink: parses each log event into
 * (logTime, logLevel, logContent) and emits one PutRequest per configured column.
 * The row key is built from the value of the configured prefix column plus a
 * suffix chosen by the "suffix" property (uuid by default).
 */
public class AsyncHbaseLogEventSerializer implements AsyncHbaseEventSerializer {
	private byte[] table;
	private byte[] cf;
	// Serialized cell values for the event currently being processed; set in setEvent().
	private byte[][] logBytes;
	// Column qualifiers parsed from the "serializer.columns" property.
	private byte[][] columnBytes;
	// Never assigned in this class, so getIncrements() always returns an empty list.
	private byte[] incrementColumn;
	// Value of the rowPrefixCol column for the current event, used as row-key seed.
	private String rowSuffix;
	private String rowPrefixCol;
	private byte[] incrementRow;
	private KeyType keyType;
	// Call counter used only to trace the order in which the sink invokes these callbacks.
	private int i = 0;
	Logger log = LoggerFactory.getLogger(AsyncHbaseLogEventSerializer.class);

	/** Called once by the sink with the configured table name and column family. */
	public void initialize(byte[] table, byte[] columnFamily) {
		log.info("initialize执行顺序是"+(++i)+"\n");
		this.table = table;
		this.cf = columnFamily;
		log.info("table执行顺序是"+table+"cf"+columnFamily+"\n");
	}

	/**
	 * Reads serializer settings from the agent configuration:
	 * "columns" (comma-separated qualifiers), "rowPrefixCol" (which column's
	 * value seeds the row key) and "suffix" (row-key suffix strategy).
	 */
	public void configure(Context context) {
		log.info("configure执行顺序是"+(++i)+"\n");

		String hbColumns = context.getString("columns", "pCol");
		rowPrefixCol = context.getString("rowPrefixCol", "logTime"); // row-key prefix column (often a MAC address)
		String suffix = context.getString("suffix", "uuid");
		if (hbColumns != null && !hbColumns.isEmpty()) {
			if (suffix.equals("timestamp")) {
				keyType = KeyType.TS;
			} else if (suffix.equals("random")) {
				keyType = KeyType.RANDOM;
			} else if (suffix.equals("nano")) {
				keyType = KeyType.TSNANO;
			} else {
				keyType = KeyType.UUID;
			}

			// Parse the column qualifiers from the configuration.
			String[] pCols = hbColumns.replace(" ", "").split(",");
			columnBytes = new byte[pCols.length][];
			for (int i = 0; i < pCols.length; i++) {
				columnBytes[i] = pCols[i].getBytes(Charsets.UTF_8);
			}
		}
	}

	/**
	 * Builds one PutRequest per column for the event set by setEvent().
	 * Returns an empty list when no parseable event has been set.
	 */
	public List<PutRequest> getActions() {
		log.info("getActions执行顺序是"+(++i)+"\n");

		List<PutRequest> actions = new ArrayList<PutRequest>();
		// Bug fix: also guard logBytes — setEvent() leaves it null when the event
		// could not be parsed, and the loop below would otherwise NPE.
		if (columnBytes != null && logBytes != null) {
			byte[] rowKey;
			try {
				switch (keyType) {
				case TS:
					rowKey = SimpleRowKeyGenerator.getTimestampKey(rowSuffix);
					break;
				case TSNANO:
					rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowSuffix);
					break;
				case RANDOM:
					rowKey = SimpleRowKeyGenerator.getRandomKey(rowSuffix);
					break;
				default:
					rowKey = SimpleRowKeyGenerator.getUUIDKey(rowSuffix);
					break;
				}

				// One put per column, all sharing the same row key.
				for (int i = 0; i < this.logBytes.length; i++) {
					PutRequest putRequest = new PutRequest(table, rowKey, cf,
							columnBytes[i], logBytes[i]);
					actions.add(putRequest);
				}

			} catch (Exception e) {
				throw new FlumeException("Could not get row key!", e);
			}
		}
		return actions;
	}

	/** incrementColumn is never configured here, so this always returns an empty list. */
	public List<AtomicIncrementRequest> getIncrements() {
		log.info("getIncrements执行顺序是"+(++i)+"\n");

		List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
		if (incrementColumn != null) {
			AtomicIncrementRequest inc = new AtomicIncrementRequest(table,
					incrementRow, cf, incrementColumn);
			actions.add(inc);
		}
		return actions;
	}

	/** No resources are held, so nothing to release. */
	public void cleanUp() {
	}

	/**
	 * Parses the raw event body into a LogEntity and caches the serialized
	 * column values for the subsequent getActions() call.
	 */
	public void setEvent(Event event) {
		log.info("setEvent执行顺序是"+(++i)+"\n");

		// Bug fix: decode with an explicit charset instead of the platform default.
		String strBody = new String(event.getBody(), Charsets.UTF_8);
		log.info("strBody"+strBody);

		LogEntity logEntity = LogParser.getLogParser(strBody);
		// Bug fix: the original tested logEntity.toString() != null, which is always
		// true and still NPEs when the parser returns null. Check the entity itself.
		if (logEntity != null) {
			log.info(logEntity.toString());
			String[] subBody = {logEntity.getLogTime(), logEntity.getLogLevel(), logEntity.getLogContent()};
			log.info("setEvent"+subBody[0]);

			if (subBody.length == this.columnBytes.length) {
				this.logBytes = new byte[subBody.length][];
				for (int i = 0; i < subBody.length; i++) {
					this.logBytes[i] = subBody[i].getBytes(Charsets.UTF_8);
					// When this column is the configured row-key prefix column,
					// remember its value for row-key generation in getActions().
					if (new String(this.columnBytes[i], Charsets.UTF_8).equals(this.rowPrefixCol)) {
						this.rowSuffix = subBody[i];
					}
				}
			}
		}
	}

	/** Not used by this serializer. */
	public void configure(ComponentConfiguration conf) {
	}

}

3、如何将文件上传至服务器

我这里使用sftp,将自定义的hbaseSink和agent.conf传至目标服务器文件夹下。

package com.pactera.bi.app.utils;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.Properties;
import java.util.Vector;

import org.apache.commons.io.IOUtils;

import com.jcraft.jsch.Channel;
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException;

/**
 * Thin wrapper around JSch for a single shared SSH session: connect once,
 * then execute commands and transfer files over that session.
 * NOTE: state is static, so this class is NOT safe for concurrent sessions.
 */
public class JSchUtils {
    private static JSch jsch;
    private static Session session;

    /**
     * Opens an SSH session to the given host (5s timeout).
     *
     * @throws JSchException if authentication or connection fails
     */
    public static void connect(String user, String passwd, String host, int port) throws JSchException {
        jsch = new JSch();
        session = jsch.getSession(user, host, port);
        session.setPassword(passwd);

        Properties config = new Properties();
        // Skip host-key verification; insecure, acceptable only on trusted networks.
        config.put("StrictHostKeyChecking", "no");
        session.setConfig(config);
        session.setTimeout(5000);
        session.connect();
    }

    /**
     * Closes the shared session if one was opened.
     */
    public static void close() {
        if (session != null) {
            session.disconnect();
        }
    }

    /**
     * Runs a remote command and returns its stdout (lines concatenated),
     * or null when the command is null or an I/O error occurs.
     *
     * @throws JSchException never thrown in practice (caught internally); kept for API compatibility
     */
    public static String execCmd(String command) throws JSchException {
        if (command == null) {
            return null;
        }
        Channel channel = null;
        try {
            channel = session.openChannel("exec");
            ((ChannelExec) channel).setCommand(command);
            channel.connect();
            // try-with-resources so the reader (and underlying stream) is always closed.
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(channel.getInputStream(), Charset.forName("UTF-8")))) {
                StringBuilder sb = new StringBuilder();
                String buf;
                while ((buf = reader.readLine()) != null) {
                    sb.append(buf);
                }
                System.out.println(sb);
                return sb.toString();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (JSchException e) {
            e.printStackTrace();
        } finally {
            // Bug fix: guard against NPE when openChannel() itself threw.
            if (channel != null) {
                channel.disconnect();
            }
        }
        return null;
    }

    /**
     * Uploads a local file into the given remote directory, creating the
     * directory if it does not exist.
     *
     * @param directory  remote target directory
     * @param uploadFile local file path
     * @throws FileNotFoundException if the local file does not exist
     */
    public static void upload(String directory, String uploadFile) throws SftpException, JSchException, FileNotFoundException {
        ChannelSftp channelSftp = (ChannelSftp) session.openChannel("sftp");
        channelSftp.connect();
        try {
            try {
                channelSftp.cd(directory);
            } catch (SftpException e1) {
                channelSftp.mkdir(directory);
                // Bug fix: the original never cd'd after mkdir, so the file
                // landed in the remote home directory instead of `directory`.
                channelSftp.cd(directory);
            }
            File file = new File(uploadFile);
            // Bug fix: close the local stream (JSch does not close it for us).
            try (FileInputStream in = new FileInputStream(file)) {
                channelSftp.put(in, file.getName());
            } catch (FileNotFoundException e) {
                throw e;
            } catch (IOException e) {
                // Only the implicit close() can reach here; the upload itself succeeded.
                e.printStackTrace();
            }
            System.out.println("Upload Success!");
        } finally {
            // Bug fix: the original leaked the sftp channel.
            channelSftp.disconnect();
        }
    }

    /**
     * Downloads a remote file.
     *
     * @param src remote (Linux server) file path
     * @param dst local destination path
     */
    public static void download(String src, String dst) throws JSchException, SftpException {
        ChannelSftp channelSftp = (ChannelSftp) session.openChannel("sftp");
        channelSftp.connect();
        try {
            channelSftp.get(src, dst);
        } finally {
            // Bug fix: disconnect even when get() throws.
            channelSftp.disconnect();
        }
    }

    /**
     * Deletes a file in the given remote directory.
     *
     * @param directory  directory containing the file
     * @param deleteFile file name to delete
     */
    public void delete(String directory, String deleteFile) throws SftpException, JSchException {
        ChannelSftp channelSftp = (ChannelSftp) session.openChannel("sftp");
        channelSftp.connect();
        try {
            channelSftp.cd(directory);
            channelSftp.rm(deleteFile);
        } finally {
            // Bug fix: the original never disconnected this channel.
            channelSftp.disconnect();
        }
    }

    /**
     * Lists the entries of a remote directory.
     *
     * @param directory directory to list
     * @return the raw JSch listing
     */
    @SuppressWarnings("rawtypes")
    public Vector listFiles(String directory) throws JSchException, SftpException {
        ChannelSftp channelSftp = (ChannelSftp) session.openChannel("sftp");
        channelSftp.connect();
        try {
            return channelSftp.ls(directory);
        } finally {
            // Bug fix: the original never disconnected this channel.
            channelSftp.disconnect();
        }
    }

}

4、文件和jar包上传至目标Linux服务器 /app/flume/conf文件夹下

文件夹可以随意放,我这里放在了/app/flume/conf文件夹下;若目标服务器上没有该文件夹,需要先创建。

jar包是自定义Sink打包的jar包

执行命令:

nohup flume-ng agent --conf /app/flume/conf --conf-file /app/flume/conf/agent.conf -C /app/flume/conf/AsyncHbaseLogEventSerializer.jar --name a1 -Dflume.root.logger=INFO,console >/dev/null 2>log &

我这里使用了nohup和&命令,目的是让该进程在后台作为一个守护进程运行:java中使用JSch执行command的时候,没办法获取flume执行后的状态,所以使用了nohup,让java接口不被阻塞,能够继续执行下去。

最终结果入库Hbase数据库:


】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇【Flume二】HDFS sink细说 下一篇大数据求索(6): 使用Flume进行数..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目