Filtering and Formatting Log Data with Kafka, Morphline, and Hive

Contents

  1. Log data
  2. Create two Flume configuration files: kafka_morphline_in.conf and kafka_morphline_out.conf
  3. Create the Morphline configuration file morphline.conf in the same directory as the Flume configuration files
  4. Start ZooKeeper
  5. Start Kafka
  6. Start Flume agent agent2 with the kafka_morphline_out.conf configuration file
  7. Start Flume agent agent1 with the kafka_morphline_in.conf configuration file
  8. Check in HDFS that the log files have been saved
  9. Start Hive, create a table partitioned by date, and load the data from HDFS

1. Log data

Download link:

https://stusspueducn-my.sharepoint.com/:u:/g/personal/20154847880_stu_sspu_edu_cn/EcflB0XMZYJNjFq9yz38y48BIsSu-S7tii1t_wd7iRcYVAe=3E0bid

After downloading, save the log file to the /flume/morphline directory.
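
The original post does not show the raw log format, but working backwards from the Morphline split commands in section 3 (split on spaces, then on commas, then on a colon), each line should look roughly like these made-up samples:

2018-08-04 10:15:32 install,success,chat_app:android 1.8.0
2018-08-05 11:42:07 update,failed,game_app:ios 2.3.1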

Schema file:

Save the following Avro schema as /flume/schema/schema.avsc so that Morphline can reference it:

{
	"type" : "record",
	"name" : "Line",
	"fields" : [
	{
		"name" : "date",
		"type" : "string"
	},{
		"name" : "time",
		"type" : "string"
	},{
		"name" : "stage",
		"type" : "string"
	},{
		"name" : "stage_status",
		"type" : "string"
	},{
		"name" : "soft_name",
		"type" : "string"
	},{
		"name" : "platform_type",
		"type" : "string"
	},{
		"name" : "version",
		"type" : "string"
	}
	]
}

2. Create two Flume configuration files: kafka_morphline_in.conf and kafka_morphline_out.conf

kafka_morphline_in.conf :

agent1.sources = source_spool
agent1.channels = channel_kafka

agent1.sources.source_spool.type = spooldir
agent1.sources.source_spool.spoolDir = /flume/morphline

agent1.sources.source_spool.interceptors = i1
agent1.sources.source_spool.interceptors.i1.type=org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
agent1.sources.source_spool.interceptors.i1.morphlineFile=/etc/flume-ng/conf/morphline.conf
agent1.sources.source_spool.interceptors.i1.morphlineId=morphline1

agent1.sources.source_spool.channels = channel_kafka

agent1.channels.channel_kafka.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.channel_kafka.brokerList = localhost:9092
agent1.channels.channel_kafka.kafka.bootstrap.servers = localhost:9092
agent1.channels.channel_kafka.zookeeperConnect = localhost:2181
agent1.channels.channel_kafka.kafka.topic = access
agent1.channels.channel_kafka.kafka.consumer.group.id = flume-consumer
agent1.channels.channel_kafka.capacity = 1000
agent1.channels.channel_kafka.transactionCapacity = 10000

kafka_morphline_out.conf :

agent2.channels = channel_kafka
agent2.sinks = sink_hdfs

agent2.sinks.sink_hdfs.type = hdfs
agent2.sinks.sink_hdfs.hdfs.fileSuffix = .avro
agent2.sinks.sink_hdfs.hdfs.path = /loudacre/morphline/%{date}
agent2.sinks.sink_hdfs.hdfs.rollSize = 524288
agent2.sinks.sink_hdfs.hdfs.rollCount = 0
agent2.sinks.sink_hdfs.hdfs.rollInterval = 0
agent2.sinks.sink_hdfs.hdfs.threadsPoolSize = 30
agent2.sinks.sink_hdfs.hdfs.fileType=DataStream
agent2.sinks.sink_hdfs.serializer = org.apache.flume.serialization.AvroEventSerializer$Builder
agent2.sinks.sink_hdfs.serializer.compressionCodec = snappy

agent2.sinks.sink_hdfs.channel = channel_kafka

agent2.channels.channel_kafka.type = org.apache.flume.channel.kafka.KafkaChannel
agent2.channels.channel_kafka.brokerList = localhost:9092
agent2.channels.channel_kafka.kafka.bootstrap.servers = localhost:9092
agent2.channels.channel_kafka.zookeeperConnect = localhost:2181
agent2.channels.channel_kafka.kafka.topic = access
agent2.channels.channel_kafka.kafka.consumer.group.id = flume-consumer
agent2.channels.channel_kafka.capacity = 1000
agent2.channels.channel_kafka.transactionCapacity = 10000

3. Create the Morphline configuration file morphline.conf in the same directory as the Flume configuration files

morphline.conf:

morphlines: [
  {
    id: morphline1
    importCommands: ["org.kitesdk.**", "org.apache.solr.**"]
    commands : [
	{
		readLine {
			charset: UTF-8
		}
	}
	{
		split {
			inputField: message
			outputFields: [date, time, soft_info, version]
			separator: " "
			isRegex: false
			addEmptyStrings: false
			trim: true
		}
	}
	{
		split {
			inputField: soft_info
			outputFields: [stage, stage_status, name_platform]
			separator: ","
		}
	}
	{
		split {
			inputField: name_platform
			outputFields: [soft_name, platform_type]
			separator : ":"
		}
	}
	{
		addValues {
			flume.avro.schema.url: "file:/flume/schema/schema.avsc"
		}
	}
	{
		toAvro {
			schemaFile : /flume/schema/schema.avsc
		}	
	}	
	{
		writeAvroToByteArray {
			format : containerlessBinary
			#codec : snappy
		}
	}

    ]
  }
]
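
Applied to the first made-up sample line from section 1, the chain of split commands would populate the schema fields like this (values are illustrative):

date          : 2018-08-04
time          : 10:15:32
stage         : install
stage_status  : success
soft_name     : chat_app
platform_type : android
version       : 1.8.0

The addValues command attaches the schema URL so the HDFS sink's Avro serializer can find it, and toAvro plus writeAvroToByteArray turn the record into Avro binary before it enters the Kafka channel.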

4. Start ZooKeeper

In my setup, ZooKeeper is installed under /usr/lib; zkServer.sh needs an explicit start command and reads conf/zoo.cfg by default.

$ cd /usr/lib/zookeeper/
$ ./bin/zkServer.sh start

5. Start Kafka

Kafka is likewise installed under /usr/lib.

$ cd /usr/lib/kafka
$ ./bin/kafka-server-start.sh config/server.properties

server.properties is the broker configuration file.
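
Both Flume agents use the Kafka topic access. If the broker is not configured to auto-create topics, create it first; the command below assumes the kafka-topics.sh script shipped with Kafka releases of this era, which still accept --zookeeper:

$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic access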

6. Start Flume agent agent2 with the kafka_morphline_out.conf configuration file

My Flume configuration directory is /etc/flume-ng/conf. Start the HDFS-writing agent (agent2) first, so that events are drained from the Kafka topic as soon as agent1 begins publishing.

$ flume-ng agent --conf /etc/flume-ng/conf --conf-file /etc/flume-ng/conf/kafka_morphline_out.conf --name agent2 -Dflume.root.logger=INFO,console

7. Start Flume agent agent1 with the kafka_morphline_in.conf configuration file

$ flume-ng agent --conf /etc/flume-ng/conf --conf-file /etc/flume-ng/conf/kafka_morphline_in.conf --name agent1 -Dflume.root.logger=INFO,console
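
With the default spooling directory source settings, each fully ingested file in /flume/morphline is renamed with a .COMPLETED suffix, so ingestion on the agent1 side can be confirmed with a simple listing:

$ ls /flume/morphline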

8. Check in HDFS that the log files have been saved
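
The HDFS sink writes under /loudacre/morphline/, one directory per value of the date header, so a recursive listing should show the daily directories and the .avro files inside them:

$ hdfs dfs -ls -R /loudacre/morphline/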

9. Start Hive, create a table partitioned by date, and load the data from HDFS

Copy the /flume/schema/schema.avsc file from earlier into HDFS so that the next step can reference it:

$ cd /flume/schema/
$ hdfs dfs -mkdir -p /loudacre/avro
$ hdfs dfs -put schema.avsc /loudacre/avro/schema.avsc
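
To confirm the upload:

$ hdfs dfs -cat /loudacre/avro/schema.avsc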

Start Hive:

$ hive

Create an external table partitioned by a STRING column date_part, using the schema from the /loudacre/avro/schema.avsc file in HDFS:

CREATE EXTERNAL TABLE mor
PARTITIONED BY (date_part STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/loudacre/morphline/'
TBLPROPERTIES ( 'avro.schema.url'='/loudacre/avro/schema.avsc' );

Register the data already sitting in HDFS as partitions of the Hive table:

ALTER TABLE mor ADD PARTITION (date_part='20180804') LOCATION '/loudacre/morphline/2018-08-04';
ALTER TABLE mor ADD PARTITION (date_part='20180805') LOCATION '/loudacre/morphline/2018-08-05';
ALTER TABLE mor ADD PARTITION (date_part='20180806') LOCATION '/loudacre/morphline/2018-08-06';
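
To verify that the partitions were registered:

SHOW PARTITIONS mor;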

Query the loaded data as a quick test:

SELECT * FROM mor WHERE date_part='20180805';
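
As a further sanity check that the Morphline fields parsed correctly, aggregate over one of the schema columns, for example platform_type:

SELECT platform_type, COUNT(*) FROM mor WHERE date_part='20180805' GROUP BY platform_type;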
