Contents
- Log data
- Create and write the two Flume configuration files: kafka_morphline_in.conf and kafka_morphline_out.conf
- Create the Morphline configuration file morphline.conf in the same directory as the Flume configuration files
- Start ZooKeeper
- Start Kafka
- Start Flume agent2 with the kafka_morphline_out.conf configuration file
- Start Flume agent1 with the kafka_morphline_in.conf configuration file
- Check in HDFS that the log files have been saved
- Start Hive, create a time-partitioned table, and load the data from HDFS
1. Log data
Download link:
https://stusspueducn-my.sharepoint.com/:u:/g/personal/20154847880_stu_sspu_edu_cn/EcflB0XMZYJNjFq9yz38y48BIsSu-S7tii1t_wd7iRcYVAe=3E0bid
After downloading, place the log file in the /flume/morphline directory.
Schema file:
Place it in the /flume/schema directory so that Morphline can reference it.
{
  "type" : "record",
  "name" : "Line",
  "fields" : [
    { "name" : "date",          "type" : "string" },
    { "name" : "time",          "type" : "string" },
    { "name" : "stage",         "type" : "string" },
    { "name" : "stage_status",  "type" : "string" },
    { "name" : "soft_name",     "type" : "string" },
    { "name" : "platform_type", "type" : "string" },
    { "name" : "version",       "type" : "string" }
  ]
}
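The schema drives both the Morphline toAvro step and the Hive table created later, so it is worth a quick sanity check before saving it. A minimal sketch using only Python's stdlib (the embedded string is the schema shown above):

```python
import json

# The Avro record schema from above, embedded as a string for checking.
schema_json = """
{ "type" : "record",
  "name" : "Line",
  "fields" : [
    { "name" : "date",          "type" : "string" },
    { "name" : "time",          "type" : "string" },
    { "name" : "stage",         "type" : "string" },
    { "name" : "stage_status",  "type" : "string" },
    { "name" : "soft_name",     "type" : "string" },
    { "name" : "platform_type", "type" : "string" },
    { "name" : "version",       "type" : "string" }
  ] }
"""

schema = json.loads(schema_json)          # fails loudly on malformed JSON
field_names = [f["name"] for f in schema["fields"]]
print(field_names)
```

The field names printed here must match the output fields produced by the Morphline split commands, otherwise toAvro will reject the records.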
2. Create and write the two Flume configuration files: kafka_morphline_in.conf and kafka_morphline_out.conf
kafka_morphline_in.conf:
agent1.sources = source_spool
agent1.channels = channel_kafka
agent1.sources.source_spool.type = spooldir
agent1.sources.source_spool.spoolDir = /flume/morphline
agent1.sources.source_spool.interceptors = i1
agent1.sources.source_spool.interceptors.i1.type=org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
agent1.sources.source_spool.interceptors.i1.morphlineFile=/etc/flume-ng/conf/morphline.conf
agent1.sources.source_spool.interceptors.i1.morphlineId=morphline1
agent1.sources.source_spool.channels = channel_kafka
agent1.channels.channel_kafka.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.channel_kafka.brokerList = localhost:9092
agent1.channels.channel_kafka.kafka.bootstrap.servers = localhost:9092
agent1.channels.channel_kafka.zookeeperConnect = localhost:2181
agent1.channels.channel_kafka.kafka.topic = access
agent1.channels.channel_kafka.kafka.consumer.group.id = flume-consumer
agent1.channels.channel_kafka.capacity = 1000
agent1.channels.channel_kafka.transactionCapacity = 10000
kafka_morphline_out.conf:
agent2.channels = channel_kafka
agent2.sinks = sink_hdfs
agent2.sinks.sink_hdfs.type = hdfs
agent2.sinks.sink_hdfs.hdfs.fileSuffix = .avro
agent2.sinks.sink_hdfs.hdfs.path = /loudacre/morphline/%{date}
agent2.sinks.sink_hdfs.hdfs.rollSize = 524288
agent2.sinks.sink_hdfs.hdfs.rollCount = 0
agent2.sinks.sink_hdfs.hdfs.rollInterval = 0
agent2.sinks.sink_hdfs.hdfs.threadsPoolSize = 30
agent2.sinks.sink_hdfs.hdfs.fileType=DataStream
agent2.sinks.sink_hdfs.serializer = org.apache.flume.serialization.AvroEventSerializer$Builder
agent2.sinks.sink_hdfs.serializer.compressionCodec = snappy
agent2.sinks.sink_hdfs.channel = channel_kafka
agent2.channels.channel_kafka.type = org.apache.flume.channel.kafka.KafkaChannel
agent2.channels.channel_kafka.brokerList = localhost:9092
agent2.channels.channel_kafka.kafka.bootstrap.servers = localhost:9092
agent2.channels.channel_kafka.zookeeperConnect = localhost:2181
agent2.channels.channel_kafka.kafka.topic = access
agent2.channels.channel_kafka.kafka.consumer.group.id = flume-consumer
agent2.channels.channel_kafka.capacity = 1000
agent2.channels.channel_kafka.transactionCapacity = 10000
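The %{date} escape in hdfs.path is replaced with the value of the event's `date` header, which the Morphline interceptor sets from the parsed log line. A small Python sketch of that substitution (the header value is a hypothetical example):

```python
import re

def expand_path(path_template, headers):
    """Replace each %{name} escape with the matching event header,
    mirroring how the Flume HDFS sink resolves header-based escapes."""
    return re.sub(r"%\{(\w+)\}", lambda m: headers[m.group(1)], path_template)

# hypothetical header set by the Morphline interceptor on one event
headers = {"date": "2018-08-04"}
path = expand_path("/loudacre/morphline/%{date}", headers)
print(path)
```

Each distinct `date` value therefore yields its own HDFS directory, which is exactly what the per-day Hive partitions in the last step rely on.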
3. Create the Morphline configuration file morphline.conf in the same directory as the Flume configuration files
morphline.conf:
morphlines: [
{
id: morphline1
importCommands: ["org.kitesdk.**", "org.apache.solr.**"]
commands : [
{
readLine {
charset: UTF-8
}
}
{
split {
inputField: message
outputFields: [date, time, soft_info, version]
separator: " "
isRegex: false
addEmptyStrings: false
trim: true
}
}
{
split {
inputField: soft_info
outputFields: [stage, stage_status, name_platform]
separator: ","
}
}
{
split {
inputField: name_platform
outputFields: [soft_name, platform_type]
separator : ":"
}
}
{
addValues {
flume.avro.schema.url: "file:/flume/schema/schema.avsc"
}
}
{
toAvro {
schemaFile : /flume/schema/schema.avsc
}
}
{
writeAvroToByteArray {
format : containerlessBinary
#codec : snappy
}
}
]
}
]
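The three split commands above decompose each line in stages: first on spaces, then the soft_info field on commas, then name_platform on a colon. Assuming a log line of the (hypothetical) shape `date time stage,stage_status,soft_name:platform_type version`, the pipeline behaves like this Python sketch:

```python
def parse_line(message):
    # split 1: whitespace-separated top-level fields
    date, time, soft_info, version = message.split(" ")
    # split 2: comma-separated sub-fields of soft_info
    stage, stage_status, name_platform = soft_info.split(",")
    # split 3: colon-separated software name and platform
    soft_name, platform_type = name_platform.split(":")
    return {"date": date, "time": time, "stage": stage,
            "stage_status": stage_status, "soft_name": soft_name,
            "platform_type": platform_type, "version": version}

# hypothetical sample line matching the separators used above
record = parse_line("2018-08-04 10:21:05 install,success,chat:android 2.1.0")
print(record)
```

The resulting dictionary keys correspond one-to-one to the fields of the Avro schema, which is what allows the toAvro command to serialize each record.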
4. Start ZooKeeper
ZooKeeper is installed under /usr/lib here.
$ cd /usr/lib/zookeeper/
$ ./bin/zkServer.sh start
(zkServer.sh reads conf/zoo.cfg by default.)
5. Start Kafka
Kafka is likewise installed under /usr/lib.
$ cd /usr/lib/kafka
$ ./bin/kafka-server-start.sh config/server.properties
server.properties is the broker configuration file.
6. Start Flume agent2 with the kafka_morphline_out.conf configuration file
The Flume configuration directory here is /etc/flume-ng/conf. agent2 (the HDFS side) is started first, so that events are drained from the Kafka channel as soon as agent1 begins producing.
$ flume-ng agent --conf /etc/flume-ng/conf --conf-file /etc/flume-ng/conf/kafka_morphline_out.conf --name agent2 -Dflume.root.logger=INFO,console
7. Start Flume agent1 with the kafka_morphline_in.conf configuration file
$ flume-ng agent --conf /etc/flume-ng/conf --conf-file /etc/flume-ng/conf/kafka_morphline_in.conf --name agent1 -Dflume.root.logger=INFO,console
8. Check in HDFS that the log files have been saved, e.g. with hdfs dfs -ls /loudacre/morphline
9. Start Hive, create a time-partitioned table, and load the data from HDFS
Copy the /flume/schema/schema.avsc file into HDFS so it can be used in the next step:
$ cd /flume/schema/
$ hdfs dfs -mkdir -p /loudacre/avro
$ hdfs dfs -put schema.avsc /loudacre/avro/schema.avsc
Start Hive:
$ hive
Create a partitioned table with a String partition column date_part, using the /loudacre/avro/schema.avsc file in HDFS as the schema:
CREATE EXTERNAL TABLE mor
PARTITIONED BY (date_part STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/loudacre/morphline/'
TBLPROPERTIES ( 'avro.schema.url'='/loudacre/avro/schema.avsc' );
Register the data previously written to HDFS as Hive partitions:
alter table mor add partition (date_part='20180804') location '/loudacre/morphline/2018-08-04';
alter table mor add partition (date_part='20180805') location '/loudacre/morphline/2018-08-05';
alter table mor add partition (date_part='20180806') location '/loudacre/morphline/2018-08-06';
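The three ALTER TABLE statements above follow one pattern per day; when many days need registering, a small (hypothetical) helper can generate them:

```python
from datetime import date, timedelta

def partition_statements(start, end, table="mor", base="/loudacre/morphline"):
    """Yield one ALTER TABLE ... ADD PARTITION statement per day in [start, end]."""
    d = start
    while d <= end:
        yield (f"alter table {table} add partition (date_part='{d:%Y%m%d}') "
               f"location '{base}/{d:%Y-%m-%d}';")
        d += timedelta(days=1)

stmts = list(partition_statements(date(2018, 8, 4), date(2018, 8, 6)))
for s in stmts:
    print(s)
```

The generated statements are identical to the three shown above and could be piped into hive -f or beeline.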
Query the imported data as a test:
select * from mor where date_part='20180805';