设为首页 加入收藏

TOP

vertica-->kafka-->mongodb数据流
2018-12-05 18:30:01 】 浏览:8
Tags:vertica--> kafka--> mongodb 数据流
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u010522235/article/details/52121726

此连接件为confluent修改过的jar包,这里不提供下载(涉及公司机密!!)

Kafka Connnect有两个核心概念:Source和Sink。 Source负责导入数据到Kafka,Sink负责从Kafka导出数据,它们都被称为Connector。


由于这里是kafka到mongo,所以这里使用Sink Connector

首先安装kafka
1,Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。
Kafka拓扑结构:


Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
Partition
Parition是物理上的概念,每个Topic包含一个或多个Partition.
Producer
负责发布消息到Kafka broker
Consumer
消息消费者,向Kafka broker读取消息的客户端。
Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
安装zookeeper
配置环境变量,/etc/profile添加以下内容:
[root@dba06 kafka]# export ZOOKEEPER_HOME=/opt/app/kafka/zookeeper-3.4.6
[root@dba06 kafka]# export PATH=$PATH:$ZOOKEEPER_HOME/bin
[root@dba06 kafka]# cd zookeeper-3.4.6/conf/
[root@dba06 conf]# ls
configuration.xsl log4j.properties zoo_sample.cfg
[root@dba06 conf]# cat zoo_sample.cfg | grep -v '#' > zoo.cfg
[root@dba06 conf]# cat zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181
[root@dba06 conf]# ../bin/zkServer.sh start
JMX enabled by default
Using config: /opt/app/kafka/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
查看是否启动
[root@dba06 conf]# ../bin/zkServer.sh status
JMX enabled by default
Using config: /opt/app/kafka/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: standalone
Server启动之后, 就可以启动client连接server了, 执行脚本:
[root@dba06 conf]# ../bin/zkCli.sh -server localhost:2181


