设为首页 加入收藏

TOP

flume相关简介和部署以及自动重启
2019-02-19 13:53:59 】 浏览:178
Tags:flume 相关 简介 部署 以及 自动 重启
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/lzlnd/article/details/85059544

简介:

核心是agent

agent的核心:source,channel,sink,一台机器运行跟一个agent

source:处理各种类型、各种格式的日志数据,将数据包装秤event

channel:缓存数据event,对采集到的数据进行简单的缓存

sink:将数据发送到指定的目的地的组件

运行机制

source收集到数据,发送给channel,channel缓存数据,sink发送到指定的目的地。发送成功之后,channel会删除缓存中的数据

这样的运行机制保证了flume的安全和可靠。

end-to-end:收集到数据先保存,数据传输成功后再删除,如果不成功继续发送

Store on failure:当数据接收方崩溃的时候,会保存数据,等接收方恢复后继续发送

Besteffort:数据发送到接收方后不会确认

拦截器:

在source和channel中间,过滤指定的event

广义用法:

支持多级flume的agent

支持扇入(fan-in)和扇出(fan-out)。扇入就是source可以接收多个输入,扇出就是sink可以将数据输出到多个目的地

应用

日志的采集:提供了大量内置的source,channel,sink,不同类型的可以自由组合。

1.整体配置一个agent:

2.详细描述每一个source,channel,sink

3.通过channel连接source和sink

启动agent的shell操作

flume-ng agent -n a1 -c ../conf -f ../conf/example.file -Dflume.root.logger=DEBUG,console

-n:指定agent的名称

-c:指定flume配置文件的目录

-Dflume.root.logger=DEBUG,CONSOLE设置日志等级

发送数据

监听:

NetCat Source指定的网络端口

Spooling Directory Source监听指定目录

Exec Source 监听指定命令 ,在flume不运行或者指令出错的时候容易出现日志丢失

Avro Source监听指定的avro端口

--------------------------------部署flume1.7环境-------------------------------

1.需要安装jdk1.7以上版本。

官网下载http://www.oracle.com/technetwork/java/javase/downloads/index.html安装包

配置环境变量

export JAVA_HOME=/usr/local/java/jdk1.8.0_181

export JRE_HOME=${JAVA_HOME}/jre

export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib

export PATH=${JAVA_HOME}/bin:$PATH

然后用java -version测试是否安装成功

2.安装flume

http://archive.apache.org/dist/flume/下载安装包apache-flume-1.7.0-bin.tar

配置环境变量

export FLUME_HOME=/usr/local/flume

export FLUME_CONF_DIR=/usr/local/flume/conf

export PATH=$PATH:$FLUME_HOME/bin

复制cp flume-env.sh.template flume-env.sh

配置conf中的flume-env.sh中的JAVA_HOME

JAVA_HOME=/usr/local/java/jdk1.8.0_181

编辑conf下的配置文件

flume-conf-src-mm7mt.properties

在bin下配置flume-ng

#JAVA_OPTS="-Xmx2048m -Xdebug -Xrunjdwp:transport=dt_socket,address=8787,server=y,suspend=y"

然后用flume-ng version检验是否安装成功

启动flume

flume-ngagent-cconf-f./conf/flume-conf-oedipus-agent.properties-noedipus-Dflume.root.logger=INFO&

如果想要启动日志信息入flume的log里面则启动命令为

flume-ngagent-cconf-f./conf/flume-conf-oedipus-agent.properties-noedipus &

启动遇到的错

1.channel closed

删除file通道的checkdatepoit 和dir目录下的东西

删除/home/用户/.flume/data下的log日志

自动执行的脚本和命令

1.在flume的bin下创建一个listen_process.sh文件

2.在flume的conf下创建listen_process.sh中需要的flume_config文件

3.使用crontab -e命令。往打开的文件中写入。意思是定时没两分钟执行一次

*/2 * * * * cd /usr/local/flume/bin; sh listen_process.sh autostart

注意:crontab -e写入内容的时候一定要用启动flume的那个用户。

listen_process.sh内容为:

#!/bin/sh
source /etc/profile

#set -x
logtime=`date +%Y-%m-%d`
flume_config=/usr/local/flume/conf/flume_config
flume_path=/usr/local/flume/
flume_log=/usr/local/flume/logs/flume_status.log

