
Big Data --> Flume (Part 2)
2018-11-13 16:14:22
Tags: data, flume

Setting up Flume:
1. Installing the JDK


1.1 Extract the JDK:
tar -zxvf jdk-8u102-linux-x64.tar.gz -C ~/bigdata/install/
1.2 Set the environment variables:
vim ~/.bash_profile
export JAVA_HOME=/home/flume/bigdata/install/jdk1.8.0_102
export PATH=$JAVA_HOME/bin:$PATH
Apply the changes: source ~/.bash_profile


2. Installing Flume


2.1 Extract:
tar -zxvf apache-flume-1.7.0-bin.tar.gz -C ~/bigdata/install/
2.2 Set the environment variables:
vim ~/.bash_profile
export FLUME_HOME=/home/flume/bigdata/install/apache-flume-1.7.0-bin
export PATH=$FLUME_HOME/bin:$PATH
source ~/.bash_profile
2.3 Edit the configuration file:
cd $FLUME_HOME/conf
cp flume-env.sh.template flume-env.sh
In flume-env.sh, set JAVA_HOME to the JDK installed in step 1:
export JAVA_HOME=/home/flume/bigdata/install/jdk1.8.0_102
Flume official website:
http://flume.apache.org


3. Simple Flume test cases


3.1 Flume reading from a network port (netcat source)
File name: netcat.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444


a1.sinks.k1.type = logger


a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
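The two memory-channel settings above are easy to confuse: capacity bounds how many events the channel holds in total, while transactionCapacity bounds how many events a single put or take transaction may carry. The following is a simplified model of that rule, not Flume's MemoryChannel; the class and method names are hypothetical, and real Flume raises a ChannelException where this sketch returns false.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model of the memory channel limits (hypothetical, not Flume's MemoryChannel):
//   capacity            = maximum number of events held in the channel
//   transactionCapacity = maximum number of events in one put or take transaction
class MemoryChannelModel {
    private final int capacity;
    private final int transactionCapacity;
    private final Deque<String> queue = new ArrayDeque<>();

    MemoryChannelModel(int capacity, int transactionCapacity) {
        this.capacity = capacity;
        this.transactionCapacity = transactionCapacity;
    }

    // A source commits a batch in one all-or-nothing transaction.
    // Real Flume throws ChannelException here instead of returning false.
    boolean putBatch(List<String> events) {
        if (events.size() > transactionCapacity) {
            return false; // too many events for one transaction
        }
        if (queue.size() + events.size() > capacity) {
            return false; // not enough free space left in the channel
        }
        queue.addAll(events);
        return true;
    }

    // A sink takes at most transactionCapacity events per transaction.
    List<String> takeBatch(int max) {
        int n = Math.min(Math.min(max, transactionCapacity), queue.size());
        List<String> taken = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            taken.add(queue.poll());
        }
        return taken;
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        // Same limits as netcat.conf: capacity = 1000, transactionCapacity = 100
        MemoryChannelModel ch = new MemoryChannelModel(1000, 100);
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < 101; i++) {
            batch.add("event-" + i);
        }
        System.out.println(ch.putBatch(batch.subList(0, 100))); // true
        System.out.println(ch.putBatch(batch));                  // false: 101 > 100
    }
}
```

With these limits, a batch of 100 events fits, a batch of 101 is rejected, and ten full batches fill the channel until a sink takes events out.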

Start command:
flume-ng agent --conf conf --conf-file conf/netcat.conf --name a1 -Dflume.root.logger=INFO,console

After typing zxy3 in a telnet session (telnet localhost 44444), the logger sink prints:
Event: { headers:{} body: 7A 78 79 33 0D zxy3. }
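Instead of telnet, a few lines of Java can feed the netcat source. This is a sketch under two assumptions: the agent listens on localhost:44444 as configured above, and the source keeps its default behavior of answering each received line with OK. NetcatClient and sendLine are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Minimal telnet replacement for the netcat source (hypothetical helper).
// Assumption: the source replies with one "OK" line per received line (its default).
class NetcatClient {
    // Sends one newline-terminated line and returns the reply line from the source.
    static String sendLine(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket(host, port);
             Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            out.write(line + "\n"); // the netcat source splits events on newlines
            out.flush();
            return in.readLine();
        }
    }

    public static void main(String[] args) throws IOException {
        // Same address as a1.sources.r1.bind / a1.sources.r1.port
        System.out.println(sendLine("localhost", 44444, "zxy3"));
    }
}
```

Because this client ends each line with \n rather than telnet's \r\n, the logged body should not carry the trailing 0D byte seen above.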


3.2 Flume tailing a text file (exec source)
File name: netcat.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/bigdata/test/a.txt
a1.sources.r1.shell = /bin/bash -c


a1.sinks.k1.type = logger


a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


After appending the line 456 to a.txt, the logger sink prints:
Event: { headers:{} body: 34 35 36 456 }
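The exec source only emits an event when tail -F sees a new line in a.txt. A small Java helper can append test lines; TailFeeder is a hypothetical name, and the path in main is the one from the command above, so adjust it to your own user.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Collections;

// Appends lines to the file watched by the exec source (hypothetical helper).
class TailFeeder {
    static void appendLine(Path file, String line) throws IOException {
        // CREATE + APPEND: create the file if it is missing, otherwise add to the end.
        // Files.write with a list of lines writes a line separator after each line.
        Files.write(file, Collections.singletonList(line), StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    public static void main(String[] args) throws IOException {
        // Path taken from a1.sources.r1.command (tail -F ...)
        appendLine(Paths.get("/home/hadoop/bigdata/test/a.txt"), "456");
    }
}
```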

3.3 Flume watching a directory (spooling directory source)
File name: netcat.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/bigdata/test

a1.sinks.k1.type = logger


a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
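The spooling directory source expects each file in spoolDir to be complete when it appears, never modified afterwards, and uniquely named; after processing, Flume renames it with a .COMPLETED suffix by default. One common way to respect this is to write the file elsewhere and move it in atomically. The sketch below assumes a staging directory on the same filesystem as spoolDir (required for an atomic move); SpoolDropper and the staging path are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.List;

// Writes a file completely in a staging directory, then moves it into spoolDir in one step,
// so the spooling directory source never sees a half-written file (hypothetical helper).
class SpoolDropper {
    static Path drop(Path stagingDir, Path spoolDir, String fileName, List<String> lines)
            throws IOException {
        Path staged = stagingDir.resolve(fileName);
        Files.write(staged, lines, StandardCharsets.UTF_8); // finished before Flume can see it
        Path target = spoolDir.resolve(fileName);
        if (Files.exists(target)) {
            throw new IOException("file name already used in spool dir: " + target);
        }
        // ATOMIC_MOVE only works when both directories are on the same filesystem
        return Files.move(staged, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path staging = Paths.get("/home/hadoop/bigdata/staging"); // hypothetical staging dir
        Path spool = Paths.get("/home/hadoop/bigdata/test");      // a1.sources.r1.spoolDir
        Files.createDirectories(staging);
        String name = "events-" + System.currentTimeMillis() + ".log"; // unique per file
        drop(staging, spool, name, Arrays.asList("line 1", "line 2"));
    }
}
```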

-------------------- The destination (sink) changes --------------------
3.4 Flume writing to HDFS (hdfs sink)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444


a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=/input/201809


a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
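With only hdfs.path set, the HDFS sink relies on its defaults for when to close a file and start the next one: hdfs.rollInterval = 30 seconds, hdfs.rollSize = 1024 bytes and hdfs.rollCount = 10 events, where 0 disables a trigger. The sketch below expresses that rule as a predicate; it is a model of the documented behavior, not the sink's code, and HdfsRollRule is a hypothetical name.

```java
// Model of the HDFS sink's roll decision (hypothetical class, not Flume code):
// a file is rolled as soon as any enabled threshold is reached; 0 disables a threshold.
class HdfsRollRule {
    final long rollIntervalSec; // hdfs.rollInterval
    final long rollSizeBytes;   // hdfs.rollSize
    final long rollCount;       // hdfs.rollCount

    HdfsRollRule(long rollIntervalSec, long rollSizeBytes, long rollCount) {
        this.rollIntervalSec = rollIntervalSec;
        this.rollSizeBytes = rollSizeBytes;
        this.rollCount = rollCount;
    }

    // Documented defaults: 30 seconds, 1024 bytes, 10 events
    static HdfsRollRule defaults() {
        return new HdfsRollRule(30, 1024, 10);
    }

    boolean shouldRoll(long secondsOpen, long bytesWritten, long eventsWritten) {
        return (rollIntervalSec > 0 && secondsOpen >= rollIntervalSec)
                || (rollSizeBytes > 0 && bytesWritten >= rollSizeBytes)
                || (rollCount > 0 && eventsWritten >= rollCount);
    }
}
```

Under these defaults every 10 events start a new file, so a short telnet session can leave many small files under /input/201809. Note also that hdfs.fileType defaults to SequenceFile; plain-text output needs hdfs.fileType = DataStream, as in the commented-out lines of the host example.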

4. Flume interceptors


4.1 Timestamp interceptor
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=/input/201809


a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Events as printed by a logger sink, first without and then with the timestamp interceptor:
Event: { headers:{} body: 7A 78 79 33 0D zxy3. }
Event: { headers:{timestamp=1536222302872} body: 7A 78 79 31 0D zxy1. }
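The timestamp interceptor matters once hdfs.path contains date escapes, such as the %y-%m-%d in the commented-out path of the host example: the HDFS sink fills them in from the event's timestamp header and fails without it, unless hdfs.useLocalTimeStamp = true. This hypothetical helper imitates the substitution for %y, %m and %d only; it is not Flume's escaping code, and the time zone is passed explicitly (the sink uses the agent's local zone unless hdfs.timeZone is set).

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Map;

// Imitates how date escapes in hdfs.path are resolved from the "timestamp" header
// (hypothetical helper covering only %y, %m and %d).
class DatePathSketch {
    private static final DateTimeFormatter YY = DateTimeFormatter.ofPattern("yy");
    private static final DateTimeFormatter MM = DateTimeFormatter.ofPattern("MM");
    private static final DateTimeFormatter DD = DateTimeFormatter.ofPattern("dd");

    static String resolve(String pathPattern, Map<String, String> headers, ZoneId zone) {
        String ts = headers.get("timestamp"); // milliseconds since the epoch
        if (ts == null) {
            throw new IllegalArgumentException("no timestamp header; add the timestamp interceptor");
        }
        ZonedDateTime t = Instant.ofEpochMilli(Long.parseLong(ts)).atZone(zone);
        return pathPattern.replace("%y", t.format(YY))  // two-digit year
                          .replace("%m", t.format(MM))  // month 01-12
                          .replace("%d", t.format(DD)); // day 01-31
    }

    public static void main(String[] args) {
        Map<String, String> headers = Collections.singletonMap("timestamp", "1536222302872");
        System.out.println(resolve("/input/test1/%y-%m-%d", headers, ZoneId.of("UTC")));
    }
}
```

For the event captured above, timestamp=1536222302872 is 2018-09-06 08:25:02 UTC (16:25:02 in UTC+8), so /input/test1/%y-%m-%d resolves to /input/test1/18-09-06.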

4.2 Host interceptor

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host

a1.sinks.k1.type = logger
#a1.sinks.k1.type = hdfs

#a1.sinks.k1.hdfs.path=/input/test1/%y-%m-%d
#a1.sinks.k1.hdfs.fileType=DataStream

a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Resulting event (the host header carries the agent's IP address):
Event: { headers:{host=192.168.126.128} body: 7A 78 79 32 0D zxy2. }
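The host interceptor simply adds a header to each event. The sketch below restates its documented defaults in plain Java: the header is named host (hostHeader), the value is the agent's IP address rather than its hostname (useIP = true), and an existing header is overwritten unless preserveExisting = true. HostHeaderSketch is hypothetical and works on a header map, not on Flume's Event.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

// Plain-Java restatement of the host interceptor's header logic (hypothetical helper).
class HostHeaderSketch {
    static Map<String, String> addHost(Map<String, String> headers, String host,
                                       String headerName, boolean preserveExisting) {
        Map<String, String> out = new HashMap<>(headers); // leave the input map untouched
        if (preserveExisting && out.containsKey(headerName)) {
            return out; // keep a value set earlier, e.g. by an upstream agent
        }
        out.put(headerName, host);
        return out;
    }

    public static void main(String[] args) throws UnknownHostException {
        String ip = InetAddress.getLocalHost().getHostAddress(); // useIP = true
        System.out.println(addHost(new HashMap<>(), ip, "host", false));
    }
}
```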


4.3 Content-based interception: not covered here; explore it on your own.
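As a starting point, Flume provides a built-in regex_filter interceptor configured with regex and excludeEvents (for example a1.sources.r1.interceptors.i1.type = regex_filter). The sketch below reproduces its keep/drop rule on raw event bodies in plain Java so the idea can be tried without an agent; RegexFilterSketch is a hypothetical name, not the interceptor's source.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Keep/drop rule of a regex filter applied to event bodies (hypothetical helper).
class RegexFilterSketch {
    static List<byte[]> filter(List<byte[]> bodies, Pattern regex, boolean excludeEvents) {
        List<byte[]> kept = new ArrayList<>();
        for (byte[] body : bodies) {
            boolean matches = regex.matcher(new String(body, StandardCharsets.UTF_8)).find();
            // excludeEvents = false keeps matching bodies; true keeps non-matching ones
            if (matches != excludeEvents) {
                kept.add(body);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<byte[]> bodies = Arrays.asList(
                "1111".getBytes(StandardCharsets.UTF_8),
                "error:123".getBytes(StandardCharsets.UTF_8));
        for (byte[] b : filter(bodies, Pattern.compile("error:"), false)) {
            System.out.println(new String(b, StandardCharsets.UTF_8));
        }
    }
}
```

Unlike the custom interceptor in section 5, which rewrites non-matching bodies to "no match", this rule drops them from the batch entirely.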

5. Custom interceptor: writing the code


Tools: Eclipse / MyEclipse / IntelliJ IDEA
Flume dependency in pom.xml:
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>

package com.itstar;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

/**
 * Custom interceptor: keeps the body of events that contain "error:"
 * and replaces every other body with "no match".
 * @AUTHOR COCO
 * 2018/9/6
 **/
public class flumeInterceptor implements Interceptor {

    // Compile the pattern once instead of once per event
    private static final Pattern ERROR_PATTERN = Pattern.compile("error:");

    @Override
    public void initialize() {
        // no resources to set up
    }

    @Override
    public Event intercept(Event event) {
        // e.g. "error:123" is kept as-is; "info:..." becomes "no match"
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        String str = ERROR_PATTERN.matcher(body).find() ? body : "no match";
        event.setBody(str.getBytes(StandardCharsets.UTF_8));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> lis = new ArrayList<Event>(list.size());
        for (Event e : list) {
            Event event = intercept(e);
            // intercept(Event) returning null would mean "drop this event"
            if (event != null) {
                lis.add(event);
            }
        }
        return lis;
    }

    @Override
    public void close() {
        // nothing to release
    }

    // Flume creates the interceptor through this Builder (configured as flumeInterceptor$Builder)
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new flumeInterceptor();
        }

        @Override
        public void configure(Context context) {
            // this interceptor reads no properties from the agent configuration
        }
    }
}

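Build command:
mvn clean package

Copy the jar produced under target/ into $FLUME_HOME/lib, then reference the interceptor's Builder in the agent configuration (netcat.conf):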

a1.sources = r1
a1.sinks = k1
a1.channels = c1


a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.itstar.flumeInterceptor$Builder

a1.sinks.k1.type = logger


a1.channels.c1.type = memory

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


Start:
flume-ng agent --conf conf --conf-file conf/netcat.conf --name a1 -Dflume.root.logger=INFO,console

Test:
telnet localhost 44444

1111      ----> no match
error:123 ----> error:123
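The rewrite rule in flumeInterceptor can also be checked without starting an agent by pulling it into a plain method. RewriteRuleCheck is a hypothetical helper that duplicates the rule for testing; the agent itself still loads com.itstar.flumeInterceptor$Builder.

```java
import java.util.regex.Pattern;

// Copy of the interceptor's body-rewrite rule, runnable without Flume on the classpath
// (hypothetical helper for offline checks).
class RewriteRuleCheck {
    private static final Pattern ERROR = Pattern.compile("error:");

    // Keep bodies containing "error:", replace everything else with "no match"
    static String rewrite(String body) {
        return ERROR.matcher(body).find() ? body : "no match";
    }

    public static void main(String[] args) {
        // Same inputs as the telnet test above
        for (String in : new String[] {"1111", "error:123"}) {
            System.out.println(in + " -> " + rewrite(in));
        }
    }
}
```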
