Using Flume to connect to Oracle and push data to Kafka in real time
2018-11-28 18:08:10
Tags: Flume, Oracle, real-time, push, data, Kafka
Copyright notice: this is an original post by the author and may not be reposted without permission. https://blog.csdn.net/chongxin1/article/details/76104075

Versions used:

RedHat 6.5, JDK 1.8, flume-1.6.0, kafka_2.11-0.8.2.1

Installing Flume

See the companion article "Installing standalone Flume 1.6 on RedHat 6.5".

Installing Kafka

See the companion article "Installing a Kafka cluster on RedHat 6.5".

1. Download flume-ng-sql-source-1.4.3.jar

CSDN download: http://download.csdn.net/detail/chongxin1/9892184

flume-ng-sql-source-1.4.3.jar is the key supporting jar that lets Flume read from a relational database.

2. Copy flume-ng-sql-source-1.4.3.jar into Flume's lib directory


3. Copy the Oracle JDBC driver into Flume's lib directory (an Oracle database is used here)

The Oracle JDBC driver ships with the Oracle installation, under D:\app\product\11.2.0\dbhome_1\jdbc\lib.

Copy ojdbc5.jar from there into Flume's lib directory.
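Steps 2-3 can be sketched as the following shell commands. In this article FLUME_HOME would be /usr/local/flume/apache-flume-1.6.0-bin; a throwaway directory and stand-in jar files are used here so the commands can be tried without a real install.

```shell
# Sketch of steps 2-3: stage the two jars into Flume's lib directory.
workdir=$(mktemp -d)
FLUME_HOME="$workdir/apache-flume-1.6.0-bin"
mkdir -p "$FLUME_HOME/lib"
# stand-ins for the two jars obtained above
touch "$workdir/flume-ng-sql-source-1.4.3.jar" "$workdir/ojdbc5.jar"
# copy both jars onto Flume's classpath
cp "$workdir/flume-ng-sql-source-1.4.3.jar" "$workdir/ojdbc5.jar" "$FLUME_HOME/lib/"
ls "$FLUME_HOME/lib"
```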

4. Create flume-sql.conf

Create flume-sql.conf in the conf directory:

  touch /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-sql.conf
  sudo gedit /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-sql.conf
Put the following into flume-sql.conf:

  agentOne.channels=channelOne
  agentOne.sources=sourceOne
  agentOne.sinks=sinkOne
  ########### sql source #################
  # For each one of the sources, the type is defined
  agentOne.sources.sourceOne.type=org.keedio.flume.source.SQLSource
  agentOne.sources.sourceOne.hibernate.connection.url=jdbc:oracle:thin:@192.168.168.100:1521/orcl
  # Hibernate database connection properties
  agentOne.sources.sourceOne.hibernate.connection.user=flume
  agentOne.sources.sourceOne.hibernate.connection.password=1234
  agentOne.sources.sourceOne.hibernate.connection.autocommit=true
  agentOne.sources.sourceOne.hibernate.dialect=org.hibernate.dialect.Oracle10gDialect
  agentOne.sources.sourceOne.hibernate.connection.driver_class=oracle.jdbc.driver.OracleDriver
  agentOne.sources.sourceOne.run.query.delay=10000
  agentOne.sources.sourceOne.status.file.path=/tmp
  agentOne.sources.sourceOne.status.file.name=sqlSource.status
  # Custom query
  agentOne.sources.sourceOne.start.from=0
  agentOne.sources.sourceOne.custom.query=select sysdate from dual
  agentOne.sources.sourceOne.batch.size=1000
  agentOne.sources.sourceOne.max.rows=1000
  agentOne.sources.sourceOne.hibernate.connection.provider_class=org.hibernate.connection.C3P0ConnectionProvider
  agentOne.sources.sourceOne.hibernate.c3p0.min_size=1
  agentOne.sources.sourceOne.hibernate.c3p0.max_size=10
  ##############################
  agentOne.channels.channelOne.type=memory
  agentOne.channels.channelOne.capacity=10000
  agentOne.channels.channelOne.transactionCapacity=10000
  agentOne.channels.channelOne.byteCapacityBufferPercentage=20
  agentOne.channels.channelOne.byteCapacity=800000
  agentOne.sinks.sinkOne.type=org.apache.flume.sink.kafka.KafkaSink
  agentOne.sinks.sinkOne.topic=test
  agentOne.sinks.sinkOne.brokerList=192.168.168.200:9092
  agentOne.sinks.sinkOne.requiredAcks=1
  agentOne.sinks.sinkOne.batchSize=20
  agentOne.sinks.sinkOne.channel=channelOne
  agentOne.sources.sourceOne.channels=channelOne
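As configured, custom.query polls `select sysdate from dual` once per run.query.delay (10 s), so each cycle pushes a single timestamp row to Kafka. For a real table you would usually page through an increasing key: flume-ng-sql-source substitutes the placeholder `$@$` in custom.query with the last index saved in the status file. A sketch (the table and column names below are made up for illustration):

```
agentOne.sources.sourceOne.custom.query=SELECT id, name FROM my_table WHERE id > $@$ ORDER BY id ASC
```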

5. Start flume-sql.conf with flume-ng and test


  cd /usr/local/flume/apache-flume-1.6.0-bin
  bin/flume-ng agent --conf conf --conf-file conf/flume-sql.conf --name agentOne -Dflume.root.logger=INFO,console

A successful start logs output like this:

  2017-07-08 00:12:55,393 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SINK, name: sinkOne: Successfully registered new MBean.
  2017-07-08 00:12:55,394 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SINK, name: sinkOne started
  2017-07-08 00:12:55,463 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(test)
  2017-07-08 00:12:55,528 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Connected to localhost:9092 for producing
  2017-07-08 00:12:55,551 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Disconnecting from localhost:9092
  2017-07-08 00:12:55,582 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Connected to slave2:9092 for producing
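Besides the logs, you can confirm the source is making progress through the status file configured above (/tmp/sqlSource.status). The sketch below runs against simulated contents, since the exact JSON layout (including the `LastIndex` field name) is an assumption that may vary between flume-ng-sql-source versions; on a live agent just `cat /tmp/sqlSource.status`.

```shell
# Simulated status-file contents; "LastIndex" is an assumed field name.
status='{"SourceName":"sourceOne","URL":"jdbc:oracle:thin:@192.168.168.100:1521/orcl","LastIndex":"42"}'
# extract the last processed index with sed
last_index=$(printf '%s' "$status" | sed -n 's/.*"LastIndex":"\([0-9]*\)".*/\1/p')
echo "last processed index: $last_index"
```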

Start a Kafka console consumer listening on the topic:

  kafka-console-consumer.sh --zookeeper localhost:2181 --topic test

Successful output looks like this (one timestamp row arrives per 10-second poll):

  [root@master kafka_2.11-0.9.0.0]# kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
  "2017-07-08 00:28:53.0"
  "2017-07-08 00:29:03.0"
  "2017-07-08 00:29:13.0"
  "2017-07-08 00:29:23.0"
  "2017-07-08 00:29:33.0"
  "2017-07-08 00:29:43.0"
  "2017-07-08 00:29:53.0"
  "2017-07-08 00:30:03.0"

6. Common errors and how to fix them

  2017-06-27 16:26:01,293 (C3P0PooledConnectionPoolManager[identityToken->1hgey889o1sjxqn51anc3fr|29938ba5]-AdminTaskTimer) [WARN - com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector.run(ThreadPoolAsynchronousRunner.java:759)] com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@2d6227f3 -- APPARENT DEADLOCK!!! Complete Status:

This means the connection timed out and the C3P0 pool deadlocked. Carefully check jdbc:oracle:thin:@192.168.168.100:1521/orcl and the username/password.
If those are correct and the connection still fails, check whether a firewall on the Oracle host is blocking the listener port; add an inbound rule for it or disable the firewall.
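A quick way to tell a credential problem from a firewall problem is to probe the listener port directly. The host and port below are the ones from this article's example config; substitute your own.

```shell
# TCP reachability probe for the Oracle listener using bash's /dev/tcp.
host=192.168.168.100
port=1521
if timeout 3 bash -c "exec 3<>/dev/tcp/$host/$port" 2>/dev/null; then
  status=open   # listener reachable: suspect credentials/URL instead
else
  status=closed # blocked or down: check the firewall and the listener
fi
echo "Oracle listener $host:$port is $status"
```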