start_server() {
    time=`date +%Y-%m-%d:%X`
    if [ -z "$object" ]; then
        if [ -e ${flume_config} ]; then
            echo "config file is exsit"
            for line in `cat ${flume_config}`
            do
                echo "line is $line"
                num=`awk 'BEGIN {print split("'$line'",config_array,",")}'`
                if [ $num -eq 6 ]; then
                    arr=(${line//,/ }) 
                    agent_name=${arr[0]}
                    addr=${arr[1]}
                    if_tail=${arr[2]}
                    log_name=${arr[3]}
                    if_head=${arr[4]}
                    sum=`ps -aux | grep "flume" | grep $agent_name | grep $addr | grep -v grep | wc -l`
                    if [ $sum -ge 1 ]; then
                        echo "flume server $agent_name is running"
                        if [ $if_tail -eq 1 ]; then
                            sum_tail=`ps -aux | grep "tail" | grep $log_name$logtime.cvs | grep -v grep | wc -l`
                            if [ $sum_tail -ge 1 ]; then
                                echo "client agent working right"
                            else
                                for abline in `ps -aux | grep "tail" | grep $log_name$logtime.cvs | grep -v grep | awk '{print $2}'`
				do
                                    kill -9 $abline
				done
                                for feline in `ps -aux | grep "flume" | grep $addr | grep -v grep | awk '{print $2}'`
				do
                                    kill -9 $feline
				done
				echo "$time  $agent_name client tail error, restart client agent" >> $flume_log
 				cd $flume_path
                    		nohup flume-ng agent -c conf -f ./conf/$addr -n $agent_name &
                            fi
                        fi
                    else
			echo "$time  $agent_name server no run, start server" >> $flume_log
 			cd $flume_path
                    	nohup flume-ng agent -c conf -f ./conf/$addr -n $agent_name &
                    fi
                fi
            done
        fi
    else
        echo "object argument is $object"
        con=`cat ${flume_config} | grep $object"," | grep -v grep`
        if [ -z "$con" ]; then
            echo "con : object argument is wrong!"
        else
            num=`awk 'BEGIN {print split("'$con'",config_array,",")}'`
            if [ $num -eq 6 ]; then
                echo "client config num is right"
                arr=(${con//,/ }) 
                agent_name=${arr[0]}
                addr=${arr[1]}
                if_tail=${arr[2]}
                log_name=${arr[3]}
                if_head=${arr[4]}
            fi
            sum=`ps -aux | grep "flume" | grep $object | grep $addr | grep -v grep | wc -l`
            if [ $sum -ge 1 ]; then
                echo "server $object is running"
                if [ $if_tail -eq 1 ]; then
                    sum_tail=`ps -aux | grep "tail" | grep $log_name$logtime.cvs | grep -v grep | wc -l`
                    if [ $sum_tail -ge 1 ]; then
                        echo "client agent working right"
                    else
                        for abline in `ps -aux | grep "tail" | grep $log_name$logtime.cvs | grep -v grep | awk '{print $2}'`
			do
                            kill -9 $abline
			done
                        for feline in `ps -aux | grep "flume" | grep $addr | grep $object | grep -v grep | awk '{print $2}'`
			do
                            kill -9 $feline
			done
 			cd $flume_path
                    	nohup flume-ng agent -c conf -f ./conf/$addr -n $agent_name &
                    fi
                fi
            else
 		cd $flume_path
                nohup flume-ng agent -c conf -f ./conf/$addr -n $agent_name &
            fi
        fi
    fi
}



stop_server() {
    
    if [ -z "$object" ]; then
        echo "object argument is empty!"
        for line in `ps -aux | grep "flume" | grep -v grep | awk '{print $2}'`
        do
            echo $line
            kill -9 $line
        done 
	if [ -e ${flume_config} ]; then
            echo "config file is exsit"
            for line in `cat ${flume_config}`
            do
                echo "line is $line"
                num=`awk 'BEGIN {print split("'$line'",config_array,",")}'`
                if [ $num -eq 6 ]; then
                    arr=(${line//,/ })
                    agent_name=${arr[0]}
                    addr=${arr[1]}
                    if_tail=${arr[2]}
                    log_name=${arr[3]}
                    if_head=${arr[4]}
                fi
		if [ $if_tail -eq 1 ]; then
                    for abline in `ps -aux | grep "tail" | grep $log_name$logtime.cvs | grep -v grep | awk '{print $2}'`
                    do
                        kill -9 $abline
                    done
                fi
	    done
	fi

    else
        con=`cat ${flume_config} | grep $object"," | grep -v grep`
        if [ -z "$con" ]; then
            echo "con : object argument is wrong!"
        else
            if [ $num -eq 6 ]; then
                arr=(${con//,/ }) 
                agent_name=${arr[0]}
                addr=${arr[1]}
                if_tail=${arr[2]}
                log_name=${arr[3]}
            fi
            for line in `ps -aux | grep "flume" | grep $object | grep $addr | grep -v grep | awk '{print $2}'`
            do
                echo $line
                kill -9 $line
                if [ $if_tail -eq 1 ]; then
                    for abline in `ps -aux | grep "tail" | grep $log_name$logtime.cvs | grep -v grep | awk '{print $2}'`
		    do
                        kill -9 $abline
		    done
                fi
            done 
        fi
    fi
}


auto_start() {
    start_server
}



CMD=$1

object=$2


case "$CMD" in
    start)
        echo "start_server-or-client--"
        start_server
        exit 0
        ;;
    stop)
        echo "stop_server-or-client--"
        stop_server
        exit 0
        ;;
    autostart)
        echo "auto_start_server----"
        auto_start
        exit 0
        ;;
esac

flume_config内容为:

mm7mt,flume-conf-src-mm7mt.properties,0,null,0,42400

mm7mtreport,flume-conf-src-mm7mtreport.properties,0,null,0,42401

cmppmt,flume-conf-src-cmppmt.properties,0,null,0,42402

cmppmtreport,flume-conf-src-cmppmtreport.properties,0,null,0,42403

cmppmo,flume-conf-src-cmppmo.properties,0,null,0,42404

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇flume读取日志数据写入kafka &nbs.. 下一篇flume   三大核心组件

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目