安装kafka
安装kafka server之前需要单独安装zookeeper server,而且需要修改config/server.properties里面的IP信息
[root@dba06 kafka]# cd kafka_2.11-0.9.0.0
[root@dba06 kafka_2.11-0.9.0.0]# cd config/
[root@dba06 config]# vi server.properties
zookeeper.connect=localhost:2181
这里需要修改默认zookeeper.properties配置
[root@dba06 config]# vi zookeeper.properties
保持默认设置
先启动zookeeper,启动前先kill掉之前的zkServer.sh启动的zookeeper服务
[root@dba06 bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties &
启动kafka服务
[root@dba06 bin]# ./kafka-server-start.sh ../config/server.properties &
查看kafka进程是否启动:
[root@dba06 bin]# find /. -name 'java'
重新安装Java
[root@dba06 bin]# yum install java-1.7.0
[root@dba06 bin]# yum install java-1.7.0-openjdk java-1.7.0-openjdk-devel
配置环境变量
[root@dba06 bin]# vi ~/.bash_profile
export PATH
export JAVA_HOME=/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.111.x86_64/jre
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.111.x86_64/jre/lib/amd64/libjsig.so:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.111.x86_64/jre/lib/amd64/server/libjvm.so:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.111.x86_64/jre/lib/amd64/server:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.111.x86_64/jre/lib/amd64
[root@dba06 bin]# source ~/.bash_profile
[root@dba06 bin]# jps
835 Kafka
803 QuorumPeerMain
1679 Jps
咱们先看看vertica,推数据到kafka
dbadmin=> select version();
version
------------------------------------
Vertica Analytic Database v7.2.3-0
(1 row)


dbadmin=> \q
这里采用7.2.3版本
dbadmin=> SELECT KafkaExport(0, '1', '{"EUTIME":"'||"EUTIME"||'"}' USING PARAMETERS brokers='172.16.20.207:9092',topic='fafatest') OVER (PARTITION BEST) FROM tmp.DP_TRADE_PVCOUNT ;
partition | key | message | failure_reason
-----------+-----+---------+----------------
(0 rows)
说明,这里无需在对方创建topic,只需对方有zookeeper进程和kafka进程即可。
报错详解:
dbadmin=> SELECT KafkaExport(0, '1', '{"EUTIME":"'||"STATDATE"||'"}' USING PARAMETERS brokers='172.16.57.55:9092',topic='fafa01') OVER (PARTITION BEST) FROM tmp.DP_TRADE_PVCOUNT ;
ERROR 5861: Error calling processPartition() in User Function KafkaExport at [src/KafkaExport.cpp:187], error code: 0, message: Error reading topic metadata for topic fafa01: Broker: Leader not available
dbadmin=> SELECT KafkaExport(0, '1', '{"EUTIME":"'||"STATDATE"||'"}' USING PARAMETERS brokers='172.16.57.55:9092',topic='fafa01') OVER () FROM tmp.DP_TRADE_PVCOUNT ;
ERROR 5861: Error calling processPartition() in User Function KafkaExport at [src/KafkaExport.cpp:323], error code: 0, message: Exception while processing partition row: [0] : [Fatal kafka error: T2:9092/0: Failed to resolve 'T2:9092': Temporary failure in name resolution]
ERROR 5861问题是因为在双方的/etc/hosts下面要配置互相的ip,例如此时:
172.16.57.55 T2
172.16.57.208 bd-dev-vertica2-208 localhost
在kafka上消费测试一下
[root@dba06 bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic fafa01
{"EUTIME":"2016-04-30"}
{"EUTIME":"2016-04-30"}
{"EUTIME":"2016-05-01"}
{"EUTIME":"2016-05-02"}
{"EUTIME":"2016-05-02"}
{"EUTIME":"2016-05-02"}
....................
....................
可以看到json格式数据已经消费。


下面尝试kafka到mongo连接
下载kafka到mongo连接件,这里下载的连接件需要再使用confluect上面的kafka才能使用
http://www.confluent.io/product/connectors
这里使用改写的单向的版本,使用普通的kafka
[root@dba06 libs]# rz
z waiting to receive.**B0100000023be50

connect-mongodb-1.0-jar-with-dependencies.jar
将jar报放在lib下面
将配置文件放在config下面
[root@dba06 ~]# cd /opt/app/kafka/kafka_2.11-0.9.0.0/config/
connect-mongo160-sink.properties
connect-standalone.properties
配置kafka到mongo连接件
[root@T2 config]# vi connect-mongo160-sink.properties


name=mongodb-sink-connector
connector.class=org.apache.kafka.connect.mongodb.MongodbSinkConnector
tasks.max=1
host=172.16.57.160
port=12001
bulk.size=100
mongodb.database=backbone
mongodb.authorized.database=backbone
skip.message.on.error=true
mongodb.collections=test
mongodb.user=backbone
mongodb.password=Password$1
topics=fafa01


启动服务,连接服务
这里先不管格式将connect-standalone.properties
[root@T2 bin]# vi /opt/app/software/kafka_2.11-0.10.0.0/config/connect-standalone.properties
key.converter.schemas.enable=false
value.converter.schemas.enable=false
暂时关闭格式控制


[root@T2 bin]# ./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-mongo160-sink.properties




查看mongo端
mongos> db.test.find()
{ "_id" : NumberLong(2), "message" : "great", "EUTIME" : "2015-10-28" }
{ "_id" : NumberLong(3), "haoshibu" : "4", "EUTIME" : "2015-10-29" }
{ "_id" : NumberLong(4), "haoshibu" : "9", "EUTIME" : "2015-10-31" }
{ "_id" : NumberLong(5), "haoshibu" : "6", "EUTIME" : "2015-11-12" }
{ "_id" : NumberLong(6), "haoshibu" : "10", "EUTIME" : "2015-11-14" }
{ "_id" : NumberLong(7), "haoshibu" : "2", "EUTIME" : "2015-11-17" }
{ "_id" : NumberLong(8), "haoshibu" : "3", "EUTIME" : "2015-11-18" }
{ "_id" : NumberLong(9), "haoshibu" : "1", "EUTIME" : "2015-11-19" }
{ "_id" : NumberLong(10), "haoshibu" : "7", "EUTIME" : "2015-11-24" }
{ "_id" : NumberLong(11), "haoshibu" : "8", "EUTIME" : "2015-11-27" }
{ "_id" : NumberLong(12), "haoshibu" : "5", "EUTIME" : "2015-12-03" }
{ "_id" : NumberLong(13), "haoshibu" : "4", "EUTIME" : "2015-12-09" }
{ "_id" : NumberLong(14), "haoshibu" : "9", "EUTIME" : "2015-12-10" }
{ "_id" : NumberLong(15), "haoshibu" : "6", "EUTIME" : "2015-12-11" }
{ "_id" : NumberLong(16), "haoshibu" : "10", "EUTIME" : "2015-12-18" }
{ "_id" : NumberLong(17), "haoshibu" : "2", "EUTIME" : "2015-12-20" }
{ "_id" : NumberLong(18), "haoshibu" : "3", "EUTIME" : "2015-12-26" }
{ "_id" : NumberLong(19), "haoshibu" : "1", "EUTIME" : "2015-12-27" }
{ "_id" : NumberLong(20), "haoshibu" : "7", "EUTIME" : "2016-01-01" }
{ "_id" : NumberLong(21), "haoshibu" : "8", "EUTIME" : "2016-01-03" }
成功!



编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Kafka->Mongodb->Es 下一篇kafka   Partition分发策略

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }