Importing Kafka topic data into Hive with Flume
2018-12-06 18:11:45  Views: 7
Tags: flume kafka topic data import hive
Copyright notice: this is the author's original article and may not be reproduced without the author's permission. https://blog.csdn.net/m0_37786447/article/details/80608531

1. First, create the table in Hive according to the structure of the incoming data.

create table AREA1(
  unid string, area_punid string, area_no string, area_name string,
  area_dept_unid string, area_longitude string, area_latitude string,
  area_sortid string, create_time string)
clustered by (unid) into 2 buckets
stored as orc;

Note: `clustered by (unid) into 2 buckets` and `stored as orc` are required; without them the Flume Hive sink reports an error, because the Hive streaming API it uses only writes to bucketed ORC tables. I omitted them the first time and hit this error; the fix above came from an online search.
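Depending on the Hive version, the streaming API behind the Hive sink may additionally require the table to be transactional and the metastore to have transactions enabled. As a hedged sketch (these are standard Hive configuration keys, but verify the exact requirements against your Hive release), the table property can be set with:

```sql
-- May be needed in addition to bucketing and ORC storage
ALTER TABLE AREA1 SET TBLPROPERTIES ('transactional' = 'true');
```

and the commonly cited hive-site.xml settings for transaction support look like:

```xml
<property><name>hive.support.concurrency</name><value>true</value></property>
<property><name>hive.txn.manager</name><value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value></property>
<property><name>hive.compactor.initiator.on</name><value>true</value></property>
<property><name>hive.compactor.worker.threads</name><value>1</value></property>
```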

2. With the table created, write the Flume configuration file; this is the key step.

Create a file named kafkatohive.conf in Flume's conf directory with the following contents:

flumeagent1.sources = source_from_kafka
flumeagent1.channels = mem_channel
flumeagent1.sinks = hive_sink
# Define / configure the source
# (Flume 1.6-era Kafka source properties; newer Flume releases use
# kafka.bootstrap.servers and kafka.topics instead of zookeeperConnect/topic)
flumeagent1.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumeagent1.sources.source_from_kafka.zookeeperConnect = 192.168.72.129:2181,192.168.72.130:2181,192.168.72.131:2181
flumeagent1.sources.source_from_kafka.topic = oracle-kafka
# Timestamp interceptor: stamps each event with a timestamp header
flumeagent1.sources.source_from_kafka.interceptors = i1
flumeagent1.sources.source_from_kafka.interceptors.i1.type = timestamp
flumeagent1.sources.source_from_kafka.consumer.timeout.ms = 1000
# Hive Sink
flumeagent1.sinks.hive_sink.type = hive
flumeagent1.sinks.hive_sink.hive.metastore = thrift://192.168.72.129:9083
flumeagent1.sinks.hive_sink.hive.database = test
flumeagent1.sinks.hive_sink.hive.table = AREA1
flumeagent1.sinks.hive_sink.hive.txnsPerBatchAsk = 2
flumeagent1.sinks.hive_sink.batchSize = 10
flumeagent1.sinks.hive_sink.serializer = DELIMITED
flumeagent1.sinks.hive_sink.serializer.delimiter = ,
flumeagent1.sinks.hive_sink.serializer.fieldnames = unid,area_punid,area_no,area_name,area_dept_unid,area_longitude,area_latitude,area_sortid,create_time
# Use a channel which buffers events in memory
flumeagent1.channels.mem_channel.type = memory
flumeagent1.channels.mem_channel.capacity = 10000
flumeagent1.channels.mem_channel.transactionCapacity = 100
# Bind the source and sink to the channel
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sinks.hive_sink.channel = mem_channel
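The DELIMITED serializer above splits each Kafka message on the configured delimiter and maps the pieces to `fieldnames` in order, so producers must emit comma-delimited lines in exactly that column order. A minimal sketch of building such a record (the helper name and sample values are hypothetical; the optional publishing step assumes the third-party kafka-python package and a reachable broker):

```python
# Build a comma-delimited record matching the Hive sink's DELIMITED
# serializer; field order must match the fieldnames in the Flume config.

def to_delimited(record, fieldnames, delimiter=","):
    """Join the record's fields in serializer order; missing fields become empty."""
    return delimiter.join(str(record.get(f, "")) for f in fieldnames)

# Same order as flumeagent1.sinks.hive_sink.serializer.fieldnames
FIELDNAMES = ["unid", "area_punid", "area_no", "area_name",
              "area_dept_unid", "area_longitude", "area_latitude",
              "area_sortid", "create_time"]

if __name__ == "__main__":
    row = {"unid": "1001", "area_no": "A01", "area_name": "North"}
    line = to_delimited(row, FIELDNAMES)
    print(line)
    # To actually publish (assumes kafka-python and a reachable broker):
    # from kafka import KafkaProducer
    # producer = KafkaProducer(bootstrap_servers="192.168.72.129:9092")
    # producer.send("oracle-kafka", line.encode("utf-8"))
    # producer.flush()
```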

3. Run the Flume agent with the following command:

[root@hserver1 bin]# flume-ng agent -n flumeagent1 -f ../conf/kafkatohive.conf


Data now flows from the Kafka topic into Hive. To make the load near real time, you can create another agent that moves data from the original source into the same Kafka topic; with the batch sizes and timeouts tuned appropriately, this should give an approximately real-time pipeline.
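A sketch of such a second agent, tailing a log file into the same topic (the file path and agent/component names are hypothetical; `brokerList` is the Flume 1.6-era Kafka sink property, while newer releases use kafka.bootstrap.servers and kafka.topic):

```
flumeagent2.sources = tail_source
flumeagent2.channels = mem_channel
flumeagent2.sinks = kafka_sink
# Tail a file of comma-delimited records
flumeagent2.sources.tail_source.type = exec
flumeagent2.sources.tail_source.command = tail -F /var/log/area.csv
flumeagent2.sources.tail_source.channels = mem_channel
# Publish each line to the topic the first agent consumes
flumeagent2.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
flumeagent2.sinks.kafka_sink.topic = oracle-kafka
flumeagent2.sinks.kafka_sink.brokerList = 192.168.72.129:9092
flumeagent2.sinks.kafka_sink.channel = mem_channel
flumeagent2.channels.mem_channel.type = memory
flumeagent2.channels.mem_channel.capacity = 10000
flumeagent2.channels.mem_channel.transactionCapacity = 100
```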

