Flume Interceptor
Introduction and Usage of Interceptors

Flume Interceptors

Flume provides the following interceptor types:

Timestamp Interceptor
    Adds a key named "timestamp" to the event header; the value is the current timestamp.

Host Interceptor
    Adds a key named "host" to the event header; the value is the hostname or IP of the current machine.

Static Interceptor
    Adds a custom, fixed key and value to the event header.

UUID Interceptor
    Generates a UUID string in each event's header; the UUID can later be read and used in the sink.

Morphline Interceptor
    Roughly equivalent to an ETL tool; see http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html

Search and Replace Interceptor
    Performs string search-and-replace on the event body based on Java regular expressions.

Regex Filtering Interceptor
    Uses a regular expression to filter events, either excluding or including the events that match.

Regex Extractor Interceptor
    Uses a regular expression to add a specified key to the header; the value is the portion matched by the expression.
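As a quick illustration of how these built-in interceptors are wired onto a source, here is a minimal sketch in the same properties format as the configuration further below. The agent and source names (a1, r1) and the static key/value are placeholders, not from the original post:

# Interceptors run in the order they are listed.
a1.sources.r1.interceptors = i1 i2 i3
# Timestamp Interceptor: adds a "timestamp" header with the current time.
a1.sources.r1.interceptors.i1.type = timestamp
# Host Interceptor: adds a "host" header; useIP = false stores the hostname instead of the IP.
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.useIP = false
# Static Interceptor: adds a fixed key/value to every event header.
a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = datacenter
a1.sources.r1.interceptors.i3.value = dc1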


Reposted from: "Flume中的拦截器(Interceptor)介绍与使用", https://blog.csdn.net/jek123456/article/details/65633958


The rest of this post walks through an actual configuration file that uses a custom interceptor: the agent consumes data from one Kafka topic (flumeSource1) and produces it into another (flumeSink1).

#flume 1.8 version
agentzk.sources = src-1
agentzk.channels = ch-1
agentzk.sinks = sink-1

agentzk.sources.src-1.type = org.apache.flume.source.kafka.KafkaSource
agentzk.sources.src-1.batchSize = 500
agentzk.sources.src-1.batchDurationMillis = 200
agentzk.sources.src-1.kafka.topics = flumeSource1
agentzk.sources.src-1.kafka.bootstrap.servers = 10.120.51.240:21005,10.120.51.143:21005
agentzk.sources.src-1.kafka.consumer.group.id = flume
agentzk.sources.src-1.kafka.consumer.timeout.ms = 100
agentzk.sources.src-1.kafka.enable.auto.commit = false
agentzk.sources.src-1.kafka.consumer.auto.offset.reset = latest
agentzk.sources.src-1.kafka.security.protocol  = PLAINTEXT
agentzk.sources.src-1.channels = ch-1

# Multiple interceptors run in the order listed; i1 works around the Kafka topic override problem, i2 is the custom interceptor
agentzk.sources.src-1.interceptors=i1 i2
agentzk.sources.src-1.interceptors.i1.type = static
agentzk.sources.src-1.interceptors.i1.key = topic
agentzk.sources.src-1.interceptors.i1.preserveExisting = false
agentzk.sources.src-1.interceptors.i1.value = flumeSink1
agentzk.sources.src-1.interceptors.i2.type=com.xxx.bigdata.interceptor.fields.FieldsInterceptor$Builder  
# fullPair = true: all conditions must match; fullPair = false: any single matching condition is enough
agentzk.sources.src-1.interceptors.i2.fullPair=false
# After splitting the event body, keep the event if field[1] == 016 or field[1] == 0016
agentzk.sources.src-1.interceptors.i2.index=1|1
agentzk.sources.src-1.interceptors.i2.fields=016|0016
agentzk.sources.src-1.interceptors.i2.splitChar=\\|

agentzk.channels.ch-1.type = memory
agentzk.channels.ch-1.capacity = 10000
agentzk.channels.ch-1.transactionCapacity = 1000
#agentzk.channels.ch-1.byteCapacityBufferPercentage = 50
#agentzk.channels.ch-1.byteCapacity = 100000

agentzk.sinks.sink-1.type = org.apache.flume.sink.kafka.KafkaSink
agentzk.sinks.sink-1.kafka.topic = flumeSink1
agentzk.sinks.sink-1.kafka.producer.acks = 0
agentzk.sinks.sink-1.kafka.bootstrap.servers = 10.120.51.240:21005,10.120.51.143:21005
agentzk.sinks.sink-1.kafka.security.protocol = PLAINTEXT
agentzk.sinks.sink-1.flumeBatchSize = 500
agentzk.sinks.sink-1.channel = ch-1
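Assuming the configuration above is saved as agentzk.properties (the file name is an assumption, not from the original post), the agent can be started with the standard Flume launcher; the agent name passed with --name must match the property prefix agentzk:

flume-ng agent --conf conf --conf-file agentzk.properties --name agentzk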


What interceptor i1 does: it works around the topic override problem that appears when a Kafka Source and a Kafka Sink are used in the same Flume agent. The Kafka Source writes the topic it read from into the event header under the key "topic", and the Kafka Sink, when it finds that header, publishes the event to that topic instead of the topic configured on the sink, so events would loop back into the source topic. Overwriting the header with the sink topic via the static interceptor (preserveExisting = false) prevents this.

See: https://blog.csdn.net/u010170616/article/details/80844686


A custom interceptor can be written by following the built-in interceptors in the Flume source code. Note that the configuration above references the nested builder class (FieldsInterceptor$Builder), which is what Flume instantiates. The concrete matching logic is tied to the actual business meaning of the data, so it is not discussed in detail here.

package com.xxx.bigdata.interceptor.fields;

import java.util.List;

import org.apache.commons.io.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

/**
 * Filters events by field value: the event body is split on the configured
 * separator (default "|") and bodyArray[index] is compared against the
 * configured fields.
 *
 * Sample config:
 * <code>
 *   agent.sources.r1.interceptors = i1
 *   agent.sources.r1.interceptors.i1.fullPair = true (or false)
 *   agent.sources.r1.interceptors.i1.index = 0|2
 *   agent.sources.r1.interceptors.i1.fields = 016|1602
 * </code>
 */
public class FieldsInterceptor implements Interceptor{

	private static final Logger logger = LoggerFactory.getLogger(FieldsInterceptor.class);

	// whether every (index, field) pair must match, or any single matching pair is enough
	private final boolean fullPair;
	private final String index;
	private final String fields;
	private final String splitChar;
	// whether matching events should be filtered out (not implemented)
	// private final boolean excludeEvents;

	/**
	 * Only {@link FieldsInterceptor.Builder} can build me
	 */
	private FieldsInterceptor(boolean fullPair, String index, String fields, String splitChar) {
		this.fullPair = fullPair;
		this.index = index;
		this.fields = fields;
		this.splitChar = splitChar;
	}
	
	@Override
	public void close() {
	}

	@Override
	public void initialize() {
	}

	@Override
	public Event intercept(Event event) 
	{
		if (index == null || index.isEmpty() || fields == null || fields.isEmpty())
		{
			return null;
		}
		
		String[] indexArray = index.split(splitChar);
		String[] fieldArray = fields.split(splitChar);

		String body = new String(event.getBody(), Charsets.UTF_8);
		
		String[] bodyArray = body.split(splitChar);
		if (bodyArray.length <= 1)
		{
			logger.info("Unexpected event body (cannot be split): " + body);
			return null;
		}
		
		// fullPair == true: every (index, field) pair must match
		if (fullPair) 
		{
			for (int i = 0; i < indexArray.length; i++) 
			{
				if (!bodyArray[Integer.valueOf(indexArray[i])].equals(fieldArray[i]))
				{
					return null;
				}
			}
			return event;
		}
		else
		{
			// fullPair == false: a single matching (index, field) pair is enough
			for (int i = 0; i < indexArray.length; i++)
			{
				if (bodyArray[Integer.valueOf(indexArray[i])].equals(fieldArray[i]))
				{
					return event;
				}
			}
			return null;
		}
	}

	@Override
	public List<Event> intercept(List<Event> events) 
	{
		List<Event> out = Lists.newArrayListWithCapacity(events.size());
		for (Event event : events)
		{
			Event outEvent = intercept(event);
			if (outEvent != null)
			{
				out.add(outEvent);
			}
		}
		return out;
	}

	public static class Builder implements Interceptor.Builder
	{
		private boolean fullPair;
		private String index;
		private String fields;
		private String splitChar;
		
		@Override
		public void configure(Context context) 
		{
			fullPair = context.getBoolean(Constants.FULLPAIR, Constants.DEFAULT_FULLPAIR);
			index = context.getString(Constants.INDEX);
			fields = context.getString(Constants.FIELDS);
			splitChar = context.getString(Constants.SPLIT_CHAR, Constants.DEFAULT_SPLIT_CHAR);
		}

		@Override
		public Interceptor build() 
		{
			return new FieldsInterceptor(fullPair, index, fields, splitChar);
		}
		
	}
	
	public static class Constants 
	{
		public static final String SPLIT_CHAR = "splitChar";
		public static final String DEFAULT_SPLIT_CHAR = "\\|";
		
		public static final String FULLPAIR = "fullPair";
		public static final boolean DEFAULT_FULLPAIR = false;
		
		public static final String INDEX = "index";
		public static final String FIELDS = "fields";
	}
	
}
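To see how the pieces fit together, here is a minimal, hypothetical smoke test (not from the original post) that configures the interceptor the same way as i2 in the agent configuration above and feeds it two events. The class name is made up for illustration, and it assumes FieldsInterceptor and flume-ng-core are on the classpath:

package com.xxx.bigdata.interceptor.fields;

import java.nio.charset.StandardCharsets;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

public class FieldsInterceptorSmokeTest {

	public static void main(String[] args) {
		// Mirror the i2 settings from the agent configuration above.
		Context context = new Context();
		context.put(FieldsInterceptor.Constants.FULLPAIR, "false");
		context.put(FieldsInterceptor.Constants.INDEX, "1|1");
		context.put(FieldsInterceptor.Constants.FIELDS, "016|0016");
		context.put(FieldsInterceptor.Constants.SPLIT_CHAR, "\\|");

		Interceptor.Builder builder = new FieldsInterceptor.Builder();
		builder.configure(context);
		Interceptor interceptor = builder.build();

		Event keep = EventBuilder.withBody("a|016|c", StandardCharsets.UTF_8);
		Event drop = EventBuilder.withBody("a|999|c", StandardCharsets.UTF_8);

		// field[1] == "016" matches, so the first event is kept; the second is dropped (null).
		System.out.println("keep passed: " + (interceptor.intercept(keep) != null));
		System.out.println("drop passed: " + (interceptor.intercept(drop) != null));
	}
}

To deploy the real interceptor, package the class into a jar, place it on the agent's classpath (for example under Flume's plugins.d directory), and reference the nested Builder class in the configuration as shown above (type = com.xxx.bigdata.interceptor.fields.FieldsInterceptor$Builder).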

